1
1
mirror of https://github.com/KenanZhu/AutoLibrary.git synced 2026-06-18 07:23:03 +08:00

feat(preproc): 新增适用于重复性定时任务的预处理脚本以及可视化编排对话框

This commit is contained in:
Gogs
2026-05-08 15:23:24 +08:00
parent e11f696b76
commit 4d0d7a952c
7 changed files with 1560 additions and 10 deletions
+81 -8
View File
@@ -18,6 +18,7 @@ from PySide6.QtCore import (
from base.MsgBase import MsgBase from base.MsgBase import MsgBase
from operators.AutoLib import AutoLib from operators.AutoLib import AutoLib
from utils.JSONReader import JSONReader from utils.JSONReader import JSONReader
from utils.PreprocEngine import PreprocEngine
class AutoLibWorker(MsgBase, QThread): class AutoLibWorker(MsgBase, QThread):
@@ -76,25 +77,28 @@ class AutoLibWorker(MsgBase, QThread):
f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}", f"正在加载配置文件, 运行配置文件路径: {self.__config_paths["run"]}",
no_log=True no_log=True
) )
self.__run_config = JSONReader(self.__config_paths["run"]).data() self._run_config = JSONReader(self.__config_paths["run"]).data()
self._showTrace( self._showTrace(
f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}", f"正在加载配置文件, 用户配置文件路径: {self.__config_paths["user"]}",
no_log=True no_log=True
) )
self.__user_config = JSONReader(self.__config_paths["user"]).data() self._user_config = JSONReader(self.__config_paths["user"]).data()
if self.__run_config is None or self.__user_config is None: if self._run_config is None or self._user_config is None:
self._showTrace( self._showTrace(
"配置文件加载失败, 请检查配置文件是否正确", "配置文件加载失败, 请检查配置文件是否正确",
self.TraceLevel.ERROR self.TraceLevel.ERROR
) )
return False return False
if not self.__user_config.get("groups"): if not self._user_config.get("groups"):
self._showTrace( self._showTrace(
"用户配置文件中无有效任务组, 请检查用户配置文件是否正确", "用户配置文件中无有效任务组, 请检查用户配置文件是否正确",
self.TraceLevel.WARNING self.TraceLevel.WARNING
) )
return False return False
self._showLog(f"配置文件加载成功, 任务组数量: {len(self.__user_config.get('groups', []))}", self.TraceLevel.INFO) self._showLog(
f"配置文件加载成功, 任务组数量: {len(self._user_config.get('groups', []))}",
self.TraceLevel.INFO
)
return True return True
@@ -115,9 +119,9 @@ class AutoLibWorker(MsgBase, QThread):
auto_lib = AutoLib( auto_lib = AutoLib(
self._input_queue, self._input_queue,
self._output_queue, self._output_queue,
self.__run_config self._run_config
) )
groups = self.__user_config.get("groups") groups = self._user_config.get("groups")
for group in groups: for group in groups:
if not group["enabled"]: if not group["enabled"]:
self._showTrace(f"任务组 {group["name"]} 已跳过", no_log=True) self._showTrace(f"任务组 {group["name"]} 已跳过", no_log=True)
@@ -162,7 +166,76 @@ class TimerTaskWorker(AutoLibWorker):
): ):
self._showTrace(f"定时任务 {self.__timer_task['name']} 开始运行") self._showTrace(f"定时任务 {self.__timer_task['name']} 开始运行")
super().run() if not self.checkTimeAvailable() or not self.checkConfigPaths():
self._showTrace("定时任务跳过执行 (时间或配置文件检查未通过)")
self.timerTaskWorkerIsFinished.emit(False, self.__timer_task)
return
try:
if not self.loadConfigs():
raise Exception("配置文件加载失败")
self._applyRepeatPreproc()
auto_lib = AutoLib(
self._input_queue,
self._output_queue,
self._run_config
)
groups = self._user_config.get("groups")
for group in groups:
if not group["enabled"]:
self._showTrace(
f"任务组 {group['name']} 已跳过",
no_log=True
)
continue
self._showTrace(
f"正在运行任务组 {group['name']}",
no_log=True
)
auto_lib.run(
{"users": group.get("users", [])}
)
auto_lib.close()
except Exception as e:
self._showTrace(
f"定时任务 {self.__timer_task['name']} 运行时发生异常: {e}",
self.TraceLevel.ERROR
)
self.timerTaskWorkerIsFinished.emit(True, self.__timer_task)
return
self._showTrace(f"定时任务 {self.__timer_task['name']} 运行结束")
self.timerTaskWorkerIsFinished.emit(False, self.__timer_task)
def _applyRepeatPreproc(
self
):
preproc_script = self.__timer_task.get("repeat_preproc", "")
if not preproc_script or not preproc_script.strip():
return
self._showTrace(
f"检测到重复定时任务预处理脚本, 开始执行...",
no_log=True
)
groups = self._user_config.get("groups", [])
affected_count = 0
for group in groups:
if not group.get("enabled", False):
continue
for user in group.get("users", []):
try:
PreprocEngine.execute(preproc_script, user)
affected_count += 1
except ValueError as e:
self._showTrace(
f"预处理脚本执行错误 (用户 {user['username']}): {e}",
self.TraceLevel.ERROR
)
self._showLog(
f"预处理脚本执行完毕, "
f"影响 {affected_count} 个用户",
self.TraceLevel.INFO
)
@Slot() @Slot()
def onTimerTaskFinishedWithError( def onTimerTaskFinishedWithError(
+226
View File
@@ -0,0 +1,226 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from PySide6.QtGui import (
QSyntaxHighlighter, QTextCharFormat, QColor, QFont, QIcon
)
from PySide6.QtWidgets import (
QDialog, QVBoxLayout, QHBoxLayout, QPlainTextEdit,
QDialogButtonBox, QPushButton, QLabel, QApplication, QStyle
)
class ALScriptHighlighter(QSyntaxHighlighter):
def __init__(
self,
parent=None
):
super().__init__(parent)
self._rules = []
keywordFmt = QTextCharFormat()
keywordFmt.setForeground(QColor("#316BFF"))
keywordFmt.setFontWeight(QFont.Weight.Bold)
for kw in ["IF", "ELSE IF", "ELSE", "ENDIF", "END IF",
"SET", "PASS", "THEN", ".TRUE.", ".FALSE."]:
pattern = r"\b" + kw.replace(" ", r"\s+") + r"\b"
self._rules.append((pattern, keywordFmt))
opFmt = QTextCharFormat()
opFmt.setForeground(QColor("#9C27B0"))
for op in [r"\.EQ\.", r"\.NEQ\.", r"\.BGT\.", r"\.BLT\.",
r"\.BGE\.", r"\.BLE\.", r"\.ADD\.", r"\.SUB\."]:
self._rules.append((op, opFmt))
varFmt = QTextCharFormat()
varFmt.setForeground(QColor("#E65100"))
for var in ["RESERVE_BEGIN_TIME", "RESERVE_END_TIME",
"RESERVE_DATE", "USERNAME", "USER_ENABLE",
"PRIORITY", "CURRENT_TIME", "CURRENT_DATE"]:
self._rules.append((r"\b" + var + r"\b", varFmt))
funcFmt = QTextCharFormat()
funcFmt.setForeground(QColor("#2E7D32"))
self._rules.append((r"\bTIME\([^)]+\)", funcFmt))
self._rules.append((r"\bDATE\([^)]+\)", funcFmt))
strFmt = QTextCharFormat()
strFmt.setForeground(QColor("#388E3C"))
self._rules.append((r"'[^']*'", strFmt))
numFmt = QTextCharFormat()
numFmt.setForeground(QColor("#D32F2F"))
self._rules.append((r"\b\d+\b", numFmt))
commentFmt = QTextCharFormat()
commentFmt.setForeground(QColor("#999999"))
commentFmt.setFontItalic(True)
self._rules.append((r"//[^\n]*", commentFmt))
def highlightBlock(
self,
text
):
import re
for pattern, fmt in self._rules:
for match in re.finditer(pattern, text, re.IGNORECASE):
start = match.start()
length = match.end() - match.start()
self.setFormat(start, length, fmt)
class ALScriptPreviewDialog(QDialog):
def __init__(
self,
parent=None,
script: str = ""
):
super().__init__(parent)
self.__fontSize = 13
self.modifyUi()
self.connectSignals()
self._textEdit.setPlainText(script)
self._highlighter = ALScriptHighlighter(
self._textEdit.document()
)
def modifyUi(
self
):
self.setWindowTitle("预处理脚本预览 - AutoLibrary")
self.setMinimumSize(520, 360)
layout = QVBoxLayout(self)
toolbarLayout = QHBoxLayout()
self._zoomInBtn = QPushButton("")
self._zoomInBtn.setFixedSize(30, 25)
self._zoomOutBtn = QPushButton("")
self._zoomOutBtn.setFixedSize(30, 25)
self._zoomResetBtn = QPushButton(
QApplication.style().standardIcon(
QStyle.StandardPixmap.SP_BrowserReload
), ""
)
self._zoomResetBtn.setFixedSize(30, 25)
self._zoomResetBtn.setToolTip("重置缩放")
self._zoomLabel = QLabel(f"{self.__fontSize}px")
self._zoomLabel.setFixedHeight(25)
toolbarLayout.addWidget(self._zoomInBtn)
toolbarLayout.addWidget(self._zoomOutBtn)
toolbarLayout.addWidget(self._zoomResetBtn)
toolbarLayout.addWidget(self._zoomLabel)
toolbarLayout.addStretch()
self._copyBtn = QPushButton(
QApplication.style().standardIcon(
QStyle.StandardPixmap.SP_FileDialogDetailedView
), ""
)
self._copyBtn.setFixedSize(30, 25)
self._copyBtn.setToolTip("复制脚本")
toolbarLayout.addWidget(self._copyBtn)
layout.addLayout(toolbarLayout)
self._textEdit = QPlainTextEdit(self)
self._textEdit.setReadOnly(True)
self._textEdit.setLineWrapMode(
QPlainTextEdit.LineWrapMode.NoWrap
)
self._textEdit.setStyleSheet(
"QPlainTextEdit {"
" font-family: 'Courier New', 'Consolas', monospace;"
" font-size: 13px;"
"}"
)
layout.addWidget(self._textEdit)
self._btnBox = QDialogButtonBox(
QDialogButtonBox.StandardButton.Close
)
self._btnBox.button(
QDialogButtonBox.StandardButton.Close
).setText("关闭")
layout.addWidget(self._btnBox)
def connectSignals(
self
):
self._btnBox.rejected.connect(self.reject)
self._zoomInBtn.clicked.connect(self._onZoomIn)
self._zoomOutBtn.clicked.connect(self._onZoomOut)
self._zoomResetBtn.clicked.connect(self._onZoomReset)
self._copyBtn.clicked.connect(self._onCopy)
def _onZoomIn(
self
):
self.__fontSize = min(self.__fontSize + 2, 40)
self._updateFontSize()
def _onZoomOut(
self
):
self.__fontSize = max(self.__fontSize - 2, 8)
self._updateFontSize()
def _onZoomReset(
self
):
self.__fontSize = 13
self._updateFontSize()
def _onCopy(
self
):
clipboard = QApplication.clipboard()
clipboard.setText(self._textEdit.toPlainText())
original = self._copyBtn.text()
self._copyBtn.setText("已复制")
self._copyBtn.setEnabled(False)
from PySide6.QtCore import QTimer
QTimer.singleShot(2000, lambda: (
self._copyBtn.setText(original),
self._copyBtn.setEnabled(True)
))
def _updateFontSize(
self
):
font = self._textEdit.font()
font.setPointSize(self.__fontSize)
self._textEdit.setFont(font)
self._textEdit.setStyleSheet(
"QPlainTextEdit {"
" font-family: 'Courier New', 'Consolas', monospace;"
f" font-size: {self.__fontSize}px;"
"}"
)
self._zoomLabel.setText(f"{self.__fontSize}px")
+864
View File
@@ -0,0 +1,864 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
from PySide6.QtCore import QTime, QDate
from PySide6.QtWidgets import (
QDialog, QWidget, QVBoxLayout, QHBoxLayout, QLabel,
QComboBox, QPushButton, QScrollArea, QTimeEdit,
QDateEdit, QLineEdit, QSpinBox, QDoubleSpinBox,
QStackedWidget, QFrame, QDialogButtonBox,
QGroupBox, QSizePolicy
)
from utils.PreprocEngine import PreprocEngine
VARIABLE_META = PreprocEngine.VARIABLE_META
_VAR_COMBO_ITEMS = [
(display, varname, vartype)
for display, (varname, vartype) in VARIABLE_META.items()
]
_VAR_COMBO_ITEMS_SET = [
(display, varname, vartype)
for display, (varname, vartype) in VARIABLE_META.items()
if not varname.startswith("CURRENT_")
]
OP_ITEMS = [
("等于", ".EQ."),
("不等于", ".NEQ."),
("大于", ".BGT."),
("小于", ".BLT."),
("大于等于", ".BGE."),
("小于等于", ".BLE."),
]
def _makeVarCombo(
) -> QComboBox:
cb = QComboBox()
for display, varname, vartype in _VAR_COMBO_ITEMS:
cb.addItem(display, (varname, vartype))
cb.setMinimumWidth(120)
cb.setFixedHeight(25)
return cb
def _makeSetVarCombo(
) -> QComboBox:
cb = QComboBox()
for display, varname, vartype in _VAR_COMBO_ITEMS_SET:
cb.addItem(display, (varname, vartype))
cb.setMinimumWidth(120)
cb.setFixedHeight(25)
return cb
def _makeOpCombo(
) -> QComboBox:
cb = QComboBox()
for display, op in OP_ITEMS:
cb.addItem(display, op)
cb.setMinimumWidth(80)
cb.setFixedHeight(25)
return cb
def _makeValueWidget(
data_type: str
) -> QWidget:
if data_type == "Time":
w = QTimeEdit()
w.setDisplayFormat("HH:mm")
w.setMinimumWidth(100)
w.setFixedHeight(25)
elif data_type == "Date":
w = QDateEdit()
w.setDisplayFormat("yyyy-MM-dd")
w.setCalendarPopup(True)
w.setMinimumWidth(130)
w.setFixedHeight(25)
elif data_type == "Integer":
w = QSpinBox()
w.setMinimum(-999999)
w.setMaximum(999999)
w.setMinimumWidth(100)
w.setFixedHeight(25)
elif data_type == "Float":
w = QDoubleSpinBox()
w.setMinimum(-999999)
w.setMaximum(999999)
w.setDecimals(2)
w.setMinimumWidth(100)
w.setFixedHeight(25)
elif data_type == "Boolean":
w = QComboBox()
w.addItem(".TRUE.", ".TRUE.")
w.addItem(".FALSE.", ".FALSE.")
w.setMinimumWidth(100)
w.setFixedHeight(25)
else:
w = QLineEdit()
w.setPlaceholderText("输入值")
w.setMinimumWidth(120)
w.setFixedHeight(25)
return w
def _makeActionValueWidget(
data_type: str
) -> QWidget:
if data_type == "Date":
w = QComboBox()
w.addItem("今天", "today")
w.addItem("明天", "tomorrow")
w.setFixedHeight(25)
w.setMinimumWidth(100)
w._is_date_action = True
return w
if data_type == "Time":
container = QWidget()
layout = QHBoxLayout(container)
layout.setContentsMargins(0, 0, 0, 0)
layout.setSpacing(2)
modeCombo = QComboBox()
modeCombo.addItem("固定时间", "fixed")
modeCombo.addItem("相对当前", "relative")
modeCombo.setFixedHeight(25)
stack = QStackedWidget()
timeEdit = QTimeEdit()
timeEdit.setDisplayFormat("HH:mm")
timeEdit.setFixedHeight(25)
spinBox = QSpinBox()
spinBox.setRange(0, 23)
spinBox.setSuffix("小时")
spinBox.setFixedHeight(25)
stack.addWidget(timeEdit)
stack.addWidget(spinBox)
modeCombo.currentIndexChanged.connect(
lambda i: stack.setCurrentIndex(i)
)
layout.addWidget(modeCombo)
layout.addWidget(stack)
container._modeCombo = modeCombo
container._timeEdit = timeEdit
container._spinBox = spinBox
container._isActionTime = True
return container
return _makeValueWidget(data_type)
def _getValueFromWidget(
w: QWidget
) -> str:
if getattr(w, '_isActionTime', False):
if w._modeCombo.currentData() == "fixed":
return w._timeEdit.time().toString("HH:mm")
else:
return f"+{w._spinBox.value()}"
if isinstance(w, QTimeEdit):
return w.time().toString("HH:mm")
if isinstance(w, QDateEdit):
return w.date().toString("yyyy-MM-dd")
if isinstance(w, QComboBox):
return w.currentText()
if isinstance(w, QSpinBox):
return str(w.value())
if isinstance(w, QDoubleSpinBox):
return str(w.value())
if isinstance(w, QLineEdit):
return w.text()
return ""
def _encodeValueStr(
raw_value: str,
data_type: str
) -> str:
if data_type == "Time":
if raw_value.startswith("+"):
return raw_value
return f"TIME({raw_value})"
elif data_type == "Date":
if raw_value == "今天":
return "CURRENT_DATE"
elif raw_value == "明天":
return "CURRENT_DATE + 1"
return f"DATE({raw_value})"
elif data_type == "Boolean":
return raw_value
elif data_type == "String":
escaped = raw_value.replace("'", "''")
return f"'{escaped}'"
else:
return raw_value
def _setWidgetValue(
w: QWidget,
vartype: str,
expr: str
):
import re
s = expr.strip()
if getattr(w, '_isActionTime', False):
timeMatch = re.match(r"TIME\((\d{1,2}:\d{2})\)", s, re.IGNORECASE)
if timeMatch:
w._modeCombo.setCurrentIndex(0)
parts = timeMatch.group(1).split(":")
w._timeEdit.setTime(QTime(int(parts[0]), int(parts[1])))
return
relMatch = re.match(r"^\+(\d+)$", s)
if relMatch:
w._modeCombo.setCurrentIndex(1)
w._spinBox.setValue(int(relMatch.group(1)))
return
return
if getattr(w, '_is_date_action', False) and isinstance(w, QComboBox):
if s.upper() in ("CURRENT_DATE", "TODAY"):
w.setCurrentIndex(0)
elif s.upper() in ("CURRENT_DATE + 1", "TOMORROW"):
w.setCurrentIndex(1)
else:
dateMatch = re.match(
r"DATE\((\d{4}-\d{2}-\d{2})\)", s, re.IGNORECASE
)
if dateMatch:
from datetime import datetime, timedelta
dateStr = dateMatch.group(1)
today = datetime.now().strftime("%Y-%m-%d")
tomorrow = (
datetime.now() + timedelta(days=1)
).strftime("%Y-%m-%d")
if dateStr == today:
w.setCurrentIndex(0)
elif dateStr == tomorrow:
w.setCurrentIndex(1)
return
if vartype == "Time":
m = re.match(r"TIME\((\d{1,2}:\d{2})\)", s, re.IGNORECASE)
if m and isinstance(w, QTimeEdit):
parts = m.group(1).split(":")
w.setTime(QTime(int(parts[0]), int(parts[1])))
elif vartype == "Date":
m = re.match(r"DATE\((\d{4}-\d{2}-\d{2})\)", s, re.IGNORECASE)
if m and isinstance(w, QDateEdit):
parts = m.group(1).split("-")
w.setDate(QDate(int(parts[0]), int(parts[1]), int(parts[2])))
elif vartype == "Boolean" and isinstance(w, QComboBox):
for i in range(w.count()):
if w.itemData(i) == s.upper():
w.setCurrentIndex(i)
break
elif vartype == "Integer" and isinstance(w, QSpinBox):
try:
w.setValue(int(s))
except ValueError:
pass
elif vartype == "Float" and isinstance(w, QDoubleSpinBox):
try:
w.setValue(float(s))
except ValueError:
pass
elif isinstance(w, QLineEdit):
inner = s
if (inner.startswith("'") and inner.endswith("'")) or \
(inner.startswith('"') and inner.endswith('"')):
inner = inner[1:-1].replace("''", "'")
w.setText(inner)
class ActionStepFrame(QFrame):
def __init__(
self,
parent=None
):
super().__init__(parent)
self.setupUi()
self.connectSignals()
self._onTargetChanged(0)
def setupUi(
self
):
self.setFrameShape(QFrame.Shape.StyledPanel)
self.setFrameShadow(QFrame.Shadow.Raised)
self.setFixedHeight(35)
layout = QHBoxLayout(self)
layout.setContentsMargins(2, 2, 2, 2)
layout.setSpacing(4)
self.targetCombo = _makeSetVarCombo()
self.valueWidgetStack = QStackedWidget()
self.valueWidgetStack.setFixedHeight(25)
self._initValueStack()
setLabel = QLabel("设置")
setLabel.setFixedHeight(25)
layout.addWidget(setLabel)
layout.addWidget(self.targetCombo)
toLabel = QLabel("")
toLabel.setFixedHeight(25)
layout.addWidget(toLabel)
layout.addWidget(self.valueWidgetStack)
self.deleteBtn = QPushButton("×")
self.deleteBtn.setFixedSize(24, 25)
self.deleteBtn.setStyleSheet("color: red; font-weight: bold;")
layout.addWidget(self.deleteBtn)
def connectSignals(
self
):
self.targetCombo.currentIndexChanged.connect(self._onTargetChanged)
def _initValueStack(
self
):
self._valueWidgets = {}
for _, _, vartype in _VAR_COMBO_ITEMS:
if vartype not in self._valueWidgets:
w = _makeActionValueWidget(vartype)
self._valueWidgets[vartype] = w
self.valueWidgetStack.addWidget(w)
self.valueWidgetStack.setCurrentWidget(
self._valueWidgets.get("String", self.valueWidgetStack.widget(0))
)
def _onTargetChanged(
self,
idx
):
if idx < 0:
return
data = self.targetCombo.itemData(idx)
if data:
_, vartype = data
w = self._valueWidgets.get(vartype)
if w:
self.valueWidgetStack.setCurrentWidget(w)
def getTarget(
self
) -> str:
data = self.targetCombo.currentData()
return data[0] if data else ""
def getTargetType(
self
) -> str:
data = self.targetCombo.currentData()
return data[1] if data else "String"
def getValueRaw(
self
) -> str:
currentType = self.getTargetType()
w = self._valueWidgets.get(currentType)
if w:
return _getValueFromWidget(w)
return ""
def toScriptLine(
self
) -> str:
target = self.getTarget()
if not target:
return ""
rawVal = self.getValueRaw()
targetType = self.getTargetType()
encoded = _encodeValueStr(rawVal, targetType)
if targetType == "Time" and rawVal.startswith("+"):
hours = rawVal[1:]
return f" {target} .ADD. {hours}"
return f" SET {target} = {encoded}"
def loadFromScript(
self,
targetVar: str,
valueExpr: str
):
for idx in range(self.targetCombo.count()):
data = self.targetCombo.itemData(idx)
if data and data[0] == targetVar:
self.targetCombo.setCurrentIndex(idx)
break
self._setValueFromExpr(valueExpr)
def _setValueFromExpr(
self,
expr: str
):
targetType = self.getTargetType()
w = self._valueWidgets.get(targetType)
if not w:
return
_setWidgetValue(w, targetType, expr)
class ConditionalBlock(QGroupBox):
def __init__(
self,
blockIndex: int,
parent=None
):
super().__init__(parent)
self.blockIndex = blockIndex
self._actionWidgets = []
self.setupUi()
self.connectSignals()
self._onOperandChanged(0)
def setupUi(
self
):
self.setStyleSheet(
"QGroupBox { font-weight: bold; border: 1px solid #ccc; "
"margin-top: 8px; padding-top: 8px; }"
)
self.setSizePolicy(
QSizePolicy.Policy.Preferred,
QSizePolicy.Policy.Fixed
)
mainLayout = QVBoxLayout(self)
mainLayout.setSpacing(4)
mainLayout.setContentsMargins(5, 5, 5, 5)
headerLayout = QHBoxLayout()
self.typeCombo = QComboBox()
self.typeCombo.addItem("IF", "IF")
self.typeCombo.addItem("ELSE IF", "ELSE IF")
self.typeCombo.addItem("ELSE", "ELSE")
if self.blockIndex == 0:
self.typeCombo.setEnabled(False)
typeLabel = QLabel("类型:")
typeLabel.setFixedHeight(25)
headerLayout.addWidget(typeLabel)
headerLayout.addWidget(self.typeCombo)
headerLayout.addStretch()
self.deleteBlockBtn = QPushButton("删除此块")
self.deleteBlockBtn.setStyleSheet("color: red;")
self.deleteBlockBtn.setFixedHeight(25)
headerLayout.addWidget(self.deleteBlockBtn)
mainLayout.addLayout(headerLayout)
self.conditionWidget = QWidget()
self.conditionWidget.setFixedHeight(60)
condLayout = QHBoxLayout(self.conditionWidget)
condLayout.setContentsMargins(0, 0, 0, 0)
ifLabel = QLabel("如果")
ifLabel.setFixedHeight(25)
condLayout.addWidget(ifLabel)
self.operandCombo = _makeVarCombo()
condLayout.addWidget(self.operandCombo)
self.opCombo = _makeOpCombo()
condLayout.addWidget(self.opCombo)
self.condValueStack = QStackedWidget()
self.condValueStack.setFixedHeight(25)
self._condValueWidgets = {}
for vartype in ["Time", "Date", "String", "Integer", "Float", "Boolean"]:
w = _makeValueWidget(vartype)
self._condValueWidgets[vartype] = w
self.condValueStack.addWidget(w)
self.condValueStack.setCurrentWidget(self._condValueWidgets.get("String"))
condLayout.addWidget(self.condValueStack)
mainLayout.addWidget(self.conditionWidget)
self.actionLabel = QLabel("执行步骤:")
self.actionLabel.setFixedHeight(25)
mainLayout.addWidget(self.actionLabel)
self.actionsLayout = QVBoxLayout()
self.actionsLayout.setSpacing(2)
mainLayout.addLayout(self.actionsLayout)
self.addActionBtn = QPushButton("+ 添加执行步骤")
self.addActionBtn.setFixedHeight(25)
mainLayout.addWidget(self.addActionBtn)
def connectSignals(
self
):
self.operandCombo.currentIndexChanged.connect(self._onOperandChanged)
self.typeCombo.currentIndexChanged.connect(self._onTypeChanged)
self.addActionBtn.clicked.connect(self._addActionStep)
def _onOperandChanged(
self,
idx
):
if idx < 0:
return
data = self.operandCombo.itemData(idx)
if data:
_, vartype = data
w = self._condValueWidgets.get(vartype)
if w:
self.condValueStack.setCurrentWidget(w)
def _onTypeChanged(
self,
idx
):
isCond = self.typeCombo.currentData() in ("IF", "ELSE IF")
self.conditionWidget.setVisible(isCond)
self.actionLabel.setText("执行步骤:" if isCond else "ELSE 执行步骤:")
def _addActionStep(
self
):
step = ActionStepFrame(self)
step.deleteBtn.clicked.connect(lambda: self._removeActionStep(step))
self._actionWidgets.append(step)
self.actionsLayout.addWidget(step)
def _removeActionStep(
self,
step: ActionStepFrame
):
if step in self._actionWidgets:
self._actionWidgets.remove(step)
self.actionsLayout.removeWidget(step)
step.hide()
step.deleteLater()
def getBlockType(
self
) -> str:
return self.typeCombo.currentData()
def toScriptLines(
self
) -> list:
blockType = self.getBlockType()
lines = []
if blockType in ("IF", "ELSE IF"):
operand = self.operandCombo.currentData()
operandName = operand[0] if operand else ""
operandType = operand[1] if operand else "String"
opSym = self.opCombo.currentData()
rawVal = _getValueFromWidget(
self._condValueWidgets.get(operandType, QLineEdit())
)
encodedVal = _encodeValueStr(rawVal, operandType)
if blockType == "IF":
lines.append(f"IF({operandName} {opSym} {encodedVal}) THEN")
else:
lines.append(f"ELSE IF({operandName} {opSym} {encodedVal}) THEN")
else:
lines.append("ELSE")
for step in self._actionWidgets:
scriptLine = step.toScriptLine()
if scriptLine:
lines.append(scriptLine)
return lines
def getConditionSummary(
self
) -> str:
bt = self.getBlockType()
if bt == "ELSE":
return "ELSE"
operandData = self.operandCombo.currentData()
if not operandData:
return bt
operandDisplay = self.operandCombo.currentText()
opDisplay = self.opCombo.currentText()
rawVal = self.getConditionRawValuePreview()
return f"{bt} ({operandDisplay} {opDisplay} {rawVal})"
def getConditionRawValuePreview(
self
) -> str:
data = self.operandCombo.currentData()
if not data:
return ""
_, vartype = data
w = self._condValueWidgets.get(vartype)
if w:
return _getValueFromWidget(w)
return ""
def countActionSteps(
self
) -> int:
return len(self._actionWidgets)
class ALPreprocOrchDialog(QDialog):
def __init__(
self,
parent=None,
existingScript: str = ""
):
super().__init__(parent)
self._blocks: list[ConditionalBlock] = []
self.modifyUi()
self.connectSignals()
if existingScript and existingScript.strip():
self._loadFromScript(existingScript)
else:
self._addBlock()
self._scrollLayout.addStretch()
def modifyUi(
self
):
self.setWindowTitle("预处理指令编排 - AutoLibrary")
self.setMinimumSize(420, 400)
self.setModal(True)
mainLayout = QVBoxLayout(self)
scroll = QScrollArea()
scroll.setWidgetResizable(True)
scroll.setFrameShape(QFrame.Shape.NoFrame)
scrollContent = QWidget()
self._scrollLayout = QVBoxLayout(scrollContent)
self._scrollLayout.setSpacing(5)
scroll.setWidget(scrollContent)
mainLayout.addWidget(scroll)
addBlockLayout = QHBoxLayout()
self.addBlockBtn = QPushButton("+ 添加判断块")
self.addBlockBtn.setFixedHeight(25)
addBlockLayout.addStretch()
addBlockLayout.addWidget(self.addBlockBtn)
addBlockLayout.addStretch()
mainLayout.addLayout(addBlockLayout)
self.btnBox = QDialogButtonBox(
QDialogButtonBox.StandardButton.Ok |
QDialogButtonBox.StandardButton.Cancel
)
self.btnBox.button(QDialogButtonBox.StandardButton.Ok).setText("确定")
self.btnBox.button(QDialogButtonBox.StandardButton.Cancel).setText("取消")
mainLayout.addWidget(self.btnBox)
def connectSignals(
self
):
self.btnBox.accepted.connect(self.accept)
self.btnBox.rejected.connect(self.reject)
self.addBlockBtn.clicked.connect(self._addBlock)
def _addBlock(
self
):
block = ConditionalBlock(len(self._blocks), self)
block.deleteBlockBtn.clicked.connect(lambda: self._removeBlock(block))
self._blocks.append(block)
block._addActionStep()
if self._scrollLayout.count() > 0:
lastItem = self._scrollLayout.itemAt(
self._scrollLayout.count() - 1
)
if lastItem and lastItem.spacerItem():
self._scrollLayout.insertWidget(
self._scrollLayout.count() - 1, block
)
return
self._scrollLayout.addWidget(block)
def _removeBlock(
self,
block: ConditionalBlock
):
if block in self._blocks:
self._blocks.remove(block)
self._scrollLayout.removeWidget(block)
block.hide()
block.deleteLater()
def getScript(
self
) -> str:
parts = []
for i, block in enumerate(self._blocks):
blockType = block.getBlockType()
if blockType == "IF" and i > 0:
parts.append("ENDIF")
lines = block.toScriptLines()
parts.extend(lines)
if self._blocks and self._blocks[0].getBlockType() == "IF":
parts.append("ENDIF")
return "\n".join(parts)
def getScriptPreview(
self
) -> str:
s = self.getScript()
if len(s) > 10:
return s[:7] + "..."
return s
def _loadFromScript(
self,
script: str
):
import re
lines = [l.strip() for l in script.split("\n") if l.strip()]
if not lines:
self._addBlock()
return
currentBlock = None
currentBlockType = None
actionsBuffer = []
def flushBlock():
nonlocal currentBlock, currentBlockType, actionsBuffer
if currentBlock is None:
return
typeIdxMap = {"IF": 0, "ELSE IF": 1, "ELSE": 2}
idx = typeIdxMap.get(currentBlockType, 0)
currentBlock.typeCombo.setCurrentIndex(idx)
currentBlock._onTypeChanged(idx)
for oldStep in list(currentBlock._actionWidgets):
currentBlock._removeActionStep(oldStep)
for target, valueExpr in actionsBuffer:
currentBlock._addActionStep()
step = currentBlock._actionWidgets[-1]
step.loadFromScript(target, valueExpr)
self._blocks.clear()
while self._scrollLayout.count():
item = self._scrollLayout.takeAt(0)
if item.widget():
item.widget().deleteLater()
for line in lines:
upper = line.upper()
ifMatch = re.match(r"^IF\((.+)\)\s*THEN\s*$", upper)
if ifMatch:
flushBlock()
currentBlockType = "IF"
actionsBuffer = []
self._addBlock()
currentBlock = self._blocks[-1]
self._parseConditionToBlock(currentBlock, ifMatch.group(1))
continue
elifIfMatch = re.match(r"^ELSE\s+IF\((.+)\)\s*THEN\s*$", upper)
if elifIfMatch:
flushBlock()
currentBlockType = "ELSE IF"
actionsBuffer = []
self._addBlock()
currentBlock = self._blocks[-1]
self._parseConditionToBlock(currentBlock, elifIfMatch.group(1))
continue
if upper == "ELSE":
flushBlock()
currentBlockType = "ELSE"
actionsBuffer = []
self._addBlock()
currentBlock = self._blocks[-1]
currentBlock.conditionWidget.setVisible(False)
continue
setMatch = re.match(r"^SET\s+(\w+)\s*=\s*(.+)$", line, re.IGNORECASE)
if setMatch:
target = setMatch.group(1).strip()
valueExpr = setMatch.group(2).strip()
actionsBuffer.append((target, valueExpr))
continue
addMatch = re.match(r"^(\w+)\s+\.ADD\.\s+(\d+)$", line, re.IGNORECASE)
if addMatch:
target = addMatch.group(1).strip()
hours = addMatch.group(2).strip()
actionsBuffer.append((target, f"+{hours}"))
continue
if upper in ("ENDIF", "END IF"):
flushBlock()
currentBlock = None
currentBlockType = None
actionsBuffer = []
continue
flushBlock()
if not self._blocks:
self._addBlock()
def _parseConditionToBlock(
self,
block: ConditionalBlock,
condStr: str
):
condStr = condStr.strip()
for _, opSym in OP_ITEMS:
idx = condStr.upper().find(opSym)
if idx >= 0:
leftPart = condStr[:idx].strip()
rightPart = condStr[idx + len(opSym):].strip()
for ci in range(block.operandCombo.count()):
data = block.operandCombo.itemData(ci)
if data and data[0] == leftPart:
block.operandCombo.setCurrentIndex(ci)
break
for oi in range(block.opCombo.count()):
if block.opCombo.itemData(oi) == opSym:
block.opCombo.setCurrentIndex(oi)
break
opData = block.operandCombo.currentData()
vartype = opData[1] if opData else "String"
w = block._condValueWidgets.get(vartype)
if w:
_setWidgetValue(w, vartype, rightPart)
return
+59 -1
View File
@@ -13,9 +13,10 @@ from enum import Enum
from datetime import datetime, timedelta from datetime import datetime, timedelta
from PySide6.QtCore import Slot, QDateTime from PySide6.QtCore import Slot, QDateTime
from PySide6.QtWidgets import QLabel, QDialog, QWidget, QSpinBox, QHBoxLayout, QGridLayout, QDateTimeEdit from PySide6.QtWidgets import QLabel, QDialog, QWidget, QSpinBox, QHBoxLayout, QVBoxLayout, QGridLayout, QDateTimeEdit, QGroupBox, QPushButton
from gui.resources.ui.Ui_ALTimerTaskAddDialog import Ui_ALTimerTaskAddDialog from gui.resources.ui.Ui_ALTimerTaskAddDialog import Ui_ALTimerTaskAddDialog
from gui.ALPreprocOrchDialog import ALPreprocOrchDialog
from utils.TimerUtils import TimerUtils from utils.TimerUtils import TimerUtils
@@ -86,6 +87,33 @@ class ALTimerTaskAddDialog(QDialog, Ui_ALTimerTaskAddDialog):
self.TimerConfigLayout.addWidget(self.RelativeTimerWidget) self.TimerConfigLayout.addWidget(self.RelativeTimerWidget)
self.RelativeTimerWidget.setVisible(False) self.RelativeTimerWidget.setVisible(False)
self.PreprocGroupBox = QGroupBox("预处理脚本")
self.PreprocLayout = QVBoxLayout(self.PreprocGroupBox)
self.PreprocLayout.setContentsMargins(3, 3, 3, 3)
self.PreprocLayout.setSpacing(3)
preproc_btn_layout = QHBoxLayout()
self.PreprocSetButton = QPushButton("设置预处理指令")
self.PreprocSetButton.setMinimumHeight(25)
self.PreprocSetButton.setFixedWidth(130)
preproc_btn_layout.addWidget(self.PreprocSetButton)
self.PreprocPreviewButton = QPushButton("预览")
self.PreprocPreviewButton.setMinimumHeight(25)
self.PreprocPreviewButton.setFixedWidth(60)
self.PreprocPreviewButton.setEnabled(False)
preproc_btn_layout.addWidget(self.PreprocPreviewButton)
preproc_btn_layout.addStretch()
self.PreprocStatusLabel = QLabel("未设置")
self.PreprocStatusLabel.setStyleSheet("color: #969696;")
self.PreprocStatusLabel.setFixedHeight(25)
preproc_btn_layout.addWidget(self.PreprocStatusLabel)
self.PreprocLayout.addLayout(preproc_btn_layout)
self.ALAddTimerTaskLayout.insertWidget(
self.ALAddTimerTaskLayout.indexOf(self.TaskConfigGroupBox) + 1,
self.PreprocGroupBox
)
self.__repeat_preproc_script = ""
def connectSignals( def connectSignals(
self self
@@ -95,6 +123,35 @@ class ALTimerTaskAddDialog(QDialog, Ui_ALTimerTaskAddDialog):
self.ConfirmButton.clicked.connect(self.accept) self.ConfirmButton.clicked.connect(self.accept)
self.TimerTypeComboBox.currentIndexChanged.connect(self.onTimerTypeComboBoxIndexChanged) self.TimerTypeComboBox.currentIndexChanged.connect(self.onTimerTypeComboBoxIndexChanged)
self.RepeatCheckBox.toggled.connect(self.onRepeatCheckBoxToggled) self.RepeatCheckBox.toggled.connect(self.onRepeatCheckBoxToggled)
self.PreprocSetButton.clicked.connect(self._onSetPreproc)
self.PreprocPreviewButton.clicked.connect(self._onPreviewPreproc)
@Slot()
def _onSetPreproc(self):
dlg = ALPreprocOrchDialog(self, existingScript=self.__repeat_preproc_script)
if dlg.exec() == QDialog.DialogCode.Accepted:
script = dlg.getScript()
self.__repeat_preproc_script = script
if script:
self.PreprocStatusLabel.setText("已设置")
self.PreprocStatusLabel.setStyleSheet("color: #4CAF50;")
self.PreprocPreviewButton.setEnabled(True)
else:
self.PreprocStatusLabel.setText("未设置")
self.PreprocStatusLabel.setStyleSheet("color: #969696;")
self.PreprocPreviewButton.setEnabled(False)
dlg.deleteLater()
@Slot()
def _onPreviewPreproc(self):
if not self.__repeat_preproc_script:
return
from gui.ALPreProcPrevDialog import ALScriptPreviewDialog
dlg = ALScriptPreviewDialog(self, self.__repeat_preproc_script)
dlg.exec()
dlg.deleteLater()
def getTimerTask( def getTimerTask(
@@ -129,6 +186,7 @@ class ALTimerTaskAddDialog(QDialog, Ui_ALTimerTaskAddDialog):
"status": ALTimerTaskStatus.PENDING, "status": ALTimerTaskStatus.PENDING,
"executed": False, "executed": False,
"repeat": self.RepeatCheckBox.isChecked(), "repeat": self.RepeatCheckBox.isChecked(),
"repeat_preproc": self.__repeat_preproc_script,
} }
if task_data["repeat"]: if task_data["repeat"]:
task_data["history"] = [] # repeat history task_data["history"] = [] # repeat history
+1
View File
@@ -10,6 +10,7 @@
- ALSeatMapTable: Seat map table class. - ALSeatMapTable: Seat map table class.
- ALSeatMapSelectDialog: Seat map select dialog class. - ALSeatMapSelectDialog: Seat map select dialog class.
- ALTimerTaskAddDialog: Timer task add dialog class. - ALTimerTaskAddDialog: Timer task add dialog class.
- ALPreprocOrchDialog: Preprocessing script orchestration dialog class.
- ALTimerTaskHistoryDialog: Timer task history dialog class. - ALTimerTaskHistoryDialog: Timer task history dialog class.
- ALTimerTaskManageWidget: Timer task manage widget class. - ALTimerTaskManageWidget: Timer task manage widget class.
- ALUserTreeWidget: User tree widget class. - ALUserTreeWidget: User tree widget class.
+1 -1
View File
@@ -13,7 +13,7 @@
<property name="minimumSize"> <property name="minimumSize">
<size> <size>
<width>350</width> <width>350</width>
<height>400</height> <height>460</height>
</size> </size>
</property> </property>
<property name="maximumSize"> <property name="maximumSize">
+328
View File
@@ -0,0 +1,328 @@
# -*- coding: utf-8 -*-
"""
Copyright (c) 2026 KenanZhu.
All rights reserved.
This software is provided "as is", without any warranty of any kind.
You may use, modify, and distribute this file under the terms of the MIT License.
See the LICENSE file for details.
"""
import re
from datetime import datetime, timedelta
class PreprocEngine:
COMPARE_OPS = {
".EQ.": lambda a, b: a == b,
".NEQ.": lambda a, b: a != b,
".BGT.": lambda a, b: a > b,
".BLT.": lambda a, b: a < b,
".BGE.": lambda a, b: a >= b,
".BLE.": lambda a, b: a <= b,
}
VARIABLE_META = {
"预约开始时间": ("RESERVE_BEGIN_TIME", "Time"),
"预约结束时间": ("RESERVE_END_TIME", "Time"),
"预约日期": ("RESERVE_DATE", "Date"),
"用户名": ("USERNAME", "String"),
"用户启用": ("USER_ENABLE", "Boolean"),
"当前时间": ("CURRENT_TIME", "Time"),
"当前日期": ("CURRENT_DATE", "Date"),
}
@staticmethod
def execute(
script_text: str,
user_data: dict
):
if not script_text or not script_text.strip():
return
lines = [l.strip() for l in script_text.split("\n") if l.strip()]
if not lines:
return
if_stack = []
for line in lines:
upper_line = line.upper().strip()
if upper_line.startswith("IF("):
cond_end = _findConditionEnd(upper_line)
if cond_end < 0:
raise ValueError("语法错误: IF 缺少右括号")
condition_str = line[3:cond_end].strip()
matched = PreprocEngine._evaluateCondition(
condition_str, user_data
)
if_stack.append([matched, matched])
elif upper_line.startswith("ELSE IF("):
if not if_stack:
raise ValueError("语法错误: ELSE IF 前缺少 IF")
cond_end = _findConditionEnd(upper_line)
if cond_end < 0:
raise ValueError("语法错误: ELSE IF 缺少右括号")
condition_str = line[8:cond_end].strip()
_, has_matched = if_stack[-1]
if not has_matched:
matched = PreprocEngine._evaluateCondition(
condition_str, user_data
)
if_stack[-1] = [matched, matched]
else:
if_stack[-1][0] = False
elif upper_line == "ELSE":
if not if_stack:
raise ValueError("语法错误: ELSE 前缺少 IF")
_, has_matched = if_stack[-1]
if not has_matched:
if_stack[-1] = [True, True]
else:
if_stack[-1][0] = False
elif upper_line in ("ENDIF", "END IF"):
if not if_stack:
raise ValueError("语法错误: ENDIF/END IF 前缺少 IF")
if_stack.pop()
elif upper_line.startswith("SET "):
should_execute = (
all(ctx[0] for ctx in if_stack) if if_stack else True
)
if should_execute:
PreprocEngine._executeSet(line, user_data)
elif upper_line == "PASS":
continue
else:
should_execute = (
all(ctx[0] for ctx in if_stack) if if_stack else True
)
if should_execute:
PreprocEngine._executeOperation(line, user_data)
if if_stack:
raise ValueError("语法错误: IF 与 ENDIF/END IF 不匹配")
@staticmethod
def _resolveValue(
value_str: str,
user_data: dict
) -> str:
s = value_str.strip()
time_match = re.match(r"^TIME\((\d{1,2}):(\d{2})\)$", s, re.IGNORECASE)
if time_match:
h, m = time_match.group(1), time_match.group(2)
return f"{int(h):02d}:{int(m):02d}"
date_match = re.match(r"^DATE\((\d{4})-(\d{2})-(\d{2})\)$", s, re.IGNORECASE)
if date_match:
y, mo, d = date_match.group(1), date_match.group(2), date_match.group(3)
return f"{int(y):04d}-{int(mo):02d}-{int(d):02d}"
if s.upper() == ".TRUE.":
return "True"
if s.upper() == ".FALSE.":
return "False"
if s.startswith("'") and s.endswith("'"):
inner = s[1:-1].replace("''", "'")
return inner
if s.startswith('"') and s.endswith('"'):
return s[1:-1]
relDate = re.match(r"^CURRENT_DATE\s*\+\s*(\d+)$", s, re.IGNORECASE)
if relDate:
days = int(relDate.group(1))
return (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d")
relTime = re.match(r"^CURRENT_TIME\s*\+\s*(\d+)$", s, re.IGNORECASE)
if relTime:
hours = int(relTime.group(1))
return (datetime.now() + timedelta(hours=hours)).strftime("%H:%M")
try:
float(s)
return s
except ValueError:
pass
resolved = PreprocEngine._resolveField(s, user_data)
return resolved
@staticmethod
def _resolveField(
field_name: str,
user_data: dict
) -> str:
upper_name = field_name.upper().strip()
if upper_name == "CURRENT_DATE":
return datetime.now().strftime("%Y-%m-%d")
elif upper_name == "CURRENT_TIME":
return datetime.now().strftime("%H:%M")
elif upper_name == "USERNAME":
return user_data.get("username", "")
elif upper_name == "USER_ENABLE":
return str(user_data.get("enabled", "False"))
elif upper_name == "RESERVE_DATE":
return user_data.get("reserve_info", {}).get("date", "")
elif upper_name == "RESERVE_BEGIN_TIME":
return (
user_data
.get("reserve_info", {})
.get("begin_time", {})
.get("time", "")
)
elif upper_name == "RESERVE_END_TIME":
return (
user_data
.get("reserve_info", {})
.get("end_time", {})
.get("time", "")
)
return ""
@staticmethod
def _setField(
field_name: str,
value: str,
user_data: dict
):
upper_name = field_name.upper().strip()
if upper_name == "RESERVE_DATE":
user_data.setdefault("reserve_info", {})["date"] = value
elif upper_name == "RESERVE_BEGIN_TIME":
ri = user_data.setdefault("reserve_info", {})
ri.setdefault("begin_time", {})["time"] = value
elif upper_name == "RESERVE_END_TIME":
ri = user_data.setdefault("reserve_info", {})
ri.setdefault("end_time", {})["time"] = value
elif upper_name == "USERNAME":
user_data["username"] = value
elif upper_name == "USER_ENABLE":
user_data["enabled"] = value.upper() == "TRUE"
@staticmethod
def _evaluateCondition(
condition_str: str,
user_data: dict
) -> bool:
for op, cmp_func in PreprocEngine.COMPARE_OPS.items():
if op not in condition_str.upper():
continue
idx = condition_str.upper().find(op)
parts = [condition_str[:idx], condition_str[idx + len(op):]]
if len(parts) != 2:
continue
field_name = parts[0].strip()
value_str = parts[1].strip()
left_val = PreprocEngine._resolveField(field_name, user_data)
right_val = PreprocEngine._resolveValue(value_str, user_data)
return cmp_func(left_val, right_val)
return False
@staticmethod
def _executeSet(
line: str,
user_data: dict
):
rest = line[3:].strip()
eq_idx = rest.find("=")
if eq_idx < 0:
return
field_name = rest[:eq_idx].strip()
value_str = rest[eq_idx + 1:].strip()
if not field_name:
return
resolved = PreprocEngine._resolveValue(value_str, user_data)
PreprocEngine._setField(field_name, resolved, user_data)
@staticmethod
def _executeOperation(
line: str,
user_data: dict
):
parts = line.split()
if len(parts) < 3:
return
field_name = parts[0].upper().strip()
op = parts[1].upper().strip()
raw_value = parts[2].strip()
try:
num_value = float(raw_value) if "." in raw_value else int(raw_value)
except (ValueError, TypeError):
return
if field_name == "RESERVE_DATE":
date_str = user_data.get("reserve_info", {}).get("date", "")
if not date_str:
return
try:
date_obj = datetime.strptime(date_str, "%Y-%m-%d")
except (ValueError, TypeError):
return
if op == ".ADD.":
date_obj += timedelta(days=num_value)
elif op == ".SUB.":
date_obj -= timedelta(days=num_value)
else:
return
user_data.setdefault("reserve_info", {})["date"] = \
date_obj.strftime("%Y-%m-%d")
elif field_name == "RESERVE_BEGIN_TIME":
time_str = (
user_data
.get("reserve_info", {})
.get("begin_time", {})
.get("time", "")
)
if not time_str:
return
try:
time_obj = datetime.strptime(time_str, "%H:%M")
except (ValueError, TypeError):
return
if op == ".ADD.":
time_obj += timedelta(hours=num_value)
elif op == ".SUB.":
time_obj -= timedelta(hours=num_value)
else:
return
ri = user_data.setdefault("reserve_info", {})
ri.setdefault("begin_time", {})["time"] = \
time_obj.strftime("%H:%M")
elif field_name == "RESERVE_END_TIME":
time_str = (
user_data
.get("reserve_info", {})
.get("end_time", {})
.get("time", "")
)
if not time_str:
return
try:
time_obj = datetime.strptime(time_str, "%H:%M")
except (ValueError, TypeError):
return
if op == ".ADD.":
time_obj += timedelta(hours=num_value)
elif op == ".SUB.":
time_obj -= timedelta(hours=num_value)
else:
return
ri = user_data.setdefault("reserve_info", {})
ri.setdefault("end_time", {})["time"] = \
time_obj.strftime("%H:%M")
def _findConditionEnd(
upper_line: str
) -> int:
line = upper_line.rstrip()
if line.endswith(" THEN"):
line = line[:-5].rstrip()
paren_depth = 0
start_found = False
for i, ch in enumerate(line):
if ch == "(":
paren_depth += 1
start_found = True
elif ch == ")":
paren_depth -= 1
if start_found and paren_depth == 0:
return i
return -1