1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-19 16:03:02 +08:00

Compare commits

...

12 Commits

Author SHA1 Message Date
KenanZhu 02463f087e feat(MsgBase, gui, operators): 增强日志输出功能
- 为 _showTrace 方法添加 no_log 参数,支持控制日志写入
- 在主窗口各关键操作点添加日志输出
- 优化错误信息输出策略,分离 trace 和 log 输出
- 改进配置目录初始化过程的日志记录
2026-03-18 12:46:37 +08:00
KenanZhu e481824344 refactor(AppInitializer): 初始化应用程序时,先初始化日志管理器,再初始化配置管理器 2026-03-18 11:03:44 +08:00
KenanZhu 160d6a2428 refactor(operators): 为 _showTrace 方法添加合适的 TraceLevel 参数 2026-03-18 11:02:52 +08:00
KenanZhu ec683cf154 feat(LogManager): 新增日志持久化功能 2026-03-18 10:21:53 +08:00
KenanZhu 2d0782c368 refactor(AppInitializer): 将初始化逻辑提取到 AppInitializer 模块中
- 本次提交将 Main.py 中的 ConfigManager, LogManager 等初始化逻辑提取到 AppInitializer 模块中
- 更改默认的配置文件路径从 config 目录变为 configs 目录,并考虑兼容性问题
2026-03-18 10:17:09 +08:00
KenanZhu 824b9b8869 fix(ALMainWindow): 修复 ALMainWindow 的配置路径同步问题
- 先前的实现并未考虑到配置窗口更改时的同步问题,本次提交在
  每次配置窗口更改并关闭保存时,同步更新 ALMainWindow 中的配置路径
2026-03-18 10:14:27 +08:00
KenanZhu c26f19b6b3 feat(LogManager): 新增日志持久化功能
- 新增 LogManager 单例类,支持日志文件按日期滚动
- 创建 CallerInfoFormatter 自定义格式化器,提取真实调用位置
- 为 MsgBase._showTrace 方法添加日志级别参数,集成日志系统
- 新增 initializeLogManager 初始化函数,日志存储于 AppDataLocation/logs/
- 日志输出格式对齐:[时间] - [类名(15)|级别(8)] - [文件:行号(20:4)] - 消息
- 控制台/INFO级别,全量日志 / DEBUG 级别,错误日志 / ERROR级别
- 全量日志保留7天,错误日志保留14天
2026-03-17 21:37:24 +08:00
KenanZhu 1d99ca92f2 fix(LibReserve): 修复日期比较逻辑错误并优化时间处理代码
- 修复使用字符串直接比较日期导致的逻辑错误,改用时间戳比较
- 优化时间验证逻辑,支持 satisfy_duration 模式下的开始晚于结束时间时的交换时间处理
- 添加必要的注释说明 place 参数检查的跳过原因和边界情况处理
- 重构变量命名,提高代码可读性(cur_date -> cur_date_str)
- 修正字符串引号风格,统一使用单引号
2026-03-17 20:46:00 +08:00
KenanZhu 50ebeb0fab style(LibReserve): 修复 __selectSeatTime 参数的拼写错误
- expct_duration -> expect_duration
2026-03-17 20:43:00 +08:00
KenanZhu faa26b489a fix(LibReserve): 修复冗余的链式赋值 2026-03-17 20:42:42 +08:00
KenanZhu c03eed1d51 fix(LibReserve): 修复错误使用的海象运算符条件判断 2026-03-17 20:42:31 +08:00
KenanZhu 2f5680c547 fix(LibTimeSelector) style(LibReserve): 修复时间转换方法 _timeToMins 并重命名为 _timeStrToMins
- 之前的实现未严格限制传入参数为整形,导致在转换时间字符串时可能出现类型错误。
- 重命名为 _timeStrToMins 以明确表示该方法仅用于时间字符串转换。并更新相关调用。

