Merge pull request #126 from tageniu/new-feature-branch

Implement a Ctrl+C signal handler to pause for debug purposes
Esse commit está contido em:
Vincent Tu
2025-08-15 13:37:31 -07:00
commit de GitHub
3 arquivos alterados com 247 adições e 3 exclusões
+82 -1
Ver Arquivo
@@ -4,6 +4,7 @@ import io
import logging
import os
import platform
import signal
import sys
import time
@@ -13,6 +14,71 @@ from gui_agents.s1.core.AgentS import GraphSearchAgent, UIAgent
current_platform = platform.system().lower()
# Module-level pause flag for interactive debugging.  signal_handler sets it
# to True on the first Ctrl+C and back to False when the user resumes;
# run_agent polls it between steps and sleeps while it is True.
paused = False
def get_char():
    """Read one keypress from stdin without waiting for Enter.

    On macOS/Linux the terminal is switched to raw mode for a single
    ``read(1)`` and its previous settings are always restored afterwards.
    On other platforms ``msvcrt.getch()`` is used.  If direct key access
    fails (for example because stdin is not a terminal) the function falls
    back to line-based ``input()``.

    Returns:
        A one-character string.  In raw mode Ctrl+C arrives as ``chr(3)``
        and Esc as ``chr(27)``.  In the ``input()`` fallback only the first
        character of the typed line is returned, and an empty line yields a
        newline character so callers can still apply ``ord()``.  A raw read
        at end-of-file may still produce an empty string.

    Raises:
        EOFError: from the ``input()`` fallback when stdin is exhausted.
    """
    try:
        if platform.system() in ("Darwin", "Linux"):
            # Imported lazily: termios and tty only exist on Unix-like systems.
            import termios
            import tty

            fd = sys.stdin.fileno()
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = sys.stdin.read(1)
            finally:
                # Restore the terminal even if the read is interrupted.
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            return ch

        # Windows fallback
        import msvcrt

        return msvcrt.getch().decode("utf-8", errors="ignore")
    except Exception:
        # Deliberate best-effort fallback for non-terminal environments.
        # ``Exception`` (rather than a bare ``except``) lets SystemExit and
        # KeyboardInterrupt propagate, so a quit request is never turned into
        # an input prompt.
        line = input()
        # Keep the "single character" contract: a full line or an empty
        # string would make the caller's ord() raise TypeError.
        return line[0] if line else "\n"
def signal_handler(signum, frame):
    """Pause the agent on Ctrl+C and let the user resume or quit.

    Intended to be installed as the SIGINT handler.  The first Ctrl+C sets
    the module-level ``paused`` flag, prints a short menu and then blocks
    inside this handler, reading keys via ``get_char()`` until the user
    decides:

    * Ctrl+C (character code 3): exit the process with status 0.
    * Esc (character code 27): clear ``paused`` and return to the
      interrupted code.
    * anything else: report it and keep waiting.

    If the handler is entered while ``paused`` is already set, the process
    exits immediately.

    Args:
        signum: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module
            (unused).

    Raises:
        SystemExit: when the user chooses to quit or stdin is exhausted.
    """
    global paused

    if paused:
        # Already paused, second Ctrl+C means quit
        print("\n\n🛑 Exiting Agent-S...")
        sys.exit(0)

    print("\n\n🔸 Agent-S Workflow Paused 🔸")
    print("=" * 50)
    print("Options:")
    print(" • Press Ctrl+C again to quit")
    print(" • Press Esc to resume workflow")
    print("=" * 50)
    paused = True

    while paused:
        print("\n[PAUSED] Waiting for input... ", end="", flush=True)
        try:
            char = get_char()
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin is closed, so no resume key can ever
            # arrive; treat it like a quit request instead of a traceback.
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)

        # get_char() may hand back an empty string or a whole line when it
        # falls back to input(); ord() accepts exactly one character, so
        # only the first one is inspected.
        key = char[:1]
        if not key:
            print("\n Unknown command: '' (empty input)")
            continue

        code = ord(key)
        if code == 3:  # Ctrl+C
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)
        elif code == 27:  # Esc
            print("\n\n▶️ Resuming Agent-S workflow...")
            paused = False
        else:
            print(f"\n Unknown command: '{key}' (ord: {code})")
