1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-20 00:13:02 +08:00

Compare commits

...

12 Commits

Author SHA1 Message Date
KenanZhu 02463f087e feat(MsgBase, gui, operators): 增强日志输出功能
- 为 _showTrace 方法添加 no_log 参数,支持控制日志写入
- 在主窗口各关键操作点添加日志输出
- 优化错误信息输出策略,分离 trace 和 log 输出
- 改进配置目录初始化过程的日志记录
2026-03-18 12:46:37 +08:00
KenanZhu e481824344 refactor(AppInitializer): 初始化应用程序时,先初始化日志管理器,再初始化配置管理器 2026-03-18 11:03:44 +08:00
KenanZhu 160d6a2428 refactor(operators): 为 _showTrace 方法添加合适的 TraceLevel 参数 2026-03-18 11:02:52 +08:00
KenanZhu ec683cf154 feat(LogManager): 新增日志持久化功能 2026-03-18 10:21:53 +08:00
KenanZhu 2d0782c368 refactor(AppInitializer): 将初始化逻辑提取到 AppInitializer 模块中
- 本次提交将 Main.py 中的 ConfigManager, LogManager 等初始化逻辑提取到 AppInitializer 模块中
- 更改默认的配置文件路径从 config 目录变为 configs 目录,并考虑兼容性问题
2026-03-18 10:17:09 +08:00
KenanZhu 824b9b8869 fix(ALMainWindow): 修复 ALMainWindow 的配置路径同步问题
- 先前的实现并未考虑到配置窗口更改时的同步问题,本次提交在
  每次配置窗口更改并关闭保存时,同步更新 ALMainWindow 中的配置路径
2026-03-18 10:14:27 +08:00
KenanZhu c26f19b6b3 feat(LogManager): 新增日志持久化功能
- 新增 LogManager 单例类,支持日志文件按日期滚动
- 创建 CallerInfoFormatter 自定义格式化器,提取真实调用位置
- 为 MsgBase._showTrace 方法添加日志级别参数,集成日志系统
- 新增 initializeLogManager 初始化函数,日志存储于 AppDataLocation/logs/
- 日志输出格式对齐:[时间] - [类名(15)|级别(8)] - [文件:行号(20:4)] - 消息
- 控制台/INFO级别,全量日志 / DEBUG 级别,错误日志 / ERROR级别
- 全量日志保留7天,错误日志保留14天
2026-03-17 21:37:24 +08:00
KenanZhu 1d99ca92f2 fix(LibReserve): 修复日期比较逻辑错误并优化时间处理代码
- 修复使用字符串直接比较日期导致的逻辑错误,改用时间戳比较
- 优化时间验证逻辑,支持 satisfy_duration 模式下的开始晚于结束时间时的交换时间处理
- 添加必要的注释说明 place 参数检查的跳过原因和边界情况处理
- 重构变量命名,提高代码可读性(cur_date -> cur_date_str)
- 修正字符串引号风格,统一使用单引号
2026-03-17 20:46:00 +08:00
KenanZhu 50ebeb0fab style(LibReserve): 修复 __selectSeatTime 参数的拼写错误
- expct_duration -> expect_duration
2026-03-17 20:43:00 +08:00
KenanZhu faa26b489a fix(LibReserve): 修复冗余的链式赋值 2026-03-17 20:42:42 +08:00
KenanZhu c03eed1d51 fix(LibReserve): 修复错误使用的海象运算符条件判断 2026-03-17 20:42:31 +08:00
KenanZhu 2f5680c547 fix(LibTimeSelector) style(LibReserve): 修复时间转换方法 _timeToMins 并重命名为 _timeStrToMins
- 之前的实现未严格限制传入参数为整形,导致在转换时间字符串时可能出现类型错误。
- 重命名为 _timeStrToMins 以明确表示该方法仅用于时间字符串转换。并更新相关调用。

