# final_summarize.py import os import json import glob import re import logging from dotenv import load_dotenv from openai import OpenAI, RateLimitError, APIError load_dotenv() # --- Logging Configuration --- logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # --- Configuration --- try: # Best practice: Use environment variables for API keys client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"), base_url=os.getenv("OPENAI_BASE_URL")) # Trigger a dummy request or check credentials if necessary client.models.list() # Check if API key is valid logging.info("OpenAI client initialized successfully.") except Exception as e: logging.error(f"Failed to initialize OpenAI client. Ensure OPENAI_API_KEY and OPENAI_BASE_URL environment variables are set correctly. Error: {e}") # Exit or handle the absence of API key appropriately exit(1) # Exit if OpenAI client cannot be initialized MODEL_NAME = "aiproxy/deepseek-reasoner" # Or specify another model like "gpt-4-turbo" logging.info(f"Using OpenAI model: {MODEL_NAME}") # Determine base directory assuming the script is run from its location or workspace root # Adjust BASE_DIR if your script is located elsewhere relative to the data folders try: # Assumes script might be run from workspace root or its own directory script_dir = os.path.dirname(os.path.abspath(__file__)) # Simple check for common directories to guess the base path if os.path.exists(os.path.join(script_dir, 'tasks')) and os.path.exists(os.path.join(script_dir, 'trace')): BASE_DIR = script_dir elif os.path.exists('tasks') and os.path.exists('trace'): # Check relative to current working directory BASE_DIR = os.getcwd() else: # Fallback or raise error if structure is not found logging.warning("Could not automatically determine BASE_DIR. Assuming current working directory. Adjust if needed.") BASE_DIR = os.getcwd() logging.info(f"Using base directory: {BASE_DIR}") TASKS_DIR = os.path.join(BASE_DIR, "tasks") TRACE_DIR = os.path.join(BASE_DIR, "trace") VIDEO_DIR = os.path.join(BASE_DIR, "video") TRACE_EXTRACT_DIR = os.path.join(BASE_DIR, "trace_extract") SUMMARY_DIR = os.path.join(BASE_DIR, "summary") # DESIGN_DOC_PATH is no longer needed # Ensure all necessary source directories exist required_dirs = [TASKS_DIR, TRACE_DIR, VIDEO_DIR, TRACE_EXTRACT_DIR] for d in required_dirs: if not os.path.isdir(d): logging.error(f"Required directory not found: {d}") exit(1) # Create summary directory if it doesn't exist os.makedirs(SUMMARY_DIR, exist_ok=True) logging.info(f"Summary directory ensured: {SUMMARY_DIR}") except Exception as e: logging.error(f"Error setting up directories: {e}") exit(1) # --- Fixed Prompt Sections (Embedded as Strings) --- INSTRUCTION = """ - You are an expert in cleaning process data descriptions. Given a task, you are provided with a set of annotation description data for a certain visual LLM related to human user operation videos. Plus, You are provided with full trace of playwright action, whic includes action and url before and after the action. - You need to analyze all the descriptive data and ultimately summarize a complete and reasonable user operation description that can accomplish the given task. - For each strategy, give a clear list of the low level action sequence. """.strip() OUTPUT_FORMAT = """ - 先总结整个任务的Objective,然后按照Strategy-SubStrategy-action三级层次来给出整个过程, - 接着给出整个操作流程后的观察和有趣的发现,最后严格按照json格式输出三级层次的过程描述。 - 最后的输出json应该是包在```{json}```之间,最底层动作需要包含描述、对应的playwright动作指令顺序编号,以及具体指令内容。 """.strip() EXAMPLE = """ ### Complete User Operation Description to Display Labeled Issues in kkroening/ffmpeg-python **Objective:** Filter and display all issues labeled as "question" in the kkroening/ffmpeg-python repository. --- #### **Strategy 1: Navigate to the Repository** **Low-Level Action Sequence:** 1. **Search for the user "kkroening"** - Click the global search bar (placeholder: "Search GitLab"). - Type "kkroening" and press `Enter`. 2. **Select the user from results** - Click the "Users" tab in search results. - Click on "Karl Kroening @kkroening" in the user list. 3. **Access the repository** - Navigate to the "Personal projects" section. - Click on the "ffmpeg-python" project. --- #### **Strategy 2: Filter Issues by Label** **Low-Level Action Sequence:** 1. **Open the Issues tab** - Scroll to the left sidebar menu. - Click the "Issues" tab (displaying the count, e.g., "Issues 402"). 2. **Apply label filtering** - Click the search/filter bar in the issues list. - Select the "Label" dropdown from the filter options. - Type or select "question" from the label dropdown. - Click the search/apply button to confirm the filter. --- #### **Final Oberservation** The issues list will refresh to show only issues with the "question" label. The URL will reflect the filter: `.../ffmpeg-python/-/issues/?label_name[]=question`. --- ### Key Observations from Playwright Trace - The final URL after filtering: `http://ec2-3-135-39-80.../ffmpeg-python/-/issues/?label_name%5B%5D=question` confirms the "question" label filter is applied. - Critical interactions include selecting the "Label" dropdown and explicitly choosing "question" to refine results. ### Final output ```json [{ "strategy" : "Navigate to the Repository", "substrategies": [ { "substrategy": "Search for the user \\"kkroening\\"", "actions" : [ { "description": "Click the global search bar (placeholder: \\"Search GitLab\\"). ", "playwright_idx" : 18, "playwright_instruction" : "frame.pressget_by_placeholder(\\"Search GitLab\\")Enter" } ] }, { "substrategy": "Select the user from results", "actions" : [ ] } ] }, { "strategy" : "Filter Issues by Label", "substrategies" : [ ] }] ``` """.strip() # --- Helper Functions --- def read_file_content(filepath): """Reads the entire content of a file with error handling.""" try: with open(filepath, 'r', encoding='utf-8') as f: return f.read() except FileNotFoundError: logging.warning(f"File not found: {filepath}") return None except Exception as e: logging.error(f"Error reading file {filepath}: {e}") return None def read_json_file(filepath): """Reads and parses a JSON file with error handling.""" content = read_file_content(filepath) if content: try: return json.loads(content) except json.JSONDecodeError as e: logging.error(f"Error decoding JSON from {filepath}: {e}") return None def get_task_intent(task_id): """Gets the intent from the task JSON file.""" task_file = os.path.join(TASKS_DIR, f"{task_id}.json") data = read_json_file(task_file) if data and "intent" in data: return data["intent"] elif data: logging.warning(f"'intent' field not found in {task_file}") return "Intent not found in task file" else: logging.warning(f"Task file not found or invalid: {task_file}") return "Task file not found or invalid" def get_annotation_description(task_id): """Gets the annotation description from video trace files.""" annotation_dir = os.path.join(VIDEO_DIR, f"{task_id}.trace") description_parts = [] if not os.path.isdir(annotation_dir): logging.warning(f"Annotation directory not found: {annotation_dir}") return "Annotation description directory not found." txt_files = sorted(glob.glob(os.path.join(annotation_dir, "*.txt"))) if not txt_files: logging.warning(f"No .txt files found in {annotation_dir}") return "Annotation description files not found." logging.info(f"Found {len(txt_files)} annotation file(s) for task {task_id}.") for i, txt_file in enumerate(txt_files): content = read_file_content(txt_file) if content: # Add Part header only if there are multiple files part_header = f"## Part {i+1}\n" if len(txt_files) > 1 else "" description_parts.append(f"{part_header}{content.strip()}") else: logging.warning(f"Annotation file is empty or could not be read: {txt_file}") return "\n\n---\n\n".join(description_parts) # Use a separator for clarity def get_playwright_actions(task_id): """Gets the Playwright actions from the extracted trace JSON file.""" actions_file = os.path.join(TRACE_EXTRACT_DIR, f"{task_id}.trace.zip.content.json") data = read_json_file(actions_file) if data: # Return the JSON data directly, not a string dump, if the prompt structure expects it # Based on the example prompt, it seems a string dump is needed. return json.dumps(data, indent=2) else: logging.warning(f"Playwright actions file not found or invalid: {actions_file}") return "Playwright actions not found or invalid." def construct_prompt(task_id, task_intent, annotation_desc, playwright_actions): """Constructs the full prompt for the OpenAI API using globally loaded fixed sections.""" # Basic validation of dynamic parts if "not found" in task_intent: logging.warning(f"Task {task_id}: Constructing prompt with missing intent.") if "not found" in annotation_desc: logging.warning(f"Task {task_id}: Constructing prompt with missing annotation description.") if "not found" in playwright_actions: logging.warning(f"Task {task_id}: Constructing prompt with missing Playwright actions.") prompt = f""" # Instruction {INSTRUCTION} # Task {task_intent} # Annotation description {annotation_desc} # Playwright action {playwright_actions} # Output format {OUTPUT_FORMAT} # Example {EXAMPLE} """ # Remove potential leading/trailing whitespace from the final formatted string return prompt.strip() def call_openai_api(prompt, task_id): """Calls the OpenAI API and returns the response content.""" if not prompt: logging.error(f"Task {task_id}: Cannot call OpenAI API with an empty prompt.") return None logging.info(f"Task {task_id}: Calling OpenAI API...") try: response = client.chat.completions.create( model=MODEL_NAME, messages=[ # System message can be omitted if the main instruction is comprehensive # {"role": "system", "content": "You are an expert in analyzing process data."}, {"role": "user", "content": prompt} ], temperature=0.3, # Lower temperature for more deterministic summary # max_tokens=4000, # Adjust based on expected output size and model limits # top_p=1.0, # frequency_penalty=0.0, # presence_penalty=0.0 ) logging.info(f"Task {task_id}: Received response from OpenAI API.") # Check if response and choices are valid if response and response.choices: message = response.choices[0].message if message and message.content: # Also check if message.content is not None return message.content else: logging.error(f"Task {task_id}: OpenAI response message or content is empty.") return None else: logging.error(f"Task {task_id}: Invalid response structure from OpenAI: {response}") return None except RateLimitError as e: logging.error(f"Task {task_id}: OpenAI API rate limit exceeded: {e}. Consider adding retries or slowing down.") # Implement retry logic here if needed return None except APIError as e: logging.error(f"Task {task_id}: OpenAI API returned an error: {e}") return None except Exception as e: # Catch other potential errors (network issues, etc.) logging.error(f"Task {task_id}: An unexpected error occurred during OpenAI API call: {e}") return None def extract_final_output(response_text, task_id): """Extracts the JSON block marked with ```json ... ``` from the API response.""" if not response_text: logging.warning(f"Task {task_id}: Cannot extract JSON from empty response.") return {"error": "Empty API response"} # More robust regex to handle potential variations in ```json tag match = re.search(r'```(?:json)?\s*([\s\S]+?)\s*```', response_text, re.IGNORECASE | re.DOTALL) if match: json_str = match.group(1).strip() try: # Attempt to load the extracted string as JSON parsed_json = json.loads(json_str) logging.info(f"Task {task_id}: Successfully extracted and parsed final output JSON.") return parsed_json except json.JSONDecodeError as e: logging.error(f"Task {task_id}: Error decoding extracted JSON: {e}") logging.debug(f"Task {task_id}: Raw extracted string: {json_str}") # Return the raw string along with the error for debugging return {"error": f"Failed to decode JSON: {e}", "raw_json_string": json_str} else: logging.warning(f"Task {task_id}: Could not find ```json block in the response.") # Optionally return the full response if no JSON block is found return {"error": "JSON block not found in response", "full_response": response_text} # --- Main Logic --- def main(): """Main function to process all tasks.""" logging.info("Starting summarization process...") # No need to load fixed sections anymore, they are global constants if not os.path.isdir(TRACE_DIR): logging.error(f"Trace directory not found: {TRACE_DIR}. Exiting.") return trace_files = glob.glob(os.path.join(TRACE_DIR, "*.trace.zip")) if not trace_files: logging.warning(f"No *.trace.zip files found in {TRACE_DIR}. Nothing to process.") return logging.info(f"Found {len(trace_files)} trace files to process.") for trace_file in trace_files: filename = os.path.basename(trace_file) # Regex to capture task ID reliably match = re.match(r"(\d+)\.trace\.zip", filename) if not match: logging.warning(f"Skipping file with unexpected format: {filename}") continue task_id = match.group(1) logging.info(f"--- Processing Task ID: {task_id} ---") # Define the output path early output_filepath = os.path.join(SUMMARY_DIR, f"{task_id}.json") # Skip if output already exists? Optional: add a flag to overwrite # if os.path.exists(output_filepath): # logging.info(f"Task {task_id}: Output file already exists. Skipping. ({output_filepath})") # continue # 1. Get Task Intent task_intent = get_task_intent(task_id) if "not found" in task_intent: # Handle specific error messages logging.error(f"Task {task_id}: Critical data missing (Intent). Skipping task.") continue logging.info(f"Task {task_id}: Task Intent loaded.") # 2. Get Annotation Description annotation_desc = get_annotation_description(task_id) if "not found" in annotation_desc: logging.warning(f"Task {task_id}: Annotation description missing or empty.") # Decide if you want to continue without annotations or skip # continue # Uncomment to skip if annotations are mandatory logging.info(f"Task {task_id}: Annotation Description loaded.") # 3. Get Playwright Actions playwright_actions = get_playwright_actions(task_id) if "not found" in playwright_actions: logging.error(f"Task {task_id}: Critical data missing (Playwright Actions). Skipping task.") continue logging.info(f"Task {task_id}: Playwright Actions loaded.") # 4. Construct Prompt prompt = construct_prompt(task_id, task_intent, annotation_desc, playwright_actions) if not prompt: logging.error(f"Task {task_id}: Failed to construct prompt. Skipping task.") continue logging.info(f"Task {task_id}: Prompt constructed successfully.") # For debugging: save the prompt prompt_debug_file = os.path.join(SUMMARY_DIR, f"{task_id}_prompt_debug.txt") try: with open(prompt_debug_file, 'w', encoding='utf-8') as pf: pf.write(prompt) except Exception as e: logging.warning(f"Could not write debug prompt file: {e}") # 5. Call OpenAI API api_response_content = call_openai_api(prompt, task_id) if not api_response_content: logging.error(f"Task {task_id}: Failed to get response from OpenAI API. Skipping task.") # Optionally save partial results or error info error_data = { "task_id": task_id, "error": "Failed to get response from OpenAI API", "task_intent": task_intent, "prompt": prompt # Save prompt for debugging failed API calls } error_filepath = os.path.join(SUMMARY_DIR, f"{task_id}_error.json") try: with open(error_filepath, 'w', encoding='utf-8') as f: json.dump(error_data, f, ensure_ascii=False, indent=2) except Exception as e: logging.error(f"Task {task_id}: Could not save error file: {e}") continue # Skip to the next task logging.info(f"Task {task_id}: Received API response.") # 6. Extract Final Output JSON final_output = extract_final_output(api_response_content, task_id) if isinstance(final_output, dict) and "error" in final_output: logging.warning(f"Task {task_id}: Could not extract valid final output JSON from response.") # The final_output dict contains error details, it will be saved as is. # 7. Save Results output_data = { "task_intent": task_intent, "prompt": prompt, "response": api_response_content, # Save the raw string response "final_output": final_output # Save the parsed JSON or error dict } try: with open(output_filepath, 'w', encoding='utf-8') as f: json.dump(output_data, f, ensure_ascii=False, indent=2) logging.info(f"Task {task_id}: Successfully saved results to {output_filepath}") except Exception as e: logging.error(f"Task {task_id}: Error saving results to {output_filepath}: {e}") logging.info("--- Summarization process finished ---") if __name__ == "__main__": main()