464 lines
19 KiB
Python
464 lines
19 KiB
Python
# final_summarize.py
|
||
import os
|
||
import json
|
||
import glob
|
||
import re
|
||
import logging
|
||
from dotenv import load_dotenv
|
||
from openai import OpenAI, RateLimitError, APIError
|
||
|
||
load_dotenv()

# --- Logging Configuration ---
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# --- Configuration ---
try:
    # Best practice: Use environment variables for API keys
    client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"),
                    base_url=os.getenv("OPENAI_BASE_URL"))
    # Trigger a dummy request or check credentials if necessary
    # NOTE(review): this performs a real network call at import time — confirm
    # that is acceptable for every environment this module is loaded in.
    client.models.list() # Check if API key is valid
    logging.info("OpenAI client initialized successfully.")
except Exception as e:
    logging.error(f"Failed to initialize OpenAI client. Ensure OPENAI_API_KEY and OPENAI_BASE_URL environment variables are set correctly. Error: {e}")
    # Exit or handle the absence of API key appropriately
    exit(1) # Exit if OpenAI client cannot be initialized


MODEL_NAME = "aiproxy/deepseek-reasoner" # Or specify another model like "gpt-4-turbo"
logging.info(f"Using OpenAI model: {MODEL_NAME}")

# Determine base directory assuming the script is run from its location or workspace root
# Adjust BASE_DIR if your script is located elsewhere relative to the data folders
try:
    # Assumes script might be run from workspace root or its own directory
    script_dir = os.path.dirname(os.path.abspath(__file__))
    # Simple check for common directories to guess the base path:
    # prefer the script's own directory if it contains the data folders.
    if os.path.exists(os.path.join(script_dir, 'tasks')) and os.path.exists(os.path.join(script_dir, 'trace')):
        BASE_DIR = script_dir
    elif os.path.exists('tasks') and os.path.exists('trace'): # Check relative to current working directory
        BASE_DIR = os.getcwd()
    else:
        # Fallback or raise error if structure is not found
        logging.warning("Could not automatically determine BASE_DIR. Assuming current working directory. Adjust if needed.")
        BASE_DIR = os.getcwd()

    logging.info(f"Using base directory: {BASE_DIR}")

    # Per-task inputs live under these four directories; outputs go to SUMMARY_DIR.
    TASKS_DIR = os.path.join(BASE_DIR, "tasks")
    TRACE_DIR = os.path.join(BASE_DIR, "trace")
    VIDEO_DIR = os.path.join(BASE_DIR, "video")
    TRACE_EXTRACT_DIR = os.path.join(BASE_DIR, "trace_extract")
    SUMMARY_DIR = os.path.join(BASE_DIR, "summary")
    # DESIGN_DOC_PATH is no longer needed

    # Ensure all necessary source directories exist
    required_dirs = [TASKS_DIR, TRACE_DIR, VIDEO_DIR, TRACE_EXTRACT_DIR]
    for d in required_dirs:
        if not os.path.isdir(d):
            logging.error(f"Required directory not found: {d}")
            exit(1)

    # Create summary directory if it doesn't exist
    os.makedirs(SUMMARY_DIR, exist_ok=True)
    logging.info(f"Summary directory ensured: {SUMMARY_DIR}")

except Exception as e:
    logging.error(f"Error setting up directories: {e}")
    exit(1)