# Install signal_handler for SIGINT (Ctrl+C).  This runs at import time and
# replaces Python's default handler, so Ctrl+C pauses the agent instead of
# raising KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
if current_platform == "darwin":
from gui_agents.s1.aci.MacOSACI import MacOSACI, UIElement
elif current_platform == "linux":
@@ -81,10 +147,14 @@ def show_permission_dialog(code: str, action_description: str):
def run_agent(agent: UIAgent, instruction: str):
global paused
obs = {}
traj = "Task:\n" + instruction
subtask_traj = ""
for _ in range(15):
for step in range(15):
# Check if we're in paused state and wait
while paused:
time.sleep(0.1)
obs["accessibility_tree"] = UIElement.systemWideElement()
# Get screen shot using pyautogui.
@@ -100,6 +170,12 @@ def run_agent(agent: UIAgent, instruction: str):
# Convert to base64 string.
obs["screenshot"] = screenshot_bytes
# Check again for pause state before prediction
while paused:
time.sleep(0.1)
print(f"\n🔄 Step {step + 1}/15: Getting next action from agent...")
# Get next action code from the agent
info, code = agent.predict(instruction=instruction, observation=obs)
@@ -120,6 +196,7 @@ def run_agent(agent: UIAgent, instruction: str):
continue
if "wait" in code[0].lower():
print("⏳ Agent requested wait...")
time.sleep(5)
continue
@@ -127,6 +204,10 @@ def run_agent(agent: UIAgent, instruction: str):
time.sleep(1.0)
print("EXECUTING CODE:", code[0])
# Check for pause state before execution
while paused:
time.sleep(0.1)
# Ask for permission before executing
exec(code[0])
time.sleep(1.0)
+83 -1
Ver Arquivo
@@ -5,6 +5,7 @@ import logging
import os
import platform
import pyautogui
import signal
import sys
import time
@@ -15,6 +16,71 @@ from gui_agents.s2.agents.agent_s import AgentS2
current_platform = platform.system().lower()
# Module-level pause flag for interactive debugging.  signal_handler sets it
# to True on the first Ctrl+C and back to False when the user resumes;
# run_agent polls it between steps and sleeps while it is True.
paused = False
def get_char():
    """Read one keypress from stdin without waiting for Enter.

    On macOS/Linux the terminal is switched to raw mode for a single
    ``read(1)`` and its previous settings are always restored afterwards.
    On other platforms ``msvcrt.getch()`` is used.  If direct key access
    fails (for example because stdin is not a terminal) the function falls
    back to line-based ``input()``.

    Returns:
        A one-character string.  In raw mode Ctrl+C arrives as ``chr(3)``
        and Esc as ``chr(27)``.  In the ``input()`` fallback only the first
        character of the typed line is returned, and an empty line yields a
        newline character so callers can still apply ``ord()``.  A raw read
        at end-of-file may still produce an empty string.

    Raises:
        EOFError: from the ``input()`` fallback when stdin is exhausted.
    """
    try:
        if platform.system() in ("Darwin", "Linux"):
            # Imported lazily: termios and tty only exist on Unix-like systems.
            import termios
            import tty

            fd = sys.stdin.fileno()
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = sys.stdin.read(1)
            finally:
                # Restore the terminal even if the read is interrupted.
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            return ch

        # Windows fallback
        import msvcrt

        return msvcrt.getch().decode("utf-8", errors="ignore")
    except Exception:
        # Deliberate best-effort fallback for non-terminal environments.
        # ``Exception`` (rather than a bare ``except``) lets SystemExit and
        # KeyboardInterrupt propagate, so a quit request is never turned into
        # an input prompt.
        line = input()
        # Keep the "single character" contract: a full line or an empty
        # string would make the caller's ord() raise TypeError.
        return line[0] if line else "\n"
def signal_handler(signum, frame):
    """Pause the agent on Ctrl+C and let the user resume or quit.

    Intended to be installed as the SIGINT handler.  The first Ctrl+C sets
    the module-level ``paused`` flag, prints a short menu and then blocks
    inside this handler, reading keys via ``get_char()`` until the user
    decides:

    * Ctrl+C (character code 3): exit the process with status 0.
    * Esc (character code 27): clear ``paused`` and return to the
      interrupted code.
    * anything else: report it and keep waiting.

    If the handler is entered while ``paused`` is already set, the process
    exits immediately.

    Args:
        signum: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module
            (unused).

    Raises:
        SystemExit: when the user chooses to quit or stdin is exhausted.
    """
    global paused

    if paused:
        # Already paused, second Ctrl+C means quit
        print("\n\n🛑 Exiting Agent-S...")
        sys.exit(0)

    print("\n\n🔸 Agent-S Workflow Paused 🔸")
    print("=" * 50)
    print("Options:")
    print(" • Press Ctrl+C again to quit")
    print(" • Press Esc to resume workflow")
    print("=" * 50)
    paused = True

    while paused:
        print("\n[PAUSED] Waiting for input... ", end="", flush=True)
        try:
            char = get_char()
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin is closed, so no resume key can ever
            # arrive; treat it like a quit request instead of a traceback.
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)

        # get_char() may hand back an empty string or a whole line when it
        # falls back to input(); ord() accepts exactly one character, so
        # only the first one is inspected.
        key = char[:1]
        if not key:
            print("\n Unknown command: '' (empty input)")
            continue

        code = ord(key)
        if code == 3:  # Ctrl+C
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)
        elif code == 27:  # Esc
            print("\n\n▶️ Resuming Agent-S workflow...")
            paused = False
        else:
            print(f"\n Unknown command: '{key}' (ord: {code})")
# Install signal_handler for SIGINT (Ctrl+C).  This runs at import time and
# replaces Python's default handler, so Ctrl+C pauses the agent instead of
# raising KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
@@ -81,10 +147,15 @@ def scale_screen_dimensions(width: int, height: int, max_dim_size: int):
def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
global paused
obs = {}
traj = "Task:\n" + instruction
subtask_traj = ""
for _ in range(15):
for step in range(15):
# Check if we're in paused state and wait
while paused:
time.sleep(0.1)
# Get screen shot using pyautogui
screenshot = pyautogui.screenshot()
screenshot = screenshot.resize((scaled_width, scaled_height), Image.LANCZOS)
@@ -98,6 +169,12 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
# Convert to base64 string.
obs["screenshot"] = screenshot_bytes
# Check again for pause state before prediction
while paused:
time.sleep(0.1)
print(f"\n🔄 Step {step + 1}/15: Getting next action from agent...")
# Get next action code from the agent
info, code = agent.predict(instruction=instruction, observation=obs)
@@ -118,6 +195,7 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
continue
if "wait" in code[0].lower():
print("⏳ Agent requested wait...")
time.sleep(5)
continue
@@ -125,6 +203,10 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
time.sleep(1.0)
print("EXECUTING CODE:", code[0])
# Check for pause state before execution
while paused:
time.sleep(0.1)
# Ask for permission before executing
exec(code[0])
time.sleep(1.0)
+82 -1
Ver Arquivo
@@ -5,6 +5,7 @@ import logging
import os
import platform
import pyautogui
import signal
import sys
import time
@@ -15,6 +16,71 @@ from gui_agents.s2_5.agents.agent_s import AgentS2_5
current_platform = platform.system().lower()
# Module-level pause flag for interactive debugging.  signal_handler sets it
# to True on the first Ctrl+C and back to False when the user resumes;
# run_agent polls it between steps and sleeps while it is True.
paused = False
def get_char():
    """Read one keypress from stdin without waiting for Enter.

    On macOS/Linux the terminal is switched to raw mode for a single
    ``read(1)`` and its previous settings are always restored afterwards.
    On other platforms ``msvcrt.getch()`` is used.  If direct key access
    fails (for example because stdin is not a terminal) the function falls
    back to line-based ``input()``.

    Returns:
        A one-character string.  In raw mode Ctrl+C arrives as ``chr(3)``
        and Esc as ``chr(27)``.  In the ``input()`` fallback only the first
        character of the typed line is returned, and an empty line yields a
        newline character so callers can still apply ``ord()``.  A raw read
        at end-of-file may still produce an empty string.

    Raises:
        EOFError: from the ``input()`` fallback when stdin is exhausted.
    """
    try:
        if platform.system() in ("Darwin", "Linux"):
            # Imported lazily: termios and tty only exist on Unix-like systems.
            import termios
            import tty

            fd = sys.stdin.fileno()
            old_settings = termios.tcgetattr(fd)
            try:
                tty.setraw(fd)
                ch = sys.stdin.read(1)
            finally:
                # Restore the terminal even if the read is interrupted.
                termios.tcsetattr(fd, termios.TCSADRAIN, old_settings)
            return ch

        # Windows fallback
        import msvcrt

        return msvcrt.getch().decode("utf-8", errors="ignore")
    except Exception:
        # Deliberate best-effort fallback for non-terminal environments.
        # ``Exception`` (rather than a bare ``except``) lets SystemExit and
        # KeyboardInterrupt propagate, so a quit request is never turned into
        # an input prompt.
        line = input()
        # Keep the "single character" contract: a full line or an empty
        # string would make the caller's ord() raise TypeError.
        return line[0] if line else "\n"
def signal_handler(signum, frame):
    """Pause the agent on Ctrl+C and let the user resume or quit.

    Intended to be installed as the SIGINT handler.  The first Ctrl+C sets
    the module-level ``paused`` flag, prints a short menu and then blocks
    inside this handler, reading keys via ``get_char()`` until the user
    decides:

    * Ctrl+C (character code 3): exit the process with status 0.
    * Esc (character code 27): clear ``paused`` and return to the
      interrupted code.
    * anything else: report it and keep waiting.

    If the handler is entered while ``paused`` is already set, the process
    exits immediately.

    Args:
        signum: Signal number supplied by the ``signal`` module (unused).
        frame: Interrupted stack frame supplied by the ``signal`` module
            (unused).

    Raises:
        SystemExit: when the user chooses to quit or stdin is exhausted.
    """
    global paused

    if paused:
        # Already paused, second Ctrl+C means quit
        print("\n\n🛑 Exiting Agent-S...")
        sys.exit(0)

    print("\n\n🔸 Agent-S Workflow Paused 🔸")
    print("=" * 50)
    print("Options:")
    print(" • Press Ctrl+C again to quit")
    print(" • Press Esc to resume workflow")
    print("=" * 50)
    paused = True

    while paused:
        print("\n[PAUSED] Waiting for input... ", end="", flush=True)
        try:
            char = get_char()
        except (KeyboardInterrupt, EOFError):
            # EOFError means stdin is closed, so no resume key can ever
            # arrive; treat it like a quit request instead of a traceback.
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)

        # get_char() may hand back an empty string or a whole line when it
        # falls back to input(); ord() accepts exactly one character, so
        # only the first one is inspected.
        key = char[:1]
        if not key:
            print("\n Unknown command: '' (empty input)")
            continue

        code = ord(key)
        if code == 3:  # Ctrl+C
            print("\n\n🛑 Exiting Agent-S...")
            sys.exit(0)
        elif code == 27:  # Esc
            print("\n\n▶️ Resuming Agent-S workflow...")
            paused = False
        else:
            print(f"\n Unknown command: '{key}' (ord: {code})")
# Install signal_handler for SIGINT (Ctrl+C).  This runs at import time and
# replaces Python's default handler, so Ctrl+C pauses the agent instead of
# raising KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
@@ -81,10 +147,14 @@ def scale_screen_dimensions(width: int, height: int, max_dim_size: int):
def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
global paused
obs = {}
traj = "Task:\n" + instruction
subtask_traj = ""
for _ in range(15):
for step in range(15):
# Check if we're in paused state and wait
while paused:
time.sleep(0.1)
# Get screen shot using pyautogui
screenshot = pyautogui.screenshot()
screenshot = screenshot.resize((scaled_width, scaled_height), Image.LANCZOS)
@@ -98,6 +168,12 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
# Convert to base64 string.
obs["screenshot"] = screenshot_bytes
# Check again for pause state before prediction
while paused:
time.sleep(0.1)
print(f"\n🔄 Step {step + 1}/15: Getting next action from agent...")
# Get next action code from the agent
info, code = agent.predict(instruction=instruction, observation=obs)
@@ -117,6 +193,7 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
continue
if "wait" in code[0].lower():
print("⏳ Agent requested wait...")
time.sleep(5)
continue
@@ -124,6 +201,10 @@ def run_agent(agent, instruction: str, scaled_width: int, scaled_height: int):
time.sleep(1.0)
print("EXECUTING CODE:", code[0])
# Check for pause state before execution
while paused:
time.sleep(0.1)
# Ask for permission before executing
exec(code[0])
time.sleep(1.0)