mirror of
https://github.com/KenanZhu/AutoLibrary.git
synced 2026-06-18 07:23:03 +08:00
chore(*): refactor the project structure
This commit is contained in:
@@ -0,0 +1,283 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import os
|
||||
import queue
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.edge.service import Service
|
||||
|
||||
from base.MsgBase import MsgBase
|
||||
from operators.LibChecker import LibChecker
|
||||
from operators.LibLogin import LibLogin
|
||||
from operators.LibLogout import LibLogout
|
||||
from operators.LibReserve import LibReserve
|
||||
from operators.LibCheckin import LibCheckin
|
||||
|
||||
from utils.ConfigReader import ConfigReader
|
||||
|
||||
|
||||
class AutoLib(MsgBase):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue
|
||||
):
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__system_config_reader = None
|
||||
self.__users_config_reader = None
|
||||
self.__driver = None
|
||||
|
||||
|
||||
def __initBrowserDriver(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
self._showTrace("正在初始化浏览器驱动......")
|
||||
edge_options = webdriver.EdgeOptions()
|
||||
|
||||
if self.__system_config_reader.get("web_driver/headless"):
|
||||
edge_options.add_argument("--headless")
|
||||
edge_options.add_argument("--disable-gpu")
|
||||
edge_options.add_argument("--no-sandbox")
|
||||
edge_options.add_argument("--disable-dev-shm-usage")
|
||||
|
||||
# must be 1920x1080, otherwise the page will cause some elements not accessible
|
||||
edge_options.add_argument("--window-size=1920,1080")
|
||||
edge_options.add_argument("--remote-allow-origins=*")
|
||||
|
||||
# omit ssl errors and verbose log level
|
||||
edge_options.add_argument("--ignore-certificate-errors")
|
||||
edge_options.add_argument("--ignore-ssl-errors")
|
||||
edge_options.add_argument("--log-level=OFF")
|
||||
edge_options.add_argument("--silent")
|
||||
|
||||
edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
|
||||
edge_options.add_experimental_option("useAutomationExtension", False)
|
||||
edge_options.add_argument("--disable-blink-features=AutomationControlled")
|
||||
edge_options.add_argument(
|
||||
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) "\
|
||||
"Chrome/120.0.0.0 "\
|
||||
"Safari/537.36 "\
|
||||
"Edg/120.0.0.0"
|
||||
)
|
||||
|
||||
# init browser driver
|
||||
self.__driver_path = self.__system_config_reader.get("web_driver/driver_path")
|
||||
self.__driver_type = self.__system_config_reader.get("web_driver/driver_type")
|
||||
self.__driver_path = os.path.abspath(self.__driver_path)
|
||||
try:
|
||||
service = None
|
||||
if self.__driver_path:
|
||||
service = Service(executable_path=self.__driver_path)
|
||||
match self.__driver_type.lower():
|
||||
case "edge":
|
||||
self.__driver = webdriver.Edge(service=service, options=edge_options)
|
||||
case "chrome":
|
||||
self.__driver = webdriver.Chrome(service=service, options=edge_options)
|
||||
case "firefox":
|
||||
self.__driver = webdriver.Firefox(service=service, options=edge_options)
|
||||
case _:
|
||||
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type}")
|
||||
self.__driver.implicitly_wait(1)
|
||||
self.__driver.execute_script(
|
||||
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
|
||||
)
|
||||
except Exception as e:
|
||||
self._showTrace(f"浏览器驱动初始化失败: {e}")
|
||||
return False
|
||||
self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}")
|
||||
return True
|
||||
|
||||
|
||||
def __initLibOperators(
|
||||
self
|
||||
):
|
||||
|
||||
if not self.__driver:
|
||||
self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !")
|
||||
return
|
||||
self.__lib_checker = LibChecker(self._input_queue, self._output_queue, self.__driver)
|
||||
self.__lib_login = LibLogin(self._input_queue, self._output_queue, self.__driver)
|
||||
self.__lib_logout = LibLogout(self._input_queue, self._output_queue, self.__driver)
|
||||
self.__lib_reserve = LibReserve(self._input_queue, self._output_queue, self.__driver)
|
||||
self.__lib_checkin = LibCheckin(self._input_queue, self._output_queue, self.__driver)
|
||||
|
||||
|
||||
def __waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
# wait for page load
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until( # title contains "首页"
|
||||
EC.title_contains("首页")
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # username field presence
|
||||
EC.presence_of_element_located((By.NAME, "username"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # password field presence
|
||||
EC.presence_of_element_located((By.NAME, "password"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # captcha field presence
|
||||
EC.presence_of_element_located((By.NAME, "answer"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # captcha image presence
|
||||
EC.presence_of_element_located((By.ID, "loadImgId"))
|
||||
)
|
||||
return True
|
||||
except:
|
||||
self._showTrace(f"登录页面加载失败 !")
|
||||
return False
|
||||
|
||||
|
||||
def __initDriverUrl(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
url = self.__system_config_reader.get("library/host_url")
|
||||
url += self.__system_config_reader.get("library/login_url")
|
||||
self.__driver.get(url)
|
||||
if not self.__waitResponseLoad():
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def __run(
|
||||
self,
|
||||
username: str,
|
||||
password: str,
|
||||
reserve_info: dict
|
||||
) -> int:
|
||||
|
||||
# result : 0 - success, 1 - failed, 2 - passed
|
||||
result = 2
|
||||
|
||||
# login
|
||||
if not self.__lib_login.login(
|
||||
username,
|
||||
password,
|
||||
self.__system_config_reader.get("login/max_attempt", 5),
|
||||
self.__system_config_reader.get("login/auto_captcha", True),
|
||||
):
|
||||
return 1
|
||||
"""
|
||||
Here, we collect the run mode from the config file.
|
||||
"""
|
||||
run_mode = self.__system_config_reader.get("mode/run_mode", 0)
|
||||
run_mode = {
|
||||
"auto_reserve": run_mode&0x1,
|
||||
"auto_checkin": run_mode&0x2,
|
||||
"auto_renewal": run_mode&0x4,
|
||||
}
|
||||
# reserve
|
||||
if run_mode["auto_reserve"]:
|
||||
if self.__lib_checker.canReserve(reserve_info.get("date")):
|
||||
if self.__lib_reserve.reserve(reserve_info):
|
||||
self._showTrace(f"用户 {username} 预约成功 !")
|
||||
result = 0
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 预约失败 !")
|
||||
result = 1
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 无法预约,已跳过")
|
||||
result = 2
|
||||
# checkin
|
||||
if run_mode["auto_checkin"] and result == 2:
|
||||
if self.__lib_checker.canCheckin(reserve_info.get("date")):
|
||||
if self.__lib_checkin.checkin(username):
|
||||
self._showTrace(f"用户 {username} 签到成功 !")
|
||||
result = 0
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 签到失败 !")
|
||||
result = 1
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 无法签到,已跳过")
|
||||
result = 2
|
||||
# renewal
|
||||
if run_mode["auto_renewal"] and result == 2:
|
||||
if self.__lib_checker.canRenew(reserve_info.get("date")):
|
||||
pass
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 无法续约,已跳过")
|
||||
result = 2
|
||||
# logout
|
||||
if not self.__lib_logout.logout(
|
||||
username,
|
||||
):
|
||||
# if logout is failed, we must make sure the host to be reloaded
|
||||
# otherwise, the next login may fail
|
||||
self.__driver.get(self.__system_config_reader.get("library/host_url"))
|
||||
return 1
|
||||
return result
|
||||
|
||||
|
||||
def run(
|
||||
self,
|
||||
system_config_reader: ConfigReader,
|
||||
users_config_reader: ConfigReader
|
||||
):
|
||||
|
||||
self.__system_config_reader = system_config_reader
|
||||
self.__users_config_reader = users_config_reader
|
||||
if not self.__initBrowserDriver():
|
||||
return
|
||||
else:
|
||||
if not self.__initDriverUrl():
|
||||
return
|
||||
self.__initLibOperators()
|
||||
|
||||
user_counter = {"current": 0, "success": 0, "failed": 0, "passed": 0}
|
||||
users = self.__users_config_reader.get("users")
|
||||
self._showTrace(
|
||||
f"共发现 {len(users)} 个用户, "\
|
||||
f"用户配置文件路径: {self.__users_config_reader.configPath()}"
|
||||
)
|
||||
for user in users:
|
||||
user_counter["current"] += 1
|
||||
self._showTrace(
|
||||
f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user['username']}......"
|
||||
)
|
||||
r = self.__run(
|
||||
username=user["username"],
|
||||
password=user["password"],
|
||||
reserve_info=user["reserve_info"],
|
||||
)
|
||||
if r == 0:
|
||||
user_counter["success"] += 1
|
||||
elif r == 1:
|
||||
user_counter["failed"] += 1
|
||||
elif r == 2:
|
||||
user_counter["passed"] += 1
|
||||
self._showTrace(f"处理完成, 共计 {user_counter["current"]} 个用户, "\
|
||||
f"成功 {user_counter["success"]} 个用户, "\
|
||||
f"失败 {user_counter["failed"]} 个用户, "\
|
||||
f"跳过 {user_counter["passed"]} 个用户"
|
||||
)
|
||||
return
|
||||
|
||||
|
||||
def close(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
if self.__driver:
|
||||
self.__driver.quit()
|
||||
self.__driver = None
|
||||
self._showTrace(f"浏览器驱动已关闭")
|
||||
return True
|
||||
else:
|
||||
self._showTrace(f"浏览器驱动未初始化, 无需关闭")
|
||||
return False
|
||||
@@ -0,0 +1,333 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import queue
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibChecker(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
def __formatDiffTime(
|
||||
seconds: float
|
||||
) -> str:
|
||||
|
||||
hours = int(seconds//3600)
|
||||
minutes = int(seconds%3600//60)
|
||||
seconds = int(seconds%60)
|
||||
return f"{hours} 时 {minutes} 分 {seconds} 秒"
|
||||
|
||||
|
||||
def __navigateToReserveRecordPage(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.XPATH, "//a[@href='/history?type=SEAT']"))
|
||||
).click()
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
|
||||
)
|
||||
except:
|
||||
self._showTrace("加载预约记录页面失败 !")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def __decodeReserveTime(
|
||||
self,
|
||||
time_element
|
||||
) -> dict:
|
||||
|
||||
time_str = time_element.text.strip()
|
||||
today = datetime.now().date()
|
||||
if "明天" in time_str:
|
||||
target_date = today + timedelta(days=1)
|
||||
date = target_date.strftime("%Y-%m-%d")
|
||||
elif "今天" in time_str:
|
||||
target_date = today
|
||||
date = target_date.strftime("%Y-%m-%d")
|
||||
elif "昨天" in time_str:
|
||||
target_date = today - timedelta(days=1)
|
||||
date = target_date.strftime("%Y-%m-%d")
|
||||
else:
|
||||
date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str)
|
||||
if date_match:
|
||||
date = date_match.group(1)
|
||||
else:
|
||||
date = ""
|
||||
time_match = re.search(r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str)
|
||||
if time_match:
|
||||
begin_time = time_match.group(1)
|
||||
end_time = time_match.group(2)
|
||||
else:
|
||||
begin_time = ""
|
||||
end_time = ""
|
||||
return {
|
||||
"date": date,
|
||||
"time": {
|
||||
"begin": begin_time,
|
||||
"end": end_time
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
def __decodeReserveInfo(
|
||||
self,
|
||||
info_elements
|
||||
) -> str:
|
||||
|
||||
location = ""
|
||||
status = ""
|
||||
for info in info_elements:
|
||||
if "已预约" in info.text:
|
||||
status = "已预约"
|
||||
elif "使用中" in info.text:
|
||||
status = "使用中"
|
||||
elif "已完成" in info.text:
|
||||
status = "已完成"
|
||||
elif "已结束使用" in info.text:
|
||||
status = "已结束使用"
|
||||
elif "已取消" in info.text:
|
||||
status = "已取消"
|
||||
elif "失约" in info.text:
|
||||
status = "失约"
|
||||
elif "图书馆" in info.text:
|
||||
location = info.text.strip()
|
||||
return {
|
||||
"location": location,
|
||||
"status": status,
|
||||
}
|
||||
|
||||
|
||||
def __decodeReserveRecord(
|
||||
self,
|
||||
reservation
|
||||
) -> dict:
|
||||
|
||||
try:
|
||||
time_element = reservation.find_element(
|
||||
By.CSS_SELECTOR, "dt"
|
||||
)
|
||||
info_elements = reservation.find_elements(
|
||||
By.CSS_SELECTOR, "a"
|
||||
)
|
||||
except:
|
||||
return {
|
||||
"date": "",
|
||||
"time": {"begin": "", "end": ""},
|
||||
"info": {"location": "", "status": ""}
|
||||
}
|
||||
time = self.__decodeReserveTime(time_element)
|
||||
info = self.__decodeReserveInfo(info_elements)
|
||||
return {
|
||||
"date": time["date"],
|
||||
"time": time["time"],
|
||||
"info": info
|
||||
}
|
||||
|
||||
|
||||
def __loadReserveRecords(
|
||||
self
|
||||
) -> list:
|
||||
try:
|
||||
# check if there's any reservation on the date
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, ".myReserveList > dl"))
|
||||
)
|
||||
reservations = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)"
|
||||
)
|
||||
return reservations
|
||||
except:
|
||||
self._showTrace("加载预约记录失败 !")
|
||||
return None
|
||||
|
||||
|
||||
def __showMoreReserveRecords(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
# load new reservations if still not sure
|
||||
try:
|
||||
WebDriverWait(self.__driver, 0.1).until(
|
||||
EC.element_to_be_clickable((By.ID, "moreBtn"))
|
||||
)
|
||||
except:
|
||||
# the reservation is the last one
|
||||
return False
|
||||
try:
|
||||
more_btn = self.__driver.find_element(By.ID, "moreBtn")
|
||||
if more_btn.is_displayed() and more_btn.is_enabled():
|
||||
self.__driver.execute_script("arguments[0].scrollIntoView(true);", more_btn)
|
||||
self.__driver.execute_script("arguments[0].click();", more_btn)
|
||||
return True
|
||||
else:
|
||||
self._showTrace("用户无法加载更多预约记录")
|
||||
return False
|
||||
except:
|
||||
self._showTrace("加载更多预约记录失败 !")
|
||||
return False
|
||||
|
||||
|
||||
def __getReserveRecord(
|
||||
self,
|
||||
wanted_date: str,
|
||||
wanted_status: str
|
||||
) -> dict:
|
||||
|
||||
if wanted_date is None:
|
||||
self._showTrace("日期未指定, 无法检查当前预约状态")
|
||||
return None
|
||||
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......")
|
||||
|
||||
checked_count = 0
|
||||
max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked
|
||||
|
||||
if not self.__navigateToReserveRecordPage():
|
||||
return None
|
||||
for _ in range(max_check_times):
|
||||
reservations = self.__loadReserveRecords()
|
||||
if reservations is None:
|
||||
return None
|
||||
for reservation in reservations[checked_count:]:
|
||||
record = self.__decodeReserveRecord(reservation)
|
||||
checked_count += 1
|
||||
if record is None:
|
||||
continue
|
||||
if record["date"] == "":
|
||||
continue
|
||||
if record["time"] == {"begin": "", "end": ""}:
|
||||
continue
|
||||
# record date is later than the given date, check the next one
|
||||
if datetime.strptime(record["date"], "%Y-%m-%d").date() >\
|
||||
datetime.strptime(wanted_date, "%Y-%m-%d").date():
|
||||
continue
|
||||
# record date is earlier than the given date, so there is no wanted record
|
||||
if datetime.strptime(record["date"], "%Y-%m-%d").date() <\
|
||||
datetime.strptime(wanted_date, "%Y-%m-%d").date():
|
||||
return None
|
||||
if record["info"]["status"] == wanted_status:
|
||||
self._showTrace(
|
||||
f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, "
|
||||
f"详细信息: {record["date"]} "
|
||||
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}"
|
||||
)
|
||||
return record
|
||||
if not self.__showMoreReserveRecords():
|
||||
break
|
||||
return None
|
||||
|
||||
|
||||
def canReserve(
|
||||
self,
|
||||
date: str
|
||||
) -> bool:
|
||||
|
||||
# no reserved or using record in the given date
|
||||
# then can reserve
|
||||
if self.__getReserveRecord(date, "已预约") is None:
|
||||
if self.__getReserveRecord(date, "使用中") is None:
|
||||
self._showTrace(f"用户在 {date} 可以预约")
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
|
||||
return False
|
||||
|
||||
|
||||
def canCheckin(
|
||||
self,
|
||||
date: str
|
||||
) -> bool:
|
||||
|
||||
# have a reserved record in the given date
|
||||
record = self.__getReserveRecord(date, "已预约")
|
||||
if record is not None:
|
||||
begin_time = record["time"]["begin"]
|
||||
begin_time = datetime.strptime(f"{date} {begin_time}", "%Y-%m-%d %H:%M")
|
||||
time_diff = datetime.now() - begin_time
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
# before 30 minutes, cant checkin
|
||||
if time_diff_seconds < -30*60:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 无法签到"
|
||||
)
|
||||
return False
|
||||
# before in 30 minutes, can checkin
|
||||
elif -30*60 <= time_diff_seconds < 0:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
# past less than 30 minutes, can checkin
|
||||
elif 0 <= time_diff_seconds < 30*60 - 5: # spare 5 seconds for the checkin process
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间已经过去 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
|
||||
return False
|
||||
|
||||
|
||||
def canRenew(
|
||||
self,
|
||||
date: str
|
||||
) -> bool:
|
||||
|
||||
# have a using record in the given date
|
||||
record = self.__getReserveRecord(date, "使用中")
|
||||
if record is not None:
|
||||
end_time = record["time"]["end"]
|
||||
end_time = datetime.strptime(f"{date} {end_time}", "%Y-%m-%d %H:%M")
|
||||
time_diff = end_time - datetime.now()
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
# a using record is definitely after the begin time
|
||||
trace_msg = (
|
||||
f"用户在 {date} 的预约结束时间为 {end_time}, "
|
||||
f"当前距离预约结束时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}"
|
||||
)
|
||||
if abs(time_diff_seconds) < 120*60:
|
||||
self._showTrace(f"{trace_msg}, 可以续约")
|
||||
return True
|
||||
else:
|
||||
self._showTrace(f"{trace_msg}, 无法续约")
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
|
||||
return False
|
||||
@@ -0,0 +1,107 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import queue
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibCheckin(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "ui_dialog"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "resultMessage"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.CLASS_NAME, "btnOK"))
|
||||
)
|
||||
result_message_element = self.__driver.find_element(
|
||||
By.CLASS_NAME, "resultMessage"
|
||||
)
|
||||
ok_btn = self.__driver.find_element(By.CLASS_NAME, "btnOK")
|
||||
except:
|
||||
self._showTrace("签到时发生未知错误 !")
|
||||
return False
|
||||
print(result_message_element)
|
||||
result_message = result_message_element.text
|
||||
if "签到成功" in result_message:
|
||||
try:
|
||||
detail_elements = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR, ".resultMessage dd"
|
||||
)
|
||||
except:
|
||||
pass
|
||||
if detail_elements:
|
||||
details = [element.text for element in detail_elements if element.text.strip()]
|
||||
if len(details) >= 5:
|
||||
self._showTrace(f"\n"\
|
||||
f" 签到成功 !\n"\
|
||||
f" {details[1]}\n"\
|
||||
f" {details[2]}\n"\
|
||||
f" {details[3]}\n"\
|
||||
f" {details[4]}")
|
||||
else:
|
||||
self._showTrace(
|
||||
" 签到成功 !\n"\
|
||||
" 未获取到签到详情 !")
|
||||
ok_btn.click()
|
||||
return True
|
||||
else:
|
||||
failure_reason = result_message.replace("签到失败", "").strip()
|
||||
self._showTrace(f"签到失败: {failure_reason}")
|
||||
ok_btn.click()
|
||||
return False
|
||||
|
||||
|
||||
def checkin(
|
||||
self,
|
||||
username: str
|
||||
) -> bool:
|
||||
|
||||
if self.__driver is None:
|
||||
self._showTrace("未提供有效 WebDriver 实例 !")
|
||||
return False
|
||||
try:
|
||||
checkin_btn = WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.ID, "btnCheckIn"))
|
||||
)
|
||||
except:
|
||||
self._showTrace(f"用户 {username} 签到界面加载失败 !")
|
||||
return False
|
||||
if "disabled" in checkin_btn.get_attribute("class"):
|
||||
self._showTrace("签到按钮不可用, 可能不在场馆内, 请连接图书馆网络后重试")
|
||||
return False
|
||||
checkin_btn.click()
|
||||
return self._waitResponseLoad()
|
||||
@@ -0,0 +1,40 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import queue
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibCheckout(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
pass
|
||||
@@ -0,0 +1,210 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import time
|
||||
import queue
|
||||
import base64
|
||||
|
||||
import ddddocr
|
||||
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibLogin(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
self.__ddddocr = ddddocr.DdddOcr()
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
# wait to verify login success
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until( # title contains "自选座位 :: 座位预约系统"
|
||||
EC.title_contains("自选座位 :: 座位预约系统")
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # search button presence
|
||||
EC.presence_of_element_located((By.ID, "search"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until( # select content presence
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "selectContent"))
|
||||
)
|
||||
return True
|
||||
except:
|
||||
self._showTrace(f"登录页面加载失败 ! : 用户账号或者密码错误/验证码错误, 具体以页面提示为准")
|
||||
return False
|
||||
|
||||
|
||||
def __fillLogInElements(
|
||||
self,
|
||||
username: str,
|
||||
password: str
|
||||
) -> bool:
|
||||
|
||||
# ensure elements presence and fill them
|
||||
try:
|
||||
username_element = self.__driver.find_element(By.NAME, "username")
|
||||
username_element.clear()
|
||||
username_element.send_keys(username)
|
||||
password_element = self.__driver.find_element(By.NAME, "password")
|
||||
password_element.clear()
|
||||
password_element.send_keys(password)
|
||||
except Exception as e:
|
||||
self._showTrace(f"用户名或密码填写失败 ! : {e}")
|
||||
return False
|
||||
return True
|
||||
|
||||
|
||||
def __autoRecognizeCaptcha(
|
||||
self
|
||||
) -> str:
|
||||
|
||||
# auto recognize captcha
|
||||
try:
|
||||
captcha_img = self.__driver.find_element(By.ID, "loadImgId")
|
||||
img_src = captcha_img.get_attribute("src")
|
||||
base64_str = img_src.split(',', 1)[1]
|
||||
captcha_img = base64.b64decode(base64_str)
|
||||
captcha_text = self.__ddddocr.classification(captcha_img)
|
||||
captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower()
|
||||
self._showTrace(f"识别到验证码为 : '{captcha_text}'.")
|
||||
if len(captcha_text) != 4:
|
||||
raise Exception("识别到的验证码长度不等于 4 个字符 !")
|
||||
return captcha_text
|
||||
except Exception as e:
|
||||
self._showTrace(f"验证码识别失败 ! : {e}")
|
||||
self.__refreshCaptcha()
|
||||
return ""
|
||||
|
||||
|
||||
def __manualRecognizeCaptcha(
|
||||
self
|
||||
) -> str:
|
||||
|
||||
# manual recognize captcha
|
||||
try:
|
||||
self._show_msg("请输入验证码:")
|
||||
captcha_text = self._wait_msg(timeout=15)
|
||||
self._showTrace(f"输入的验证码为 : '{captcha_text}'.")
|
||||
if len(captcha_text) != 4:
|
||||
raise Exception("输入的验证码长度不等于 4 个字符 !")
|
||||
return captcha_text
|
||||
except Exception as e:
|
||||
self._showTrace(f"输入验证码失败 ! : {e}")
|
||||
self.__refreshCaptcha()
|
||||
return ""
|
||||
|
||||
|
||||
def __refreshCaptcha(
|
||||
self
|
||||
):
|
||||
|
||||
# refresh captcha
|
||||
try:
|
||||
self._showTrace("刷新验证码......")
|
||||
self.__driver.find_element(
|
||||
By.ID, "loadImgId"
|
||||
).click()
|
||||
time.sleep(1)
|
||||
return True
|
||||
except Exception as e:
|
||||
self._showTrace(f"刷新验证码失败 ! : {e}")
|
||||
self.__refreshCaptcha()
|
||||
return False
|
||||
|
||||
|
||||
def __solveCaptcha(
|
||||
self,
|
||||
auto_captcha: bool = True
|
||||
) -> str:
|
||||
|
||||
max_attempts = 5
|
||||
|
||||
for _ in range(max_attempts):
|
||||
if auto_captcha:
|
||||
captcha_text = self.__autoRecognizeCaptcha()
|
||||
else:
|
||||
self._showTrace(f"用户未配置自动识别验证码, 请手动输入验证码 !")
|
||||
captcha_text = self.__manualRecognizeCaptcha()
|
||||
if captcha_text:
|
||||
return captcha_text
|
||||
self._showTrace(f"验证码识别失败 {max_attempts} 次, 请检查验证码是否正确 !")
|
||||
return ""
|
||||
|
||||
|
||||
def __fillCaptchaElement(
|
||||
self,
|
||||
captcha_text: str
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
captcha_element = self.__driver.find_element(By.NAME, "answer")
|
||||
captcha_element.clear()
|
||||
captcha_element.send_keys(captcha_text)
|
||||
return True
|
||||
except Exception as e:
|
||||
self._showTrace(f"验证码填写失败 ! : {e}")
|
||||
self.__refreshCaptcha()
|
||||
return False
|
||||
|
||||
|
||||
def login(
|
||||
self,
|
||||
username: str,
|
||||
password: str,
|
||||
max_attempts: int = 5,
|
||||
auto_captcha: bool = True
|
||||
) -> bool:
|
||||
|
||||
if self.__driver is None:
|
||||
self._showTrace("未提供有效 WebDriver 实例 !")
|
||||
return False
|
||||
# begin login process
|
||||
for attempt in range(max_attempts):
|
||||
self._showTrace(f"用户 {username} 第 {attempt + 1} 次尝试登录......")
|
||||
if not self.__fillLogInElements(
|
||||
username,
|
||||
password,
|
||||
):
|
||||
continue
|
||||
captcha_text = self.__solveCaptcha(auto_captcha)
|
||||
if not captcha_text:
|
||||
continue
|
||||
if not self.__fillCaptchaElement(captcha_text):
|
||||
continue
|
||||
self._showTrace("尝试登录...")
|
||||
try:
|
||||
self.__driver.find_element(
|
||||
By.XPATH,
|
||||
"//input[@type='button' and @value='登录']"
|
||||
).click()
|
||||
except Exception as e:
|
||||
self._showTrace(f"登录失败 ! : {e}")
|
||||
continue
|
||||
if self._waitResponseLoad():
|
||||
self._showTrace(f"用户 {username} 第 {attempt + 1} 次登录成功 !")
|
||||
return True
|
||||
else:
|
||||
self._showTrace(f"用户 {username} 第 {attempt + 1} 次登录失败 !")
|
||||
return False
|
||||
@@ -0,0 +1,56 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import queue
|
||||
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibLogout(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
return True
|
||||
|
||||
|
||||
def logout(
|
||||
self,
|
||||
username: str
|
||||
) -> bool:
|
||||
|
||||
if self.__driver is None:
|
||||
self._showTrace("未提供有效 WebDriver 实例 !")
|
||||
return False
|
||||
try:
|
||||
self.__driver.find_element(
|
||||
By.XPATH, "//a[@href='/logout']"
|
||||
).click()
|
||||
self._showTrace(f"用户 {username} 注销成功 !")
|
||||
return True
|
||||
except Exception as e:
|
||||
self._showTrace(f"用户 {username} 注销失败 ! : {e}")
|
||||
return False
|
||||
@@ -0,0 +1,34 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import os
|
||||
import queue
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibRenew(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self
|
||||
) -> bool:
|
||||
|
||||
pass
|
||||
@@ -0,0 +1,618 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2025 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import re
|
||||
import time
|
||||
import queue
|
||||
|
||||
from datetime import datetime, timedelta
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.LibOperator import LibOperator
|
||||
|
||||
|
||||
class LibReserve(LibOperator):
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver
|
||||
):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
self.__driver = driver
|
||||
# library floor and room mapping in website
|
||||
self.__floor_map = {
|
||||
"2": "二层",
|
||||
"3": "三层",
|
||||
"4": "四层",
|
||||
"5": "五层"
|
||||
}
|
||||
self.__room_map = {
|
||||
"1": "二层内环",
|
||||
"2": "二层外环",
|
||||
"3": "三层内环",
|
||||
"4": "三层外环",
|
||||
"5": "四层内环",
|
||||
"6": "四层外环",
|
||||
"7": "四层期刊区",
|
||||
"8": "五层考研"
|
||||
}
|
||||
|
||||
|
||||
def _waitResponseLoad(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CLASS_NAME, "layoutSeat"))
|
||||
)
|
||||
title_elements = []
|
||||
# reserve failed without title elements, so we need to try
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.CSS_SELECTOR, ".layoutSeat dt"))
|
||||
)
|
||||
title_elements = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR, ".layoutSeat dt"
|
||||
)
|
||||
except:
|
||||
pass
|
||||
content_elements = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR, ".layoutSeat dd"
|
||||
)
|
||||
if not content_elements:
|
||||
self._showTrace("未找到预约结果")
|
||||
raise
|
||||
title = title_elements[0].text if title_elements else ""
|
||||
contents = [element.text for element in content_elements if element.text.strip()]
|
||||
for message in contents:
|
||||
if "预约失败" in message or "已有1个有效预约" in message:
|
||||
self._showTrace(f"预约失败 - {"".join(contents)}")
|
||||
raise
|
||||
if "预定好了" in title or "预约成功" in title or "操作成功" in title:
|
||||
if len(contents) >= 6:
|
||||
self._showTrace(f"\n"\
|
||||
f" 预约成功 !\n"\
|
||||
f" {contents[1]}\n"\
|
||||
f" {contents[2]}\n"\
|
||||
f" {contents[3]}\n"\
|
||||
f" 签到时间 :{contents[5]}")
|
||||
else:
|
||||
self._showTrace(f"\n"\
|
||||
f" 预约成功 !\n"\
|
||||
f" 未找获取到详细信息")
|
||||
return True
|
||||
except:
|
||||
self._showTrace(f"预约结果加载失败 !")
|
||||
return False
|
||||
|
||||
@staticmethod
|
||||
def __timeToMins(
|
||||
time_str: str
|
||||
) -> int:
|
||||
|
||||
hour, minute = map(int, time_str.split(":"))
|
||||
return hour*60 + minute
|
||||
|
||||
@staticmethod
|
||||
def __minsToTime(
|
||||
mins: int
|
||||
) -> str:
|
||||
|
||||
hour, minute = divmod(mins, 60)
|
||||
return f"{hour:02d}:{minute:02d}"
|
||||
|
||||
|
||||
def __containRequiredInfo(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
# must contain the required infomation
|
||||
if reserve_info.get("floor") is None: # if existence ?
|
||||
raise ValueError("未指定楼层")
|
||||
if reserve_info["floor"] not in self.__floor_map: # if in the mao ?
|
||||
raise ValueError(f"该楼层 '{reserve_info['floor']}' 不存在")
|
||||
if reserve_info.get("room") is None:
|
||||
raise ValueError("未指定房间")
|
||||
if reserve_info["room"] not in self.__room_map:
|
||||
raise ValueError(f"该房间 '{reserve_info['room']}' 不存在")
|
||||
if reserve_info.get("seat_id") is None:
|
||||
raise ValueError("未指定座位")
|
||||
if reserve_info["seat_id"] == "":
|
||||
raise ValueError("未指定座位号")
|
||||
return True
|
||||
except ValueError as e:
|
||||
self._showTrace(
|
||||
f"预约信息错误 ! : {e}, "\
|
||||
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整"
|
||||
)
|
||||
return False
|
||||
|
||||
|
||||
def __isValidDate(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
cur_date = time.strftime("%Y-%m-%d", time.localtime())
|
||||
if reserve_info.get("date") is None:
|
||||
reserve_info["date"] = cur_date
|
||||
self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date}")
|
||||
else:
|
||||
if reserve_info["date"] < cur_date:
|
||||
self._showTrace(
|
||||
f"预约日期错误 ! :"\
|
||||
f"{reserve_info['date']} 早于当前日期 {cur_date}, 自动设置为当前日期"
|
||||
)
|
||||
reserve_info["date"] = cur_date
|
||||
return True
|
||||
|
||||
|
||||
def __isValidBeginTime(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
cur_time = time.strftime("%H:%M", time.localtime())
|
||||
if reserve_info.get("begin_time") is None:
|
||||
reserve_info["begin_time"] = {}
|
||||
if "time" not in reserve_info["begin_time"]:
|
||||
reserve_info["begin_time"]["time"] = cur_time
|
||||
self._showTrace(f"开始时间未指定, 自动设置为当前时间: {cur_time}")
|
||||
if "max_diff" not in reserve_info["begin_time"]:
|
||||
reserve_info["begin_time"]["max_diff"] = 30
|
||||
self._showTrace(f"开始时间最大时间差未指定, 自动设置为 30 分钟")
|
||||
if "prefer_early" not in reserve_info["begin_time"]:
|
||||
reserve_info["begin_time"]["prefer_early"] = True
|
||||
self._showTrace(f"是否优先选择更早开始时间未指定, 自动设置为 True")
|
||||
return True
|
||||
|
||||
|
||||
def __isValidExpectDuration(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
if reserve_info.get("expect_duration") is None:
|
||||
reserve_info["expect_duration"] = 4
|
||||
self._showTrace("预约持续时间未指定, 使用默认时长为 4 小时")
|
||||
if reserve_info.get("satisfy_duration") is None:
|
||||
reserve_info["satisfy_duration"] = True
|
||||
self._showTrace("预约满足时长要求未指定, 默认满足")
|
||||
return True
|
||||
|
||||
|
||||
def __isValidEndTime(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
if reserve_info.get("end_time") is None:
|
||||
reserve_info["end_time"] = {}
|
||||
if "time" not in reserve_info["end_time"]:
|
||||
end_mins = self.__timeToMins(reserve_info["begin_time"]["time"])
|
||||
end_mins = end_mins + int(reserve_info["expect_duration"]*60)
|
||||
reserve_info["end_time"] = {
|
||||
"time": self.__minsToTime(end_mins),
|
||||
"max_diff": 30,
|
||||
"prefer_early": False
|
||||
}
|
||||
self._showTrace(
|
||||
f"结束时间未指定, 自动设置为开始时间加上期望时长: {reserve_info['end_time']['time']}"
|
||||
)
|
||||
if "max_diff" not in reserve_info["end_time"]:
|
||||
reserve_info["end_time"]["max_diff"] = 30
|
||||
self._showTrace(f"结束时间最大时间差未指定, 自动设置为 30 分钟")
|
||||
if "prefer_early" not in reserve_info["end_time"]:
|
||||
reserve_info["end_time"]["prefer_early"] = False
|
||||
self._showTrace(f"是否优先选择较晚结束时间未指定, 自动设置为 True")
|
||||
return True
|
||||
|
||||
|
||||
def __finalCheck(
|
||||
self,
|
||||
reserve_info: dict
|
||||
):
|
||||
|
||||
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"]
|
||||
begin_mins = self.__timeToMins(begin_time["time"])
|
||||
end_mins = self.__timeToMins(end_time["time"])
|
||||
# if end time is earlier than begin_time, exchange them
|
||||
if end_mins < begin_mins:
|
||||
self._showTrace(
|
||||
f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, 自动交换"
|
||||
)
|
||||
reserve_info["end_time"] = begin_time
|
||||
reserve_info["begin_time"] = end_time
|
||||
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"]
|
||||
begin_mins = self.__timeToMins(begin_time["time"])
|
||||
end_mins = self.__timeToMins(end_time["time"])
|
||||
# ensure the end time is not later than 23:30
|
||||
if end_mins > self.__timeToMins("23:30"):
|
||||
self._showTrace(
|
||||
f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30"
|
||||
)
|
||||
reserve_info["end_time"]["time"] = "23:30"
|
||||
end_mins = self.__timeToMins("23:30")
|
||||
# ensure the duration is not longer than 8 hours
|
||||
if reserve_info["satisfy_duration"]:
|
||||
if reserve_info["expect_duration"] > 8:
|
||||
self._showTrace(
|
||||
f"该用户设置了优先满足时长要求, 但是预约期望持续时间 "
|
||||
f"{reserve_info['expect_duration']} 小时 "
|
||||
f"超出最大时长 8 小时, 自动设置为 8 小时"
|
||||
)
|
||||
reserve_info["expect_duration"] = 8
|
||||
else:
|
||||
if end_mins - begin_mins > 8*60:
|
||||
self._showTrace(
|
||||
f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 "
|
||||
f"{int((end_mins - begin_mins)/60)} 小时 "
|
||||
f"超出最大时长 8 小时, 自动设置为 8 小时"
|
||||
)
|
||||
reserve_info["expect_duration"] = 8
|
||||
reserve_info["end_time"]["time"] = self.__minsToTime(begin_mins + 8*60)
|
||||
return True
|
||||
|
||||
|
||||
def __checkReserveInfo(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
if not self.__containRequiredInfo(reserve_info):
|
||||
return False
|
||||
if not self.__isValidDate(reserve_info):
|
||||
return False
|
||||
if not self.__isValidBeginTime(reserve_info):
|
||||
return False
|
||||
if not self.__isValidExpectDuration(reserve_info):
|
||||
return False
|
||||
if not self.__isValidEndTime(reserve_info):
|
||||
return False
|
||||
if not self.__finalCheck(reserve_info):
|
||||
return False
|
||||
self._showTrace(
|
||||
f"预约信息检查完成, 准备预约 "
|
||||
f"{reserve_info['date']} "
|
||||
f"{reserve_info['begin_time']["time"]} - "
|
||||
f"{reserve_info['end_time']["time"]} "
|
||||
f"图书馆 "
|
||||
f"{self.__floor_map[reserve_info['floor']]} "
|
||||
f"{self.__room_map[reserve_info['room']]} "
|
||||
f"的座位 {reserve_info['seat_id']}"
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def __clickElement(
|
||||
self,
|
||||
trigger_locator: tuple,
|
||||
fail_msg: str,
|
||||
success_msg: str,
|
||||
option_locator: tuple = None
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
# click the trigger element
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable(trigger_locator)
|
||||
).click()
|
||||
if option_locator:
|
||||
# select the option element if specified
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable(option_locator)
|
||||
).click()
|
||||
self._showTrace(success_msg)
|
||||
return True
|
||||
except:
|
||||
self._showTrace(fail_msg)
|
||||
return False
|
||||
|
||||
|
||||
def __selectDate(
|
||||
self,
|
||||
date_str: str
|
||||
) -> bool:
|
||||
|
||||
return self.__clickElement(
|
||||
trigger_locator=(By.ID, "onDate_select"),
|
||||
option_locator=(By.XPATH, f"//p[@id='options_onDate']/a[@value='{date_str}']"),
|
||||
success_msg=f"日期 {date_str} 选择成功 !",
|
||||
fail_msg=f"选择日期失败 ! : {date_str} 不可用"
|
||||
)
|
||||
|
||||
|
||||
def __selectPlace(
|
||||
self,
|
||||
place: str
|
||||
) -> bool:
|
||||
|
||||
actual_place = "1" if place == "图书馆" else "1"
|
||||
return self.__clickElement(
|
||||
trigger_locator=(By.ID, "display_building"),
|
||||
option_locator=(By.XPATH, f"//p[@id='options_building']/a[@value='{actual_place}']"),
|
||||
success_msg=f"预约场所 {place} 选择成功 !",
|
||||
fail_msg=f"选择预约场所失败 ! : {place} 不可用"
|
||||
)
|
||||
|
||||
|
||||
def __selectFloor(
|
||||
self,
|
||||
floor: str
|
||||
) -> bool:
|
||||
|
||||
display_floor = self.__floor_map.get(floor)
|
||||
return self.__clickElement(
|
||||
trigger_locator=(By.ID, "floor_select"),
|
||||
option_locator=(By.XPATH, f"//p[@id='options_floor']/a[@value='{floor}']"),
|
||||
success_msg=f"楼层 {display_floor} 选择成功 !",
|
||||
fail_msg=f"选择楼层失败 ! : {display_floor} 不可用"
|
||||
)
|
||||
|
||||
|
||||
def __selectRoom(
|
||||
self,
|
||||
room: str
|
||||
) -> bool:
|
||||
|
||||
display_room = self.__room_map.get(room)
|
||||
return self.__clickElement(
|
||||
trigger_locator=(By.ID, f"room_{room}"),
|
||||
option_locator=None,
|
||||
success_msg=f"房间 {display_room} 选择成功 !",
|
||||
fail_msg=f"选择房间失败 ! : {display_room} 不可用"
|
||||
)
|
||||
|
||||
|
||||
def __selectSeat(
|
||||
self,
|
||||
seat_id: str
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
# wait fot seat layout element to load
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.ID, "seatLayout"))
|
||||
)
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "li[id^='seat_']"))
|
||||
)
|
||||
except:
|
||||
self._showTrace(f"座位加载失败 !")
|
||||
return False
|
||||
try:
|
||||
all_seats = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR, "li[id^='seat_']"
|
||||
)
|
||||
seat_id_upper = seat_id.lstrip('0').upper()
|
||||
for seat in all_seats:
|
||||
if not seat_id_upper == seat.text.lstrip('0'):
|
||||
continue
|
||||
seat_link = seat.find_element(By.TAG_NAME, "a")
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable(seat_link)
|
||||
)
|
||||
seat_link.click()
|
||||
seat_status = seat_link.get_attribute("title")
|
||||
self._showTrace(f"座位 {seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
|
||||
return True
|
||||
self._showTrace(f"座位 {seat_id} 在该楼层区域中不存在, 请检查座位号是否正确")
|
||||
except:
|
||||
self._showTrace(f"座位选择失败 !")
|
||||
return False
|
||||
|
||||
|
||||
def __selectNearestTime(
|
||||
self,
|
||||
time_id: str,
|
||||
time_type: str,
|
||||
target_time: int,
|
||||
max_time_diff: int = 30,
|
||||
prefer_earlier: bool = True
|
||||
) -> int:
|
||||
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_all_elements_located(
|
||||
(By.CSS_SELECTOR, f"#{time_id} ul li a")
|
||||
)
|
||||
)
|
||||
except:
|
||||
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间")
|
||||
return -1
|
||||
try:
|
||||
all_time_opts = self.__driver.find_elements(
|
||||
By.CSS_SELECTOR,
|
||||
f"#{time_id} ul li a"
|
||||
)
|
||||
free_times = []
|
||||
best_time_diff = max_time_diff
|
||||
best_actual_diff = None
|
||||
best_time_opt = None
|
||||
|
||||
if not all_time_opts:
|
||||
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间")
|
||||
return -1
|
||||
for time_opt in all_time_opts:
|
||||
time_attr = time_opt.get_attribute("time")
|
||||
if time_attr == "now":
|
||||
now = datetime.now()
|
||||
time_val = int(now.hour*60 + now.minute)
|
||||
elif time_attr and time_attr.isdigit():
|
||||
time_val = int(time_attr)
|
||||
else:
|
||||
continue
|
||||
free_times.append(self.__minsToTime(time_val))
|
||||
actual_diff = time_val - target_time
|
||||
abs_diff = abs(actual_diff)
|
||||
if abs_diff < best_time_diff or (
|
||||
abs_diff == best_time_diff and (
|
||||
# prefer earlier time
|
||||
(prefer_earlier and actual_diff <= 0) or
|
||||
# prefer later time
|
||||
(not prefer_earlier and actual_diff >= 0)
|
||||
)
|
||||
):
|
||||
best_time_diff = abs_diff
|
||||
best_actual_diff = actual_diff
|
||||
best_time_opt = time_opt
|
||||
|
||||
if best_time_opt is not None:
|
||||
best_time_opt.click()
|
||||
abs_time_diff = abs(best_actual_diff)
|
||||
if best_actual_diff < 0:
|
||||
time_relation = f"早了 {abs_time_diff} 分钟"
|
||||
elif best_actual_diff > 0:
|
||||
time_relation = f"晚了 {abs_time_diff} 分钟"
|
||||
else:
|
||||
time_relation = f"正好等于 {time_type}"
|
||||
target_time += best_actual_diff
|
||||
self._showTrace(
|
||||
f"选择距离期望 {time_type} 最近的 {best_time_opt.text}, "\
|
||||
f"与期望 {time_type} 相比 {time_relation}"
|
||||
)
|
||||
return target_time
|
||||
self._showTrace(
|
||||
f"无法选择最近的 {time_type} {self.__minsToTime(target_time)}, "\
|
||||
f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟"
|
||||
)
|
||||
self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}")
|
||||
return -1
|
||||
except:
|
||||
self._showTrace(f"{time_type} {self.__minsToTime(target_time)} 选择失败 !")
|
||||
return -1
|
||||
|
||||
|
||||
def __selectSeatTime(
|
||||
self,
|
||||
begin_time: dict,
|
||||
end_time: dict,
|
||||
expct_duration: int = 4,
|
||||
satisfy_duration: bool = True
|
||||
) -> bool:
|
||||
|
||||
expect_begin_time = actual_begin_time = begin_time["time"]
|
||||
expect_end_time = actual_end_time = end_time["time"]
|
||||
expect_begin_mins = self.__timeToMins(expect_begin_time)
|
||||
expect_end_mins = self.__timeToMins(expect_end_time)
|
||||
|
||||
# select the begin time
|
||||
if self.__selectNearestTime(
|
||||
time_id="startTime", # dont change into begin, this is the element in the page
|
||||
time_type="开始时间",
|
||||
target_time=expect_begin_mins,
|
||||
max_time_diff=begin_time["max_diff"],
|
||||
prefer_earlier=begin_time["prefer_early"]
|
||||
) == -1:
|
||||
return False
|
||||
else:
|
||||
actual_begin_time = self.__minsToTime(expect_begin_mins)
|
||||
# if 'satisfy_duration' is True.
|
||||
# select the end time based on the begin time
|
||||
# (because it may be changed under the 'max time diff' strategy) and expect duration.
|
||||
if satisfy_duration:
|
||||
expect_end_mins = int(expect_begin_mins + expct_duration*60)
|
||||
self._showTrace(
|
||||
f"需要满足期望预约持续时间: {expct_duration} 小时, "\
|
||||
f"根据开始时间 {actual_begin_time} 计算结束时间: {self.__minsToTime(expect_end_mins)}"
|
||||
)
|
||||
# select the end time
|
||||
if self.__selectNearestTime(
|
||||
time_id="endTime",
|
||||
time_type="结束时间",
|
||||
target_time=expect_end_mins,
|
||||
max_time_diff=end_time["max_diff"],
|
||||
prefer_earlier=end_time["prefer_early"]
|
||||
) == -1:
|
||||
return False
|
||||
else:
|
||||
actual_end_time = self.__minsToTime(expect_end_mins)
|
||||
self._showTrace(
|
||||
f"期望预约时间段: {expect_begin_time} - {expect_end_time}, "
|
||||
f"实际预约时间段: {actual_begin_time} - {actual_end_time}"
|
||||
)
|
||||
return True
|
||||
|
||||
|
||||
def reserve(
|
||||
self,
|
||||
reserve_info: dict
|
||||
) -> bool:
|
||||
|
||||
submit_reserve = False
|
||||
reserve_success = False
|
||||
have_hover_on_page = False
|
||||
|
||||
# reserve info
|
||||
if not self.__checkReserveInfo(reserve_info):
|
||||
return False
|
||||
# map page
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.XPATH, "//a[@href='/map']"))
|
||||
).click()
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.presence_of_element_located((By.ID, "seatLayout"))
|
||||
)
|
||||
except:
|
||||
self._showTrace(f"加载预约选座页面失败 !")
|
||||
return False
|
||||
# date, place, floor
|
||||
if not self.__selectDate(reserve_info["date"]):
|
||||
return False
|
||||
if not self.__selectPlace(reserve_info["place"]):
|
||||
return False
|
||||
if not self.__selectFloor(reserve_info["floor"]):
|
||||
return False
|
||||
# room find
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.ID, "findRoom"))
|
||||
).click()
|
||||
except:
|
||||
self._showTrace("加载房间/区域失败 !")
|
||||
return False
|
||||
# room
|
||||
if not self.__selectRoom(reserve_info["room"]):
|
||||
return False
|
||||
else:
|
||||
have_hover_on_page = True
|
||||
# seat selections
|
||||
if not self.__selectSeat(reserve_info["seat_id"]):
|
||||
pass
|
||||
elif not self.__selectSeatTime(
|
||||
begin_time=reserve_info["begin_time"],
|
||||
end_time=reserve_info["end_time"],
|
||||
expct_duration=reserve_info["expect_duration"],
|
||||
satisfy_duration=reserve_info["satisfy_duration"]
|
||||
):
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
WebDriverWait(self.__driver, 2).until(
|
||||
EC.element_to_be_clickable((By.ID, "reserveBtn"))
|
||||
).click()
|
||||
submit_reserve = True
|
||||
if not self._waitResponseLoad():
|
||||
raise
|
||||
reserve_success = True
|
||||
except:
|
||||
self._showTrace(f"预约提交失败 !")
|
||||
if not submit_reserve and have_hover_on_page:
|
||||
self.__driver.refresh()
|
||||
return reserve_success
|
||||
@@ -0,0 +1,12 @@
|
||||
"""
|
||||
Operators module for the AutoLibrary project.
|
||||
|
||||
Here are the classes and modules in this package:
|
||||
- AutoLib: AutoLibrary operator.
|
||||
- LibLogin: Library operator for logging in.
|
||||
- LibLogout: Library operator for logging out.
|
||||
- LibReserve: Library operator for reserving seat.
|
||||
- LibCheckin: Library operator for checking in seat.
|
||||
- LibCheckout: Library operator for checking out seat.
|
||||
- LibRenew: Library operator for renewing seat.
|
||||
"""
|
||||
Reference in New Issue
Block a user