|
||
|
||
|
||
# --- Fixed Prompt Sections (Embedded as Strings) ---
|
||
# Fixed "instruction" section of the LLM prompt: defines the model's role
# (summarizing a user-operation process from video annotations plus the
# Playwright action trace). BUGFIX: corrected typo "whic" -> "which".
INSTRUCTION = """
- You are an expert in cleaning process data descriptions. Given a task, you are provided with a set of annotation description
data for a certain visual LLM related to human user operation videos. Plus, You are provided with full trace of playwright action,
which includes action and url before and after the action.
- You need to analyze all the descriptive data and ultimately summarize a complete and reasonable user operation description that can accomplish the given task.
- For each strategy, give a clear list of the low level action sequence.
""".strip()
|
||
|
||
OUTPUT_FORMAT = """
|
||
- 先总结整个任务的Objective,然后按照Strategy-SubStrategy-action三级层次来给出整个过程,
|
||
- 接着给出整个操作流程后的观察和有趣的发现,最后严格按照json格式输出三级层次的过程描述。
|
||
- 最后的输出json应该是包在```{json}```之间,最底层动作需要包含描述、对应的playwright动作指令顺序编号,以及具体指令内容。
|
||
""".strip()
|
||
|
||
# One-shot example appended to the prompt so the model imitates the expected
# shape: markdown Strategy/action write-up, observations, then a fenced JSON
# block. BUGFIX: corrected typo "Oberservation" -> "Observation" so the model
# does not copy the misspelling into its own output.
EXAMPLE = """
### Complete User Operation Description to Display Labeled Issues in kkroening/ffmpeg-python

**Objective:** Filter and display all issues labeled as "question" in the kkroening/ffmpeg-python repository.

---

#### **Strategy 1: Navigate to the Repository**
**Low-Level Action Sequence:**
1. **Search for the user "kkroening"**
   - Click the global search bar (placeholder: "Search GitLab").
   - Type "kkroening" and press `Enter`.
2. **Select the user from results**
   - Click the "Users" tab in search results.
   - Click on "Karl Kroening @kkroening" in the user list.
3. **Access the repository**
   - Navigate to the "Personal projects" section.
   - Click on the "ffmpeg-python" project.

---

#### **Strategy 2: Filter Issues by Label**
**Low-Level Action Sequence:**
1. **Open the Issues tab**
   - Scroll to the left sidebar menu.
   - Click the "Issues" tab (displaying the count, e.g., "Issues 402").
2. **Apply label filtering**
   - Click the search/filter bar in the issues list.
   - Select the "Label" dropdown from the filter options.
   - Type or select "question" from the label dropdown.
   - Click the search/apply button to confirm the filter.

---

#### **Final Observation**
The issues list will refresh to show only issues with the "question" label. The URL will reflect the filter:
`.../ffmpeg-python/-/issues/?label_name[]=question`.

---

### Key Observations from Playwright Trace
- The final URL after filtering:
  `http://ec2-3-135-39-80.../ffmpeg-python/-/issues/?label_name%5B%5D=question`
  confirms the "question" label filter is applied.
- Critical interactions include selecting the "Label" dropdown and explicitly choosing "question" to refine results.

### Final output
```json
[{
    "strategy" : "Navigate to the Repository",
    "substrategies": [
        {
            "substrategy": "Search for the user \\"kkroening\\"",
            "actions" : [
                {
                    "description": "Click the global search bar (placeholder: \\"Search GitLab\\"). ",
                    "playwright_idx" : 18,
                    "playwright_instruction" : "frame.pressget_by_placeholder(\\"Search GitLab\\")Enter"
                }
            ]
        },
        {
            "substrategy": "Select the user from results",
            "actions" : [
            ]
        }
    ]
},
{
    "strategy" : "Filter Issues by Label",
    "substrategies" : [
    ]
}]
```
""".strip()
|
||
|
||
# --- Helper Functions ---
|
||
def read_file_content(filepath):
    """Return the full text of *filepath* (UTF-8), or None on any failure.

    A missing file is logged as a warning; every other I/O problem is logged
    as an error. Callers treat a None result as "content unavailable".
    """
    try:
        with open(filepath, 'r', encoding='utf-8') as handle:
            text = handle.read()
    except FileNotFoundError:
        logging.warning(f"File not found: {filepath}")
        return None
    except Exception as exc:
        logging.error(f"Error reading file {filepath}: {exc}")
        return None
    return text
|
||
|
||
def read_json_file(filepath):
    """Read *filepath* and parse it as JSON.

    Returns the parsed object, or None when the file is unreadable, empty,
    or contains invalid JSON (decode failures are logged).
    """
    raw = read_file_content(filepath)
    if not raw:
        return None
    try:
        return json.loads(raw)
    except json.JSONDecodeError as exc:
        logging.error(f"Error decoding JSON from {filepath}: {exc}")
        return None
