mirror of
https://github.com/KenanZhu/AutoLibrary.git
synced 2026-06-17 23:13:03 +08:00
refactor(pages): 拆分 _dialogs 为独立组件文件,解耦 Service 构造函数,消除 PageObject 重复逻辑
- 将 _dialogs.py 拆分为 pages/components/ 下的独立文件,Overlay 基类同步移入 - CaptchaHandler / RecordChecker 构造函数不再持有 PageObject,改为方法参数注入 - LoginPage.login() 直接接收 auto_captcha 参数,简化 captcha_solver 调用链 - SeatMapOverlay.selectSeat 引入两层查找:先按 ID 直查,失败后遍历匹配 - 移除 ReserveView 中与 Dialog/Overlay 重复的方法(selectSeat、getAvailableTimeOptions) - AutoLibPages 拆分 __initPagesServices / __initPagesFlows - 修复 RecordsView.MORE_BTN 选择器被错误 snake_case 化(more_btn → moreBtn) Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
@@ -9,7 +9,6 @@ See the LICENSE file for details.
|
||||
"""
|
||||
import os
|
||||
import queue
|
||||
|
||||
from selenium import webdriver
|
||||
from selenium.common.exceptions import (
|
||||
TimeoutException,
|
||||
@@ -193,12 +192,10 @@ class AutoLibPages(MsgBase):
|
||||
self.__captcha_handler = CaptchaHandler(
|
||||
input_queue=self._input_queue,
|
||||
output_queue=self._output_queue,
|
||||
login_page=self.__login_page,
|
||||
)
|
||||
self.__record_checker = RecordChecker(
|
||||
input_queue=self._input_queue,
|
||||
output_queue=self._output_queue,
|
||||
shell=self.__shell,
|
||||
)
|
||||
self.__reserve_validator = ReserveValidator(
|
||||
input_queue=self._input_queue,
|
||||
@@ -245,7 +242,8 @@ class AutoLibPages(MsgBase):
|
||||
if not self.__login_page.login(
|
||||
username,
|
||||
password,
|
||||
captcha_solver=lambda: self.__captcha_handler.solveCaptcha(auto_captcha),
|
||||
captcha_solver=self.__captcha_handler.solveCaptcha,
|
||||
auto_captcha=auto_captcha,
|
||||
tracer=self._showTrace,
|
||||
log_level=self.TraceLevel,
|
||||
max_attempts=login_config.get("max_attempt", 3),
|
||||
@@ -259,7 +257,7 @@ class AutoLibPages(MsgBase):
|
||||
}
|
||||
# reserve
|
||||
if run_mode["auto_reserve"]:
|
||||
if self.__record_checker.canReserve(reserve_info.get("date")):
|
||||
if self.__record_checker.canReserve(self.__shell, reserve_info.get("date")):
|
||||
if self.__reserve_validator.validate(reserve_info):
|
||||
ctx = ReserveContext(
|
||||
username=username,
|
||||
@@ -289,7 +287,7 @@ class AutoLibPages(MsgBase):
|
||||
# checkin
|
||||
last_result: int = result
|
||||
if run_mode["auto_checkin"] and last_result != 1:
|
||||
if self.__record_checker.canCheckin():
|
||||
if self.__record_checker.canCheckin(self.__shell):
|
||||
if self.__checkin_flow.execute(username):
|
||||
result = 0
|
||||
else:
|
||||
@@ -303,11 +301,11 @@ class AutoLibPages(MsgBase):
|
||||
# renewal
|
||||
last_result = result
|
||||
if run_mode["auto_renewal"] and last_result != 1:
|
||||
can_renew, record = self.__record_checker.canRenew()
|
||||
can_renew, record = self.__record_checker.canRenew(self.__shell)
|
||||
if can_renew:
|
||||
renew_info: dict = reserve_info.get("renew_time", {})
|
||||
if self.__renew_flow.execute(username, record, renew_info):
|
||||
if self.__record_checker.postRenewCheck(record):
|
||||
if self.__record_checker.postRenewCheck(self.__shell, record):
|
||||
self._showTrace(f"用户 {username} 续约成功 !")
|
||||
result = 0
|
||||
else:
|
||||
|
||||
@@ -175,7 +175,8 @@ class LoginPage:
|
||||
self,
|
||||
username: str,
|
||||
password: str,
|
||||
captcha_solver: Callable[[], str],
|
||||
captcha_solver: Callable[["LoginPage", bool], str],
|
||||
auto_captcha: bool,
|
||||
tracer: Callable[..., None],
|
||||
log_level: type,
|
||||
max_attempts: int = 5,
|
||||
@@ -189,7 +190,7 @@ class LoginPage:
|
||||
)
|
||||
if not self.fillCredentials(username, password):
|
||||
continue
|
||||
captcha_text = captcha_solver()
|
||||
captcha_text = captcha_solver(self, auto_captcha)
|
||||
if not captcha_text:
|
||||
continue
|
||||
if not self.fillCaptcha(captcha_text):
|
||||
|
||||
@@ -22,7 +22,7 @@ from selenium.common.exceptions import (
|
||||
class RecordsView:
|
||||
|
||||
RECORDS_LIST = (By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)")
|
||||
MORE_BTN = (By.ID, "more_btn")
|
||||
MORE_BTN = (By.ID, "moreBtn")
|
||||
RECORD_TIME = (By.CSS_SELECTOR, "dt")
|
||||
RECORD_INFO = (By.CSS_SELECTOR, "a")
|
||||
|
||||
|
||||
@@ -15,12 +15,11 @@ from selenium.webdriver.support import expected_conditions as EC
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
NoSuchElementException,
|
||||
StaleElementReferenceException,
|
||||
TimeoutException,
|
||||
)
|
||||
|
||||
from pages._dialogs import SeatMapOverlay, ReserveResultDialog
|
||||
from pages.components.SeatMapOverlay import SeatMapOverlay
|
||||
from pages.components.ReserveResultDialog import ReserveResultDialog
|
||||
|
||||
|
||||
class ReserveView:
|
||||
@@ -40,17 +39,8 @@ class ReserveView:
|
||||
FIND_ROOM_BTN = (By.ID, "findRoom")
|
||||
ROOM_BTN_FMT = "room_{room}"
|
||||
|
||||
SEAT_LAYOUT = (By.ID, "seatLayout")
|
||||
SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']")
|
||||
RESERVE_BTN = (By.ID, "reserveBtn")
|
||||
|
||||
START_TIME_OPTS = (By.CSS_SELECTOR, "#startTime ul li a")
|
||||
END_TIME_OPTS = (By.CSS_SELECTOR, "#endTime ul li a")
|
||||
|
||||
RESULT_DIALOG = (By.CLASS_NAME, "layoutSeat")
|
||||
RESULT_TITLE = (By.CSS_SELECTOR, ".layoutSeat dt")
|
||||
RESULT_DETAIL = (By.CSS_SELECTOR, ".layoutSeat dd")
|
||||
|
||||
FLOOR_MAP = {"2": "二层", "3": "三层", "4": "四层", "5": "五层"}
|
||||
ROOM_MAP = {
|
||||
"1": "二层内环", "2": "二层西区", "3": "三层内环", "4": "三层外环",
|
||||
@@ -138,41 +128,6 @@ class ReserveView:
|
||||
|
||||
return SeatMapOverlay(self._driver)
|
||||
|
||||
def selectSeat(
|
||||
self,
|
||||
seat_id: str,
|
||||
) -> str | None:
|
||||
|
||||
try:
|
||||
WebDriverWait(self._driver, 2).until(
|
||||
EC.presence_of_element_located(self.SEAT_LAYOUT)
|
||||
)
|
||||
WebDriverWait(self._driver, 2).until(
|
||||
EC.presence_of_all_elements_located(self.SEAT_ITEMS)
|
||||
)
|
||||
except TimeoutException:
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
all_seats = self._driver.find_elements(*self.SEAT_ITEMS)
|
||||
seat_id_upper = seat_id.lstrip('0').upper()
|
||||
for seat in all_seats:
|
||||
if not seat_id_upper == seat.text.lstrip('0'):
|
||||
continue
|
||||
seat_link = seat.find_element(By.TAG_NAME, "a")
|
||||
WebDriverWait(self._driver, 2).until(
|
||||
EC.element_to_be_clickable(seat_link)
|
||||
)
|
||||
seat_link.click()
|
||||
return seat_link.get_attribute("title")
|
||||
return None
|
||||
except (NoSuchElementException, TimeoutException,
|
||||
StaleElementReferenceException, ElementNotInteractableException):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def submitReserve(
|
||||
self,
|
||||
) -> bool:
|
||||
@@ -193,26 +148,6 @@ class ReserveView:
|
||||
|
||||
return ReserveResultDialog(self._driver)
|
||||
|
||||
def getAvailableTimeOptions(
|
||||
self,
|
||||
time_id: str,
|
||||
) -> list:
|
||||
|
||||
try:
|
||||
WebDriverWait(self._driver, 2).until(
|
||||
EC.presence_of_all_elements_located(
|
||||
(By.CSS_SELECTOR, f"#{time_id} ul li a")
|
||||
)
|
||||
)
|
||||
except TimeoutException:
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
return self._driver.find_elements(
|
||||
By.CSS_SELECTOR,
|
||||
f"#{time_id} ul li a",
|
||||
)
|
||||
|
||||
def refresh(
|
||||
self,
|
||||
) -> None:
|
||||
|
||||
@@ -12,13 +12,11 @@ from pages.LoginPage import LoginPage
|
||||
from pages.MainShell import MainShell
|
||||
from pages.ReserveView import ReserveView
|
||||
from pages.RecordsView import RecordsView
|
||||
from pages._dialogs import (
|
||||
SeatMapOverlay,
|
||||
TimeSelectDialog,
|
||||
ReserveResultDialog,
|
||||
CheckinResultDialog,
|
||||
RenewDialog,
|
||||
)
|
||||
from pages.components.SeatMapOverlay import SeatMapOverlay
|
||||
from pages.components.TimeSelectDialog import TimeSelectDialog
|
||||
from pages.components.ReserveResultDialog import ReserveResultDialog
|
||||
from pages.components.CheckinResultDialog import CheckinResultDialog
|
||||
from pages.components.RenewDialog import RenewDialog
|
||||
|
||||
__all__ = [
|
||||
"AutoLibPages",
|
||||
|
||||
@@ -1,302 +0,0 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
NoSuchElementException,
|
||||
StaleElementReferenceException,
|
||||
TimeoutException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
from selenium.webdriver.support.ui import WebDriverWait
|
||||
from selenium.webdriver.support import expected_conditions as EC
|
||||
|
||||
from pages._overlay import Overlay
|
||||
|
||||
|
||||
class SeatMapOverlay(Overlay):
|
||||
"""
|
||||
Seat selection overlay that opens after choosing a floor and room.
|
||||
"""
|
||||
|
||||
ROOT = (By.ID, "seatLayout")
|
||||
SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT)
|
||||
|
||||
def selectSeat(
|
||||
self,
|
||||
seat_id: str,
|
||||
) -> str | None:
|
||||
|
||||
try:
|
||||
self._waitAllPresence(self.SEAT_ITEMS)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
all_seats = self._findAll(*self.SEAT_ITEMS)
|
||||
seat_id_upper = seat_id.lstrip('0').upper()
|
||||
for seat in all_seats:
|
||||
if not seat_id_upper == seat.text.lstrip('0'):
|
||||
continue
|
||||
seat_link = seat.find_element(By.TAG_NAME, "a")
|
||||
self._waitClickable((By.TAG_NAME, "a"))
|
||||
seat_link.click()
|
||||
return seat_link.get_attribute("title")
|
||||
return None
|
||||
except (NoSuchElementException, TimeoutException,
|
||||
ElementNotInteractableException, StaleElementReferenceException):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
class TimeSelectDialog(Overlay):
|
||||
"""
|
||||
Time selection panel that appears after selecting a seat.
|
||||
|
||||
Contains start-time and end-time option lists.
|
||||
Does NOT auto-close — the reserve submission handles cleanup.
|
||||
"""
|
||||
|
||||
ROOT = (By.CSS_SELECTOR, "#startTime ul")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getTimeOptions(
|
||||
self,
|
||||
time_id: str,
|
||||
) -> list[WebElement]:
|
||||
|
||||
try:
|
||||
self._waitAllPresence(
|
||||
(By.CSS_SELECTOR, f"#{time_id} ul li a")
|
||||
)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
return self._findAll(
|
||||
By.CSS_SELECTOR,
|
||||
f"#{time_id} ul li a",
|
||||
)
|
||||
|
||||
|
||||
class ReserveResultDialog(Overlay):
|
||||
"""
|
||||
Reservation result dialog shown after submitting a reserve request.
|
||||
"""
|
||||
|
||||
ROOT = (By.CLASS_NAME, "layoutSeat")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getTitle(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
try:
|
||||
return self._find(*self._titleLocator()).text
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def isSuccess(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
title = self.getTitle()
|
||||
return any(
|
||||
kw in title
|
||||
for kw in ("预定好了", "预约成功", "操作成功")
|
||||
)
|
||||
|
||||
def isFailure(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
contents = self.getDetailTexts()
|
||||
return any(
|
||||
"预约失败" in msg or "已有1个有效预约" in msg
|
||||
for msg in contents
|
||||
)
|
||||
|
||||
def getDetailTexts(
|
||||
self,
|
||||
) -> list[str]:
|
||||
|
||||
try:
|
||||
elements = self._findAll(By.CSS_SELECTOR, ".layoutSeat dd")
|
||||
return [el.text for el in elements if el.text.strip()]
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _titleLocator(
|
||||
self,
|
||||
) -> tuple:
|
||||
|
||||
return (By.CSS_SELECTOR, ".layoutSeat dt")
|
||||
|
||||
|
||||
class CheckinResultDialog(Overlay):
|
||||
"""
|
||||
Check-in result dialog.
|
||||
"""
|
||||
|
||||
ROOT = (By.CLASS_NAME, "ui_dialog")
|
||||
|
||||
RESULT_MSG = (By.CLASS_NAME, "resultMessage")
|
||||
OK_BTN = (By.CLASS_NAME, "btnOK")
|
||||
DETAIL_DD = (By.CSS_SELECTOR, ".resultMessage dd")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getResultMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
try:
|
||||
self._waitPresence(self.RESULT_MSG)
|
||||
el = self._find(*self.RESULT_MSG)
|
||||
return el.text
|
||||
except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
|
||||
return ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def getDetails(
|
||||
self,
|
||||
) -> list[str]:
|
||||
|
||||
try:
|
||||
elements = self._findAll(*self.DETAIL_DD)
|
||||
return [el.text for el in elements if el.text.strip()]
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def clickOk(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._waitClickable(self.OK_BTN).click()
|
||||
return True
|
||||
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
|
||||
|
||||
class RenewDialog(Overlay):
|
||||
"""
|
||||
Renewal time selection dialog.
|
||||
"""
|
||||
|
||||
ROOT = (By.ID, "extendDiv")
|
||||
|
||||
MESSAGE_HEAD = (By.CSS_SELECTOR, "#extendDiv p.messageHead")
|
||||
RESULT_MSG = (By.CSS_SELECTOR, "#extendDiv div.resultMessage")
|
||||
TIME_OPTS = (By.CSS_SELECTOR, "#extendDiv .renewal_List li")
|
||||
OK_BTN = (By.CSS_SELECTOR, "#extendDiv .btnOK")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def waitUntilReady(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._waitVisible(self.ROOT)
|
||||
self._waitPresence(self.MESSAGE_HEAD)
|
||||
self._waitPresence(self.RESULT_MSG)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
head_msg = self._find(*self.MESSAGE_HEAD).text.strip()
|
||||
if "警告" in head_msg:
|
||||
return False
|
||||
try:
|
||||
self._waitAllPresence(self.TIME_OPTS)
|
||||
self._waitPresence(self.OK_BTN)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
def getHeadMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
return self._find(*self.MESSAGE_HEAD).text.strip()
|
||||
|
||||
def getResultMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
return self._find(*self.RESULT_MSG).text.strip()
|
||||
|
||||
def getTimeOptions(
|
||||
self,
|
||||
) -> list[WebElement]:
|
||||
|
||||
return self._findAll(*self.TIME_OPTS)
|
||||
|
||||
def getOkButton(
|
||||
self,
|
||||
) -> WebElement:
|
||||
|
||||
return self._find(*self.OK_BTN)
|
||||
|
||||
def clickOk(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._find(*self.OK_BTN).click()
|
||||
return True
|
||||
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
@@ -0,0 +1,75 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
NoSuchElementException,
|
||||
StaleElementReferenceException,
|
||||
TimeoutException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
|
||||
from pages.components.Overlay import Overlay
|
||||
|
||||
|
||||
class CheckinResultDialog(Overlay):
|
||||
"""
|
||||
Check-in result dialog.
|
||||
"""
|
||||
|
||||
ROOT = (By.CLASS_NAME, "ui_dialog")
|
||||
|
||||
RESULT_MSG = (By.CLASS_NAME, "resultMessage")
|
||||
OK_BTN = (By.CLASS_NAME, "btnOK")
|
||||
DETAIL_DD = (By.CSS_SELECTOR, ".resultMessage dd")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getResultMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
try:
|
||||
self._waitPresence(self.RESULT_MSG)
|
||||
el = self._find(*self.RESULT_MSG)
|
||||
return el.text
|
||||
except (TimeoutException, NoSuchElementException, StaleElementReferenceException):
|
||||
return ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def getDetails(
|
||||
self,
|
||||
) -> list[str]:
|
||||
|
||||
try:
|
||||
elements = self._findAll(*self.DETAIL_DD)
|
||||
return [el.text for el in elements if el.text.strip()]
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def clickOk(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._waitClickable(self.OK_BTN).click()
|
||||
return True
|
||||
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
@@ -0,0 +1,99 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
NoSuchElementException,
|
||||
TimeoutException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
|
||||
from pages.components.Overlay import Overlay
|
||||
|
||||
|
||||
class RenewDialog(Overlay):
|
||||
"""
|
||||
Renewal time selection dialog.
|
||||
"""
|
||||
|
||||
ROOT = (By.ID, "extendDiv")
|
||||
|
||||
MESSAGE_HEAD = (By.CSS_SELECTOR, "#extendDiv p.messageHead")
|
||||
RESULT_MSG = (By.CSS_SELECTOR, "#extendDiv div.resultMessage")
|
||||
TIME_OPTS = (By.CSS_SELECTOR, "#extendDiv .renewal_List li")
|
||||
OK_BTN = (By.CSS_SELECTOR, "#extendDiv .btnOK")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def waitUntilReady(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._waitVisible(self.ROOT)
|
||||
self._waitPresence(self.MESSAGE_HEAD)
|
||||
self._waitPresence(self.RESULT_MSG)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
head_msg = self._find(*self.MESSAGE_HEAD).text.strip()
|
||||
if "警告" in head_msg:
|
||||
return False
|
||||
try:
|
||||
self._waitAllPresence(self.TIME_OPTS)
|
||||
self._waitPresence(self.OK_BTN)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
return True
|
||||
|
||||
def getHeadMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
return self._find(*self.MESSAGE_HEAD).text.strip()
|
||||
|
||||
def getResultMessage(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
return self._find(*self.RESULT_MSG).text.strip()
|
||||
|
||||
def getTimeOptions(
|
||||
self,
|
||||
) -> list[WebElement]:
|
||||
|
||||
return self._findAll(*self.TIME_OPTS)
|
||||
|
||||
def getOkButton(
|
||||
self,
|
||||
) -> WebElement:
|
||||
|
||||
return self._find(*self.OK_BTN)
|
||||
|
||||
def clickOk(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
try:
|
||||
self._find(*self.OK_BTN).click()
|
||||
return True
|
||||
except (NoSuchElementException, TimeoutException, ElementNotInteractableException):
|
||||
return False
|
||||
except Exception:
|
||||
return False
|
||||
@@ -0,0 +1,81 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
NoSuchElementException,
|
||||
StaleElementReferenceException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
|
||||
from pages.components.Overlay import Overlay
|
||||
|
||||
|
||||
class ReserveResultDialog(Overlay):
|
||||
"""
|
||||
Reservation result dialog shown after submitting a reserve request.
|
||||
"""
|
||||
|
||||
ROOT = (By.CLASS_NAME, "layoutSeat")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getTitle(
|
||||
self,
|
||||
) -> str:
|
||||
|
||||
try:
|
||||
return self._find(*self._title_locator()).text
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return ""
|
||||
except Exception:
|
||||
return ""
|
||||
|
||||
def isSuccess(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
title = self.getTitle()
|
||||
return any(
|
||||
kw in title
|
||||
for kw in ("预定好了", "预约成功", "操作成功")
|
||||
)
|
||||
|
||||
def isFailure(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
contents = self.getDetailTexts()
|
||||
return any(
|
||||
"预约失败" in msg or "已有1个有效预约" in msg
|
||||
for msg in contents
|
||||
)
|
||||
|
||||
def getDetailTexts(
|
||||
self,
|
||||
) -> list[str]:
|
||||
|
||||
try:
|
||||
elements = self._findAll(By.CSS_SELECTOR, ".layoutSeat dd")
|
||||
return [el.text for el in elements if el.text.strip()]
|
||||
except (NoSuchElementException, StaleElementReferenceException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
def _title_locator(
|
||||
self,
|
||||
) -> tuple:
|
||||
|
||||
return (By.CSS_SELECTOR, ".layoutSeat dt")
|
||||
@@ -0,0 +1,74 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
ElementNotInteractableException,
|
||||
NoSuchElementException,
|
||||
StaleElementReferenceException,
|
||||
TimeoutException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
|
||||
from pages.components.Overlay import Overlay
|
||||
|
||||
|
||||
class SeatMapOverlay(Overlay):
|
||||
"""
|
||||
Seat selection overlay that opens after choosing a floor and room.
|
||||
"""
|
||||
|
||||
ROOT = (By.ID, "seatLayout")
|
||||
SEAT_ITEMS = (By.CSS_SELECTOR, "li[id^='seat_']")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT)
|
||||
|
||||
def selectSeat(
|
||||
self,
|
||||
seat_id: str,
|
||||
) -> str | None:
|
||||
|
||||
try:
|
||||
self._waitAllPresence(self.SEAT_ITEMS)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
try:
|
||||
seat_el = self._find(By.ID, f"seat_{int(seat_id):03d}")
|
||||
seat_link = seat_el.find_element(By.TAG_NAME, "a")
|
||||
self._waitClickable((By.TAG_NAME, "a"))
|
||||
seat_link.click()
|
||||
return seat_link.get_attribute("title")
|
||||
except (NoSuchElementException, ValueError, TimeoutException,
|
||||
ElementNotInteractableException, StaleElementReferenceException):
|
||||
pass
|
||||
except Exception:
|
||||
pass
|
||||
try:
|
||||
all_seats = self._findAll(*self.SEAT_ITEMS)
|
||||
seat_id_upper = seat_id.lstrip('0').upper()
|
||||
for seat in all_seats:
|
||||
if not seat_id_upper == seat.text.lstrip('0'):
|
||||
continue
|
||||
seat_link = seat.find_element(By.TAG_NAME, "a")
|
||||
self._waitClickable((By.TAG_NAME, "a"))
|
||||
seat_link.click()
|
||||
return seat_link.get_attribute("title")
|
||||
return None
|
||||
except (NoSuchElementException, TimeoutException,
|
||||
ElementNotInteractableException, StaleElementReferenceException):
|
||||
return None
|
||||
except Exception:
|
||||
return None
|
||||
@@ -0,0 +1,54 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from selenium.common.exceptions import (
|
||||
NoSuchElementException,
|
||||
TimeoutException,
|
||||
)
|
||||
from selenium.webdriver.common.by import By
|
||||
from selenium.webdriver.remote.webdriver import WebDriver
|
||||
from selenium.webdriver.remote.webelement import WebElement
|
||||
|
||||
from pages.components.Overlay import Overlay
|
||||
|
||||
|
||||
class TimeSelectDialog(Overlay):
|
||||
"""
|
||||
Time selection panel that appears after selecting a seat.
|
||||
|
||||
Contains start-time and end-time option lists.
|
||||
Does NOT auto-close — the reserve submission handles cleanup.
|
||||
"""
|
||||
|
||||
ROOT = (By.CSS_SELECTOR, "#startTime ul")
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
driver: WebDriver,
|
||||
) -> None:
|
||||
|
||||
super().__init__(driver, self.ROOT, auto_close_on_exit=False)
|
||||
|
||||
def getTimeOptions(
|
||||
self,
|
||||
time_id: str,
|
||||
) -> list[WebElement]:
|
||||
|
||||
try:
|
||||
self._waitAllPresence(
|
||||
(By.CSS_SELECTOR, f"#{time_id} ul li a")
|
||||
)
|
||||
except (NoSuchElementException, TimeoutException):
|
||||
return []
|
||||
except Exception:
|
||||
return []
|
||||
return self._findAll(
|
||||
By.CSS_SELECTOR,
|
||||
f"#{time_id} ul li a",
|
||||
)
|
||||
@@ -0,0 +1,22 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
Copyright (c) 2026 KenanZhu.
|
||||
All rights reserved.
|
||||
|
||||
This software is provided "as is", without any warranty of any kind.
|
||||
You may use, modify, and distribute this file under the terms of the MIT License.
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
from pages.components.SeatMapOverlay import SeatMapOverlay
|
||||
from pages.components.TimeSelectDialog import TimeSelectDialog
|
||||
from pages.components.ReserveResultDialog import ReserveResultDialog
|
||||
from pages.components.CheckinResultDialog import CheckinResultDialog
|
||||
from pages.components.RenewDialog import RenewDialog
|
||||
|
||||
__all__ = [
|
||||
"SeatMapOverlay",
|
||||
"TimeSelectDialog",
|
||||
"ReserveResultDialog",
|
||||
"CheckinResultDialog",
|
||||
"RenewDialog",
|
||||
]
|
||||
@@ -17,7 +17,7 @@ from selenium.webdriver.remote.webdriver import WebDriver
|
||||
|
||||
from base.MsgBase import MsgBase
|
||||
from pages.MainShell import MainShell
|
||||
from pages._dialogs import CheckinResultDialog
|
||||
from pages.components.CheckinResultDialog import CheckinResultDialog
|
||||
|
||||
|
||||
class CheckinFlow(MsgBase):
|
||||
|
||||
@@ -18,7 +18,7 @@ from selenium.webdriver.remote.webdriver import WebDriver
|
||||
|
||||
from base.MsgBase import MsgBase
|
||||
from pages.MainShell import MainShell
|
||||
from pages._dialogs import RenewDialog
|
||||
from pages.components.RenewDialog import RenewDialog
|
||||
from pages.flows._helpers import (
|
||||
timeStrToMins,
|
||||
minsToTimeStr,
|
||||
|
||||
@@ -26,7 +26,8 @@ from pages.flows._helpers import (
|
||||
findBestTimeOption,
|
||||
)
|
||||
from pages.ReserveView import ReserveView
|
||||
from pages._dialogs import ReserveResultDialog
|
||||
from pages.components.ReserveResultDialog import ReserveResultDialog
|
||||
from pages.components.TimeSelectDialog import TimeSelectDialog
|
||||
|
||||
|
||||
@dataclass
|
||||
@@ -82,31 +83,27 @@ class ReserveFlow(MsgBase):
|
||||
except Exception as e:
|
||||
self._showTrace(f"加载预约选座页面失败 ! : {e}", self.TraceLevel.ERROR)
|
||||
return False
|
||||
|
||||
if not view.selectDate(ctx.date):
|
||||
self._showTrace(f"选择日期失败 ! : {ctx.date} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace(f"日期 {ctx.date} 选择成功 !")
|
||||
|
||||
if not view.selectPlace("1"):
|
||||
self._showTrace("选择预约场所失败 ! : 图书馆 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace("预约场所 图书馆 选择成功 !")
|
||||
|
||||
if not view.selectFloor(ctx.floor):
|
||||
display_floor = ReserveView.FLOOR_MAP.get(ctx.floor, ctx.floor)
|
||||
self._showTrace(f"选择楼层失败 ! : {display_floor} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace(f"楼层 {ReserveView.FLOOR_MAP.get(ctx.floor)} 选择成功 !")
|
||||
|
||||
if not view.selectRoom(ctx.room):
|
||||
display_room = ReserveView.ROOM_MAP.get(ctx.room, ctx.room)
|
||||
self._showTrace(f"选择房间失败 ! : {display_room} 不可用", self.TraceLevel.ERROR)
|
||||
return False
|
||||
self._showTrace(f"房间 {ReserveView.ROOM_MAP.get(ctx.room)} 选择成功 !")
|
||||
have_hover_on_page = True
|
||||
|
||||
seat_status = view.selectSeat(ctx.seat_id)
|
||||
seat_map = view.openSeatMap()
|
||||
seat_status = seat_map.selectSeat(ctx.seat_id)
|
||||
if seat_status is None:
|
||||
self._showTrace(
|
||||
f"座位 {ctx.seat_id} 在该楼层区域中不存在, 请检查座位号是否正确",
|
||||
@@ -114,8 +111,8 @@ class ReserveFlow(MsgBase):
|
||||
)
|
||||
else:
|
||||
self._showTrace(f"座位 {ctx.seat_id} 选择成功 ! : 当前状态 - '{seat_status}'")
|
||||
|
||||
select_time_ok = self._selectSeatTime(view)
|
||||
time_dialog = TimeSelectDialog(self._driver)
|
||||
select_time_ok = self._selectSeatTime(time_dialog)
|
||||
if not select_time_ok:
|
||||
self._showTrace("选择时间失败 !", self.TraceLevel.ERROR)
|
||||
else:
|
||||
@@ -149,7 +146,6 @@ class ReserveFlow(MsgBase):
|
||||
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
|
||||
except Exception:
|
||||
self._showTrace("预约提交失败 !", self.TraceLevel.ERROR)
|
||||
|
||||
if not submit_reserve and have_hover_on_page:
|
||||
view.refresh()
|
||||
if reserve_success:
|
||||
@@ -160,7 +156,7 @@ class ReserveFlow(MsgBase):
|
||||
|
||||
def _selectSeatTime(
|
||||
self,
|
||||
view: ReserveView,
|
||||
time_dialog: TimeSelectDialog,
|
||||
) -> bool:
|
||||
|
||||
ctx = self._ctx
|
||||
@@ -172,9 +168,8 @@ class ReserveFlow(MsgBase):
|
||||
act_beg_tm_str = exp_beg_tm_str
|
||||
act_end_mins = exp_end_mins
|
||||
act_end_tm_str = exp_end_tm_str
|
||||
|
||||
act_beg_mins = self._selectNearestTime(
|
||||
view,
|
||||
time_dialog,
|
||||
time_id="startTime",
|
||||
time_type="开始时间",
|
||||
target_time=exp_beg_mins,
|
||||
@@ -184,7 +179,6 @@ class ReserveFlow(MsgBase):
|
||||
if act_beg_mins == -1:
|
||||
return False
|
||||
act_beg_tm_str = minsToTimeStr(act_beg_mins)
|
||||
|
||||
if ctx.satisfy_duration:
|
||||
exp_end_mins = self._calcEndTime(act_beg_mins, ctx.expect_duration)
|
||||
exp_end_tm_str = minsToTimeStr(exp_end_mins)
|
||||
@@ -192,10 +186,9 @@ class ReserveFlow(MsgBase):
|
||||
f"需要满足期望预约持续时间: {ctx.expect_duration} 小时, "
|
||||
f"根据开始时间 {act_beg_tm_str} 计算结束时间: {exp_end_tm_str}"
|
||||
)
|
||||
|
||||
act_end_mins = self._selectNearestTime(
|
||||
view,
|
||||
time_id="end_time",
|
||||
time_dialog,
|
||||
time_id="endTime",
|
||||
time_type="结束时间",
|
||||
target_time=exp_end_mins,
|
||||
max_time_diff=ctx.end_max_diff,
|
||||
@@ -204,7 +197,6 @@ class ReserveFlow(MsgBase):
|
||||
if act_end_mins == -1:
|
||||
return False
|
||||
act_end_tm_str = minsToTimeStr(act_end_mins)
|
||||
|
||||
self._showTrace(
|
||||
f"期望预约时间段: {exp_beg_tm_str} - {exp_end_tm_str}, "
|
||||
f"实际预约时间段: {act_beg_tm_str} - {act_end_tm_str}"
|
||||
@@ -213,7 +205,7 @@ class ReserveFlow(MsgBase):
|
||||
|
||||
def _selectNearestTime(
|
||||
self,
|
||||
view: ReserveView,
|
||||
time_dialog: TimeSelectDialog,
|
||||
time_id: str,
|
||||
time_type: str,
|
||||
target_time: int,
|
||||
@@ -221,13 +213,12 @@ class ReserveFlow(MsgBase):
|
||||
prefer_earlier: bool,
|
||||
) -> int:
|
||||
|
||||
all_time_opts = view.getAvailableTimeOptions(time_id)
|
||||
all_time_opts = time_dialog.getTimeOptions(time_id)
|
||||
if not all_time_opts:
|
||||
self._showTrace(
|
||||
f"{time_type} 选择失败 ! : 当前未查询到可用时间", self.TraceLevel.ERROR
|
||||
)
|
||||
return -1
|
||||
|
||||
best_opt, best_text, actual_diff, free_times = findBestTimeOption(
|
||||
all_time_opts, target_time, max_time_diff, prefer_earlier, is_reserve=True
|
||||
)
|
||||
@@ -245,7 +236,6 @@ class ReserveFlow(MsgBase):
|
||||
f"与期望 {time_type} 相比 {relation}"
|
||||
)
|
||||
return target_time + actual_diff
|
||||
|
||||
target_time_str = minsToTimeStr(target_time)
|
||||
self._showTrace(
|
||||
f"无法选择最近的 {time_type} {target_time_str}, "
|
||||
|
||||
@@ -26,42 +26,18 @@ class CaptchaHandler(MsgBase):
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
login_page: LoginPage,
|
||||
) -> None:
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
self._login_page = login_page
|
||||
self._ocr = ddddocr.DdddOcr()
|
||||
|
||||
def solveCaptcha(
|
||||
self,
|
||||
auto_captcha: bool = True,
|
||||
) -> str:
|
||||
|
||||
max_attempts = 3
|
||||
for _ in range(max_attempts):
|
||||
if auto_captcha:
|
||||
captcha_text = self._autoRecognize()
|
||||
else:
|
||||
self._showTrace("用户未配置自动识别验证码, 请手动输入验证码 !", 20, no_log=True)
|
||||
captcha_text = self._manualRecognize()
|
||||
if captcha_text:
|
||||
return captcha_text
|
||||
else:
|
||||
if not self._login_page.refreshCaptcha():
|
||||
return ""
|
||||
self._showTrace(
|
||||
f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !",
|
||||
self.TraceLevel.WARNING,
|
||||
)
|
||||
return ""
|
||||
|
||||
def _autoRecognize(
|
||||
self,
|
||||
login_page: LoginPage,
|
||||
) -> str:
|
||||
|
||||
try:
|
||||
img_src = self._login_page.getCaptchaImageSrc()
|
||||
img_src = login_page.getCaptchaImageSrc()
|
||||
base64_str = img_src.split(',', 1)[1]
|
||||
captcha_img = base64.b64decode(base64_str)
|
||||
captcha_text = self._ocr.classification(captcha_img)
|
||||
@@ -99,3 +75,27 @@ class CaptchaHandler(MsgBase):
|
||||
except Exception as e:
|
||||
self._showTrace(f"输入验证码失败 ! : {e}", self.TraceLevel.ERROR)
|
||||
return ""
|
||||
|
||||
def solveCaptcha(
|
||||
self,
|
||||
login_page: LoginPage,
|
||||
auto_captcha: bool = True,
|
||||
) -> str:
|
||||
|
||||
max_attempts = 3
|
||||
for _ in range(max_attempts):
|
||||
if auto_captcha:
|
||||
captcha_text = self._autoRecognize(login_page)
|
||||
else:
|
||||
self._showTrace("用户未配置自动识别验证码, 请手动输入验证码 !", 20, no_log=True)
|
||||
captcha_text = self._manualRecognize()
|
||||
if captcha_text:
|
||||
return captcha_text
|
||||
else:
|
||||
if not login_page.refreshCaptcha():
|
||||
return ""
|
||||
self._showTrace(
|
||||
f"验证码识别失败 {max_attempts} 次, 达到最大尝试次数 !",
|
||||
self.TraceLevel.WARNING,
|
||||
)
|
||||
return ""
|
||||
|
||||
+117
-114
@@ -29,11 +29,9 @@ class RecordChecker(MsgBase):
|
||||
self,
|
||||
input_queue: queue.Queue,
|
||||
output_queue: queue.Queue,
|
||||
shell: MainShell,
|
||||
) -> None:
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
self._shell = shell
|
||||
|
||||
@staticmethod
|
||||
def _formatDiffTime(
|
||||
@@ -45,119 +43,9 @@ class RecordChecker(MsgBase):
|
||||
seconds = int(seconds % 60)
|
||||
return f"{hours} 时 {minutes} 分 {seconds} 秒"
|
||||
|
||||
def canReserve(
|
||||
self,
|
||||
date: str,
|
||||
) -> bool:
|
||||
|
||||
if self._getReserveRecord(date, "已预约") is None:
|
||||
if self._getReserveRecord(date, "使用中") is None:
|
||||
self._showTrace(f"用户在 {date} 可以预约")
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
|
||||
return False
|
||||
|
||||
def canCheckin(
|
||||
self,
|
||||
) -> bool:
|
||||
|
||||
date = time.strftime("%Y-%m-%d", time.localtime())
|
||||
record = self._getReserveRecord(date, "已预约")
|
||||
if record is not None:
|
||||
begin_time = record["time"]["begin"]
|
||||
begin_time = datetime.strptime(
|
||||
f"{date} {begin_time}", "%Y-%m-%d %H:%M"
|
||||
)
|
||||
time_diff = datetime.now() - begin_time
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
if time_diff_seconds < -30 * 60:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 无法签到"
|
||||
)
|
||||
return False
|
||||
elif -30 * 60 <= time_diff_seconds < 0:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
elif 0 <= time_diff_seconds < 30 * 60 - 5:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间已经过去 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
|
||||
return False
|
||||
|
||||
def canRenew(
|
||||
self,
|
||||
) -> tuple[bool, dict]:
|
||||
|
||||
date = time.strftime("%Y-%m-%d", time.localtime())
|
||||
record = self._getReserveRecord(date, "使用中")
|
||||
if record is not None:
|
||||
end_time = record["time"]["end"]
|
||||
end_time = datetime.strptime(
|
||||
f"{date} {end_time}", "%Y-%m-%d %H:%M"
|
||||
)
|
||||
time_diff = end_time - datetime.now()
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
trace_msg = (
|
||||
f"用户在 {date} 的预约结束时间为 {end_time}, "
|
||||
f"当前距离预约结束时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}"
|
||||
)
|
||||
if abs(time_diff_seconds) < 120 * 60:
|
||||
self._showTrace(f"{trace_msg}, 可以续约")
|
||||
return True, record
|
||||
else:
|
||||
self._showTrace(f"{trace_msg}, 无法续约")
|
||||
return False, None
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
|
||||
return False, None
|
||||
|
||||
def postRenewCheck(
|
||||
self,
|
||||
record: dict,
|
||||
) -> bool:
|
||||
|
||||
date = record["date"]
|
||||
act_record = self._getReserveRecord(date, "使用中")
|
||||
if act_record is not None:
|
||||
if (
|
||||
act_record["time"]["begin"] == record["time"]["begin"]
|
||||
and act_record["time"]["end"] == record["time"]["end"]
|
||||
):
|
||||
self._showTrace(
|
||||
f"\n"
|
||||
f" 续约成功 !\n"
|
||||
f" 日 期 :{date}\n"
|
||||
f" 时 间 :{act_record['time']['begin']}"
|
||||
f" - {act_record['time']['end']}\n"
|
||||
f" 位 置 :{act_record['info']['location']}\n"
|
||||
f" 状 态 :{act_record['info']['status']}"
|
||||
)
|
||||
return True
|
||||
else:
|
||||
self._showTrace(
|
||||
f"\n"
|
||||
f" 续约失败 !\n"
|
||||
f" 续约后结束时间为 {act_record['time']['end']},"
|
||||
f"与预期结束时间 {record['time']['end']} 不符 !"
|
||||
)
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果")
|
||||
return False
|
||||
|
||||
def _getReserveRecord(
|
||||
self,
|
||||
shell: MainShell,
|
||||
wanted_date: str,
|
||||
wanted_status: str,
|
||||
) -> dict | None:
|
||||
@@ -173,7 +61,7 @@ class RecordChecker(MsgBase):
|
||||
checked_count = 0
|
||||
max_check_times = 6
|
||||
|
||||
records_view = self._shell.gotoRecordsView()
|
||||
records_view = shell.gotoRecordsView()
|
||||
for _ in range(max_check_times):
|
||||
reservations = records_view.loadRecords()
|
||||
if reservations is None:
|
||||
@@ -300,3 +188,118 @@ class RecordChecker(MsgBase):
|
||||
elif "图书馆" in info.text:
|
||||
location = info.text.strip()
|
||||
return {"location": location, "status": status}
|
||||
|
||||
def canReserve(
|
||||
self,
|
||||
shell: MainShell,
|
||||
date: str,
|
||||
) -> bool:
|
||||
|
||||
if self._getReserveRecord(shell, date, "已预约") is None:
|
||||
if self._getReserveRecord(shell, date, "使用中") is None:
|
||||
self._showTrace(f"用户在 {date} 可以预约")
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
|
||||
return False
|
||||
|
||||
def canCheckin(
|
||||
self,
|
||||
shell: MainShell,
|
||||
) -> bool:
|
||||
|
||||
date = time.strftime("%Y-%m-%d", time.localtime())
|
||||
record = self._getReserveRecord(shell, date, "已预约")
|
||||
if record is not None:
|
||||
begin_time = record["time"]["begin"]
|
||||
begin_time = datetime.strptime(
|
||||
f"{date} {begin_time}", "%Y-%m-%d %H:%M"
|
||||
)
|
||||
time_diff = datetime.now() - begin_time
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
if time_diff_seconds < -30 * 60:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 无法签到"
|
||||
)
|
||||
return False
|
||||
elif -30 * 60 <= time_diff_seconds < 0:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
elif 0 <= time_diff_seconds < 30 * 60 - 5:
|
||||
self._showTrace(
|
||||
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
||||
f"当前距离预约开始时间已经过去 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
||||
)
|
||||
return True
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
|
||||
return False
|
||||
|
||||
def canRenew(
|
||||
self,
|
||||
shell: MainShell,
|
||||
) -> tuple[bool, dict]:
|
||||
|
||||
date = time.strftime("%Y-%m-%d", time.localtime())
|
||||
record = self._getReserveRecord(shell, date, "使用中")
|
||||
if record is not None:
|
||||
end_time = record["time"]["end"]
|
||||
end_time = datetime.strptime(
|
||||
f"{date} {end_time}", "%Y-%m-%d %H:%M"
|
||||
)
|
||||
time_diff = end_time - datetime.now()
|
||||
time_diff_seconds = time_diff.total_seconds()
|
||||
trace_msg = (
|
||||
f"用户在 {date} 的预约结束时间为 {end_time}, "
|
||||
f"当前距离预约结束时间还有 "
|
||||
f"{self._formatDiffTime(abs(time_diff_seconds))}"
|
||||
)
|
||||
if abs(time_diff_seconds) < 120 * 60:
|
||||
self._showTrace(f"{trace_msg}, 可以续约")
|
||||
return True, record
|
||||
else:
|
||||
self._showTrace(f"{trace_msg}, 无法续约")
|
||||
return False, None
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
|
||||
return False, None
|
||||
|
||||
def postRenewCheck(
|
||||
self,
|
||||
shell: MainShell,
|
||||
record: dict,
|
||||
) -> bool:
|
||||
|
||||
date = record["date"]
|
||||
act_record = self._getReserveRecord(shell, date, "使用中")
|
||||
if act_record is not None:
|
||||
if (
|
||||
act_record["time"]["begin"] == record["time"]["begin"]
|
||||
and act_record["time"]["end"] == record["time"]["end"]
|
||||
):
|
||||
self._showTrace(
|
||||
f"\n"
|
||||
f" 续约成功 !\n"
|
||||
f" 日 期 :{date}\n"
|
||||
f" 时 间 :{act_record['time']['begin']}"
|
||||
f" - {act_record['time']['end']}\n"
|
||||
f" 位 置 :{act_record['info']['location']}\n"
|
||||
f" 状 态 :{act_record['info']['status']}"
|
||||
)
|
||||
return True
|
||||
else:
|
||||
self._showTrace(
|
||||
f"\n"
|
||||
f" 续约失败 !\n"
|
||||
f" 续约后结束时间为 {act_record['time']['end']},"
|
||||
f"与预期结束时间 {record['time']['end']} 不符 !"
|
||||
)
|
||||
return False
|
||||
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果")
|
||||
return False
|
||||
|
||||
@@ -25,35 +25,6 @@ class ReserveValidator(MsgBase):
|
||||
|
||||
super().__init__(input_queue, output_queue)
|
||||
|
||||
def validate(
|
||||
self,
|
||||
reserve_info: dict,
|
||||
) -> bool:
|
||||
|
||||
if not self._containRequiredInfo(reserve_info):
|
||||
return False
|
||||
if not self._isValidDate(reserve_info):
|
||||
return False
|
||||
if not self._isValidBeginTime(reserve_info):
|
||||
return False
|
||||
if not self._isValidExpectDuration(reserve_info):
|
||||
return False
|
||||
if not self._isValidEndTime(reserve_info):
|
||||
return False
|
||||
if not self._finalCheck(reserve_info):
|
||||
return False
|
||||
self._showTrace(
|
||||
f"预约信息检查完成, 准备预约 "
|
||||
f"{reserve_info['date']} "
|
||||
f"{reserve_info['begin_time']['time']} - "
|
||||
f"{reserve_info['end_time']['time']} "
|
||||
f"图书馆 "
|
||||
f"{ReserveView.FLOOR_MAP[reserve_info['floor']]} "
|
||||
f"{ReserveView.ROOM_MAP[reserve_info['room']]} "
|
||||
f"的座位 {reserve_info['seat_id']}"
|
||||
)
|
||||
return True
|
||||
|
||||
def _containRequiredInfo(
|
||||
self,
|
||||
reserve_info: dict,
|
||||
@@ -219,3 +190,32 @@ class ReserveValidator(MsgBase):
|
||||
)
|
||||
reserve_info["end_time"]["time"] = minsToTimeStr(begin_mins + 8 * 60)
|
||||
return True
|
||||
|
||||
def validate(
|
||||
self,
|
||||
reserve_info: dict,
|
||||
) -> bool:
|
||||
|
||||
if not self._containRequiredInfo(reserve_info):
|
||||
return False
|
||||
if not self._isValidDate(reserve_info):
|
||||
return False
|
||||
if not self._isValidBeginTime(reserve_info):
|
||||
return False
|
||||
if not self._isValidExpectDuration(reserve_info):
|
||||
return False
|
||||
if not self._isValidEndTime(reserve_info):
|
||||
return False
|
||||
if not self._finalCheck(reserve_info):
|
||||
return False
|
||||
self._showTrace(
|
||||
f"预约信息检查完成, 准备预约 "
|
||||
f"{reserve_info['date']} "
|
||||
f"{reserve_info['begin_time']['time']} - "
|
||||
f"{reserve_info['end_time']['time']} "
|
||||
f"图书馆 "
|
||||
f"{ReserveView.FLOOR_MAP[reserve_info['floor']]} "
|
||||
f"{ReserveView.ROOM_MAP[reserve_info['room']]} "
|
||||
f"的座位 {reserve_info['seat_id']}"
|
||||
)
|
||||
return True
|
||||
Reference in New Issue
Block a user