From 500ddd41c59fdee7d97662442305f5664a06bccc Mon Sep 17 00:00:00 2001 From: KenanZhu <3471685733@qq.com> Date: Tue, 12 May 2026 11:49:43 +0800 Subject: [PATCH] =?UTF-8?q?refactor(autoscript):=20=E6=9B=BF=E6=8D=A2=20ds?= =?UTF-8?q?l=20=E5=8C=85=E4=B8=BA=20autoscript=20=E5=BC=95=E6=93=8E?= =?UTF-8?q?=E6=A8=A1=E5=9D=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/autoscript/ASEngine.py | 573 +++++++++++++++++++++++++++++++++++ src/autoscript/ASObject.py | 251 +++++++++++++++ src/autoscript/ASOperator.py | 185 +++++++++++ src/autoscript/__init__.py | 55 ++++ src/dsl/AutoScriptEngine.py | 386 ----------------------- src/dsl/__init__.py | 9 - 6 files changed, 1064 insertions(+), 395 deletions(-) create mode 100644 src/autoscript/ASEngine.py create mode 100644 src/autoscript/ASObject.py create mode 100644 src/autoscript/ASOperator.py create mode 100644 src/autoscript/__init__.py delete mode 100644 src/dsl/AutoScriptEngine.py delete mode 100644 src/dsl/__init__.py diff --git a/src/autoscript/ASEngine.py b/src/autoscript/ASEngine.py new file mode 100644 index 0000000..9e62490 --- /dev/null +++ b/src/autoscript/ASEngine.py @@ -0,0 +1,573 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import re +from datetime import datetime, timedelta, date, time + +from .ASObject import ASObject, _META_VARS, _inferType +from .ASOperator import ASOperator + + +__all__ = ["execute", "addTargetVar"] + + +# Engine state +# User-registered target variables (bound to target_data paths) +_TARGET_VARS = {} +# Free-form script variables (not bound to target_data) +_SCRIPT_VARS = {} +# Name -> ASObject lookup map built from _META_VARS, _TARGET_VARS, and display names +_FIELD_MAP = {} +# Current line number for error reporting +_CUR_LINE = 0 + + +def _errPos( + message: str +) -> str: + """ + Format an error message with the current script line number. + + Args: + message (str): The error description. + + Returns: + str: A formatted error string like "AutoScript syntax error(line X): message". + """ + return f"AutoScript 语法错误(第{_CUR_LINE}行): {message}" + + +def _findConditionBegin( + upper_line: str +) -> int: + """ + Find the position of the opening parenthesis that starts a condition. + + Args: + upper_line (str): The uppercased IF / ELSE IF line. + + Returns: + int: Index of '(' or -1 if not found. + """ + return upper_line.find("(") + + +def _findConditionEnd( + upper_line: str, + start_pos: int +) -> int: + """ + Find the matching closing parenthesis for a condition expression. + + Handles nested parentheses and optionally strips a trailing "THEN" keyword. + + Args: + upper_line (str): The uppercased IF / ELSE IF line. + start_pos (int): Index of the opening '('. + + Returns: + int: Index of the matching ')' or -1 if unbalanced. + """ + + line = upper_line.rstrip() + if line.endswith(" THEN"): + line = line[:-5].rstrip() + depth = 1 + for i in range(start_pos + 1, len(line)): + ch = line[i] + if ch == "(": + depth += 1 + elif ch == ")": + depth -= 1 + if depth == 0: + return i + return -1 + + +def _splitTopLevel( + text: str, + delimiter: str +) -> list: + """ + Split a condition expression by a delimiter (.AND. / .OR.), respecting parentheses. + + Only splits at the top nesting level; delimiters inside parentheses are ignored. + + Args: + text (str): The condition expression to split. + delimiter (str): The delimiter string, e.g. ".OR." or ".AND.". + + Returns: + list: A list of sub-expression strings (stripped of leading/trailing whitespace). + """ + + parts = [] + depth = 0 + buf = "" + i = 0 + text_upper = text.upper() + delim_upper = delimiter.upper() + dlen = len(delim_upper) + while i < len(text): + if text[i] == "(": + depth += 1 + buf += text[i] + elif text[i] == ")": + depth -= 1 + buf += text[i] + elif depth == 0 and text_upper[i:i + dlen] == delim_upper: + parts.append(buf) + buf = "" + i += dlen + continue + else: + buf += text[i] + i += 1 + if buf.strip(): + parts.append(buf) + return parts + + +def _buildFieldMap(): + """ + Rebuild the _FIELD_MAP lookup from _META_VARS and _TARGET_VARS. + + Each variable is registered under both its canonical name (uppercased) + and its display_name (if present), so that scripts can refer to either. + """ + + _FIELD_MAP.clear() + for ch_name, obj in _META_VARS.items(): + _FIELD_MAP[obj.name.upper()] = obj + _FIELD_MAP[ch_name.upper().strip()] = obj + for obj in _TARGET_VARS.values(): + _FIELD_MAP[obj.name.upper()] = obj + if obj.display_name: + _FIELD_MAP[obj.display_name.upper().strip()] = obj + + +def _resolveFieldObj( + field_name: str +): + """ + Resolve a field name to its ASObject by looking up _FIELD_MAP then _SCRIPT_VARS. + + Unlike getting a raw value, this returns the ASObject instance itself, + preserving type information for operations and comparisons. + + Args: + field_name (str): The field name (case-insensitive). + + Returns: + ASObject or None: The resolved ASObject, or None if not found. + """ + + upper_name = field_name.upper().strip() + obj = _FIELD_MAP.get(upper_name) + if obj: + return obj + obj = _SCRIPT_VARS.get(upper_name) + if obj: + return obj + return None + + +def _resolveValue( + value_str: str, + target_data: dict +): + """ + Parse and resolve a value string from a script into a Python object. + + Supports the following literal forms: + - TIME(hh:mm) + - DATE(yyyy-mm-dd) + - .TRUE. / .FALSE. + - Single/double quoted strings (with escaped single quotes) + - CURRENT_DATE + N / CURRENT_TIME + N (relative offsets) + - Numeric literals (int / float) + - Field references (resolved via _resolveFieldObj) + + Args: + value_str (str): The raw value string from the script. + target_data (dict): The application data dict. + + Returns: + The resolved Python value. + """ + + s = value_str.strip() + m = re.match(r"^TIME\((\d{1,2}):(\d{2})\)$", s, re.IGNORECASE) + if m: + return time(int(m.group(1)), int(m.group(2))) + m = re.match(r"^DATE\((\d{4})-(\d{2})-(\d{2})\)$", s, re.IGNORECASE) + if m: + return date(int(m.group(1)), int(m.group(2)), int(m.group(3))) + up = s.upper() + if up == ".TRUE.": + return True + if up == ".FALSE.": + return False + if s.startswith("'") and s.endswith("'"): + return s[1:-1].replace("''", "'") + if s.startswith('"') and s.endswith('"'): + return s[1:-1] + m = re.match(r"^CURRENT_DATE\s*\+\s*(\d+)$", s, re.IGNORECASE) + if m: + days = int(m.group(1)) + return datetime.now().date() + timedelta(days=days) + m = re.match(r"^CURRENT_TIME\s*\+\s*(\d+)$", s, re.IGNORECASE) + if m: + hours = int(m.group(1)) + return (datetime.now() + timedelta(hours=hours)).time() + try: + return int(s) + except ValueError: + pass + try: + return float(s) + except ValueError: + pass + obj = _resolveFieldObj(s) + if obj: + return obj.getValue(target_data) + return "" + + +def _resolveAsObject( + expr: str, + target_data: dict +) -> ASObject: + """ + Resolve a value expression to an ASObject. + + - If the expression is a registered field name, returns its ASObject directly. + - If the expression is a literal (number, string, DATE(), TIME(), bool), + creates a temporary ASObject with the inferred type. + + This is the key function that ensures all internal operations work + with typed ASObject instances rather than raw Python values. + + Args: + expr (str): The raw expression string from the script. + target_data (dict): The application data dict. + + Returns: + ASObject: A registered or temporary ASObject representing the expression value. + """ + + s = expr.strip() + obj = _resolveFieldObj(s) + if obj is not None: + return obj + value = _resolveValue(s, target_data) + inferred = _inferType(value, s) + return ASObject._makeTemp(value, inferred) + + +def _evaluateCondition( + condition_str: str, + target_data: dict +) -> bool: + """ + Evaluate a condition expression and return a boolean result. + + Supports: + - Boolean literals: .TRUE., .FALSE. + - .AND. / .OR. operators (lowest precedence) + - Parenthesised sub-expressions + - Comparison operators: .EQ., .NEQ., .BGT., .BLT., .BGE., .BLE. + + All operands are resolved as ASObject instances (via _resolveAsObject) + and comparisons are delegated to ASOperator.compare(). + + Args: + condition_str (str): The raw condition expression from the script. + target_data (dict): The application data dict. + + Returns: + bool: The evaluation result. + + Raises: + ValueError: If the expression contains unrecognised tokens or type-mismatched comparisons. + """ + + s = condition_str.strip() + if not s: + return False + or_parts = _splitTopLevel(s, ".OR.") + if len(or_parts) > 1: + return any( + _evaluateCondition(p.strip(), target_data) + for p in or_parts + ) + and_parts = _splitTopLevel(s, ".AND.") + if len(and_parts) > 1: + return all( + _evaluateCondition(p.strip(), target_data) + for p in and_parts + ) + s = s.strip() + if s.startswith("(") and s.endswith(")"): + return _evaluateCondition(s[1:-1], target_data) + up = s.upper() + if up == ".TRUE.": + return True + if up == ".FALSE.": + return False + for op in ASOperator._COMPARE: + idx = up.find(op.upper()) + if idx < 0: + continue + left_raw = s[:idx].strip() + right_raw = s[idx + len(op):].strip() + left_obj = _resolveAsObject(left_raw, target_data) + right_obj = _resolveAsObject(right_raw, target_data) + return ASOperator.compare(left_obj, right_obj, op, target_data) + raise ValueError( + _errPos(f"无法识别的条件表达式 '{condition_str}'") + ) + + +def _executeSet( + line: str, + target_data: dict +): + """ + Execute a SET statement to assign a value to a field or script variable. + + Parses the line as "SET field_name = value_expr", resolves the value, + and assigns it. If the target field does not exist, a new script variable + is created with an inferred type. + + Args: + line (str): The raw SET line from the script. + target_data (dict): The application data dict. + + Raises: + ValueError: If the value string contains unexpected extra tokens. + """ + + rest = line[3:].strip() + eq_idx = rest.find("=") + if eq_idx < 0: + return + field_name = rest[:eq_idx].strip() + value_str = rest[eq_idx + 1:].strip() + if not field_name: + return + resolved = _resolveValue(value_str, target_data) + stripped = value_str.strip() + if resolved == "" and stripped not in ("''", '""') and len(stripped.split()) > 1: + raise ValueError(_errPos(f"SET 值中存在多余内容 '{stripped}'")) + upper_name = field_name.upper().strip() + obj = _FIELD_MAP.get(upper_name) + if not obj: + obj = _SCRIPT_VARS.get(upper_name) + if obj: + obj.setValue(resolved, target_data) + return + inferred_type = _inferType(resolved, stripped) + new_var = ASObject( + upper_name, + inferred_type, + read_only=False, + is_config=False, + default_value=resolved + ) + _SCRIPT_VARS[upper_name] = new_var + + +def _executeOperation( + line: str, + target_data: dict +): + """ + Execute a field operation statement: "FIELD .ADD. N" or "FIELD .SUB. N". + + Resolves the left side as a registered ASObject and the right side + as a temporary numeric ASObject, then delegates to ASOperator.apply(). + + Args: + line (str): The raw operation line from the script (e.g. "RESERVE_DATE .ADD. 1"). + target_data (dict): The application data dict. + + Raises: + ValueError: If the field is unknown, the operand is invalid, + or the type does not support the operation. + """ + + parts = line.split() + if len(parts) < 3: + return + if len(parts) > 3: + raise ValueError( + _errPos(f"操作语句中存在多余内容 '{' '.join(parts[3:])}'") + ) + field_name = parts[0].upper().strip() + op = parts[1].upper().strip() + raw_value = parts[2].strip() + target = _resolveFieldObj(field_name) + if target is None: + raise ValueError(_errPos(f"未知字段 '{field_name}'")) + operand = _resolveAsObject(raw_value, target_data) + ASOperator.apply(target, operand, op, target_data) + + +def _assertInIf( + if_stack: list, + line: str +): + """ + Assert that an executable statement is inside an IF block. + + Args: + if_stack (list): The current IF nesting stack. + line (str): The statement line (used for error message). + + Raises: + ValueError: If if_stack is empty (statement is outside any IF block). + """ + + if not if_stack: + raise ValueError(_errPos(f"可执行语句必须位于 IF 块内: {line}")) + + +def addTargetVar( + name: str, + var_type: str, + key_path: list, + display_name: str = None +): + """ + Register a new target variable bound to a path in the application data dict. + + Once registered, the variable can be read, written, and operated on in scripts + using its canonical name or display_name. + + Args: + name (str): The canonical variable name (e.g. "RESERVE_DATE"). + var_type (str): The type ("Int", "Float", "Boolean", "Date", "Time", "String"). + key_path (list): The nested path into target_data, e.g. ["reserve_info", "date"]. + display_name (str): An optional Chinese alias for use in script conditions. + + Example: + >>> addTargetVar("MY_FIELD", "String", ["custom", "field"], display_name="自定义字段") + """ + + upper_name = name.upper().strip() + obj = ASObject( + upper_name, + var_type, + is_config=True, + key_path=key_path, + display_name=display_name + ) + _TARGET_VARS[upper_name] = obj + + +def execute( + script_text: str, + target_data: dict +): + """ + Execute an AutoScript on the given target data. + + Parses the script line by line, maintaining an IF nesting stack to control + which blocks are active. Supports IF / ELSE IF / ELSE / END IF control flow, + SET assignments, PASS no-ops, and .ADD. / .SUB. field operations. + + Args: + script_text (str): The AutoScript source code. + target_data (dict): The application data dict to read from / write to. + + Raises: + ValueError: On syntax errors, unbalanced IF/END IF, unknown fields, etc. + + Example: + >>> data = {"reserve_info": {"date": "2026-05-01"}} + >>> execute( + ... "IF(.TRUE.)\\n" + ... " RESERVE_DATE .ADD. 1\\n" + ... "END IF", + ... data + ... ) + >>> data["reserve_info"]["date"] + '2026-05-02' + """ + + global _CUR_LINE + + _buildFieldMap() + if not script_text or not script_text.strip(): + return + lines = [l.strip() for l in script_text.split("\n") if l.strip()] + if not lines: + return + if_stack = [] + for _CUR_LINE, line in enumerate(lines, 1): + upper_line = line.upper().strip() + if upper_line.startswith("IF"): + paren_open = _findConditionBegin(upper_line) + if paren_open < 0: + raise ValueError(_errPos("IF 缺少左括号")) + cond_end = _findConditionEnd(upper_line, paren_open) + if cond_end < 0: + raise ValueError(_errPos("IF 缺少右括号")) + remaining = upper_line[cond_end + 1:].strip() + if remaining and remaining.upper() != "THEN": + raise ValueError(_errPos(f"IF 条件后存在多余内容 '{remaining}'")) + condition_str = line[paren_open + 1:cond_end].strip() + matched = _evaluateCondition(condition_str, target_data) + if_stack.append([matched, matched]) + elif upper_line.startswith("ELSE IF"): + if not if_stack: + raise ValueError(_errPos("ELSE IF 前缺少 IF")) + paren_open = _findConditionBegin(upper_line) + if paren_open < 0: + raise ValueError(_errPos("ELSE IF 缺少左括号")) + cond_end = _findConditionEnd(upper_line, paren_open) + if cond_end < 0: + raise ValueError(_errPos("ELSE IF 缺少右括号")) + remaining = upper_line[cond_end + 1:].strip() + if remaining and remaining.upper() != "THEN": + raise ValueError(_errPos(f"ELSE IF 条件后存在多余内容 '{remaining}'")) + _, branch_matched = if_stack[-1] + if not branch_matched: + condition_str = line[paren_open + 1:cond_end].strip() + matched = _evaluateCondition(condition_str, target_data) + if_stack[-1] = [matched, matched] + else: + if_stack[-1][0] = False + elif upper_line == "ELSE": + if not if_stack: + raise ValueError(_errPos("ELSE 前缺少 IF")) + _, branch_matched = if_stack[-1] + if not branch_matched: + if_stack[-1] = [True, True] + else: + if_stack[-1][0] = False + elif upper_line in ("ENDIF", "END IF"): + if not if_stack: + raise ValueError(_errPos("ENDIF / END IF 前缺少 IF")) + if_stack.pop() + elif upper_line.startswith("SET "): + _assertInIf(if_stack, line) + if all(ctx[0] for ctx in if_stack): + _executeSet(line, target_data) + elif upper_line == "PASS": + continue + else: + _assertInIf(if_stack, line) + if all(ctx[0] for ctx in if_stack): + _executeOperation(line, target_data) + if if_stack: + raise ValueError( + "AutoScript 语法错误: IF 与 ENDIF / END IF 不匹配" + ) diff --git a/src/autoscript/ASObject.py b/src/autoscript/ASObject.py new file mode 100644 index 0000000..082c0f4 --- /dev/null +++ b/src/autoscript/ASObject.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +import re +from datetime import datetime, date, time + + +__all__ = ["ASObject", "_META_VARS", "_inferType"] + + +# Default values for each supported type when no value is present +_TYPE_DEFAULTS = { + "Int": 0, + "Float": 0.0, + "Boolean": False, + "Date": None, + "Time": None, + "String": "", +} + + +class ASObject: + """ + Represents a variable object used throughout the AutoScript engine. + + An ASObject can be a meta variable (read-only, e.g. CURRENT_DATE), + a target variable (bound to a config data dict via key_path), + or a script variable (free-form, stored internally). + + Args: + name (str): The canonical name of the variable (case-insensitive). + var_type (str): One of "Int", "Float", "Boolean", "Date", "Time", "String". + read_only (bool): Whether the variable is read-only (default: False). + is_config (bool): Whether the variable maps to a target_data path (default: False). + key_path (list): The nested key path into target_data, e.g. ["reserve_info", "date"]. + default_value: The fallback value when no target_data value is found. + display_name (str): An alias for use in script conditions and assignments. + + Example: + >>> obj = ASObject("MY_DATE", "Date", is_config=True, + ... key_path=["reserve_info", "date"], + ... display_name="预约日期") + >>> obj.getValue({"reserve_info": {"date": "2026-05-01"}}) + datetime.date(2026, 5, 1) + """ + + _TEMP_COUNTER = 0 + + def __init__( + self, + name: str, + var_type: str, *, + read_only: bool = False, + is_config: bool = False, + key_path: list = None, + default_value=None, + display_name: str = None + ): + + self.name = name + self.var_type = var_type + self.read_only = read_only + self.is_config = is_config + self.key_path = key_path or [] + self._value = default_value + self.display_name = display_name + + @classmethod + def _makeTemp( + cls, + value, + inferred_type: str + ): + """ + Create a temporary unnamed ASObject from a literal value. + + Temporary objects are used for inline script literals (e.g. 42, + 'hello', DATE(2026-01-01)) so they can participate in typed + operations alongside registered variables. + + Args: + value: The resolved Python value. + inferred_type (str): The AutoScript type name. + + Returns: + ASObject: A temporary, non-config, read-write ASObject. + """ + + cls._TEMP_COUNTER += 1 + return cls( + f"__TMP_{cls._TEMP_COUNTER}", + inferred_type, + read_only=False, + is_config=False, + default_value=value + ) + + + def _typeEmpty( + self + ): + """ + Return the type-appropriate empty / default value. + """ + + return _TYPE_DEFAULTS.get(self.var_type, "") + + + def getValue( + self, + target_data: dict = None + ): + """ + Retrieve the current value of this variable. + + For read-only variables (CURRENT_DATE, CURRENT_TIME), returns the + live datetime. For config variables, traverses the key_path into + target_data and parses Date/Time strings. Otherwise returns the + internal _value. + + Args: + target_data (dict): The application data dict (required for config vars). + + Returns: + The resolved value, or a type-appropriate default if missing. + """ + + if self.read_only: + if self.name == "CURRENT_DATE": + return datetime.now().date() + if self.name == "CURRENT_TIME": + return datetime.now().time() + return self._value + if self.is_config and target_data is not None and self.key_path: + d = target_data + for key in self.key_path[:-1]: + d = d.get(key, {}) + if not isinstance(d, dict): + return self._typeEmpty() + raw = d.get(self.key_path[-1]) + if raw is None: + return self._typeEmpty() + if self.var_type == "Date" and isinstance(raw, str): + try: + return datetime.strptime(raw, "%Y-%m-%d").date() + except ValueError: + return self._typeEmpty() + if self.var_type == "Time" and isinstance(raw, str): + try: + return datetime.strptime(raw, "%H:%M").time() + except ValueError: + return self._typeEmpty() + return raw + return self._value + + + def setValue( + self, + value, + target_data: dict = None + ): + """ + Assign a new value to this variable, with type coercion. + + Performs coercion for Boolean (string -> bool), Int, and Float types. + For config variables, dates/times are converted back to strings before + writing into target_data. + + Args: + value: The value to assign. + target_data (dict): The application data dict (required for config vars). + + Raises: + ValueError: If the variable is read-only or value cannot be coerced. + """ + + if self.read_only: + raise ValueError(f"不能修改只读变量 '{self.name}'") + if self.var_type == "Boolean" and not isinstance(value, bool): + value = (str(value).upper() == "TRUE") + if self.var_type == "Int" and not isinstance(value, int): + try: + value = int(value) + except (ValueError, TypeError): + raise ValueError(f"无法将值 '{value}' 转换为 Int 类型") + if self.var_type == "Float" and not isinstance(value, float): + try: + value = float(value) + except (ValueError, TypeError): + raise ValueError(f"无法将值 '{value}' 转换为 Float 类型") + if self.is_config: + if self.var_type == "Date" and isinstance(value, date): + value = value.strftime("%Y-%m-%d") + if self.var_type == "Time" and isinstance(value, time): + value = value.strftime("%H:%M") + if self.is_config and target_data is not None and self.key_path: + d = target_data + for key in self.key_path[:-1]: + d = d.setdefault(key, {}) + d[self.key_path[-1]] = value + else: + self._value = value + + +# Built-in read-only meta variables available to all scripts +_META_VARS = { + "CURRENT_DATE": ASObject("CURRENT_DATE", "Date", read_only=True, display_name="当前日期"), + "CURRENT_TIME": ASObject("CURRENT_TIME", "Time", read_only=True, display_name="当前时间"), +} + + +def _inferType( + value, + raw_expr: str = None +) -> str: + """ + Infer the ASObject type string from a Python value or raw expression. + + When the Python type is ambiguous (e.g. int can be Int or a component + of Date), the raw_expr is used as a hint. + + Args: + value: The resolved Python value. + raw_expr (str): The original expression string from the script (optional). + + Returns: + str: One of "Boolean", "Int", "Float", "Date", "Time", "String". + """ + + if isinstance(value, bool): + return "Boolean" + if isinstance(value, int): + return "Int" + if isinstance(value, float): + return "Float" + if isinstance(value, date): + return "Date" + if isinstance(value, time): + return "Time" + if raw_expr: + if re.match(r"^DATE\(\d{4}-\d{2}-\d{2}\)$", raw_expr, re.IGNORECASE): + return "Date" + if re.match(r"^TIME\(\d{1,2}:\d{2}\)$", raw_expr, re.IGNORECASE): + return "Time" + return "String" diff --git a/src/autoscript/ASOperator.py b/src/autoscript/ASOperator.py new file mode 100644 index 0000000..bc9cf1b --- /dev/null +++ b/src/autoscript/ASOperator.py @@ -0,0 +1,185 @@ +# -*- coding: utf-8 -*- +""" +Copyright (c) 2026 KenanZhu. +All rights reserved. + +This software is provided "as is", without any warranty of any kind. +You may use, modify, and distribute this file under the terms of the MIT License. +See the LICENSE file for details. +""" +from datetime import datetime, timedelta, date, time + +from .ASObject import ASObject + + +__all__ = ["ASOperator"] + + +class ASOperator: + """ + Centralised type-safe operations for AutoScript engine types. + + All arithmetic (ADD / SUB) and comparison operators are routed through + this class, which dispatches to the correct Python-level logic based on + the ASObject's var_type. This keeps type-specific branching in one + place instead of scattering it across the engine. + + Args: + op (str): One of ".ADD.", ".SUB.", ".EQ.", ".NEQ.", ".BGT.", + ".BLT.", ".BGE.", ".BLE.". + + Example: + >>> obj = ASObject("X", "Int", default_value=10) + >>> ASOperator.apply(obj, ASObject._makeTemp(5, "Int"), ".ADD.", None) + >>> obj.getValue() + 15 + >>> ASOperator.compare( + ... obj, + ... ASObject._makeTemp(15, "Int"), + ... ".EQ.", None + ... ) + True + """ + + _COMPARE = { + ".EQ." : lambda a, b: a == b, + ".NEQ.": lambda a, b: a != b, + ".BGT.": lambda a, b: a > b, + ".BLT.": lambda a, b: a < b, + ".BGE.": lambda a, b: a >= b, + ".BLE.": lambda a, b: a <= b, + } + _ARITH_TYPES = {"Date", "Time", "Int", "Float"} + + @staticmethod + def apply( + target: ASObject, + operand: ASObject, + op: str, + target_data: dict + ): + """ + Apply ADD or SUB to a target ASObject, modifying it in place. + + Args: + target (ASObject): The variable to modify. + operand (ASObject): The operand (numeric value for Date/Time/Int/Float). + op (str): ".ADD." or ".SUB.". + target_data (dict): Application data dict (passed through to getValue/setValue). + + Raises: + ValueError: If the type does not support the operation or values are invalid. + """ + + tp = target.var_type + op_tp = operand.var_type + if tp not in ASOperator._ARITH_TYPES: + raise ValueError(f"'{tp}' 类型字段不支持操作运算") + if op_tp not in ("Int", "Float"): + raise ValueError(f"操作数类型 '{op_tp}' 不能用于运算,需要数值类型 (Int / Float)") + if tp in ("Date", "Time") and op_tp != "Int": + raise ValueError(f"'{tp}' 类型的加减法操作数必须为 Int 类型,不允许 Float") + target_val = target.getValue(target_data) + if target_val is None: + raise ValueError(f"'{target.name}' 的值为空,无法进行运算") + if op == ".ADD.": + ASOperator._arithAdd(target, target_val, operand, target_data) + elif op == ".SUB.": + ASOperator._arithSub(target, target_val, operand, target_data) + else: + raise ValueError(f"不支持的操作 '{op}'") + + @staticmethod + def _arithAdd( + target: ASObject, + target_val, + operand: ASObject, + target_data: dict + ): + """Dispatch ADD per type.""" + + tp = target.var_type + raw_op = operand._value + + if tp == "Date": + if not isinstance(target_val, date): + raise ValueError(f"'{target.name}' 的值 '{target_val}' 不是有效日期") + new_val = target_val + timedelta(days=int(raw_op)) + elif tp == "Time": + if not isinstance(target_val, time): + raise ValueError(f"'{target.name}' 的值 '{target_val}' 不是有效时间") + dt = datetime.combine(datetime.today(), target_val) + timedelta(hours=int(raw_op)) + new_val = dt.time() + elif tp == "Int": + new_val = int(target_val) + int(raw_op) + elif tp == "Float": + new_val = float(target_val) + float(raw_op) + else: + raise ValueError(f"'{tp}' 类型不支持 ADD 操作") + target.setValue(new_val, target_data) + + @staticmethod + def _arithSub( + target: ASObject, + target_val, + operand: ASObject, + target_data: dict + ): + + """Dispatch SUB per type.""" + + tp = target.var_type + raw_op = operand._value + + if tp == "Date": + if not isinstance(target_val, date): + raise ValueError(f"'{target.name}' 的值 '{target_val}' 不是有效日期") + new_val = target_val - timedelta(days=int(raw_op)) + elif tp == "Time": + if not isinstance(target_val, time): + raise ValueError(f"'{target.name}' 的值 '{target_val}' 不是有效时间") + dt = datetime.combine(datetime.today(), target_val) - timedelta(hours=int(raw_op)) + new_val = dt.time() + elif tp == "Int": + new_val = int(target_val) - int(raw_op) + elif tp == "Float": + new_val = float(target_val) - float(raw_op) + else: + raise ValueError(f"'{tp}' 类型不支持 SUB 操作") + target.setValue(new_val, target_data) + + @staticmethod + def compare( + left: ASObject, + right: ASObject, + op: str, + target_data: dict + ) -> bool: + """ + Compare two ASObjects using the given comparison operator. + + Args: + left (ASObject): Left-hand side. + right (ASObject): Right-hand side. + op (str): One of ".EQ.", ".NEQ.", ".BGT.", ".BLT.", ".BGE.", ".BLE.". + target_data (dict): Application data dict. + + Returns: + bool: The comparison result. + + Raises: + ValueError: If the types are incompatible for comparison. + """ + + cmp_func = ASOperator._COMPARE.get(op) + if cmp_func is None: + raise ValueError(f"未知的比较操作 '{op}'") + left_val = left.getValue(target_data) + right_val = right.getValue(target_data) + try: + return cmp_func(left_val, right_val) + except TypeError: + raise ValueError( + f"无法比较 '{left.name}' ({left.var_type}) " + f"与 '{right.name}' ({right.var_type})" + ) diff --git a/src/autoscript/__init__.py b/src/autoscript/__init__.py new file mode 100644 index 0000000..a2a6f1e --- /dev/null +++ b/src/autoscript/__init__.py @@ -0,0 +1,55 @@ +""" + AutoScript module for the AutoLibrary project. + + A lightweight scripting DSL for preprocessing user reservation data + in repeatable timer tasks. Supports IF/ELSE IF/ELSE/END IF control + flow, SET assignments, .ADD./.SUB. operations, and rich comparisons. + + Public API: + - execute(script_text, target_data): Execute an AutoScript. + - addTargetVar(name, var_type, key_path, display_name): Register a variable. + - registerDefaultTargetVars(): Register all built-in target variables. + - META_VARS: dict of built-in read-only meta variables. + - ALL_VARIABLES: dict of all available variables (display_name -> (name, type)). +""" + +from autoscript.ASEngine import execute, addTargetVar +from autoscript.ASObject import _META_VARS as META_VARS + +__all__ = [ + "execute", "addTargetVar", "registerDefaultTargetVars", + "META_VARS", "ALL_VARIABLES", +] + +# All variables available to scripts (display_name -> (name, type)). +# This mirrors the old AutoScriptEngine.VARIABLE_META for backward +# compatibility in the UI orchestration dialog. +ALL_VARIABLES: dict = { + "用户名": ("USERNAME", "String"), + "用户启用": ("USER_ENABLE", "Boolean"), + "预约日期": ("RESERVE_DATE", "Date"), + "预约开始时间": ("RESERVE_BEGIN_TIME", "Time"), + "预约结束时间": ("RESERVE_END_TIME", "Time"), + "当前时间": ("CURRENT_TIME", "Time"), + "当前日期": ("CURRENT_DATE", "Date"), +} + +# Key paths into target_data dict for each target variable. +# (name, type, key_path, display_name) +_TARGET_VAR_DEFS = [ + ("USERNAME", "String",["username"], "用户名"), + ("USER_ENABLE", "Boolean",["enabled"], "用户启用"), + ("RESERVE_DATE", "Date", ["reserve_info", "date"], "预约日期"), + ("RESERVE_BEGIN_TIME", "Time", ["reserve_info", "begin_time", "time"], "预约开始时间"), + ("RESERVE_END_TIME", "Time", ["reserve_info", "end_time", "time"], "预约结束时间"), +] + + +def registerDefaultTargetVars() -> None: + """ + Register all built-in target variables with the engine. + This must be called before any script execution. + Calling multiple times is idempotent (re-registers same keys). + """ + for name, var_type, key_path, display_name in _TARGET_VAR_DEFS: + addTargetVar(name, var_type, key_path, display_name) diff --git a/src/dsl/AutoScriptEngine.py b/src/dsl/AutoScriptEngine.py deleted file mode 100644 index 7504ea6..0000000 --- a/src/dsl/AutoScriptEngine.py +++ /dev/null @@ -1,386 +0,0 @@ -# -*- coding: utf-8 -*- -""" -Copyright (c) 2026 KenanZhu. -All rights reserved. - -This software is provided "as is", without any warranty of any kind. -You may use, modify, and distribute this file under the terms of the MIT License. -See the LICENSE file for details. -""" -import re -from datetime import datetime, timedelta - - -class AutoScriptEngine: - """ - AutoScript script engine. - - Parses and executes AutoScript — a lightweight scripting DSL - used in repeatable timer tasks to preprocess user reservation - data before automation runs. - - Supports IF/ELSE IF/ELSE/END IF control flow, SET assignments, - .ADD./.SUB. operations on Date/Time fields, and rich comparison - operators (.EQ. .NEQ. .BGT. .BLT. .BGE. .BLE.). - - Examples: - >>> engine = AutoScriptEngine - >>> user = { - ... "username": "test", - ... "enabled": True, - ... "reserve_info": {"date": "2026-05-07"} - ... } - >>> engine.execute( - ... 'IF(CURRENT_TIME .BGT. TIME(19:00))\\n' - ... ' RESERVE_DATE .ADD. 1\\n' - ... 'END IF', - ... user - ... ) - """ - COMPARE_OPS = { # compare operators - ".EQ." : lambda a, b: a == b, - ".NEQ.": lambda a, b: a != b, - ".BGT.": lambda a, b: a > b, - ".BLT.": lambda a, b: a < b, - ".BGE.": lambda a, b: a >= b, - ".BLE.": lambda a, b: a <= b, - } - VARIABLE_META = { # variable metadata - "预约开始时间": ("RESERVE_BEGIN_TIME", "Time"), - "预约结束时间": ("RESERVE_END_TIME", "Time"), - "预约日期": ("RESERVE_DATE", "Date"), - "用户名": ("USERNAME", "String"), - "用户启用": ("USER_ENABLE", "Boolean"), - "当前时间": ("CURRENT_TIME", "Time"), - "当前日期": ("CURRENT_DATE", "Date"), - } - _FIELD_TYPE_MAP = {meta[0]: meta[1] for meta in VARIABLE_META.values()} - - @staticmethod - def execute( - script_text: str, - user_data: dict - ): - """ - Execute an AutoScript against the given user data. - - The script is parsed line-by-line. All modifications are - applied directly to ``user_data`` in-place. - - Args: - script_text (str): Raw AutoScript source code. - user_data (dict): User data dictionary to read from and - write to. Must conform to the standard user profile - structure (username, enabled, reserve_info, etc.). - - Raises: - ValueError: On any syntax or type error encountered - during parsing or execution. - """ - - if not script_text or not script_text.strip(): - return - lines = [l.strip() for l in script_text.split("\n") if l.strip()] - if not lines: - return - if_stack = [] - - for line in lines: - upper_line = line.upper().strip() - if upper_line.startswith("IF("): - cond_end = _findConditionEnd(upper_line) - if cond_end < 0: - raise ValueError("AutoScript 语法错误: IF 缺少右括号") - condition_str = line[3:cond_end].strip() - matched = AutoScriptEngine._evaluateCondition( - condition_str, user_data - ) - if_stack.append([matched, matched]) - elif upper_line.startswith("ELSE IF("): - if not if_stack: - raise ValueError("AutoScript 语法错误: ELSE IF 前缺少 IF") - cond_end = _findConditionEnd(upper_line) - if cond_end < 0: - raise ValueError("AutoScript 语法错误: ELSE IF 缺少右括号") - condition_str = line[8:cond_end].strip() - _, has_matched = if_stack[-1] - if not has_matched: - matched = AutoScriptEngine._evaluateCondition( - condition_str, user_data - ) - if_stack[-1] = [matched, matched] - else: - if_stack[-1][0] = False - elif upper_line == "ELSE": - if not if_stack: - raise ValueError("AutoScript 语法错误: ELSE 前缺少 IF") - _, has_matched = if_stack[-1] - if not has_matched: - if_stack[-1] = [True, True] - else: - if_stack[-1][0] = False - elif upper_line in ("ENDIF", "END IF"): - if not if_stack: - raise ValueError("AutoScript 语法错误: ENDIF/END IF 前缺少 IF") - if_stack.pop() - elif upper_line.startswith("SET "): - should_execute = ( - all(ctx[0] for ctx in if_stack) if if_stack else True - ) - if should_execute: - AutoScriptEngine._executeSet(line, user_data) - elif upper_line == "PASS": - continue - else: - should_execute = ( - all(ctx[0] for ctx in if_stack) if if_stack else True - ) - if should_execute: - AutoScriptEngine._executeOperation(line, user_data) - if if_stack: - raise ValueError("AutoScript 语法错误: IF 与 ENDIF/END IF 不匹配") - - @staticmethod - def _resolveField( - field_name: str, - user_data: dict - ): - - upper_name = field_name.upper().strip() - if upper_name == "CURRENT_DATE": - return datetime.now().strftime("%Y-%m-%d") - elif upper_name == "CURRENT_TIME": - return datetime.now().strftime("%H:%M") - elif upper_name == "USERNAME": - return user_data.get("username", "") - elif upper_name == "USER_ENABLE": - return user_data.get("enabled", False) - elif upper_name == "RESERVE_DATE": - return user_data.get("reserve_info", {}).get("date", "") - elif upper_name == "RESERVE_BEGIN_TIME": - return ( - user_data - .get("reserve_info", {}) - .get("begin_time", {}) - .get("time", "") - ) - elif upper_name == "RESERVE_END_TIME": - return ( - user_data - .get("reserve_info", {}) - .get("end_time", {}) - .get("time", "") - ) - return "" - - @staticmethod - def _resolveValue( - value_str: str, - user_data: dict - ): - - s = value_str.strip() - time_match = re.match(r"^TIME\((\d{1,2}):(\d{2})\)$", s, re.IGNORECASE) - if time_match: - h, m = time_match.group(1), time_match.group(2) - return f"{int(h):02d}:{int(m):02d}" - date_match = re.match(r"^DATE\((\d{4})-(\d{2})-(\d{2})\)$", s, re.IGNORECASE) - if date_match: - y, mo, d = date_match.group(1), date_match.group(2), date_match.group(3) - return f"{int(y):04d}-{int(mo):02d}-{int(d):02d}" - if s.upper() == ".TRUE.": - return True - if s.upper() == ".FALSE.": - return False - if s.startswith("'") and s.endswith("'"): - inner = s[1:-1].replace("''", "'") - return inner - if s.startswith('"') and s.endswith('"'): - return s[1:-1] - relDate = re.match(r"^CURRENT_DATE\s*\+\s*(\d+)$", s, re.IGNORECASE) - if relDate: - days = int(relDate.group(1)) - return (datetime.now() + timedelta(days=days)).strftime("%Y-%m-%d") - relTime = re.match(r"^CURRENT_TIME\s*\+\s*(\d+)$", s, re.IGNORECASE) - if relTime: - hours = int(relTime.group(1)) - return (datetime.now() + timedelta(hours=hours)).strftime("%H:%M") - try: - return int(s) - except ValueError: - pass - try: - return float(s) - except ValueError: - pass - resolved = AutoScriptEngine._resolveField(s, user_data) - return resolved - - @staticmethod - def _setField( - field_name: str, - value: str, - user_data: dict - ): - upper_name = field_name.upper().strip() - if upper_name == "RESERVE_DATE": - user_data.setdefault("reserve_info", {})["date"] = value - elif upper_name == "RESERVE_BEGIN_TIME": - ri = user_data.setdefault("reserve_info", {}) - ri.setdefault("begin_time", {})["time"] = value - elif upper_name == "RESERVE_END_TIME": - ri = user_data.setdefault("reserve_info", {}) - ri.setdefault("end_time", {})["time"] = value - elif upper_name == "USERNAME": - user_data["username"] = value - elif upper_name == "USER_ENABLE": - if isinstance(value, bool): - user_data["enabled"] = value - else: - user_data["enabled"] = (str(value).upper() == "TRUE") - - @staticmethod - def _evaluateCondition( - condition_str: str, - user_data: dict - ) -> bool: - - for op, cmp_func in AutoScriptEngine.COMPARE_OPS.items(): - if op not in condition_str.upper(): - continue - idx = condition_str.upper().find(op) - parts = [condition_str[:idx], condition_str[idx + len(op):]] - if len(parts) != 2: - continue - field_name = parts[0].strip() - value_str = parts[1].strip() - left_val = AutoScriptEngine._resolveField(field_name, user_data) - right_val = AutoScriptEngine._resolveValue(value_str, user_data) - try: - return cmp_func(left_val, right_val) - except TypeError: - raise ValueError( - f"AutoScript 语法错误: 无法比较 " - f"'{field_name}' ({type(left_val).__name__}) " - f"与 '{value_str}' ({type(right_val).__name__})" - ) - return False - - @staticmethod - def _executeSet( - line: str, - user_data: dict - ): - rest = line[3:].strip() - eq_idx = rest.find("=") - if eq_idx < 0: - return - field_name = rest[:eq_idx].strip() - value_str = rest[eq_idx + 1:].strip() - if not field_name: - return - resolved = AutoScriptEngine._resolveValue(value_str, user_data) - AutoScriptEngine._setField(field_name, resolved, user_data) - - @staticmethod - def _executeOperation( - line: str, - user_data: dict - ): - - parts = line.split() - if len(parts) < 3: - return - field_name = parts[0].upper().strip() - op = parts[1].upper().strip() - raw_value = parts[2].strip() - field_type = AutoScriptEngine._FIELD_TYPE_MAP.get(field_name) - if not field_type: - raise ValueError( - f"AutoScript 语法错误: 未知字段 '{field_name}'" - ) - try: - num_value = float(raw_value) if "." in raw_value else int(raw_value) - except (ValueError, TypeError): - raise ValueError( - f"AutoScript 语法错误: 无效操作数 '{raw_value}'" - ) - if field_type == "Date": - date_str = AutoScriptEngine._resolveField(field_name, user_data) - if not date_str: - return - try: - date_obj = datetime.strptime(date_str, "%Y-%m-%d") - except (ValueError, TypeError): - return - if op == ".ADD.": - date_obj += timedelta(days=num_value) - elif op == ".SUB.": - date_obj -= timedelta(days=num_value) - else: - raise ValueError( - f"AutoScript 语法错误: Date 类型不支持操作 '{op}'" - ) - AutoScriptEngine._setField( - field_name, date_obj.strftime("%Y-%m-%d"), user_data - ) - elif field_type == "Time": - time_str = AutoScriptEngine._resolveField(field_name, user_data) - if not time_str: - return - try: - time_obj = datetime.strptime(time_str, "%H:%M") - except (ValueError, TypeError): - return - if op == ".ADD.": - time_obj += timedelta(hours=num_value) - elif op == ".SUB.": - time_obj -= timedelta(hours=num_value) - else: - raise ValueError( - f"AutoScript 语法错误: Time 类型不支持操作 '{op}'" - ) - AutoScriptEngine._setField( - field_name, time_obj.strftime("%H:%M"), user_data - ) - elif field_type in ("String", "Boolean"): - raise ValueError( - f"AutoScript 语法错误: '{field_type}' 类型字段不支持操作运算" - ) - else: - raise ValueError( - f"AutoScript 语法错误: 未知字段类型 '{field_type}'" - ) - - -def _findConditionEnd( - upper_line: str -) -> int: - """ - Find the index of the closing parenthesis that matches the - opening parenthesis in a condition expression, handling nested - parentheses and optional ``THEN`` keyword. - - Args: - upper_line (str): The uppercased line text containing the - condition, e.g. ``"IF(A .BGT. B) THEN"``. - - Returns: - int: Index of the matching ``)``, or ``-1`` if no match - is found. - """ - - line = upper_line.rstrip() - if line.endswith(" THEN"): - line = line[:-5].rstrip() - paren_depth = 0 - start_found = False - for i, ch in enumerate(line): - if ch == "(": - paren_depth += 1 - start_found = True - elif ch == ")": - paren_depth -= 1 - if start_found and paren_depth == 0: - return i - return -1 diff --git a/src/dsl/__init__.py b/src/dsl/__init__.py deleted file mode 100644 index 878d740..0000000 --- a/src/dsl/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -""" - DSL module for the AutoLibrary project. - - Contains the AutoScript DSL engine and related components - for preprocessing user reservation data in timer tasks. - - Classes: - - AutoScriptEngine: AutoScript script engine class. -"""