|
||
|
||
def get_task_intent(task_id):
    """Look up the "intent" field in tasks/<task_id>.json.

    Returns the intent value on success; otherwise one of two sentinel
    error strings (which callers detect via the substring "not found").
    """
    task_file = os.path.join(TASKS_DIR, f"{task_id}.json")
    data = read_json_file(task_file)

    # Guard: file missing, unparseable, or empty payload.
    if not data:
        logging.warning(f"Task file not found or invalid: {task_file}")
        return "Task file not found or invalid"

    if "intent" in data:
        return data["intent"]

    logging.warning(f"'intent' field not found in {task_file}")
    return "Intent not found in task file"
|
||
|
||
def get_annotation_description(task_id):
    """Concatenate all *.txt annotation files under video/<task_id>.trace.

    Files are read in sorted order and joined with a "---" separator; when
    more than one file exists, each part is prefixed with a "## Part N"
    header. Missing directory/files yield a human-readable error string
    (containing "not found") instead of raising.
    """
    annotation_dir = os.path.join(VIDEO_DIR, f"{task_id}.trace")

    if not os.path.isdir(annotation_dir):
        logging.warning(f"Annotation directory not found: {annotation_dir}")
        return "Annotation description directory not found."

    txt_files = sorted(glob.glob(os.path.join(annotation_dir, "*.txt")))
    if not txt_files:
        logging.warning(f"No .txt files found in {annotation_dir}")
        return "Annotation description files not found."

    logging.info(f"Found {len(txt_files)} annotation file(s) for task {task_id}.")

    multi_part = len(txt_files) > 1
    parts = []
    for part_no, txt_path in enumerate(txt_files, start=1):
        text = read_file_content(txt_path)
        if not text:
            logging.warning(f"Annotation file is empty or could not be read: {txt_path}")
            continue
        # Only label parts when there is more than one file.
        header = f"## Part {part_no}\n" if multi_part else ""
        parts.append(f"{header}{text.strip()}")

    return "\n\n---\n\n".join(parts)  # Use a separator for clarity
|
||
|
||
def get_playwright_actions(task_id):
    """Load the extracted Playwright trace for *task_id*.

    Returns the trace pretty-printed as a JSON string (the prompt template
    expects text, not a parsed structure), or an error string containing
    "not found" when the file is missing or invalid.
    """
    actions_file = os.path.join(TRACE_EXTRACT_DIR, f"{task_id}.trace.zip.content.json")
    data = read_json_file(actions_file)
    if not data:
        logging.warning(f"Playwright actions file not found or invalid: {actions_file}")
        return "Playwright actions not found or invalid."
    # String dump (not the raw object) because it is interpolated into the prompt.
    return json.dumps(data, indent=2)
|
||
|
||
def construct_prompt(task_id, task_intent, annotation_desc, playwright_actions):
    """Assemble the full LLM prompt from the fixed sections plus per-task inputs.

    The dynamic arguments may be sentinel error strings (containing
    "not found"); those are logged as warnings but still interpolated, so
    the caller decides whether to abort.
    """
    # Warn (but do not abort) when any dynamic section carries an error placeholder.
    sections = (
        (task_intent, "intent"),
        (annotation_desc, "annotation description"),
        (playwright_actions, "Playwright actions"),
    )
    for value, label in sections:
        if "not found" in value:
            logging.warning(f"Task {task_id}: Constructing prompt with missing {label}.")

    prompt = f"""
# Instruction
{INSTRUCTION}

# Task
{task_intent}

# Annotation description
{annotation_desc}

# Playwright action
{playwright_actions}

# Output format
{OUTPUT_FORMAT}

# Example
{EXAMPLE}
"""
    # Drop the leading/trailing blank lines introduced by the template.
    return prompt.strip()
|
||
|
||
|
||
def call_openai_api(prompt, task_id):
    """Send *prompt* to the chat-completions endpoint and return the reply text.

    Returns None for an empty prompt, a malformed/empty response, or any
    API/rate-limit/unexpected error. All failure paths are logged; no
    retries are attempted here.
    """
    if not prompt:
        logging.error(f"Task {task_id}: Cannot call OpenAI API with an empty prompt.")
        return None

    logging.info(f"Task {task_id}: Calling OpenAI API...")
    try:
        response = client.chat.completions.create(
            model=MODEL_NAME,
            messages=[
                # A system message is unnecessary: the prompt's own
                # "# Instruction" section already defines the role.
                {"role": "user", "content": prompt}
            ],
            temperature=0.3,  # Lower temperature for more deterministic summary
        )
        logging.info(f"Task {task_id}: Received response from OpenAI API.")

        # Guard clauses: validate the response shape before touching content.
        if not response or not response.choices:
            logging.error(f"Task {task_id}: Invalid response structure from OpenAI: {response}")
            return None
        message = response.choices[0].message
        if not message or not message.content:
            logging.error(f"Task {task_id}: OpenAI response message or content is empty.")
            return None
        return message.content
    except RateLimitError as exc:
        logging.error(f"Task {task_id}: OpenAI API rate limit exceeded: {exc}. Consider adding retries or slowing down.")
        # Implement retry logic here if needed
        return None
    except APIError as exc:
        logging.error(f"Task {task_id}: OpenAI API returned an error: {exc}")
        return None
    except Exception as exc:
        # Catch other potential errors (network issues, etc.)
        logging.error(f"Task {task_id}: An unexpected error occurred during OpenAI API call: {exc}")
        return None
|
||
|
||
def extract_final_output(response_text, task_id):
    """Pull the ```json ... ``` fenced block out of *response_text* and parse it.

    Returns the parsed object on success. On failure, returns a dict whose
    "error" key describes the problem, with the raw extracted string or the
    full response attached for debugging.
    """
    if not response_text:
        logging.warning(f"Task {task_id}: Cannot extract JSON from empty response.")
        return {"error": "Empty API response"}

    # Tolerate a missing "json" language tag and surrounding whitespace.
    fence = re.search(r'```(?:json)?\s*([\s\S]+?)\s*```', response_text, re.IGNORECASE | re.DOTALL)
    if fence is None:
        logging.warning(f"Task {task_id}: Could not find ```json block in the response.")
        # Hand back the full response so the caller can still inspect it.
        return {"error": "JSON block not found in response", "full_response": response_text}

    candidate = fence.group(1).strip()
    try:
        parsed = json.loads(candidate)
    except json.JSONDecodeError as exc:
        logging.error(f"Task {task_id}: Error decoding extracted JSON: {exc}")
        logging.debug(f"Task {task_id}: Raw extracted string: {candidate}")
        # Include the raw string alongside the error for debugging.
        return {"error": f"Failed to decode JSON: {exc}", "raw_json_string": candidate}

    logging.info(f"Task {task_id}: Successfully extracted and parsed final output JSON.")
    return parsed
