1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-17 23:13:03 +08:00

Compare commits

...

3 Commits

Author SHA1 Message Date
Kenan Zhu 0bad34d7a8 refactor(*): LoginPage 消息追踪统一与 Flow 长方法拆分 (#12)
refactor: LoginPage 消息追踪统一与 Flow 长方法拆分
2026-06-13 09:30:02 +08:00
KenanZhu 61d1b44402 refactor: 提取 Flow 类 execute 长方法为私有子方法,降低嵌套深度 2026-06-13 08:59:45 +08:00
KenanZhu 72301c63fd refactor: LoginPage 继承 MsgBase,统一页面消息追踪机制 2026-06-13 08:59:34 +08:00
5 changed files with 189 additions and 76 deletions
+15 -13
View File
@@ -10,6 +10,7 @@ See the LICENSE file for details.
import os
import queue
from selenium import webdriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.common.exceptions import (
TimeoutException,
WebDriverException,
@@ -37,11 +38,11 @@ class AutoLib(MsgBase):
output_queue: queue.Queue,
run_config: dict,
) -> None:
super().__init__(input_queue, output_queue)
super().__init__(input_queue, output_queue)
self.__run_config: dict = run_config
self.__user_config: dict | None = None
self.__driver = None
self.__driver: WebDriver | None = None
self.__driver_type: str = ""
self.__driver_path: str = ""
self.__login_page: LoginPage = None
@@ -58,7 +59,7 @@ class AutoLib(MsgBase):
else:
if not self.__initDriverUrl():
self.close()
raise Exception("浏览器驱动URL初始化失败 !")
raise Exception("浏览器驱动 URL 初始化失败 !")
self.__initPagesServices()
self.__initPagesFlows()
@@ -67,9 +68,10 @@ class AutoLib(MsgBase):
) -> bool:
self._showTrace("正在初始化浏览器驱动......", no_log=True)
web_driver_config: dict = self.__run_config.get("web_driver", None)
self.__driver_type = web_driver_config.get("driver_type", "none")
match self.__driver_type.lower():
driver_config: dict = self.__run_config.get("web_driver", None)
self.__driver_type = driver_config.get("driver_type", "none")
self.__driver_type = self.__driver_type.lower()
match self.__driver_type:
case "edge":
driver_options = webdriver.EdgeOptions()
case "chrome":
@@ -82,10 +84,10 @@ class AutoLib(MsgBase):
self.TraceLevel.WARNING,
)
return False
if not web_driver_config:
if not driver_config:
self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
return False
if web_driver_config.get("headless", False):
if driver_config.get("headless", False):
driver_options.add_argument("--headless")
driver_options.add_argument("--disable-gpu")
driver_options.add_argument("--no-sandbox")
@@ -110,11 +112,11 @@ class AutoLib(MsgBase):
"AppleWebKit/537.36 (KHTML, like Gecko) "\
"Chrome/120.0.0.0 "\
"Safari/537.36"
if self.__driver_type.lower() == "edge":
if self.__driver_type == "edge":
user_agent += " Edg/120.0.0.0"
# set options for firefox
elif self.__driver_type.lower() == "firefox":
elif self.__driver_type == "firefox":
driver_options.set_preference("dom.webdriver.enabled", False)
driver_options.set_preference("useAutomationExtension", False)
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\
@@ -122,14 +124,14 @@ class AutoLib(MsgBase):
driver_options.add_argument(f"user-agent={user_agent}")
# init browser driver
self.__driver_path = web_driver_config.get("driver_path", "")
self.__driver_path = driver_config.get("driver_path", "")
if not self.__driver_path:
self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
return False
try:
self.__driver_path = os.path.abspath(self.__driver_path)
service = None
match self.__driver_type.lower():
match self.__driver_type:
case "edge":
service = EdgeService(executable_path=self.__driver_path)
self.__driver = webdriver.Edge(service=service, options=driver_options)
@@ -161,7 +163,7 @@ class AutoLib(MsgBase):
self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
return False
url: str = lib_config.get("host_url") + lib_config.get("login_url")
self.__login_page = LoginPage(self.__driver, tracer=self._showTrace)
self.__login_page = LoginPage(self._input_queue, self._output_queue, self.__driver)
self.__driver.set_page_load_timeout(5)
try:
self.__driver.get(url)
+13 -19
View File
@@ -7,7 +7,8 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from typing import Callable, Optional
import queue
from typing import Callable
from selenium.common.exceptions import (
ElementNotInteractableException,
@@ -19,8 +20,10 @@ from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from base.MsgBase import MsgBase
class LoginPage:
class LoginPage(MsgBase):
USERNAME_INPUT = (By.NAME, "username")
PASSWORD_INPUT = (By.NAME, "password")
@@ -36,22 +39,13 @@ class LoginPage:
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver,
tracer: Optional[Callable[..., None]] = None,
) -> None:
super().__init__(input_queue, output_queue)
self._driver: WebDriver = driver
self._tracer: Optional[Callable[..., None]] = tracer
def _trace(
self,
msg: str,
level: int = 20,
no_log: bool = False,
) -> None:
if self._tracer:
self._tracer(msg, level, no_log)
def navigate(
self,
@@ -185,7 +179,7 @@ class LoginPage:
) -> bool:
for attempt in range(max_attempts):
self._trace(
self._showTrace(
f"用户 {username}{attempt + 1} 次尝试登录......",
no_log=True,
)
@@ -196,16 +190,16 @@ class LoginPage:
continue
if not self.fillCaptcha(captcha_text):
continue
self._trace("尝试登录...", no_log=True)
self._showTrace("尝试登录...", no_log=True)
if not self.clickLogin():
continue
if self.waitLoginSuccess():
self._trace(f"用户 {username}{attempt + 1} 次登录成功 !")
self._showTrace(f"用户 {username}{attempt + 1} 次登录成功 !")
return True
else:
self._trace(
self._showTrace(
"登录页面加载失败 ! : "
"用户账号或者密码错误/验证码错误, 具体以页面提示为准",
level=40,
level=self.TraceLevel.ERROR,
)
return False
+18 -2
View File
@@ -35,7 +35,7 @@ class CheckinFlow(MsgBase):
self._driver: WebDriver = driver
self._shell: MainShell = shell
def execute(
def _ensureCheckinButton(
self,
username: str,
) -> bool:
@@ -49,7 +49,13 @@ class CheckinFlow(MsgBase):
self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
return False
self._showTrace("签到按钮已启用")
self._shell.clickCheckinButton()
return True
def _processCheckinDialog(
self,
username: str,
) -> bool:
try:
with CheckinResultDialog(self._driver) as dialog:
result_msg = dialog.getResultMessage()
@@ -87,3 +93,13 @@ class CheckinFlow(MsgBase):
except (TimeoutException, NoSuchElementException, ElementNotInteractableException):
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False
def execute(
self,
username: str,
) -> bool:
if not self._ensureCheckinButton(username):
return False
self._shell.clickCheckinButton()
return self._processCheckinDialog(username)
+40 -7
View File
@@ -61,19 +61,23 @@ class RenewFlow(MsgBase):
)
return True
def execute(
def _computeRenewTarget(
self,
username: str,
record: dict,
renew_info: dict,
) -> bool:
):
max_diff = renew_info.get("max_diff", 30)
prefer_earlier = renew_info.get("prefer_early", True)
end_time = record["time"]["end"]
target_renew_mins = timeStrToMins(end_time) + renew_info.get("expect_duration", 2) * 60
if not self._validateRenewTime(end_time, target_renew_mins):
return False
return None
return target_renew_mins
def _ensureExtendButton(
self,
username: str,
) -> bool:
if not self._shell.waitExtendButton():
self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
return False
@@ -83,7 +87,17 @@ class RenewFlow(MsgBase):
f"请连接图书馆网络后重试"
)
return False
self._shell.clickExtendButton()
return True
def _processRenewDialog(
self,
username: str,
record: dict,
target_renew_mins: int,
max_diff: int,
prefer_earlier: bool,
) -> bool:
try:
with RenewDialog(self._driver) as dialog:
if not dialog.waitUntilReady():
@@ -132,3 +146,22 @@ class RenewFlow(MsgBase):
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
self._shell.refresh()
return False
def execute(
self,
username: str,
record: dict,
renew_info: dict,
) -> bool:
max_diff = renew_info.get("max_diff", 30)
prefer_earlier = renew_info.get("prefer_early", True)
target_renew_mins = self._computeRenewTarget(record, renew_info)
if target_renew_mins is None:
return False
if not self._ensureExtendButton(username):
return False
self._shell.clickExtendButton()
return self._processRenewDialog(
username, record, target_renew_mins, max_diff, prefer_earlier,
)
+103 -35
View File
@@ -58,40 +58,105 @@ class ReserveFlow(MsgBase):
self._driver: WebDriver = driver
self._shell: MainShell = shell
def execute(
def _loadReserveView(
self,
) -> ReserveView | None:
try:
return self._shell.gotoReserveView()
except (TimeoutException, ElementNotInteractableException) as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return None
def _selectDate(
self,
view: ReserveView,
ctx: ReserveContext,
) -> bool:
submit_reserve = False
reserve_success = False
have_hover_on_page = False
try:
view = self._shell.gotoReserveView()
except (TimeoutException, ElementNotInteractableException) as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return False
if not view.selectDate(ctx.date):
self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR)
return False
self._showTrace(f"日期 {ctx.date} 选择成功 !")
return True
def _selectPlace(
self,
view: ReserveView,
) -> bool:
if not view.selectPlace("1"):
self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR)
return False
self._showTrace("预约场所 图书馆 选择成功 !")
return True
def _selectFloor(
self,
view: ReserveView,
ctx: ReserveContext,
) -> bool:
if not view.selectFloor(ctx.floor):
display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor)
self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR)
return False
self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !")
return True
def _selectRoom(
self,
view: ReserveView,
ctx: ReserveContext,
):
seat_map = view.selectRoom(ctx.room)
if seat_map is None:
display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room)
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
return False
return None
self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !")
have_hover_on_page = True
return seat_map
def _processReserveResult(
self,
) -> bool:
with ReserveResultDialog(self._driver) as result:
if result.isFailure():
self._showTrace("预约失败", self.TraceLevel.ERROR)
elif result.isSuccess():
details = result.getDetailTexts()
if len(details) >= 6:
self._showTrace(
f"\n"
f" 预约成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" 签到时间 {details[5]}"
)
else:
self._showTrace(
"\n"
" 预约成功 !\n"
" 未找获取到详细信息"
)
return True
else:
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
return False
def _selectSeatAndSubmit(
self,
view: ReserveView,
seat_map,
ctx: ReserveContext,
) -> tuple[bool, bool]:
submit_reserve = False
reserve_success = False
seat_status = seat_map.selectSeat(ctx.seat_id)
if seat_status is None:
self._showTrace(
@@ -111,31 +176,34 @@ class ReserveFlow(MsgBase):
try:
view.submitReserve()
submit_reserve = True
with ReserveResultDialog(self._driver) as result:
if result.isFailure():
self._showTrace("预约失败", self.TraceLevel.ERROR)
elif result.isSuccess():
details = result.getDetailTexts()
if len(details) >= 6:
self._showTrace(
f"\n"
f" 预约成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" 签到时间 {details[5]}"
)
else:
self._showTrace(
"\n"
" 预约成功 !\n"
" 未找获取到详细信息"
)
reserve_success = True
else:
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
reserve_success = self._processReserveResult()
except (TimeoutException, ElementNotInteractableException):
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
return submit_reserve, reserve_success
def execute(
self,
ctx: ReserveContext,
) -> bool:
# reserve flow pipeline:
# date > place > floor > room > seat (begin/end time) > submit > result
view = self._loadReserveView()
if view is None:
return False
if not self._selectDate(view, ctx):
return False
if not self._selectPlace(view):
return False
if not self._selectFloor(view, ctx):
return False
seat_map = self._selectRoom(view, ctx)
if seat_map is None:
return False
have_hover_on_page = True
submit_reserve, reserve_success = self._selectSeatAndSubmit(
view, seat_map, ctx,
)
if not submit_reserve and have_hover_on_page:
view.refresh()
if reserve_success: