mirror of
https://github.com/KenanZhu/AutoLibrary.git
synced 2026-06-18 07:23:03 +08:00
refactor(autoscript): 使用 ASTokenizer 和 NodeVisitor 重构解析与执行流程
This commit is contained in:
+177
-167
@@ -8,10 +8,29 @@ You may use, modify, and distribute this file under the terms of the MIT License
|
||||
See the LICENSE file for details.
|
||||
"""
|
||||
import re
|
||||
from datetime import datetime, timedelta, date, time
|
||||
from datetime import (
|
||||
datetime,
|
||||
timedelta,
|
||||
date,
|
||||
time
|
||||
)
|
||||
|
||||
from .ASObject import ASObject, _META_VARS, _inferType
|
||||
from .ASObject import (
|
||||
ASObject,
|
||||
_META_VARS,
|
||||
_inferType
|
||||
)
|
||||
from .ASOperator import ASOperator
|
||||
from .ASTokenizer import (
|
||||
ASTokenizer,
|
||||
NodeVisitor,
|
||||
Script,
|
||||
IfNode,
|
||||
SetNode,
|
||||
OpNode,
|
||||
PassNode,
|
||||
UnrecogNode
|
||||
)
|
||||
|
||||
|
||||
__all__ = ["execute", "addTargetVar"]
|
||||
@@ -24,70 +43,30 @@ _TARGET_VARS = {}
|
||||
_SCRIPT_VARS = {}
|
||||
# Name -> ASObject lookup map built from _META_VARS, _TARGET_VARS, and display names
|
||||
_FIELD_MAP = {}
|
||||
# Current line number for error reporting
|
||||
_CUR_LINE = 0
|
||||
|
||||
|
||||
def _errPos(
|
||||
line: int,
|
||||
message: str
|
||||
) -> str:
|
||||
"""
|
||||
Format an error message with the current script line number.
|
||||
Format an error message with a script line number.
|
||||
|
||||
Args:
|
||||
line (int): The script line number where the error occurred.
|
||||
message (str): The error description.
|
||||
|
||||
Returns:
|
||||
str: A formatted error string like "AutoScript syntax error(line X): message".
|
||||
"""
|
||||
return f"AutoScript 语法错误(第{_CUR_LINE}行): {message}"
|
||||
return f"AutoScript 语法错误(第{line}行): {message}"
|
||||
|
||||
|
||||
def _findConditionBegin(
|
||||
upper_line: str
|
||||
) -> int:
|
||||
"""
|
||||
Find the position of the opening parenthesis that starts a condition.
|
||||
|
||||
Args:
|
||||
upper_line (str): The uppercased IF / ELSE IF line.
|
||||
|
||||
Returns:
|
||||
int: Index of '(' or -1 if not found.
|
||||
"""
|
||||
return upper_line.find("(")
|
||||
|
||||
|
||||
def _findConditionEnd(
|
||||
upper_line: str,
|
||||
start_pos: int
|
||||
) -> int:
|
||||
"""
|
||||
Find the matching closing parenthesis for a condition expression.
|
||||
|
||||
Handles nested parentheses and optionally strips a trailing "THEN" keyword.
|
||||
|
||||
Args:
|
||||
upper_line (str): The uppercased IF / ELSE IF line.
|
||||
start_pos (int): Index of the opening '('.
|
||||
|
||||
Returns:
|
||||
int: Index of the matching ')' or -1 if unbalanced.
|
||||
"""
|
||||
|
||||
line = upper_line.rstrip()
|
||||
if line.endswith(" THEN"):
|
||||
line = line[:-5].rstrip()
|
||||
depth = 1
|
||||
for i in range(start_pos + 1, len(line)):
|
||||
ch = line[i]
|
||||
if ch == "(":
|
||||
depth += 1
|
||||
elif ch == ")":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
return i
|
||||
return -1
|
||||
# Pre-compiled regex patterns for value resolution
|
||||
_RE_TIME = re.compile(r"^TIME\((\d{1,2}):(\d{2})\)$", re.IGNORECASE)
|
||||
_RE_DATE = re.compile(r"^DATE\((\d{4})-(\d{2})-(\d{2})\)$", re.IGNORECASE)
|
||||
_RE_CUR_DATE_OFFSET = re.compile(r"^CURRENT_DATE\s*\+\s*(\d+)$", re.IGNORECASE)
|
||||
_RE_CUR_TIME_OFFSET = re.compile(r"^CURRENT_TIME\s*\+\s*(\d+)$", re.IGNORECASE)
|
||||
|
||||
|
||||
def _splitTopLevel(
|
||||
@@ -203,10 +182,10 @@ def _resolveValue(
|
||||
"""
|
||||
|
||||
s = value_str.strip()
|
||||
m = re.match(r"^TIME\((\d{1,2}):(\d{2})\)$", s, re.IGNORECASE)
|
||||
m = _RE_TIME.match(s)
|
||||
if m:
|
||||
return time(int(m.group(1)), int(m.group(2)))
|
||||
m = re.match(r"^DATE\((\d{4})-(\d{2})-(\d{2})\)$", s, re.IGNORECASE)
|
||||
m = _RE_DATE.match(s)
|
||||
if m:
|
||||
return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))
|
||||
up = s.upper()
|
||||
@@ -218,11 +197,11 @@ def _resolveValue(
|
||||
return s[1:-1].replace("''", "'")
|
||||
if s.startswith('"') and s.endswith('"'):
|
||||
return s[1:-1]
|
||||
m = re.match(r"^CURRENT_DATE\s*\+\s*(\d+)$", s, re.IGNORECASE)
|
||||
m = _RE_CUR_DATE_OFFSET.match(s)
|
||||
if m:
|
||||
days = int(m.group(1))
|
||||
return datetime.now().date() + timedelta(days=days)
|
||||
m = re.match(r"^CURRENT_TIME\s*\+\s*(\d+)$", s, re.IGNORECASE)
|
||||
m = _RE_CUR_TIME_OFFSET.match(s)
|
||||
if m:
|
||||
hours = int(m.group(1))
|
||||
return (datetime.now() + timedelta(hours=hours)).time()
|
||||
@@ -273,7 +252,8 @@ def _resolveAsObject(
|
||||
|
||||
def _evaluateCondition(
|
||||
condition_str: str,
|
||||
target_data: dict
|
||||
target_data: dict,
|
||||
line: int = 0
|
||||
) -> bool:
|
||||
"""
|
||||
Evaluate a condition expression and return a boolean result.
|
||||
@@ -304,18 +284,18 @@ def _evaluateCondition(
|
||||
or_parts = _splitTopLevel(s, ".OR.")
|
||||
if len(or_parts) > 1:
|
||||
return any(
|
||||
_evaluateCondition(p.strip(), target_data)
|
||||
_evaluateCondition(p.strip(), target_data, line)
|
||||
for p in or_parts
|
||||
)
|
||||
and_parts = _splitTopLevel(s, ".AND.")
|
||||
if len(and_parts) > 1:
|
||||
return all(
|
||||
_evaluateCondition(p.strip(), target_data)
|
||||
_evaluateCondition(p.strip(), target_data, line)
|
||||
for p in and_parts
|
||||
)
|
||||
s = s.strip()
|
||||
if s.startswith("(") and s.endswith(")"):
|
||||
return _evaluateCondition(s[1:-1], target_data)
|
||||
return _evaluateCondition(s[1:-1], target_data, line)
|
||||
up = s.upper()
|
||||
if up == ".TRUE.":
|
||||
return True
|
||||
@@ -331,13 +311,14 @@ def _evaluateCondition(
|
||||
right_obj = _resolveAsObject(right_raw, target_data)
|
||||
return ASOperator.compare(left_obj, right_obj, op, target_data)
|
||||
raise ValueError(
|
||||
_errPos(f"无法识别的条件表达式 '{condition_str}'")
|
||||
_errPos(line, f"无法识别的条件表达式 '{condition_str}'")
|
||||
)
|
||||
|
||||
|
||||
def _executeSet(
|
||||
line: str,
|
||||
target_data: dict
|
||||
line_text: str,
|
||||
target_data: dict,
|
||||
line: int = 0
|
||||
):
|
||||
"""
|
||||
Execute a SET statement to assign a value to a field or script variable.
|
||||
@@ -354,7 +335,7 @@ def _executeSet(
|
||||
ValueError: If the value string contains unexpected extra tokens.
|
||||
"""
|
||||
|
||||
rest = line[3:].strip()
|
||||
rest = line_text[3:].strip()
|
||||
eq_idx = rest.find("=")
|
||||
if eq_idx < 0:
|
||||
return
|
||||
@@ -365,7 +346,7 @@ def _executeSet(
|
||||
resolved = _resolveValue(value_str, target_data)
|
||||
stripped = value_str.strip()
|
||||
if resolved == "" and stripped not in ("''", '""') and len(stripped.split()) > 1:
|
||||
raise ValueError(_errPos(f"SET 值中存在多余内容 '{stripped}'"))
|
||||
raise ValueError(_errPos(line, f"SET 值中存在多余内容 '{stripped}'"))
|
||||
upper_name = field_name.upper().strip()
|
||||
obj = _FIELD_MAP.get(upper_name)
|
||||
if not obj:
|
||||
@@ -385,8 +366,9 @@ def _executeSet(
|
||||
|
||||
|
||||
def _executeOperation(
|
||||
line: str,
|
||||
target_data: dict
|
||||
line_text: str,
|
||||
target_data: dict,
|
||||
line: int = 0
|
||||
):
|
||||
"""
|
||||
Execute a field operation statement: "FIELD .ADD. N" or "FIELD .SUB. N".
|
||||
@@ -403,42 +385,23 @@ def _executeOperation(
|
||||
or the type does not support the operation.
|
||||
"""
|
||||
|
||||
parts = line.split()
|
||||
parts = line_text.split()
|
||||
if len(parts) < 3:
|
||||
return
|
||||
if len(parts) > 3:
|
||||
raise ValueError(
|
||||
_errPos(f"操作语句中存在多余内容 '{' '.join(parts[3:])}'")
|
||||
_errPos(line, f"操作语句中存在多余内容 '{' '.join(parts[3:])}'")
|
||||
)
|
||||
field_name = parts[0].upper().strip()
|
||||
op = parts[1].upper().strip()
|
||||
raw_value = parts[2].strip()
|
||||
target = _resolveFieldObj(field_name)
|
||||
if target is None:
|
||||
raise ValueError(_errPos(f"未知字段 '{field_name}'"))
|
||||
raise ValueError(_errPos(line, f"未知字段 '{field_name}'"))
|
||||
operand = _resolveAsObject(raw_value, target_data)
|
||||
ASOperator.apply(target, operand, op, target_data)
|
||||
|
||||
|
||||
def _assertInIf(
|
||||
if_stack: list,
|
||||
line: str
|
||||
):
|
||||
"""
|
||||
Assert that an executable statement is inside an IF block.
|
||||
|
||||
Args:
|
||||
if_stack (list): The current IF nesting stack.
|
||||
line (str): The statement line (used for error message).
|
||||
|
||||
Raises:
|
||||
ValueError: If if_stack is empty (statement is outside any IF block).
|
||||
"""
|
||||
|
||||
if not if_stack:
|
||||
raise ValueError(_errPos(f"可执行语句必须位于 IF 块内: {line}"))
|
||||
|
||||
|
||||
def addTargetVar(
|
||||
name: str,
|
||||
var_type: str,
|
||||
@@ -472,6 +435,125 @@ def addTargetVar(
|
||||
_TARGET_VARS[upper_name] = obj
|
||||
|
||||
|
||||
class _EngineExecutor(NodeVisitor):
|
||||
"""
|
||||
AST visitor that executes AutoScript against target_data.
|
||||
Walks the AST and dispatches SET / ADD / SUB operations
|
||||
via visitScript / visitIf / visitSet / visitOp / visitPass / visitUnrecog.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
target_data: dict
|
||||
):
|
||||
|
||||
super().__init__()
|
||||
self._target_data = target_data
|
||||
self._cur_line = 0
|
||||
|
||||
@property
|
||||
def _line(self) -> int:
|
||||
"""Return current line number for _errPos calls."""
|
||||
|
||||
return self._cur_line
|
||||
|
||||
def _incLine(
|
||||
self
|
||||
):
|
||||
|
||||
self._cur_line += 1
|
||||
|
||||
def visitScript(
|
||||
self,
|
||||
_node: Script
|
||||
):
|
||||
|
||||
for child in _node.body:
|
||||
child.accept(self)
|
||||
|
||||
def visitIf(
|
||||
self,
|
||||
_node: IfNode
|
||||
):
|
||||
|
||||
self._incLine()
|
||||
if not _node.closed:
|
||||
raise ValueError(_errPos(self._line, "IF 与 ENDIF / END IF 不匹配"))
|
||||
matched = _evaluateCondition(_node.condition, self._target_data, self._line)
|
||||
if matched:
|
||||
for child in _node.body:
|
||||
child.accept(self)
|
||||
else:
|
||||
executed = False
|
||||
for elif_node in _node.elif_branches:
|
||||
self._incLine()
|
||||
if _evaluateCondition(elif_node.condition, self._target_data, self._line):
|
||||
for child in elif_node.body:
|
||||
child.accept(self)
|
||||
executed = True
|
||||
break
|
||||
if not executed and _node.else_body:
|
||||
self._incLine()
|
||||
for child in _node.else_body:
|
||||
child.accept(self)
|
||||
|
||||
def visitSet(
|
||||
self,
|
||||
_node: SetNode
|
||||
):
|
||||
|
||||
self._incLine()
|
||||
full_line = f"SET {_node.target} = {_node.value}"
|
||||
_executeSet(full_line, self._target_data, self._line)
|
||||
|
||||
def visitOp(
|
||||
self,
|
||||
_node: OpNode
|
||||
):
|
||||
|
||||
self._incLine()
|
||||
op_upper = _node.op_type.upper()
|
||||
full_line = f"{_node.target} .{op_upper}. {_node.value}"
|
||||
_executeOperation(full_line, self._target_data, self._line)
|
||||
|
||||
def visitPass(
|
||||
self,
|
||||
_node: PassNode
|
||||
):
|
||||
|
||||
self._incLine()
|
||||
|
||||
def visitUnrecog(
|
||||
self,
|
||||
_node: UnrecogNode
|
||||
):
|
||||
|
||||
self._incLine()
|
||||
upper = _node.raw_line.upper().strip()
|
||||
if upper.startswith("IF"):
|
||||
paren_open = upper.find("(")
|
||||
if paren_open < 0:
|
||||
raise ValueError(_errPos(self._line, "IF 缺少左括号"))
|
||||
depth = 1
|
||||
for ci in range(paren_open + 1, len(upper)):
|
||||
if upper[ci] == "(":
|
||||
depth += 1
|
||||
elif upper[ci] == ")":
|
||||
depth -= 1
|
||||
if depth == 0:
|
||||
remaining = upper[ci + 1:].strip()
|
||||
if remaining and remaining != "THEN":
|
||||
raise ValueError(_errPos(self._line, f"IF 条件后存在多余内容 '{remaining}'"))
|
||||
break
|
||||
if depth > 0:
|
||||
raise ValueError(_errPos(self._line, "IF 缺少右括号"))
|
||||
elif upper.startswith("ELSE IF"):
|
||||
paren_open = upper.find("(")
|
||||
if paren_open < 0:
|
||||
raise ValueError(_errPos(self._line, "ELSE IF 缺少左括号"))
|
||||
_executeOperation(_node.raw_line, self._target_data, self._line)
|
||||
|
||||
|
||||
def execute(
|
||||
script_text: str,
|
||||
target_data: dict
|
||||
@@ -479,9 +561,9 @@ def execute(
|
||||
"""
|
||||
Execute an AutoScript on the given target data.
|
||||
|
||||
Parses the script line by line, maintaining an IF nesting stack to control
|
||||
which blocks are active. Supports IF / ELSE IF / ELSE / END IF control flow,
|
||||
SET assignments, PASS no-ops, and .ADD. / .SUB. field operations.
|
||||
Parses the script into an AST via ASTokenizer.parse(),
|
||||
then walks the tree with a visitor to evaluate conditions
|
||||
and dispatch SET / ADD / SUB operations.
|
||||
|
||||
Args:
|
||||
script_text (str): The AutoScript source code.
|
||||
@@ -489,85 +571,13 @@ def execute(
|
||||
|
||||
Raises:
|
||||
ValueError: On syntax errors, unbalanced IF/END IF, unknown fields, etc.
|
||||
|
||||
Example:
|
||||
>>> data = {"reserve_info": {"date": "2026-05-01"}}
|
||||
>>> execute(
|
||||
... "IF(.TRUE.)\\n"
|
||||
... " RESERVE_DATE .ADD. 1\\n"
|
||||
... "END IF",
|
||||
... data
|
||||
... )
|
||||
>>> data["reserve_info"]["date"]
|
||||
'2026-05-02'
|
||||
"""
|
||||
|
||||
global _CUR_LINE
|
||||
|
||||
_buildFieldMap()
|
||||
if not script_text or not script_text.strip():
|
||||
return
|
||||
lines = [l.strip() for l in script_text.split("\n") if l.strip()]
|
||||
if not lines:
|
||||
ast = ASTokenizer.parse(script_text)
|
||||
if not ast.body:
|
||||
return
|
||||
if_stack = []
|
||||
for _CUR_LINE, line in enumerate(lines, 1):
|
||||
upper_line = line.upper().strip()
|
||||
if upper_line.startswith("IF"):
|
||||
paren_open = _findConditionBegin(upper_line)
|
||||
if paren_open < 0:
|
||||
raise ValueError(_errPos("IF 缺少左括号"))
|
||||
cond_end = _findConditionEnd(upper_line, paren_open)
|
||||
if cond_end < 0:
|
||||
raise ValueError(_errPos("IF 缺少右括号"))
|
||||
remaining = upper_line[cond_end + 1:].strip()
|
||||
if remaining and remaining.upper() != "THEN":
|
||||
raise ValueError(_errPos(f"IF 条件后存在多余内容 '{remaining}'"))
|
||||
condition_str = line[paren_open + 1:cond_end].strip()
|
||||
matched = _evaluateCondition(condition_str, target_data)
|
||||
if_stack.append([matched, matched])
|
||||
elif upper_line.startswith("ELSE IF"):
|
||||
if not if_stack:
|
||||
raise ValueError(_errPos("ELSE IF 前缺少 IF"))
|
||||
paren_open = _findConditionBegin(upper_line)
|
||||
if paren_open < 0:
|
||||
raise ValueError(_errPos("ELSE IF 缺少左括号"))
|
||||
cond_end = _findConditionEnd(upper_line, paren_open)
|
||||
if cond_end < 0:
|
||||
raise ValueError(_errPos("ELSE IF 缺少右括号"))
|
||||
remaining = upper_line[cond_end + 1:].strip()
|
||||
if remaining and remaining.upper() != "THEN":
|
||||
raise ValueError(_errPos(f"ELSE IF 条件后存在多余内容 '{remaining}'"))
|
||||
_, branch_matched = if_stack[-1]
|
||||
if not branch_matched:
|
||||
condition_str = line[paren_open + 1:cond_end].strip()
|
||||
matched = _evaluateCondition(condition_str, target_data)
|
||||
if_stack[-1] = [matched, matched]
|
||||
else:
|
||||
if_stack[-1][0] = False
|
||||
elif upper_line == "ELSE":
|
||||
if not if_stack:
|
||||
raise ValueError(_errPos("ELSE 前缺少 IF"))
|
||||
_, branch_matched = if_stack[-1]
|
||||
if not branch_matched:
|
||||
if_stack[-1] = [True, True]
|
||||
else:
|
||||
if_stack[-1][0] = False
|
||||
elif upper_line in ("ENDIF", "END IF"):
|
||||
if not if_stack:
|
||||
raise ValueError(_errPos("ENDIF / END IF 前缺少 IF"))
|
||||
if_stack.pop()
|
||||
elif upper_line.startswith("SET "):
|
||||
_assertInIf(if_stack, line)
|
||||
if all(ctx[0] for ctx in if_stack):
|
||||
_executeSet(line, target_data)
|
||||
elif upper_line == "PASS":
|
||||
continue
|
||||
else:
|
||||
_assertInIf(if_stack, line)
|
||||
if all(ctx[0] for ctx in if_stack):
|
||||
_executeOperation(line, target_data)
|
||||
if if_stack:
|
||||
raise ValueError(
|
||||
"AutoScript 语法错误: IF 与 ENDIF / END IF 不匹配"
|
||||
)
|
||||
executor = _EngineExecutor(target_data)
|
||||
ast.accept(executor)
|
||||
|
||||
Reference in New Issue
Block a user