""" 分析多轮实验的结果中eige renwu 每个任务的平均成功率 每轮结果分别在不同的目录下,resutls/webrlvr_cms1k_easy_0, results/webrlvr_cms1k_easy_1, ... 每个目录下有一个actions目录,每个actions目录下有多个json文件, 0.json, 1.json等, 数字为任务id,每个json文件中记录了每个任务的执行结果 0.json文件内容示例: { "task_id": 0, "score": 0.0, "actions": [ "# Element: the 'Bestsellers' tab in the left sidebar menu\ndo(action=\"Click\", element=\"21\")", "# Element: the 'Most Viewed Products' link in the left sidebar under the 'Reports' section\ndo(action=\"Click\", element=\"22\")", "exit(message=\"N/A\")" ] } score 为0.0表示失败,为1.0表示成功 总共1003个任务, 从0到1002, 总共有4轮实验,分别是0, 1, 2, 3 现在要分别统计每个任务在所有轮次的成功次数,失败次数,成功率 结果保存到一个json文件中 """ import json import os import logging # Configure logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') def load_task_descriptions(): """Load task descriptions from the config file.""" config_path = "../VAB-WebArena-Lite/config_files/wa/webrlvr_cms1k_easy.json" try: with open(config_path, 'r') as f: tasks = json.load(f) return {str(task['task_id']): task['intent'] for task in tasks} except Exception as e: logging.error(f"Error loading task descriptions: {e}") return {} def analyze_results(): """ Analyzes multi-round experiment results to calculate average success rates for each task. """ logging.info("Starting analysis...") num_rounds = 4 num_tasks = 1003 base_path = "results" experiment_name = "webrlvr_cms1k_easy" output_filename = "analysis_results.json" stats_output_filename = "success_group_stats.json" # Load task descriptions task_descriptions = load_task_descriptions() task_stats = { str(i): {"success": 0, "failure": 0} for i in range(num_tasks) } for i in range(num_rounds): round_dir = os.path.join(base_path, f"{experiment_name}_{i}", "actions") if not os.path.exists(round_dir): logging.warning(f"Directory not found, skipping: {round_dir}") continue logging.info(f"Processing round {i} from {round_dir}") for task_id in range(num_tasks): file_path = os.path.join(round_dir, f"{task_id}.json") task_id_str = str(task_id) if not os.path.exists(file_path): task_stats[task_id_str]["failure"] += 1 continue try: with open(file_path, "r") as f: data = json.load(f) if data.get("score") == 1.0: task_stats[task_id_str]["success"] += 1 else: task_stats[task_id_str]["failure"] += 1 except (json.JSONDecodeError, KeyError) as e: logging.error(f"Error reading or parsing {file_path}: {e}") task_stats[task_id_str]["failure"] += 1 logging.info("Calculating success rates...") for task_id, stats in task_stats.items(): total_runs = stats["success"] + stats["failure"] if total_runs > 0: stats["success_rate"] = stats["success"] / total_runs else: stats["success_rate"] = 0.0 logging.info("Grouping tasks by success count...") success_groups = {i: [] for i in range(num_rounds + 1)} # Create dictionaries to track successful passes and their trace lengths for each task task_successful_passes = {str(i): [] for i in range(num_tasks)} task_successful_trace_lengths = {str(i): [] for i in range(num_tasks)} # First pass: collect successful passes and trace lengths for each task for i in range(num_rounds): round_dir = os.path.join(base_path, f"{experiment_name}_{i}", "actions") if not os.path.exists(round_dir): continue for task_id in range(num_tasks): file_path = os.path.join(round_dir, f"{task_id}.json") task_id_str = str(task_id) if os.path.exists(file_path): try: with open(file_path, "r") as f: data = json.load(f) if data.get("score") == 1.0: task_successful_passes[task_id_str].append(i) # Get the length of actions array for successful passes 
trace_length = len(data.get("actions", [])) task_successful_trace_lengths[task_id_str].append(trace_length) except (json.JSONDecodeError, KeyError): pass # Second pass: group tasks by success count for task_id, stats in task_stats.items(): success_count = stats["success"] success_groups[success_count].append(task_id) # Create detailed success group statistics success_group_stats = {} for success_count, task_ids in success_groups.items(): group_info = { "count": len(task_ids), "task_ids": task_ids } # Add task descriptions for groups with success count > 0 if success_count > 0: task_details = [] for task_id in task_ids: task_detail = { "task_id": task_id, "description": task_descriptions.get(task_id, "Description not found"), "successful_passes": task_successful_passes[task_id], "successful_trace_lengths": task_successful_trace_lengths[task_id] # Add trace lengths } task_details.append(task_detail) group_info["task_details"] = task_details success_group_stats[success_count] = group_info # Print detailed information for each success group logging.info("\n=== Success Group Statistics ===") for success_count, group_info in success_group_stats.items(): logging.info(f"\nSuccess Count {success_count}:") logging.info(f"Number of tasks: {group_info['count']}") logging.info(f"Task IDs: {group_info['task_ids']}") if success_count > 0: logging.info("Task Descriptions:") for task in group_info['task_details']: logging.info(f"Task {task['task_id']}: {task['description']}") logging.info(f"Successful passes: {task['successful_passes']}") logging.info(f"Trace lengths: {task['successful_trace_lengths']}") # Save success group statistics to separate file with open(stats_output_filename, "w") as f: json.dump(success_group_stats, f, indent=4) logging.info(f"\nSuccess group statistics saved to {stats_output_filename}") # Save original analysis results output_data = { "task_stats": task_stats, "success_groups": success_groups, "success_group_counts": {count: len(tasks) for count, tasks in success_groups.items()}, } with open(output_filename, "w") as f: json.dump(output_data, f, indent=4) logging.info(f"Analysis complete. Results saved to {output_filename}") if __name__ == "__main__": analyze_results()
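
# Illustrative sketch of the shape of analysis_results.json written by
# analyze_results(); the keys mirror output_data above, while the numbers are
# placeholder values, not real results:
#
# {
#     "task_stats": {
#         "0": {"success": 3, "failure": 1, "success_rate": 0.75},
#         "1": {"success": 0, "failure": 4, "success_rate": 0.0},
#         ...
#     },
#     "success_groups": {"0": ["1", ...], "1": [...], "2": [...], "3": [...], "4": ["0", ...]},
#     "success_group_counts": {"0": ..., "1": ..., "2": ..., "3": ..., "4": ...}
# }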