# -*- coding: utf-8 -*- """ Copyright (c) 2025 KenanZhu. All rights reserved. This software is provided "as is", without any warranty of any kind. You may use, modify, and distribute this file under the terms of the MIT License. See the LICENSE file for details. """ import re import time import queue from datetime import datetime, timedelta from selenium.webdriver.common.by import By from selenium.webdriver.support.ui import WebDriverWait from selenium.webdriver.support import expected_conditions as EC from LibOperator import LibOperator class LibChecker(LibOperator): def __init__( self, input_queue: queue.Queue, output_queue: queue.Queue, driver ): super().__init__(input_queue, output_queue) self.__driver = driver def _waitResponseLoad( self ) -> bool: pass @staticmethod def __formatDiffTime( seconds: float ) -> str: hours = int(seconds//3600) minutes = int(seconds%3600//60) seconds = int(seconds%60) return f"{hours} 时 {minutes} 分 {seconds} 秒" def __navigateToReserveRecordPage( self ) -> bool: try: WebDriverWait(self.__driver, 2).until( EC.element_to_be_clickable((By.XPATH, "//a[@href='/history?type=SEAT']")) ).click() WebDriverWait(self.__driver, 2).until( EC.presence_of_element_located((By.CLASS_NAME, "myReserveList")) ) except: self._showTrace("加载预约记录页面失败 !") return False return True def __decodeReserveTime( self, time_element ) -> dict: time_str = time_element.text.strip() today = datetime.now().date() if "明天" in time_str: target_date = today + timedelta(days=1) date = target_date.strftime("%Y-%m-%d") elif "今天" in time_str: target_date = today date = target_date.strftime("%Y-%m-%d") elif "昨天" in time_str: target_date = today - timedelta(days=1) date = target_date.strftime("%Y-%m-%d") else: date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str) if date_match: date = date_match.group(1) else: date = "" time_match = re.search(r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str) if time_match: begin_time = time_match.group(1) end_time = time_match.group(2) else: begin_time = "" end_time = "" return { "date": date, "time": { "begin": begin_time, "end": end_time } } def __decodeReserveInfo( self, info_elements ) -> str: location = "" status = "" for info in info_elements: if "已预约" in info.text: status = "已预约" elif "使用中" in info.text: status = "使用中" elif "已完成" in info.text: status = "已完成" elif "已结束使用" in info.text: status = "已结束使用" elif "已取消" in info.text: status = "已取消" elif "失约" in info.text: status = "失约" elif "图书馆" in info.text: location = info.text.strip() return { "location": location, "status": status, } def __decodeReserveRecord( self, reservation ) -> dict: try: time_element = reservation.find_element( By.CSS_SELECTOR, "dt" ) info_elements = reservation.find_elements( By.CSS_SELECTOR, "a" ) except: return { "date": "", "time": {"begin": "", "end": ""}, "info": {"location": "", "status": ""} } time = self.__decodeReserveTime(time_element) info = self.__decodeReserveInfo(info_elements) return { "date": time["date"], "time": time["time"], "info": info } def __loadReserveRecords( self ) -> list: try: # check if there's any reservation on the date WebDriverWait(self.__driver, 2).until( EC.presence_of_element_located((By.CSS_SELECTOR, ".myReserveList > dl")) ) reservations = self.__driver.find_elements( By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)" ) return reservations except: self._showTrace("加载预约记录失败 !") return None def __showMoreReserveRecords( self ) -> bool: # load new reservations if still not sure try: WebDriverWait(self.__driver, 0.1).until( EC.element_to_be_clickable((By.ID, "moreBtn")) ) except: # the reservation is the last one return False try: more_btn = self.__driver.find_element(By.ID, "moreBtn") if more_btn.is_displayed() and more_btn.is_enabled(): self.__driver.execute_script("arguments[0].scrollIntoView(true);", more_btn) self.__driver.execute_script("arguments[0].click();", more_btn) return True else: self._showTrace("用户无法加载更多预约记录") return False except: self._showTrace("加载更多预约记录失败 !") return False def __getReserveRecord( self, wanted_date: str, wanted_status: str ) -> dict: if wanted_date is None: self._showTrace("日期未指定, 无法检查当前预约状态") return None self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......") checked_count = 0 max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked if not self.__navigateToReserveRecordPage(): return None for _ in range(max_check_times): reservations = self.__loadReserveRecords() if reservations is None: return None for reservation in reservations[checked_count:]: record = self.__decodeReserveRecord(reservation) checked_count += 1 if record is None: continue if record["date"] == "": continue if record["time"] == {"begin": "", "end": ""}: continue # record date is later than the given date, check the next one if datetime.strptime(record["date"], "%Y-%m-%d").date() >\ datetime.strptime(wanted_date, "%Y-%m-%d").date(): continue # record date is earlier than the given date, so there is no wanted record if datetime.strptime(record["date"], "%Y-%m-%d").date() <\ datetime.strptime(wanted_date, "%Y-%m-%d").date(): return None if record["info"]["status"] == wanted_status: self._showTrace( f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, " f"详细信息: {record["date"]} " f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}" ) return record if not self.__showMoreReserveRecords(): break return None def canReserve( self, date: str ) -> bool: # no reserved or using record in the given date # then can reserve if self.__getReserveRecord(date, "已预约") is None: if self.__getReserveRecord(date, "使用中") is None: self._showTrace(f"用户在 {date} 可以预约") return True self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约") return False self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约") return False def canCheckin( self, date: str ) -> bool: # have a reserved record in the given date record = self.__getReserveRecord(date, "已预约") if record is not None: begin_time = record["time"]["begin"] begin_time = datetime.strptime(f"{date} {begin_time}", "%Y-%m-%d %H:%M") time_diff = datetime.now() - begin_time time_diff_seconds = time_diff.total_seconds() # before 30 minutes, cant checkin if time_diff_seconds < -30*60: self._showTrace( f"用户在 {date} 的预约开始时间为 {begin_time}, " f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 无法签到" ) return False # before in 30 minutes, can checkin elif -30*60 <= time_diff_seconds < 0: self._showTrace( f"用户在 {date} 的预约开始时间为 {begin_time}, " f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到" ) return True # past less than 30 minutes, can checkin elif 0 <= time_diff_seconds < 30*60 - 5: # spare 5 seconds for the checkin process self._showTrace( f"用户在 {date} 的预约开始时间为 {begin_time}, " f"当前距离预约开始时间已经过去 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到" ) return True self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到") return False def canRenew( self, date: str ) -> bool: # have a using record in the given date record = self.__getReserveRecord(date, "使用中") if record is not None: end_time = record["time"]["end"] end_time = datetime.strptime(f"{date} {end_time}", "%Y-%m-%d %H:%M") time_diff = end_time - datetime.now() time_diff_seconds = time_diff.total_seconds() # a using record is definitely after the begin time trace_msg = ( f"用户在 {date} 的预约结束时间为 {end_time}, " f"当前距离预约结束时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}" ) if abs(time_diff_seconds) < 120*60: self._showTrace(f"{trace_msg}, 可以续约") return True else: self._showTrace(f"{trace_msg}, 无法续约") return False self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约") return False