diff --git a/src/Main.py b/src/Main.py index 8716d23..c9e50ca 100644 --- a/src/Main.py +++ b/src/Main.py @@ -24,7 +24,7 @@ def main(): translator = QTranslator() if translator.load(":/res/translators/qtbase_zh_CN.ts"): app.installTranslator(translator) - app.setStyle('Fusion') + app.setStyle("Fusion") app.setApplicationName("AutoLibrary") if not initializeApp(): sys.exit(-1) diff --git a/src/gui/ALAutoScriptOrchDialog/_helpers.py b/src/gui/ALAutoScriptOrchDialog/_helpers.py index e287560..16a53c5 100644 --- a/src/gui/ALAutoScriptOrchDialog/_helpers.py +++ b/src/gui/ALAutoScriptOrchDialog/_helpers.py @@ -204,11 +204,11 @@ class _DateOffsetContainer(QWidget): val = self._spinBox.value() unit = self._unitCombo.currentData() if unit == "weeks": - return val * 7 + return val*7 if unit == "months": - return val * 30 + return val*30 if unit == "years": - return val * 365 + return val*365 return val diff --git a/src/gui/ALMainWorkers.py b/src/gui/ALMainWorkers.py index 66d5319..343acfa 100644 --- a/src/gui/ALMainWorkers.py +++ b/src/gui/ALMainWorkers.py @@ -12,7 +12,8 @@ import time import queue from PySide6.QtCore import ( - Slot, Signal, QThread + Signal, + QThread, ) from base.MsgBase import MsgBase @@ -30,7 +31,7 @@ class AutoLibWorker(MsgBase, QThread): self, input_queue: queue.Queue, output_queue: queue.Queue, - config_paths: dict + config_paths: dict, ): MsgBase.__init__(self, input_queue, output_queue) @@ -45,7 +46,7 @@ class AutoLibWorker(MsgBase, QThread): if current_time >= "23:30" or current_time <= "07:30": self._showTrace( "当前时间不在图书馆开放时间内, 请在 07:30 - 23:30 之间尝试", - self.TraceLevel.WARNING + self.TraceLevel.WARNING, ) return False self._showLog(f"时间检查通过, 当前时间: {current_time}", self.TraceLevel.INFO) @@ -60,83 +61,113 @@ class AutoLibWorker(MsgBase, QThread): ): self._showTrace( "配置文件路径不存在, 请检查配置文件路径是否正确", - self.TraceLevel.ERROR + self.TraceLevel.ERROR, ) return False - self._showLog(f"配置文件路径检查通过, 路径: {self.__config_paths}", self.TraceLevel.INFO) + self._showLog( + f"配置文件路径检查通过, 路径: {self.__config_paths}", + self.TraceLevel.INFO, + ) return True def loadConfigs( - self + self, ) -> bool: self._showTrace( f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}", - no_log=True + no_log=True, ) self._run_config = JSONReader(self.__config_paths["run"]).data() self._showTrace( f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}", - no_log=True + no_log=True, ) self._user_config = JSONReader(self.__config_paths["user"]).data() if self._run_config is None or self._user_config is None: self._showTrace( "配置文件加载失败, 请检查配置文件是否正确", - self.TraceLevel.ERROR + self.TraceLevel.ERROR, ) return False if not self._user_config.get("groups"): self._showTrace( "用户配置文件中无有效任务组, 请检查用户配置文件是否正确", - self.TraceLevel.WARNING + self.TraceLevel.WARNING, ) return False self._showLog( - f"配置文件加载成功, 任务组数量: {len(self._user_config.get('groups', []))}", - self.TraceLevel.INFO + f"配置文件加载成功, 任务组数量: {len(self._user_config.get("groups"))}", + self.TraceLevel.INFO, ) return True + def _runName( + self, + ) -> str: + + return "常规任务" + + def _beforeCreateAutoLib( + self, + ): + + return + + def _onChecksFailed( + self, + ) -> bool: + + return True + + def _onFinished( + self, + ): + + self.autoLibWorkerIsFinished.emit() + + def _onError( + self, + error_msg: str, + ): + + self._showTrace(error_msg, self.TraceLevel.ERROR) + self.autoLibWorkerFinishedWithError.emit() + def run( - self + self, ): auto_lib = None - self._showTrace("AutoLibrary 开始运行") - if not self.checkTimeAvailable()\ - or not self.checkConfigPaths(): - # time or config existence check failed, skip and finish - pass + self._showTrace(f"{self._runName()} 开始运行") + + if not self.checkTimeAvailable() or not self.checkConfigPaths(): + if not self._onChecksFailed(): + return else: try: if not self.loadConfigs(): raise Exception("配置文件加载失败") + self._beforeCreateAutoLib() auto_lib = AutoLib( self._input_queue, self._output_queue, - self._run_config + self._run_config, ) groups = self._user_config.get("groups") for group in groups: - if not group["enabled"]: - self._showTrace(f"任务组 {group["name"]} 已跳过", no_log=True) + if not group.get("enabled", False): + self._showTrace(f"任务组 {group.get("name", "未知")} 已跳过", no_log=True) continue - self._showTrace(f"正在运行任务组 {group["name"]}", no_log=True) - auto_lib.run( - { "users": group.get("users", []) } - ) + self._showTrace(f"正在运行任务组 {group.get("name", "未知")}", no_log=True) + auto_lib.run({"users": group.get("users", [])}) except Exception as e: - self._showTrace( - f"AutoLibrary 运行时发生异常 : {e}", - self.TraceLevel.ERROR - ) - self.autoLibWorkerFinishedWithError.emit() + self._onError(f"{self._runName()} 运行时发生异常 : {e}") return if auto_lib: auto_lib.close() - self._showTrace("AutoLibrary 运行结束") - self.autoLibWorkerIsFinished.emit() + self._showTrace(f"{self._runName()} 运行结束") + self._onFinished() class TimerTaskWorker(AutoLibWorker): @@ -148,70 +179,54 @@ class TimerTaskWorker(AutoLibWorker): timer_task: dict, input_queue: queue.Queue, output_queue: queue.Queue, - config_paths: dict + config_paths: dict, ): super().__init__(input_queue, output_queue, config_paths) self.__timer_task = timer_task - self.autoLibWorkerIsFinished.connect(self.onTimerTaskIsFinished) - self.autoLibWorkerFinishedWithError.connect(self.onTimerTaskFinishedWithError) + def _runName( + self, + ) -> str: - def run( - self + return f"定时任务 '{self.__timer_task.get("name", "未知")}'" + + def _beforeCreateAutoLib( + self, + ): + + self.applyRepeatAutoScript() + + def _onChecksFailed( + self, + ) -> bool: + + self._showTrace("定时任务跳过执行: 时间或配置文件检查未通过") + self.timerTaskWorkerIsFinished.emit(False, self.__timer_task) + return False + + def _onFinished( + self, ): - self._showTrace(f"定时任务 {self.__timer_task['name']} 开始运行") - if not self.checkTimeAvailable() or not self.checkConfigPaths(): - self._showTrace("定时任务跳过执行 (时间或配置文件检查未通过)") - self.timerTaskWorkerIsFinished.emit(False, self.__timer_task) - return - try: - if not self.loadConfigs(): - raise Exception("配置文件加载失败") - self.applyRepeatAutoScript() - auto_lib = AutoLib( - self._input_queue, - self._output_queue, - self._run_config - ) - groups = self._user_config.get("groups") - for group in groups: - if not group["enabled"]: - self._showTrace( - f"任务组 {group['name']} 已跳过", - no_log=True - ) - continue - self._showTrace( - f"正在运行任务组 {group['name']}", - no_log=True - ) - auto_lib.run( - {"users": group.get("users", [])} - ) - auto_lib.close() - except Exception as e: - self._showTrace( - f"定时任务 {self.__timer_task['name']} 运行时发生异常: {e}", - self.TraceLevel.ERROR - ) - self.timerTaskWorkerIsFinished.emit(True, self.__timer_task) - return - self._showTrace(f"定时任务 {self.__timer_task['name']} 运行结束") self.timerTaskWorkerIsFinished.emit(False, self.__timer_task) + def _onError( + self, + error_msg: str, + ): + + self._showTrace(error_msg, self.TraceLevel.ERROR) + self.timerTaskWorkerIsFinished.emit(True, self.__timer_task) + def applyRepeatAutoScript( - self + self, ): auto_script = self.__timer_task.get("repeat_auto_script", "") if not auto_script or not auto_script.strip(): return - self._showTrace( - f"检测到重复定时任务 AutoScript, 开始执行...", - no_log=True - ) + self._showTrace("检测到重复定时任务 AutoScript, 开始执行...", no_log=True) groups = self._user_config.get("groups", []) affected_count = 0 for group in groups: @@ -224,30 +239,10 @@ class TimerTaskWorker(AutoLibWorker): affected_count += 1 except ValueError as e: self._showTrace( - f"AutoScript 执行错误 (用户 {user['username']}): {e}", - self.TraceLevel.ERROR + f"AutoScript 执行错误 (用户 {user.get("username", "未知")}): {e}", + self.TraceLevel.ERROR, ) self._showLog( - f"AutoScript 执行完毕, " - f"影响 {affected_count} 个用户", - self.TraceLevel.INFO + f"AutoScript 执行完毕, 影响 {affected_count} 个用户", + self.TraceLevel.INFO, ) - - @Slot() - def onTimerTaskIsFinished( - self - ): - - self._showTrace(f"定时任务 {self.__timer_task['name']} 运行结束") - self.timerTaskWorkerIsFinished.emit(False, self.__timer_task) - - @Slot() - def onTimerTaskFinishedWithError( - self - ): - - self._showTrace( - f"定时任务 {self.__timer_task['name']} 运行时发生异常", - self.TraceLevel.ERROR - ) - self.timerTaskWorkerIsFinished.emit(True, self.__timer_task) diff --git a/src/gui/ALSeatMapView.py b/src/gui/ALSeatMapView.py index a215e2a..2fd1e2c 100644 --- a/src/gui/ALSeatMapView.py +++ b/src/gui/ALSeatMapView.py @@ -8,7 +8,9 @@ You may use, modify, and distribute this file under the terms of the MIT License See the LICENSE file for details. """ from PySide6.QtCore import ( - Qt, Slot, QEvent + Qt, + Slot, + QEvent ) from PySide6.QtWidgets import ( QFrame, diff --git a/src/gui/ALTimerTaskAddDialog.py b/src/gui/ALTimerTaskAddDialog.py index 2252909..10b4d65 100644 --- a/src/gui/ALTimerTaskAddDialog.py +++ b/src/gui/ALTimerTaskAddDialog.py @@ -12,9 +12,23 @@ import uuid from enum import Enum from datetime import datetime, timedelta -from PySide6.QtCore import Slot, QDateTime, QUrl +from PySide6.QtCore import ( + Slot, + QDateTime, + QUrl +) from PySide6.QtGui import QDesktopServices -from PySide6.QtWidgets import QLabel, QDialog, QWidget, QSpinBox, QHBoxLayout, QVBoxLayout, QGridLayout, QDateTimeEdit, QGroupBox, QPushButton +from PySide6.QtWidgets import ( + QLabel, + QDialog, + QWidget, + QSpinBox, + QHBoxLayout, + QVBoxLayout, + QDateTimeEdit, + QGroupBox, + QPushButton +) from gui.resources.ui.Ui_ALTimerTaskAddDialog import Ui_ALTimerTaskAddDialog from utils.TimerUtils import TimerUtils diff --git a/src/pages/AutoLib.py b/src/pages/AutoLib.py index bfac01a..fab83e4 100644 --- a/src/pages/AutoLib.py +++ b/src/pages/AutoLib.py @@ -68,7 +68,7 @@ class AutoLib(MsgBase): self._showTrace("正在初始化浏览器驱动......", no_log=True) web_driver_config: dict = self.__run_config.get("web_driver", None) - self.__driver_type = web_driver_config.get("driver_type") + self.__driver_type = web_driver_config.get("driver_type", "none") match self.__driver_type.lower(): case "edge": driver_options = webdriver.EdgeOptions() @@ -85,7 +85,7 @@ class AutoLib(MsgBase): if not web_driver_config: self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR) return False - if web_driver_config.get("headless"): + if web_driver_config.get("headless", False): driver_options.add_argument("--headless") driver_options.add_argument("--disable-gpu") driver_options.add_argument("--no-sandbox") @@ -122,12 +122,12 @@ class AutoLib(MsgBase): driver_options.add_argument(f"user-agent={user_agent}") # init browser driver - self.__driver_path = web_driver_config.get("driver_path") + self.__driver_path = web_driver_config.get("driver_path", "") if not self.__driver_path: self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING) return False - self.__driver_path = os.path.abspath(self.__driver_path) try: + self.__driver_path = os.path.abspath(self.__driver_path) service = None match self.__driver_type.lower(): case "edge": @@ -236,7 +236,6 @@ class AutoLib(MsgBase): # result : -1 - terminate, 0 - success, 1 - failed, 2 - passed result: int = 2 - # login auto_captcha: bool = login_config.get("auto_captcha", True) if not self.__login_page.login( @@ -255,7 +254,7 @@ class AutoLib(MsgBase): } # reserve if run_mode["auto_reserve"]: - if self.__record_checker.canReserve(self.__shell, reserve_info.get("date")): + if self.__record_checker.canReserve(self.__shell, reserve_info["date"]): if self.__reserve_checker.check(reserve_info): ctx = ReserveContext( username=username, @@ -331,30 +330,29 @@ class AutoLib(MsgBase): ) -> None: self.__user_config = user_config - user_counter: dict[str, int] = {"current": 0, "success": 0, "failed": 0, "passed": 0} - users: list = self.__user_config["users"] + users: list = self.__user_config.get("users", []) self._showTrace(f"共发现 {len(users)} 个用户") for user in users: user_counter["current"] += 1 self._showTrace( - f"正在处理第 {user_counter['current']}/{len(users)} 个用户: {user['username']}......", + f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user.get("username", "未知")}......", no_log=True, ) - if not user["enabled"]: - self._showTrace(f"用户 {user['username']} 已跳过") + if not user.get("enabled", False): + self._showTrace(f"用户 {user.get("username", "未知")} 已跳过") user_counter["passed"] += 1 continue r: int = self.__run( - username=user["username"], - password=user["password"], - login_config=self.__run_config["login"], - run_mode_config=self.__run_config["mode"], - reserve_info=user["reserve_info"], + username=user.get("username", ""), + password=user.get("password", ""), + login_config=self.__run_config.get("login", {}), + run_mode_config=self.__run_config.get("mode", {}), + reserve_info=user.get("reserve_info", {}), ) if r == -1: self._showTrace( - f"用户 {user['username']} 处理过程中页面发生异常, 无法继续操作, 任务已终止 !", + f"用户 {user.get("username", "未知")} 处理过程中页面发生异常, 无法继续操作, 任务已终止 !", self.TraceLevel.WARNING, ) break @@ -365,10 +363,10 @@ class AutoLib(MsgBase): elif r == 2: user_counter["passed"] += 1 self._showTrace( - f"处理完成, 共计 {user_counter['current']} 个用户, " - f"成功 {user_counter['success']} 个用户, " - f"失败 {user_counter['failed']} 个用户, " - f"跳过 {user_counter['passed']} 个用户" + f"处理完成, 共计 {user_counter["current"]} 个用户, " + f"成功 {user_counter["success"]} 个用户, " + f"失败 {user_counter["failed"]} 个用户, " + f"跳过 {user_counter["passed"]} 个用户" ) return diff --git a/src/pages/ReserveView.py b/src/pages/ReserveView.py index eda9b54..1f7e459 100644 --- a/src/pages/ReserveView.py +++ b/src/pages/ReserveView.py @@ -19,7 +19,6 @@ from selenium.common.exceptions import ( ) from pages.components.SeatMapDialog import SeatMapDialog -from pages.components.ReserveResultDialog import ReserveResultDialog class ReserveView: @@ -102,31 +101,30 @@ class ReserveView: def selectRoom( self, room: str, - ) -> bool: + ) -> SeatMapDialog | None: try: WebDriverWait(self._driver, 2).until( EC.element_to_be_clickable(self.FIND_ROOM_BTN) ).click() except (TimeoutException, ElementNotInteractableException): - return False + return None except Exception: - return False + return None try: WebDriverWait(self._driver, 2).until( EC.element_to_be_clickable((By.ID, self.ROOM_BTN_FMT.format(room=room))) ).click() - return True except (TimeoutException, ElementNotInteractableException): - return False + return None except Exception: - return False - - def openSeatMap( - self, - ) -> SeatMapDialog: - - return SeatMapDialog(self._driver) + return None + try: + return SeatMapDialog(self._driver) + except (TimeoutException): + return None + except Exception: + return None def submitReserve( self, @@ -142,12 +140,6 @@ class ReserveView: except Exception: return False - def waitResultDialog( - self, - ) -> ReserveResultDialog: - - return ReserveResultDialog(self._driver) - def refresh( self, ) -> None: diff --git a/src/pages/components/Dialog.py b/src/pages/components/Dialog.py index 95d1bfe..9f4e268 100644 --- a/src/pages/components/Dialog.py +++ b/src/pages/components/Dialog.py @@ -17,6 +17,9 @@ class Dialog: """ Context-managed overlay / modal / dialog on a page. + The constructor verifies that the root element is visible — if not, + the dialog is not on screen and a :exc:`TimeoutException` is raised. + Automates the lifecycle: wait for appearance on enter, optionally wait for disappearance on exit. """ @@ -34,13 +37,14 @@ class Dialog: self._auto_close: bool = auto_close_on_exit self._timeout: float = wait_timeout + WebDriverWait(self._driver, self._timeout).until( + EC.visibility_of_element_located(self._root_locator) + ) + def __enter__( self, ) -> "Dialog": - WebDriverWait(self._driver, self._timeout).until( - EC.visibility_of_element_located(self._root_locator) - ) return self def __exit__( diff --git a/src/pages/components/RenewDialog.py b/src/pages/components/RenewDialog.py index d663a1c..150eca1 100644 --- a/src/pages/components/RenewDialog.py +++ b/src/pages/components/RenewDialog.py @@ -17,6 +17,10 @@ from selenium.webdriver.remote.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement from pages.components.Dialog import Dialog +from pages.strategies.TimeSelectMaker import ( + TimeSelectionResult, + TimeSelectMaker, +) class RenewDialog(Dialog): @@ -80,6 +84,26 @@ class RenewDialog(Dialog): return self._findAll(*self.TIME_OPTS) + def selectBestTime( + self, + target_time: int, + max_time_diff: int, + prefer_earlier: bool, + ) -> TimeSelectionResult: + + all_time_opts = self.getTimeOptions() + if not all_time_opts: + return TimeSelectionResult() + result = TimeSelectMaker.forRenew().decide( + all_time_opts, + target_time, + max_time_diff, + prefer_earlier, + ) + if result.selected_index >= 0: + all_time_opts[result.selected_index].click() + return result + def getOkButton( self, ) -> WebElement: @@ -93,7 +117,8 @@ class RenewDialog(Dialog): try: self._find(*self.OK_BTN).click() return True - except (NoSuchElementException, TimeoutException, ElementNotInteractableException): + except (NoSuchElementException, TimeoutException, + ElementNotInteractableException): return False except Exception: return False diff --git a/src/pages/components/ReserveResultDialog.py b/src/pages/components/ReserveResultDialog.py index d1ac48e..8ddb858 100644 --- a/src/pages/components/ReserveResultDialog.py +++ b/src/pages/components/ReserveResultDialog.py @@ -31,12 +31,18 @@ class ReserveResultDialog(Dialog): super().__init__(driver, self.ROOT, auto_close_on_exit=False) + def _titleLocator( + self, + ) -> tuple: + + return (By.CSS_SELECTOR, ".layoutSeat dt") + def getTitle( self, ) -> str: try: - return self._find(*self._title_locator()).text + return self._find(*self._titleLocator()).text except (NoSuchElementException, StaleElementReferenceException): return "" except Exception: @@ -73,9 +79,3 @@ class ReserveResultDialog(Dialog): return [] except Exception: return [] - - def _title_locator( - self, - ) -> tuple: - - return (By.CSS_SELECTOR, ".layoutSeat dt") diff --git a/src/pages/components/TimeSelectDialog.py b/src/pages/components/TimeSelectDialog.py index 643769f..1a843a3 100644 --- a/src/pages/components/TimeSelectDialog.py +++ b/src/pages/components/TimeSelectDialog.py @@ -7,6 +7,11 @@ This software is provided "as is", without any warranty of any kind. You may use, modify, and distribute this file under the terms of the MIT License. See the LICENSE file for details. """ +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Callable, Optional + from selenium.common.exceptions import ( NoSuchElementException, TimeoutException, @@ -16,6 +21,16 @@ from selenium.webdriver.remote.webdriver import WebDriver from selenium.webdriver.remote.webelement import WebElement from pages.components.Dialog import Dialog +from pages.strategies.TimeSelectMaker import ( + TimeRangeResult, + TimeSelectionResult, + TimeSelectMaker, + minsToTimeStr, + timeStrToMins, +) + +if TYPE_CHECKING: + from pages.flows.ReserveFlow import ReserveContext class TimeSelectDialog(Dialog): @@ -31,9 +46,56 @@ class TimeSelectDialog(Dialog): def __init__( self, driver: WebDriver, + tracer: Optional[Callable[[str, int], None]] = None, ) -> None: super().__init__(driver, self.ROOT, auto_close_on_exit=False) + self._tracer = tracer + + def _trace( + self, + msg: str, + level: int = logging.INFO, + ) -> None: + + if self._tracer is not None: + self._tracer(msg, level) + + def _logTimeStep( + self, + time_type: str, + target_mins: int, + max_diff: int, + step_result: TimeSelectionResult, + ) -> bool: + + if step_result.selected_index >= 0: + abs_diff = abs(step_result.actual_diff) + if step_result.actual_diff < 0: + relation = f"早了 {abs_diff} 分钟" + elif step_result.actual_diff > 0: + relation = f"晚了 {abs_diff} 分钟" + else: + relation = f"正好等于 {time_type}" + self._trace( + f"选择距离期望 {time_type} 最近的 {step_result.display_text}, " + f"与期望 {time_type} 相比 {relation}" + ) + return True + if not step_result.free_times: + self._trace( + f"{time_type} 选择失败 ! : 当前未查询到可用时间", + logging.ERROR, + ) + else: + target_str = minsToTimeStr(target_mins) + self._trace( + f"无法选择最近的 {time_type} {target_str}, " + f"所有可选时间与目标时间相差都超过 {max_diff} 分钟", + logging.WARNING, + ) + self._trace(f"当前可供预约的 {time_type} 有: {step_result.free_times}") + return False def getTimeOptions( self, @@ -52,3 +114,119 @@ class TimeSelectDialog(Dialog): By.CSS_SELECTOR, f"#{time_id} ul li a", ) + + def selectNearestTime( + self, + time_id: str, + target_time: int, + max_time_diff: int, + prefer_earlier: bool, + ) -> TimeSelectionResult: + + all_time_opts = self.getTimeOptions(time_id) + if not all_time_opts: + return TimeSelectionResult() + result = TimeSelectMaker.forReserve().decide( + all_time_opts, + target_time, + max_time_diff, + prefer_earlier, + ) + if result.selected_index >= 0: + all_time_opts[result.selected_index].click() + return result + + def selectTimeRange( + self, + begin_target: int, + end_target: int, + begin_max_diff: int = 30, + end_max_diff: int = 30, + begin_prefer_early: bool = True, + end_prefer_early: bool = False, + satisfy_duration: bool = True, + expect_duration: int = 4, + library_close_mins: int = TimeSelectMaker.LIBRARY_CLOSE_MINS, + ) -> TimeRangeResult: + + begin_result = self.selectNearestTime( + "startTime", + begin_target, + begin_max_diff, + begin_prefer_early, + ) + if begin_result.selected_index < 0: + return TimeRangeResult(begin_result=begin_result) + actual_begin = begin_result.selected_value + if satisfy_duration: + end_target = TimeSelectMaker.calcEndTime( + actual_begin, + expect_duration, + library_close_mins, + ) + end_result = self.selectNearestTime( + "endTime", + end_target, + end_max_diff, + end_prefer_early, + ) + if end_result.selected_index < 0: + return TimeRangeResult( + begin_result=begin_result, + actual_begin_mins=actual_begin, + end_result=end_result, + expect_end_mins=end_target, + ) + return TimeRangeResult( + begin_result=begin_result, + end_result=end_result, + actual_begin_mins=actual_begin, + actual_end_mins=end_result.selected_value, + expect_end_mins=end_target, + ) + + def selectSeatTime( + self, + ctx: ReserveContext, + library_close_mins: int = TimeSelectMaker.LIBRARY_CLOSE_MINS, + ) -> bool: + + exp_beg_mins = timeStrToMins(ctx.begin_time) + exp_end_mins = timeStrToMins(ctx.end_time) + result = self.selectTimeRange( + begin_target=exp_beg_mins, + end_target=exp_end_mins, + begin_max_diff=ctx.begin_max_diff, + end_max_diff=ctx.end_max_diff, + begin_prefer_early=ctx.begin_prefer_early, + end_prefer_early=ctx.end_prefer_early, + satisfy_duration=ctx.satisfy_duration, + expect_duration=ctx.expect_duration, + library_close_mins=library_close_mins, + ) + if not self._logTimeStep("开始时间", exp_beg_mins, ctx.begin_max_diff, result.begin_result): + return False + if ctx.satisfy_duration: + unclipped = result.actual_begin_mins + ctx.expect_duration*60 + if unclipped > library_close_mins: + self._trace( + f"预约持续时间 {ctx.expect_duration} 小时, 超过最大预约时间 {minsToTimeStr(library_close_mins)}, " + f"自动调整为 {minsToTimeStr(library_close_mins)}", + logging.WARNING, + ) + act_beg_str = minsToTimeStr(result.actual_begin_mins) + exp_end_str = minsToTimeStr(result.expect_end_mins) + self._trace( + f"需要满足期望预约持续时间: {ctx.expect_duration} 小时, " + f"根据开始时间 {act_beg_str} 计算结束时间: {exp_end_str}" + ) + if not self._logTimeStep("结束时间", result.expect_end_mins, ctx.end_max_diff, result.end_result): + return False + act_beg_str = minsToTimeStr(result.actual_begin_mins) + act_end_str = minsToTimeStr(result.actual_end_mins) + exp_end_str = minsToTimeStr(result.expect_end_mins) + self._trace( + f"期望预约时间段: {ctx.begin_time} - {exp_end_str}, " + f"实际预约时间段: {act_beg_str} - {act_end_str}" + ) + return True diff --git a/src/pages/flows/RenewFlow.py b/src/pages/flows/RenewFlow.py index fb06111..b8f6421 100644 --- a/src/pages/flows/RenewFlow.py +++ b/src/pages/flows/RenewFlow.py @@ -20,7 +20,7 @@ from base.MsgBase import MsgBase from pages.MainShell import MainShell from pages.components.RenewDialog import RenewDialog from pages.flows._helpers import timeStrToMins, minsToTimeStr -from pages.strategies.timeSelectMaker import TimeSelectMaker +from pages.strategies.TimeSelectMaker import TimeSelectMaker class RenewFlow(MsgBase): @@ -46,10 +46,10 @@ class RenewFlow(MsgBase): renew_info: dict, ) -> bool: - max_diff = renew_info["max_diff"] - prefer_earlier = renew_info["prefer_early"] + max_diff = renew_info.get("max_diff", 30) + prefer_earlier = renew_info.get("prefer_early", True) end_time = record["time"]["end"] - target_renew_mins = timeStrToMins(end_time) + renew_info["expect_duration"] * 60 + target_renew_mins = timeStrToMins(end_time) + renew_info.get("expect_duration", 2) * 60 if not self._validateRenewTime(end_time, target_renew_mins): return False if not self._shell.waitExtendButton(): @@ -74,20 +74,12 @@ class RenewFlow(MsgBase): self._shell.refresh() self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR) return False - renew_ok_btn = dialog.getOkButton() - renew_time_opts = dialog.getTimeOptions() - if not renew_time_opts: - self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING) - self._shell.refresh() - return False - result = TimeSelectMaker.forRenew().decide( - renew_time_opts, + result = dialog.selectBestTime( target_renew_mins, max_diff, - prefer_earlier + prefer_earlier, ) if result.selected_index >= 0: - renew_time_opts[result.selected_index].click() abs_diff = abs(result.actual_diff) if result.actual_diff < 0: relation = f"早了 {abs_diff} 分钟" @@ -100,15 +92,18 @@ class RenewFlow(MsgBase): f"与期望续约时间相比 {relation}" ) record["time"]["end"] = result.display_text.strip() - renew_ok_btn.click() + dialog.clickOk() self._shell.refresh() return True - self._showTrace( - "无法选择最近的可用续约时间 ! " - f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !", - self.TraceLevel.WARNING, - ) - self._showTrace(f"当前可供续约的时间有: {result.free_times}") + if not result.free_times: + self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING) + else: + self._showTrace( + "无法选择最近的可用续约时间 ! " + f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !", + self.TraceLevel.WARNING, + ) + self._showTrace(f"当前可供续约的时间有: {result.free_times}") self._shell.refresh() return False except (NoSuchElementException, TimeoutException) as e: diff --git a/src/pages/flows/ReserveFlow.py b/src/pages/flows/ReserveFlow.py index e26eab8..1722903 100644 --- a/src/pages/flows/ReserveFlow.py +++ b/src/pages/flows/ReserveFlow.py @@ -9,7 +9,6 @@ See the LICENSE file for details. """ import queue from dataclasses import dataclass -from typing import Optional from selenium.common.exceptions import ( ElementNotInteractableException, @@ -20,8 +19,7 @@ from selenium.webdriver.remote.webdriver import WebDriver from base.MsgBase import MsgBase from pages.MainShell import MainShell -from pages.flows._helpers import timeStrToMins, minsToTimeStr -from pages.strategies.timeSelectMaker import TimeSelectMaker +from pages.strategies.TimeSelectMaker import TimeSelectMaker from pages.ReserveView import ReserveView from pages.components.ReserveResultDialog import ReserveResultDialog from pages.components.TimeSelectDialog import TimeSelectDialog @@ -60,14 +58,12 @@ class ReserveFlow(MsgBase): super().__init__(input_queue, output_queue) self._driver: WebDriver = driver self._shell: MainShell = shell - self._ctx: Optional[ReserveContext] = None def execute( self, ctx: ReserveContext, ) -> bool: - self._ctx = ctx submit_reserve = False reserve_success = False have_hover_on_page = False @@ -93,13 +89,13 @@ class ReserveFlow(MsgBase): self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR) return False self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !") - if not view.selectRoom(ctx.room): + seat_map = view.selectRoom(ctx.room) + if seat_map is None: display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room) self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR) return False self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !") have_hover_on_page = True - seat_map = view.openSeatMap() seat_status = seat_map.selectSeat(ctx.seat_id) if seat_status is None: self._showTrace( @@ -108,41 +104,44 @@ class ReserveFlow(MsgBase): ) else: self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'") - time_dialog = TimeSelectDialog(self._driver) - select_time_ok = self._selectSeatTime(time_dialog) - if not select_time_ok: - self._showTrace("选择时间失败 !", self.TraceLevel.ERROR) + try: + time_dialog = TimeSelectDialog(self._driver, tracer=self._showTrace) + except TimeoutException: + self._showTrace("时间选择面板未出现 !", self.TraceLevel.ERROR) else: - try: - view.submitReserve() - submit_reserve = True - with ReserveResultDialog(self._driver) as result: - if result.isFailure(): - self._showTrace("预约失败", self.TraceLevel.ERROR) - elif result.isSuccess(): - details = result.getDetailTexts() - if len(details) >= 6: - self._showTrace( - f"\n" - f" 预约成功 !\n" - f" {details[1]}\n" - f" {details[2]}\n" - f" {details[3]}\n" - f" 签到时间 :{details[5]}" - ) + if not time_dialog.selectSeatTime(ctx): + self._showTrace("选择时间失败 !", self.TraceLevel.ERROR) + else: + try: + view.submitReserve() + submit_reserve = True + with ReserveResultDialog(self._driver) as result: + if result.isFailure(): + self._showTrace("预约失败", self.TraceLevel.ERROR) + elif result.isSuccess(): + details = result.getDetailTexts() + if len(details) >= 6: + self._showTrace( + f"\n" + f" 预约成功 !\n" + f" {details[1]}\n" + f" {details[2]}\n" + f" {details[3]}\n" + f" 签到时间 :{details[5]}" + ) + else: + self._showTrace( + "\n" + " 预约成功 !\n" + " 未找获取到详细信息" + ) + reserve_success = True else: - self._showTrace( - "\n" - " 预约成功 !\n" - " 未找获取到详细信息" - ) - reserve_success = True - else: - self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR) - except (TimeoutException, ElementNotInteractableException): - self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) - except Exception: - self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) + self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR) + except (TimeoutException, ElementNotInteractableException): + self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) + except Exception: + self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) if not submit_reserve and have_hover_on_page: view.refresh() if reserve_success: @@ -150,113 +149,3 @@ class ReserveFlow(MsgBase): else: self._showTrace(f"用户 {ctx.username} 预约失败 !", self.TraceLevel.ERROR) return reserve_success - - def _selectSeatTime( - self, - time_dialog: TimeSelectDialog, - ) -> bool: - - ctx = self._ctx - exp_beg_tm_str = ctx.begin_time - exp_end_tm_str = ctx.end_time - exp_beg_mins = timeStrToMins(exp_beg_tm_str) - exp_end_mins = timeStrToMins(exp_end_tm_str) - act_beg_mins = exp_beg_mins - act_beg_tm_str = exp_beg_tm_str - act_end_mins = exp_end_mins - act_end_tm_str = exp_end_tm_str - act_beg_mins = self._selectNearestTime( - time_dialog, - time_id="startTime", - time_type="开始时间", - target_time=exp_beg_mins, - max_time_diff=ctx.begin_max_diff, - prefer_earlier=ctx.begin_prefer_early, - ) - if act_beg_mins == -1: - return False - act_beg_tm_str = minsToTimeStr(act_beg_mins) - if ctx.satisfy_duration: - exp_end_mins = self._calcEndTime(act_beg_mins, ctx.expect_duration) - exp_end_tm_str = minsToTimeStr(exp_end_mins) - self._showTrace( - f"需要满足期望预约持续时间: {ctx.expect_duration} 小时, " - f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}" - ) - act_end_mins = self._selectNearestTime( - time_dialog, - time_id="endTime", - time_type="结束时间", - target_time=exp_end_mins, - max_time_diff=ctx.end_max_diff, - prefer_earlier=ctx.end_prefer_early, - ) - if act_end_mins == -1: - return False - act_end_tm_str = minsToTimeStr(act_end_mins) - self._showTrace( - f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, " - f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}" - ) - return True - - def _selectNearestTime( - self, - time_dialog: TimeSelectDialog, - time_id: str, - time_type: str, - target_time: int, - max_time_diff: int, - prefer_earlier: bool, - ) -> int: - - all_time_opts = time_dialog.getTimeOptions(time_id) - if not all_time_opts: - self._showTrace( - f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR - ) - return -1 - result = TimeSelectMaker.forReserve().decide( - all_time_opts, - target_time, - max_time_diff, - prefer_earlier - ) - if result.selected_index >= 0: - all_time_opts[result.selected_index].click() - abs_diff = abs(result.actual_diff) - if result.actual_diff < 0: - relation = f"早了 {abs_diff} 分钟" - elif result.actual_diff > 0: - relation = f"晚了 {abs_diff} 分钟" - else: - relation = f"正好等于 {time_type}" - self._showTrace( - f"选择距离期望 {time_type} 最近的 {result.display_text}, " - f"与期望 {time_type} 相比 {relation}" - ) - return target_time + result.actual_diff - target_time_str = minsToTimeStr(target_time) - self._showTrace( - f"无法选择最近的 {time_type} {target_time_str}, " - f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟", - self.TraceLevel.WARNING, - ) - self._showTrace(f"当前可供预约的 {time_type} 有: {result.free_times}") - return -1 - - def _calcEndTime( - self, - begin_mins: int, - duration: int, - ) -> int: - - expect_end_mins = int(begin_mins + duration*60) - if expect_end_mins > self.LIBRARY_CLOSE_MINS: - expect_end_mins = self.LIBRARY_CLOSE_MINS - self._showTrace( - f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, " - f"自动调整为 23:30", - self.TraceLevel.WARNING, - ) - return expect_end_mins diff --git a/src/pages/flows/_helpers.py b/src/pages/flows/_helpers.py index 936ccc5..886d17c 100644 --- a/src/pages/flows/_helpers.py +++ b/src/pages/flows/_helpers.py @@ -7,16 +7,13 @@ This software is provided "as is", without any warranty of any kind. You may use, modify, and distribute this file under the terms of the MIT License. See the LICENSE file for details. """ -def timeStrToMins( - time_str: str, -) -> int: +from pages.strategies.TimeSelectMaker import ( + minsToTimeStr, + timeStrToMins +) - hour, minute = map(int, time_str.split(":")) - return hour * 60 + minute -def minsToTimeStr( - mins: int, -) -> str: - - hour, minute = divmod(int(mins), 60) - return f"{hour:02d}:{minute:02d}" +__all__ = [ + "minsToTimeStr", + "timeStrToMins", +] \ No newline at end of file diff --git a/src/pages/services/RecordChecker.py b/src/pages/services/RecordChecker.py index 106ad33..c7cef7f 100644 --- a/src/pages/services/RecordChecker.py +++ b/src/pages/services/RecordChecker.py @@ -38,97 +38,11 @@ class RecordChecker(MsgBase): seconds: float, ) -> str: - hours = int(seconds // 3600) - minutes = int(seconds % 3600 // 60) - seconds = int(seconds % 60) + hours = int(seconds//3600) + minutes = int(seconds%3600//60) + seconds = int(seconds%60) return f"{hours} 时 {minutes} 分 {seconds} 秒" - def _getReserveRecord( - self, - shell: MainShell, - wanted_date: str, - wanted_status: str, - ) -> dict | None: - - if wanted_date is None: - self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING) - return None - self._showTrace( - f"正在检查用户在 {wanted_date} 是否有预约状态为 " - f"{wanted_status} 的预约记录......", 20, no_log=True - ) - - checked_count = 0 - max_check_times = 6 - - records_view = shell.gotoRecordsView() - for _ in range(max_check_times): - reservations = records_view.loadRecords() - if reservations is None: - return None - for reservation in reservations[checked_count:]: - record = self._decodeReserveRecord(reservation, records_view) - checked_count += 1 - if record is None: - continue - if record["date"] == "": - continue - if record["time"] == {"begin": "", "end": ""}: - continue - if ( - datetime.strptime(record["date"], "%Y-%m-%d").date() - > datetime.strptime(wanted_date, "%Y-%m-%d").date() - ): - continue - if ( - datetime.strptime(record["date"], "%Y-%m-%d").date() - < datetime.strptime(wanted_date, "%Y-%m-%d").date() - ): - return None - if record["info"]["status"] == wanted_status: - self._showTrace( - f"寻找到用户第 {checked_count} 条状态为 " - f"{wanted_status} 的预约记录, " - f"详细信息: {record['date']} " - f"{record['time']['begin']} - " - f"{record['time']['end']} " - f"{record['info']['location']}", - 20, no_log=True, - ) - return record - if not records_view.showMoreRecords(): - break - return None - - def _decodeReserveRecord( - self, - reservation, - records_view: RecordsView, - ) -> dict: - - try: - time_element = records_view.getRecordTimeElement(reservation) - info_elements = records_view.getRecordInfoElements(reservation) - except (NoSuchElementException, TimeoutException, StaleElementReferenceException): - return { - "date": "", - "time": {"begin": "", "end": ""}, - "info": {"location": "", "status": ""}, - } - except Exception: - return { - "date": "", - "time": {"begin": "", "end": ""}, - "info": {"location": "", "status": ""}, - } - time_data = self._decodeReserveTime(time_element) - info_data = self._decodeReserveInfo(info_elements) - return { - "date": time_data["date"], - "time": time_data["time"], - "info": info_data, - } - def _decodeReserveTime( self, time_element, @@ -189,6 +103,92 @@ class RecordChecker(MsgBase): location = info.text.strip() return {"location": location, "status": status} + def _decodeReserveRecord( + self, + reservation, + records_view: RecordsView, + ) -> dict: + + try: + time_element = records_view.getRecordTimeElement(reservation) + info_elements = records_view.getRecordInfoElements(reservation) + except (NoSuchElementException, TimeoutException, StaleElementReferenceException): + return { + "date": "", + "time": {"begin": "", "end": ""}, + "info": {"location": "", "status": ""}, + } + except Exception: + return { + "date": "", + "time": {"begin": "", "end": ""}, + "info": {"location": "", "status": ""}, + } + time_data = self._decodeReserveTime(time_element) + info_data = self._decodeReserveInfo(info_elements) + return { + "date": time_data["date"], + "time": time_data["time"], + "info": info_data, + } + + def _getReserveRecord( + self, + shell: MainShell, + wanted_date: str, + wanted_status: str, + ) -> dict | None: + + if wanted_date is None: + self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING) + return None + self._showTrace( + f"正在检查用户在 {wanted_date} 是否有预约状态为 " + f"{wanted_status} 的预约记录......", 20, no_log=True + ) + + checked_count = 0 + max_check_times = 6 + + records_view = shell.gotoRecordsView() + for _ in range(max_check_times): + reservations = records_view.loadRecords() + if reservations is None: + return None + for reservation in reservations[checked_count:]: + record = self._decodeReserveRecord(reservation, records_view) + checked_count += 1 + if record is None: + continue + if record["date"] == "": + continue + if record["time"] == {"begin": "", "end": ""}: + continue + if ( + datetime.strptime(record["date"], "%Y-%m-%d").date() + > datetime.strptime(wanted_date, "%Y-%m-%d").date() + ): + continue + if ( + datetime.strptime(record["date"], "%Y-%m-%d").date() + < datetime.strptime(wanted_date, "%Y-%m-%d").date() + ): + return None + if record["info"]["status"] == wanted_status: + self._showTrace( + f"寻找到用户第 {checked_count} 条状态为 " + f"{wanted_status} 的预约记录, " + f"详细信息: {record["date"]} " + f"{record["time"]["begin"]} - " + f"{record["time"]["end"]} " + f"{record["info"]["location"]}", + 20, no_log=True, + ) + return record + if not records_view.showMoreRecords(): + break + return None + def canReserve( self, shell: MainShell, @@ -232,7 +232,7 @@ class RecordChecker(MsgBase): f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到" ) return True - elif 0 <= time_diff_seconds < 30 * 60 - 5: + elif 0 <= time_diff_seconds < 30*60 - 5: self._showTrace( f"用户在 {date} 的预约开始时间为 {begin_time}, " f"当前距离预约开始时间已经过去 " @@ -287,18 +287,18 @@ class RecordChecker(MsgBase): f"\n" f" 续约成功 !\n" f" 日 期 :{date}\n" - f" 时 间 :{act_record['time']['begin']}" - f" - {act_record['time']['end']}\n" - f" 位 置 :{act_record['info']['location']}\n" - f" 状 态 :{act_record['info']['status']}" + f" 时 间 :{act_record["time"]["begin"]}" + f" - {act_record["time"]["end"]}\n" + f" 位 置 :{act_record["info"]["location"]}\n" + f" 状 态 :{act_record["info"]["status"]}" ) return True else: self._showTrace( f"\n" f" 续约失败 !\n" - f" 续约后结束时间为 {act_record['time']['end']}," - f"与预期结束时间 {record['time']['end']} 不符 !" + f" 续约后结束时间为 {act_record["time"]["end"]}," + f"与预期结束时间 {record["time"]["end"]} 不符 !" ) return False self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果") diff --git a/src/pages/services/ReserveChecker.py b/src/pages/services/ReserveChecker.py index f38f449..d5785e0 100644 --- a/src/pages/services/ReserveChecker.py +++ b/src/pages/services/ReserveChecker.py @@ -36,11 +36,11 @@ class ReserveChecker(MsgBase): if reserve_info.get("floor") is None: raise ValueError("未指定楼层") if reserve_info["floor"] not in floor_map: - raise ValueError(f"该楼层 '{reserve_info['floor']}' 不存在") + raise ValueError(f"该楼层 '{reserve_info["floor"]}' 不存在") if reserve_info.get("room") is None: raise ValueError("未指定房间") if reserve_info["room"] not in room_map: - raise ValueError(f"该房间 '{reserve_info['room']}' 不存在") + raise ValueError(f"该房间 '{reserve_info["room"]}' 不存在") if reserve_info.get("seat_id") is None: raise ValueError("未指定座位") if reserve_info["seat_id"] == "": @@ -75,7 +75,7 @@ class ReserveChecker(MsgBase): if res_timestamp < cur_timestamp: self._showTrace( f"预约日期错误 ! :" - f"{reserve_info['date']} 早于当前日期 {cur_date_str}, 自动设置为当前日期", + f"{reserve_info["date"]} 早于当前日期 {cur_date_str}, 自动设置为当前日期", self.TraceLevel.WARNING, ) reserve_info["date"] = cur_date_str @@ -131,7 +131,7 @@ class ReserveChecker(MsgBase): } self._showTrace( f"结束时间未指定, 自动设置为开始时间加上期望时长: " - f"{reserve_info['end_time']['time']}" + f"{reserve_info["end_time"]["time"]}" ) if "max_diff" not in reserve_info["end_time"]: reserve_info["end_time"]["max_diff"] = 30 @@ -152,7 +152,7 @@ class ReserveChecker(MsgBase): end_mins = timeStrToMins(end_time["time"]) if end_mins < begin_mins and reserve_info["satisfy_duration"] is False: self._showTrace( - f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, " + f"结束时间 {end_time["time"]} 早于开始时间 {begin_time["time"]}, " f"尝试交换时间", self.TraceLevel.WARNING, ) @@ -163,7 +163,7 @@ class ReserveChecker(MsgBase): max_end_mins = timeStrToMins("23:30") if end_mins > max_end_mins: self._showTrace( - f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30", + f"结束时间 {end_time["time"]} 晚于 23:30, 自动设置为 23:30", self.TraceLevel.WARNING, ) reserve_info["end_time"]["time"] = "23:30" @@ -172,20 +172,20 @@ class ReserveChecker(MsgBase): if reserve_info["expect_duration"] > 8: self._showTrace( f"该用户设置了优先满足时长要求, 但是预约期望持续时间 " - f"{reserve_info['expect_duration']} 小时 " + f"{reserve_info["expect_duration"]} 小时 " f"超出最大时长 8 小时, 自动设置为 8 小时", self.TraceLevel.WARNING, ) reserve_info["expect_duration"] = 8 else: - if end_mins - begin_mins > 8 * 60: + if end_mins - begin_mins > 8*60: self._showTrace( f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 " f"{float((end_mins - begin_mins) / 60)} 小时 " f"超出最大时长 8 小时, 自动设置为 8 小时", self.TraceLevel.WARNING, ) - reserve_info["end_time"]["time"] = minsToTimeStr(begin_mins + 8 * 60) + reserve_info["end_time"]["time"] = minsToTimeStr(begin_mins + 8*60) return True def check( @@ -207,12 +207,12 @@ class ReserveChecker(MsgBase): return False self._showTrace( f"预约信息检查完成, 准备预约 " - f"{reserve_info['date']} " - f"{reserve_info['begin_time']['time']} - " - f"{reserve_info['end_time']['time']} " + f"{reserve_info["date"]} " + f"{reserve_info["begin_time"]["time"]} - " + f"{reserve_info["end_time"]["time"]} " f"图书馆 " - f"{ReserveView.FLOOR_MAP[reserve_info['floor']]} " - f"{ReserveView.ROOM_MAP[reserve_info['room']]} " - f"的座位 {reserve_info['seat_id']}" + f"{ReserveView.FLOOR_MAP[reserve_info["floor"]]} " + f"{ReserveView.ROOM_MAP[reserve_info["room"]]} " + f"的座位 {reserve_info["seat_id"]}" ) return True \ No newline at end of file diff --git a/src/pages/strategies/__init__.py b/src/pages/strategies/__init__.py index 6599a5f..23597e4 100644 --- a/src/pages/strategies/__init__.py +++ b/src/pages/strategies/__init__.py @@ -7,7 +7,7 @@ This software is provided "as is", without any warranty of any kind. You may use, modify, and distribute this file under the terms of the MIT License. See the LICENSE file for details. """ -from pages.strategies.timeSelectMaker import ( +from pages.strategies.TimeSelectMaker import ( TimeSelectMaker, TimeDecisionMaker, TimeOptionReader, @@ -15,6 +15,7 @@ from pages.strategies.timeSelectMaker import ( RenewTimeReader, TimeOption, TimeSelectionResult, + TimeRangeResult, ) __all__ = [ @@ -25,4 +26,5 @@ __all__ = [ "RenewTimeReader", "TimeOption", "TimeSelectionResult", + "TimeRangeResult", ] diff --git a/src/pages/strategies/timeSelectMaker.py b/src/pages/strategies/timeSelectMaker.py index ada47cf..92428b6 100644 --- a/src/pages/strategies/timeSelectMaker.py +++ b/src/pages/strategies/timeSelectMaker.py @@ -11,8 +11,20 @@ from abc import ABC, abstractmethod from dataclasses import dataclass, field from datetime import datetime -from pages.flows._helpers import minsToTimeStr +def timeStrToMins( + time_str: str, +) -> int: + + hour, minute = map(int, time_str.split(":")) + return hour*60 + minute + +def minsToTimeStr( + mins: int, +) -> str: + + hour, minute = divmod(int(mins), 60) + return f"{hour:02d}:{minute:02d}" @dataclass class TimeOption: @@ -31,18 +43,28 @@ class TimeSelectionResult: free_times: list[str] = field(default_factory=list) +@dataclass +class TimeRangeResult: + + begin_result: TimeSelectionResult = field(default_factory=TimeSelectionResult) + end_result: TimeSelectionResult = field(default_factory=TimeSelectionResult) + actual_begin_mins: int = -1 + actual_end_mins: int = -1 + expect_end_mins: int = 0 + + class TimeOptionReader(ABC): @abstractmethod def readOptions( self, - elements: list, + elements: list ) -> list[TimeOption]: ... def formatFreeTime( self, - opt: TimeOption, + opt: TimeOption ) -> str: return opt.element_text @@ -56,7 +78,7 @@ class ReserveTimeReader(TimeOptionReader): def readOptions( self, - elements: list, + elements: list ) -> list[TimeOption]: options: list[TimeOption] = [] @@ -74,7 +96,7 @@ class ReserveTimeReader(TimeOptionReader): def formatFreeTime( self, - opt: TimeOption, + opt: TimeOption ) -> str: return minsToTimeStr(opt.value) @@ -87,7 +109,7 @@ class RenewTimeReader(TimeOptionReader): def readOptions( self, - elements: list, + elements: list ) -> list[TimeOption]: options: list[TimeOption] = [] @@ -103,7 +125,7 @@ class TimeDecisionMaker: def __init__( self, - reader: TimeOptionReader, + reader: TimeOptionReader ) -> None: self._reader = reader @@ -113,7 +135,7 @@ class TimeDecisionMaker: elements: list, target_time: int, max_time_diff: int, - prefer_earlier: bool, + prefer_earlier: bool ) -> TimeSelectionResult: options = self._reader.readOptions(elements) @@ -148,9 +170,30 @@ class TimeDecisionMaker: class TimeSelectMaker: - LIBRARY_CLOSE_MINS = 1410 + LIBRARY_CLOSE_MINS = 1350 # 22:30 MAX_DURATION_HOURS = 8 + @staticmethod + def calcEndTime( + begin_mins: int, + duration: int, + library_close_mins: int = LIBRARY_CLOSE_MINS + ) -> int: + + expect_end_mins = int(begin_mins + duration*60) + if expect_end_mins > library_close_mins: + return library_close_mins + return expect_end_mins + + @staticmethod + def calcRemainingDuration( + end_time_str: str, + target_mins: int, + library_close_mins: int = LIBRARY_CLOSE_MINS + ) -> int: + + return library_close_mins - timeStrToMins(end_time_str) + @staticmethod def forReserve( ) -> TimeDecisionMaker: