1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-17 23:13:03 +08:00

refactor(pages): 引入 Page Object 模式重构全部页面模块,变量统一为 snake_case

将原始 Selenium 操作脚本重构为三层 Page Object 架构:
- Page Objects(LoginPage/ReserveView/RecordsView/MainShell)
- Component Objects(Overlay 基类 + SeatMapOverlay/ReserveResultDialog 等对话框)
- Flow 状态机(ReserveFlow/CheckinFlow/RenewFlow)
- Services(CaptchaHandler/ReserveValidator/RecordChecker)

变量命名统一为 snake_case,方法名保持 camelCase,类名保持 PascalCase。

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
2026-05-26 12:39:21 +08:00
parent 106463b9e5
commit 2226e8ac90
18 changed files with 3007 additions and 0 deletions
+398
View File
@@ -0,0 +1,398 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2025 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import os
import queue
from selenium import webdriver
from selenium.common.exceptions import (
TimeoutException,
WebDriverException,
)
from selenium.webdriver.edge.service import Service as EdgeService
from selenium.webdriver.chrome.service import Service as ChromeService
from selenium.webdriver.firefox.service import Service as FirefoxService
from base.MsgBase import MsgBase
from pages.LoginPage import LoginPage
from pages.MainShell import MainShell
from pages.flows.ReserveFlow import ReserveFlow, ReserveContext
from pages.flows.CheckinFlow import CheckinFlow
from pages.flows.RenewFlow import RenewFlow
from pages.services.CaptchaHandler import CaptchaHandler
from pages.services.ReserveValidator import ReserveValidator
from pages.services.RecordChecker import RecordChecker
class AutoLibPages(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
run_config: dict,
) -> None:
super().__init__(input_queue, output_queue)
self.__run_config: dict = run_config
self.__user_config: dict | None = None
self.__driver = None
self.__driver_type: str = ""
self.__driver_path: str = ""
self.__login_page: LoginPage = None
self.__shell: MainShell = None
self.__captcha_handler: CaptchaHandler = None
self.__record_checker: RecordChecker = None
self.__reserve_validator: ReserveValidator = None
self.__reserve_flow: ReserveFlow = None
self.__checkin_flow: CheckinFlow = None
self.__renew_flow: RenewFlow = None
if not self.__initBrowserDriver():
raise Exception("浏览器驱动初始化失败 !")
else:
if not self.__initDriverUrl():
self.close()
raise Exception("浏览器驱动URL初始化失败 !")
self.__initPagesServices()
self.__initPagesFlows()
def __initBrowserDriver(
self,
) -> bool:
self._showTrace("正在初始化浏览器驱动......", no_log=True)
web_driver_config: dict = self.__run_config.get("web_driver", None)
self.__driver_type = web_driver_config.get("driver_type")
match self.__driver_type.lower():
case "edge":
driver_options = webdriver.EdgeOptions()
case "chrome":
driver_options = webdriver.ChromeOptions()
case "firefox":
driver_options = webdriver.FirefoxOptions()
case _:
self._showTrace(
f"不支持的浏览器驱动类型: {self.__driver_type} !",
self.TraceLevel.WARNING,
)
return False
if not web_driver_config:
self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
return False
if web_driver_config.get("headless"):
driver_options.add_argument("--headless")
driver_options.add_argument("--disable-gpu")
driver_options.add_argument("--no-sandbox")
driver_options.add_argument("--disable-dev-shm-usage")
# must be 1920x1080, otherwise the page will cause some elements not accessible
driver_options.add_argument("--window-size=1920,1080")
# omit ssl errors and verbose log level
driver_options.add_argument("--ignore-certificate-errors")
driver_options.add_argument("--ignore-ssl-errors")
driver_options.add_argument("--log-level=OFF")
driver_options.add_argument("--silent")
# set options for chrome and edge
if self.__driver_type.lower() in ["edge", "chrome"]:
driver_options.add_argument("--remote-allow-origins=*")
driver_options.add_experimental_option("excludeSwitches", ["enable-automation"])
driver_options.add_experimental_option("useAutomationExtension", False)
driver_options.add_argument("--disable-blink-features=AutomationControlled")
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\
"AppleWebKit/537.36 (KHTML, like Gecko) "\
"Chrome/120.0.0.0 "\
"Safari/537.36"
if self.__driver_type.lower() == "edge":
user_agent += " Edg/120.0.0.0"
# set options for firefox
elif self.__driver_type.lower() == "firefox":
driver_options.set_preference("dom.webdriver.enabled", False)
driver_options.set_preference("useAutomationExtension", False)
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\
"Gecko/20100101 Firefox/120.0"
driver_options.add_argument(f"user-agent={user_agent}")
# init browser driver
self.__driver_path = web_driver_config.get("driver_path")
if not self.__driver_path:
self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
return False
self.__driver_path = os.path.abspath(self.__driver_path)
try:
service = None
match self.__driver_type.lower():
case "edge":
service = EdgeService(executable_path=self.__driver_path)
self.__driver = webdriver.Edge(service=service, options=driver_options)
case "chrome":
service = ChromeService(executable_path=self.__driver_path)
self.__driver = webdriver.Chrome(service=service, options=driver_options)
case "firefox":
self._showTrace("Firefox 浏览器驱动初始化略慢, 请耐心等待...", no_log=True)
service = FirefoxService(executable_path=self.__driver_path)
self.__driver = webdriver.Firefox(service=service, options=driver_options)
case _:
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type} !")
self.__driver.implicitly_wait(1)
self.__driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except WebDriverException as e:
self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR)
return False
except Exception as e:
self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR)
return False
self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}")
return True
def __initDriverUrl(
self,
) -> bool:
lib_config: dict = self.__run_config.get("library", None)
if not lib_config:
self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
return False
url: str = lib_config.get("host_url") + lib_config.get("login_url")
self.__login_page = LoginPage(self.__driver)
self.__driver.set_page_load_timeout(5)
try:
self.__driver.get(url)
except TimeoutException:
self.__login_page.stopPageLoad()
self._showTrace(
"图书馆登录页面加载超时 ! 请检查网络环境是否正常", self.TraceLevel.ERROR
)
return False
except WebDriverException as e:
self._showTrace(f"图书馆页面加载失败: {e}", self.TraceLevel.ERROR)
return False
if not self.__login_page.waitUntilLoaded():
return False
return True
def __initPagesServices(
self,
) -> None:
if not self.__driver:
self._showTrace("浏览器驱动未初始化, 请先初始化浏览器驱动 !", self.TraceLevel.WARNING)
return
self.__shell = MainShell(self.__driver)
self.__captcha_handler = CaptchaHandler(
input_queue=self._input_queue,
output_queue=self._output_queue,
login_page=self.__login_page,
)
self.__record_checker = RecordChecker(
input_queue=self._input_queue,
output_queue=self._output_queue,
shell=self.__shell,
)
self.__reserve_validator = ReserveValidator(
input_queue=self._input_queue,
output_queue=self._output_queue,
)
def __initPagesFlows(
self,
) -> None:
self.__reserve_flow = ReserveFlow(
input_queue=self._input_queue,
output_queue=self._output_queue,
driver=self.__driver,
shell=self.__shell,
)
self.__checkin_flow = CheckinFlow(
input_queue=self._input_queue,
output_queue=self._output_queue,
driver=self.__driver,
shell=self.__shell,
)
self.__renew_flow = RenewFlow(
input_queue=self._input_queue,
output_queue=self._output_queue,
driver=self.__driver,
shell=self.__shell,
)
def __run(
self,
username: str,
password: str,
login_config: dict,
run_mode_config: dict,
reserve_info: dict,
) -> int:
# result : -1 - terminate, 0 - success, 1 - failed, 2 - passed
result: int = 2
# login
auto_captcha: bool = login_config.get("auto_captcha", True)
if not self.__login_page.login(
username,
password,
captcha_solver=lambda: self.__captcha_handler.solveCaptcha(auto_captcha),
tracer=self._showTrace,
log_level=self.TraceLevel,
max_attempts=login_config.get("max_attempt", 3),
):
return 1
run_mode_raw: int = run_mode_config.get("run_mode", 0)
run_mode: dict[str, bool] = {
"auto_reserve": run_mode_raw & 0x1,
"auto_checkin": run_mode_raw & 0x2,
"auto_renewal": run_mode_raw & 0x4,
}
# reserve
if run_mode["auto_reserve"]:
if self.__record_checker.canReserve(reserve_info.get("date")):
if self.__reserve_validator.validate(reserve_info):
ctx = ReserveContext(
username=username,
date=reserve_info["date"],
floor=reserve_info["floor"],
room=reserve_info["room"],
seat_id=reserve_info["seat_id"],
begin_time=reserve_info["begin_time"]["time"],
end_time=reserve_info["end_time"]["time"],
begin_max_diff=reserve_info["begin_time"]["max_diff"],
end_max_diff=reserve_info["end_time"]["max_diff"],
begin_prefer_early=reserve_info["begin_time"]["prefer_early"],
end_prefer_early=reserve_info["end_time"]["prefer_early"],
expect_duration=reserve_info["expect_duration"],
satisfy_duration=reserve_info["satisfy_duration"],
)
if self.__reserve_flow.execute(ctx):
result = 0
else:
result = 1
else:
result = 1
else:
self._showTrace(f"用户 {username} 无法预约, 已跳过")
result = 2
# checkin
last_result: int = result
if run_mode["auto_checkin"] and last_result != 1:
if self.__record_checker.canCheckin():
if self.__checkin_flow.execute(username):
result = 0
else:
result = 1
else:
self._showTrace(f"用户 {username} 无法签到, 已跳过")
result = 2
if last_result == 0: # partly success
result = 0
# renewal
last_result = result
if run_mode["auto_renewal"] and last_result != 1:
can_renew, record = self.__record_checker.canRenew()
if can_renew:
renew_info: dict = reserve_info.get("renew_time", {})
if self.__renew_flow.execute(username, record, renew_info):
if self.__record_checker.postRenewCheck(record):
self._showTrace(f"用户 {username} 续约成功 !")
result = 0
else:
if result != 1: # partly success
result = 0
else:
result = 1
else:
result = 1
else:
self._showTrace(f"用户 {username} 无法续约, 已跳过")
result = 2
if last_result == 0: # partly success
result = 0
# logout
if not self.__shell.logout():
if not self.__initDriverUrl():
return -1
return result
def run(
self,
user_config: dict,
) -> None:
self.__user_config = user_config
user_counter: dict[str, int] = {"current": 0, "success": 0, "failed": 0, "passed": 0}
users: list = self.__user_config["users"]
self._showTrace(f"共发现 {len(users)} 个用户")
for user in users:
user_counter["current"] += 1
self._showTrace(
f"正在处理第 {user_counter['current']}/{len(users)} 个用户: {user['username']}......",
no_log=True,
)
if not user["enabled"]:
self._showTrace(f"用户 {user['username']} 已跳过")
user_counter["passed"] += 1
continue
r: int = self.__run(
username=user["username"],
password=user["password"],
login_config=self.__run_config["login"],
run_mode_config=self.__run_config["mode"],
reserve_info=user["reserve_info"],
)
if r == -1:
self._showTrace(
f"用户 {user['username']} 处理过程中页面发生异常, 无法继续操作, 任务已终止 !",
self.TraceLevel.WARNING,
)
break
elif r == 0:
user_counter["success"] += 1
elif r == 1:
user_counter["failed"] += 1
elif r == 2:
user_counter["passed"] += 1
self._showTrace(
f"处理完成, 共计 {user_counter['current']} 个用户, "
f"成功 {user_counter['success']} 个用户, "
f"失败 {user_counter['failed']} 个用户, "
f"跳过 {user_counter['passed']} 个用户"
)
return
def close(
self,
) -> bool:
if self.__driver:
if self.__driver_type.lower() == "firefox":
self._showTrace(
"Firefox 浏览器驱动关闭略慢, 请耐心等待...",
no_log=True,
)
try:
self.__driver.quit()
except WebDriverException as e:
self._showTrace(f"浏览器驱动关闭时发生异常: {e}", self.TraceLevel.WARNING)
self.__driver = None
self._showTrace("浏览器驱动已关闭")
return True
else:
self._showTrace("浏览器驱动未初始化, 无需关闭", no_log=True)
return False
+209
View File
@@ -0,0 +1,209 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from typing import Callable
from selenium.common.exceptions import (
ElementNotInteractableException,
NoSuchElementException,
TimeoutException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class LoginPage:
USERNAME_INPUT = (By.NAME, "username")
PASSWORD_INPUT = (By.NAME, "password")
CAPTCHA_INPUT = (By.NAME, "answer")
CAPTCHA_IMG = (By.ID, "loadImgId")
LOGIN_BUTTON = (By.XPATH, "//input[@type='button' and @value='登录']")
SUCCESS_INDICATOR_SEARCH = (By.ID, "search")
SUCCESS_INDICATOR_CONTENT = (By.CLASS_NAME, "selectContent")
SUCCESS_TITLE_KEYWORD = "自选座位 :: 座位预约系统"
PAGE_LOAD_TIMEOUT = 5
def __init__(
self,
driver: WebDriver,
) -> None:
self._driver: WebDriver = driver
def navigate(
self,
url: str,
) -> bool:
self._driver.set_page_load_timeout(self.PAGE_LOAD_TIMEOUT)
self._driver.get(url)
if not self.waitUntilLoaded():
return False
return True
def waitUntilLoaded(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.title_contains("首页")
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.USERNAME_INPUT)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.PASSWORD_INPUT)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.CAPTCHA_INPUT)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.CAPTCHA_IMG)
)
return True
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
def fillCredentials(
self,
username: str,
password: str,
) -> bool:
try:
el = self._driver.find_element(*self.USERNAME_INPUT)
el.clear()
el.send_keys(username)
el = self._driver.find_element(*self.PASSWORD_INPUT)
el.clear()
el.send_keys(password)
return True
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
def getCaptchaImageSrc(
self,
) -> str:
captcha_el = self._driver.find_element(*self.CAPTCHA_IMG)
return captcha_el.get_attribute("src")
def refreshCaptcha(
self,
) -> bool:
try:
self._driver.find_element(*self.CAPTCHA_IMG).click()
return True
except (NoSuchElementException, TimeoutException,
ElementNotInteractableException):
return False
except Exception:
return False
def fillCaptcha(
self,
captcha_text: str,
) -> bool:
try:
el = self._driver.find_element(*self.CAPTCHA_INPUT)
el.clear()
el.send_keys(captcha_text)
return True
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
def clickLogin(
self,
) -> bool:
try:
self._driver.find_element(*self.LOGIN_BUTTON).click()
return True
except (NoSuchElementException, TimeoutException,
ElementNotInteractableException):
return False
except Exception:
return False
def waitLoginSuccess(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.title_contains(self.SUCCESS_TITLE_KEYWORD)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.SUCCESS_INDICATOR_SEARCH)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.SUCCESS_INDICATOR_CONTENT)
)
return True
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
def stopPageLoad(
self,
) -> None:
self._driver.execute_script("window.stop();")
def login(
self,
username: str,
password: str,
captcha_solver: Callable[[], str],
tracer: Callable[..., None],
log_level: type,
max_attempts: int = 5,
) -> bool:
ERR = log_level.ERROR
for attempt in range(max_attempts):
tracer(
f"用户 {username}{attempt + 1} 次尝试登录......",
20, no_log=True,
)
if not self.fillCredentials(username, password):
continue
captcha_text = captcha_solver()
if not captcha_text:
continue
if not self.fillCaptcha(captcha_text):
continue
tracer("尝试登录...", 20, no_log=True)
if not self.clickLogin():
continue
if self.waitLoginSuccess():
tracer(f"用户 {username}{attempt + 1} 次登录成功 !")
return True
else:
err_msg = (
"登录页面加载失败 ! : "
"用户账号或者密码错误/验证码错误, 具体以页面提示为准"
)
tracer(err_msg, ERR)
return False
+166
View File
@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
)
from pages.ReserveView import ReserveView
from pages.RecordsView import RecordsView
class MainShell:
TAB_RESERVE = (By.XPATH, "//a[@href='/map']")
TAB_HISTORY = (By.XPATH, "//a[@href='/history?type=SEAT']")
TAB_LOGOUT = (By.XPATH, "//a[@href='/logout']")
BTN_CHECKIN = (By.ID, "btnCheckIn")
BTN_EXTEND = (By.ID, "btnExtend")
def __init__(
self,
driver: WebDriver,
) -> None:
self._driver = driver
def gotoReserveView(
self,
) -> ReserveView:
self._clickTab(self.TAB_RESERVE)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located((By.ID, "seatLayout"))
)
return ReserveView(self._driver)
def gotoRecordsView(
self,
) -> RecordsView:
self._clickTab(self.TAB_HISTORY)
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
)
return RecordsView(self._driver)
def logout(
self,
) -> bool:
try:
self._driver.find_element(*self.TAB_LOGOUT).click()
return True
except NoSuchElementException:
return False
except Exception:
return False
def waitCheckinButton(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.BTN_CHECKIN)
)
return True
except TimeoutException:
return False
except Exception:
return False
def waitExtendButton(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.BTN_EXTEND)
)
return True
except TimeoutException:
return False
except Exception:
return False
def isCheckinButtonDisabled(
self,
) -> bool:
btn = self._driver.find_element(*self.BTN_CHECKIN)
return "disabled" in btn.get_attribute("class")
def isExtendButtonDisabled(
self,
) -> bool:
btn = self._driver.find_element(*self.BTN_EXTEND)
return "disabled" in btn.get_attribute("class")
def clickCheckinButton(
self,
) -> None:
btn = WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.BTN_CHECKIN)
)
btn.click()
def clickExtendButton(
self,
) -> None:
btn = WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.BTN_EXTEND)
)
btn.click()
def enableCheckinButtonByJS(
self,
) -> bool:
script = """
try {
var checkin_btn = document.getElementById('btnCheckIn');
if (checkin_btn) {
checkin_btn.classList.remove('disabled');
return true;
}
return false;
} catch (e) {
return false;
}
"""
result = self._driver.execute_script(script)
time.sleep(0.1)
return result
def refresh(
self,
) -> None:
self._driver.refresh()
def _clickTab(
self,
locator: tuple,
) -> None:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(locator)
).click()
+93
View File
@@ -0,0 +1,93 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
TimeoutException,
)
class RecordsView:
RECORDS_LIST = (By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)")
MORE_BTN = (By.ID, "more_btn")
RECORD_TIME = (By.CSS_SELECTOR, "dt")
RECORD_INFO = (By.CSS_SELECTOR, "a")
def __init__(
self,
driver: WebDriver,
) -> None:
self._driver = driver
def loadRecords(
self,
) -> list | None:
try:
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.RECORDS_LIST)
)
return self._driver.find_elements(*self.RECORDS_LIST)
except TimeoutException:
return None
except Exception:
return None
def getRecordTimeElement(
self,
record: WebElement,
) -> WebElement:
return record.find_element(*self.RECORD_TIME)
def getRecordInfoElements(
self,
record: WebElement,
) -> list[WebElement]:
return record.find_elements(*self.RECORD_INFO)
def showMoreRecords(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.MORE_BTN)
)
except TimeoutException:
return False
except Exception:
return False
try:
more_btn = self._driver.find_element(*self.MORE_BTN)
if more_btn.is_displayed() and more_btn.is_enabled():
self._driver.execute_script("arguments[0].scrollIntoView(true);", more_btn)
self._driver.execute_script("arguments[0].click();", more_btn)
return True
return False
except (NoSuchElementException, StaleElementReferenceException):
return False
except Exception:
return False
def getRecordText(
self,
record: WebElement,
) -> str:
return record.text.strip()
+266
View File
@@ -0,0 +1,266 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import time
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.common.exceptions import (
ElementNotInteractableException,
NoSuchElementException,
StaleElementReferenceException,
TimeoutException,
)
from pages._dialogs import SeatMapOverlay, ReserveResultDialog
class ReserveView:
DATE_SELECT = (By.ID, "onDate_select")
DATE_OPTION_FMT = "p#options_onDate a[value='{value}']"
DATE_XPATH_FMT = "//p[@id='options_onDate']/a[@value='{value}']"
PLACE_SELECT = (By.ID, "display_building")
PLACE_OPTION_FMT = "p#options_building a[value='{value}']"
PLACE_XPATH_FMT = "//p[@id='options_building']/a[@value='{value}']"
FLOOR_SELECT = (By.ID, "floor_select")
FLOOR_OPTION_FMT = "p#options_floor a[value='{value}']"
FLOOR_XPATH_FMT = "//p[@id='options_floor']/a[@value='{value}']"
FIND_ROOM_BTN = (By.ID, "findRoom")
ROOM_BTN_FMT = "room_{room}"
SEAT_LAYOUT = (By.ID, "seatLayout")
SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']")
RESERVE_BTN = (By.ID, "reserveBtn")
START_TIME_OPTS = (By.CSS_SELECTOR, "#startTime ul li a")
END_TIME_OPTS = (By.CSS_SELECTOR, "#endTime ul li a")
RESULT_DIALOG = (By.CLASS_NAME, "layoutSeat")
RESULT_TITLE = (By.CSS_SELECTOR, ".layoutSeat dt")
RESULT_DETAIL = (By.CSS_SELECTOR, ".layoutSeat dd")
FLOOR_MAP = {"2": "二层", "3": "三层", "4": "四层", "5": "五层"}
ROOM_MAP = {
"1": "二层内环", "2": "二层西区", "3": "三层内环", "4": "三层外环",
"5": "四层内环", "6": "四层外环", "7": "四层期刊", "8": "五层考研",
}
def __init__(
self,
driver: WebDriver,
) -> None:
self._driver = driver
def selectDate(
self,
date_str: str,
) -> bool:
if self._clickOptionByJS(
trigger_id="onDate_select",
option_css=self.DATE_OPTION_FMT.format(value=date_str),
):
return True
return self._clickOption(
trigger=self.DATE_SELECT,
option=(By.XPATH, self.DATE_XPATH_FMT.format(value=date_str)),
)
def selectPlace(
self,
place: str = "1",
) -> bool:
if self._clickOptionByJS(
trigger_id="display_building",
option_css=self.PLACE_OPTION_FMT.format(value=place),
):
return True
return self._clickOption(
trigger=self.PLACE_SELECT,
option=(By.XPATH, self.PLACE_XPATH_FMT.format(value=place)),
)
def selectFloor(
self,
floor: str,
) -> bool:
if self._clickOptionByJS(
trigger_id="floor_select",
option_css=self.FLOOR_OPTION_FMT.format(value=floor),
):
return True
return self._clickOption(
trigger=self.FLOOR_SELECT,
option=(By.XPATH, self.FLOOR_XPATH_FMT.format(value=floor)),
)
def selectRoom(
self,
room: str,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.FIND_ROOM_BTN)
).click()
except (TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable((By.ID, self.ROOM_BTN_FMT.format(room=room)))
).click()
return True
except (TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
def openSeatMap(
self,
) -> SeatMapOverlay:
return SeatMapOverlay(self._driver)
def selectSeat(
self,
seat_id: str,
) -> str | None:
try:
WebDriverWait(self._driver, 2).until(
EC.presence_of_element_located(self.SEAT_LAYOUT)
)
WebDriverWait(self._driver, 2).until(
EC.presence_of_all_elements_located(self.SEAT_ITEMS)
)
except TimeoutException:
return None
except Exception:
return None
try:
all_seats = self._driver.find_elements(*self.SEAT_ITEMS)
seat_id_upper = seat_id.lstrip('0').upper()
for seat in all_seats:
if not seat_id_upper == seat.text.lstrip('0'):
continue
seat_link = seat.find_element(By.TAG_NAME, "a")
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(seat_link)
)
seat_link.click()
return seat_link.get_attribute("title")
return None
except (NoSuchElementException, TimeoutException,
StaleElementReferenceException, ElementNotInteractableException):
return None
except Exception:
return None
def submitReserve(
self,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(self.RESERVE_BTN)
).click()
return True
except (TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
def waitResultDialog(
self,
) -> ReserveResultDialog:
return ReserveResultDialog(self._driver)
def getAvailableTimeOptions(
self,
time_id: str,
) -> list:
try:
WebDriverWait(self._driver, 2).until(
EC.presence_of_all_elements_located(
(By.CSS_SELECTOR, f"#{time_id} ul li a")
)
)
except TimeoutException:
return []
except Exception:
return []
return self._driver.find_elements(
By.CSS_SELECTOR,
f"#{time_id} ul li a",
)
def refresh(
self,
) -> None:
self._driver.refresh()
def _clickOptionByJS(
self,
trigger_id: str,
option_css: str,
) -> bool:
script = f"""
try {{
var trigger = document.getElementById('{trigger_id}');
if (trigger) {{
trigger.click();
var option = document.querySelector("{option_css}");
if (option) {{
option.click();
return true;
}}
return false;
}}
return false;
}} catch (e) {{
return false;
}}
"""
result = self._driver.execute_script(script)
time.sleep(0.1)
return result
def _clickOption(
self,
trigger: tuple,
option: tuple,
) -> bool:
try:
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(trigger)
).click()
WebDriverWait(self._driver, 2).until(
EC.element_to_be_clickable(option)
).click()
return True
except (TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
+34
View File
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from pages.AutoLibPages import AutoLibPages
from pages.LoginPage import LoginPage
from pages.MainShell import MainShell
from pages.ReserveView import ReserveView
from pages.RecordsView import RecordsView
from pages._dialogs import (
SeatMapOverlay,
TimeSelectDialog,
ReserveResultDialog,
CheckinResultDialog,
RenewDialog,
)
__all__ = [
"AutoLibPages",
"LoginPage",
"MainShell",
"ReserveView",
"RecordsView",
"SeatMapOverlay",
"TimeSelectDialog",
"ReserveResultDialog",
"CheckinResultDialog",
"RenewDialog",
]
+302
View File
@@ -0,0 +1,302 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from selenium.common.exceptions import (
ElementNotInteractableException,
NoSuchElementException,
StaleElementReferenceException,
TimeoutException,
)
from selenium.webdriver.common.by import By
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from pages._overlay import Overlay
class SeatMapOverlay(Overlay):
"""
Seat selection overlay that opens after choosing a floor and room.
"""
ROOT = (By.ID, "seatLayout")
SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']")
def __init__(
self,
driver: WebDriver,
) -> None:
super().__init__(driver, self.ROOT)
def selectSeat(
self,
seat_id: str,
) -> str | None:
try:
self._waitAllPresence(self.SEAT_ITEMS)
except (NoSuchElementException, TimeoutException):
return None
except Exception:
return None
try:
all_seats = self._findAll(*self.SEAT_ITEMS)
seat_id_upper = seat_id.lstrip('0').upper()
for seat in all_seats:
if not seat_id_upper == seat.text.lstrip('0'):
continue
seat_link = seat.find_element(By.TAG_NAME, "a")
self._waitClickable((By.TAG_NAME, "a"))
seat_link.click()
return seat_link.get_attribute("title")
return None
except (NoSuchElementException, TimeoutException,
ElementNotInteractableException, StaleElementReferenceException):
return None
except Exception:
return None
class TimeSelectDialog(Overlay):
"""
Time selection panel that appears after selecting a seat.
Contains start-time and end-time option lists.
Does NOT auto-close — the reserve submission handles cleanup.
"""
ROOT = (By.CSS_SELECTOR, "#startTime ul")
def __init__(
self,
driver: WebDriver,
) -> None:
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
def getTimeOptions(
self,
time_id: str,
) -> list[WebElement]:
try:
self._waitAllPresence(
(By.CSS_SELECTOR, f"#{time_id} ul li a")
)
except (NoSuchElementException, TimeoutException):
return []
except Exception:
return []
return self._findAll(
By.CSS_SELECTOR,
f"#{time_id} ul li a",
)
class ReserveResultDialog(Overlay):
"""
Reservation result dialog shown after submitting a reserve request.
"""
ROOT = (By.CLASS_NAME, "layoutSeat")
def __init__(
self,
driver: WebDriver,
) -> None:
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
def getTitle(
self,
) -> str:
try:
return self._find(*self._titleLocator()).text
except (NoSuchElementException, StaleElementReferenceException):
return ""
except Exception:
return ""
def isSuccess(
self,
) -> bool:
title = self.getTitle()
return any(
kw in title
for kw in ("预定好了", "预约成功", "操作成功")
)
def isFailure(
self,
) -> bool:
contents = self.getDetailTexts()
return any(
"预约失败" in msg or "已有1个有效预约" in msg
for msg in contents
)
def getDetailTexts(
self,
) -> list[str]:
try:
elements = self._findAll(By.CSS_SELECTOR, ".layoutSeat dd")
return [el.text for el in elements if el.text.strip()]
except (NoSuchElementException, StaleElementReferenceException):
return []
except Exception:
return []
def _titleLocator(
self,
) -> tuple:
return (By.CSS_SELECTOR, ".layoutSeat dt")
class CheckinResultDialog(Overlay):
"""
Check-in result dialog.
"""
ROOT = (By.CLASS_NAME, "ui_dialog")
RESULT_MSG = (By.CLASS_NAME, "resultMessage")
OK_BTN = (By.CLASS_NAME, "btnOK")
DETAIL_DD = (By.CSS_SELECTOR, ".resultMessage dd")
def __init__(
self,
driver: WebDriver,
) -> None:
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
def getResultMessage(
self,
) -> str:
try:
self._waitPresence(self.RESULT_MSG)
el = self._find(*self.RESULT_MSG)
return el.text
except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
return ""
except Exception:
return ""
def getDetails(
self,
) -> list[str]:
try:
elements = self._findAll(*self.DETAIL_DD)
return [el.text for el in elements if el.text.strip()]
except (NoSuchElementException, StaleElementReferenceException):
return []
except Exception:
return []
def clickOk(
self,
) -> bool:
try:
self._waitClickable(self.OK_BTN).click()
return True
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
class RenewDialog(Overlay):
"""
Renewal time selection dialog.
"""
ROOT = (By.ID, "extendDiv")
MESSAGE_HEAD = (By.CSS_SELECTOR, "#extendDiv p.messageHead")
RESULT_MSG = (By.CSS_SELECTOR, "#extendDiv div.resultMessage")
TIME_OPTS = (By.CSS_SELECTOR, "#extendDiv .renewal_List li")
OK_BTN = (By.CSS_SELECTOR, "#extendDiv .btnOK")
def __init__(
self,
driver: WebDriver,
) -> None:
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
def waitUntilReady(
self,
) -> bool:
try:
self._waitVisible(self.ROOT)
self._waitPresence(self.MESSAGE_HEAD)
self._waitPresence(self.RESULT_MSG)
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
head_msg = self._find(*self.MESSAGE_HEAD).text.strip()
if "警告" in head_msg:
return False
try:
self._waitAllPresence(self.TIME_OPTS)
self._waitPresence(self.OK_BTN)
except (NoSuchElementException, TimeoutException):
return False
except Exception:
return False
return True
def getHeadMessage(
self,
) -> str:
return self._find(*self.MESSAGE_HEAD).text.strip()
def getResultMessage(
self,
) -> str:
return self._find(*self.RESULT_MSG).text.strip()
def getTimeOptions(
self,
) -> list[WebElement]:
return self._findAll(*self.TIME_OPTS)
def getOkButton(
self,
) -> WebElement:
return self._find(*self.OK_BTN)
def clickOk(
self,
) -> bool:
try:
self._find(*self.OK_BTN).click()
return True
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
return False
except Exception:
return False
+110
View File
@@ -0,0 +1,110 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.remote.webelement import WebElement
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
class Overlay:
"""
Context-managed overlay / modal / dialog on a page.
Automates the lifecycle: wait for appearance on enter,
optionally wait for disappearance on exit.
"""
def __init__(
self,
driver: WebDriver,
root_locator: tuple,
auto_close_on_exit: bool = True,
wait_timeout: float = 3.0,
) -> None:
self._driver: WebDriver = driver
self._root_locator: tuple = root_locator
self._auto_close: bool = auto_close_on_exit
self._timeout: float = wait_timeout
def __enter__(
self,
) -> "Overlay":
WebDriverWait(self._driver, self._timeout).until(
EC.visibility_of_element_located(self._root_locator)
)
return self
def __exit__(
self,
*args: object,
) -> None:
if self._auto_close:
WebDriverWait(self._driver, self._timeout).until(
EC.invisibility_of_element_located(self._root_locator)
)
def _find(
self,
by: str,
value: str,
) -> WebElement:
return self._driver.find_element(by, value)
def _findAll(
self,
by: str,
value: str,
) -> list[WebElement]:
return self._driver.find_elements(by, value)
def _waitClickable(
self,
locator: tuple,
timeout: float = 2.0,
) -> WebElement:
return WebDriverWait(self._driver, timeout).until(
EC.element_to_be_clickable(locator)
)
def _waitPresence(
self,
locator: tuple,
timeout: float = 2.0,
) -> WebElement:
return WebDriverWait(self._driver, timeout).until(
EC.presence_of_element_located(locator)
)
def _waitVisible(
self,
locator: tuple,
timeout: float = 2.0,
) -> WebElement:
return WebDriverWait(self._driver, timeout).until(
EC.visibility_of_element_located(locator)
)
def _waitAllPresence(
self,
locator: tuple,
timeout: float = 2.0,
) -> list[WebElement]:
return WebDriverWait(self._driver, timeout).until(
EC.presence_of_all_elements_located(locator)
)
+94
View File
@@ -0,0 +1,94 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import queue
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
)
from selenium.webdriver.remote.webdriver import WebDriver
from base.MsgBase import MsgBase
from pages.MainShell import MainShell
from pages._dialogs import CheckinResultDialog
class CheckinFlow(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver,
shell: MainShell,
) -> None:
super().__init__(input_queue, output_queue)
self._driver: WebDriver = driver
self._shell: MainShell = shell
def execute(
self,
username: str,
) -> bool:
if not self._shell.waitCheckinButton():
self._showTrace(f"用户 {username} 签到界面加载失败 !", self.TraceLevel.ERROR)
return False
if self._shell.isCheckinButtonDisabled():
self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......")
if not self._shell.enableCheckinButtonByJS():
self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
return False
self._showTrace("签到按钮已启用")
self._shell.clickCheckinButton()
try:
with CheckinResultDialog(self._driver) as dialog:
result_msg = dialog.getResultMessage()
if "签到成功" in result_msg:
details = dialog.getDetails()
if details:
if len(details) >= 5:
self._showTrace(
f"\n"
f" 签到成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" {details[4]}"
)
else:
self._showTrace(
"\n"
" 签到成功 !\n"
" 未获取到签到详情 !"
)
dialog.clickOk()
self._showTrace(f"用户 {username} 签到成功 !")
return True
else:
failure_reason = result_msg.replace("签到失败", "").strip()
self._showTrace(
f"\n"
" 签到失败 !\n"
f" {failure_reason}"
)
dialog.clickOk()
self._showTrace(f"用户 {username} 签到失败 !", self.TraceLevel.ERROR)
return False
except (NoSuchElementException, TimeoutException):
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False
except Exception:
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False
+156
View File
@@ -0,0 +1,156 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import queue
from selenium.common.exceptions import (
ElementNotInteractableException,
NoSuchElementException,
TimeoutException,
)
from selenium.webdriver.remote.webdriver import WebDriver
from base.MsgBase import MsgBase
from pages.MainShell import MainShell
from pages._dialogs import RenewDialog
from pages.flows._helpers import (
timeStrToMins,
minsToTimeStr,
findBestTimeOption,
)
class RenewFlow(MsgBase):
LIBRARY_CLOSE_MINS = 1410
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver,
shell: MainShell,
) -> None:
super().__init__(input_queue, output_queue)
self._driver: WebDriver = driver
self._shell: MainShell = shell
def execute(
self,
username: str,
record: dict,
renew_info: dict,
) -> bool:
max_diff = renew_info["max_diff"]
prefer_earlier = renew_info["prefer_early"]
end_time = record["time"]["end"]
target_renew_mins = timeStrToMins(end_time) + renew_info["expect_duration"] * 60
if not self._validateRenewTime(end_time, target_renew_mins):
return False
if not self._shell.waitExtendButton():
self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
return False
if self._shell.isExtendButtonDisabled():
self._showTrace(
f"用户 {username} 续约按钮不可用, 可能不在场馆内, "
f"请连接图书馆网络后重试"
)
return False
self._shell.clickExtendButton()
try:
with RenewDialog(self._driver) as dialog:
if not dialog.waitUntilReady():
result_msg = dialog.getResultMessage()
self._showTrace(
f"\n"
f" 续约失败 !\n"
f" {result_msg}"
)
self._shell.refresh()
self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR)
return False
renew_ok_btn = dialog.getOkButton()
renew_time_opts = dialog.getTimeOptions()
if not renew_time_opts:
self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING)
self._shell.refresh()
return False
best_opt, best_text, actual_diff, free_times = findBestTimeOption(
renew_time_opts, target_renew_mins, max_diff, prefer_earlier,
is_reserve=False,
)
if best_opt is not None:
best_opt.click()
abs_diff = abs(actual_diff)
if actual_diff < 0:
relation = f"早了 {abs_diff} 分钟"
elif actual_diff > 0:
relation = f"晚了 {abs_diff} 分钟"
else:
relation = "正好等于 续约时间"
self._showTrace(
f"选择距离期望续约时间最近的 {best_text}, "
f"与期望续约时间相比 {relation}"
)
record["time"]["end"] = best_text.strip()
renew_ok_btn.click()
self._shell.refresh()
return True
self._showTrace(
"无法选择最近的可用续约时间 ! "
f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !",
self.TraceLevel.WARNING,
)
self._showTrace(f"当前可供续约的时间有: {free_times}")
self._shell.refresh()
return False
except (NoSuchElementException, TimeoutException) as e:
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
self._shell.refresh()
return False
except (ElementNotInteractableException) as e:
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
self._shell.refresh()
return False
except Exception as e:
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
self._shell.refresh()
return False
def _validateRenewTime(
self,
end_time: str,
target_renew_mins: int,
) -> bool:
if target_renew_mins > self.LIBRARY_CLOSE_MINS:
actual_renew_duration = self.LIBRARY_CLOSE_MINS - timeStrToMins(end_time)
if actual_renew_duration <= 0:
self._showTrace(
f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !", self.TraceLevel.ERROR
)
return False
self._showTrace(
f"续约时间已调整至闭馆时间 "
f"{minsToTimeStr(self.LIBRARY_CLOSE_MINS)},"
f"实际续约时长为 "
f"{actual_renew_duration // 60} 小时 "
f"{actual_renew_duration % 60} 分钟"
)
return True
+272
View File
@@ -0,0 +1,272 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import queue
from dataclasses import dataclass
from typing import Optional
from selenium.common.exceptions import (
ElementNotInteractableException,
NoSuchElementException,
TimeoutException,
)
from selenium.webdriver.remote.webdriver import WebDriver
from base.MsgBase import MsgBase
from pages.MainShell import MainShell
from pages.flows._helpers import (
timeStrToMins,
minsToTimeStr,
findBestTimeOption,
)
from pages.ReserveView import ReserveView
from pages._dialogs import ReserveResultDialog
@dataclass
class ReserveContext:
username: str
date: str
floor: str
room: str
seat_id: str
begin_time: str
end_time: str
begin_max_diff: int = 30
end_max_diff: int = 30
begin_prefer_early: bool = True
end_prefer_early: bool = False
expect_duration: int = 4
satisfy_duration: bool = True
class ReserveFlow(MsgBase):
LIBRARY_CLOSE_MINS = timeStrToMins("23:30")
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver,
shell: MainShell,
) -> None:
super().__init__(input_queue, output_queue)
self._driver: WebDriver = driver
self._shell: MainShell = shell
self._ctx: Optional[ReserveContext] = None
def execute(
self,
ctx: ReserveContext,
) -> bool:
self._ctx = ctx
submit_reserve = False
reserve_success = False
have_hover_on_page = False
try:
view = self._shell.gotoReserveView()
except (NoSuchElementException, TimeoutException) as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return False
except Exception as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return False
if not view.selectDate(ctx.date):
self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR)
return False
self._showTrace(f"日期 {ctx.date} 选择成功 !")
if not view.selectPlace("1"):
self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR)
return False
self._showTrace("预约场所 图书馆 选择成功 !")
if not view.selectFloor(ctx.floor):
display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor)
self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR)
return False
self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !")
if not view.selectRoom(ctx.room):
display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room)
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
return False
self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !")
have_hover_on_page = True
seat_status = view.selectSeat(ctx.seat_id)
if seat_status is None:
self._showTrace(
f"座位 {ctx.seat_id} 在该楼层区域中不存在, 请检查座位号是否正确",
self.TraceLevel.WARNING,
)
else:
self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
select_time_ok = self._selectSeatTime(view)
if not select_time_ok:
self._showTrace("选择时间失败 !", self.TraceLevel.ERROR)
else:
try:
view.submitReserve()
submit_reserve = True
with ReserveResultDialog(self._driver) as result:
if result.isFailure():
self._showTrace("预约失败", self.TraceLevel.ERROR)
elif result.isSuccess():
details = result.getDetailTexts()
if len(details) >= 6:
self._showTrace(
f"\n"
f" 预约成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" 签到时间 {details[5]}"
)
else:
self._showTrace(
"\n"
" 预约成功 !\n"
" 未找获取到详细信息"
)
reserve_success = True
else:
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
except (TimeoutException, ElementNotInteractableException):
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
except Exception:
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
if not submit_reserve and have_hover_on_page:
view.refresh()
if reserve_success:
self._showTrace(f"用户 {ctx.username} 预约成功 !")
else:
self._showTrace(f"用户 {ctx.username} 预约失败 !", self.TraceLevel.ERROR)
return reserve_success
def _selectSeatTime(
self,
view: ReserveView,
) -> bool:
ctx = self._ctx
exp_beg_tm_str = ctx.begin_time
exp_end_tm_str = ctx.end_time
exp_beg_mins = timeStrToMins(exp_beg_tm_str)
exp_end_mins = timeStrToMins(exp_end_tm_str)
act_beg_mins = exp_beg_mins
act_beg_tm_str = exp_beg_tm_str
act_end_mins = exp_end_mins
act_end_tm_str = exp_end_tm_str
act_beg_mins = self._selectNearestTime(
view,
time_id="startTime",
time_type="开始时间",
target_time=exp_beg_mins,
max_time_diff=ctx.begin_max_diff,
prefer_earlier=ctx.begin_prefer_early,
)
if act_beg_mins == -1:
return False
act_beg_tm_str = minsToTimeStr(act_beg_mins)
if ctx.satisfy_duration:
exp_end_mins = self._calcEndTime(act_beg_mins, ctx.expect_duration)
exp_end_tm_str = minsToTimeStr(exp_end_mins)
self._showTrace(
f"需要满足期望预约持续时间: {ctx.expect_duration} 小时, "
f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}"
)
act_end_mins = self._selectNearestTime(
view,
time_id="end_time",
time_type="结束时间",
target_time=exp_end_mins,
max_time_diff=ctx.end_max_diff,
prefer_earlier=ctx.end_prefer_early,
)
if act_end_mins == -1:
return False
act_end_tm_str = minsToTimeStr(act_end_mins)
self._showTrace(
f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, "
f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}"
)
return True
def _selectNearestTime(
self,
view: ReserveView,
time_id: str,
time_type: str,
target_time: int,
max_time_diff: int,
prefer_earlier: bool,
) -> int:
all_time_opts = view.getAvailableTimeOptions(time_id)
if not all_time_opts:
self._showTrace(
f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR
)
return -1
best_opt, best_text, actual_diff, free_times = findBestTimeOption(
all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True
)
if best_opt is not None:
best_opt.click()
abs_diff = abs(actual_diff)
if actual_diff < 0:
relation = f"早了 {abs_diff} 分钟"
elif actual_diff > 0:
relation = f"晚了 {abs_diff} 分钟"
else:
relation = f"正好等于 {time_type}"
self._showTrace(
f"选择距离期望 {time_type} 最近的 {best_text}, "
f"与期望 {time_type} 相比 {relation}"
)
return target_time + actual_diff
target_time_str = minsToTimeStr(target_time)
self._showTrace(
f"无法选择最近的 {time_type} {target_time_str}, "
f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟",
self.TraceLevel.WARNING,
)
self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}")
return -1
def _calcEndTime(
self,
begin_mins: int,
duration: int,
) -> int:
expect_end_mins = int(begin_mins + duration * 60)
if expect_end_mins > self.LIBRARY_CLOSE_MINS:
expect_end_mins = self.LIBRARY_CLOSE_MINS
self._showTrace(
f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, "
f"自动调整为 23:30",
self.TraceLevel.WARNING,
)
return expect_end_mins
+18
View File
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from pages.flows.ReserveFlow import ReserveFlow
from pages.flows.CheckinFlow import CheckinFlow
from pages.flows.RenewFlow import RenewFlow
__all__ = [
"ReserveFlow",
"CheckinFlow",
"RenewFlow",
]
+85
View File
@@ -0,0 +1,85 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from datetime import datetime
def timeStrToMins(
time_str: str,
) -> int:
hour, minute = map(int, time_str.split(":"))
return hour * 60 + minute
def minsToTimeStr(
mins: int,
) -> str:
hour, minute = divmod(int(mins), 60)
return f"{hour:02d}:{minute:02d}"
def findBestTimeOption(
time_options: list,
target_time: int,
max_time_diff: int,
prefer_earlier: bool,
is_reserve: bool = True,
) -> tuple:
"""
Find the best time option from available WebElement options.
Returns:
(bestElement, bestText, actual_diff, freeTimesList)
or (None, None, None, freeTimesList) if no suitable option.
"""
free_times = []
best_time_diff = max_time_diff
best_actual_diff = None
best_time_opt = None
for time_opt in time_options:
if is_reserve:
time_attr = time_opt.get_attribute("time")
if time_attr == "now":
now = datetime.now()
time_val = now.hour * 60 + now.minute
elif time_attr and time_attr.isdigit():
time_val = int(time_attr)
else:
continue
else:
time_attr = time_opt.get_attribute("id")
if not (time_attr and time_attr.isdigit()):
continue
time_val = int(time_attr)
free_times.append(
time_opt.text.strip()
if not is_reserve
else minsToTimeStr(time_val)
)
actual_diff = time_val - target_time
abs_diff = abs(actual_diff)
if abs_diff < best_time_diff or (
abs_diff == best_time_diff
and (
(prefer_earlier and actual_diff <= 0)
or (not prefer_earlier and actual_diff >= 0)
)
):
best_time_diff = abs_diff
best_actual_diff = actual_diff
best_time_opt = time_opt
if best_time_opt is not None:
return (best_time_opt, best_time_opt.text.strip(), best_actual_diff, free_times)
return (None, None, None, free_times)
+101
View File
@@ -0,0 +1,101 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import base64
import queue
import ddddocr
from selenium.common.exceptions import (
NoSuchElementException,
TimeoutException,
)
from base.MsgBase import MsgBase
from pages.LoginPage import LoginPage
class CaptchaHandler(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
login_page: LoginPage,
) -> None:
super().__init__(input_queue, output_queue)
self._login_page = login_page
self._ocr = ddddocr.DdddOcr()
def solveCaptcha(
self,
auto_captcha: bool = True,
) -> str:
max_attempts = 3
for _ in range(max_attempts):
if auto_captcha:
captcha_text = self._autoRecognize()
else:
self._showTrace("用户未配置自动识别验证码, 请手动输入验证码 !", 20, no_log=True)
captcha_text = self._manualRecognize()
if captcha_text:
return captcha_text
else:
if not self._login_page.refreshCaptcha():
return ""
self._showTrace(
f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !",
self.TraceLevel.WARNING,
)
return ""
def _autoRecognize(
self,
) -> str:
try:
img_src = self._login_page.getCaptchaImageSrc()
base64_str = img_src.split(',', 1)[1]
captcha_img = base64.b64decode(base64_str)
captcha_text = self._ocr.classification(captcha_img)
captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower()
self._showTrace(f"识别到验证码为 : '{captcha_text}'", 20, no_log=True)
if len(captcha_text) != 4:
self._showLog("识别到的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("识别到的验证码长度不等于 4 个字符 !")
return captcha_text
except (NoSuchElementException, TimeoutException) as e:
self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR)
return ""
except (ValueError, OSError) as e:
self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR)
return ""
except Exception as e:
self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR)
return ""
def _manualRecognize(
self,
) -> str:
try:
self._showMsg("请输入验证码:")
captcha_text = self._waitMsg(timeout=15)
self._showTrace(f"输入的验证码为 : '{captcha_text}'", 20, no_log=True)
if len(captcha_text) != 4:
self._showLog("输入的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("输入的验证码长度不等于 4 个字符 !")
return captcha_text
except ValueError as e:
self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR)
return ""
except Exception as e:
self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR)
return ""
+302
View File
@@ -0,0 +1,302 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import queue
import re
import time
from datetime import datetime, timedelta
from selenium.common.exceptions import (
NoSuchElementException,
StaleElementReferenceException,
TimeoutException,
)
from base.MsgBase import MsgBase
from pages.MainShell import MainShell
from pages.RecordsView import RecordsView
class RecordChecker(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
shell: MainShell,
) -> None:
super().__init__(input_queue, output_queue)
self._shell = shell
@staticmethod
def _formatDiffTime(
seconds: float,
) -> str:
hours = int(seconds // 3600)
minutes = int(seconds % 3600 // 60)
seconds = int(seconds % 60)
return f"{hours}{minutes}{seconds}"
def canReserve(
self,
date: str,
) -> bool:
if self._getReserveRecord(date, "已预约") is None:
if self._getReserveRecord(date, "使用中") is None:
self._showTrace(f"用户在 {date} 可以预约")
return True
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
return False
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
return False
def canCheckin(
self,
) -> bool:
date = time.strftime("%Y-%m-%d", time.localtime())
record = self._getReserveRecord(date, "已预约")
if record is not None:
begin_time = record["time"]["begin"]
begin_time = datetime.strptime(
f"{date} {begin_time}", "%Y-%m-%d %H:%M"
)
time_diff = datetime.now() - begin_time
time_diff_seconds = time_diff.total_seconds()
if time_diff_seconds < -30 * 60:
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间还有 "
f"{self._formatDiffTime(abs(time_diff_seconds))}, 无法签到"
)
return False
elif -30 * 60 <= time_diff_seconds < 0:
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间还有 "
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
)
return True
elif 0 <= time_diff_seconds < 30 * 60 - 5:
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间已经过去 "
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
)
return True
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
return False
def canRenew(
self,
) -> tuple[bool, dict]:
date = time.strftime("%Y-%m-%d", time.localtime())
record = self._getReserveRecord(date, "使用中")
if record is not None:
end_time = record["time"]["end"]
end_time = datetime.strptime(
f"{date} {end_time}", "%Y-%m-%d %H:%M"
)
time_diff = end_time - datetime.now()
time_diff_seconds = time_diff.total_seconds()
trace_msg = (
f"用户在 {date} 的预约结束时间为 {end_time}, "
f"当前距离预约结束时间还有 "
f"{self._formatDiffTime(abs(time_diff_seconds))}"
)
if abs(time_diff_seconds) < 120 * 60:
self._showTrace(f"{trace_msg}, 可以续约")
return True, record
else:
self._showTrace(f"{trace_msg}, 无法续约")
return False, None
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
return False, None
def postRenewCheck(
self,
record: dict,
) -> bool:
date = record["date"]
act_record = self._getReserveRecord(date, "使用中")
if act_record is not None:
if (
act_record["time"]["begin"] == record["time"]["begin"]
and act_record["time"]["end"] == record["time"]["end"]
):
self._showTrace(
f"\n"
f" 续约成功 !\n"
f" 日 期 {date}\n"
f" 时 间 {act_record['time']['begin']}"
f" - {act_record['time']['end']}\n"
f" 位 置 {act_record['info']['location']}\n"
f" 状 态 {act_record['info']['status']}"
)
return True
else:
self._showTrace(
f"\n"
f" 续约失败 !\n"
f" 续约后结束时间为 {act_record['time']['end']},"
f"与预期结束时间 {record['time']['end']} 不符 !"
)
return False
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果")
return False
def _getReserveRecord(
self,
wanted_date: str,
wanted_status: str,
) -> dict | None:
if wanted_date is None:
self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING)
return None
self._showTrace(
f"正在检查用户在 {wanted_date} 是否有预约状态为 "
f"{wanted_status} 的预约记录......", 20, no_log=True
)
checked_count = 0
max_check_times = 6
records_view = self._shell.gotoRecordsView()
for _ in range(max_check_times):
reservations = records_view.loadRecords()
if reservations is None:
return None
for reservation in reservations[checked_count:]:
record = self._decodeReserveRecord(reservation, records_view)
checked_count += 1
if record is None:
continue
if record["date"] == "":
continue
if record["time"] == {"begin": "", "end": ""}:
continue
if (
datetime.strptime(record["date"], "%Y-%m-%d").date()
> datetime.strptime(wanted_date, "%Y-%m-%d").date()
):
continue
if (
datetime.strptime(record["date"], "%Y-%m-%d").date()
< datetime.strptime(wanted_date, "%Y-%m-%d").date()
):
return None
if record["info"]["status"] == wanted_status:
self._showTrace(
f"寻找到用户第 {checked_count} 条状态为 "
f"{wanted_status} 的预约记录, "
f"详细信息: {record['date']} "
f"{record['time']['begin']} - "
f"{record['time']['end']} "
f"{record['info']['location']}",
20, no_log=True,
)
return record
if not records_view.showMoreRecords():
break
return None
def _decodeReserveRecord(
self,
reservation,
records_view: RecordsView,
) -> dict:
try:
time_element = records_view.getRecordTimeElement(reservation)
info_elements = records_view.getRecordInfoElements(reservation)
except (NoSuchElementException, TimeoutException, StaleElementReferenceException):
return {
"date": "",
"time": {"begin": "", "end": ""},
"info": {"location": "", "status": ""},
}
except Exception:
return {
"date": "",
"time": {"begin": "", "end": ""},
"info": {"location": "", "status": ""},
}
time_data = self._decodeReserveTime(time_element)
info_data = self._decodeReserveInfo(info_elements)
return {
"date": time_data["date"],
"time": time_data["time"],
"info": info_data,
}
def _decodeReserveTime(
self,
time_element,
) -> dict:
time_str = time_element.text.strip()
today = datetime.now().date()
if "明天" in time_str:
target_date = today + timedelta(days=1)
date = target_date.strftime("%Y-%m-%d")
elif "今天" in time_str:
target_date = today
date = target_date.strftime("%Y-%m-%d")
elif "昨天" in time_str:
target_date = today - timedelta(days=1)
date = target_date.strftime("%Y-%m-%d")
else:
date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str)
if date_match:
date = date_match.group(1)
else:
date = ""
time_match = re.search(
r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str
)
if time_match:
begin_time = time_match.group(1)
end_time = time_match.group(2)
else:
begin_time = ""
end_time = ""
return {
"date": date,
"time": {"begin": begin_time, "end": end_time},
}
def _decodeReserveInfo(
self,
info_elements,
) -> dict:
location = ""
status = ""
for info in info_elements:
if "已预约" in info.text:
status = "已预约"
elif "使用中" in info.text:
status = "使用中"
elif "已完成" in info.text:
status = "已完成"
elif "已结束使用" in info.text:
status = "已结束使用"
elif "已取消" in info.text:
status = "已取消"
elif "失约" in info.text:
status = "失约"
elif "图书馆" in info.text:
location = info.text.strip()
return {"location": location, "status": status}
+221
View File
@@ -0,0 +1,221 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import queue
import time
from base.MsgBase import MsgBase
from pages.ReserveView import ReserveView
from pages.flows._helpers import timeStrToMins, minsToTimeStr
class ReserveValidator(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
) -> None:
super().__init__(input_queue, output_queue)
def validate(
self,
reserve_info: dict,
) -> bool:
if not self._containRequiredInfo(reserve_info):
return False
if not self._isValidDate(reserve_info):
return False
if not self._isValidBeginTime(reserve_info):
return False
if not self._isValidExpectDuration(reserve_info):
return False
if not self._isValidEndTime(reserve_info):
return False
if not self._finalCheck(reserve_info):
return False
self._showTrace(
f"预约信息检查完成, 准备预约 "
f"{reserve_info['date']} "
f"{reserve_info['begin_time']['time']} - "
f"{reserve_info['end_time']['time']} "
f"图书馆 "
f"{ReserveView.FLOOR_MAP[reserve_info['floor']]} "
f"{ReserveView.ROOM_MAP[reserve_info['room']]} "
f"的座位 {reserve_info['seat_id']}"
)
return True
def _containRequiredInfo(
self,
reserve_info: dict,
) -> bool:
floor_map = ReserveView.FLOOR_MAP
room_map = ReserveView.ROOM_MAP
try:
if reserve_info.get("floor") is None:
raise ValueError("未指定楼层")
if reserve_info["floor"] not in floor_map:
raise ValueError(f"该楼层 '{reserve_info['floor']}' 不存在")
if reserve_info.get("room") is None:
raise ValueError("未指定房间")
if reserve_info["room"] not in room_map:
raise ValueError(f"该房间 '{reserve_info['room']}' 不存在")
if reserve_info.get("seat_id") is None:
raise ValueError("未指定座位")
if reserve_info["seat_id"] == "":
raise ValueError("未指定座位号")
return True
except ValueError as e:
msg = (
f"预约信息错误 ! : {e}, "
f"由于缺少必要的预约信息, 无法开始预约流程"
)
self._showTrace(msg, self.TraceLevel.ERROR)
self._showTrace(
f"预约信息错误 ! : {e}, "
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整",
20,
no_log=True,
)
return False
def _isValidDate(
self,
reserve_info: dict,
) -> bool:
cur_date_str = time.strftime("%Y-%m-%d", time.localtime())
cur_timestamp = time.mktime(time.strptime(cur_date_str, "%Y-%m-%d"))
if reserve_info.get("date") is None:
reserve_info["date"] = cur_date_str
self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date_str}")
else:
res_timestamp = time.mktime(time.strptime(reserve_info["date"], "%Y-%m-%d"))
if res_timestamp < cur_timestamp:
self._showTrace(
f"预约日期错误 ! :"
f"{reserve_info['date']} 早于当前日期 {cur_date_str}, 自动设置为当前日期",
self.TraceLevel.WARNING,
)
reserve_info["date"] = cur_date_str
return True
def _isValidBeginTime(
self,
reserve_info: dict,
) -> bool:
cur_time = time.strftime("%H:%M", time.localtime())
if reserve_info.get("begin_time") is None:
reserve_info["begin_time"] = {}
if "time" not in reserve_info["begin_time"]:
reserve_info["begin_time"]["time"] = cur_time
self._showTrace(f"开始时间未指定, 自动设置为当前时间: {cur_time}")
if "max_diff" not in reserve_info["begin_time"]:
reserve_info["begin_time"]["max_diff"] = 30
self._showTrace("开始时间最大时间差未指定, 自动设置为 30 分钟")
if "prefer_early" not in reserve_info["begin_time"]:
reserve_info["begin_time"]["prefer_early"] = True
self._showTrace("是否优先选择更早开始时间未指定, 自动设置为 True")
return True
def _isValidExpectDuration(
self,
reserve_info: dict,
) -> bool:
if reserve_info.get("satisfy_duration") is None:
reserve_info["satisfy_duration"] = True
self._showTrace("预约满足时长要求未指定, 默认满足")
if reserve_info["satisfy_duration"]:
if reserve_info.get("expect_duration") is None:
reserve_info["expect_duration"] = 4
self._showTrace("需要满足预约持续时间, 但未指定, 使用默认时长为 4 小时")
return True
def _isValidEndTime(
self,
reserve_info: dict,
) -> bool:
if reserve_info.get("end_time") is None:
reserve_info["end_time"] = {}
if "time" not in reserve_info["end_time"]:
end_mins = timeStrToMins(reserve_info["begin_time"]["time"])
end_mins = end_mins + int(reserve_info["expect_duration"] * 60)
reserve_info["end_time"] = {
"time": minsToTimeStr(end_mins),
"max_diff": 30,
"prefer_early": False,
}
self._showTrace(
f"结束时间未指定, 自动设置为开始时间加上期望时长: "
f"{reserve_info['end_time']['time']}"
)
if "max_diff" not in reserve_info["end_time"]:
reserve_info["end_time"]["max_diff"] = 30
self._showTrace("结束时间最大时间差未指定, 自动设置为 30 分钟")
if "prefer_early" not in reserve_info["end_time"]:
reserve_info["end_time"]["prefer_early"] = False
self._showTrace("是否优先选择较晚结束时间未指定, 自动设置为 True")
return True
def _finalCheck(
self,
reserve_info: dict,
) -> bool:
begin_time = reserve_info["begin_time"]
end_time = reserve_info["end_time"]
begin_mins = timeStrToMins(begin_time["time"])
end_mins = timeStrToMins(end_time["time"])
if end_mins < begin_mins and reserve_info["satisfy_duration"] is False:
self._showTrace(
f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, "
f"尝试交换时间",
self.TraceLevel.WARNING,
)
reserve_info["end_time"], reserve_info["begin_time"] = begin_time, end_time
begin_time, end_time = end_time, begin_time
begin_mins = timeStrToMins(begin_time["time"])
end_mins = timeStrToMins(end_time["time"])
max_end_mins = timeStrToMins("23:30")
if end_mins > max_end_mins:
self._showTrace(
f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30",
self.TraceLevel.WARNING,
)
reserve_info["end_time"]["time"] = "23:30"
end_mins = max_end_mins
if reserve_info["satisfy_duration"]:
if reserve_info["expect_duration"] > 8:
self._showTrace(
f"该用户设置了优先满足时长要求, 但是预约期望持续时间 "
f"{reserve_info['expect_duration']} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING,
)
reserve_info["expect_duration"] = 8
else:
if end_mins - begin_mins > 8 * 60:
self._showTrace(
f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 "
f"{float((end_mins - begin_mins) / 60)} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING,
)
reserve_info["end_time"]["time"] = minsToTimeStr(begin_mins + 8 * 60)
return True
+18
View File
@@ -0,0 +1,18 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from pages.services.CaptchaHandler import CaptchaHandler
from pages.services.ReserveValidator import ReserveValidator
from pages.services.RecordChecker import RecordChecker
__all__ = [
"CaptchaHandler",
"ReserveValidator",
"RecordChecker",
]
+162
View File
@@ -0,0 +1,162 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
AutoLibrary 真实运行测试脚本。
在 venv 中运行:
py -3 test_pages_refactor.py [--mode MODE]
MODE 可选值 (默认 1):
1 = 只预约
2 = 只签到
3 = 预约 + 签到
4 = 只续约
7 = 全部 (预约 + 签到 + 续约)
"""
import os
import sys
import argparse
SRC = os.path.dirname(os.path.abspath(__file__))
if SRC not in sys.path:
sys.path.insert(0, SRC)
def getAppConfigDir() -> str:
appData = os.environ.get("APPDATA", "")
if not appData:
appData = os.path.join(os.path.expanduser("~"), "AppData", "Roaming")
return os.path.join(appData, "AutoLibrary", "configs")
def main():
parser = argparse.ArgumentParser(description="AutoLibrary 真实运行测试")
parser.add_argument(
"--mode", type=int, default=1,
help="运行模式 bitmask: 1=预约 2=签到 4=续约 (默认 1)"
)
parser.add_argument(
"--group", type=int, default=0,
help="只运行第 N 个启用的任务组 (0=全部, 默认 0)"
)
parser.add_argument(
"--headless", action="store_true",
help="使用 headless 模式运行浏览器"
)
args = parser.parse_args()
# ---- 1. 初始化 ConfigManager ----
from managers.config.ConfigManager import instance as configInstance
from managers.config.ConfigUtils import ConfigUtils
from utils.JSONReader import JSONReader
configDir = getAppConfigDir()
if not os.path.isdir(configDir):
print(f"[FAIL] 配置目录不存在: {configDir}")
print("请先启动一次 AutoLibrary GUI 以生成配置文件。")
return 1
try:
configInstance(configDir)
except ValueError:
pass
configPaths = ConfigUtils.getAutomationConfigPaths()
runPath = configPaths.get("run")
userPath = configPaths.get("user")
if not runPath or not os.path.isfile(runPath):
print(f"[FAIL] run.json 不存在: {runPath}")
return 1
if not userPath or not os.path.isfile(userPath):
print(f"[FAIL] user.json 不存在: {userPath}")
return 1
print(f"[INFO] run : {runPath}")
print(f"[INFO] user : {userPath}")
# ---- 2. 加载配置 ----
runConfig = JSONReader(runPath).data()
userConfig = JSONReader(userPath).data()
if args.mode is not None:
runConfig["mode"]["run_mode"] = args.mode
if args.headless:
runConfig["web_driver"]["headless"] = True
groups = userConfig.get("groups", [])
if not groups:
print("[FAIL] user.json 中没有任务组")
return 1
print(f"[INFO] 运行模式: {runConfig['mode']['run_mode']}")
if args.headless:
print("[INFO] Headless 模式已启用")
# ---- 3. 创建 AutoLib 并运行 ----
from pages.AutoLibPages import AutoLibPages
import queue
import threading
for gi, group in enumerate(groups):
if args.group > 0 and gi + 1 != args.group:
continue
if not group.get("enabled", True):
print(f"[SKIP] 任务组 {gi + 1} '{group.get('name', '未命名')}' 已禁用")
continue
users = group.get("users", [])
enabledUsers = [u for u in users if u.get("enabled", True)]
if not enabledUsers:
print(f"[SKIP] 任务组 {gi + 1} 没有启用的用户")
continue
print(f"\n{'=' * 60}")
print(f"任务组 {gi + 1}/{len(groups)}: '{group.get('name', '未命名')}'")
print(f"启用的用户: {len(enabledUsers)}/{len(users)}")
print(f"{'=' * 60}")
outputQueue = queue.Queue()
stopConsumer = threading.Event()
traceLines = []
def consumeTrace():
while not stopConsumer.is_set():
try:
msg = outputQueue.get(timeout=0.3)
traceLines.append(msg)
print(msg)
except queue.Empty:
continue
consumer = threading.Thread(target=consumeTrace, daemon=True)
consumer.start()
try:
autoLib = AutoLibPages(
input_queue=queue.Queue(),
output_queue=outputQueue,
run_config=runConfig,
)
autoLib.run({"users": enabledUsers})
autoLib.close()
except Exception as e:
print(f"[FAIL] 运行异常: {e}")
import traceback
traceback.print_exc()
return 1
finally:
stopConsumer.set()
consumer.join(timeout=2)
print("\n[OK] 测试完成")
return 0
if __name__ == "__main__":
sys.exit(main())