1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-18 07:23:03 +08:00
Files
AutoLibrary/AutoLib.py
T

281 lines
10 KiB
Python

# -*- coding: utf-8 -*-
"""
Copyright (c) 2025 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import os
import queue
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.edge.service import Service
from MsgBase import MsgBase
from LibChecker import LibChecker
from LibLogin import LibLogin
from LibLogout import LibLogout
from LibReserve import LibReserve
from LibCheckin import LibCheckin
from ConfigReader import ConfigReader
class AutoLib(MsgBase):
def __init__(
self,
input_queue: queue.Queue,
output_queue: queue.Queue
):
super().__init__(input_queue, output_queue)
self.__system_config_reader = None
self.__users_config_reader = None
self.__driver = None
def __initBrowserDriver(
self
) -> bool:
self._showTrace("正在初始化浏览器驱动......")
edge_options = webdriver.EdgeOptions()
if self.__system_config_reader.get("web_driver/headless"):
edge_options.add_argument("--headless")
edge_options.add_argument("--disable-gpu")
edge_options.add_argument("--no-sandbox")
edge_options.add_argument("--disable-dev-shm-usage")
# must be 1920x1080, otherwise the page will cause some elements not accessible
edge_options.add_argument("--window-size=1920,1080")
edge_options.add_argument("--remote-allow-origins=*")
# omit ssl errors and verbose log level
edge_options.add_argument("--ignore-certificate-errors")
edge_options.add_argument("--ignore-ssl-errors")
edge_options.add_argument("--log-level=OFF")
edge_options.add_argument("--silent")
edge_options.add_experimental_option("excludeSwitches", ["enable-automation"])
edge_options.add_experimental_option("useAutomationExtension", False)
edge_options.add_argument("--disable-blink-features=AutomationControlled")
edge_options.add_argument(
"--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) "\
"AppleWebKit/537.36 (KHTML, like Gecko) "\
"Chrome/120.0.0.0 "\
"Safari/537.36 "\
"Edg/120.0.0.0"
)
# init browser driver
self.__driver_path = self.__system_config_reader.get("web_driver/driver_path")
self.__driver_type = self.__system_config_reader.get("web_driver/driver_type")
self.__driver_path = os.path.abspath(self.__driver_path)
try:
service = None
if self.__driver_path:
service = Service(executable_path=self.__driver_path)
match self.__driver_type.lower():
case "edge":
self.__driver = webdriver.Edge(service=service, options=edge_options)
case "chrome":
self.__driver = webdriver.Chrome(service=service, options=edge_options)
case "firefox":
self.__driver = webdriver.Firefox(service=service, options=edge_options)
case _:
raise Exception(f"不支持的浏览器驱动类型: {self.__driver_type}")
self.__driver.implicitly_wait(10)
self.__driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
except Exception as e:
self._showTrace(f"浏览器驱动初始化失败: {e}")
return False
self._showTrace(f"浏览器驱动已初始化, 类型: {self.__driver_type}, 路径: {self.__driver_path}")
return True
def __initLibOperators(
self
):
if not self.__driver:
self._showTrace(f"浏览器驱动未初始化, 请先初始化浏览器驱动 !")
return
self.__lib_checker = LibChecker(self._input_queue, self._output_queue, self.__driver)
self.__lib_login = LibLogin(self._input_queue, self._output_queue, self.__driver)
self.__lib_logout = LibLogout(self._input_queue, self._output_queue, self.__driver)
self.__lib_reserve = LibReserve(self._input_queue, self._output_queue, self.__driver)
self.__lib_checkin = LibCheckin(self._input_queue, self._output_queue, self.__driver)
def __waitResponseLoad(
self
) -> bool:
# wait for page load
try:
WebDriverWait(self.__driver, 5).until( # title contains "首页"
EC.title_contains("首页")
)
WebDriverWait(self.__driver, 2).until( # username field presence
EC.presence_of_element_located((By.NAME, "username"))
)
WebDriverWait(self.__driver, 2).until( # password field presence
EC.presence_of_element_located((By.NAME, "password"))
)
WebDriverWait(self.__driver, 2).until( # captcha field presence
EC.presence_of_element_located((By.NAME, "answer"))
)
WebDriverWait(self.__driver, 2).until( # captcha image presence
EC.presence_of_element_located((By.ID, "loadImgId"))
)
return True
except:
self._showTrace(f"登录页面加载失败 !")
return False
def __initDriverUrl(
self,
) -> bool:
self.__driver.get(self.__system_config_reader.get("library/host_url"))
if not self.__waitResponseLoad():
return False
return True
def __run(
self,
username: str,
password: str,
reserve_info: dict
) -> int:
# result : 0 - success, 1 - failed, 2 - passed
result = 2
# login
if not self.__lib_login.login(
username,
password,
self.__system_config_reader.get("login/max_attempt", 5),
self.__system_config_reader.get("login/auto_captcha", True),
):
return 1
"""
Here, we collect the run mode from the config file.
"""
run_mode = self.__system_config_reader.get("mode/run_mode", 0)
run_mode = {
"auto_reserve": run_mode&0x1,
"auto_checkin": run_mode&0x2,
"auto_renewal": run_mode&0x4,
}
# reserve
if run_mode["auto_reserve"]:
if self.__lib_checker.canReserve(reserve_info.get("date")):
if self.__lib_reserve.reserve(reserve_info):
self._showTrace(f"用户 {username} 预约成功 !")
result = 0
else:
self._showTrace(f"用户 {username} 预约失败 !")
result = 1
else:
self._showTrace(f"用户 {username} 无法预约,已跳过")
result = 2
# checkin
if run_mode["auto_checkin"] and result == 2:
if self.__lib_checker.canCheckin(reserve_info.get("date")):
if self.__lib_checkin.checkin(username):
self._showTrace(f"用户 {username} 签到成功 !")
result = 0
else:
self._showTrace(f"用户 {username} 签到失败 !")
result = 1
else:
self._showTrace(f"用户 {username} 无法签到,已跳过")
result = 2
# renewal
if run_mode["auto_renewal"] and result == 2:
if self.__lib_checker.canRenew(reserve_info.get("date")):
pass
else:
self._showTrace(f"用户 {username} 无法续约,已跳过")
result = 2
# logout
if not self.__lib_logout.logout(
username,
):
# if logout is failed, we must make sure the host to be reloaded
# otherwise, the next login may fail
self.__driver.get(self.__system_config_reader.get("library/host_url"))
return 1
return result
def run(
self,
system_config_reader: ConfigReader,
users_config_reader: ConfigReader
):
self.__system_config_reader = system_config_reader
self.__users_config_reader = users_config_reader
if not self.__initBrowserDriver():
return
else:
if not self.__initDriverUrl():
return
self.__initLibOperators()
user_counter = {"current": 0, "success": 0, "failed": 0, "passed": 0}
users = self.__users_config_reader.get("users")
self._showTrace(
f"共发现 {len(users)} 个用户, "\
f"用户配置文件路径: {self.__users_config_reader.configPath()}"
)
for user in users:
user_counter["current"] += 1
self._showTrace(
f"正在处理第 {user_counter["current"]}/{len(users)} 个用户: {user['username']}......"
)
r = self.__run(
username=user["username"],
password=user["password"],
reserve_info=user["reserve_info"],
)
if r == 0:
user_counter["success"] += 1
elif r == 1:
user_counter["failed"] += 1
elif r == 2:
user_counter["passed"] += 1
self._showTrace(f"处理完成, 共计 {user_counter["current"]} 个用户, "\
f"成功 {user_counter["success"]} 个用户, "\
f"失败 {user_counter["failed"]} 个用户, "\
f"跳过 {user_counter["passed"]} 个用户"
)
return
def close(
self
) -> bool:
if self.__driver:
self.__driver.quit()
self.__driver = None
self._showTrace(f"浏览器驱动已关闭")
return True
else:
self._showTrace(f"浏览器驱动未初始化, 无需关闭")
return False