diff --git a/src/pages/AutoLibPages.py b/src/pages/AutoLibPages.py new file mode 100644 index 0000000..a200a10 --- /dev/null +++ b/src/pages/AutoLibPages.py @@ -0,0 +1,398 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2025 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import os +import queue + +from selenium import webdriver +from selenium.common.exceptions import ( + TimeoutException, + WebDriverException, +) +from selenium.webdriver.edge.service import Service as EdgeService +from selenium.webdriver.chrome.service import Service as ChromeService +from selenium.webdriver.firefox.service import Service as FirefoxService + +from base.MsgBase import MsgBase +from pages.LoginPage import LoginPage +from pages.MainShell import MainShell +from pages.flows.ReserveFlow import ReserveFlow, ReserveContext +from pages.flows.CheckinFlow import CheckinFlow +from pages.flows.RenewFlow import RenewFlow +from pages.services.CaptchaHandler import CaptchaHandler +from pages.services.ReserveValidator import ReserveValidator +from pages.services.RecordChecker import RecordChecker + + +class AutoLibPages(MsgBase): + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + run_config: dict, + ) -> None: + super().__init__(input_queue, output_queue) + + self.__run_config: dict = run_config + self.__user_config: dict | None = None + self.__driver = None + self.__driver_type: str = "" + self.__driver_path: str = "" + self.__login_page: LoginPage = None + self.__shell: MainShell = None + self.__captcha_handler: CaptchaHandler = None + self.__record_checker: RecordChecker = None + self.__reserve_validator: ReserveValidator = None + self.__reserve_flow: ReserveFlow = None + self.__checkin_flow: CheckinFlow = None + self.__renew_flow: RenewFlow = None + + if not self.__initBrowserDriver(): + raise Exception("浏览器驱动初始化失败 !") + else: + if not self.__initDriverUrl(): + self.close() + raise Exception("浏览器驱动URL初始化失败 !") + self.__initPagesServices() + self.__initPagesFlows() + + def __initBrowserDriver( + self, + ) -> bool: + + self._showTrace("正在初始化浏览器驱动......", no_log=True) + web_driver_config: dict = self.__run_config.get("web_driver", None) + self.__driver_type = web_driver_config.get("driver_type") + match self.__driver_type.lower(): + case "edge": + driver_options = webdriver.EdgeOptions() + case "chrome": + driver_options = webdriver.ChromeOptions() + case "firefox": + driver_options = webdriver.FirefoxOptions() + case _: + self._showTrace( + f"不支持的浏览器驱动类型: {self.__driver_type} !", + self.TraceLevel.WARNING, + ) + return False + if not web_driver_config: + self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR) + return False + if web_driver_config.get("headless"): + driver_options.add_argument("--headless") + driver_options.add_argument("--disable-gpu") + driver_options.add_argument("--no-sandbox") + driver_options.add_argument("--disable-dev-shm-usage") + + # must be 1920x1080, otherwise the page will cause some elements not accessible + driver_options.add_argument("--window-size=1920,1080") + + # omit ssl errors and verbose log level + driver_options.add_argument("--ignore-certificate-errors") + driver_options.add_argument("--ignore-ssl-errors") + driver_options.add_argument("--log-level=OFF") + driver_options.add_argument("--silent") + + # set options for chrome and edge + if self.__driver_type.lower() in ["edge", "chrome"]: + driver_options.add_argument("--remote-allow-origins=*") + driver_options.add_experimental_option("excludeSwitches", ["enable-automation"]) + driver_options.add_experimental_option("useAutomationExtension", False) + driver_options.add_argument("--disable-blink-features=AutomationControlled") + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\ + "AppleWebKit/537.36 (KHTML, like Gecko) "\ + "Chrome/120.0.0.0 "\ + "Safari/537.36" + if self.__driver_type.lower() == "edge": + user_agent += " Edg/120.0.0.0" + + # set options for firefox + elif self.__driver_type.lower() == "firefox": + driver_options.set_preference("dom.webdriver.enabled", False) + driver_options.set_preference("useAutomationExtension", False) + user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\ + "Gecko/20100101 Firefox/120.0" + driver_options.add_argument(f"user-agent={user_agent}") + + # init browser driver + self.__driver_path = web_driver_config.get("driver_path") + if not self.__driver_path: + self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING) + return False + self.__driver_path = os.path.abspath(self.__driver_path) + try: + service = None + match self.__driver_type.lower(): + case "edge": + service = EdgeService(executable_path=self.__driver_path) + self.__driver = webdriver.Edge(service=service, options=driver_options) + case "chrome": + service = ChromeService(executable_path=self.__driver_path) + self.__driver = webdriver.Chrome(service=service, options=driver_options) + case "firefox": + self._showTrace("Firefox 浏览器驱动初始化略慢, 请耐心等待...", no_log=True) + service = FirefoxService(executable_path=self.__driver_path) + self.__driver = webdriver.Firefox(service=service, options=driver_options) + case _: + raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type} !") + self.__driver.implicitly_wait(1) + self.__driver.execute_script( + "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" + ) + except WebDriverException as e: + self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR) + return False + except Exception as e: + self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR) + return False + self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}") + return True + + def __initDriverUrl( + self, + ) -> bool: + + lib_config: dict = self.__run_config.get("library", None) + if not lib_config: + self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR) + return False + url: str = lib_config.get("host_url") + lib_config.get("login_url") + self.__login_page = LoginPage(self.__driver) + self.__driver.set_page_load_timeout(5) + try: + self.__driver.get(url) + except TimeoutException: + self.__login_page.stopPageLoad() + self._showTrace( + "图书馆登录页面加载超时 ! 请检查网络环境是否正常", self.TraceLevel.ERROR + ) + return False + except WebDriverException as e: + self._showTrace(f"图书馆页面加载失败: {e}", self.TraceLevel.ERROR) + return False + if not self.__login_page.waitUntilLoaded(): + return False + return True + + def __initPagesServices( + self, + ) -> None: + + if not self.__driver: + self._showTrace("浏览器驱动未初始化, 请先初始化浏览器驱动 !", self.TraceLevel.WARNING) + return + self.__shell = MainShell(self.__driver) + self.__captcha_handler = CaptchaHandler( + input_queue=self._input_queue, + output_queue=self._output_queue, + login_page=self.__login_page, + ) + self.__record_checker = RecordChecker( + input_queue=self._input_queue, + output_queue=self._output_queue, + shell=self.__shell, + ) + self.__reserve_validator = ReserveValidator( + input_queue=self._input_queue, + output_queue=self._output_queue, + ) + + def __initPagesFlows( + self, + ) -> None: + + self.__reserve_flow = ReserveFlow( + input_queue=self._input_queue, + output_queue=self._output_queue, + driver=self.__driver, + shell=self.__shell, + ) + self.__checkin_flow = CheckinFlow( + input_queue=self._input_queue, + output_queue=self._output_queue, + driver=self.__driver, + shell=self.__shell, + ) + self.__renew_flow = RenewFlow( + input_queue=self._input_queue, + output_queue=self._output_queue, + driver=self.__driver, + shell=self.__shell, + ) + + def __run( + self, + username: str, + password: str, + login_config: dict, + run_mode_config: dict, + reserve_info: dict, + ) -> int: + + # result : -1 - terminate, 0 - success, 1 - failed, 2 - passed + result: int = 2 + + # login + auto_captcha: bool = login_config.get("auto_captcha", True) + if not self.__login_page.login( + username, + password, + captcha_solver=lambda: self.__captcha_handler.solveCaptcha(auto_captcha), + tracer=self._showTrace, + log_level=self.TraceLevel, + max_attempts=login_config.get("max_attempt", 3), + ): + return 1 + run_mode_raw: int = run_mode_config.get("run_mode", 0) + run_mode: dict[str, bool] = { + "auto_reserve": run_mode_raw & 0x1, + "auto_checkin": run_mode_raw & 0x2, + "auto_renewal": run_mode_raw & 0x4, + } + # reserve + if run_mode["auto_reserve"]: + if self.__record_checker.canReserve(reserve_info.get("date")): + if self.__reserve_validator.validate(reserve_info): + ctx = ReserveContext( + username=username, + date=reserve_info["date"], + floor=reserve_info["floor"], + room=reserve_info["room"], + seat_id=reserve_info["seat_id"], + begin_time=reserve_info["begin_time"]["time"], + end_time=reserve_info["end_time"]["time"], + begin_max_diff=reserve_info["begin_time"]["max_diff"], + end_max_diff=reserve_info["end_time"]["max_diff"], + begin_prefer_early=reserve_info["begin_time"]["prefer_early"], + end_prefer_early=reserve_info["end_time"]["prefer_early"], + expect_duration=reserve_info["expect_duration"], + satisfy_duration=reserve_info["satisfy_duration"], + ) + if self.__reserve_flow.execute(ctx): + result = 0 + else: + result = 1 + else: + result = 1 + else: + self._showTrace(f"用户 {username} 无法预约, 已跳过") + result = 2 + + # checkin + last_result: int = result + if run_mode["auto_checkin"] and last_result != 1: + if self.__record_checker.canCheckin(): + if self.__checkin_flow.execute(username): + result = 0 + else: + result = 1 + else: + self._showTrace(f"用户 {username} 无法签到, 已跳过") + result = 2 + if last_result == 0: # partly success + result = 0 + + # renewal + last_result = result + if run_mode["auto_renewal"] and last_result != 1: + can_renew, record = self.__record_checker.canRenew() + if can_renew: + renew_info: dict = reserve_info.get("renew_time", {}) + if self.__renew_flow.execute(username, record, renew_info): + if self.__record_checker.postRenewCheck(record): + self._showTrace(f"用户 {username} 续约成功 !") + result = 0 + else: + if result != 1: # partly success + result = 0 + else: + result = 1 + else: + result = 1 + else: + self._showTrace(f"用户 {username} 无法续约, 已跳过") + result = 2 + if last_result == 0: # partly success + result = 0 + + # logout + if not self.__shell.logout(): + if not self.__initDriverUrl(): + return -1 + return result + + def run( + self, + user_config: dict, + ) -> None: + + self.__user_config = user_config + + user_counter: dict[str, int] = {"current": 0, "success": 0, "failed": 0, "passed": 0} + users: list = self.__user_config["users"] + self._showTrace(f"共发现 {len(users)} 个用户") + for user in users: + user_counter["current"] += 1 + self._showTrace( + f"正在处理第 {user_counter['current']}/{len(users)} 个用户: {user['username']}......", + no_log=True, + ) + if not user["enabled"]: + self._showTrace(f"用户 {user['username']} 已跳过") + user_counter["passed"] += 1 + continue + r: int = self.__run( + username=user["username"], + password=user["password"], + login_config=self.__run_config["login"], + run_mode_config=self.__run_config["mode"], + reserve_info=user["reserve_info"], + ) + if r == -1: + self._showTrace( + f"用户 {user['username']} 处理过程中页面发生异常, 无法继续操作, 任务已终止 !", + self.TraceLevel.WARNING, + ) + break + elif r == 0: + user_counter["success"] += 1 + elif r == 1: + user_counter["failed"] += 1 + elif r == 2: + user_counter["passed"] += 1 + self._showTrace( + f"处理完成, 共计 {user_counter['current']} 个用户, " + f"成功 {user_counter['success']} 个用户, " + f"失败 {user_counter['failed']} 个用户, " + f"跳过 {user_counter['passed']} 个用户" + ) + return + + def close( + self, + ) -> bool: + + if self.__driver: + if self.__driver_type.lower() == "firefox": + self._showTrace( + "Firefox 浏览器驱动关闭略慢, 请耐心等待...", + no_log=True, + ) + try: + self.__driver.quit() + except WebDriverException as e: + self._showTrace(f"浏览器驱动关闭时发生异常: {e}", self.TraceLevel.WARNING) + self.__driver = None + self._showTrace("浏览器驱动已关闭") + return True + else: + self._showTrace("浏览器驱动未初始化, 无需关闭", no_log=True) + return False diff --git a/src/pages/LoginPage.py b/src/pages/LoginPage.py new file mode 100644 index 0000000..24c9b1b --- /dev/null +++ b/src/pages/LoginPage.py @@ -0,0 +1,209 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from typing import Callable + +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, + TimeoutException, +) +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + + +class LoginPage: + + USERNAME_INPUT = (By.NAME, "username") + PASSWORD_INPUT = (By.NAME, "password") + CAPTCHA_INPUT = (By.NAME, "answer") + CAPTCHA_IMG = (By.ID, "loadImgId") + LOGIN_BUTTON = (By.XPATH, "//input[@type='button' and @value='登录']") + + SUCCESS_INDICATOR_SEARCH = (By.ID, "search") + SUCCESS_INDICATOR_CONTENT = (By.CLASS_NAME, "selectContent") + SUCCESS_TITLE_KEYWORD = "自选座位 :: 座位预约系统" + + PAGE_LOAD_TIMEOUT = 5 + + def __init__( + self, + driver: WebDriver, + ) -> None: + + self._driver: WebDriver = driver + + def navigate( + self, + url: str, + ) -> bool: + + self._driver.set_page_load_timeout(self.PAGE_LOAD_TIMEOUT) + self._driver.get(url) + if not self.waitUntilLoaded(): + return False + return True + + def waitUntilLoaded( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.title_contains("首页") + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.USERNAME_INPUT) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.PASSWORD_INPUT) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.CAPTCHA_INPUT) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.CAPTCHA_IMG) + ) + return True + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + + def fillCredentials( + self, + username: str, + password: str, + ) -> bool: + + try: + el = self._driver.find_element(*self.USERNAME_INPUT) + el.clear() + el.send_keys(username) + el = self._driver.find_element(*self.PASSWORD_INPUT) + el.clear() + el.send_keys(password) + return True + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + + def getCaptchaImageSrc( + self, + ) -> str: + + captcha_el = self._driver.find_element(*self.CAPTCHA_IMG) + return captcha_el.get_attribute("src") + + def refreshCaptcha( + self, + ) -> bool: + + try: + self._driver.find_element(*self.CAPTCHA_IMG).click() + return True + except (NoSuchElementException, TimeoutException, + ElementNotInteractableException): + return False + except Exception: + return False + + def fillCaptcha( + self, + captcha_text: str, + ) -> bool: + + try: + el = self._driver.find_element(*self.CAPTCHA_INPUT) + el.clear() + el.send_keys(captcha_text) + return True + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + + def clickLogin( + self, + ) -> bool: + + try: + self._driver.find_element(*self.LOGIN_BUTTON).click() + return True + except (NoSuchElementException, TimeoutException, + ElementNotInteractableException): + return False + except Exception: + return False + + def waitLoginSuccess( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.title_contains(self.SUCCESS_TITLE_KEYWORD) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.SUCCESS_INDICATOR_SEARCH) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.SUCCESS_INDICATOR_CONTENT) + ) + return True + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + + def stopPageLoad( + self, + ) -> None: + + self._driver.execute_script("window.stop();") + + def login( + self, + username: str, + password: str, + captcha_solver: Callable[[], str], + tracer: Callable[..., None], + log_level: type, + max_attempts: int = 5, + ) -> bool: + + ERR = log_level.ERROR + for attempt in range(max_attempts): + tracer( + f"用户 {username} 第 {attempt + 1} 次尝试登录......", + 20, no_log=True, + ) + if not self.fillCredentials(username, password): + continue + captcha_text = captcha_solver() + if not captcha_text: + continue + if not self.fillCaptcha(captcha_text): + continue + tracer("尝试登录...", 20, no_log=True) + if not self.clickLogin(): + continue + if self.waitLoginSuccess(): + tracer(f"用户 {username} 第 {attempt + 1} 次登录成功 !") + return True + else: + err_msg = ( + "登录页面加载失败 ! : " + "用户账号或者密码错误/验证码错误, 具体以页面提示为准" + ) + tracer(err_msg, ERR) + return False diff --git a/src/pages/MainShell.py b/src/pages/MainShell.py new file mode 100644 index 0000000..a9b6247 --- /dev/null +++ b/src/pages/MainShell.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import time + +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.common.exceptions import ( + NoSuchElementException, + TimeoutException, +) + +from pages.ReserveView import ReserveView +from pages.RecordsView import RecordsView + + +class MainShell: + + TAB_RESERVE = (By.XPATH, "//a[@href='/map']") + TAB_HISTORY = (By.XPATH, "//a[@href='/history?type=SEAT']") + TAB_LOGOUT = (By.XPATH, "//a[@href='/logout']") + + BTN_CHECKIN = (By.ID, "btnCheckIn") + BTN_EXTEND = (By.ID, "btnExtend") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + self._driver = driver + + def gotoReserveView( + self, + ) -> ReserveView: + + self._clickTab(self.TAB_RESERVE) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located((By.ID, "seatLayout")) + ) + return ReserveView(self._driver) + + def gotoRecordsView( + self, + ) -> RecordsView: + + self._clickTab(self.TAB_HISTORY) + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located((By.CLASS_NAME, "myReserveList")) + ) + return RecordsView(self._driver) + + def logout( + self, + ) -> bool: + + try: + self._driver.find_element(*self.TAB_LOGOUT).click() + return True + except NoSuchElementException: + return False + except Exception: + return False + + def waitCheckinButton( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.BTN_CHECKIN) + ) + return True + except TimeoutException: + return False + except Exception: + return False + + def waitExtendButton( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.BTN_EXTEND) + ) + return True + except TimeoutException: + return False + except Exception: + return False + + def isCheckinButtonDisabled( + self, + ) -> bool: + + btn = self._driver.find_element(*self.BTN_CHECKIN) + return "disabled" in btn.get_attribute("class") + + def isExtendButtonDisabled( + self, + ) -> bool: + + btn = self._driver.find_element(*self.BTN_EXTEND) + return "disabled" in btn.get_attribute("class") + + def clickCheckinButton( + self, + ) -> None: + + btn = WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.BTN_CHECKIN) + ) + btn.click() + + def clickExtendButton( + self, + ) -> None: + + btn = WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.BTN_EXTEND) + ) + btn.click() + + def enableCheckinButtonByJS( + self, + ) -> bool: + + script = """ + try { + var checkin_btn = document.getElementById('btnCheckIn'); + if (checkin_btn) { + checkin_btn.classList.remove('disabled'); + return true; + } + return false; + } catch (e) { + return false; + } + """ + result = self._driver.execute_script(script) + time.sleep(0.1) + return result + + def refresh( + self, + ) -> None: + + self._driver.refresh() + + def _clickTab( + self, + locator: tuple, + ) -> None: + + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(locator) + ).click() diff --git a/src/pages/RecordsView.py b/src/pages/RecordsView.py new file mode 100644 index 0000000..d4f838c --- /dev/null +++ b/src/pages/RecordsView.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement +from selenium.common.exceptions import ( + NoSuchElementException, + StaleElementReferenceException, + TimeoutException, +) + + +class RecordsView: + + RECORDS_LIST = (By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)") + MORE_BTN = (By.ID, "more_btn") + RECORD_TIME = (By.CSS_SELECTOR, "dt") + RECORD_INFO = (By.CSS_SELECTOR, "a") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + self._driver = driver + + def loadRecords( + self, + ) -> list | None: + + try: + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.RECORDS_LIST) + ) + return self._driver.find_elements(*self.RECORDS_LIST) + except TimeoutException: + return None + except Exception: + return None + + def getRecordTimeElement( + self, + record: WebElement, + ) -> WebElement: + + return record.find_element(*self.RECORD_TIME) + + def getRecordInfoElements( + self, + record: WebElement, + ) -> list[WebElement]: + + return record.find_elements(*self.RECORD_INFO) + + def showMoreRecords( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.MORE_BTN) + ) + except TimeoutException: + return False + except Exception: + return False + try: + more_btn = self._driver.find_element(*self.MORE_BTN) + if more_btn.is_displayed() and more_btn.is_enabled(): + self._driver.execute_script("arguments[0].scrollIntoView(true);", more_btn) + self._driver.execute_script("arguments[0].click();", more_btn) + return True + return False + except (NoSuchElementException, StaleElementReferenceException): + return False + except Exception: + return False + + def getRecordText( + self, + record: WebElement, + ) -> str: + + return record.text.strip() diff --git a/src/pages/ReserveView.py b/src/pages/ReserveView.py new file mode 100644 index 0000000..13c4cb7 --- /dev/null +++ b/src/pages/ReserveView.py @@ -0,0 +1,266 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import time + +from selenium.webdriver.common.by import By +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, + StaleElementReferenceException, + TimeoutException, +) + +from pages._dialogs import SeatMapOverlay, ReserveResultDialog + + +class ReserveView: + + DATE_SELECT = (By.ID, "onDate_select") + DATE_OPTION_FMT = "p#options_onDate a[value='{value}']" + DATE_XPATH_FMT = "//p[@id='options_onDate']/a[@value='{value}']" + + PLACE_SELECT = (By.ID, "display_building") + PLACE_OPTION_FMT = "p#options_building a[value='{value}']" + PLACE_XPATH_FMT = "//p[@id='options_building']/a[@value='{value}']" + + FLOOR_SELECT = (By.ID, "floor_select") + FLOOR_OPTION_FMT = "p#options_floor a[value='{value}']" + FLOOR_XPATH_FMT = "//p[@id='options_floor']/a[@value='{value}']" + + FIND_ROOM_BTN = (By.ID, "findRoom") + ROOM_BTN_FMT = "room_{room}" + + SEAT_LAYOUT = (By.ID, "seatLayout") + SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']") + RESERVE_BTN = (By.ID, "reserveBtn") + + START_TIME_OPTS = (By.CSS_SELECTOR, "#startTime ul li a") + END_TIME_OPTS = (By.CSS_SELECTOR, "#endTime ul li a") + + RESULT_DIALOG = (By.CLASS_NAME, "layoutSeat") + RESULT_TITLE = (By.CSS_SELECTOR, ".layoutSeat dt") + RESULT_DETAIL = (By.CSS_SELECTOR, ".layoutSeat dd") + + FLOOR_MAP = {"2": "二层", "3": "三层", "4": "四层", "5": "五层"} + ROOM_MAP = { + "1": "二层内环", "2": "二层西区", "3": "三层内环", "4": "三层外环", + "5": "四层内环", "6": "四层外环", "7": "四层期刊", "8": "五层考研", + } + + def __init__( + self, + driver: WebDriver, + ) -> None: + + self._driver = driver + + def selectDate( + self, + date_str: str, + ) -> bool: + + if self._clickOptionByJS( + trigger_id="onDate_select", + option_css=self.DATE_OPTION_FMT.format(value=date_str), + ): + return True + return self._clickOption( + trigger=self.DATE_SELECT, + option=(By.XPATH, self.DATE_XPATH_FMT.format(value=date_str)), + ) + + def selectPlace( + self, + place: str = "1", + ) -> bool: + + if self._clickOptionByJS( + trigger_id="display_building", + option_css=self.PLACE_OPTION_FMT.format(value=place), + ): + return True + return self._clickOption( + trigger=self.PLACE_SELECT, + option=(By.XPATH, self.PLACE_XPATH_FMT.format(value=place)), + ) + + def selectFloor( + self, + floor: str, + ) -> bool: + + if self._clickOptionByJS( + trigger_id="floor_select", + option_css=self.FLOOR_OPTION_FMT.format(value=floor), + ): + return True + return self._clickOption( + trigger=self.FLOOR_SELECT, + option=(By.XPATH, self.FLOOR_XPATH_FMT.format(value=floor)), + ) + + def selectRoom( + self, + room: str, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.FIND_ROOM_BTN) + ).click() + except (TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable((By.ID, self.ROOM_BTN_FMT.format(room=room))) + ).click() + return True + except (TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False + + def openSeatMap( + self, + ) -> SeatMapOverlay: + + return SeatMapOverlay(self._driver) + + def selectSeat( + self, + seat_id: str, + ) -> str | None: + + try: + WebDriverWait(self._driver, 2).until( + EC.presence_of_element_located(self.SEAT_LAYOUT) + ) + WebDriverWait(self._driver, 2).until( + EC.presence_of_all_elements_located(self.SEAT_ITEMS) + ) + except TimeoutException: + return None + except Exception: + return None + try: + all_seats = self._driver.find_elements(*self.SEAT_ITEMS) + seat_id_upper = seat_id.lstrip('0').upper() + for seat in all_seats: + if not seat_id_upper == seat.text.lstrip('0'): + continue + seat_link = seat.find_element(By.TAG_NAME, "a") + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(seat_link) + ) + seat_link.click() + return seat_link.get_attribute("title") + return None + except (NoSuchElementException, TimeoutException, + StaleElementReferenceException, ElementNotInteractableException): + return None + except Exception: + return None + + def submitReserve( + self, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(self.RESERVE_BTN) + ).click() + return True + except (TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False + + def waitResultDialog( + self, + ) -> ReserveResultDialog: + + return ReserveResultDialog(self._driver) + + def getAvailableTimeOptions( + self, + time_id: str, + ) -> list: + + try: + WebDriverWait(self._driver, 2).until( + EC.presence_of_all_elements_located( + (By.CSS_SELECTOR, f"#{time_id} ul li a") + ) + ) + except TimeoutException: + return [] + except Exception: + return [] + return self._driver.find_elements( + By.CSS_SELECTOR, + f"#{time_id} ul li a", + ) + + def refresh( + self, + ) -> None: + + self._driver.refresh() + + def _clickOptionByJS( + self, + trigger_id: str, + option_css: str, + ) -> bool: + + script = f""" + try {{ + var trigger = document.getElementById('{trigger_id}'); + if (trigger) {{ + trigger.click(); + var option = document.querySelector("{option_css}"); + if (option) {{ + option.click(); + return true; + }} + return false; + }} + return false; + }} catch (e) {{ + return false; + }} + """ + result = self._driver.execute_script(script) + time.sleep(0.1) + return result + + def _clickOption( + self, + trigger: tuple, + option: tuple, + ) -> bool: + + try: + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(trigger) + ).click() + WebDriverWait(self._driver, 2).until( + EC.element_to_be_clickable(option) + ).click() + return True + except (TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False diff --git a/src/pages/__init__.py b/src/pages/__init__.py new file mode 100644 index 0000000..36f0450 --- /dev/null +++ b/src/pages/__init__.py @@ -0,0 +1,34 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from pages.AutoLibPages import AutoLibPages +from pages.LoginPage import LoginPage +from pages.MainShell import MainShell +from pages.ReserveView import ReserveView +from pages.RecordsView import RecordsView +from pages._dialogs import ( + SeatMapOverlay, + TimeSelectDialog, + ReserveResultDialog, + CheckinResultDialog, + RenewDialog, +) + +__all__ = [ + "AutoLibPages", + "LoginPage", + "MainShell", + "ReserveView", + "RecordsView", + "SeatMapOverlay", + "TimeSelectDialog", + "ReserveResultDialog", + "CheckinResultDialog", + "RenewDialog", +] diff --git a/src/pages/_dialogs.py b/src/pages/_dialogs.py new file mode 100644 index 0000000..dc52602 --- /dev/null +++ b/src/pages/_dialogs.py @@ -0,0 +1,302 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, + StaleElementReferenceException, + TimeoutException, +) +from selenium.webdriver.common.by import By +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + +from pages._overlay import Overlay + + +class SeatMapOverlay(Overlay): + """ + Seat selection overlay that opens after choosing a floor and room. + """ + + ROOT = (By.ID, "seatLayout") + SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + super().__init__(driver, self.ROOT) + + def selectSeat( + self, + seat_id: str, + ) -> str | None: + + try: + self._waitAllPresence(self.SEAT_ITEMS) + except (NoSuchElementException, TimeoutException): + return None + except Exception: + return None + try: + all_seats = self._findAll(*self.SEAT_ITEMS) + seat_id_upper = seat_id.lstrip('0').upper() + for seat in all_seats: + if not seat_id_upper == seat.text.lstrip('0'): + continue + seat_link = seat.find_element(By.TAG_NAME, "a") + self._waitClickable((By.TAG_NAME, "a")) + seat_link.click() + return seat_link.get_attribute("title") + return None + except (NoSuchElementException, TimeoutException, + ElementNotInteractableException, StaleElementReferenceException): + return None + except Exception: + return None + + +class TimeSelectDialog(Overlay): + """ + Time selection panel that appears after selecting a seat. + + Contains start-time and end-time option lists. + Does NOT auto-close — the reserve submission handles cleanup. + """ + + ROOT = (By.CSS_SELECTOR, "#startTime ul") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + super().__init__(driver, self.ROOT, auto_close_on_exit=False) + + def getTimeOptions( + self, + time_id: str, + ) -> list[WebElement]: + + try: + self._waitAllPresence( + (By.CSS_SELECTOR, f"#{time_id} ul li a") + ) + except (NoSuchElementException, TimeoutException): + return [] + except Exception: + return [] + return self._findAll( + By.CSS_SELECTOR, + f"#{time_id} ul li a", + ) + + +class ReserveResultDialog(Overlay): + """ + Reservation result dialog shown after submitting a reserve request. + """ + + ROOT = (By.CLASS_NAME, "layoutSeat") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + super().__init__(driver, self.ROOT, auto_close_on_exit=False) + + def getTitle( + self, + ) -> str: + + try: + return self._find(*self._titleLocator()).text + except (NoSuchElementException, StaleElementReferenceException): + return "" + except Exception: + return "" + + def isSuccess( + self, + ) -> bool: + + title = self.getTitle() + return any( + kw in title + for kw in ("预定好了", "预约成功", "操作成功") + ) + + def isFailure( + self, + ) -> bool: + + contents = self.getDetailTexts() + return any( + "预约失败" in msg or "已有1个有效预约" in msg + for msg in contents + ) + + def getDetailTexts( + self, + ) -> list[str]: + + try: + elements = self._findAll(By.CSS_SELECTOR, ".layoutSeat dd") + return [el.text for el in elements if el.text.strip()] + except (NoSuchElementException, StaleElementReferenceException): + return [] + except Exception: + return [] + + def _titleLocator( + self, + ) -> tuple: + + return (By.CSS_SELECTOR, ".layoutSeat dt") + + +class CheckinResultDialog(Overlay): + """ + Check-in result dialog. + """ + + ROOT = (By.CLASS_NAME, "ui_dialog") + + RESULT_MSG = (By.CLASS_NAME, "resultMessage") + OK_BTN = (By.CLASS_NAME, "btnOK") + DETAIL_DD = (By.CSS_SELECTOR, ".resultMessage dd") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + super().__init__(driver, self.ROOT, auto_close_on_exit=False) + + def getResultMessage( + self, + ) -> str: + + try: + self._waitPresence(self.RESULT_MSG) + el = self._find(*self.RESULT_MSG) + return el.text + except (TimeoutException, NoSuchElementException, StaleElementReferenceException): + return "" + except Exception: + return "" + + def getDetails( + self, + ) -> list[str]: + + try: + elements = self._findAll(*self.DETAIL_DD) + return [el.text for el in elements if el.text.strip()] + except (NoSuchElementException, StaleElementReferenceException): + return [] + except Exception: + return [] + + def clickOk( + self, + ) -> bool: + + try: + self._waitClickable(self.OK_BTN).click() + return True + except (NoSuchElementException, TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False + + +class RenewDialog(Overlay): + """ + Renewal time selection dialog. + """ + + ROOT = (By.ID, "extendDiv") + + MESSAGE_HEAD = (By.CSS_SELECTOR, "#extendDiv p.messageHead") + RESULT_MSG = (By.CSS_SELECTOR, "#extendDiv div.resultMessage") + TIME_OPTS = (By.CSS_SELECTOR, "#extendDiv .renewal_List li") + OK_BTN = (By.CSS_SELECTOR, "#extendDiv .btnOK") + + def __init__( + self, + driver: WebDriver, + ) -> None: + + super().__init__(driver, self.ROOT, auto_close_on_exit=False) + + def waitUntilReady( + self, + ) -> bool: + + try: + self._waitVisible(self.ROOT) + self._waitPresence(self.MESSAGE_HEAD) + self._waitPresence(self.RESULT_MSG) + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + head_msg = self._find(*self.MESSAGE_HEAD).text.strip() + if "警告" in head_msg: + return False + try: + self._waitAllPresence(self.TIME_OPTS) + self._waitPresence(self.OK_BTN) + except (NoSuchElementException, TimeoutException): + return False + except Exception: + return False + return True + + def getHeadMessage( + self, + ) -> str: + + return self._find(*self.MESSAGE_HEAD).text.strip() + + def getResultMessage( + self, + ) -> str: + + return self._find(*self.RESULT_MSG).text.strip() + + def getTimeOptions( + self, + ) -> list[WebElement]: + + return self._findAll(*self.TIME_OPTS) + + def getOkButton( + self, + ) -> WebElement: + + return self._find(*self.OK_BTN) + + def clickOk( + self, + ) -> bool: + + try: + self._find(*self.OK_BTN).click() + return True + except (NoSuchElementException, TimeoutException, ElementNotInteractableException): + return False + except Exception: + return False diff --git a/src/pages/_overlay.py b/src/pages/_overlay.py new file mode 100644 index 0000000..12041e6 --- /dev/null +++ b/src/pages/_overlay.py @@ -0,0 +1,110 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from selenium.webdriver.remote.webdriver import WebDriver +from selenium.webdriver.remote.webelement import WebElement +from selenium.webdriver.support.ui import WebDriverWait +from selenium.webdriver.support import expected_conditions as EC + + +class Overlay: + """ + Context-managed overlay / modal / dialog on a page. + + Automates the lifecycle: wait for appearance on enter, + optionally wait for disappearance on exit. + """ + + def __init__( + self, + driver: WebDriver, + root_locator: tuple, + auto_close_on_exit: bool = True, + wait_timeout: float = 3.0, + ) -> None: + + self._driver: WebDriver = driver + self._root_locator: tuple = root_locator + self._auto_close: bool = auto_close_on_exit + self._timeout: float = wait_timeout + + def __enter__( + self, + ) -> "Overlay": + + WebDriverWait(self._driver, self._timeout).until( + EC.visibility_of_element_located(self._root_locator) + ) + return self + + def __exit__( + self, + *args: object, + ) -> None: + + if self._auto_close: + WebDriverWait(self._driver, self._timeout).until( + EC.invisibility_of_element_located(self._root_locator) + ) + + def _find( + self, + by: str, + value: str, + ) -> WebElement: + + return self._driver.find_element(by, value) + + def _findAll( + self, + by: str, + value: str, + ) -> list[WebElement]: + + return self._driver.find_elements(by, value) + + def _waitClickable( + self, + locator: tuple, + timeout: float = 2.0, + ) -> WebElement: + + return WebDriverWait(self._driver, timeout).until( + EC.element_to_be_clickable(locator) + ) + + def _waitPresence( + self, + locator: tuple, + timeout: float = 2.0, + ) -> WebElement: + + return WebDriverWait(self._driver, timeout).until( + EC.presence_of_element_located(locator) + ) + + def _waitVisible( + self, + locator: tuple, + timeout: float = 2.0, + ) -> WebElement: + + return WebDriverWait(self._driver, timeout).until( + EC.visibility_of_element_located(locator) + ) + + def _waitAllPresence( + self, + locator: tuple, + timeout: float = 2.0, + ) -> list[WebElement]: + + return WebDriverWait(self._driver, timeout).until( + EC.presence_of_all_elements_located(locator) + ) diff --git a/src/pages/flows/CheckinFlow.py b/src/pages/flows/CheckinFlow.py new file mode 100644 index 0000000..299ab04 --- /dev/null +++ b/src/pages/flows/CheckinFlow.py @@ -0,0 +1,94 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import queue + +from selenium.common.exceptions import ( + NoSuchElementException, + TimeoutException, +) +from selenium.webdriver.remote.webdriver import WebDriver + +from base.MsgBase import MsgBase +from pages.MainShell import MainShell +from pages._dialogs import CheckinResultDialog + + +class CheckinFlow(MsgBase): + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + driver: WebDriver, + shell: MainShell, + ) -> None: + + super().__init__(input_queue, output_queue) + self._driver: WebDriver = driver + self._shell: MainShell = shell + + def execute( + self, + username: str, + ) -> bool: + + if not self._shell.waitCheckinButton(): + self._showTrace(f"用户 {username} 签到界面加载失败 !", self.TraceLevel.ERROR) + return False + + if self._shell.isCheckinButtonDisabled(): + self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......") + if not self._shell.enableCheckinButtonByJS(): + self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR) + return False + self._showTrace("签到按钮已启用") + + self._shell.clickCheckinButton() + + try: + with CheckinResultDialog(self._driver) as dialog: + result_msg = dialog.getResultMessage() + if "签到成功" in result_msg: + details = dialog.getDetails() + if details: + if len(details) >= 5: + self._showTrace( + f"\n" + f" 签到成功 !\n" + f" {details[1]}\n" + f" {details[2]}\n" + f" {details[3]}\n" + f" {details[4]}" + ) + else: + self._showTrace( + "\n" + " 签到成功 !\n" + " 未获取到签到详情 !" + ) + dialog.clickOk() + self._showTrace(f"用户 {username} 签到成功 !") + return True + else: + failure_reason = result_msg.replace("签到失败", "").strip() + self._showTrace( + f"\n" + " 签到失败 !\n" + f" {failure_reason}" + ) + dialog.clickOk() + self._showTrace(f"用户 {username} 签到失败 !", self.TraceLevel.ERROR) + return False + except (NoSuchElementException, TimeoutException): + self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR) + return False + except Exception: + self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR) + return False diff --git a/src/pages/flows/RenewFlow.py b/src/pages/flows/RenewFlow.py new file mode 100644 index 0000000..35b48f1 --- /dev/null +++ b/src/pages/flows/RenewFlow.py @@ -0,0 +1,156 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import queue + +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, + TimeoutException, +) +from selenium.webdriver.remote.webdriver import WebDriver + +from base.MsgBase import MsgBase +from pages.MainShell import MainShell +from pages._dialogs import RenewDialog +from pages.flows._helpers import ( + timeStrToMins, + minsToTimeStr, + findBestTimeOption, +) + + +class RenewFlow(MsgBase): + + LIBRARY_CLOSE_MINS = 1410 + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + driver: WebDriver, + shell: MainShell, + ) -> None: + + super().__init__(input_queue, output_queue) + self._driver: WebDriver = driver + self._shell: MainShell = shell + + def execute( + self, + username: str, + record: dict, + renew_info: dict, + ) -> bool: + + max_diff = renew_info["max_diff"] + prefer_earlier = renew_info["prefer_early"] + end_time = record["time"]["end"] + target_renew_mins = timeStrToMins(end_time) + renew_info["expect_duration"] * 60 + + if not self._validateRenewTime(end_time, target_renew_mins): + return False + + if not self._shell.waitExtendButton(): + self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR) + return False + + if self._shell.isExtendButtonDisabled(): + self._showTrace( + f"用户 {username} 续约按钮不可用, 可能不在场馆内, " + f"请连接图书馆网络后重试" + ) + return False + + self._shell.clickExtendButton() + + try: + with RenewDialog(self._driver) as dialog: + if not dialog.waitUntilReady(): + result_msg = dialog.getResultMessage() + self._showTrace( + f"\n" + f" 续约失败 !\n" + f" {result_msg}" + ) + self._shell.refresh() + self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR) + return False + + renew_ok_btn = dialog.getOkButton() + renew_time_opts = dialog.getTimeOptions() + if not renew_time_opts: + self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING) + self._shell.refresh() + return False + + best_opt, best_text, actual_diff, free_times = findBestTimeOption( + renew_time_opts, target_renew_mins, max_diff, prefer_earlier, + is_reserve=False, + ) + if best_opt is not None: + best_opt.click() + abs_diff = abs(actual_diff) + if actual_diff < 0: + relation = f"早了 {abs_diff} 分钟" + elif actual_diff > 0: + relation = f"晚了 {abs_diff} 分钟" + else: + relation = "正好等于 续约时间" + self._showTrace( + f"选择距离期望续约时间最近的 {best_text}, " + f"与期望续约时间相比 {relation}" + ) + record["time"]["end"] = best_text.strip() + renew_ok_btn.click() + self._shell.refresh() + return True + + self._showTrace( + "无法选择最近的可用续约时间 ! " + f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !", + self.TraceLevel.WARNING, + ) + self._showTrace(f"当前可供续约的时间有: {free_times}") + self._shell.refresh() + return False + except (NoSuchElementException, TimeoutException) as e: + self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR) + self._shell.refresh() + return False + except (ElementNotInteractableException) as e: + self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR) + self._shell.refresh() + return False + except Exception as e: + self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR) + self._shell.refresh() + return False + + def _validateRenewTime( + self, + end_time: str, + target_renew_mins: int, + ) -> bool: + + if target_renew_mins > self.LIBRARY_CLOSE_MINS: + actual_renew_duration = self.LIBRARY_CLOSE_MINS - timeStrToMins(end_time) + if actual_renew_duration <= 0: + self._showTrace( + f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !", self.TraceLevel.ERROR + ) + return False + self._showTrace( + f"续约时间已调整至闭馆时间 " + f"{minsToTimeStr(self.LIBRARY_CLOSE_MINS)}," + f"实际续约时长为 " + f"{actual_renew_duration // 60} 小时 " + f"{actual_renew_duration % 60} 分钟" + ) + return True diff --git a/src/pages/flows/ReserveFlow.py b/src/pages/flows/ReserveFlow.py new file mode 100644 index 0000000..037e36c --- /dev/null +++ b/src/pages/flows/ReserveFlow.py @@ -0,0 +1,272 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import queue +from dataclasses import dataclass +from typing import Optional + +from selenium.common.exceptions import ( + ElementNotInteractableException, + NoSuchElementException, + TimeoutException, +) +from selenium.webdriver.remote.webdriver import WebDriver + +from base.MsgBase import MsgBase +from pages.MainShell import MainShell +from pages.flows._helpers import ( + timeStrToMins, + minsToTimeStr, + findBestTimeOption, +) +from pages.ReserveView import ReserveView +from pages._dialogs import ReserveResultDialog + + +@dataclass +class ReserveContext: + + username: str + date: str + floor: str + room: str + seat_id: str + begin_time: str + end_time: str + begin_max_diff: int = 30 + end_max_diff: int = 30 + begin_prefer_early: bool = True + end_prefer_early: bool = False + expect_duration: int = 4 + satisfy_duration: bool = True + + +class ReserveFlow(MsgBase): + + LIBRARY_CLOSE_MINS = timeStrToMins("23:30") + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + driver: WebDriver, + shell: MainShell, + ) -> None: + + super().__init__(input_queue, output_queue) + self._driver: WebDriver = driver + self._shell: MainShell = shell + self._ctx: Optional[ReserveContext] = None + + def execute( + self, + ctx: ReserveContext, + ) -> bool: + + self._ctx = ctx + submit_reserve = False + reserve_success = False + have_hover_on_page = False + + try: + view = self._shell.gotoReserveView() + except (NoSuchElementException, TimeoutException) as e: + self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR) + return False + except Exception as e: + self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR) + return False + + if not view.selectDate(ctx.date): + self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR) + return False + self._showTrace(f"日期 {ctx.date} 选择成功 !") + + if not view.selectPlace("1"): + self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR) + return False + self._showTrace("预约场所 图书馆 选择成功 !") + + if not view.selectFloor(ctx.floor): + display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor) + self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR) + return False + self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !") + + if not view.selectRoom(ctx.room): + display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room) + self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR) + return False + self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !") + have_hover_on_page = True + + seat_status = view.selectSeat(ctx.seat_id) + if seat_status is None: + self._showTrace( + f"座位 {ctx.seat_id} 在该楼层区域中不存在, 请检查座位号是否正确", + self.TraceLevel.WARNING, + ) + else: + self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'") + + select_time_ok = self._selectSeatTime(view) + if not select_time_ok: + self._showTrace("选择时间失败 !", self.TraceLevel.ERROR) + else: + try: + view.submitReserve() + submit_reserve = True + with ReserveResultDialog(self._driver) as result: + if result.isFailure(): + self._showTrace("预约失败", self.TraceLevel.ERROR) + elif result.isSuccess(): + details = result.getDetailTexts() + if len(details) >= 6: + self._showTrace( + f"\n" + f" 预约成功 !\n" + f" {details[1]}\n" + f" {details[2]}\n" + f" {details[3]}\n" + f" 签到时间 :{details[5]}" + ) + else: + self._showTrace( + "\n" + " 预约成功 !\n" + " 未找获取到详细信息" + ) + reserve_success = True + else: + self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR) + except (TimeoutException, ElementNotInteractableException): + self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) + except Exception: + self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) + + if not submit_reserve and have_hover_on_page: + view.refresh() + if reserve_success: + self._showTrace(f"用户 {ctx.username} 预约成功 !") + else: + self._showTrace(f"用户 {ctx.username} 预约失败 !", self.TraceLevel.ERROR) + return reserve_success + + def _selectSeatTime( + self, + view: ReserveView, + ) -> bool: + + ctx = self._ctx + exp_beg_tm_str = ctx.begin_time + exp_end_tm_str = ctx.end_time + exp_beg_mins = timeStrToMins(exp_beg_tm_str) + exp_end_mins = timeStrToMins(exp_end_tm_str) + act_beg_mins = exp_beg_mins + act_beg_tm_str = exp_beg_tm_str + act_end_mins = exp_end_mins + act_end_tm_str = exp_end_tm_str + + act_beg_mins = self._selectNearestTime( + view, + time_id="startTime", + time_type="开始时间", + target_time=exp_beg_mins, + max_time_diff=ctx.begin_max_diff, + prefer_earlier=ctx.begin_prefer_early, + ) + if act_beg_mins == -1: + return False + act_beg_tm_str = minsToTimeStr(act_beg_mins) + + if ctx.satisfy_duration: + exp_end_mins = self._calcEndTime(act_beg_mins, ctx.expect_duration) + exp_end_tm_str = minsToTimeStr(exp_end_mins) + self._showTrace( + f"需要满足期望预约持续时间: {ctx.expect_duration} 小时, " + f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}" + ) + + act_end_mins = self._selectNearestTime( + view, + time_id="end_time", + time_type="结束时间", + target_time=exp_end_mins, + max_time_diff=ctx.end_max_diff, + prefer_earlier=ctx.end_prefer_early, + ) + if act_end_mins == -1: + return False + act_end_tm_str = minsToTimeStr(act_end_mins) + + self._showTrace( + f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, " + f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}" + ) + return True + + def _selectNearestTime( + self, + view: ReserveView, + time_id: str, + time_type: str, + target_time: int, + max_time_diff: int, + prefer_earlier: bool, + ) -> int: + + all_time_opts = view.getAvailableTimeOptions(time_id) + if not all_time_opts: + self._showTrace( + f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR + ) + return -1 + + best_opt, best_text, actual_diff, free_times = findBestTimeOption( + all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True + ) + if best_opt is not None: + best_opt.click() + abs_diff = abs(actual_diff) + if actual_diff < 0: + relation = f"早了 {abs_diff} 分钟" + elif actual_diff > 0: + relation = f"晚了 {abs_diff} 分钟" + else: + relation = f"正好等于 {time_type}" + self._showTrace( + f"选择距离期望 {time_type} 最近的 {best_text}, " + f"与期望 {time_type} 相比 {relation}" + ) + return target_time + actual_diff + + target_time_str = minsToTimeStr(target_time) + self._showTrace( + f"无法选择最近的 {time_type} {target_time_str}, " + f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟", + self.TraceLevel.WARNING, + ) + self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}") + return -1 + + def _calcEndTime( + self, + begin_mins: int, + duration: int, + ) -> int: + + expect_end_mins = int(begin_mins + duration * 60) + if expect_end_mins > self.LIBRARY_CLOSE_MINS: + expect_end_mins = self.LIBRARY_CLOSE_MINS + self._showTrace( + f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, " + f"自动调整为 23:30", + self.TraceLevel.WARNING, + ) + return expect_end_mins diff --git a/src/pages/flows/__init__.py b/src/pages/flows/__init__.py new file mode 100644 index 0000000..c8f1d9f --- /dev/null +++ b/src/pages/flows/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from pages.flows.ReserveFlow import ReserveFlow +from pages.flows.CheckinFlow import CheckinFlow +from pages.flows.RenewFlow import RenewFlow + +__all__ = [ + "ReserveFlow", + "CheckinFlow", + "RenewFlow", +] diff --git a/src/pages/flows/_helpers.py b/src/pages/flows/_helpers.py new file mode 100644 index 0000000..1b2fcf5 --- /dev/null +++ b/src/pages/flows/_helpers.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from datetime import datetime + + +def timeStrToMins( + time_str: str, +) -> int: + + hour, minute = map(int, time_str.split(":")) + return hour * 60 + minute + + +def minsToTimeStr( + mins: int, +) -> str: + + hour, minute = divmod(int(mins), 60) + return f"{hour:02d}:{minute:02d}" + + +def findBestTimeOption( + time_options: list, + target_time: int, + max_time_diff: int, + prefer_earlier: bool, + is_reserve: bool = True, +) -> tuple: + """ + Find the best time option from available WebElement options. + + Returns: + (bestElement, bestText, actual_diff, freeTimesList) + or (None, None, None, freeTimesList) if no suitable option. + """ + + free_times = [] + best_time_diff = max_time_diff + best_actual_diff = None + best_time_opt = None + + for time_opt in time_options: + if is_reserve: + time_attr = time_opt.get_attribute("time") + if time_attr == "now": + now = datetime.now() + time_val = now.hour * 60 + now.minute + elif time_attr and time_attr.isdigit(): + time_val = int(time_attr) + else: + continue + else: + time_attr = time_opt.get_attribute("id") + if not (time_attr and time_attr.isdigit()): + continue + time_val = int(time_attr) + free_times.append( + time_opt.text.strip() + if not is_reserve + else minsToTimeStr(time_val) + ) + actual_diff = time_val - target_time + abs_diff = abs(actual_diff) + + if abs_diff < best_time_diff or ( + abs_diff == best_time_diff + and ( + (prefer_earlier and actual_diff <= 0) + or (not prefer_earlier and actual_diff >= 0) + ) + ): + best_time_diff = abs_diff + best_actual_diff = actual_diff + best_time_opt = time_opt + + if best_time_opt is not None: + return (best_time_opt, best_time_opt.text.strip(), best_actual_diff, free_times) + return (None, None, None, free_times) diff --git a/src/pages/services/CaptchaHandler.py b/src/pages/services/CaptchaHandler.py new file mode 100644 index 0000000..2aec267 --- /dev/null +++ b/src/pages/services/CaptchaHandler.py @@ -0,0 +1,101 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import base64 +import queue + +import ddddocr +from selenium.common.exceptions import ( + NoSuchElementException, + TimeoutException, +) + +from base.MsgBase import MsgBase +from pages.LoginPage import LoginPage + + +class CaptchaHandler(MsgBase): + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + login_page: LoginPage, + ) -> None: + + super().__init__(input_queue, output_queue) + self._login_page = login_page + self._ocr = ddddocr.DdddOcr() + + def solveCaptcha( + self, + auto_captcha: bool = True, + ) -> str: + + max_attempts = 3 + for _ in range(max_attempts): + if auto_captcha: + captcha_text = self._autoRecognize() + else: + self._showTrace("用户未配置自动识别验证码, 请手动输入验证码 !", 20, no_log=True) + captcha_text = self._manualRecognize() + if captcha_text: + return captcha_text + else: + if not self._login_page.refreshCaptcha(): + return "" + self._showTrace( + f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !", + self.TraceLevel.WARNING, + ) + return "" + + def _autoRecognize( + self, + ) -> str: + + try: + img_src = self._login_page.getCaptchaImageSrc() + base64_str = img_src.split(',', 1)[1] + captcha_img = base64.b64decode(base64_str) + captcha_text = self._ocr.classification(captcha_img) + captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower() + self._showTrace(f"识别到验证码为 : '{captcha_text}'", 20, no_log=True) + if len(captcha_text) != 4: + self._showLog("识别到的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING) + raise Exception("识别到的验证码长度不等于 4 个字符 !") + return captcha_text + except (NoSuchElementException, TimeoutException) as e: + self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR) + return "" + except (ValueError, OSError) as e: + self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR) + return "" + except Exception as e: + self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR) + return "" + + def _manualRecognize( + self, + ) -> str: + + try: + self._showMsg("请输入验证码:") + captcha_text = self._waitMsg(timeout=15) + self._showTrace(f"输入的验证码为 : '{captcha_text}'", 20, no_log=True) + if len(captcha_text) != 4: + self._showLog("输入的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING) + raise Exception("输入的验证码长度不等于 4 个字符 !") + return captcha_text + except ValueError as e: + self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR) + return "" + except Exception as e: + self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR) + return "" diff --git a/src/pages/services/RecordChecker.py b/src/pages/services/RecordChecker.py new file mode 100644 index 0000000..e00536a --- /dev/null +++ b/src/pages/services/RecordChecker.py @@ -0,0 +1,302 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import queue +import re +import time +from datetime import datetime, timedelta + +from selenium.common.exceptions import ( + NoSuchElementException, + StaleElementReferenceException, + TimeoutException, +) + +from base.MsgBase import MsgBase +from pages.MainShell import MainShell +from pages.RecordsView import RecordsView + + +class RecordChecker(MsgBase): + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + shell: MainShell, + ) -> None: + + super().__init__(input_queue, output_queue) + self._shell = shell + + @staticmethod + def _formatDiffTime( + seconds: float, + ) -> str: + + hours = int(seconds // 3600) + minutes = int(seconds % 3600 // 60) + seconds = int(seconds % 60) + return f"{hours} 时 {minutes} 分 {seconds} 秒" + + def canReserve( + self, + date: str, + ) -> bool: + + if self._getReserveRecord(date, "已预约") is None: + if self._getReserveRecord(date, "使用中") is None: + self._showTrace(f"用户在 {date} 可以预约") + return True + self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约") + return False + self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约") + return False + + def canCheckin( + self, + ) -> bool: + + date = time.strftime("%Y-%m-%d", time.localtime()) + record = self._getReserveRecord(date, "已预约") + if record is not None: + begin_time = record["time"]["begin"] + begin_time = datetime.strptime( + f"{date} {begin_time}", "%Y-%m-%d %H:%M" + ) + time_diff = datetime.now() - begin_time + time_diff_seconds = time_diff.total_seconds() + if time_diff_seconds < -30 * 60: + self._showTrace( + f"用户在 {date} 的预约开始时间为 {begin_time}, " + f"当前距离预约开始时间还有 " + f"{self._formatDiffTime(abs(time_diff_seconds))}, 无法签到" + ) + return False + elif -30 * 60 <= time_diff_seconds < 0: + self._showTrace( + f"用户在 {date} 的预约开始时间为 {begin_time}, " + f"当前距离预约开始时间还有 " + f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到" + ) + return True + elif 0 <= time_diff_seconds < 30 * 60 - 5: + self._showTrace( + f"用户在 {date} 的预约开始时间为 {begin_time}, " + f"当前距离预约开始时间已经过去 " + f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到" + ) + return True + self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到") + return False + + def canRenew( + self, + ) -> tuple[bool, dict]: + + date = time.strftime("%Y-%m-%d", time.localtime()) + record = self._getReserveRecord(date, "使用中") + if record is not None: + end_time = record["time"]["end"] + end_time = datetime.strptime( + f"{date} {end_time}", "%Y-%m-%d %H:%M" + ) + time_diff = end_time - datetime.now() + time_diff_seconds = time_diff.total_seconds() + trace_msg = ( + f"用户在 {date} 的预约结束时间为 {end_time}, " + f"当前距离预约结束时间还有 " + f"{self._formatDiffTime(abs(time_diff_seconds))}" + ) + if abs(time_diff_seconds) < 120 * 60: + self._showTrace(f"{trace_msg}, 可以续约") + return True, record + else: + self._showTrace(f"{trace_msg}, 无法续约") + return False, None + self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约") + return False, None + + def postRenewCheck( + self, + record: dict, + ) -> bool: + + date = record["date"] + act_record = self._getReserveRecord(date, "使用中") + if act_record is not None: + if ( + act_record["time"]["begin"] == record["time"]["begin"] + and act_record["time"]["end"] == record["time"]["end"] + ): + self._showTrace( + f"\n" + f" 续约成功 !\n" + f" 日 期 :{date}\n" + f" 时 间 :{act_record['time']['begin']}" + f" - {act_record['time']['end']}\n" + f" 位 置 :{act_record['info']['location']}\n" + f" 状 态 :{act_record['info']['status']}" + ) + return True + else: + self._showTrace( + f"\n" + f" 续约失败 !\n" + f" 续约后结束时间为 {act_record['time']['end']}," + f"与预期结束时间 {record['time']['end']} 不符 !" + ) + return False + self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果") + return False + + def _getReserveRecord( + self, + wanted_date: str, + wanted_status: str, + ) -> dict | None: + + if wanted_date is None: + self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING) + return None + self._showTrace( + f"正在检查用户在 {wanted_date} 是否有预约状态为 " + f"{wanted_status} 的预约记录......", 20, no_log=True + ) + + checked_count = 0 + max_check_times = 6 + + records_view = self._shell.gotoRecordsView() + for _ in range(max_check_times): + reservations = records_view.loadRecords() + if reservations is None: + return None + for reservation in reservations[checked_count:]: + record = self._decodeReserveRecord(reservation, records_view) + checked_count += 1 + if record is None: + continue + if record["date"] == "": + continue + if record["time"] == {"begin": "", "end": ""}: + continue + if ( + datetime.strptime(record["date"], "%Y-%m-%d").date() + > datetime.strptime(wanted_date, "%Y-%m-%d").date() + ): + continue + if ( + datetime.strptime(record["date"], "%Y-%m-%d").date() + < datetime.strptime(wanted_date, "%Y-%m-%d").date() + ): + return None + if record["info"]["status"] == wanted_status: + self._showTrace( + f"寻找到用户第 {checked_count} 条状态为 " + f"{wanted_status} 的预约记录, " + f"详细信息: {record['date']} " + f"{record['time']['begin']} - " + f"{record['time']['end']} " + f"{record['info']['location']}", + 20, no_log=True, + ) + return record + if not records_view.showMoreRecords(): + break + return None + + def _decodeReserveRecord( + self, + reservation, + records_view: RecordsView, + ) -> dict: + + try: + time_element = records_view.getRecordTimeElement(reservation) + info_elements = records_view.getRecordInfoElements(reservation) + except (NoSuchElementException, TimeoutException, StaleElementReferenceException): + return { + "date": "", + "time": {"begin": "", "end": ""}, + "info": {"location": "", "status": ""}, + } + except Exception: + return { + "date": "", + "time": {"begin": "", "end": ""}, + "info": {"location": "", "status": ""}, + } + time_data = self._decodeReserveTime(time_element) + info_data = self._decodeReserveInfo(info_elements) + return { + "date": time_data["date"], + "time": time_data["time"], + "info": info_data, + } + + def _decodeReserveTime( + self, + time_element, + ) -> dict: + + time_str = time_element.text.strip() + today = datetime.now().date() + if "明天" in time_str: + target_date = today + timedelta(days=1) + date = target_date.strftime("%Y-%m-%d") + elif "今天" in time_str: + target_date = today + date = target_date.strftime("%Y-%m-%d") + elif "昨天" in time_str: + target_date = today - timedelta(days=1) + date = target_date.strftime("%Y-%m-%d") + else: + date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str) + if date_match: + date = date_match.group(1) + else: + date = "" + time_match = re.search( + r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str + ) + if time_match: + begin_time = time_match.group(1) + end_time = time_match.group(2) + else: + begin_time = "" + end_time = "" + return { + "date": date, + "time": {"begin": begin_time, "end": end_time}, + } + + def _decodeReserveInfo( + self, + info_elements, + ) -> dict: + + location = "" + status = "" + for info in info_elements: + if "已预约" in info.text: + status = "已预约" + elif "使用中" in info.text: + status = "使用中" + elif "已完成" in info.text: + status = "已完成" + elif "已结束使用" in info.text: + status = "已结束使用" + elif "已取消" in info.text: + status = "已取消" + elif "失约" in info.text: + status = "失约" + elif "图书馆" in info.text: + location = info.text.strip() + return {"location": location, "status": status} diff --git a/src/pages/services/ReserveValidator.py b/src/pages/services/ReserveValidator.py new file mode 100644 index 0000000..5587839 --- /dev/null +++ b/src/pages/services/ReserveValidator.py @@ -0,0 +1,221 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import queue +import time + +from base.MsgBase import MsgBase +from pages.ReserveView import ReserveView +from pages.flows._helpers import timeStrToMins, minsToTimeStr + + +class ReserveValidator(MsgBase): + + def __init__( + self, + input_queue: queue.Queue, + output_queue: queue.Queue, + ) -> None: + + super().__init__(input_queue, output_queue) + + def validate( + self, + reserve_info: dict, + ) -> bool: + + if not self._containRequiredInfo(reserve_info): + return False + if not self._isValidDate(reserve_info): + return False + if not self._isValidBeginTime(reserve_info): + return False + if not self._isValidExpectDuration(reserve_info): + return False + if not self._isValidEndTime(reserve_info): + return False + if not self._finalCheck(reserve_info): + return False + self._showTrace( + f"预约信息检查完成, 准备预约 " + f"{reserve_info['date']} " + f"{reserve_info['begin_time']['time']} - " + f"{reserve_info['end_time']['time']} " + f"图书馆 " + f"{ReserveView.FLOOR_MAP[reserve_info['floor']]} " + f"{ReserveView.ROOM_MAP[reserve_info['room']]} " + f"的座位 {reserve_info['seat_id']}" + ) + return True + + def _containRequiredInfo( + self, + reserve_info: dict, + ) -> bool: + + floor_map = ReserveView.FLOOR_MAP + room_map = ReserveView.ROOM_MAP + try: + if reserve_info.get("floor") is None: + raise ValueError("未指定楼层") + if reserve_info["floor"] not in floor_map: + raise ValueError(f"该楼层 '{reserve_info['floor']}' 不存在") + if reserve_info.get("room") is None: + raise ValueError("未指定房间") + if reserve_info["room"] not in room_map: + raise ValueError(f"该房间 '{reserve_info['room']}' 不存在") + if reserve_info.get("seat_id") is None: + raise ValueError("未指定座位") + if reserve_info["seat_id"] == "": + raise ValueError("未指定座位号") + return True + except ValueError as e: + msg = ( + f"预约信息错误 ! : {e}, " + f"由于缺少必要的预约信息, 无法开始预约流程" + ) + self._showTrace(msg, self.TraceLevel.ERROR) + self._showTrace( + f"预约信息错误 ! : {e}, " + f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整", + 20, + no_log=True, + ) + return False + + def _isValidDate( + self, + reserve_info: dict, + ) -> bool: + + cur_date_str = time.strftime("%Y-%m-%d", time.localtime()) + cur_timestamp = time.mktime(time.strptime(cur_date_str, "%Y-%m-%d")) + if reserve_info.get("date") is None: + reserve_info["date"] = cur_date_str + self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date_str}") + else: + res_timestamp = time.mktime(time.strptime(reserve_info["date"], "%Y-%m-%d")) + if res_timestamp < cur_timestamp: + self._showTrace( + f"预约日期错误 ! :" + f"{reserve_info['date']} 早于当前日期 {cur_date_str}, 自动设置为当前日期", + self.TraceLevel.WARNING, + ) + reserve_info["date"] = cur_date_str + return True + + def _isValidBeginTime( + self, + reserve_info: dict, + ) -> bool: + + cur_time = time.strftime("%H:%M", time.localtime()) + if reserve_info.get("begin_time") is None: + reserve_info["begin_time"] = {} + if "time" not in reserve_info["begin_time"]: + reserve_info["begin_time"]["time"] = cur_time + self._showTrace(f"开始时间未指定, 自动设置为当前时间: {cur_time}") + if "max_diff" not in reserve_info["begin_time"]: + reserve_info["begin_time"]["max_diff"] = 30 + self._showTrace("开始时间最大时间差未指定, 自动设置为 30 分钟") + if "prefer_early" not in reserve_info["begin_time"]: + reserve_info["begin_time"]["prefer_early"] = True + self._showTrace("是否优先选择更早开始时间未指定, 自动设置为 True") + return True + + def _isValidExpectDuration( + self, + reserve_info: dict, + ) -> bool: + + if reserve_info.get("satisfy_duration") is None: + reserve_info["satisfy_duration"] = True + self._showTrace("预约满足时长要求未指定, 默认满足") + if reserve_info["satisfy_duration"]: + if reserve_info.get("expect_duration") is None: + reserve_info["expect_duration"] = 4 + self._showTrace("需要满足预约持续时间, 但未指定, 使用默认时长为 4 小时") + return True + + def _isValidEndTime( + self, + reserve_info: dict, + ) -> bool: + + if reserve_info.get("end_time") is None: + reserve_info["end_time"] = {} + if "time" not in reserve_info["end_time"]: + end_mins = timeStrToMins(reserve_info["begin_time"]["time"]) + end_mins = end_mins + int(reserve_info["expect_duration"] * 60) + reserve_info["end_time"] = { + "time": minsToTimeStr(end_mins), + "max_diff": 30, + "prefer_early": False, + } + self._showTrace( + f"结束时间未指定, 自动设置为开始时间加上期望时长: " + f"{reserve_info['end_time']['time']}" + ) + if "max_diff" not in reserve_info["end_time"]: + reserve_info["end_time"]["max_diff"] = 30 + self._showTrace("结束时间最大时间差未指定, 自动设置为 30 分钟") + if "prefer_early" not in reserve_info["end_time"]: + reserve_info["end_time"]["prefer_early"] = False + self._showTrace("是否优先选择较晚结束时间未指定, 自动设置为 True") + return True + + def _finalCheck( + self, + reserve_info: dict, + ) -> bool: + + begin_time = reserve_info["begin_time"] + end_time = reserve_info["end_time"] + begin_mins = timeStrToMins(begin_time["time"]) + end_mins = timeStrToMins(end_time["time"]) + + if end_mins < begin_mins and reserve_info["satisfy_duration"] is False: + self._showTrace( + f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, " + f"尝试交换时间", + self.TraceLevel.WARNING, + ) + reserve_info["end_time"], reserve_info["begin_time"] = begin_time, end_time + begin_time, end_time = end_time, begin_time + begin_mins = timeStrToMins(begin_time["time"]) + end_mins = timeStrToMins(end_time["time"]) + + max_end_mins = timeStrToMins("23:30") + if end_mins > max_end_mins: + self._showTrace( + f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30", + self.TraceLevel.WARNING, + ) + reserve_info["end_time"]["time"] = "23:30" + end_mins = max_end_mins + + if reserve_info["satisfy_duration"]: + if reserve_info["expect_duration"] > 8: + self._showTrace( + f"该用户设置了优先满足时长要求, 但是预约期望持续时间 " + f"{reserve_info['expect_duration']} 小时 " + f"超出最大时长 8 小时, 自动设置为 8 小时", + self.TraceLevel.WARNING, + ) + reserve_info["expect_duration"] = 8 + else: + if end_mins - begin_mins > 8 * 60: + self._showTrace( + f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 " + f"{float((end_mins - begin_mins) / 60)} 小时 " + f"超出最大时长 8 小时, 自动设置为 8 小时", + self.TraceLevel.WARNING, + ) + reserve_info["end_time"]["time"] = minsToTimeStr(begin_mins + 8 * 60) + return True diff --git a/src/pages/services/__init__.py b/src/pages/services/__init__.py new file mode 100644 index 0000000..8545fc0 --- /dev/null +++ b/src/pages/services/__init__.py @@ -0,0 +1,18 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from pages.services.CaptchaHandler import CaptchaHandler +from pages.services.ReserveValidator import ReserveValidator +from pages.services.RecordChecker import RecordChecker + +__all__ = [ + "CaptchaHandler", + "ReserveValidator", + "RecordChecker", +] diff --git a/src/test_pages_refactor.py b/src/test_pages_refactor.py new file mode 100644 index 0000000..bb6f28e --- /dev/null +++ b/src/test_pages_refactor.py @@ -0,0 +1,162 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. + +AutoLibrary 真实运行测试脚本。 +在 venv 中运行: + py -3 test_pages_refactor.py [--mode MODE] + +MODE 可选值 (默认 1): + 1 = 只预约 + 2 = 只签到 + 3 = 预约 + 签到 + 4 = 只续约 + 7 = 全部 (预约 + 签到 + 续约) +""" +import os +import sys +import argparse + +SRC = os.path.dirname(os.path.abspath(__file__)) +if SRC not in sys.path: + sys.path.insert(0, SRC) + + +def getAppConfigDir() -> str: + appData = os.environ.get("APPDATA", "") + if not appData: + appData = os.path.join(os.path.expanduser("~"), "AppData", "Roaming") + return os.path.join(appData, "AutoLibrary", "configs") + + +def main(): + parser = argparse.ArgumentParser(description="AutoLibrary 真实运行测试") + parser.add_argument( + "--mode", type=int, default=1, + help="运行模式 bitmask: 1=预约 2=签到 4=续约 (默认 1)" + ) + parser.add_argument( + "--group", type=int, default=0, + help="只运行第 N 个启用的任务组 (0=全部, 默认 0)" + ) + parser.add_argument( + "--headless", action="store_true", + help="使用 headless 模式运行浏览器" + ) + args = parser.parse_args() + + # ---- 1. 初始化 ConfigManager ---- + from managers.config.ConfigManager import instance as configInstance + from managers.config.ConfigUtils import ConfigUtils + from utils.JSONReader import JSONReader + + configDir = getAppConfigDir() + if not os.path.isdir(configDir): + print(f"[FAIL] 配置目录不存在: {configDir}") + print("请先启动一次 AutoLibrary GUI 以生成配置文件。") + return 1 + + try: + configInstance(configDir) + except ValueError: + pass + + configPaths = ConfigUtils.getAutomationConfigPaths() + runPath = configPaths.get("run") + userPath = configPaths.get("user") + + if not runPath or not os.path.isfile(runPath): + print(f"[FAIL] run.json 不存在: {runPath}") + return 1 + if not userPath or not os.path.isfile(userPath): + print(f"[FAIL] user.json 不存在: {userPath}") + return 1 + + print(f"[INFO] run : {runPath}") + print(f"[INFO] user : {userPath}") + + # ---- 2. 加载配置 ---- + runConfig = JSONReader(runPath).data() + userConfig = JSONReader(userPath).data() + + if args.mode is not None: + runConfig["mode"]["run_mode"] = args.mode + if args.headless: + runConfig["web_driver"]["headless"] = True + + groups = userConfig.get("groups", []) + if not groups: + print("[FAIL] user.json 中没有任务组") + return 1 + + print(f"[INFO] 运行模式: {runConfig['mode']['run_mode']}") + if args.headless: + print("[INFO] Headless 模式已启用") + + # ---- 3. 创建 AutoLib 并运行 ---- + from pages.AutoLibPages import AutoLibPages + import queue + import threading + + for gi, group in enumerate(groups): + if args.group > 0 and gi + 1 != args.group: + continue + if not group.get("enabled", True): + print(f"[SKIP] 任务组 {gi + 1} '{group.get('name', '未命名')}' 已禁用") + continue + + users = group.get("users", []) + enabledUsers = [u for u in users if u.get("enabled", True)] + if not enabledUsers: + print(f"[SKIP] 任务组 {gi + 1} 没有启用的用户") + continue + + print(f"\n{'=' * 60}") + print(f"任务组 {gi + 1}/{len(groups)}: '{group.get('name', '未命名')}'") + print(f"启用的用户: {len(enabledUsers)}/{len(users)}") + print(f"{'=' * 60}") + + outputQueue = queue.Queue() + stopConsumer = threading.Event() + traceLines = [] + + def consumeTrace(): + while not stopConsumer.is_set(): + try: + msg = outputQueue.get(timeout=0.3) + traceLines.append(msg) + print(msg) + except queue.Empty: + continue + + consumer = threading.Thread(target=consumeTrace, daemon=True) + consumer.start() + + try: + autoLib = AutoLibPages( + input_queue=queue.Queue(), + output_queue=outputQueue, + run_config=runConfig, + ) + autoLib.run({"users": enabledUsers}) + autoLib.close() + except Exception as e: + print(f"[FAIL] 运行异常: {e}") + import traceback + traceback.print_exc() + return 1 + finally: + stopConsumer.set() + consumer.join(timeout=2) + + print("\n[OK] 测试完成") + return 0 + + +if __name__ == "__main__": + sys.exit(main())