mirror of
https://github.com/KenanZhu/AutoLibrary.git
synced 2026-06-18 07:23:03 +08:00
347 lines
11 KiB
Python
347 lines
11 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""
|
|
Copyright (c) 2025 KenanZhu.
|
|
All rights reserved.
|
|
|
|
This software is provided "as is", without any warranty of any kind.
|
|
You may use, modify, and distribute this file under the terms of the MIT License.
|
|
See the LICENSE file for details.
|
|
"""
|
|
import re
|
|
import time
|
|
import queue
|
|
|
|
from datetime import datetime, timedelta
|
|
from selenium.webdriver.common.by import By
|
|
from selenium.webdriver.support.ui import WebDriverWait
|
|
from selenium.webdriver.support import expected_conditions as EC
|
|
|
|
from LibOperator import LibOperator
|
|
|
|
|
|
class LibChecker(LibOperator):
|
|
|
|
def __init__(
|
|
self,
|
|
input_queue: queue.Queue,
|
|
output_queue: queue.Queue,
|
|
driver
|
|
):
|
|
|
|
super().__init__(input_queue, output_queue)
|
|
|
|
self.__driver = driver
|
|
|
|
|
|
def _waitResponseLoad(
|
|
self
|
|
) -> bool:
|
|
|
|
pass
|
|
|
|
@staticmethod
|
|
def __formatDiffTime(
|
|
seconds: float
|
|
) -> str:
|
|
|
|
hours = int(seconds//3600)
|
|
minutes = int(seconds%3600//60)
|
|
seconds = int(seconds%60)
|
|
return f"{hours} 时 {minutes} 分 {seconds} 秒"
|
|
|
|
|
|
def __navigateToReserveRecordPage(
|
|
self
|
|
) -> bool:
|
|
|
|
try:
|
|
WebDriverWait(self.__driver, 2).until(
|
|
EC.element_to_be_clickable((By.XPATH, "//a[@href='/history?type=SEAT']"))
|
|
).click()
|
|
WebDriverWait(self.__driver, 2).until(
|
|
EC.presence_of_element_located((By.CLASS_NAME, "myReserveList"))
|
|
)
|
|
except:
|
|
self._showTrace("加载预约记录页面失败 !")
|
|
return False
|
|
return True
|
|
|
|
|
|
def __decodeReserveTime(
|
|
self,
|
|
time_element
|
|
) -> dict:
|
|
|
|
time_str = time_element.text.strip()
|
|
today = datetime.now().date()
|
|
if "明天" in time_str:
|
|
target_date = today + timedelta(days=1)
|
|
date = target_date.strftime("%Y-%m-%d")
|
|
elif "今天" in time_str:
|
|
target_date = today
|
|
date = target_date.strftime("%Y-%m-%d")
|
|
elif "昨天" in time_str:
|
|
target_date = today - timedelta(days=1)
|
|
date = target_date.strftime("%Y-%m-%d")
|
|
else:
|
|
date_match = re.search(r"(\d{4}-\d{1,2}-\d{1,2})", time_str)
|
|
if date_match:
|
|
date = date_match.group(1)
|
|
else:
|
|
date = ""
|
|
time_match = re.search(r"(\d{1,2}:\d{2}) -- (\d{1,2}:\d{2})", time_str)
|
|
if time_match:
|
|
begin_time = time_match.group(1)
|
|
end_time = time_match.group(2)
|
|
else:
|
|
begin_time = ""
|
|
end_time = ""
|
|
return {
|
|
"date": date,
|
|
"time": {
|
|
"begin": begin_time,
|
|
"end": end_time
|
|
}
|
|
}
|
|
|
|
|
|
def __decodeReserveInfo(
|
|
self,
|
|
info_elements
|
|
) -> str:
|
|
|
|
location = ""
|
|
status = ""
|
|
for info in info_elements:
|
|
if "已预约" in info.text:
|
|
status = "已预约"
|
|
elif "使用中" in info.text:
|
|
status = "使用中"
|
|
elif "已完成" in info.text:
|
|
status = "已完成"
|
|
elif "已结束使用" in info.text:
|
|
status = "已结束使用"
|
|
elif "已取消" in info.text:
|
|
status = "已取消"
|
|
elif "失约" in info.text:
|
|
status = "失约"
|
|
elif "图书馆" in info.text:
|
|
location = info.text.strip()
|
|
return {
|
|
"location": location,
|
|
"status": status,
|
|
}
|
|
|
|
|
|
def __decodeReserveRecord(
|
|
self,
|
|
reservation
|
|
) -> dict:
|
|
|
|
try:
|
|
time_element = reservation.find_element(
|
|
By.CSS_SELECTOR, "dt"
|
|
)
|
|
info_elements = reservation.find_elements(
|
|
By.CSS_SELECTOR, "a"
|
|
)
|
|
except:
|
|
return {
|
|
"date": "",
|
|
"time": {"begin": "", "end": ""},
|
|
"info": {"location": "", "status": ""}
|
|
}
|
|
time = self.__decodeReserveTime(time_element)
|
|
info = self.__decodeReserveInfo(info_elements)
|
|
return {
|
|
"date": time["date"],
|
|
"time": time["time"],
|
|
"info": info
|
|
}
|
|
|
|
|
|
def __decodeReserveRecords(
|
|
self,
|
|
reservations
|
|
) -> list:
|
|
|
|
records = []
|
|
|
|
for reservation in reservations:
|
|
record = self.__decodeReserveRecord(reservation)
|
|
if record["date"] == "":
|
|
record = None
|
|
if record["time"] == {"begin": "", "end": ""}:
|
|
record = None
|
|
records.append(record)
|
|
return records
|
|
|
|
|
|
def __loadReserveRecords(
|
|
self
|
|
) -> list:
|
|
try:
|
|
# check if there's any reservation on the date
|
|
WebDriverWait(self.__driver, 2).until(
|
|
EC.presence_of_element_located((By.CSS_SELECTOR, ".myReserveList > dl"))
|
|
)
|
|
reservations = self.__driver.find_elements(
|
|
By.CSS_SELECTOR, ".myReserveList > dl:not(#moreBlock)"
|
|
)
|
|
return reservations
|
|
except:
|
|
self._showTrace("加载预约记录失败 !")
|
|
return None
|
|
|
|
|
|
def __showMoreReserveRecords(
|
|
self
|
|
) -> bool:
|
|
|
|
# load new reservations if still not sure
|
|
try:
|
|
WebDriverWait(self.__driver, 0.1).until(
|
|
EC.element_to_be_clickable((By.ID, "moreBtn"))
|
|
)
|
|
except:
|
|
# the reservation is the last one
|
|
return False
|
|
try:
|
|
more_btn = self.__driver.find_element(By.ID, "moreBtn")
|
|
if more_btn.is_displayed() and more_btn.is_enabled():
|
|
self.__driver.execute_script("arguments[0].scrollIntoView(true);", more_btn)
|
|
self.__driver.execute_script("arguments[0].click();", more_btn)
|
|
return True
|
|
else:
|
|
self._showTrace("用户无法加载更多预约记录")
|
|
return False
|
|
except:
|
|
self._showTrace("加载更多预约记录失败 !")
|
|
return False
|
|
|
|
|
|
def __getReserveRecord(
|
|
self,
|
|
wanted_date: str,
|
|
wanted_status: str
|
|
) -> dict:
|
|
|
|
if wanted_date is None:
|
|
self._showTrace("日期未指定, 无法检查当前预约状态")
|
|
return None
|
|
self._showTrace(f"正在检查用户在 {wanted_date} 是否有预约状态为 {wanted_status} 的预约记录......")
|
|
|
|
checked_count = 0
|
|
max_check_times = 6 # we only check (4*(6-1)=)20 reservations, the last time cant be checked
|
|
|
|
if not self.__navigateToReserveRecordPage():
|
|
return None
|
|
for _ in range(max_check_times):
|
|
reservations = self.__loadReserveRecords()
|
|
if reservations is None:
|
|
return None
|
|
records = self.__decodeReserveRecords(reservations[checked_count:])
|
|
for record in records:
|
|
checked_count += 1
|
|
if record is None:
|
|
continue
|
|
# record date is later than the given date, check the next one
|
|
if datetime.strptime(record["date"], "%Y-%m-%d").date() >\
|
|
datetime.strptime(wanted_date, "%Y-%m-%d").date():
|
|
continue
|
|
# record date is earlier than the given date, so there is no wanted record
|
|
if datetime.strptime(record["date"], "%Y-%m-%d").date() <\
|
|
datetime.strptime(wanted_date, "%Y-%m-%d").date():
|
|
return None
|
|
if record["info"]["status"] == wanted_status:
|
|
self._showTrace(
|
|
f"寻找到用户第 {checked_count} 条状态为 {wanted_status} 的预约记录, "
|
|
f"详细信息: {record["date"]} "
|
|
f"{record["time"]["begin"]} - {record["time"]["end"]} {record["info"]["location"]}"
|
|
)
|
|
return record
|
|
if not self.__showMoreReserveRecords():
|
|
break
|
|
return None
|
|
|
|
|
|
def canReserve(
|
|
self,
|
|
date: str
|
|
) -> bool:
|
|
|
|
# no reserved or using record in the given date
|
|
# then can reserve
|
|
if self.__getReserveRecord(date, "已预约") is None:
|
|
if self.__getReserveRecord(date, "使用中") is None:
|
|
self._showTrace(f"用户在 {date} 可以预约")
|
|
return True
|
|
self._showTrace(f"用户在 {date} 有使用中的预约, 无法预约")
|
|
return False
|
|
self._showTrace(f"用户在 {date} 已存在有效预约, 无法预约")
|
|
return False
|
|
|
|
|
|
def canCheckin(
|
|
self,
|
|
date: str
|
|
) -> bool:
|
|
|
|
# have a reserved record in the given date
|
|
record = self.__getReserveRecord(date, "已预约")
|
|
if record is not None:
|
|
begin_time = record["time"]["begin"]
|
|
begin_time = datetime.strptime(f"{date} {begin_time}", "%Y-%m-%d %H:%M")
|
|
time_diff = datetime.now() - begin_time
|
|
time_diff_seconds = time_diff.total_seconds()
|
|
# before 30 minutes, cant checkin
|
|
if time_diff_seconds < -30*60:
|
|
self._showTrace(
|
|
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
|
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 无法签到"
|
|
)
|
|
return False
|
|
# before in 30 minutes, can checkin
|
|
elif -30*60 <= time_diff_seconds < 0:
|
|
self._showTrace(
|
|
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
|
f"当前距离预约开始时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
|
)
|
|
return True
|
|
# past less than 30 minutes, can checkin
|
|
elif 0 <= time_diff_seconds < 30*60 - 5: # spare 5 seconds for the checkin process
|
|
self._showTrace(
|
|
f"用户在 {date} 的预约开始时间为 {begin_time}, "
|
|
f"当前距离预约开始时间已经过去 {self.__formatDiffTime(abs(time_diff_seconds))}, 可以签到"
|
|
)
|
|
return True
|
|
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法签到")
|
|
return False
|
|
|
|
|
|
def canRenew(
|
|
self,
|
|
date: str
|
|
) -> bool:
|
|
|
|
# have a using record in the given date
|
|
record = self.__getReserveRecord(date, "使用中")
|
|
if record is not None:
|
|
end_time = record["time"]["end"]
|
|
end_time = datetime.strptime(f"{date} {end_time}", "%Y-%m-%d %H:%M")
|
|
time_diff = end_time - datetime.now()
|
|
time_diff_seconds = time_diff.total_seconds()
|
|
# a using record is definitely after the begin time
|
|
trace_msg = (
|
|
f"用户在 {date} 的预约结束时间为 {end_time}, "
|
|
f"当前距离预约结束时间还有 {self.__formatDiffTime(abs(time_diff_seconds))}"
|
|
)
|
|
if abs(time_diff_seconds) < 120*60:
|
|
self._showTrace(f"{trace_msg}, 可以续约")
|
|
return True
|
|
else:
|
|
self._showTrace(f"{trace_msg}, 无法续约")
|
|
return False
|
|
self._showTrace(f"用户在 {date} 没有有效预约记录, 无法续约")
|
|
return False
|