- 重命名 __selectSeatTime 中的冗长局部变量,便于理解和维护。
- 删除多余的时间格式转换嗲用
2026-03-17 20:00:57 +08:00
15 changed files with 544 additions and 172 deletions
+4 -13
View File
@@ -7,26 +7,17 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License. You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details. See the LICENSE file for details.
""" """
import os
import sys import sys
from PySide6.QtCore import QTranslator, QStandardPaths, QDir from PySide6.QtCore import QTranslator
from PySide6.QtWidgets import QApplication from PySide6.QtWidgets import QApplication
from gui.ALMainWindow import ALMainWindow from gui.ALMainWindow import ALMainWindow
from gui.resources import ALResource from gui.resources import ALResource
from utils.ConfigManager import instance from utils.AppInitializer import initializeApp
def initializeConfigManager():
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
config_dir = os.path.join(app_dir, "config")
if not QDir(config_dir).exists():
QDir().mkpath(config_dir)
instance(config_dir)
def main(): def main():
app = QApplication(sys.argv) app = QApplication(sys.argv)
@@ -35,12 +26,12 @@ def main():
app.installTranslator(translator) app.installTranslator(translator)
app.setStyle('Fusion') app.setStyle('Fusion')
app.setApplicationName("AutoLibrary") app.setApplicationName("AutoLibrary")
initializeConfigManager() if not initializeApp():
sys.exit(-1)
window = ALMainWindow() window = ALMainWindow()
window.show() window.show()
sys.exit(app.exec_()) sys.exit(app.exec_())
if __name__ == "__main__": if __name__ == "__main__":
main() main()
+16 -10
View File
@@ -9,6 +9,8 @@ See the LICENSE file for details.
""" """
import queue import queue
from datetime import datetime
from base.LibOperator import LibOperator from base.LibOperator import LibOperator
@@ -29,25 +31,33 @@ class LibTimeSelector(LibOperator):
super().__init__(input_queue, output_queue) super().__init__(input_queue, output_queue)
@staticmethod @staticmethod
def _timeToMins( def _timeStrToMins(
time_str: str time_str: str
) -> int: ) -> int:
""" """
Convert time string "HH:MM" to minutes since midnight. Convert time string "HH:MM" to minutes since midnight.
Example:
"10:00" -> 600
"13:30" -> 810
""" """
hour, minute = map(int, time_str.split(":")) hour, minute = map(int, time_str.split(":"))
return hour*60 + minute return hour*60 + minute
@staticmethod @staticmethod
def _minsToTime( def _minsToTimeStr(
mins: int mins: int
) -> str: ) -> str:
""" """
Convert minutes since midnight to time string "HH:MM". Convert minutes since midnight to time string "HH:MM".
Example:
600 -> "10:00"
810 -> "13:30"
""" """
hour, minute = divmod(mins, 60) hour, minute = divmod(int(mins), 60)
return f"{hour:02d}:{minute:02d}" return f"{hour:02d}:{minute:02d}"
@@ -99,11 +109,11 @@ class LibTimeSelector(LibOperator):
for time_opt in time_options: for time_opt in time_options:
# Parse time value based on context # Parse time value based on context
if is_reserve: if is_reserve:
# Reservation context: parse 'time' attribute
time_attr = time_opt.get_attribute("time") time_attr = time_opt.get_attribute("time")
if time_attr == "now": if time_attr == "now":
from datetime import datetime
now = datetime.now() now = datetime.now()
time_val = now.hour * 60 + now.minute time_val = now.hour*60 + now.minute
elif time_attr and time_attr.isdigit(): elif time_attr and time_attr.isdigit():
time_val = int(time_attr) time_val = int(time_attr)
else: else:
@@ -114,9 +124,7 @@ class LibTimeSelector(LibOperator):
if not (time_attr and time_attr.isdigit()): if not (time_attr and time_attr.isdigit()):
continue continue
time_val = int(time_attr) time_val = int(time_attr)
free_times.append(time_opt.text.strip() if not is_reserve else self._minsToTimeStr(time_val))
free_times.append(time_opt.text.strip() if not is_reserve else self._minsToTime(time_val))
actual_diff = time_val - target_time actual_diff = time_val - target_time
abs_diff = abs(actual_diff) abs_diff = abs(actual_diff)
@@ -125,11 +133,9 @@ class LibTimeSelector(LibOperator):
(abs_diff == best_time_diff and (abs_diff == best_time_diff and
((prefer_earlier and actual_diff <= 0) or ((prefer_earlier and actual_diff <= 0) or
(not prefer_earlier and actual_diff >= 0)))): (not prefer_earlier and actual_diff >= 0)))):
best_time_diff = abs_diff best_time_diff = abs_diff
best_actual_diff = actual_diff best_actual_diff = actual_diff
best_time_opt = time_opt best_time_opt = time_opt
if best_time_opt is not None: if best_time_opt is not None:
return (best_time_opt, best_time_opt.text.strip(), best_actual_diff, free_times) return (best_time_opt, best_time_opt.text.strip(), best_actual_diff, free_times)
return (None, None, None, free_times) return (None, None, None, free_times)
+34 -1
View File
@@ -7,9 +7,12 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License. You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details. See the LICENSE file for details.
""" """
import logging
import queue import queue
import datetime import datetime
from utils.LogManager import getLogger
class MsgBase: class MsgBase:
""" """
@@ -29,6 +32,18 @@ class MsgBase:
implement queue polling to retrieve and process messages. implement queue polling to retrieve and process messages.
""" """
class TraceLevel:
"""
Enum class for trace levels.
This class provides the trace levels for the logger.
"""
DEBUG = logging.DEBUG
INFO = logging.INFO
WARNING = logging.WARNING
ERROR = logging.ERROR
CRITICAL = logging.CRITICAL
def __init__( def __init__(
self, self,
input_queue: queue.Queue, input_queue: queue.Queue,
@@ -38,6 +53,10 @@ class MsgBase:
self._class_name = self.__class__.__name__ self._class_name = self.__class__.__name__
self._input_queue = input_queue self._input_queue = input_queue
self._output_queue = output_queue self._output_queue = output_queue
try:
self._logger = getLogger(self._class_name)
except RuntimeError:
self._logger = None
def _showMsg( def _showMsg(
@@ -50,11 +69,25 @@ class MsgBase:
def _showTrace( def _showTrace(
self, self,
msg: str msg: str,
level: int = logging.INFO,
no_log: bool = False
): ):
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3] timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
self._output_queue.put(f"{timestamp}-[{self._class_name:<15}] : {msg}") self._output_queue.put(f"{timestamp}-[{self._class_name:<15}] : {msg}")
if self._logger and not no_log:
self._logger.log(level, msg)
def _showLog(
self,
msg: str,
level: int = logging.INFO
):
if self._logger:
self._logger.log(level, msg)
def _waitMsg( def _waitMsg(
-1
View File
@@ -8,7 +8,6 @@ You may use, modify, and distribute this file under the terms of the MIT License
See the LICENSE file for details. See the LICENSE file for details.
""" """
import os import os
import sys
from PySide6.QtCore import ( from PySide6.QtCore import (
Qt, Signal, Slot, QTime, QDate, QDir, QFileInfo Qt, Signal, Slot, QTime, QDate, QDir, QFileInfo
+11 -3
View File
@@ -59,6 +59,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.connectSignals() self.connectSignals()
self.startMsgPolling() self.startMsgPolling()
self.startTimerTaskPolling() self.startTimerTaskPolling()
self._showLog("主窗口初始化完成")
def modifyUi( def modifyUi(
@@ -113,7 +114,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
): ):
if not QSystemTrayIcon.isSystemTrayAvailable(): if not QSystemTrayIcon.isSystemTrayAvailable():
self._showTrace("操作系统不支持系统托盘功能, 无法创建系统托盘图标") self._showTrace("操作系统不支持系统托盘功能, 无法创建系统托盘图标", self.TraceLevel.WARNING)
return return
self.TrayIcon = QSystemTrayIcon(self.icon, self) self.TrayIcon = QSystemTrayIcon(self.icon, self)
self.TrayIcon.setToolTip("AutoLibrary") self.TrayIcon.setToolTip("AutoLibrary")
@@ -186,6 +187,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
if self.__alConfigWidget: if self.__alConfigWidget:
self.__alConfigWidget.close() self.__alConfigWidget.close()
# the config widget is already deleted in the 'self.onConfigWidgetClosed' # the config widget is already deleted in the 'self.onConfigWidgetClosed'
self._showLog("主窗口关闭")
QMainWindow.closeEvent(self, event) QMainWindow.closeEvent(self, event)
@@ -298,7 +300,9 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alConfigWidget.configWidgetIsClosed.disconnect(self.onConfigWidgetClosed) self.__alConfigWidget.configWidgetIsClosed.disconnect(self.onConfigWidgetClosed)
self.__alConfigWidget.deleteLater() self.__alConfigWidget.deleteLater()
self.__alConfigWidget = None self.__alConfigWidget = None
self.__config_paths = ConfigManager.getValidateAutomationConfigPaths()
self.setControlButtons(True, None, None) self.setControlButtons(True, None, None)
self._showLog("配置窗口已关闭,配置文件路径已更新")
@Slot(dict) @Slot(dict)
def onTimerTaskIsReady( def onTimerTaskIsReady(
@@ -346,6 +350,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alTimerTaskManageWidget.raise_() self.__alTimerTaskManageWidget.raise_()
self.__alTimerTaskManageWidget.activateWindow() self.__alTimerTaskManageWidget.activateWindow()
self.TimerTaskManageWidgetButton.setEnabled(False) self.TimerTaskManageWidgetButton.setEnabled(False)
self._showLog("打开定时任务管理窗口")
@Slot() @Slot()
def onConfigButtonClicked( def onConfigButtonClicked(
@@ -359,6 +364,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__alConfigWidget.raise_() self.__alConfigWidget.raise_()
self.__alConfigWidget.activateWindow() self.__alConfigWidget.activateWindow()
self.ConfigButton.setEnabled(False) self.ConfigButton.setEnabled(False)
self._showLog("打开配置窗口")
@Slot() @Slot()
def onStartButtonClicked( def onStartButtonClicked(
@@ -375,6 +381,7 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
self.__auto_lib_thread.autoLibWorkerIsFinished.connect(self.onStopButtonClicked) self.__auto_lib_thread.autoLibWorkerIsFinished.connect(self.onStopButtonClicked)
self.__auto_lib_thread.autoLibWorkerFinishedWithError.connect(self.onStopButtonClicked) self.__auto_lib_thread.autoLibWorkerFinishedWithError.connect(self.onStopButtonClicked)
self.__auto_lib_thread.start() self.__auto_lib_thread.start()
self._showLog("开始手动执行任务")
@Slot() @Slot()
def onStopButtonClicked( def onStopButtonClicked(
@@ -382,14 +389,15 @@ class ALMainWindow(MsgBase, QMainWindow, Ui_ALMainWindow):
): ):
if self.__auto_lib_thread: if self.__auto_lib_thread:
self._showTrace("正在停止操作......") self._showTrace("正在停止操作......", no_log=True)
self.__auto_lib_thread.wait(2000) self.__auto_lib_thread.wait(2000)
self._showTrace("操作已停止") self._showTrace("操作已停止", no_log=True)
self.__auto_lib_thread.autoLibWorkerIsFinished.disconnect(self.onStopButtonClicked) self.__auto_lib_thread.autoLibWorkerIsFinished.disconnect(self.onStopButtonClicked)
self.__auto_lib_thread.autoLibWorkerFinishedWithError.disconnect(self.onStopButtonClicked) self.__auto_lib_thread.autoLibWorkerFinishedWithError.disconnect(self.onStopButtonClicked)
self.__auto_lib_thread.deleteLater() self.__auto_lib_thread.deleteLater()
self.__auto_lib_thread = None self.__auto_lib_thread = None
self.setControlButtons(None, False, True) self.setControlButtons(None, False, True)
self._showLog("任务已停止")
@Slot() @Slot()
def onSendButtonClicked( def onSendButtonClicked(
+29 -11
View File
@@ -44,9 +44,11 @@ class AutoLibWorker(MsgBase, QThread):
current_time = time.strftime("%H:%M", time.localtime()) current_time = time.strftime("%H:%M", time.localtime())
if current_time >= "23:30" or current_time <= "07:30": if current_time >= "23:30" or current_time <= "07:30":
self._showTrace( self._showTrace(
"当前时间不在图书馆开放时间内, 请在 07:30 - 23:30 之间尝试" "当前时间不在图书馆开放时间内, 请在 07:30 - 23:30 之间尝试",
self.TraceLevel.WARNING
) )
return False return False
self._showLog(f"时间检查通过, 当前时间: {current_time}", self.TraceLevel.INFO)
return True return True
@@ -57,8 +59,12 @@ class AutoLibWorker(MsgBase, QThread):
if not all( if not all(
os.path.exists(path) for path in self.__config_paths.values() os.path.exists(path) for path in self.__config_paths.values()
): ):
self._showTrace("配置文件路径不存在, 请检查配置文件路径是否正确") self._showTrace(
"配置文件路径不存在, 请检查配置文件路径是否正确",
self.TraceLevel.ERROR
)
return False return False
self._showLog(f"配置文件路径检查通过, 路径: {self.__config_paths}", self.TraceLevel.INFO)
return True return True
@@ -67,22 +73,28 @@ class AutoLibWorker(MsgBase, QThread):
) -> bool: ) -> bool:
self._showTrace( self._showTrace(
f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}" f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}",
no_log=True
) )
self.__run_config = JSONReader(self.__config_paths["run"]).data() self.__run_config = JSONReader(self.__config_paths["run"]).data()
self._showTrace( self._showTrace(
f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}" f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}",
no_log=True
) )
self.__user_config = JSONReader(self.__config_paths["user"]).data() self.__user_config = JSONReader(self.__config_paths["user"]).data()
if self.__run_config is None or self.__user_config is None: if self.__run_config is None or self.__user_config is None:
self._showTrace("配置文件加载失败, 请检查配置文件是否正确") self._showTrace(
self._showTrace("配置文件加载失败, 请检查配置文件是否正确") "配置文件加载失败, 请检查配置文件是否正确",
self.TraceLevel.ERROR
)
return False return False
if not self.__user_config.get("groups"): if not self.__user_config.get("groups"):
self._showTrace( self._showTrace(
"用户配置文件中无有效任务组, 请检查用户配置文件是否正确" "用户配置文件中无有效任务组, 请检查用户配置文件是否正确",
self.TraceLevel.WARNING
) )
return False return False
self._showLog(f"配置文件加载成功, 任务组数量: {len(self.__user_config.get('groups', []))}", self.TraceLevel.INFO)
return True return True
@@ -108,14 +120,17 @@ class AutoLibWorker(MsgBase, QThread):
groups = self.__user_config.get("groups") groups = self.__user_config.get("groups")
for group in groups: for group in groups:
if not group["enabled"]: if not group["enabled"]:
self._showTrace(f"任务组 {group["name"]} 已跳过") self._showTrace(f"任务组 {group["name"]} 已跳过", no_log=True)
continue continue
self._showTrace(f"正在运行任务组 {group["name"]}") self._showTrace(f"正在运行任务组 {group["name"]}", no_log=True)
auto_lib.run( auto_lib.run(
{ "users": group.get("users", []) } { "users": group.get("users", []) }
) )
except Exception as e: except Exception as e:
self._showTrace(f"AutoLibrary 运行时发生异常 : {e}") self._showTrace(
f"AutoLibrary 运行时发生异常 : {e}",
self.TraceLevel.ERROR
)
self.autoLibWorkerFinishedWithError.emit() self.autoLibWorkerFinishedWithError.emit()
return return
if auto_lib: if auto_lib:
@@ -154,7 +169,10 @@ class TimerTaskWorker(AutoLibWorker):
self self
): ):
self._showTrace(f"定时任务 {self.__timer_task['name']} 运行时发生异常") self._showTrace(
f"定时任务 {self.__timer_task['name']} 运行时发生异常",
self.TraceLevel.ERROR
)
self.timerTaskWorkerIsFinished.emit(True, self.__timer_task) self.timerTaskWorkerIsFinished.emit(True, self.__timer_task)
@Slot() @Slot()
+41 -18
View File
@@ -54,7 +54,7 @@ class AutoLib(MsgBase):
self self
) -> bool: ) -> bool:
self._showTrace("正在初始化浏览器驱动......") self._showTrace("正在初始化浏览器驱动......", no_log=True)
web_driver_config = self.__run_config.get("web_driver", None) web_driver_config = self.__run_config.get("web_driver", None)
self.__driver_type = web_driver_config.get("driver_type") self.__driver_type = web_driver_config.get("driver_type")
@@ -66,11 +66,14 @@ class AutoLib(MsgBase):
case "firefox": case "firefox":
driver_options = webdriver.FirefoxOptions() driver_options = webdriver.FirefoxOptions()
case _: case _:
self._showTrace(f"不支持的浏览器驱动类型: {self.__driver_type} !") self._showTrace(
f"不支持的浏览器驱动类型: {self.__driver_type} !",
self.TraceLevel.WARNING
)
return False return False
if not web_driver_config: if not web_driver_config:
self._showTrace("未配置浏览器驱动参数 !") self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
return False return False
if web_driver_config.get("headless"): if web_driver_config.get("headless"):
driver_options.add_argument("--headless") driver_options.add_argument("--headless")
@@ -110,7 +113,7 @@ class AutoLib(MsgBase):
# init browser driver # init browser driver
self.__driver_path = web_driver_config.get("driver_path") self.__driver_path = web_driver_config.get("driver_path")
if not self.__driver_path: if not self.__driver_path:
self._showTrace("未配置浏览器驱动路径 !") self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
return False return False
self.__driver_path = os.path.abspath(self.__driver_path) self.__driver_path = os.path.abspath(self.__driver_path)
try: try:
@@ -123,18 +126,18 @@ class AutoLib(MsgBase):
service = ChromeService(executable_path=self.__driver_path) service = ChromeService(executable_path=self.__driver_path)
self.__driver = webdriver.Chrome(service=service, options=driver_options) self.__driver = webdriver.Chrome(service=service, options=driver_options)
case "firefox": case "firefox":
self._showTrace(f"Firefox 浏览器驱动初始化略慢, 请耐心等待...") self._showTrace(f"Firefox 浏览器驱动初始化略慢, 请耐心等待...", no_log=True)
service = FirefoxService(executable_path=self.__driver_path) service = FirefoxService(executable_path=self.__driver_path)
self.__driver = webdriver.Firefox(service=service, options=driver_options) self.__driver = webdriver.Firefox(service=service, options=driver_options)
case _: # actually will not happen, beacuse we have checked it at the initlization case _: # actually will not happen, beacuse we have checked it at the initlization
# of 'driver_options' # of 'driver_options'
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type}") raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type} !")
self.__driver.implicitly_wait(1) self.__driver.implicitly_wait(1)
self.__driver.execute_script( self.__driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})" "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
) )
except Exception as e: except Exception as e:
self._showTrace(f"浏览器驱动初始化失败: {e}") self._showTrace(f"浏览器驱动初始化失败: {e}", self.TraceLevel.ERROR)
return False return False
self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}") self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}")
return True return True
@@ -145,7 +148,7 @@ class AutoLib(MsgBase):
): ):
if not self.__driver: if not self.__driver:
self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !") self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !", self.TraceLevel.WARNING)
return return
self.__lib_checker = LibChecker(self._input_queue, self._output_queue, self.__driver) self.__lib_checker = LibChecker(self._input_queue, self._output_queue, self.__driver)
self.__lib_login = LibLogin(self._input_queue, self._output_queue, self.__driver) self.__lib_login = LibLogin(self._input_queue, self._output_queue, self.__driver)
@@ -178,7 +181,7 @@ class AutoLib(MsgBase):
) )
return True return True
except: except:
self._showTrace(f"登录页面加载失败 !") self._showTrace(f"登录页面加载失败 !", self.TraceLevel.ERROR)
return False return False
@@ -188,7 +191,7 @@ class AutoLib(MsgBase):
lib_config = self.__run_config.get("library", None) lib_config = self.__run_config.get("library", None)
if not lib_config: if not lib_config:
self._showTrace("未配置图书馆参数 !") self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
return False return False
url = lib_config.get("host_url") + lib_config.get("login_url") url = lib_config.get("host_url") + lib_config.get("login_url")
self.__driver.set_page_load_timeout(5) self.__driver.set_page_load_timeout(5)
@@ -196,7 +199,9 @@ class AutoLib(MsgBase):
self.__driver.get(url) self.__driver.get(url)
except TimeoutException: except TimeoutException:
self.__driver.execute_script("window.stop();") self.__driver.execute_script("window.stop();")
self._showTrace(f"图书馆登录页面加载超时 ! 请检查网络环境是否正常") self._showTrace(
f"图书馆登录页面加载超时 ! 请检查网络环境是否正常", self.TraceLevel.ERROR
)
return False return False
if not self.__waitResponseLoad(): if not self.__waitResponseLoad():
return False return False
@@ -240,8 +245,10 @@ class AutoLib(MsgBase):
else: else:
self._showTrace(f"用户 {username} 无法预约,已跳过") self._showTrace(f"用户 {username} 无法预约,已跳过")
result = 2 result = 2
# checkin # checkin
if run_mode["auto_checkin"] and result != 1: last_result = result
if run_mode["auto_checkin"] and last_result != 1:
if self.__lib_checker.canCheckin(): if self.__lib_checker.canCheckin():
if self.__lib_checkin.checkin(username): if self.__lib_checkin.checkin(username):
result = 0 result = 0
@@ -250,20 +257,31 @@ class AutoLib(MsgBase):
else: else:
self._showTrace(f"用户 {username} 无法签到,已跳过") self._showTrace(f"用户 {username} 无法签到,已跳过")
result = 2 result = 2
if last_result == 0: # partly success
result = 0
# renewal # renewal
if run_mode["auto_renewal"] and result != 1: last_result = result
if run_mode["auto_renewal"] and last_result != 1:
can_renew, record = self.__lib_checker.canRenew() can_renew, record = self.__lib_checker.canRenew()
if can_renew: if can_renew:
if self.__lib_renew.renew(username, record, reserve_info): if self.__lib_renew.renew(username, record, reserve_info):
if self.__lib_checker.postRenewCheck(record): if self.__lib_checker.postRenewCheck(record):
self._showTrace(f"用户 {username} 续约成功 !")
result = 0 result = 0
else: else:
result = 1 if result != 1: # partly success
result = 0
else:
result = 1
else: else:
result = 1 result = 1
else: else:
self._showTrace(f"用户 {username} 无法续约,已跳过") self._showTrace(f"用户 {username} 无法续约,已跳过")
result = 2 result = 2
if last_result == 0: # partly success
result = 0
# logout # logout
if not self.__lib_logout.logout( if not self.__lib_logout.logout(
username username
@@ -288,7 +306,8 @@ class AutoLib(MsgBase):
for user in users: for user in users:
user_counter["current"] += 1 user_counter["current"] += 1
self._showTrace( self._showTrace(
f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user["username"]}......" f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user["username"]}......",
no_log=True
) )
if not user["enabled"]: if not user["enabled"]:
self._showTrace(f"用户 {user["username"]} 已跳过") self._showTrace(f"用户 {user["username"]} 已跳过")
@@ -303,7 +322,8 @@ class AutoLib(MsgBase):
) )
if r == -1: if r == -1:
self._showTrace( self._showTrace(
f"用户 {user["username"]} 处理过程中页面发生异常,无法继续操作, 任务已终止 !" f"用户 {user["username"]} 处理过程中页面发生异常,无法继续操作, 任务已终止 !",
self.TraceLevel.WARNING
) )
break break
elif r == 0: elif r == 0:
@@ -326,11 +346,14 @@ class AutoLib(MsgBase):
if self.__driver: if self.__driver:
if self.__driver_type.lower() == "firefox": if self.__driver_type.lower() == "firefox":
self._showTrace(f"Firefox 浏览器驱动关闭略慢, 请耐心等待...") self._showTrace(
f"Firefox 浏览器驱动关闭略慢, 请耐心等待...",
no_log=True
)
self.__driver.quit() self.__driver.quit()
self.__driver = None self.__driver = None
self._showTrace(f"浏览器驱动已关闭") self._showTrace(f"浏览器驱动已关闭")
return True return True
else: else:
self._showTrace(f"浏览器驱动未初始化, 无需关闭") self._showTrace(f"浏览器驱动未初始化, 无需关闭", no_log=True)
return False return False
+8 -7
View File
@@ -63,7 +63,7 @@ class LibChecker(LibOperator):
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList")) EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
) )
except: except:
self._showTrace("加载预约记录页面失败 !") self._showTrace("加载预约记录页面失败 !", self.TraceLevel.ERROR)
return False return False
return True return True
@@ -174,7 +174,7 @@ class LibChecker(LibOperator):
) )
return reservations return reservations
except: except:
self._showTrace("加载预约记录失败 !") self._showTrace("加载预约记录失败 !", self.TraceLevel.ERROR)
return None return None
@@ -197,10 +197,10 @@ class LibChecker(LibOperator):
self.__driver.execute_script("arguments[0].click();", more_btn) self.__driver.execute_script("arguments[0].click();", more_btn)
return True return True
else: else:
self._showTrace("用户无法加载更多预约记录") self._showTrace("用户无法加载更多预约记录", self.TraceLevel.WARNING)
return False return False
except: except:
self._showTrace("加载更多预约记录失败 !") self._showTrace("加载更多预约记录失败 !", self.TraceLevel.ERROR)
return False return False
@@ -211,9 +211,9 @@ class LibChecker(LibOperator):
) -> dict: ) -> dict:
if wanted_date is None: if wanted_date is None:
self._showTrace("日期未指定, 无法检查当前预约状态") self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING)
return None return None
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......") self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......", no_log=True)
checked_count = 0 checked_count = 0
max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked
@@ -245,7 +245,8 @@ class LibChecker(LibOperator):
self._showTrace( self._showTrace(
f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, " f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, "
f"详细信息: {record["date"]} " f"详细信息: {record["date"]} "
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}" f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}",
no_log=True
) )
return record return record
if not self.__showMoreReserveRecords(): if not self.__showMoreReserveRecords():
+9 -9
View File
@@ -51,7 +51,7 @@ class LibCheckin(LibOperator):
) )
ok_btn = self.__driver.find_element(By.CLASS_NAME, "btnOK") ok_btn = self.__driver.find_element(By.CLASS_NAME, "btnOK")
except: except:
self._showTrace("签到时发生未知错误 !") self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False return False
result_message = result_message_element.text result_message = result_message_element.text
if "签到成功" in result_message: if "签到成功" in result_message:
@@ -107,9 +107,9 @@ class LibCheckin(LibOperator):
result = self.__driver.execute_script(script) result = self.__driver.execute_script(script)
time.sleep(0.1) time.sleep(0.1)
if result: if result:
self._showTrace("签到按钮已启用") self._showTrace("签到按钮已启用", no_log=True)
else: else:
self._showTrace("签到按钮启用失败") self._showTrace("签到按钮启用失败", self.TraceLevel.WARNING)
return result return result
@@ -119,24 +119,24 @@ class LibCheckin(LibOperator):
) -> bool: ) -> bool:
if self.__driver is None: if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !") self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False return False
try: try:
checkin_btn = WebDriverWait(self.__driver, 2).until( checkin_btn = WebDriverWait(self.__driver, 2).until(
EC.element_to_be_clickable((By.ID, "btnCheckIn")) EC.element_to_be_clickable((By.ID, "btnCheckIn"))
) )
except: except:
self._showTrace(f"用户 {username} 签到界面加载失败 !") self._showTrace(f"用户 {username} 签到界面加载失败 !", self.TraceLevel.ERROR)
return False return False
if "disabled" in checkin_btn.get_attribute("class"): if "disabled" in checkin_btn.get_attribute("class"):
self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......") self._showTrace("签到按钮不可用, 可能不在场馆内, 正在尝试启用......", no_log=True)
if not self.__enableCheckinBtn(): if not self.__enableCheckinBtn():
self._showTrace(f"签到按钮启用失败 !") self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
return False return False
checkin_btn.click() checkin_btn.click()
if self._waitResponseLoad(): if self._waitResponseLoad():
self._showTrace(f"用户 {username} 签到成功 !") self._showTrace(f"用户 {username} 签到成功 !", no_log=True)
return True return True
else: else:
self._showTrace(f"用户 {username} 签到失败 !") self._showTrace(f"用户 {username} 签到失败 !", self.TraceLevel.ERROR)
return False return False
+23 -15
View File
@@ -52,7 +52,10 @@ class LibLogin(LibOperator):
) )
return True return True
except: except:
self._showTrace(f"登录页面加载失败 ! : 用户账号或者密码错误/验证码错误, 具体以页面提示为准") self._showTrace(
f"登录页面加载失败 ! : 用户账号或者密码错误/验证码错误, 具体以页面提示为准",
self.TraceLevel.ERROR
)
return False return False
@@ -71,7 +74,7 @@ class LibLogin(LibOperator):
password_element.clear() password_element.clear()
password_element.send_keys(password) password_element.send_keys(password)
except Exception as e: except Exception as e:
self._showTrace(f"用户名或密码填写失败 ! : {e}") self._showTrace(f"用户名或密码填写失败 ! : {e}", self.TraceLevel.ERROR)
return False return False
return True return True
@@ -88,12 +91,13 @@ class LibLogin(LibOperator):
captcha_img = base64.b64decode(base64_str) captcha_img = base64.b64decode(base64_str)
captcha_text = self.__ddddocr.classification(captcha_img) captcha_text = self.__ddddocr.classification(captcha_img)
captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower() captcha_text = ''.join(filter(str.isalnum, captcha_text)).lower()
self._showTrace(f"识别到验证码为 : '{captcha_text}'") self._showTrace(f"识别到验证码为 : '{captcha_text}'", no_log=True)
if len(captcha_text) != 4: if len(captcha_text) != 4:
self._showLog("识别到的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("识别到的验证码长度不等于 4 个字符 !") raise Exception("识别到的验证码长度不等于 4 个字符 !")
return captcha_text return captcha_text
except Exception as e: except Exception as e:
self._showTrace(f"验证码识别失败 ! : {e}") self._showTrace(f"验证码识别失败 ! : {e}", self.TraceLevel.ERROR)
return "" return ""
@@ -105,12 +109,13 @@ class LibLogin(LibOperator):
try: try:
self._showMsg("请输入验证码:") self._showMsg("请输入验证码:")
captcha_text = self._waitMsg(timeout=15) captcha_text = self._waitMsg(timeout=15)
self._showTrace(f"输入的验证码为 : '{captcha_text}'") self._showTrace(f"输入的验证码为 : '{captcha_text}'", no_log=True)
if len(captcha_text) != 4: if len(captcha_text) != 4:
self._showLog("输入的验证码长度不等于 4 个字符 !", self.TraceLevel.WARNING)
raise Exception("输入的验证码长度不等于 4 个字符 !") raise Exception("输入的验证码长度不等于 4 个字符 !")
return captcha_text return captcha_text
except Exception as e: except Exception as e:
self._showTrace(f"输入验证码失败 ! : {e}") self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR)
return "" return ""
@@ -120,13 +125,13 @@ class LibLogin(LibOperator):
# refresh captcha # refresh captcha
try: try:
self._showTrace("刷新验证码......") self._showTrace("刷新验证码......", no_log=True)
self.__driver.find_element( self.__driver.find_element(
By.ID, "loadImgId" By.ID, "loadImgId"
).click() ).click()
return True return True
except Exception as e: except Exception as e:
self._showTrace(f"刷新验证码失败 ! : {e}") self._showTrace(f"刷新验证码失败 ! : {e}", self.TraceLevel.ERROR)
return False return False
@@ -140,14 +145,17 @@ class LibLogin(LibOperator):
if auto_captcha: if auto_captcha:
captcha_text = self.__autoRecognizeCaptcha() captcha_text = self.__autoRecognizeCaptcha()
else: else:
self._showTrace(f"用户未配置自动识别验证码, 请手动输入验证码 !") self._showTrace(f"用户未配置自动识别验证码, 请手动输入验证码 !", no_log=True)
captcha_text = self.__manualRecognizeCaptcha() captcha_text = self.__manualRecognizeCaptcha()
if captcha_text: if captcha_text:
return captcha_text return captcha_text
else: else:
if not self.__refreshCaptcha(): if not self.__refreshCaptcha():
return "" return ""
self._showTrace(f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !") self._showTrace(
f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !",
self.TraceLevel.WARNING
)
return "" return ""
@@ -162,7 +170,7 @@ class LibLogin(LibOperator):
captcha_element.send_keys(captcha_text) captcha_element.send_keys(captcha_text)
return True return True
except Exception as e: except Exception as e:
self._showTrace(f"验证码填写失败 ! : {e}") self._showTrace(f"验证码填写失败 ! : {e}", self.TraceLevel.ERROR)
return False return False
@@ -175,11 +183,11 @@ class LibLogin(LibOperator):
) -> bool: ) -> bool:
if self.__driver is None: if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !") self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False return False
# begin login process # begin login process
for attempt in range(max_attempts): for attempt in range(max_attempts):
self._showTrace(f"用户 {username}{attempt + 1} 次尝试登录......") self._showTrace(f"用户 {username}{attempt + 1} 次尝试登录......", no_log=True)
if not self.__fillLogInElements( if not self.__fillLogInElements(
username, username,
password, password,
@@ -190,7 +198,7 @@ class LibLogin(LibOperator):
continue continue
if not self.__fillCaptchaElement(captcha_text): if not self.__fillCaptchaElement(captcha_text):
continue continue
self._showTrace("尝试登录...") self._showTrace("尝试登录...", no_log=True)
try: try:
self.__driver.find_element( self.__driver.find_element(
By.XPATH, By.XPATH,
@@ -203,5 +211,5 @@ class LibLogin(LibOperator):
self._showTrace(f"用户 {username}{attempt + 1} 次登录成功 !") self._showTrace(f"用户 {username}{attempt + 1} 次登录成功 !")
return True return True
else: else:
self._showTrace(f"用户 {username}{attempt + 1} 次登录失败 !") self._showTrace(f"用户 {username}{attempt + 1} 次登录失败 !",self.TraceLevel.WARNING)
return False return False
+2 -2
View File
@@ -42,7 +42,7 @@ class LibLogout(LibOperator):
) -> bool: ) -> bool:
if self.__driver is None: if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !") self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False return False
try: try:
self.__driver.find_element( self.__driver.find_element(
@@ -51,5 +51,5 @@ class LibLogout(LibOperator):
self._showTrace(f"用户 {username} 注销成功 !") self._showTrace(f"用户 {username} 注销成功 !")
return True return True
except Exception as e: except Exception as e:
self._showTrace(f"用户 {username} 注销失败 ! : {e}") self._showTrace(f"用户 {username} 注销失败 ! : {e}", self.TraceLevel.ERROR)
return False return False
+17 -15
View File
@@ -54,14 +54,14 @@ class LibRenew(LibTimeSelector):
EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv div.resultMessage")) EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv div.resultMessage"))
) )
except: except:
self._showTrace("续约时间选择界面加载失败 !") self._showTrace("续约时间选择界面加载失败 !", self.TraceLevel.ERROR)
return False return False
head_message = head_message.text.strip() head_message = head_message.text.strip()
if "警告" in head_message: if "警告" in head_message:
result_message = result_message.text.strip() result_message = result_message.text.strip()
self._showTrace(f"\n"\ self._showTrace(f"\n"\
f" 续约失败 !\n"\ f" 续约失败 !\n"\
f" {result_message}") f" {result_message}", no_log=True)
return False return False
try: try:
WebDriverWait(self.__driver, 2).until( WebDriverWait(self.__driver, 2).until(
@@ -73,7 +73,7 @@ class LibRenew(LibTimeSelector):
EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv .btnOK")) EC.presence_of_element_located((By.CSS_SELECTOR, "#extendDiv .btnOK"))
) )
except: except:
self._showTrace("续约时间选择界面加载失败 !") self._showTrace("续约时间选择界面加载失败 !", self.TraceLevel.ERROR)
return False return False
return True return True
@@ -91,7 +91,7 @@ class LibRenew(LibTimeSelector):
renew_info = reserve_info["renew_time"] renew_info = reserve_info["renew_time"]
max_diff = renew_info["max_diff"] max_diff = renew_info["max_diff"]
prefer_earlier = renew_info["prefer_early"] prefer_earlier = renew_info["prefer_early"]
target_renew_mins = self._timeToMins(end_time) + renew_info["expect_duration"]*60 target_renew_mins = self._timeStrToMins(end_time) + renew_info["expect_duration"]*60
# Validate and adjust target renew time to library closing time # Validate and adjust target renew time to library closing time
if not self.__validateAndAdjustRenewTime(end_time, target_renew_mins): if not self.__validateAndAdjustRenewTime(end_time, target_renew_mins):
@@ -99,7 +99,7 @@ class LibRenew(LibTimeSelector):
renew_ok_btn = self.__driver.find_element(By.CSS_SELECTOR, "#extendDiv .btnOK") renew_ok_btn = self.__driver.find_element(By.CSS_SELECTOR, "#extendDiv .btnOK")
renew_time_opts = self.__driver.find_elements(By.CSS_SELECTOR, "#extendDiv .renewal_List li") renew_time_opts = self.__driver.find_elements(By.CSS_SELECTOR, "#extendDiv .renewal_List li")
if not renew_time_opts: if not renew_time_opts:
self._showTrace("当前未查询到可用续约时间 !") self._showTrace("当前未查询到可用续约时间 !", self.TraceLevel.WARNING)
return False return False
# Find best renewal time option # Find best renewal time option
@@ -110,7 +110,8 @@ class LibRenew(LibTimeSelector):
return self.__confirmRenewal(best_opt, best_text, actual_diff, record, renew_ok_btn) return self.__confirmRenewal(best_opt, best_text, actual_diff, record, renew_ok_btn)
self._showTrace( self._showTrace(
"无法选择最近的可用续约时间 ! " "无法选择最近的可用续约时间 ! "
f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !" f"所有可选时间与目标时间相差都超过了 {max_diff} 分钟 !",
self.TraceLevel.WARNING
) )
self._showTrace(f"当前可供续约的时间有: {free_times}") self._showTrace(f"当前可供续约的时间有: {free_times}")
return False return False
@@ -127,12 +128,12 @@ class LibRenew(LibTimeSelector):
""" """
LIBRARY_CLOSE_TIME = 1410 # 23:30 in minutes LIBRARY_CLOSE_TIME = 1410 # 23:30 in minutes
if target_renew_mins > LIBRARY_CLOSE_TIME: if target_renew_mins > LIBRARY_CLOSE_TIME:
actual_renew_duration = LIBRARY_CLOSE_TIME - self._timeToMins(end_time) actual_renew_duration = LIBRARY_CLOSE_TIME - self._timeStrToMins(end_time)
if actual_renew_duration <= 0: if actual_renew_duration <= 0:
self._showTrace(f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !") self._showTrace(f"当前结束时间 {end_time} 已接近闭馆时间,无法续约 !", self.TraceLevel.ERROR)
return False return False
self._showTrace( self._showTrace(
f"续约时间已调整至闭馆时间 {self._minsToTime(LIBRARY_CLOSE_TIME)}" f"续约时间已调整至闭馆时间 {self._minsToTimeStr(LIBRARY_CLOSE_TIME)}"
f"实际续约时长为 {actual_renew_duration//60} 小时 {actual_renew_duration%60} 分钟" f"实际续约时长为 {actual_renew_duration//60} 小时 {actual_renew_duration%60} 分钟"
) )
return True return True
@@ -163,7 +164,7 @@ class LibRenew(LibTimeSelector):
ok_btn.click() ok_btn.click()
return True return True
except: except:
self._showTrace("确认续约时发生错误 !") self._showTrace("确认续约时发生错误 !", self.TraceLevel.ERROR)
return False return False
@@ -175,28 +176,29 @@ class LibRenew(LibTimeSelector):
) -> bool: ) -> bool:
if self.__driver is None: if self.__driver is None:
self._showTrace("未提供有效 WebDriver 实例 !") self._showTrace("未提供有效 WebDriver 实例 !", self.TraceLevel.WARNING)
return False return False
try: try:
renew_btn = WebDriverWait(self.__driver, 2).until( renew_btn = WebDriverWait(self.__driver, 2).until(
EC.element_to_be_clickable((By.ID, "btnExtend")) EC.element_to_be_clickable((By.ID, "btnExtend"))
) )
except: except:
self._showTrace(f"用户 {username} 续约界面加载失败 !") self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
return False return False
if "disabled" in renew_btn.get_attribute("class"): if "disabled" in renew_btn.get_attribute("class"):
self._showTrace(f"用户 {username} 续约按钮不可用, 可能不在场馆内, 请连接图书馆网络后重试") self._showLog(f"用户 {username} 续约按钮不可用, 可能不在场馆内")
self._showTrace(f"用户 {username} 续约按钮不可用, 可能不在场馆内, 请连接图书馆网络后重试", no_log=True)
return False return False
renew_btn.click() renew_btn.click()
if not self.__waitRenewDialog(): if not self.__waitRenewDialog():
self._showTrace(f"用户 {username} 续约失败 !") self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR)
# After the renewal, the webpage will display a mask overlay, # After the renewal, the webpage will display a mask overlay,
# so we need to refresh the page for subsequent operations. # so we need to refresh the page for subsequent operations.
self.__driver.refresh() self.__driver.refresh()
return False return False
if not self.__selectNearestTime(record, reserve_info): if not self.__selectNearestTime(record, reserve_info):
self._showTrace(f"用户 {username} 续约失败 !") self._showTrace(f"用户 {username} 续约失败 !", self.TraceLevel.ERROR)
self.__driver.refresh() self.__driver.refresh()
return False return False
if self._waitResponseLoad(): if self._waitResponseLoad():
+101 -67
View File
@@ -72,13 +72,13 @@ class LibReserve(LibTimeSelector):
By.CSS_SELECTOR, ".layoutSeat dd" By.CSS_SELECTOR, ".layoutSeat dd"
) )
if not content_elements: if not content_elements:
self._showTrace("未找到预约结果") self._showTrace("未找到预约结果", self.TraceLevel.WARNING)
raise raise
title = title_elements[0].text if title_elements else "" title = title_elements[0].text if title_elements else ""
contents = [element.text for element in content_elements if element.text.strip()] contents = [element.text for element in content_elements if element.text.strip()]
for message in contents: for message in contents:
if "预约失败" in message or "已有1个有效预约" in message: if "预约失败" in message or "已有1个有效预约" in message:
self._showTrace(f"预约失败 - {"".join(contents)}") self._showTrace(f"预约失败 - {"".join(contents)}", self.TraceLevel.ERROR)
raise raise
if "预定好了" in title or "预约成功" in title or "操作成功" in title: if "预定好了" in title or "预约成功" in title or "操作成功" in title:
if len(contents) >= 6: if len(contents) >= 6:
@@ -96,7 +96,7 @@ class LibReserve(LibTimeSelector):
) )
return True return True
except: except:
self._showTrace(f"预约结果加载失败 !") self._showTrace(f"预约结果加载失败 !", self.TraceLevel.ERROR)
return False return False
@@ -107,6 +107,8 @@ class LibReserve(LibTimeSelector):
try: try:
# must contain the required infomation # must contain the required infomation
# key 'place' is no need to check
# because 'place' is only has one possible value '1' or '图书馆'
if reserve_info.get("floor") is None: # if existence ? if reserve_info.get("floor") is None: # if existence ?
raise ValueError("未指定楼层") raise ValueError("未指定楼层")
if reserve_info["floor"] not in self.__floor_map: # if in the mao ? if reserve_info["floor"] not in self.__floor_map: # if in the mao ?
@@ -123,7 +125,13 @@ class LibReserve(LibTimeSelector):
except ValueError as e: except ValueError as e:
self._showTrace( self._showTrace(
f"预约信息错误 ! : {e}, "\ f"预约信息错误 ! : {e}, "\
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整" f"由于缺少必要的预约信息, 无法开始预约流程",
self.TraceLevel.ERROR
)
self._showTrace(
f"预约信息错误 ! : {e}, "\
f"由于缺少必要的预约信息, 无法开始预约流程, 请检查预约信息是否完整",
no_log=True
) )
return False return False
@@ -133,17 +141,20 @@ class LibReserve(LibTimeSelector):
reserve_info: dict reserve_info: dict
) -> bool: ) -> bool:
cur_date = time.strftime("%Y-%m-%d", time.localtime()) cur_date_str = time.strftime("%Y-%m-%d", time.localtime())
cur_timestamp = time.mktime(time.strptime(cur_date_str, "%Y-%m-%d"))
if reserve_info.get("date") is None: if reserve_info.get("date") is None:
reserve_info["date"] = cur_date reserve_info["date"] = cur_date_str
self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date}") self._showTrace(f"预约日期未指定, 自动设置为当前日期: {cur_date_str}")
else: else:
if reserve_info["date"] < cur_date: res_timestamp = time.mktime(time.strptime(reserve_info["date"], "%Y-%m-%d"))
if res_timestamp < cur_timestamp:
self._showTrace( self._showTrace(
f"预约日期错误 ! :"\ f"预约日期错误 ! :"\
f"{reserve_info['date']} 早于当前日期 {cur_date}, 自动设置为当前日期" f"{reserve_info['date']} 早于当前日期 {cur_date_str}, 自动设置为当前日期",
self.TraceLevel.WARNING
) )
reserve_info["date"] = cur_date reserve_info["date"] = cur_date_str
return True return True
@@ -190,10 +201,13 @@ class LibReserve(LibTimeSelector):
if reserve_info.get("end_time") is None: if reserve_info.get("end_time") is None:
reserve_info["end_time"] = {} reserve_info["end_time"] = {}
if "time" not in reserve_info["end_time"]: if "time" not in reserve_info["end_time"]:
end_mins = self._timeToMins(reserve_info["begin_time"]["time"]) # here we add the expect duration to the begin time first,
# the edge case that the end time is later than 23:30 will
# be handled in __finalCheck. so no need to concern about it.
end_mins = self._timeStrToMins(reserve_info["begin_time"]["time"])
end_mins = end_mins + int(reserve_info["expect_duration"]*60) end_mins = end_mins + int(reserve_info["expect_duration"]*60)
reserve_info["end_time"] = { reserve_info["end_time"] = {
"time": self._minsToTime(end_mins), "time": self._minsToTimeStr(end_mins),
"max_diff": 30, "max_diff": 30,
"prefer_early": False "prefer_early": False
} }
@@ -215,32 +229,39 @@ class LibReserve(LibTimeSelector):
): ):
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"] begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"]
begin_mins = self._timeToMins(begin_time["time"]) begin_mins = self._timeStrToMins(begin_time["time"])
end_mins = self._timeToMins(end_time["time"]) end_mins = self._timeStrToMins(end_time["time"])
# if end time is earlier than begin_time, exchange them # if end time is earlier than begin_time, exchange them
if end_mins < begin_mins: # except that the user has set the satisfy_duration to True
if end_mins < begin_mins and reserve_info["satisfy_duration"] is False:
self._showTrace( self._showTrace(
f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, 尝试交换时间" f"结束时间 {end_time['time']} 早于开始时间 {begin_time['time']}, 尝试交换时间",
self.TraceLevel.WARNING
) )
reserve_info["end_time"] = begin_time reserve_info["end_time"], reserve_info["begin_time"] = begin_time, end_time
reserve_info["begin_time"] = end_time begin_time, end_time = end_time, begin_time
begin_time, end_time = reserve_info["begin_time"], reserve_info["end_time"] begin_mins = self._timeStrToMins(begin_time["time"])
begin_mins = self._timeToMins(begin_time["time"]) end_mins = self._timeStrToMins(end_time["time"])
end_mins = self._timeToMins(end_time["time"])
# ensure the end time is not later than 23:30 # ensure the end time is not later than 23:30
if end_mins > self._timeToMins("23:30"): max_end_mins = self._timeStrToMins("23:30")
if end_mins > max_end_mins:
self._showTrace( self._showTrace(
f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30" f"结束时间 {end_time['time']} 晚于 23:30, 自动设置为 23:30",
self.TraceLevel.WARNING
) )
reserve_info["end_time"]["time"] = "23:30" reserve_info["end_time"]["time"] = "23:30"
end_mins = self._timeToMins("23:30") end_mins = max_end_mins
# ensure the duration is not longer than 8 hours # ensure the duration is not longer than 8 hours
if reserve_info["satisfy_duration"]: if reserve_info["satisfy_duration"]:
if reserve_info["expect_duration"] > 8: if reserve_info["expect_duration"] > 8:
self._showTrace( self._showTrace(
f"该用户设置了优先满足时长要求, 但是预约期望持续时间 " f"该用户设置了优先满足时长要求, 但是预约期望持续时间 "
f"{reserve_info['expect_duration']} 小时 " f"{reserve_info['expect_duration']} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时" f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING
) )
reserve_info["expect_duration"] = 8 reserve_info["expect_duration"] = 8
else: else:
@@ -248,9 +269,10 @@ class LibReserve(LibTimeSelector):
self._showTrace( self._showTrace(
f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 " f"该用户未设置优先满足时长要求, 但是检查到预约持续时间 "
f"{float((end_mins - begin_mins)/60)} 小时 " f"{float((end_mins - begin_mins)/60)} 小时 "
f"超出最大时长 8 小时, 自动设置为 8 小时" f"超出最大时长 8 小时, 自动设置为 8 小时",
self.TraceLevel.WARNING
) )
reserve_info["end_time"]["time"] = self._minsToTime(begin_mins + 8*60) reserve_info["end_time"]["time"] = self._minsToTimeStr(begin_mins + 8*60)
return True return True
@@ -274,8 +296,8 @@ class LibReserve(LibTimeSelector):
self._showTrace( self._showTrace(
f"预约信息检查完成, 准备预约 " f"预约信息检查完成, 准备预约 "
f"{reserve_info['date']} " f"{reserve_info['date']} "
f"{reserve_info['begin_time']["time"]} - " f"{reserve_info['begin_time']['time']} - "
f"{reserve_info['end_time']["time"]} " f"{reserve_info['end_time']['time']} "
f"图书馆 " f"图书馆 "
f"{self.__floor_map[reserve_info['floor']]} " f"{self.__floor_map[reserve_info['floor']]} "
f"{self.__room_map[reserve_info['room']]} " f"{self.__room_map[reserve_info['room']]} "
@@ -418,7 +440,7 @@ class LibReserve(LibTimeSelector):
EC.element_to_be_clickable((By.ID, "findRoom")) EC.element_to_be_clickable((By.ID, "findRoom"))
).click() ).click()
except: except:
self._showTrace("加载房间/区域失败 !") self._showTrace("加载房间/区域失败 !", self.TraceLevel.ERROR)
return False return False
# select room # select room
try: try:
@@ -428,7 +450,7 @@ class LibReserve(LibTimeSelector):
self._showTrace(f"房间 {display_room} 选择成功 !") self._showTrace(f"房间 {display_room} 选择成功 !")
return True return True
except: except:
self._showTrace(f"选择房间失败 ! : {display_room} 不可用") self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
return False return False
@@ -446,7 +468,7 @@ class LibReserve(LibTimeSelector):
EC.presence_of_all_elements_located((By.CSS_SELECTOR, "li[id^='seat_']")) EC.presence_of_all_elements_located((By.CSS_SELECTOR, "li[id^='seat_']"))
) )
except: except:
self._showTrace(f"座位加载失败 !") self._showTrace(f"座位加载失败 !", self.TraceLevel.ERROR)
return False return False
try: try:
all_seats = self.__driver.find_elements( all_seats = self.__driver.find_elements(
@@ -464,9 +486,10 @@ class LibReserve(LibTimeSelector):
seat_status = seat_link.get_attribute("title") seat_status = seat_link.get_attribute("title")
self._showTrace(f"座位 {seat_id} 选择成功 ! : 当前状态 - '{seat_status}'") self._showTrace(f"座位 {seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
return True return True
self._showTrace(f"座位 {seat_id} 在该楼层区域中不存在, 请检查座位号是否正确") self._showLog(f"座位 {seat_id} 在该楼层区域中不存在", self.TraceLevel.WARNING)
self._showTrace(f"座位 {seat_id} 在该楼层区域中不存在, 请检查座位号是否正确", no_log=True)
except: except:
self._showTrace(f"座位选择失败 !") self._showTrace(f"座位选择失败 !", self.TraceLevel.ERROR)
return False return False
@@ -481,6 +504,9 @@ class LibReserve(LibTimeSelector):
""" """
Select the nearest available time option. Select the nearest available time option.
Returns:
int: The actual selected time value in minutes.
""" """
# Wait for time options to load # Wait for time options to load
try: try:
@@ -490,7 +516,7 @@ class LibReserve(LibTimeSelector):
) )
) )
except: except:
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间") self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR)
return -1 return -1
# Find best time option # Find best time option
@@ -499,7 +525,7 @@ class LibReserve(LibTimeSelector):
f"#{time_id} ul li a" f"#{time_id} ul li a"
) )
if not all_time_opts: if not all_time_opts:
self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间") self._showTrace(f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR)
return -1 return -1
best_opt, best_text, actual_diff, free_times = self._findBestTimeOption( best_opt, best_text, actual_diff, free_times = self._findBestTimeOption(
all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True
@@ -515,8 +541,8 @@ class LibReserve(LibTimeSelector):
) )
return target_time return target_time
self._showTrace( self._showTrace(
f"无法选择最近的 {time_type} {self._minsToTime(target_time)}, " f"无法选择最近的 {time_type} {self._minsToTimeStr(target_time)}, "
f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟" f"所有可选时间与目标时间相差都超过 {max_time_diff} 分钟", self.TraceLevel.WARNING
) )
self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}") self._showTrace(f"当前可供预约的 {time_type} 有: {free_times}")
return -1 return -1
@@ -526,51 +552,58 @@ class LibReserve(LibTimeSelector):
self, self,
begin_time: dict, begin_time: dict,
end_time: dict, end_time: dict,
expct_duration: int = 4, expect_duration: int = 4,
satisfy_duration: bool = True satisfy_duration: bool = True
) -> bool: ) -> bool:
"""Select seat begin and end time.""" """
expect_begin_time = actual_begin_time = begin_time["time"] Select seat begin and end time.
expect_end_time = actual_end_time = end_time["time"] """
expect_begin_mins = self._timeToMins(expect_begin_time) exp_beg_tm_str = begin_time["time"]
actual_begin_mins = expect_begin_mins exp_end_tm_str = end_time["time"]
expect_end_mins = self._timeToMins(expect_end_time) # Initialize actual time strings for logging
act_beg_tm_str = exp_beg_tm_str
act_end_tm_str = exp_end_tm_str
exp_beg_mins = self._timeStrToMins(exp_beg_tm_str)
act_beg_mins = exp_beg_mins
exp_end_mins = self._timeStrToMins(exp_end_tm_str)
act_end_mins = exp_end_mins
# Select begin time # Select begin time
if self.__selectNearestTime( act_beg_mins = self.__selectNearestTime(
time_id="startTime", time_id="startTime",
time_type="开始时间", time_type="开始时间",
target_time=expect_begin_mins, target_time=exp_beg_mins,
max_time_diff=begin_time["max_diff"], max_time_diff=begin_time["max_diff"],
prefer_earlier=begin_time["prefer_early"] prefer_earlier=begin_time["prefer_early"]
) == -1: )
if act_beg_mins == -1:
return False return False
actual_begin_time = self._minsToTime(expect_begin_mins) act_beg_tm_str = self._minsToTimeStr(act_beg_mins)
actual_begin_mins = self._timeToMins(actual_begin_time)
# If 'satisfy_duration' is True, select end time based on actual begin time # If 'satisfy_duration' is True, select end time based on actual begin time
if satisfy_duration: if satisfy_duration:
expect_end_mins = self.validateAndAdjustEndTime(actual_begin_mins, expct_duration) exp_end_mins = int(self.validateAndAdjustEndTime(act_beg_mins, expect_duration))
expect_end_time = self._minsToTime(expect_end_mins) exp_end_tm_str = self._minsToTimeStr(exp_end_mins)
self._showTrace( self._showTrace(
f"需要满足期望预约持续时间: {expct_duration} 小时, " f"需要满足期望预约持续时间: {expect_duration} 小时, "
f"根据开始时间 {actual_begin_time} 计算结束时间: {expect_end_time}" f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}"
) )
# Select end time # Select end time
if self.__selectNearestTime( act_end_mins = self.__selectNearestTime(
time_id="endTime", time_id="endTime",
time_type="结束时间", time_type="结束时间",
target_time=expect_end_mins, target_time=exp_end_mins,
max_time_diff=end_time["max_diff"], max_time_diff=end_time["max_diff"],
prefer_earlier=end_time["prefer_early"] prefer_earlier=end_time["prefer_early"]
) == -1: )
if act_end_mins == -1:
return False return False
actual_end_time = self._minsToTime(expect_end_mins) act_end_tm_str = self._minsToTimeStr(act_end_mins)
self._showTrace( self._showTrace(
f"期望预约时间段: {expect_begin_time} - {expect_end_time}, " f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, "
f"实际预约时间段: {actual_begin_time} - {actual_end_time}" f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}"
) )
return True return True
@@ -584,12 +617,13 @@ class LibReserve(LibTimeSelector):
""" """
Validate and adjust reserve end time to library closing time if needed. Validate and adjust reserve end time to library closing time if needed.
""" """
LIBRARY_CLOSE_TIME = self._timeToMins("23:30") LIBRARY_CLOSE_TIME = self._timeStrToMins("23:30")
expect_end_mins = begin_mins + duration * 60 expect_end_mins = int(begin_mins + duration*60)
if expect_end_mins > LIBRARY_CLOSE_TIME: if expect_end_mins > LIBRARY_CLOSE_TIME:
expect_end_mins = LIBRARY_CLOSE_TIME expect_end_mins = LIBRARY_CLOSE_TIME
self._showTrace( self._showTrace(
f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, 自动调整为 23:30" f"预约持续时间 {duration} 小时, 超过最大预约时间 23:30, 自动调整为 23:30",
self.TraceLevel.WARNING
) )
return expect_end_mins return expect_end_mins
@@ -616,7 +650,7 @@ class LibReserve(LibTimeSelector):
EC.presence_of_element_located((By.ID, "seatLayout")) EC.presence_of_element_located((By.ID, "seatLayout"))
) )
except: except:
self._showTrace(f"加载预约选座页面失败 !") self._showTrace(f"加载预约选座页面失败 !", self.TraceLevel.ERROR)
return False return False
# date, place, floor, room # date, place, floor, room
if not self.__selectDate(reserve_info["date"]): if not self.__selectDate(reserve_info["date"]):
@@ -635,7 +669,7 @@ class LibReserve(LibTimeSelector):
elif not self.__selectSeatTime( elif not self.__selectSeatTime(
begin_time=reserve_info["begin_time"], begin_time=reserve_info["begin_time"],
end_time=reserve_info["end_time"], end_time=reserve_info["end_time"],
expct_duration=reserve_info["expect_duration"], expect_duration=reserve_info["expect_duration"],
satisfy_duration=reserve_info["satisfy_duration"] satisfy_duration=reserve_info["satisfy_duration"]
): ):
pass pass
@@ -649,11 +683,11 @@ class LibReserve(LibTimeSelector):
raise raise
reserve_success = True reserve_success = True
except: except:
self._showTrace(f"预约提交失败 !") self._showTrace(f"预约提交失败 !", self.TraceLevel.ERROR)
if not submit_reserve and have_hover_on_page: if not submit_reserve and have_hover_on_page:
self.__driver.refresh() self.__driver.refresh()
if reserve_success: if reserve_success:
self._showTrace(f"用户 {username} 预约成功 !") self._showTrace(f"用户 {username} 预约成功 !")
else: else:
self._showTrace(f"用户 {username} 预约失败 !") self._showTrace(f"用户 {username} 预约失败 !", self.TraceLevel.ERROR)
return reserve_success return reserve_success
+58
View File
@@ -0,0 +1,58 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2025 - 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import os
from PySide6.QtCore import QStandardPaths, QDir
from utils.ConfigManager import instance as configInstance
from utils.LogManager import instance as logInstance
def initializeConfigManager(
) -> bool:
logger = logInstance().getLogger("AppInitializer")
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
old_config_dir = os.path.join(app_dir, "config")
new_config_dir = os.path.join(app_dir, "configs")
if QDir(old_config_dir).exists(): # old config dir exists
#we rename it to compatible with new version
logger.info("存在旧配置目录 %s,将其重命名为 %s", old_config_dir, new_config_dir)
if not QDir().rename(old_config_dir, new_config_dir):
logger.error("重命名旧配置目录 %s%s 失败", old_config_dir, new_config_dir)
return False
elif not QDir(new_config_dir).exists():
logger.info("初始化配置目录 %s", new_config_dir)
if not QDir().mkpath(new_config_dir):
logger.error("创建配置目录 %s 失败", new_config_dir)
return False
configInstance(new_config_dir)
return True
def initializeLogManager(
) -> bool:
app_dir = QStandardPaths.writableLocation(QStandardPaths.StandardLocation.AppDataLocation)
log_dir = os.path.join(app_dir, "logs")
if not QDir(log_dir).exists():
if not QDir().mkpath(log_dir):
return False
logInstance(log_dir)
return True
def initializeApp(
) -> bool:
if not initializeLogManager():
return False
if not initializeConfigManager():
return False
return True
+191
View File
@@ -0,0 +1,191 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import logging
import os
import threading
from logging.handlers import TimedRotatingFileHandler
from typing import Optional
class CallerInfoFormatter(logging.Formatter):
"""
Custom formatter to extract real caller information.
Skips MsgBase._showTrace to show the actual calling location.
Format:
- Logger name: left-aligned, max 15 chars
- Level name: left-aligned, max 8 chars
- Filename: left-aligned, max 20 chars
- Line number: left-aligned, max 4 digits
"""
def __init__(
self,
fmt=None,
datefmt=None,
style='%'
):
super().__init__(fmt, datefmt, style)
self.basefmt = fmt
def format(
self,
record
):
depth = 0
while depth < 10:
record.filename = os.path.basename(record.pathname)
if 'MsgBase.py' not in record.filename and record.funcName != '_showTrace':
break
if not hasattr(record, 'stack'):
record.stack = True
import traceback
record.stack_list = traceback.extract_stack()
depth += 1
if depth < len(record.stack_list):
frame = record.stack_list[-depth-1]
record.filename = os.path.basename(frame.filename)
record.lineno = frame.lineno
record.funcName = frame.name
record.name = record.name[-15:].ljust(15)
record.levelname = record.levelname.ljust(8)
record.filename = record.filename[-20:].ljust(20)
record.lineno = f"{record.lineno:04d}"
return super().format(record)
class LogManager:
"""
Log Manager Singleton Class
Args:
log_dir (str): The directory to store log files.
"""
def __init__(
self,
log_dir: str
):
self.__log_dir = os.path.abspath(log_dir)
self.__logger = None
self.__initialized = False
self.initialize()
def initialize(
self
):
if self.__initialized:
return
os.makedirs(self.__log_dir, exist_ok=True)
self.__logger = logging.getLogger("AutoLibrary")
self.__logger.setLevel(logging.DEBUG)
self.__logger.handlers.clear()
formatter = CallerInfoFormatter(
'[%(asctime)s] - [%(name)s] - [%(levelname)s] - [%(filename)s:%(lineno)s] - %(message)s',
datefmt='%Y-%m-%d %H:%M:%S'
)
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
console_handler.setFormatter(formatter)
self.__logger.addHandler(console_handler)
all_log_file = os.path.join(self.__log_dir, "all.log")
file_handler_all = TimedRotatingFileHandler(
all_log_file,
when='midnight',
interval=1,
backupCount=7,
encoding='utf-8'
)
file_handler_all.suffix = "%Y-%m-%d.log"
file_handler_all.setLevel(logging.DEBUG)
file_handler_all.setFormatter(formatter)
self.__logger.addHandler(file_handler_all)
error_log_file = os.path.join(self.__log_dir, "error.log")
file_handler_error = TimedRotatingFileHandler(
error_log_file,
when='midnight',
interval=1,
backupCount=14,
encoding='utf-8'
)
file_handler_error.suffix = "%Y-%m-%d.log"
file_handler_error.setLevel(logging.ERROR)
file_handler_error.setFormatter(formatter)
self.__logger.addHandler(file_handler_error)
self.__initialized = True
def getLogger(
self,
name: Optional[str] = None
) -> logging.Logger:
if name:
return self.__logger.getChild(name)
return self.__logger
def setLevel(
self,
level: int
):
if self.__logger:
self.__logger.setLevel(level)
def logDir(
self
) -> str:
return self.__log_dir
# LogManager singleton instance.
_log_manager_instance = None
# Singleton instance lock.
_instance_lock = threading.Lock()
def instance(
log_dir: str = ""
) -> LogManager:
global _log_manager_instance
with _instance_lock:
if _log_manager_instance is None:
if not log_dir:
raise ValueError("LogManager initialization requires log_dir parameter")
_log_manager_instance = LogManager(log_dir)
else:
if log_dir and _log_manager_instance.logDir() != os.path.abspath(log_dir):
raise ValueError("LogManager instance already initialized with a different log directory")
return _log_manager_instance
def getLogger(
name: Optional[str] = None
) -> logging.Logger:
if _log_manager_instance is None:
raise RuntimeError("LogManager not initialized, please call LogManager.instance(log_dir) first")
return _log_manager_instance.getLogger(name)