mirror of
https://github.com/KenanZhu/AutoLibrary.git
synced 2026-06-17 23:13:03 +08:00
Compare commits
3 Commits
609850ab60
...
0bad34d7a8
| Author | SHA1 | Date | |
|---|---|---|---|
| 0bad34d7a8 | |||
| 61d1b44402 | |||
| 72301c63fd |
+14
-12
@@ -10,6 +10,7 @@ See the LICENSE file for details.
|
||||
import os
|
||||
import queue
|
||||
from selenium import webdriver
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.common.exceptions import (
|
||||
TimeoutException,
|
||||
WebDriverException,
|
||||
@@ -37,11 +38,11 @@ class AutoLib(MsgBase):
|
||||
output_queue: queue.Queue,
|
||||
run_config: dict,
|
||||
) -> None:
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
self.__run_config: dict = run_config
|
||||
self.__user_config: dict | None = None
|
||||
self.__driver = None
|
||||
self.__driver: WebDriver | None = None
|
||||
self.__driver_type: str = ""
|
||||
self.__driver_path: str = ""
|
||||
self.__login_page: LoginPage = None
|
||||
@@ -67,9 +68,10 @@ class AutoLib(MsgBase):
|
||||
) -> bool:
|
||||
|
||||
self._showTrace("正在初始化浏览器驱动......", no_log=True)
|
||||
web_driver_config: dict = self.__run_config.get("web_driver", None)
|
||||
self.__driver_type = web_driver_config.get("driver_type", "none")
|
||||
match self.__driver_type.lower():
|
||||
driver_config: dict = self.__run_config.get("web_driver", None)
|
||||
self.__driver_type = driver_config.get("driver_type", "none")
|
||||
self.__driver_type = self.__driver_type.lower()
|
||||
match self.__driver_type:
|
||||
case "edge":
|
||||
driver_options = webdriver.EdgeOptions()
|
||||
case "chrome":
|
||||
@@ -82,10 +84,10 @@ class AutoLib(MsgBase):
|
||||
self.TraceLevel.WARNING,
|
||||
)
|
||||
return False
|
||||
if not web_driver_config:
|
||||
if not driver_config:
|
||||
self._showTrace("未配置浏览器驱动参数 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
if web_driver_config.get("headless", False):
|
||||
if driver_config.get("headless", False):
|
||||
driver_options.add_argument("--headless")
|
||||
driver_options.add_argument("--disable-gpu")
|
||||
driver_options.add_argument("--no-sandbox")
|
||||
@@ -110,11 +112,11 @@ class AutoLib(MsgBase):
|
||||
"AppleWebKit/537.36 (KHTML, like Gecko) "\
|
||||
"Chrome/120.0.0.0 "\
|
||||
"Safari/537.36"
|
||||
if self.__driver_type.lower() == "edge":
|
||||
if self.__driver_type == "edge":
|
||||
user_agent += " Edg/120.0.0.0"
|
||||
|
||||
# set options for firefox
|
||||
elif self.__driver_type.lower() == "firefox":
|
||||
elif self.__driver_type == "firefox":
|
||||
driver_options.set_preference("dom.webdriver.enabled", False)
|
||||
driver_options.set_preference("useAutomationExtension", False)
|
||||
user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:120.0) "\
|
||||
@@ -122,14 +124,14 @@ class AutoLib(MsgBase):
|
||||
driver_options.add_argument(f"user-agent={user_agent}")
|
||||
|
||||
# init browser driver
|
||||
self.__driver_path = web_driver_config.get("driver_path", "")
|
||||
self.__driver_path = driver_config.get("driver_path", "")
|
||||
if not self.__driver_path:
|
||||
self._showTrace("未配置浏览器驱动路径 !", self.TraceLevel.WARNING)
|
||||
return False
|
||||
try:
|
||||
self.__driver_path = os.path.abspath(self.__driver_path)
|
||||
service = None
|
||||
match self.__driver_type.lower():
|
||||
match self.__driver_type:
|
||||
case "edge":
|
||||
service = EdgeService(executable_path=self.__driver_path)
|
||||
self.__driver = webdriver.Edge(service=service, options=driver_options)
|
||||
@@ -161,7 +163,7 @@ class AutoLib(MsgBase):
|
||||
self._showTrace("未配置图书馆参数 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
url: str = lib_config.get("host_url") + lib_config.get("login_url")
|
||||
self.__login_page = LoginPage(self.__driver, tracer=self._showTrace)
|
||||
self.__login_page = LoginPage(self._input_queue, self._output_queue, self.__driver)
|
||||
self.__driver.set_page_load_timeout(5)
|
||||
try:
|
||||
self.__driver.get(url)
|
||||
|
||||
+13
-19
@@ -7,7 +7,8 @@ This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from typing import Callable, Optional
|
||||
import queue
|
||||
from typing import Callable
|
||||
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
@@ -19,8 +20,10 @@ from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from base.MsgBase import MsgBase
|
||||
|
||||
class LoginPage:
|
||||
|
||||
class LoginPage(MsgBase):
|
||||
|
||||
USERNAME_INPUT = (By.NAME, "username")
|
||||
PASSWORD_INPUT = (By.NAME, "password")
|
||||
@@ -36,22 +39,13 @@ class LoginPage:
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
driver: WebDriver,
|
||||
tracer: Optional[Callable[..., None]] = None,
|
||||
) -> None:
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
self._driver: WebDriver = driver
|
||||
self._tracer: Optional[Callable[..., None]] = tracer
|
||||
|
||||
def _trace(
|
||||
self,
|
||||
msg: str,
|
||||
level: int = 20,
|
||||
no_log: bool = False,
|
||||
) -> None:
|
||||
|
||||
if self._tracer:
|
||||
self._tracer(msg, level, no_log)
|
||||
|
||||
def navigate(
|
||||
self,
|
||||
@@ -185,7 +179,7 @@ class LoginPage:
|
||||
) -> bool:
|
||||
|
||||
for attempt in range(max_attempts):
|
||||
self._trace(
|
||||
self._showTrace(
|
||||
f"用户 {username} 第 {attempt + 1} 次尝试登录......",
|
||||
no_log=True,
|
||||
)
|
||||
@@ -196,16 +190,16 @@ class LoginPage:
|
||||
continue
|
||||
if not self.fillCaptcha(captcha_text):
|
||||
continue
|
||||
self._trace("尝试登录...", no_log=True)
|
||||
self._showTrace("尝试登录...", no_log=True)
|
||||
if not self.clickLogin():
|
||||
continue
|
||||
if self.waitLoginSuccess():
|
||||
self._trace(f"用户 {username} 第 {attempt + 1} 次登录成功 !")
|
||||
self._showTrace(f"用户 {username} 第 {attempt + 1} 次登录成功 !")
|
||||
return True
|
||||
else:
|
||||
self._trace(
|
||||
self._showTrace(
|
||||
"登录页面加载失败 ! : "
|
||||
"用户账号或者密码错误/验证码错误, 具体以页面提示为准",
|
||||
level=40,
|
||||
level=self.TraceLevel.ERROR,
|
||||
)
|
||||
return False
|
||||
|
||||
@@ -35,7 +35,7 @@ class CheckinFlow(MsgBase):
|
||||
self._driver: WebDriver = driver
|
||||
self._shell: MainShell = shell
|
||||
|
||||
def execute(
|
||||
def _ensureCheckinButton(
|
||||
self,
|
||||
username: str,
|
||||
) -> bool:
|
||||
@@ -49,7 +49,13 @@ class CheckinFlow(MsgBase):
|
||||
self._showTrace(f"签到按钮启用失败 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace("签到按钮已启用")
|
||||
self._shell.clickCheckinButton()
|
||||
return True
|
||||
|
||||
def _processCheckinDialog(
|
||||
self,
|
||||
username: str,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
with CheckinResultDialog(self._driver) as dialog:
|
||||
result_msg = dialog.getResultMessage()
|
||||
@@ -87,3 +93,13 @@ class CheckinFlow(MsgBase):
|
||||
except (TimeoutException, NoSuchElementException, ElementNotInteractableException):
|
||||
self._showTrace("签到时发生未知错误 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
|
||||
def execute(
|
||||
self,
|
||||
username: str,
|
||||
) -> bool:
|
||||
|
||||
if not self._ensureCheckinButton(username):
|
||||
return False
|
||||
self._shell.clickCheckinButton()
|
||||
return self._processCheckinDialog(username)
|
||||
|
||||
@@ -61,19 +61,23 @@ class RenewFlow(MsgBase):
|
||||
)
|
||||
return True
|
||||
|
||||
def execute(
|
||||
def _computeRenewTarget(
|
||||
self,
|
||||
username: str,
|
||||
record: dict,
|
||||
renew_info: dict,
|
||||
) -> bool:
|
||||
):
|
||||
|
||||
max_diff = renew_info.get("max_diff", 30)
|
||||
prefer_earlier = renew_info.get("prefer_early", True)
|
||||
end_time = record["time"]["end"]
|
||||
target_renew_mins = timeStrToMins(end_time) + renew_info.get("expect_duration", 2) * 60
|
||||
if not self._validateRenewTime(end_time, target_renew_mins):
|
||||
return False
|
||||
return None
|
||||
return target_renew_mins
|
||||
|
||||
def _ensureExtendButton(
|
||||
self,
|
||||
username: str,
|
||||
) -> bool:
|
||||
|
||||
if not self._shell.waitExtendButton():
|
||||
self._showTrace(f"用户 {username} 续约界面加载失败 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
@@ -83,7 +87,17 @@ class RenewFlow(MsgBase):
|
||||
f"请连接图书馆网络后重试"
|
||||
)
|
||||
return False
|
||||
self._shell.clickExtendButton()
|
||||
return True
|
||||
|
||||
def _processRenewDialog(
|
||||
self,
|
||||
username: str,
|
||||
record: dict,
|
||||
target_renew_mins: int,
|
||||
max_diff: int,
|
||||
prefer_earlier: bool,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
with RenewDialog(self._driver) as dialog:
|
||||
if not dialog.waitUntilReady():
|
||||
@@ -132,3 +146,22 @@ class RenewFlow(MsgBase):
|
||||
self._showTrace(f"用户 {username} 续约失败 ! : {e}", self.TraceLevel.ERROR)
|
||||
self._shell.refresh()
|
||||
return False
|
||||
|
||||
def execute(
|
||||
self,
|
||||
username: str,
|
||||
record: dict,
|
||||
renew_info: dict,
|
||||
) -> bool:
|
||||
|
||||
max_diff = renew_info.get("max_diff", 30)
|
||||
prefer_earlier = renew_info.get("prefer_early", True)
|
||||
target_renew_mins = self._computeRenewTarget(record, renew_info)
|
||||
if target_renew_mins is None:
|
||||
return False
|
||||
if not self._ensureExtendButton(username):
|
||||
return False
|
||||
self._shell.clickExtendButton()
|
||||
return self._processRenewDialog(
|
||||
username, record, target_renew_mins, max_diff, prefer_earlier,
|
||||
)
|
||||
|
||||
+100
-32
@@ -58,59 +58,70 @@ class ReserveFlow(MsgBase):
|
||||
self._driver: WebDriver = driver
|
||||
self._shell: MainShell = shell
|
||||
|
||||
def execute(
|
||||
def _loadReserveView(
|
||||
self,
|
||||
) -> ReserveView | None:
|
||||
|
||||
try:
|
||||
return self._shell.gotoReserveView()
|
||||
except (TimeoutException, ElementNotInteractableException) as e:
|
||||
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
|
||||
return None
|
||||
|
||||
def _selectDate(
|
||||
self,
|
||||
view: ReserveView,
|
||||
ctx: ReserveContext,
|
||||
) -> bool:
|
||||
|
||||
submit_reserve = False
|
||||
reserve_success = False
|
||||
have_hover_on_page = False
|
||||
|
||||
try:
|
||||
view = self._shell.gotoReserveView()
|
||||
except (TimeoutException, ElementNotInteractableException) as e:
|
||||
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
|
||||
return False
|
||||
if not view.selectDate(ctx.date):
|
||||
self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace(f"日期 {ctx.date} 选择成功 !")
|
||||
return True
|
||||
|
||||
def _selectPlace(
|
||||
self,
|
||||
view: ReserveView,
|
||||
) -> bool:
|
||||
|
||||
if not view.selectPlace("1"):
|
||||
self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace("预约场所 图书馆 选择成功 !")
|
||||
return True
|
||||
|
||||
def _selectFloor(
|
||||
self,
|
||||
view: ReserveView,
|
||||
ctx: ReserveContext,
|
||||
) -> bool:
|
||||
|
||||
if not view.selectFloor(ctx.floor):
|
||||
display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor)
|
||||
self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !")
|
||||
return True
|
||||
|
||||
def _selectRoom(
|
||||
self,
|
||||
view: ReserveView,
|
||||
ctx: ReserveContext,
|
||||
):
|
||||
|
||||
seat_map = view.selectRoom(ctx.room)
|
||||
if seat_map is None:
|
||||
display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room)
|
||||
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
return None
|
||||
self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !")
|
||||
have_hover_on_page = True
|
||||
seat_status = seat_map.selectSeat(ctx.seat_id)
|
||||
if seat_status is None:
|
||||
self._showTrace(
|
||||
f"座位 {ctx.seat_id} 在该楼层区域中不存在, 请检查座位号是否正确",
|
||||
self.TraceLevel.WARNING,
|
||||
)
|
||||
else:
|
||||
self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
|
||||
try:
|
||||
time_dialog = TimeSelectDialog(self._driver, tracer=self._showTrace)
|
||||
except TimeoutException:
|
||||
self._showTrace("时间选择面板未出现 !", self.TraceLevel.ERROR)
|
||||
else:
|
||||
if not time_dialog.selectSeatTime(ctx):
|
||||
self._showTrace("选择时间失败 !", self.TraceLevel.ERROR)
|
||||
else:
|
||||
try:
|
||||
view.submitReserve()
|
||||
submit_reserve = True
|
||||
return seat_map
|
||||
|
||||
def _processReserveResult(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
with ReserveResultDialog(self._driver) as result:
|
||||
if result.isFailure():
|
||||
self._showTrace("预约失败", self.TraceLevel.ERROR)
|
||||
@@ -131,11 +142,68 @@ class ReserveFlow(MsgBase):
|
||||
" 预约成功 !\n"
|
||||
" 未找获取到详细信息"
|
||||
)
|
||||
reserve_success = True
|
||||
return True
|
||||
else:
|
||||
self._showTrace("预约结果加载失败 !", self.TraceLevel.ERROR)
|
||||
return False
|
||||
|
||||
def _selectSeatAndSubmit(
|
||||
self,
|
||||
view: ReserveView,
|
||||
seat_map,
|
||||
ctx: ReserveContext,
|
||||
) -> tuple[bool, bool]:
|
||||
|
||||
submit_reserve = False
|
||||
reserve_success = False
|
||||
|
||||
seat_status = seat_map.selectSeat(ctx.seat_id)
|
||||
if seat_status is None:
|
||||
self._showTrace(
|
||||
f"座位 {ctx.seat_id} 在该楼层区域中不存在, 请检查座位号是否正确",
|
||||
self.TraceLevel.WARNING,
|
||||
)
|
||||
else:
|
||||
self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
|
||||
try:
|
||||
time_dialog = TimeSelectDialog(self._driver, tracer=self._showTrace)
|
||||
except TimeoutException:
|
||||
self._showTrace("时间选择面板未出现 !", self.TraceLevel.ERROR)
|
||||
else:
|
||||
if not time_dialog.selectSeatTime(ctx):
|
||||
self._showTrace("选择时间失败 !", self.TraceLevel.ERROR)
|
||||
else:
|
||||
try:
|
||||
view.submitReserve()
|
||||
submit_reserve = True
|
||||
reserve_success = self._processReserveResult()
|
||||
except (TimeoutException, ElementNotInteractableException):
|
||||
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
|
||||
return submit_reserve, reserve_success
|
||||
|
||||
def execute(
|
||||
self,
|
||||
ctx: ReserveContext,
|
||||
) -> bool:
|
||||
|
||||
# reserve flow pipeline:
|
||||
# date > place > floor > room > seat (begin/end time) > submit > result
|
||||
view = self._loadReserveView()
|
||||
if view is None:
|
||||
return False
|
||||
if not self._selectDate(view, ctx):
|
||||
return False
|
||||
if not self._selectPlace(view):
|
||||
return False
|
||||
if not self._selectFloor(view, ctx):
|
||||
return False
|
||||
seat_map = self._selectRoom(view, ctx)
|
||||
if seat_map is None:
|
||||
return False
|
||||
have_hover_on_page = True
|
||||
submit_reserve, reserve_success = self._selectSeatAndSubmit(
|
||||
view, seat_map, ctx,
|
||||
)
|
||||
if not submit_reserve and have_hover_on_page:
|
||||
view.refresh()
|
||||
if reserve_success:
|
||||
|
||||
Reference in New Issue
Block a user