1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-17 23:13:03 +08:00

refactor(*): LoginPage 消息追踪统一与 Flow 长方法拆分 (#12)

refactor: LoginPage 消息追踪统一与 Flow 长方法拆分
This commit is contained in:
Kenan Zhu
2026-06-13 09:30:02 +08:00
committed by GitHub
5 changed files with 189 additions and 76 deletions
+15 -13
View File
@@ -10,6 +10,7 @@ See the LICENSE file for details.
import os import os
import queue import queue
from selenium import webdriver from selenium import webdriver
from selenium.webdriver.remote.webdriver import WebDriver
from selenium.common.exceptions import ( from selenium.common.exceptions import (
TimeoutException, TimeoutException,
WebDriverException, WebDriverException,
@@ -37,11 +38,11 @@ class AutoLib(MsgBase):
output_queue: queue.Queue, output_queue: queue.Queue,
run_config: dict, run_config: dict,
) -> None: ) -> None:
super().__init__(input_queue, output_queue)
super().__init__(input_queue, output_queue)
self.__run_config: dict = run_config self.__run_config: dict = run_config
self.__user_config: dict | None = None self.__user_config: dict | None = None
self.__driver = None self.__driver: WebDriver | None = None
self.__driver_type: str = "" self.__driver_type: str = ""
self.__driver_path: str = "" self.__driver_path: str = ""
self.__login_page: LoginPage = None self.__login_page: LoginPage = None
@@ -58,7 +59,7 @@ class AutoLib(MsgBase):
else: else:
if not self.__initDriverUrl(): if not self.__initDriverUrl():
self.close() self.close()
raise Exception("浏览器驱动URL初始化失败 !") raise Exception("浏览器驱动 URL 初始化失败 !")
self.__initPagesServices() self.__initPagesServices()
self.__initPagesFlows() self.__initPagesFlows()
@@ -67,9 +68,10 @@ class AutoLib(MsgBase):
) -> bool: ) -> bool:
self._showTrace("正在初始化浏览器驱动......", no_log=True) self._showTrace("正在初始化浏览器驱动......", no_log=True)
web_driver_config: dict = self.__run_config.get("web_driver", None) driver_config: dict = self.__run_config.get("web_driver", None)
self.__driver_type = web_driver_config.get("driver_type", "none") self.__driver_type = driver_config.get("driver_type", "none")
match self.__driver_type.lower(): self.__driver_type = self.__driver_type.lower()
match self.__driver_type:
case "edge": case "edge":
driver_options = webdriver.EdgeOptions() driver_options = webdriver.EdgeOptions()
case "chrome": case "chrome":
@@ -82,10 +84,10 @@ class AutoLib(MsgBase):
self.TraceLevel.WARNING, self.TraceLevel.WARNING,
) )
return False return False
if not web_driver_config: if not driver_config:
self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR) self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
return False return False
if web_driver_config.get("headless", False): if driver_config.get("headless", False):
driver_options.add_argument("--headless") driver_options.add_argument("--headless")
driver_options.add_argument("--disable-gpu") driver_options.add_argument("--disable-gpu")
driver_options.add_argument("--no-sandbox") driver_options.add_argument("--no-sandbox")
@@ -110,11 +112,11 @@ class AutoLib(MsgBase):
"AppleWebKit/537.36 (KHTML, like Gecko) "\ "AppleWebKit/537.36 (KHTML, like Gecko) "\
"Chrome/120.0.0.0 "\ "Chrome/120.0.0.0 "\
"Safari/537.36" "Safari/537.36"
if self.__driver_type.lower() == "edge": if self.__driver_type == "edge":
user_agent += " Edg/120.0.0.0" user_agent += " Edg/120.0.0.0"
# set options for firefox # set options for firefox
elif self.__driver_type.lower() == "firefox": elif self.__driver_type == "firefox":
driver_options.set_preference("dom.webdriver.enabled", False) driver_options.set_preference("dom.webdriver.enabled", False)
driver_options.set_preference("useAutomationExtension", False) driver_options.set_preference("useAutomationExtension", False)
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\ user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\
@@ -122,14 +124,14 @@ class AutoLib(MsgBase):
driver_options.add_argument(f"user-agent={user_agent}") driver_options.add_argument(f"user-agent={user_agent}")
# init browser driver # init browser driver
self.__driver_path = web_driver_config.get("driver_path", "") self.__driver_path = driver_config.get("driver_path", "")
if not self.__driver_path: if not self.__driver_path:
self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING) self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
return False return False
try: try:
self.__driver_path = os.path.abspath(self.__driver_path) self.__driver_path = os.path.abspath(self.__driver_path)
service = None service = None
match self.__driver_type.lower(): match self.__driver_type:
case "edge": case "edge":
service = EdgeService(executable_path=self.__driver_path) service = EdgeService(executable_path=self.__driver_path)
self.__driver = webdriver.Edge(service=service, options=driver_options) self.__driver = webdriver.Edge(service=service, options=driver_options)
@@ -161,7 +163,7 @@ class AutoLib(MsgBase):
self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR) self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
return False return False
url: str = lib_config.get("host_url") + lib_config.get("login_url") url: str = lib_config.get("host_url") + lib_config.get("login_url")
self.__login_page = LoginPage(self.__driver, tracer=self._showTrace) self.__login_page = LoginPage(self._input_queue, self._output_queue, self.__driver)
self.__driver.set_page_load_timeout(5) self.__driver.set_page_load_timeout(5)
try: try:
self.__driver.get(url) self.__driver.get(url)
+13 -19
View File
@@ -7,7 +7,8 @@ This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License. You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details. See the LICENSE file for details.
""" """
from typing import Callable, Optional import queue
from typing import Callable
from selenium.common.exceptions import ( from selenium.common.exceptions import (
ElementNotInteractableException, ElementNotInteractableException,
@@ -19,8 +20,10 @@ from selenium.webdriver.remote.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC from selenium.webdriver.support import expected_conditions as EC
from base.MsgBase import MsgBase
class LoginPage:
class LoginPage(MsgBase):
USERNAME_INPUT = (By.NAME, "username") USERNAME_INPUT = (By.NAME, "username")
PASSWORD_INPUT = (By.NAME, "password") PASSWORD_INPUT = (By.NAME, "password")
@@ -36,22 +39,13 @@ class LoginPage:
def __init__( def __init__(
self, self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver, driver: WebDriver,
tracer: Optional[Callable[..., None]] = None,
) -> None: ) -> None:
super().__init__(input_queue, output_queue)
self._driver: WebDriver = driver self._driver: WebDriver = driver
self._tracer: Optional[Callable[..., None]] = tracer
def _trace(
self,
msg: str,
level: int = 20,
no_log: bool = False,
) -> None:
if self._tracer:
self._tracer(msg, level, no_log)
def navigate( def navigate(
self, self,
@@ -185,7 +179,7 @@ class LoginPage:
) -> bool: ) -> bool:
for attempt in range(max_attempts): for attempt in range(max_attempts):
self._trace( self._showTrace(
f"用户 {username}{attempt + 1} 次尝试登录......", f"用户 {username}{attempt + 1} 次尝试登录......",
no_log=True, no_log=True,
) )
@@ -196,16 +190,16 @@ class LoginPage:
continue continue
if not self.fillCaptcha(captcha_text): if not self.fillCaptcha(captcha_text):
continue continue
self._trace("尝试登录...", no_log=True) self._showTrace("尝试登录...", no_log=True)
if not self.clickLogin(): if not self.clickLogin():
continue continue
if self.waitLoginSuccess(): if self.waitLoginSuccess():
self._trace(f"用户 {username}{attempt + 1} 次登录成功 !") self._showTrace(f"用户 {username}{attempt + 1} 次登录成功 !")
return True return True
else: else:
self._trace( self._showTrace(
"登录页面加载失败 ! : " "登录页面加载失败 ! : "
"用户账号或者密码错误/验证码错误, 具体以页面提示为准", "用户账号或者密码错误/验证码错误, 具体以页面提示为准",
level=40, level=self.TraceLevel.ERROR,
) )
return False return False
+18 -2
View File
@@ -35,7 +35,7 @@ class CheckinFlow(MsgBase):
self._driver: WebDriver = driver self._driver: WebDriver = driver
self._shell: MainShell = shell self._shell: MainShell = shell
def execute( def _ensureCheckinButton(
self, self,
username: str, username: str,
) -> bool: ) -> bool:
@@ -49,7 +49,13 @@ class CheckinFlow(MsgBase):
self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR) self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
return False return False
self._showTrace("签到按钮已启用") self._showTrace("签到按钮已启用")
self._shell.clickCheckinButton() return True
def _processCheckinDialog(
self,
username: str,
) -> bool:
try: try:
with CheckinResultDialog(self._driver) as dialog: with CheckinResultDialog(self._driver) as dialog:
result_msg = dialog.getResultMessage() result_msg = dialog.getResultMessage()
@@ -87,3 +93,13 @@ class CheckinFlow(MsgBase):
except (TimeoutException, NoSuchElementException, ElementNotInteractableException): except (TimeoutException, NoSuchElementException, ElementNotInteractableException):
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR) self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
return False return False
def execute(
self,
username: str,
) -> bool:
if not self._ensureCheckinButton(username):
return False
self._shell.clickCheckinButton()
return self._processCheckinDialog(username)
+40 -7
View File
@@ -61,19 +61,23 @@ class RenewFlow(MsgBase):
) )
return True return True
def execute( def _computeRenewTarget(
self, self,
username: str,
record: dict, record: dict,
renew_info: dict, renew_info: dict,
) -> bool: ):
max_diff = renew_info.get("max_diff", 30)
prefer_earlier = renew_info.get("prefer_early", True)
end_time = record["time"]["end"] end_time = record["time"]["end"]
target_renew_mins = timeStrToMins(end_time) + renew_info.get("expect_duration", 2) * 60 target_renew_mins = timeStrToMins(end_time) + renew_info.get("expect_duration", 2) * 60
if not self._validateRenewTime(end_time, target_renew_mins): if not self._validateRenewTime(end_time, target_renew_mins):
return False return None
return target_renew_mins
def _ensureExtendButton(
self,
username: str,
) -> bool:
if not self._shell.waitExtendButton(): if not self._shell.waitExtendButton():
self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR) self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
return False return False
@@ -83,7 +87,17 @@ class RenewFlow(MsgBase):
f"请连接图书馆网络后重试" f"请连接图书馆网络后重试"
) )
return False return False
self._shell.clickExtendButton() return True
def _processRenewDialog(
self,
username: str,
record: dict,
target_renew_mins: int,
max_diff: int,
prefer_earlier: bool,
) -> bool:
try: try:
with RenewDialog(self._driver) as dialog: with RenewDialog(self._driver) as dialog:
if not dialog.waitUntilReady(): if not dialog.waitUntilReady():
@@ -132,3 +146,22 @@ class RenewFlow(MsgBase):
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR) self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
self._shell.refresh() self._shell.refresh()
return False return False
def execute(
self,
username: str,
record: dict,
renew_info: dict,
) -> bool:
max_diff = renew_info.get("max_diff", 30)
prefer_earlier = renew_info.get("prefer_early", True)
target_renew_mins = self._computeRenewTarget(record, renew_info)
if target_renew_mins is None:
return False
if not self._ensureExtendButton(username):
return False
self._shell.clickExtendButton()
return self._processRenewDialog(
username, record, target_renew_mins, max_diff, prefer_earlier,
)
+103 -35
View File
@@ -58,40 +58,105 @@ class ReserveFlow(MsgBase):
self._driver: WebDriver = driver self._driver: WebDriver = driver
self._shell: MainShell = shell self._shell: MainShell = shell
def execute( def _loadReserveView(
self, self,
) -> ReserveView | None:
try:
return self._shell.gotoReserveView()
except (TimeoutException, ElementNotInteractableException) as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return None
def _selectDate(
self,
view: ReserveView,
ctx: ReserveContext, ctx: ReserveContext,
) -> bool: ) -> bool:
submit_reserve = False
reserve_success = False
have_hover_on_page = False
try:
view = self._shell.gotoReserveView()
except (TimeoutException, ElementNotInteractableException) as e:
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
return False
if not view.selectDate(ctx.date): if not view.selectDate(ctx.date):
self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR) self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR)
return False return False
self._showTrace(f"日期 {ctx.date} 选择成功 !") self._showTrace(f"日期 {ctx.date} 选择成功 !")
return True
def _selectPlace(
self,
view: ReserveView,
) -> bool:
if not view.selectPlace("1"): if not view.selectPlace("1"):
self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR) self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR)
return False return False
self._showTrace("预约场所 图书馆 选择成功 !") self._showTrace("预约场所 图书馆 选择成功 !")
return True
def _selectFloor(
self,
view: ReserveView,
ctx: ReserveContext,
) -> bool:
if not view.selectFloor(ctx.floor): if not view.selectFloor(ctx.floor):
display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor) display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor)
self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR) self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR)
return False return False
self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !") self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !")
return True
def _selectRoom(
self,
view: ReserveView,
ctx: ReserveContext,
):
seat_map = view.selectRoom(ctx.room) seat_map = view.selectRoom(ctx.room)
if seat_map is None: if seat_map is None:
display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room) display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room)
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR) self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
return False return None
self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !") self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !")
have_hover_on_page = True return seat_map
def _processReserveResult(
self,
) -> bool:
with ReserveResultDialog(self._driver) as result:
if result.isFailure():
self._showTrace("预约失败", self.TraceLevel.ERROR)
elif result.isSuccess():
details = result.getDetailTexts()
if len(details) >= 6:
self._showTrace(
f"\n"
f" 预约成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" 签到时间 {details[5]}"
)
else:
self._showTrace(
"\n"
" 预约成功 !\n"
" 未找获取到详细信息"
)
return True
else:
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
return False
def _selectSeatAndSubmit(
self,
view: ReserveView,
seat_map,
ctx: ReserveContext,
) -> tuple[bool, bool]:
submit_reserve = False
reserve_success = False
seat_status = seat_map.selectSeat(ctx.seat_id) seat_status = seat_map.selectSeat(ctx.seat_id)
if seat_status is None: if seat_status is None:
self._showTrace( self._showTrace(
@@ -111,31 +176,34 @@ class ReserveFlow(MsgBase):
try: try:
view.submitReserve() view.submitReserve()
submit_reserve = True submit_reserve = True
with ReserveResultDialog(self._driver) as result: reserve_success = self._processReserveResult()
if result.isFailure():
self._showTrace("预约失败", self.TraceLevel.ERROR)
elif result.isSuccess():
details = result.getDetailTexts()
if len(details) >= 6:
self._showTrace(
f"\n"
f" 预约成功 !\n"
f" {details[1]}\n"
f" {details[2]}\n"
f" {details[3]}\n"
f" 签到时间 {details[5]}"
)
else:
self._showTrace(
"\n"
" 预约成功 !\n"
" 未找获取到详细信息"
)
reserve_success = True
else:
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
except (TimeoutException, ElementNotInteractableException): except (TimeoutException, ElementNotInteractableException):
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR) self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
return submit_reserve, reserve_success
def execute(
self,
ctx: ReserveContext,
) -> bool:
# reserve flow pipeline:
# date > place > floor > room > seat (begin/end time) > submit > result
view = self._loadReserveView()
if view is None:
return False
if not self._selectDate(view, ctx):
return False
if not self._selectPlace(view):
return False
if not self._selectFloor(view, ctx):
return False
seat_map = self._selectRoom(view, ctx)
if seat_map is None:
return False
have_hover_on_page = True
submit_reserve, reserve_success = self._selectSeatAndSubmit(
view, seat_map, ctx,
)
if not submit_reserve and have_hover_on_page: if not submit_reserve and have_hover_on_page:
view.refresh() view.refresh()
if reserve_success: if reserve_success: