trace_synthesis/final_summarize.py

# final_summarize.py
import os
import json
import glob
import re
import logging

from dotenv import load_dotenv
from openai import OpenAI, RateLimitError, APIError

load_dotenv()

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration ---
try:
    # Best practice: use environment variables for API keys.
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"),
                    base_url=os.getenv("OPENAI_BASE_URL"))
    client.models.list()  # Cheap request to verify the credentials are valid.
    logging.info("OpenAI client initialized successfully.")
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client. Ensure OPENAI_API_KEY and OPENAI_BASE_URL environment variables are set correctly. Error: {e}")
    exit(1)  # Exit if the OpenAI client cannot be initialized.

MODEL_NAME = "aiproxy/deepseek-reasoner"  # Or specify another model such as "gpt-4-turbo"
logging.info(f"Using OpenAI model: {MODEL_NAME}")
# Determine the base directory, assuming the script is run from its own
# location or from the workspace root. Adjust BASE_DIR if your script is
# located elsewhere relative to the data folders.
try:
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # Simple check for common directories to guess the base path.
    if os.path.exists(os.path.join(script_dir, 'tasks')) and os.path.exists(os.path.join(script_dir, 'trace')):
        BASE_DIR = script_dir
    elif os.path.exists('tasks') and os.path.exists('trace'):  # Check relative to the current working directory.
        BASE_DIR = os.getcwd()
    else:
        # Fall back with a warning if the expected structure is not found.
        logging.warning("Could not automatically determine BASE_DIR. Assuming current working directory. Adjust if needed.")
        BASE_DIR = os.getcwd()
    logging.info(f"Using base directory: {BASE_DIR}")

    TASKS_DIR = os.path.join(BASE_DIR, "tasks")
    TRACE_DIR = os.path.join(BASE_DIR, "trace")
    VIDEO_DIR = os.path.join(BASE_DIR, "video")
    TRACE_EXTRACT_DIR = os.path.join(BASE_DIR, "trace_extract")
    SUMMARY_DIR = os.path.join(BASE_DIR, "summary")
    # DESIGN_DOC_PATH is no longer needed.

    # Ensure all necessary source directories exist.
    required_dirs = [TASKS_DIR, TRACE_DIR, VIDEO_DIR, TRACE_EXTRACT_DIR]
    for d in required_dirs:
        if not os.path.isdir(d):
            logging.error(f"Required directory not found: {d}")
            exit(1)

    # Create the summary directory if it doesn't exist.
    os.makedirs(SUMMARY_DIR, exist_ok=True)
    logging.info(f"Summary directory ensured: {SUMMARY_DIR}")
except Exception as e:
    logging.error(f"Error setting up directories: {e}")
    exit(1)
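# Expected layout under BASE_DIR, reconstructed from the paths used below
# (a sketch for orientation, not enforced beyond the existence checks above):
#   tasks/<task_id>.json                            - task definition with an "intent" field
#   trace/<task_id>.trace.zip                       - raw Playwright trace archives
#   video/<task_id>.trace/*.txt                     - visual-LLM annotation descriptions of the recording
#   trace_extract/<task_id>.trace.zip.content.json  - extracted Playwright actions
#   summary/                                        - output directory created by this script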
# --- Fixed Prompt Sections (Embedded as Strings) ---
INSTRUCTION = """
- You are an expert in cleaning process data descriptions. Given a task, you are provided with a set of annotation
  descriptions produced by a visual LLM for videos of human user operations. You are also provided with the full
  Playwright action trace, which includes each action and the URL before and after it.
- You need to analyze all the descriptive data and ultimately summarize a complete and reasonable user operation
  description that can accomplish the given task.
- For each strategy, give a clear list of the low-level action sequence.
""".strip()

OUTPUT_FORMAT = """
- First summarize the Objective of the whole task, then present the entire process in a three-level
  Strategy-SubStrategy-Action hierarchy.
- Next, give the observations and interesting findings about the overall operation flow, and finally output the
  three-level process description strictly in JSON format.
- The final JSON output should be wrapped between ```json and ```. Each lowest-level action must include a
  description, the sequence index of the corresponding Playwright action, and the concrete instruction content.
""".strip()
EXAMPLE = """
### Complete User Operation Description to Display Labeled Issues in kkroening/ffmpeg-python
**Objective:** Filter and display all issues labeled as "question" in the kkroening/ffmpeg-python repository.
---
#### **Strategy 1: Navigate to the Repository**
**Low-Level Action Sequence:**
1. **Search for the user "kkroening"**
   - Click the global search bar (placeholder: "Search GitLab").
   - Type "kkroening" and press `Enter`.
2. **Select the user from results**
   - Click the "Users" tab in search results.
   - Click on "Karl Kroening @kkroening" in the user list.
3. **Access the repository**
   - Navigate to the "Personal projects" section.
   - Click on the "ffmpeg-python" project.
---
#### **Strategy 2: Filter Issues by Label**
**Low-Level Action Sequence:**
1. **Open the Issues tab**
   - Scroll to the left sidebar menu.
   - Click the "Issues" tab (displaying the count, e.g., "Issues 402").
2. **Apply label filtering**
   - Click the search/filter bar in the issues list.
   - Select the "Label" dropdown from the filter options.
   - Type or select "question" from the label dropdown.
   - Click the search/apply button to confirm the filter.
---
#### **Final Observation**
The issues list will refresh to show only issues with the "question" label. The URL will reflect the filter:
`.../ffmpeg-python/-/issues/?label_name[]=question`.
---
### Key Observations from Playwright Trace
- The final URL after filtering:
  `http://ec2-3-135-39-80.../ffmpeg-python/-/issues/?label_name%5B%5D=question`
  confirms the "question" label filter is applied.
- Critical interactions include selecting the "Label" dropdown and explicitly choosing "question" to refine results.
### Final output
```json
[{
    "strategy": "Navigate to the Repository",
    "substrategies": [
        {
            "substrategy": "Search for the user \\"kkroening\\"",
            "actions": [
                {
                    "description": "Click the global search bar (placeholder: \\"Search GitLab\\").",
                    "playwright_idx": 18,
                    "playwright_instruction": "frame.pressget_by_placeholder(\\"Search GitLab\\")Enter"
                }
            ]
        },
        {
            "substrategy": "Select the user from results",
            "actions": []
        }
    ]
},
{
    "strategy": "Filter Issues by Label",
    "substrategies": []
}]
```
""".strip()
# --- Helper Functions ---
def read_file_content(filepath):
    """Reads the entire content of a file with error handling."""
    try:
        with open(filepath, 'r', encoding='utf-8') as f:
            return f.read()
    except FileNotFoundError:
        logging.warning(f"File not found: {filepath}")
        return None
    except Exception as e:
        logging.error(f"Error reading file {filepath}: {e}")
        return None


def read_json_file(filepath):
    """Reads and parses a JSON file with error handling."""
    content = read_file_content(filepath)
    if content:
        try:
            return json.loads(content)
        except json.JSONDecodeError as e:
            logging.error(f"Error decoding JSON from {filepath}: {e}")
    return None


def get_task_intent(task_id):
    """Gets the intent from the task JSON file."""
    task_file = os.path.join(TASKS_DIR, f"{task_id}.json")
    data = read_json_file(task_file)
    if data and "intent" in data:
        return data["intent"]
    elif data:
        logging.warning(f"'intent' field not found in {task_file}")
        return "Intent not found in task file"
    else:
        logging.warning(f"Task file not found or invalid: {task_file}")
        return "Task file not found or invalid"


def get_annotation_description(task_id):
    """Gets the annotation description from video trace files."""
    annotation_dir = os.path.join(VIDEO_DIR, f"{task_id}.trace")
    description_parts = []
    if not os.path.isdir(annotation_dir):
        logging.warning(f"Annotation directory not found: {annotation_dir}")
        return "Annotation description directory not found."
    txt_files = sorted(glob.glob(os.path.join(annotation_dir, "*.txt")))
    if not txt_files:
        logging.warning(f"No .txt files found in {annotation_dir}")
        return "Annotation description files not found."
    logging.info(f"Found {len(txt_files)} annotation file(s) for task {task_id}.")
    for i, txt_file in enumerate(txt_files):
        content = read_file_content(txt_file)
        if content:
            # Add a Part header only if there are multiple files.
            part_header = f"## Part {i+1}\n" if len(txt_files) > 1 else ""
            description_parts.append(f"{part_header}{content.strip()}")
        else:
            logging.warning(f"Annotation file is empty or could not be read: {txt_file}")
    return "\n\n---\n\n".join(description_parts)  # Use a separator for clarity.


def get_playwright_actions(task_id):
    """Gets the Playwright actions from the extracted trace JSON file."""
    actions_file = os.path.join(TRACE_EXTRACT_DIR, f"{task_id}.trace.zip.content.json")
    data = read_json_file(actions_file)
    if data:
        # The prompt template expects a string, so return a JSON dump rather
        # than the parsed object.
        return json.dumps(data, indent=2)
    else:
        logging.warning(f"Playwright actions file not found or invalid: {actions_file}")
        return "Playwright actions not found or invalid."
def construct_prompt(task_id, task_intent, annotation_desc, playwright_actions):
    """Constructs the full prompt for the OpenAI API using the globally defined fixed sections."""
    # Basic validation of the dynamic parts.
    if "not found" in task_intent:
        logging.warning(f"Task {task_id}: Constructing prompt with missing intent.")
    if "not found" in annotation_desc:
        logging.warning(f"Task {task_id}: Constructing prompt with missing annotation description.")
    if "not found" in playwright_actions:
        logging.warning(f"Task {task_id}: Constructing prompt with missing Playwright actions.")

    prompt = f"""
# Instruction
{INSTRUCTION}

# Task
{task_intent}

# Annotation description
{annotation_desc}

# Playwright action
{playwright_actions}

# Output format
{OUTPUT_FORMAT}

# Example
{EXAMPLE}
"""
    # Remove potential leading/trailing whitespace from the final formatted string.
    return prompt.strip()
def call_openai_api(prompt, task_id):
    """Calls the OpenAI API and returns the response content."""
    if not prompt:
        logging.error(f"Task {task_id}: Cannot call OpenAI API with an empty prompt.")
        return None
    logging.info(f"Task {task_id}: Calling OpenAI API...")
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                # A system message can be omitted if the main instruction is comprehensive.
                # {"role": "system", "content": "You are an expert in analyzing process data."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,  # Lower temperature for a more deterministic summary.
            # max_tokens=4000,  # Adjust based on expected output size and model limits.
            # top_p=1.0,
            # frequency_penalty=0.0,
            # presence_penalty=0.0
        )
        logging.info(f"Task {task_id}: Received response from OpenAI API.")
        # Check that the response and its choices are valid.
        if response and response.choices:
            message = response.choices[0].message
            if message and message.content:  # Also check that message.content is not None.
                return message.content
            else:
                logging.error(f"Task {task_id}: OpenAI response message or content is empty.")
                return None
        else:
            logging.error(f"Task {task_id}: Invalid response structure from OpenAI: {response}")
            return None
    except RateLimitError as e:
        logging.error(f"Task {task_id}: OpenAI API rate limit exceeded: {e}. Consider adding retries or slowing down.")
        # Retry logic could be added here; see the sketch after this function.
        return None
    except APIError as e:
        logging.error(f"Task {task_id}: OpenAI API returned an error: {e}")
        return None
    except Exception as e:
        # Catch other potential errors (network issues, etc.).
        logging.error(f"Task {task_id}: An unexpected error occurred during the OpenAI API call: {e}")
        return None
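
# A minimal retry sketch (optional; not called by main() as written). It simply
# re-invokes call_openai_api with exponential backoff whenever it returns None,
# so it retries on any failure, not only rate limits. The attempt count and
# base delay are illustrative defaults, not tuned values.
import time  # Kept local to this sketch; move to the top-level imports if adopted.

def call_openai_api_with_retries(prompt, task_id, max_attempts=3, base_delay=5.0):
    """Retries call_openai_api with exponential backoff between attempts."""
    for attempt in range(1, max_attempts + 1):
        result = call_openai_api(prompt, task_id)
        if result is not None:
            return result
        if attempt < max_attempts:
            delay = base_delay * (2 ** (attempt - 1))  # 5s, 10s, 20s, ...
            logging.info(f"Task {task_id}: Attempt {attempt} failed, retrying in {delay:.0f}s...")
            time.sleep(delay)
    logging.error(f"Task {task_id}: All {max_attempts} attempts failed.")
    return None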
def extract_final_output(response_text, task_id):
    """Extracts the JSON block marked with ```json ... ``` from the API response."""
    if not response_text:
        logging.warning(f"Task {task_id}: Cannot extract JSON from empty response.")
        return {"error": "Empty API response"}
    # A tolerant regex: the "json" language tag is optional and matching is non-greedy.
    match = re.search(r'```(?:json)?\s*([\s\S]+?)\s*```', response_text, re.IGNORECASE | re.DOTALL)
    if match:
        json_str = match.group(1).strip()
        try:
            # Attempt to load the extracted string as JSON.
            parsed_json = json.loads(json_str)
            logging.info(f"Task {task_id}: Successfully extracted and parsed final output JSON.")
            return parsed_json
        except json.JSONDecodeError as e:
            logging.error(f"Task {task_id}: Error decoding extracted JSON: {e}")
            logging.debug(f"Task {task_id}: Raw extracted string: {json_str}")
            # Return the raw string along with the error for debugging.
            return {"error": f"Failed to decode JSON: {e}", "raw_json_string": json_str}
    else:
        logging.warning(f"Task {task_id}: Could not find a ```json block in the response.")
        # Return the full response so the caller can inspect it.
        return {"error": "JSON block not found in response", "full_response": response_text}
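
# For reference, a hypothetical response such as:
#
#     Some analysis text...
#     ```json
#     [{"strategy": "Navigate to the Repository", "substrategies": []}]
#     ```
#
# yields the parsed list, while a response with no fenced block falls through
# to the "JSON block not found" error dict above. (Illustrative input only,
# not taken from a real model run.)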
# --- Main Logic ---
def main():
    """Main function to process all tasks."""
    logging.info("Starting summarization process...")
    # The fixed prompt sections are module-level constants, so nothing to load here.
    if not os.path.isdir(TRACE_DIR):
        logging.error(f"Trace directory not found: {TRACE_DIR}. Exiting.")
        return
    trace_files = glob.glob(os.path.join(TRACE_DIR, "*.trace.zip"))
    if not trace_files:
        logging.warning(f"No *.trace.zip files found in {TRACE_DIR}. Nothing to process.")
        return
    logging.info(f"Found {len(trace_files)} trace files to process.")

    for trace_file in trace_files:
        filename = os.path.basename(trace_file)
        # Regex to capture the task ID reliably.
        match = re.match(r"(\d+)\.trace\.zip", filename)
        if not match:
            logging.warning(f"Skipping file with unexpected format: {filename}")
            continue
        task_id = match.group(1)
        logging.info(f"--- Processing Task ID: {task_id} ---")

        # Define the output path early.
        output_filepath = os.path.join(SUMMARY_DIR, f"{task_id}.json")
        # Optionally skip if the output already exists (see the hedged variant below):
        # if os.path.exists(output_filepath):
        #     logging.info(f"Task {task_id}: Output file already exists. Skipping. ({output_filepath})")
        #     continue
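        # A variant gated by an environment flag (OVERWRITE is a hypothetical
        # variable name; nothing else in this script reads it):
        # if os.path.exists(output_filepath) and os.getenv("OVERWRITE") != "1":
        #     logging.info(f"Task {task_id}: Output exists, skipping (set OVERWRITE=1 to redo).")
        #     continue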
        # 1. Get Task Intent.
        # NOTE: the helpers signal failure via sentinel strings containing
        # "not found", which the checks below rely on.
        task_intent = get_task_intent(task_id)
        if "not found" in task_intent:
            logging.error(f"Task {task_id}: Critical data missing (Intent). Skipping task.")
            continue
        logging.info(f"Task {task_id}: Task Intent loaded.")

        # 2. Get Annotation Description.
        annotation_desc = get_annotation_description(task_id)
        if "not found" in annotation_desc:
            logging.warning(f"Task {task_id}: Annotation description missing or empty.")
            # Decide whether to continue without annotations or skip.
            # continue  # Uncomment to skip if annotations are mandatory.
        logging.info(f"Task {task_id}: Annotation Description loaded.")

        # 3. Get Playwright Actions.
        playwright_actions = get_playwright_actions(task_id)
        if "not found" in playwright_actions:
            logging.error(f"Task {task_id}: Critical data missing (Playwright Actions). Skipping task.")
            continue
        logging.info(f"Task {task_id}: Playwright Actions loaded.")

        # 4. Construct Prompt.
        prompt = construct_prompt(task_id, task_intent, annotation_desc, playwright_actions)
        if not prompt:
            logging.error(f"Task {task_id}: Failed to construct prompt. Skipping task.")
            continue
        logging.info(f"Task {task_id}: Prompt constructed successfully.")

        # For debugging: save the prompt.
        prompt_debug_file = os.path.join(SUMMARY_DIR, f"{task_id}_prompt_debug.txt")
        try:
            with open(prompt_debug_file, 'w', encoding='utf-8') as pf:
                pf.write(prompt)
        except Exception as e:
            logging.warning(f"Could not write debug prompt file: {e}")

        # 5. Call OpenAI API.
        api_response_content = call_openai_api(prompt, task_id)
        if not api_response_content:
            logging.error(f"Task {task_id}: Failed to get response from OpenAI API. Skipping task.")
            # Save error info for later inspection.
            error_data = {
                "task_id": task_id,
                "error": "Failed to get response from OpenAI API",
                "task_intent": task_intent,
                "prompt": prompt  # Save the prompt for debugging failed API calls.
            }
            error_filepath = os.path.join(SUMMARY_DIR, f"{task_id}_error.json")
            try:
                with open(error_filepath, 'w', encoding='utf-8') as f:
                    json.dump(error_data, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logging.error(f"Task {task_id}: Could not save error file: {e}")
            continue  # Skip to the next task.
        logging.info(f"Task {task_id}: Received API response.")

        # 6. Extract Final Output JSON.
        final_output = extract_final_output(api_response_content, task_id)
        if isinstance(final_output, dict) and "error" in final_output:
            logging.warning(f"Task {task_id}: Could not extract valid final output JSON from response.")
            # The final_output dict contains the error details and is saved as-is.

        # 7. Save Results.
        output_data = {
            "task_intent": task_intent,
            "prompt": prompt,
            "response": api_response_content,  # The raw string response.
            "final_output": final_output  # The parsed JSON or an error dict.
        }
        try:
            with open(output_filepath, 'w', encoding='utf-8') as f:
                json.dump(output_data, f, ensure_ascii=False, indent=2)
            logging.info(f"Task {task_id}: Successfully saved results to {output_filepath}")
        except Exception as e:
            logging.error(f"Task {task_id}: Error saving results to {output_filepath}: {e}")

    logging.info("--- Summarization process finished ---")
if __name__ == "__main__":
    main()