|
||
|
||
# --- Main Logic ---
|
||
def main():
    """Process every trace in TRACE_DIR: build a prompt, query the LLM, save the summary.

    For each trace/<id>.trace.zip, the task intent, annotation description,
    and extracted Playwright actions are loaded, a prompt is assembled and
    sent to the model, and the results (raw response plus the extracted
    ```json``` block) are written to summary/<id>.json. A task is skipped
    (with logging) when mandatory inputs are missing; an API failure also
    writes summary/<id>_error.json containing the prompt for debugging.
    """
    logging.info("Starting summarization process...")

    if not os.path.isdir(TRACE_DIR):
        logging.error(f"Trace directory not found: {TRACE_DIR}. Exiting.")
        return

    trace_files = glob.glob(os.path.join(TRACE_DIR, "*.trace.zip"))
    if not trace_files:
        logging.warning(f"No *.trace.zip files found in {TRACE_DIR}. Nothing to process.")
        return

    logging.info(f"Found {len(trace_files)} trace files to process.")

    for trace_file in trace_files:
        filename = os.path.basename(trace_file)
        # Regex to capture the numeric task ID reliably.
        match = re.match(r"(\d+)\.trace\.zip", filename)
        if not match:
            # BUGFIX: the warning previously did not interpolate the filename,
            # making the log message useless for diagnosing bad inputs.
            logging.warning(f"Skipping file with unexpected format: {filename}")
            continue

        task_id = match.group(1)
        logging.info(f"--- Processing Task ID: {task_id} ---")

        # Define the output path early.
        output_filepath = os.path.join(SUMMARY_DIR, f"{task_id}.json")

        # Optional: skip tasks whose output already exists (uncomment to enable).
        # if os.path.exists(output_filepath):
        #     logging.info(f"Task {task_id}: Output file already exists. Skipping. ({output_filepath})")
        #     continue

        # 1. Get Task Intent (mandatory).
        task_intent = get_task_intent(task_id)
        if "not found" in task_intent:  # sentinel error strings from get_task_intent
            logging.error(f"Task {task_id}: Critical data missing (Intent). Skipping task.")
            continue
        logging.info(f"Task {task_id}: Task Intent loaded.")

        # 2. Get Annotation Description (optional: warn but keep going).
        annotation_desc = get_annotation_description(task_id)
        if "not found" in annotation_desc:
            logging.warning(f"Task {task_id}: Annotation description missing or empty.")
            # continue  # Uncomment to skip if annotations are mandatory.

        logging.info(f"Task {task_id}: Annotation Description loaded.")

        # 3. Get Playwright Actions (mandatory).
        playwright_actions = get_playwright_actions(task_id)
        if "not found" in playwright_actions:
            logging.error(f"Task {task_id}: Critical data missing (Playwright Actions). Skipping task.")
            continue
        logging.info(f"Task {task_id}: Playwright Actions loaded.")

        # 4. Construct Prompt.
        prompt = construct_prompt(task_id, task_intent, annotation_desc, playwright_actions)
        if not prompt:
            logging.error(f"Task {task_id}: Failed to construct prompt. Skipping task.")
            continue
        logging.info(f"Task {task_id}: Prompt constructed successfully.")

        # For debugging: persist the exact prompt sent to the model.
        prompt_debug_file = os.path.join(SUMMARY_DIR, f"{task_id}_prompt_debug.txt")
        try:
            with open(prompt_debug_file, 'w', encoding='utf-8') as pf:
                pf.write(prompt)
        except Exception as e:
            logging.warning(f"Could not write debug prompt file: {e}")

        # 5. Call OpenAI API.
        api_response_content = call_openai_api(prompt, task_id)
        if not api_response_content:
            logging.error(f"Task {task_id}: Failed to get response from OpenAI API. Skipping task.")
            # Save error info (including the prompt) for post-mortem debugging.
            error_data = {
                "task_id": task_id,
                "error": "Failed to get response from OpenAI API",
                "task_intent": task_intent,
                "prompt": prompt,
            }
            error_filepath = os.path.join(SUMMARY_DIR, f"{task_id}_error.json")
            try:
                with open(error_filepath, 'w', encoding='utf-8') as f:
                    json.dump(error_data, f, ensure_ascii=False, indent=2)
            except Exception as e:
                logging.error(f"Task {task_id}: Could not save error file: {e}")
            continue  # Skip to the next task.

        logging.info(f"Task {task_id}: Received API response.")

        # 6. Extract the final ```json``` block from the response.
        final_output = extract_final_output(api_response_content, task_id)
        if isinstance(final_output, dict) and "error" in final_output:
            logging.warning(f"Task {task_id}: Could not extract valid final output JSON from response.")
            # final_output already carries the error details; it is saved as-is.

        # 7. Save Results.
        output_data = {
            "task_intent": task_intent,
            "prompt": prompt,
            "response": api_response_content,  # raw string response
            "final_output": final_output,      # parsed JSON or error dict
        }
        try:
            with open(output_filepath, 'w', encoding='utf-8') as f:
                json.dump(output_data, f, ensure_ascii=False, indent=2)
            logging.info(f"Task {task_id}: Successfully saved results to {output_filepath}")
        except Exception as e:
            logging.error(f"Task {task_id}: Error saving results to {output_filepath}: {e}")

    logging.info("--- Summarization process finished ---")
|
||
|
||
# Script entry point: only run the pipeline when executed directly.
if __name__ == "__main__":
    main()