- 重命名 __selectSeatTime 中的冗长局部变量,便于理解和维护。
- 删除多余的时间格式转换嗲用
2026-03-17 20:00:57 +08:00
15 changed files with 544 additions and 172 deletions
+4 -13
View File
@@ -7,26 +7,17 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import os
import sys
from PySide6.QtCore import QTranslator, QStandardPaths, QDir
from PySide6.QtCore import QTranslator
from PySide6.QtWidgets import QApplication
from gui.ALMainWindow import ALMainWindow
from gui.resources import ALResource
from utils.ConfigManager import instance
from utils.AppInitializer import initializeApp
def initializeConfigManager():
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
config_dir = os.path.join(app_dir, "config")
if not QDir(config_dir).exists():
QDir().mkpath(config_dir)
instance(config_dir)
def main():
app = QApplication(sys.argv)
@@ -35,12 +26,12 @@ def main():
app.installTranslator(translator)
app.setStyle('Fusion')
app.setApplicationName("AutoLibrary")
initializeConfigManager()
if not initializeApp():
sys.exit(-1)
window = ALMainWindow()
window.show()
sys.exit(app.exec_())
if __name__ == "__main__":
main()
+16 -10
View File
@@ -9,6 +9,8 @@ See the LICENSE file for details.
"""
import queue
from datetime import datetime
from base.LibOperator import LibOperator
@@ -29,25 +31,33 @@ class LibTimeSelector(LibOperator):
super().__init__(input_queue, output_queue)
@staticmethod
def _timeToMins(
def _timeStrToMins(
time_str: str
) -> int:
"""
Convert time string "HH:MM" to minutes since midnight.
Example:
"10:00" -> 600
"13:30" -> 810
"""
hour, minute = map(int, time_str.split(":"))
return hour*60 + minute
@staticmethod
def _minsToTime(
def _minsToTimeStr(
mins: int
) -> str:
"""
Convert minutes since midnight to time string "HH:MM".
Example:
600 -> "10:00"
810 -> "13:30"
"""
hour, minute = divmod(mins, 60)
hour, minute = divmod(int(mins), 60)
return f"{hour:02d}:{minute:02d}"
@@ -99,11 +109,11 @@ class LibTimeSelector(LibOperator):
for time_opt in time_options:
# Parse time value based on context
if is_reserve:
# Reservation context: parse 'time' attribute
time_attr = time_opt.get_attribute("time")
if time_attr == "now":
from datetime import datetime
now = datetime.now()
time_val = now.hour * 60 + now.minute
time_val = now.hour*60 + now.minute
elif time_attr and time_attr.isdigit():
time_val = int(time_attr)
else:
@@ -114,9 +124,7 @@ class LibTimeSelector(LibOperator):
if not (time_attr and time_attr.isdigit()):
continue
time_val = int(time_attr)
free_times.append(time_opt.text.strip() if not is_reserve else self._minsToTime(time_val))
free_times.append(time_opt.text.strip() if not is_reserve else self._minsToTimeStr(time_val))
actual_diff = time_val - target_time
abs_diff = abs(actual_diff)
@@ -125,11 +133,9 @@ class LibTimeSelector(LibOperator):
(abs_diff == best_time_diff and
((prefer_earlier and actual_diff <= 0) or
(not prefer_earlier and actual_diff >= 0)))):
best_time_diff = abs_diff
best_actual_diff = actual_diff
best_time_opt = time_opt
if best_time_opt is not None:
return (best_time_opt, best_time_opt.text.strip(), best_actual_diff, free_times)
return (None, None, None, free_times)
+34 -1
View File
@@ -7,9 +7,12 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import logging
import queue
import datetime
from utils.LogManager import getLogger
class MsgBase:
"""
@@ -29,6 +32,18 @@ class MsgBase:
implement queue polling to retrieve and process messages.
"""
class TraceLevel:
"""
Enum class for trace levels.
This class provides the trace levels for the logger.
"""
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
def __init__(
self,
input_queue: queue.Queue,
@@ -38,6 +53,10 @@ class MsgBase:
self._class_name = self.__class__.__name__
self._input_queue = input_queue
self._output_queue = output_queue
try:
self._logger = getLogger(self._class_name)
except RuntimeError:
self._logger = None
def _showMsg(
@@ -50,11 +69,25 @@ class MsgBase:
def _showTrace(
self,
msg: str
msg: str,
level: int = logging.INFO,
no_log: bool = False
):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
self._output_queue.put(f"{timestamp}-[{self._class_name:<15}] : {msg}")
if self._logger and not no_log:
self._logger.log(level, msg)
def _showLog(
self,
msg: str,
level: int = logging.INFO
):
if self._logger:
self._logger.log(level, msg)
def _waitMsg(
-1
View File
@@ -8,7 +8,6 @@ You may use, modify, and distribute this file under the terms of the MIT License
See the LICENSE file for details.
"""
import os
import sys
from PySide6.QtCore import (
Qt, Signal, Slot, QTime, QDate, QDir, QFileInfo
+11 -3
View File
@@ -59,6 +59,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.connectSignals()
self.startMsgPolling()
self.startTimerTaskPolling()
self._showLog("主窗口初始化完成")
def modifyUi(
@@ -113,7 +114,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
):
if not QSystemTrayIcon.isSystemTrayAvailable():
self._showTrace("操作系统不支持系统托盘功能, 无法创建系统托盘图标")
self._showTrace("操作系统不支持系统托盘功能, 无法创建系统托盘图标", self.TraceLevel.WARNING)
return
self.TrayIcon = QSystemTrayIcon(self.icon, self)
self.TrayIcon.setToolTip("AutoLibrary")
@@ -186,6 +187,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
if self.__alConfigWidget:
self.__alConfigWidget.close()
# the config widget is already deleted in the 'self.onConfigWidgetClosed'
self._showLog("主窗口关闭")
QMainWindow.closeEvent(self, event)
@@ -298,7 +300,9 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alConfigWidget.configWidgetIsClosed.disconnect(self.onConfigWidgetClosed)
self.__alConfigWidget.deleteLater()
self.__alConfigWidget = None
self.__config_paths = ConfigManager.getValidateAutomationConfigPaths()
self.setControlButtons(True, None, None)
self._showLog("配置窗口已关闭,配置文件路径已更新")
@Slot(dict)
def onTimerTaskIsReady(
@@ -346,6 +350,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alTimerTaskManageWidget.raise_()
self.__alTimerTaskManageWidget.activateWindow()
self.TimerTaskManageWidgetButton.setEnabled(False)
self._showLog("打开定时任务管理窗口")
@Slot()
def onConfigButtonClicked(
@@ -359,6 +364,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alConfigWidget.raise_()
self.__alConfigWidget.activateWindow()
self.ConfigButton.setEnabled(False)
self._showLog("打开配置窗口")
@Slot()
def onStartButtonClicked(
@@ -375,6 +381,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__auto_lib_thread.autoLibWorkerIsFinished.connect(self.onStopButtonClicked)
self.__auto_lib_thread.autoLibWorkerFinishedWithError.connect(self.onStopButtonClicked)
self.__auto_lib_thread.start()
self._showLog("开始手动执行任务")
@Slot()
def onStopButtonClicked(
@@ -382,14 +389,15 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
):
if self.__auto_lib_thread:
self._showTrace("正在停止操作......")
self._showTrace("正在停止操作......", no_log=True)
self.__auto_lib_thread.wait(2000)
self._showTrace("操作已停止")
self._showTrace("操作已停止", no_log=True)
self.__auto_lib_thread.autoLibWorkerIsFinished.disconnect(self.onStopButtonClicked)
self.__auto_lib_thread.autoLibWorkerFinishedWithError.disconnect(self.onStopButtonClicked)
self.__auto_lib_thread.deleteLater()
self.__auto_lib_thread = None
self.setControlButtons(None, False, True)
self._showLog("任务已停止")
@Slot()
def onSendButtonClicked(
+29 -11
View File
@@ -44,9 +44,11 @@ class AutoLibWorker(MsgBase, QThread):
current_time = time.strftime("%H:%M", time.localtime())
if current_time >= "23:30" or current_time <= "07:30":
self._showTrace(
"当前时间不在图书馆开放时间内, 请在 07:30 - 23:30 之间尝试"
"当前时间不在图书馆开放时间内, 请在 07:30 - 23:30 之间尝试",
self.TraceLevel.WARNING
)
return False
self._showLog(f"时间检查通过, 当前时间: {current_time}", self.TraceLevel.INFO)
return True
@@ -57,8 +59,12 @@ class AutoLibWorker(MsgBase, QThread):
if not all(
os.path.exists(path) for path in self.__config_paths.values()
):
self._showTrace("配置文件路径不存在, 请检查配置文件路径是否正确")
self._showTrace(
"配置文件路径不存在, 请检查配置文件路径是否正确",
self.TraceLevel.ERROR
)
return False
self._showLog(f"配置文件路径检查通过, 路径: {self.__config_paths}", self.TraceLevel.INFO)
return True
@@ -67,22 +73,28 @@ class AutoLibWorker(MsgBase, QThread):
) -> bool:
self._showTrace(
f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}"
f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}",
no_log=True
)
self.__run_config = JSONReader(self.__config_paths["run"]).data()
self._showTrace(
f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}"
f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}",
no_log=True
)
self.__user_config = JSONReader(self.__config_paths["user"]).data()
if self.__run_config is None or self.__user_config is None:
self._showTrace("配置文件加载失败, 请检查配置文件是否正确")
self._showTrace("配置文件加载失败, 请检查配置文件是否正确")
self._showTrace(
"配置文件加载失败, 请检查配置文件是否正确",
self.TraceLevel.ERROR
)
return False
if not self.__user_config.get("groups"):
self._showTrace(
"用户配置文件中无有效任务组, 请检查用户配置文件是否正确"
"用户配置文件中无有效任务组, 请检查用户配置文件是否正确",
self.TraceLevel.WARNING
)
return False
self._showLog(f"配置文件加载成功, 任务组数量: {len(self.__user_config.get('groups', []))}", self.TraceLevel.INFO)
return True
@@ -108,14 +120,17 @@ class AutoLibWorker(MsgBase, QThread):
groups = self.__user_config.get("groups")
for group in groups:
if not group["enabled"]:
self._showTrace(f"任务组 {group["name"]} 已跳过")
self._showTrace(f"任务组 {group["name"]} 已跳过", no_log=True)
continue
self._showTrace(f"正在运行任务组 {group["name"]}")
self._showTrace(f"正在运行任务组 {group["name"]}", no_log=True)
auto_lib.run(
{ "users": group.get("users", []) }
)
except Exception as e:
self._showTrace(f"AutoLibrary 运行时发生异常 : {e}")
self._showTrace(
f"AutoLibrary 运行时发生异常 : {e}",
self.TraceLevel.ERROR
)
self.autoLibWorkerFinishedWithError.emit()
return
if auto_lib:
@@ -154,7 +169,10 @@ class TimerTaskWorker(AutoLibWorker):
self
):
self._showTrace(f"定时任务 {self.__timer_task['name']} 运行时发生异常")
self._showTrace(
f"定时任务 {self.__timer_task['name']} 运行时发生异常",
self.TraceLevel.ERROR
)
self.timerTaskWorkerIsFinished.emit(True, self.__timer_task)
@Slot()
+40 -17
View File
@@ -54,7 +54,7 @@ class AutoLib(MsgBase):
self
) -> bool:
self._showTrace("正在初始化浏览器驱动......")
self._showTrace("正在初始化浏览器驱动......", no_log=True)
web_driver_config = self.__run_config.get("web_driver", None)
self.__driver_type = web_driver_config.get("driver_type")
@@ -66,11 +66,14 @@ class AutoLib(MsgBase):
case "firefox":
driver_options = webdriver.FirefoxOptions()
case _:
self._showTrace(f"不支持的浏览器驱动类型: {self.__driver_type} !")
self._showTrace(
f"不支持的浏览器驱动类型: {self.__driver_type} !",
self.TraceLevel.WARNING
)
return False
if not web_driver_config:
self._showTrace("未配置浏览器驱动参数 !")
self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
return False
if web_driver_config.get("headless"):
driver_options.add_argument("--headless")
@@ -110,7 +113,7 @@ class AutoLib(MsgBase):
# init browser driver
self.__driver_path = web_driver_config.get("driver_path")
if not self.__driver_path:
self._showTrace("未配置浏览器驱动路径 !")
self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
return False
self.__driver_path = os.path.abspath(self.__driver_path)
try:
@@ -123,18 +126,18 @@ class AutoLib(MsgBase):
service = ChromeService(executable_path=self.__driver_path)
self.__driver = webdriver.Chrome(service=service, options=driver_options)
case "firefox":
self._showTrace(f"Firefox 浏览器驱动初始化略慢, 请耐心等待...")
self._showTrace(f"Firefox 浏览器驱动初始化略慢, 请耐心等待...", no_log=True)
service = FirefoxService(executable_path=self.__driver_path)
self.__driver = webdriver.Firefox(service=service, options=driver_options)
case _: # actually will not happen, beacuse we have checked it at the initlization
# of 'driver_options'
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type}")
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type} !")
self.__driver.implicitly_wait(1)
self.__driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except Exception as e:
self._showTrace(f"浏览器驱动初始化失败: {e}")
self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR)
return False
self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}")
return True
@@ -145,7 +148,7 @@ class AutoLib(MsgBase):
):
if not self.__driver:
self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !")
self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !", self.TraceLevel.WARNING)
return
self.__lib_checker = LibChecker(self._input_queue, self._output_queue, self.__driver)
self.__lib_login = LibLogin(self._input_queue, self._output_queue, self.__driver)
@@ -178,7 +181,7 @@ class AutoLib(MsgBase):
)
return True
except:
self._showTrace(f"登录页面加载失败 !")
self._showTrace(f"登录页面加载失败 !", self.TraceLevel.ERROR)
return False
@@ -188,7 +191,7 @@ class AutoLib(MsgBase):
lib_config = self.__run_config.get("library", None)
if not lib_config:
self._showTrace("未配置图书馆参数 !")
self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
return False
url = lib_config.get("host_url") + lib_config.get("login_url")
self.__driver.set_page_load_timeout(5)
@@ -196,7 +199,9 @@ class AutoLib(MsgBase):
self.__driver.get(url)
except TimeoutException:
self.__driver.execute_script("window.stop();")
self._showTrace(f"图书馆登录页面加载超时 ! 请检查网络环境是否正常")
self._showTrace(
f"图书馆登录页面加载超时 ! 请检查网络环境是否正常", self.TraceLevel.ERROR
)
return False
if not self.__waitResponseLoad():
return False
@@ -240,8 +245,10 @@ class AutoLib(MsgBase):
else:
self._showTrace(f"用户 {username} 无法预约,已跳过")
result = 2
# checkin
if run_mode["auto_checkin"] and result != 1:
last_result = result
if run_mode["auto_checkin"] and last_result != 1:
if self.__lib_checker.canCheckin():
if self.__lib_checkin.checkin(username):
result = 0
@@ -250,12 +257,20 @@ class AutoLib(MsgBase):
else:
self._showTrace(f"用户 {username} 无法签到,已跳过")
result = 2
if last_result == 0: # partly success
result = 0
# renewal
if run_mode["auto_renewal"] and result != 1:
last_result = result
if run_mode["auto_renewal"] and last_result != 1:
can_renew, record = self.__lib_checker.canRenew()
if can_renew:
if self.__lib_renew.renew(username, record, reserve_info):
if self.__lib_checker.postRenewCheck(record):
self._showTrace(f"用户 {username} 续约成功 !")
result = 0
else:
if result != 1: # partly success
result = 0
else:
result = 1
@@ -264,6 +279,9 @@ class AutoLib(MsgBase):
else:
self._showTrace(f"用户 {username} 无法续约,已跳过")
result = 2
if last_result == 0: # partly success
result = 0
# logout
if not self.__lib_logout.logout(
username
@@ -288,7 +306,8 @@ class AutoLib(MsgBase):
for user in users:
user_counter["current"] += 1
self._showTrace(
f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user["username"]}......"
f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user["username"]}......",
no_log=True
)
if not user["enabled"]:
self._showTrace(f"用户 {user["username"]} 已跳过")
@@ -303,7 +322,8 @@ class AutoLib(MsgBase):
)
if r == -1:
self._showTrace(
f"用户 {user["username"]} 处理过程中页面发生异常,无法继续操作, 任务已终止 !"
f"用户 {user["username"]} 处理过程中页面发生异常,无法继续操作, 任务已终止 !",
self.TraceLevel.WARNING
)
break
elif r == 0:
@@ -326,11 +346,14 @@ class AutoLib(MsgBase):
if self.__driver:
if self.__driver_type.lower() == "firefox":
self._showTrace(f"Firefox 浏览器驱动关闭略慢, 请耐心等待...")
self._showTrace(
f"Firefox 浏览器驱动关闭略慢, 请耐心等待...",
no_log=True
)
self.__driver.quit()
self.__driver = None
self._showTrace(f"浏览器驱动已关闭")
return True
else:
self._showTrace(f"浏览器驱动未初始化, 无需关闭")
self._showTrace(f"浏览器驱动未初始化, 无需关闭", no_log=True)
return False
+8 -7
View File
@@ -63,7 +63,7 @@ class LibChecker(LibOperator):
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
)
except:
self._showTrace("加载预约记录页面失败 !")
self._showTrace("加载预约记录页面失败 !", self.TraceLevel.ERROR)
return False
return True
@@ -174,7 +174,7 @@ class LibChecker(LibOperator):
)
return reservations
except:
self._showTrace("加载预约记录失败 !")
self._showTrace("加载预约记录失败 !", self.TraceLevel.ERROR)
return None
@@ -197,10 +197,10 @@ class LibChecker(LibOperator):
self.__driver.execute_script("arguments[0].click();", more_btn)
return True
else:
self._showTrace("用户无法加载更多预约记录")
self._showTrace("用户无法加载更多预约记录", self.TraceLevel.WARNING)
return False
except:
self._showTrace("加载更多预约记录失败 !")
self._showTrace("加载更多预约记录失败 !", self.TraceLevel.ERROR)
return False
@@ -211,9 +211,9 @@ class LibChecker(LibOperator):
) -> dict:
if wanted_date is None:
self._showTrace("日期未指定, 无法检查当前预约状态")
self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING)
return None
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......")
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......", no_log=True)
checked_count = 0
max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked
@@ -245,7 +245,8 @@ class LibChecker(LibOperator):
self._showTrace(
f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, "
f"详细信息: {record["date"]} "
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}"
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}",
no_log=True
)
return record
if not self.__showMoreReserveRecords():
+9 -9
View File
@@ -51,7 +51,7 @@ class LibCheckin(LibOperator):
)
ok_btn = self.__driver.find_element(By.CLASS_NAME, "btnOK")
except:
self._showTrace("签到时发生未知错误 !")
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False
result_message = result_message_element.text
if "签到成功" in result_message:
@@ -107,9 +107,9 @@ class LibCheckin(LibOperator):
result = self.__driver.execute_script(script)
time.sleep(0.1)
if result:
self._showTrace("签到按钮已启用")
self._showTrace("签到按钮已启用", no_log=True)
else:
self._showTrace("签到按钮启用失败")
self._showTrace("签到按钮启用失败", self.TraceLevel.WARNING)
return result
@@ -119,24 +119,24 @@ class LibCheckin(LibOperator):
) -> bool:
if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !")
self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False
try:
checkin_btn = WebDriverWait(self.__driver, 2).until(
EC.element_to_be_clickable((By.ID, "btnCheckIn"))
)
except:
self._showTrace(f"用户 {username} 签到界面加载失败 !")
self._showTrace(f"用户 {username} 签到界面加载失败 !", self.TraceLevel.ERROR)
return False
if "disabled" in checkin_btn.get_attribute("class"):
self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......")
self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......", no_log=True)
if not self.__enableCheckinBtn():
self._showTrace(f"签到按钮启用失败 !")
self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
return False
checkin_btn.click()
if self._waitResponseLoad():
self._showTrace(f"用户 {username} 签到成功 !")
self._showTrace(f"用户 {username} 签到成功 !", no_log=True)
return True
else:
self._showTrace(f"用户 {username} 签到失败 !")
self._showTrace(f"用户 {username} 签到失败 !", self.TraceLevel.ERROR)
return False
+23 -15
View File
@@ -52,7 +52,10 @@ class LibLogin(LibOperator):
)
return True
except:
self._showTrace(f"登录页面加载失败 ! : 用户账号或者密码错误/验证码错误, 具体以页面提示为准")
self._showTrace(
f"登录页面加载失败 ! : 用户账号或者密码错误/验证码错误, 具体以页面提示为准",
self.TraceLevel.ERROR
)
return False
@@ -71,7 +74,7 @@ class LibLogin(LibOperator):
password_element.clear()
password_element.send_keys(password)
except Exception as e:
self._showTrace(f"用户名或密码填写失败 ! : {e}")
self._showTrace(f"用户名或密码填写失败 ! : {e}", self.TraceLevel.ERROR)
return False
return True
@@ -88,12 +91,13 @@ class LibLogin(LibOperator):
captcha_img = base64.b64decode(base64_str)
captcha_text = self.__ddddocr.classification(captcha_img)
captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower()
self._showTrace(f"识别到验证码为 : '{captcha_text}'")
self._showTrace(f"识别到验证码为 : '{captcha_text}'", no_log=True)
if len(captcha_text) != 4:
self._showLog("识别到的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("识别到的验证码长度不等于 4 个字符 !")
return captcha_text
except Exception as e:
self._showTrace(f"验证码识别失败 ! : {e}")
self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR)
return ""
@@ -105,12 +109,13 @@ class LibLogin(LibOperator):
try:
self._showMsg("请输入验证码:")
captcha_text = self._waitMsg(timeout=15)
self._showTrace(f"输入的验证码为 : '{captcha_text}'")
self._showTrace(f"输入的验证码为 : '{captcha_text}'", no_log=True)
if len(captcha_text) != 4:
self._showLog("输入的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("输入的验证码长度不等于 4 个字符 !")
return captcha_text
except Exception as e:
self._showTrace(f"输入验证码失败 ! : {e}")
self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR)
return ""
@@ -120,13 +125,13 @@ class LibLogin(LibOperator):
# refresh captcha
try:
self._showTrace("刷新验证码......")
self._showTrace("刷新验证码......", no_log=True)
self.__driver.find_element(
By.ID, "loadImgId"
).click()
return True
except Exception as e:
self._showTrace(f"刷新验证码失败 ! : {e}")
self._showTrace(f"刷新验证码失败 ! : {e}", self.TraceLevel.ERROR)
return False
@@ -140,14 +145,17 @@ class LibLogin(LibOperator):
if auto_captcha:
captcha_text = self.__autoRecognizeCaptcha()
else:
self._showTrace(f"用户未配置自动识别验证码, 请手动输入验证码 !")
self._showTrace(f"用户未配置自动识别验证码, 请手动输入验证码 !", no_log=True)
captcha_text = self.__manualRecognizeCaptcha()
if captcha_text:
return captcha_text
else:
if not self.__refreshCaptcha():
return ""
self._showTrace(f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !")
self._showTrace(
f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !",
self.TraceLevel.WARNING
)
return ""
@@ -162,7 +170,7 @@ class LibLogin(LibOperator):
captcha_element.send_keys(captcha_text)
return True
except Exception as e:
self._showTrace(f"验证码填写失败 ! : {e}")
self._showTrace(f"验证码填写失败 ! : {e}", self.TraceLevel.ERROR)
return False
@@ -175,11 +183,11 @@ class LibLogin(LibOperator):
) -> bool:
if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !")
self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False
# begin login process
for attempt in range(max_attempts):
self._showTrace(f"用户 {username}{attempt + 1} 次尝试登录......")
self._showTrace(f"用户 {username}{attempt + 1} 次尝试登录......", no_log=True)
if not self.__fillLogInElements(
username,
password,
@@ -190,7 +198,7 @@ class LibLogin(LibOperator):
continue
if not self.__fillCaptchaElement(captcha_text):
continue
self._showTrace("尝试登录...")
self._showTrace("尝试登录...", no_log=True)
try:
self.__driver.find_element(
By.XPATH,
@@ -203,5 +211,5 @@ class LibLogin(LibOperator):
self._showTrace(f"用户 {username}{attempt + 1} 次登录成功 !")
return True
else:
self._showTrace(f"用户 {username}{attempt + 1} 次登录失败 !")
self._showTrace(f"用户 {username}{attempt + 1} 次登录失败 !",self.TraceLevel.WARNING)
return False
+2 -2
View File
@@ -42,7 +42,7 @@ class LibLogout(LibOperator):
) -> bool:
if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !")
self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False
try:
self.__driver.find_element(
@@ -51,5 +51,5 @@ class LibLogout(LibOperator):
self._showTrace(f"用户 {username} 注销成功 !")
return True
except Exception as e:
self._showTrace(f"用户 {username} 注销失败 ! : {e}")
self._showTrace(f"用户 {username} 注销失败 ! : {e}", self.TraceLevel.ERROR)
return False
+17 -15
View File
@@ -54,14 +54,14 @@ class LibRenew(LibTimeSelector):
EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv div.resultMessage"))
)
except:
self._showTrace("续约时间选择界面加载失败 !")
self._showTrace("续约时间选择界面加载失败 !", self.TraceLevel.ERROR)
return False
head_message = head_message.text.strip()
if "警告" in head_message:
result_message = result_message.text.strip()
self._showTrace(f"\n"\
f" 续约失败 !\n"\
f" {result_message}")
f" {result_message}", no_log=True)
return False
try:
WebDriverWait(self.__driver, 2).until(
@@ -73,7 +73,7 @@ class LibRenew(LibTimeSelector):
EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv .btnOK"))
)
except:
self._showTrace("续约时间选择界面加载失败 !")
self._showTrace("续约时间选择界面加载失败 !", self.TraceLevel.ERROR)
return False
return True
@@ -91,7 +91,7 @@ class LibRenew(LibTimeSelector):
renew_info = reserve_info["renew_time"]
max_diff = renew_info["max_diff"]
prefer_earlier = renew_info["prefer_early"]
target_renew_mins = self._timeToMins(end_time) + renew_info["expect_duration"]*60
target_renew_mins = self._timeStrToMins(end_time) + renew_info["expect_duration"]*60
# Validate and adjust target renew time to library closing time
if not self.__validateAndAdjustRenewTime(end_time, target_renew_mins):
@@ -99,7 +99,7 @@ class LibRenew(LibTimeSelector):
renew_ok_btn = self.__driver.find_element(By.CSS_SELECTOR, "#extendDiv .btnOK")
renew_time_opts = self.__driver.find_elements(By.CSS_SELECTOR, "#extendDiv .renewal_List li")
if not renew_time_opts:
self._showTrace("当前未查询到可用续约时间 !")
self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING)
return False
# Find best renewal time option
@@ -110,7 +110,8 @@ class LibRenew(LibTimeSelector):
return self.__confirmRenewal(best_opt, best_text, actual_diff, record, renew_ok_btn)
self._showTrace(
"无法选择最近的可用续约时间 ! "
f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !"
f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !",
self.TraceLevel.WARNING
)
self._showTrace(f"当前可供续约的时间有: {free_times}")
return False
@@ -127,12 +128,12 @@ class LibRenew(LibTimeSelector):
"""
LIBRARY_CLOSE_TIME = 1410 # 23:30 in minutes
if target_renew_mins > LIBRARY_CLOSE_TIME:
actual_renew_duration = LIBRARY_CLOSE_TIME - self._timeToMins(end_time)
actual_renew_duration = LIBRARY_CLOSE_TIME - self._timeStrToMins(end_time)
if actual_renew_duration <= 0:
self._showTrace(f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !")
self._showTrace(f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !", self.TraceLevel.ERROR)
return False
self._showTrace(
f"续约时间已调整至闭馆时间 {self._minsToTime(LIBRARY_CLOSE_TIME)}"
f"续约时间已调整至闭馆时间 {self._minsToTimeStr(LIBRARY_CLOSE_TIME)}"
f"实际续约时长为 {actual_renew_duration//60} 小时 {actual_renew_duration%60} 分钟"
)
return True
@@ -163,7 +164,7 @@ class LibRenew(LibTimeSelector):
ok_btn.click()
return True
except:
self._showTrace("确认续约时发生错误 !")
self._showTrace("确认续约时发生错误 !", self.TraceLevel.ERROR)
return False
@@ -175,28 +176,29 @@ class LibRenew(LibTimeSelector):
) -> bool:
if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !")
self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False
try:
renew_btn = WebDriverWait(self.__driver, 2).until(
EC.element_to_be_clickable((By.ID, "btnExtend"))
)
except:
self._showTrace(f"用户 {username} 续约界面加载失败 !")
self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
return False
if "disabled" in renew_btn.get_attribute("class"):
self._showTrace(f"用户 {username} 续约按钮不可用, 可能不在场馆内, 请连接图书馆网络后重试")
self._showLog(f"用户 {username} 续约按钮不可用, 可能不在场馆内")
self._showTrace(f"用户 {username} 续约按钮不可用, 可能不在场馆内, 请连接图书馆网络后重试", no_log=True)
return False
renew_btn.click()
if not self.__waitRenewDialog():
self._showTrace(f"用户 {username} 续约失败 !")
self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR)
# After the renewal, the webpage will display a mask overlay,
# so we need to refresh the page for subsequent operations.
self.__driver.refresh()
return False
if not self.__selectNearestTime(record, reserve_info):
self._showTrace(f"用户 {username} 续约失败 !")
self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR)
self.__driver.refresh()
return False
if self._waitResponseLoad():
+101 -67
View File
@@ -72,13 +72,13 @@ class LibReserve(LibTimeSelector):
By.CSS_SELECTOR, ".layoutSeat dd"
)
if not content_elements:
self._showTrace("未找到预约结果")
self._showTrace("未找到预约结果", self.TraceLevel.WARNING)
raise
title = title_elements[0].text if title_elements else ""
contents = [element.text for element in content_elements if element.text.strip()]
for message in contents:
if "预约失败" in message or "已有1个有效预约" in message:
self._showTrace(f"预约失败 - {"".join(contents)}")
self._showTrace(f"预约失败 - {"".join(contents)}", self.TraceLevel.ERROR)
raise
if "预定好了" in title or "预约成功" in title or "操作成功" in title:
if len(contents) >= 6:
@@ -96,7 +96,7 @@ class LibReserve(LibTimeSelector):
)
return True
except:
self._showTrace(f"预约结果加载失败 !")
self._showTrace(f"预约结果加载失败 !", self.TraceLevel.ERROR)
return False
@@ -107,6 +107,8 @@ class LibReserve(LibTimeSelector):
try:
# must contain the required infomation
# key 'place' is no need to check
# because 'place' is only has one possible value '1' or '图书馆'
if reserve_info.get("floor") is None: # if existence ?
raise ValueError("未指定楼层")
if reserve_info["floor"] not in self.__floor_map: # if in the mao ?
@@ -123,7 +125,13 @@ class LibReserve(LibTimeSelector):
except ValueError as e:
self._showTrace(
f"预约信息错误 ! : {e}, "\
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整"
f"由于缺少必要的预约信息, 无法开始预约流程",
self.TraceLevel.ERROR
)
self._showTrace(
f"预约信息错误 ! : {e}, "\
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整",
no_log=True
)
return False
@@ -133,17 +141,20 @@ class LibReserve(LibTimeSelector):
reserve_info: dict
) -> bool:
cur_date = time.strftime("%Y-%m-%d", time.localtime())
cur_date_str = time.strftime("%Y-%m-%d", time.localtime())
cur_timestamp = time.mktime(time.strptime(cur_date_str, "%Y-%m-%d"))
if reserve_info.get("date") is None:
reserve_info["date"] = cur_date
self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date}")
reserve_info["date"] = cur_date_str
self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date_str}")
else:
if reserve_info["date"] < cur_date:
res_timestamp = time.mktime(time.strptime(reserve_info["date"], "%Y-%m-%d"))
if res_timestamp < cur_timestamp:
self._showTrace(
f"预约日期错误 ! :"\
f"{reserve_info['date']} 早于当前日期 {cur_date}, 自动设置为当前日期"
f"{reserve_info['date']} 早于当前日期 {cur_date_str}, 自动设置为当前日期",
self.TraceLevel.WARNING
)
reserve_info["date"] = cur_date
reserve_info["date"] = cur_date_str
return True
@@ -190,10 +201,13 @@ class LibReserve(LibTimeSelector):
if reserve_info.get("end_time") is None:
reserve_info["end_time"] = {}
if "time" not in reserve_info["end_time"]:
end_mins = self._timeToMins(reserve_info["begin_time"]["time"])
# here we add the expect duration to the begin time first,
# the edge case that the end time is later than 23:30 will
# be handled in __finalCheck. so no need to concern about it.
end_mins = self._timeStrToMins(reserve_info["begin_time"]["time"])
end_mins = end_mins + int(reserve_info["expect_duration"]*60)
reserve_info["end_time"] = {
"time": self._minsToTime(end_mins),
"time": self._minsToTimeStr(end_mins),
"max_diff": 30,
"prefer_early": False
}
@@ -215,32 +229,39 @@ class LibReserve(LibTimeSelector):
):
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"]
begin_mins = self._timeToMins(begin_time["time"])
end_mins = self._timeToMins(end_time["time"])
begin_mins = self._timeStrToMins(begin_time["time"])
end_mins = self._timeStrToMins(end_time["time"])
# if end time is earlier than begin_time, exchange them
if end_mins < begin_mins:
# except that the user has set the satisfy_duration to True
if end_mins < begin_mins and reserve_info["satisfy_duration"] is False:
self._showTrace(
f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, 尝试交换时间"
f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, 尝试交换时间",
self.TraceLevel.WARNING
)
reserve_info["end_time"] = begin_time
reserve_info["begin_time"] = end_time
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"]
begin_mins = self._timeToMins(begin_time["time"])
end_mins = self._timeToMins(end_time["time"])
reserve_info["end_time"], reserve_info["begin_time"] = begin_time, end_time
begin_time, end_time = end_time, begin_time
begin_mins = self._timeStrToMins(begin_time["time"])
end_mins = self._timeStrToMins(end_time["time"])
# ensure the end time is not later than 23:30
if end_mins > self._timeToMins("23:30"):
max_end_mins = self._timeStrToMins("23:30")
if end_mins > max_end_mins:
self._showTrace(
f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30"
f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30",
self.TraceLevel.WARNING
)
reserve_info["end_time"]["time"] = "23:30"
end_mins = self._timeToMins("23:30")
end_mins = max_end_mins
# ensure the duration is not longer than 8 hours
if reserve_info["satisfy_duration"]:
if reserve_info["expect_duration"] > 8:
self._showTrace(
f"该用户设置了优先满足时长要求, 但是预约期望持续时间 "
f"{reserve_info['expect_duration']} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时"
f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING
)
reserve_info["expect_duration"] = 8
else:
@@ -248,9 +269,10 @@ class LibReserve(LibTimeSelector):
self._showTrace(
f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 "
f"{float((end_mins - begin_mins)/60)} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时"
f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING
)
reserve_info["end_time"]["time"] = self._minsToTime(begin_mins + 8*60)
reserve_info["end_time"]["time"] = self._minsToTimeStr(begin_mins + 8*60)
return True
@@ -274,8 +296,8 @@ class LibReserve(LibTimeSelector):
self._showTrace(
f"预约信息检查完成, 准备预约 "
f"{reserve_info['date']} "
f"{reserve_info['begin_time']["time"]} - "
f"{reserve_info['end_time']["time"]} "
f"{reserve_info['begin_time']['time']} - "
f"{reserve_info['end_time']['time']} "
f"图书馆 "
f"{self.__floor_map[reserve_info['floor']]} "
f"{self.__room_map[reserve_info['room']]} "
@@ -418,7 +440,7 @@ class LibReserve(LibTimeSelector):
EC.element_to_be_clickable((By.ID, "findRoom"))
).click()
except:
self._showTrace("加载房间/区域失败 !")
self._showTrace("加载房间/区域失败 !", self.TraceLevel.ERROR)
return False
# select room
try:
@@ -428,7 +450,7 @@ class LibReserve(LibTimeSelector):
self._showTrace(f"房间 {display_room} 选择成功 !")
return True
except:
self._showTrace(f"选择房间失败 ! : {display_room} 不可用")
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
return False
@@ -446,7 +468,7 @@ class LibReserve(LibTimeSelector):
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "li[id^='seat_']"))
)
except:
self._showTrace(f"座位加载失败 !")
self._showTrace(f"座位加载失败 !", self.TraceLevel.ERROR)
return False
try:
all_seats = self.__driver.find_elements(
@@ -464,9 +486,10 @@ class LibReserve(LibTimeSelector):
seat_status = seat_link.get_attribute("title")
self._showTrace(f"座位 {seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
return True
self._showTrace(f"座位 {seat_id} 在该楼层区域中不存在, 请检查座位号是否正确")
self._showLog(f"座位 {seat_id} 在该楼层区域中不存在", self.TraceLevel.WARNING)
self._showTrace(f"座位 {seat_id} 在该楼层区域中不存在, 请检查座位号是否正确", no_log=True)
except:
self._showTrace(f"座位选择失败 !")
self._showTrace(f"座位选择失败 !", self.TraceLevel.ERROR)
return False
@@ -481,6 +504,9 @@ class LibReserve(LibTimeSelector):
"""
Select the nearest available time option.
Returns:
int: The actual selected time value in minutes.
"""
# Wait for time options to load
try:
@@ -490,7 +516,7 @@ class LibReserve(LibTimeSelector):
)
)
except:
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间")
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR)
return -1
# Find best time option
@@ -499,7 +525,7 @@ class LibReserve(LibTimeSelector):
f"#{time_id} ul li a"
)
if not all_time_opts:
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间")
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR)
return -1
best_opt, best_text, actual_diff, free_times = self._findBestTimeOption(
all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True
@@ -515,8 +541,8 @@ class LibReserve(LibTimeSelector):
)
return target_time
self._showTrace(
f"无法选择最近的 {time_type} {self._minsToTime(target_time)}, "
f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟"
f"无法选择最近的 {time_type} {self._minsToTimeStr(target_time)}, "
f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟", self.TraceLevel.WARNING
)
self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}")
return -1
@@ -526,51 +552,58 @@ class LibReserve(LibTimeSelector):
self,
begin_time: dict,
end_time: dict,
expct_duration: int = 4,
expect_duration: int = 4,
satisfy_duration: bool = True
) -> bool:
"""Select seat begin and end time."""
expect_begin_time = actual_begin_time = begin_time["time"]
expect_end_time = actual_end_time = end_time["time"]
expect_begin_mins = self._timeToMins(expect_begin_time)
actual_begin_mins = expect_begin_mins
expect_end_mins = self._timeToMins(expect_end_time)
"""
Select seat begin and end time.
"""
exp_beg_tm_str = begin_time["time"]
exp_end_tm_str = end_time["time"]
# Initialize actual time strings for logging
act_beg_tm_str = exp_beg_tm_str
act_end_tm_str = exp_end_tm_str
exp_beg_mins = self._timeStrToMins(exp_beg_tm_str)
act_beg_mins = exp_beg_mins
exp_end_mins = self._timeStrToMins(exp_end_tm_str)
act_end_mins = exp_end_mins
# Select begin time
if self.__selectNearestTime(
act_beg_mins = self.__selectNearestTime(
time_id="startTime",
time_type="开始时间",
target_time=expect_begin_mins,
target_time=exp_beg_mins,
max_time_diff=begin_time["max_diff"],
prefer_earlier=begin_time["prefer_early"]
) == -1:
)
if act_beg_mins == -1:
return False
actual_begin_time = self._minsToTime(expect_begin_mins)
actual_begin_mins = self._timeToMins(actual_begin_time)
act_beg_tm_str = self._minsToTimeStr(act_beg_mins)
# If 'satisfy_duration' is True, select end time based on actual begin time
if satisfy_duration:
expect_end_mins = self.validateAndAdjustEndTime(actual_begin_mins, expct_duration)
expect_end_time = self._minsToTime(expect_end_mins)
exp_end_mins = int(self.validateAndAdjustEndTime(act_beg_mins, expect_duration))
exp_end_tm_str = self._minsToTimeStr(exp_end_mins)
self._showTrace(
f"需要满足期望预约持续时间: {expct_duration} 小时, "
f"根据开始时间 {actual_begin_time} 计算结束时间: {expect_end_time}"
f"需要满足期望预约持续时间: {expect_duration} 小时, "
f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}"
)
# Select end time
if self.__selectNearestTime(
act_end_mins = self.__selectNearestTime(
time_id="endTime",
time_type="结束时间",
target_time=expect_end_mins,
target_time=exp_end_mins,
max_time_diff=end_time["max_diff"],
prefer_earlier=end_time["prefer_early"]
) == -1:
)
if act_end_mins == -1:
return False
actual_end_time = self._minsToTime(expect_end_mins)
act_end_tm_str = self._minsToTimeStr(act_end_mins)
self._showTrace(
f"期望预约时间段: {expect_begin_time} - {expect_end_time}, "
f"实际预约时间段: {actual_begin_time} - {actual_end_time}"
f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, "
f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}"
)
return True
@@ -584,12 +617,13 @@ class LibReserve(LibTimeSelector):
"""
Validate and adjust reserve end time to library closing time if needed.
"""
LIBRARY_CLOSE_TIME = self._timeToMins("23:30")
expect_end_mins = begin_mins + duration * 60
LIBRARY_CLOSE_TIME = self._timeStrToMins("23:30")
expect_end_mins = int(begin_mins + duration*60)
if expect_end_mins > LIBRARY_CLOSE_TIME:
expect_end_mins = LIBRARY_CLOSE_TIME
self._showTrace(
f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, 自动调整为 23:30"
f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, 自动调整为 23:30",
self.TraceLevel.WARNING
)
return expect_end_mins
@@ -616,7 +650,7 @@ class LibReserve(LibTimeSelector):
EC.presence_of_element_located((By.ID, "seatLayout"))
)
except:
self._showTrace(f"加载预约选座页面失败 !")
self._showTrace(f"加载预约选座页面失败 !", self.TraceLevel.ERROR)
return False
# date, place, floor, room
if not self.__selectDate(reserve_info["date"]):
@@ -635,7 +669,7 @@ class LibReserve(LibTimeSelector):
elif not self.__selectSeatTime(
begin_time=reserve_info["begin_time"],
end_time=reserve_info["end_time"],
expct_duration=reserve_info["expect_duration"],
expect_duration=reserve_info["expect_duration"],
satisfy_duration=reserve_info["satisfy_duration"]
):
pass
@@ -649,11 +683,11 @@ class LibReserve(LibTimeSelector):
raise
reserve_success = True
except:
self._showTrace(f"预约提交失败 !")
self._showTrace(f"预约提交失败 !", self.TraceLevel.ERROR)
if not submit_reserve and have_hover_on_page:
self.__driver.refresh()
if reserve_success:
self._showTrace(f"用户 {username} 预约成功 !")
else:
self._showTrace(f"用户 {username} 预约失败 !")
self._showTrace(f"用户 {username} 预约失败 !", self.TraceLevel.ERROR)
return reserve_success
+58
View File
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2025 - 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import os
from PySide6.QtCore import QStandardPaths, QDir
from utils.ConfigManager import instance as configInstance
from utils.LogManager import instance as logInstance
def initializeConfigManager(
) -> bool:
logger = logInstance().getLogger("AppInitializer")
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
old_config_dir = os.path.join(app_dir, "config")
new_config_dir = os.path.join(app_dir, "configs")
if QDir(old_config_dir).exists(): # old config dir exists
#we rename it to compatible with new version
logger.info("存在旧配置目录 %s,将其重命名为 %s", old_config_dir, new_config_dir)
if not QDir().rename(old_config_dir, new_config_dir):
logger.error("重命名旧配置目录 %s%s 失败", old_config_dir, new_config_dir)
return False
elif not QDir(new_config_dir).exists():
logger.info("初始化配置目录 %s", new_config_dir)
if not QDir().mkpath(new_config_dir):
logger.error("创建配置目录 %s 失败", new_config_dir)
return False
configInstance(new_config_dir)
return True
def initializeLogManager(
) -> bool:
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
log_dir = os.path.join(app_dir, "logs")
if not QDir(log_dir).exists():
if not QDir().mkpath(log_dir):
return False
logInstance(log_dir)
return True
def initializeApp(
) -> bool:
if not initializeLogManager():
return False
if not initializeConfigManager():
return False
return True
+191
View File
@@ -0,0 +1,191 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import logging
import os
import threading
from logging.handlers import TimedRotatingFileHandler
from typing import Optional
class CallerInfoFormatter(logging.Formatter):
"""
Custom formatter to extract real caller information.
Skips MsgBase._showTrace to show the actual calling location.
Format:
- Logger name: left-aligned, max 15 chars
- Level name: left-aligned, max 8 chars
- Filename: left-aligned, max 20 chars
- Line number: left-aligned, max 4 digits
"""
def __init__(
self,
fmt=None,
datefmt=None,
style='%'
):
super().__init__(fmt, datefmt, style)
self.basefmt = fmt
def format(
self,
record
):
depth = 0
while depth < 10:
record.filename = os.path.basename(record.pathname)
if 'MsgBase.py' not in record.filename and record.funcName != '_showTrace':
break
if not hasattr(record, 'stack'):
record.stack = True
import traceback
record.stack_list = traceback.extract_stack()
depth += 1
if depth < len(record.stack_list):
frame = record.stack_list[-depth-1]
record.filename = os.path.basename(frame.filename)
record.lineno = frame.lineno
record.funcName = frame.name
record.name = record.name[-15:].ljust(15)
record.levelname = record.levelname.ljust(8)
record.filename = record.filename[-20:].ljust(20)
record.lineno = f"{record.lineno:04d}"
return super().format(record)
class LogManager:
"""
Log Manager Singleton Class
Args:
log_dir (str): The directory to store log files.
"""
def __init__(
self,
log_dir: str
):
self.__log_dir = os.path.abspath(log_dir)
self.__logger = None
self.__initialized = False
self.initialize()
def initialize(
self
):
if self.__initialized:
return
os.makedirs(self.__log_dir, exist_ok=True)
self.__logger = logging.getLogger("AutoLibrary")
self.__logger.setLevel(logging.DEBUG)
self.__logger.handlers.clear()
formatter = CallerInfoFormatter(
'[%(asctime)s] - [%(name)s] - [%(levelname)s] - [%(filename)s:%(lineno)s] - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
self.__logger.addHandler(console_handler)
all_log_file = os.path.join(self.__log_dir, "all.log")
file_handler_all = TimedRotatingFileHandler(
all_log_file,
when='midnight',
interval=1,
backupCount=7,
encoding='utf-8'
)
file_handler_all.suffix = "%Y-%m-%d.log"
file_handler_all.setLevel(logging.DEBUG)
file_handler_all.setFormatter(formatter)
self.__logger.addHandler(file_handler_all)
error_log_file = os.path.join(self.__log_dir, "error.log")
file_handler_error = TimedRotatingFileHandler(
error_log_file,
when='midnight',
interval=1,
backupCount=14,
encoding='utf-8'
)
file_handler_error.suffix = "%Y-%m-%d.log"
file_handler_error.setLevel(logging.ERROR)
file_handler_error.setFormatter(formatter)
self.__logger.addHandler(file_handler_error)
self.__initialized = True
def getLogger(
self,
name: Optional[str] = None
) -> logging.Logger:
if name:
return self.__logger.getChild(name)
return self.__logger
def setLevel(
self,
level: int
):
if self.__logger:
self.__logger.setLevel(level)
def logDir(
self
) -> str:
return self.__log_dir
# LogManager singleton instance.
_log_manager_instance = None
# Singleton instance lock.
_instance_lock = threading.Lock()
def instance(
log_dir: str = ""
) -> LogManager:
global _log_manager_instance
with _instance_lock:
if _log_manager_instance is None:
if not log_dir:
raise ValueError("LogManager initialization requires log_dir parameter")
_log_manager_instance = LogManager(log_dir)
else:
if log_dir and _log_manager_instance.logDir() != os.path.abspath(log_dir):
raise ValueError("LogManager instance already initialized with a different log directory")
return _log_manager_instance
def getLogger(
name: Optional[str] = None
) -> logging.Logger:
if _log_manager_instance is None:
raise RuntimeError("LogManager not initialized, please call LogManager.instance(log_dir) first")
return _log_manager_instance.getLogger(name)