1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-18 07:23:03 +08:00
Files
AutoLibrary/src/operators/LibChecker.py
T

378 lines
13 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
Copyright (c) 2025 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import re
import time
import queue
from datetime import datetime, timedelta
from selenium.webdriver.common.by import By
from selenium.webdriver.chrome.webdriver import WebDriver
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from base.LibOperator import LibOperator
class LibChecker(LibOperator):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue,
driver: WebDriver
):
super().__init__(input_queue, output_queue)
self.__driver = driver
def _waitResponseLoad(
self
) -> bool:
pass
@staticmethod
def __formatDiffTime(
seconds: float
) -> str:
hours = int(seconds//3600)
minutes = int(seconds%3600//60)
seconds = int(seconds%60)
return f"{hours}{minutes}{seconds}"
def __navigateToReserveRecordPage(
self
) -> bool:
try:
WebDriverWait(self.__driver, 2).until(
EC.element_to_be_clickable((By.XPATH, "//a[@href='/history?type=SEAT']"))
).click()
WebDriverWait(self.__driver, 2).until(
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
)
except:
self._showTrace("加载预约记录页面失败 !", self.TraceLevel.ERROR)
return False
return True
def __decodeReserveTime(
self,
time_element
) -> dict:
time_str = time_element.text.strip()
today = datetime.now().date()
if "明天" in time_str:
target_date = today + timedelta(days=1)
date = target_date.strftime("%Y-%m-%d")
elif "今天" in time_str:
target_date = today
date = target_date.strftime("%Y-%m-%d")
elif "昨天" in time_str:
target_date = today - timedelta(days=1)
date = target_date.strftime("%Y-%m-%d")
else:
date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str)
if date_match:
date = date_match.group(1)
else:
date = ""
time_match = re.search(r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str)
if time_match:
begin_time = time_match.group(1)
end_time = time_match.group(2)
else:
begin_time = ""
end_time = ""
return {
"date": date,
"time": {
"begin": begin_time,
"end": end_time
}
}
def __decodeReserveInfo(
self,
info_elements
) -> str:
location = ""
status = ""
for info in info_elements:
if "已预约" in info.text:
status = "已预约"
elif "使用中" in info.text:
status = "使用中"
elif "已完成" in info.text:
status = "已完成"
elif "已结束使用" in info.text:
status = "已结束使用"
elif "已取消" in info.text:
status = "已取消"
elif "失约" in info.text:
status = "失约"
elif "图书馆" in info.text:
location = info.text.strip()
return {
"location": location,
"status": status,
}
def __decodeReserveRecord(
self,
reservation
) -> dict:
try:
time_element = reservation.find_element(
By.CSS_SELECTOR, "dt"
)
info_elements = reservation.find_elements(
By.CSS_SELECTOR, "a"
)
except:
return {
"date": "",
"time": {"begin": "", "end": ""},
"info": {"location": "", "status": ""}
}
time = self.__decodeReserveTime(time_element)
info = self.__decodeReserveInfo(info_elements)
return {
"date": time["date"],
"time": time["time"],
"info": info
}
def __loadReserveRecords(
self
) -> list:
try:
# check if there's any reservation on the date
WebDriverWait(self.__driver, 2).until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".myReserveList > dl"))
)
reservations = self.__driver.find_elements(
By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)"
)
return reservations
except:
self._showTrace("加载预约记录失败 !", self.TraceLevel.ERROR)
return None
def __showMoreReserveRecords(
self
) -> bool:
# load new reservations if still not sure
try:
WebDriverWait(self.__driver, 0.1).until(
EC.element_to_be_clickable((By.ID, "moreBtn"))
)
except:
# the reservation is the last one
return False
try:
more_btn = self.__driver.find_element(By.ID, "moreBtn")
if more_btn.is_displayed() and more_btn.is_enabled():
self.__driver.execute_script("arguments[0].scrollIntoView(true);", more_btn)
self.__driver.execute_script("arguments[0].click();", more_btn)
return True
else:
self._showTrace("用户无法加载更多预约记录", self.TraceLevel.WARNING)
return False
except:
self._showTrace("加载更多预约记录失败 !", self.TraceLevel.ERROR)
return False
def __getReserveRecord(
self,
wanted_date: str,
wanted_status: str
) -> dict:
if wanted_date is None:
self._showTrace("日期未指定, 无法检查当前预约状态", self.TraceLevel.WARNING)
return None
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......", no_log=True)
checked_count = 0
max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked
if not self.__navigateToReserveRecordPage():
return None
for _ in range(max_check_times):
reservations = self.__loadReserveRecords()
if reservations is None:
return None
for reservation in reservations[checked_count:]:
record = self.__decodeReserveRecord(reservation)
checked_count += 1
if record is None:
continue
if record["date"] == "":
continue
if record["time"] == {"begin": "", "end": ""}:
continue
# record date is later than the given date, check the next one
if datetime.strptime(record["date"], "%Y-%m-%d").date() >\
datetime.strptime(wanted_date, "%Y-%m-%d").date():
continue
# record date is earlier than the given date, so there is no wanted record
if datetime.strptime(record["date"], "%Y-%m-%d").date() <\
datetime.strptime(wanted_date, "%Y-%m-%d").date():
return None
if record["info"]["status"] == wanted_status:
self._showTrace(
f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, "
f"详细信息: {record["date"]} "
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}",
no_log=True
)
return record
if not self.__showMoreReserveRecords():
break
return None
def canReserve(
self,
date: str
) -> bool:
# no reserved or using record in the given date
# then can reserve
if self.__getReserveRecord(date, "已预约") is None:
if self.__getReserveRecord(date, "使用中") is None:
self._showTrace(f"用户在 {date} 可以预约")
return True
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
return False
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
return False
def canCheckin(
self
) -> bool:
# only check the current date
date = time.strftime("%Y-%m-%d", time.localtime())
record = self.__getReserveRecord(date, "已预约")
if record is not None:
begin_time = record["time"]["begin"]
begin_time = datetime.strptime(f"{date} {begin_time}", "%Y-%m-%d %H:%M")
time_diff = datetime.now() - begin_time
time_diff_seconds = time_diff.total_seconds()
# before 30 minutes, cant checkin
if time_diff_seconds < -30*60:
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 无法签到"
)
return False
# before in 30 minutes, can checkin
elif -30*60 <= time_diff_seconds < 0:
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
)
return True
# past less than 30 minutes, can checkin
elif 0 <= time_diff_seconds < 30*60 - 5: # spare 5 seconds for the checkin process
self._showTrace(
f"用户在 {date} 的预约开始时间为 {begin_time}, "
f"当前距离预约开始时间已经过去 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
)
return True
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
return False
def canRenew(
self
) -> tuple[bool, dict]:
# only check the current date
date = time.strftime("%Y-%m-%d", time.localtime())
record = self.__getReserveRecord(date, "使用中")
if record is not None:
end_time = record["time"]["end"]
end_time = datetime.strptime(f"{date} {end_time}", "%Y-%m-%d %H:%M")
time_diff = end_time - datetime.now()
time_diff_seconds = time_diff.total_seconds()
# a using record is definitely after the begin time
trace_msg = (
f"用户在 {date} 的预约结束时间为 {end_time}, "
f"当前距离预约结束时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}"
)
if abs(time_diff_seconds) < 120*60:
self._showTrace(f"{trace_msg}, 可以续约")
return True, record
else:
self._showTrace(f"{trace_msg}, 无法续约")
return False, None # we do not need to return the record, because if current
# time is not available for renewal, the record is not required
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
return False, None
def postRenewCheck(
self,
record: dict
) -> bool:
"""
Check if the renew operation is successful
Args:
record (dict): The expected record after renewal
Returns:
bool: True if the renew operation is successful, False otherwise
"""
# because the special circumstance that the renew operation
# do not show the success message or anything else,
# we need to check the record data to make sure the renew operation is successful.
# only check the given record date
date = record["date"]
act_record = self.__getReserveRecord(date, "使用中")
if act_record is not None:
if act_record["time"]["begin"] == record["time"]["begin"] and\
act_record["time"]["end"] == record["time"]["end"]:
self._showTrace(f"\n"\
f" 续约成功 !\n"\
f" 日 期 {date}\n"\
f" 时 间 {act_record["time"]["begin"]} - {act_record["time"]["end"]}\n"\
f" 位 置 {act_record["info"]["location"]}\n"
f" 状 态 {act_record["info"]["status"]}"
)
return True
else:
self._showTrace(f"\n"\
f" 续约失败 !\n"\
f" 续约后结束时间为 {act_record["time"]["end"]},与预期结束时间 {record["time"]["end"]} 不符 !"
)
return False
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法检查续约结果")
return False