commit e9f495fbb3c143c5d8e415868e693820fc1fc777
Author: yuyr
Date:   Tue Apr 15 22:44:08 2025 +0800

    init

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..25ec711
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,9 @@
+.env
+
+trace/*.zip
+
+trace_extract/*
+
+video/*
+
+video.zip
\ No newline at end of file
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..5802104
--- /dev/null
+++ b/README.md
@@ -0,0 +1,15 @@
+
+# File structure
+- video_gen.py: uses ffmpeg to assemble the JPEG frames in a trace into a WebM video, split into 30-second segments;
+- qwen_vl_process.py: uses qwen-vl-max-latest to interpret the actions shown in each video;
+- trace_server.py: local HTTP server that lets the Playwright trace-viewer web app load the trace.zip files;
+- trace_action_extract_url.py: reads each trace.zip via Playwright and saves the Playwright action list plus the URL before and after each action.
+- trace: the 179 human operation trace zips provided by the WebArena project
+- video: the videos converted from each trace
+- trace_extract: the action/URL information extracted from the trace files
+
+# TODO
+- Final synthesis: feed the video interpretations and the action/URL lists to r1 for joint analysis, reconstructing the strategy and low-level actions through reasoning (a sketch follows below);
+-
+
+# Known issues
\ No newline at end of file
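
The final synthesis step in the TODO is not implemented in this commit. A minimal sketch of what it might look like, assuming the per-part descriptions are the .txt files qwen_vl_process.py writes next to each video, the action/URL list is the .content.json produced by trace_action_extract_url.py, and that "r1" means deepseek-r1 served through the same DashScope-compatible endpoint (the script name synthesize.py and the model id are assumptions, not part of this commit):

import glob
import json
import os

from openai import OpenAI

client = OpenAI(
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
)

def synthesize(video_dir, content_json_path):
    # Concatenate the per-part descriptions in part order
    parts = sorted(glob.glob(os.path.join(video_dir, "*.txt")))
    descriptions = "\n\n".join(open(p, encoding="utf-8").read() for p in parts)
    # Load the extracted action list with before/after URLs
    with open(content_json_path, encoding="utf-8") as f:
        actions = json.load(f)
    prompt = (
        "Below are segment-by-segment descriptions of a browsing session, "
        "followed by the recorded Playwright actions with before/after URLs. "
        "Reconstruct the user's overall strategy and the low-level action sequence.\n\n"
        f"Descriptions:\n{descriptions}\n\nActions:\n{json.dumps(actions, indent=2)}"
    )
    completion = client.chat.completions.create(
        model="deepseek-r1",  # assumption: r1 is available on this endpoint
        messages=[{"role": "user", "content": prompt}],
    )
    return completion.choices[0].message.content
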
+""" + +# --- 遍历目录并处理视频 --- +if not os.path.isdir(VIDEO_DIR): + print(f"错误:视频目录 '{VIDEO_DIR}' 不存在或不是一个目录。") +else: + print(f"开始处理目录 '{VIDEO_DIR}' 中的视频...") + for root, dirs, files in os.walk(VIDEO_DIR): + print(f"正在检查目录: {root}") + for filename in files: + if filename.lower().endswith(SUPPORTED_EXTENSIONS): + video_path = os.path.join(root, filename) + output_filename = os.path.splitext(filename)[0] + ".txt" + output_path = os.path.join(root, output_filename) + + print(f" 发现视频文件: {video_path}") + + # 检查输出文件是否已存在 + if os.path.exists(output_path): + print(f" 跳过: 输出文件 '{output_path}' 已存在。") + continue + + print(f" 正在处理: {video_path}") + # Base64 编码 + base64_video = encode_video(video_path) + if base64_video is None: + continue # 编码失败,跳过此视频 + + # 调用 OpenAI API + try: + completion = client.chat.completions.create( + model="qwen-vl-max-latest", + messages=[ + { + "role": "system", + "content": [{"type":"text","text": "You are a helpful assistant."}]}, + { + "role": "user", + "content": [ + { + "type": "video_url", + "video_url": {"url": f"data:video/mp4;base64,{base64_video}"}, # 注意:这里假设所有视频都用 mp4 mime type,如果主要是 webm,可以改成 video/webm + }, + {"type": "text", "text": prompt}, + ], + } + ], + # 可以根据需要调整 temperature 等参数 + # temperature=0.7, + ) + result_text = completion.choices[0].message.content + + # 保存结果到 txt 文件 + try: + with open(output_path, "w", encoding="utf-8") as f: + f.write(result_text) + print(f" 成功: 结果已保存到 '{output_path}'") + except IOError as e: + print(f" 错误: 无法写入输出文件 '{output_path}': {e}") + except Exception as e: + print(f" 保存文件 '{output_path}' 时发生未知错误: {e}") + + + except Exception as e: + print(f" 错误: 调用 API 处理 '{video_path}' 时失败: {e}") + + print("所有视频处理完成。") diff --git a/toy/ali_vl.py b/toy/ali_vl.py new file mode 100644 index 0000000..d2b83a3 --- /dev/null +++ b/toy/ali_vl.py @@ -0,0 +1,56 @@ +from openai import OpenAI +import os +import base64 +from dotenv import load_dotenv + +load_dotenv() + +# Base64 编码格式 +def encode_video(video_path): + with open(video_path, "rb") as video_file: + return base64.b64encode(video_file.read()).decode("utf-8") + +# 将xxxx/test.mp4替换为你本地视频的绝对路径 +base64_video = encode_video("235.trace_recording.webm") +# base64_video = encode_video("50221078283.mp4") +prompt = """ +You are an annotator tasked with carefully observing and faithfully documenting the exact actions shown in a provided video segment. Your role is to describe, step-by-step, the precise actions I am performing on the website, strictly using the first-person perspective (e.g., "I click...", "I select..."). + +For each action, clearly include: +- Action: Clearly state the specific buttons, links, menus, text boxes, or other UI elements I interact with. +- Page Changes: Accurately describe any changes or responses on the webpage resulting from my actions. +- Possible Purpose: Provide an objective and detailed hypothesis about the likely intent or purpose of each action, without subjective interpretations or assumptions beyond what is directly observable. + +Important Note: +The video is split into 4 parts; this request specifically refers ONLY to this Part. Do NOT describe any content from other parts or add any additional context or speculation. Your description must strictly reflect only what you observe in this video segment. 
+""" + + +client = OpenAI( + # 若没有配置环境变量,请用百炼API Key将下行替换为:api_key="sk-xxx" + api_key=os.environ["OPENAI_API_KEY"] + base_url="https://dashscope.aliyuncs.com/compatible-mode/v1", +) +completion = client.chat.completions.create( + model="qwen-vl-max-latest", + # model="qwen2.5-vl-72b-instruct", + messages=[ + { + "role": "system", + "content": [{"type":"text","text": "You are a helpful assistant."}]}, + { + "role": "user", + "content": [ + { + # 直接传入视频文件时,请将type的值设置为video_url + "type": "video_url", + "video_url": {"url": f"data:video/mp4;base64,{base64_video}"}, + }, + {"type": "text", "text": prompt}, + ], + } + ], +) +print(completion.choices[0].message.content) + +# print(completion) diff --git a/trace_action_extract_url.py b/trace_action_extract_url.py new file mode 100644 index 0000000..4fe3f9e --- /dev/null +++ b/trace_action_extract_url.py @@ -0,0 +1,201 @@ +""" +This script processes the trace files and extracts the dom_content, screenshots and other information. +""" + +import argparse +import asyncio +import base64 +import collections +import glob +import json +import os +import re + +from playwright.async_api import async_playwright + +# Playwright Trace Viewer (https://playwright.dev/python/docs/trace-viewer) opens the recorded trace file in a browser. +# You need to first serve the downloaded trace files via HTTP. +k_http_base = "http://localhost:8123" # Change this to your http file service address +k_trace_url = "https://trace.playwright.dev/?trace={}/{}" + +# Sometime the process will fail due to timeout or other reasons. We retry for a few times. If the process still fails, you can rerun without headless mode. +k_retry = 3 + + +async def process_trace(trace_file, page): + trace_url = k_trace_url.format(k_http_base, trace_file) + print(f"Processing {trace_url}...") + await page.goto(trace_url) + + await page.wait_for_timeout(10000) + + # await page.screenshot(path="screenshot.png", full_page=True) + # print("screenshot taken") + processed_annotation = [] + processed_snapshots = [] + processed_screenshots = [] + action_mapping = collections.defaultdict(list) + action_repr_mapping = collections.defaultdict() + action_idx_mapping = collections.defaultdict() + + action_uids = [] + await page.locator(".action-title").first.wait_for(timeout=10000) + for idx in range(await page.locator(".action-title").count()): + action = page.locator(".action-title").nth(idx) + action_repr = await action.text_content() + print(f"action_repr: {action_repr}") + + # 为每个动作生成唯一ID,使用索引作为后备方案 + action_uid = f"action_{idx}" + + # 尝试从不同模式的动作中提取更有意义的ID + if action_repr.startswith("Keyboard.type"): + # 从键盘输入动作中提取信息 + keyboard_match = re.search(r"Keyboard\.type\(\"(.+?)\"\)", action_repr) + if keyboard_match: + action_uid = f"keyboard_{keyboard_match.group(1)}" + elif "get_by_test_id" in action_repr: + # 原有的提取测试ID的逻辑 + test_id_match = re.findall(r"get_by_test_id\(\"(.+?)\"\)", action_repr) + if test_id_match: + action_uid = test_id_match[0] + elif "get_by_role" in action_repr: + # 提取基于角色的选择器 + role_match = re.search(r"get_by_role\(\"(.+?)\", name=\"(.+?)\"", action_repr) + if role_match: + action_uid = f"{role_match.group(1)}_{role_match.group(2)}" + elif "get_by_label" in action_repr: + # 提取基于标签的选择器 + label_match = re.search(r"get_by_label\(\"(.+?)\"\)", action_repr) + if label_match: + action_uid = f"label_{label_match.group(1)}" + elif "get_by_text" in action_repr: + # 提取基于文本的选择器 + text_match = re.search(r"get_by_text\(\"(.+?)\"\)", action_repr) + if text_match: + action_uid = 
f"text_{text_match.group(1)}" + + # 只跳过非必要的定位器操作 + if action_repr.startswith("Locator.count") or action_repr.startswith("Locator.all"): + continue + + # 记录所有动作 + if action_uid not in action_mapping: + action_uids.append(action_uid) + action_mapping[action_uid].append(action) + action_repr_mapping[action_uid] = action_repr + action_idx_mapping[action_uid] = idx + + # print(f"action_uid: {action_uid}") + + for action_uid in action_uids: + error = [] + action_seq = action_mapping[action_uid] + await action_seq[0].click() + + print('\n') + + await page.locator('div.tabbed-pane-tab-label').get_by_text("Action", exact=True).click() + + await page.locator('div.tabbed-pane-tab-label:text("Before")').click() + + # path = f'output/screenshots/{action_uid}_before.png' + # await page.screenshot(path=path, full_page=True) + + locator = page.locator('div.browser-frame-address-bar') + before_url = await locator.text_content() + print(f'before url {before_url}') + + action_repr = action_repr_mapping[action_uid] + print(f'action_repr: {action_repr}') + + await page.locator('div.tabbed-pane-tab-label:text("After")').click() + + # path = f'output/screenshots/{action_uid}_after.png' + # await page.screenshot(path=path, full_page=True) + + locator = page.locator('div.browser-frame-address-bar') + after_url = await locator.text_content() + print(f'after url {after_url}') + + action_idx = action_idx_mapping[action_uid] + + processed_annotation.append( + { + "action_uid": action_uid, + "idx": action_idx, + "action_repr": action_repr, + "before": { + "url": before_url, + }, + "after": { + "url": after_url, + } + } + ) + + print(f"{len(processed_annotation)} actions found.") + return processed_annotation, processed_snapshots, processed_screenshots + + +async def main(trace_files, args): + async with async_playwright() as p: + # print(f"Launching browser, trace_files: {trace_files}, args: {args}") + p.selectors.set_test_id_attribute("data-pw-testid-buckeye") + browser = await p.chromium.launch(headless=True) + context = await browser.new_context(viewport={"width": 1280, "height": 1080}) + for trace_file in trace_files: + success = False + for _ in range(1): + page = await context.new_page() + try: + ( + processed_annotation, + processed_snapshots, + processed_screenshots, + ) = await process_trace(trace_file, page) + output_dir = os.path.join(args.output_dir) + if not os.path.exists(output_dir): + os.makedirs(output_dir) + + filename = trace_file.split('/')[-1] + with open( + os.path.join( + output_dir, + f"{filename}.content.json", + ), + "w", + ) as f: + json.dump(processed_annotation, f) + success = True + except Exception as e: + print(e) + print("Retrying...") + await page.close() + if success: + break + if not success: + print(f"Failed to process {trace_file}") + await browser.close() + + +if __name__ == "__main__": + args = argparse.ArgumentParser() + args.add_argument("--input_pattern", type=str, required=True) + args.add_argument("--output_dir", type=str, required=True) + args = args.parse_args() + + trace_files = [] + for trace_file in glob.glob(args.input_pattern): + filename = trace_file.split('/')[-1] + output_dir = os.path.join(args.output_dir) + path = os.path.join( + output_dir, + f"{filename}.content.json", + ) + # print(f"path: {path}") + if not os.path.exists(path): + trace_files.append(trace_file) + + print(f"total trace number {len(trace_files)}") + asyncio.run(main(trace_files, args)) diff --git a/trace_server.py b/trace_server.py new file mode 100644 index 0000000..ffc9920 --- /dev/null +++ 
diff --git a/trace_server.py b/trace_server.py
new file mode 100644
index 0000000..ffc9920
--- /dev/null
+++ b/trace_server.py
@@ -0,0 +1,19 @@
+from http.server import SimpleHTTPRequestHandler, HTTPServer
+
+class CORSRequestHandler(SimpleHTTPRequestHandler):
+    def end_headers(self):
+        self.send_header('Access-Control-Allow-Origin', '*')  # allow all origins
+        self.send_header('Access-Control-Allow-Methods', 'GET, POST, OPTIONS')
+        self.send_header('Access-Control-Allow-Headers', 'Content-Type')
+        super().end_headers()
+
+    def do_OPTIONS(self):
+        self.send_response(200, "ok")
+        self.end_headers()
+
+if __name__ == '__main__':
+    port = 8123
+    httpd = HTTPServer(('0.0.0.0', port), CORSRequestHandler)
+    print(f"Serving HTTP with CORS on port {port}")
+    httpd.serve_forever()
+
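
Usage note: the two scripts above are meant to run together. Assuming the repository layout from the README, start "python trace_server.py" from the repository root so each archive is reachable at http://localhost:8123/trace/<name>.zip, then in a second shell run "python trace_action_extract_url.py --input_pattern 'trace/*.zip' --output_dir trace_extract". The viewer at trace.playwright.dev fetches each zip from this local server, which is why the handler must send the CORS headers.
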
Skipping.") + + if not timed_files: + print("Error: No files with valid timestamps found.") + return + + # Sort files chronologically based on timestamp + timed_files.sort() + + print(f"Processing {len(timed_files)} files with valid timestamps.") + + # Split into segments based on max_duration + segments = [] + current_segment = [] + current_segment_duration = 0.0 + + for i in range(len(timed_files)): + timestamp, img_path = timed_files[i] + + # Calculate duration for this frame + if i < len(timed_files) - 1: + next_timestamp, _ = timed_files[i+1] + duration = next_timestamp - timestamp + # Prevent zero or negative durations + if duration <= 0: + duration = 0.01 + else: + # Duration for the last frame + duration = default_last_frame_duration + + # Check if adding this frame would exceed max_duration + if current_segment_duration + duration > max_duration and current_segment: + # Current segment is full, start a new one + segments.append(current_segment) + current_segment = [(timestamp, img_path, duration)] + current_segment_duration = duration + else: + # Add to current segment + current_segment.append((timestamp, img_path, duration)) + current_segment_duration += duration + + # Add the last segment if not empty + if current_segment: + segments.append(current_segment) + + print(f"Split into {len(segments)} segments (max duration: {max_duration} seconds)") + + # Process each segment + for segment_index, segment in enumerate(segments): + # Generate output filename + if len(segments) > 1: + # Extract base name and extension + base_name, extension = os.path.splitext(output_video_file) + segment_output_file = f"{base_name}_part{segment_index+1}{extension}" + else: + segment_output_file = output_video_file + + print(f"\nProcessing segment {segment_index+1}/{len(segments)} -> {segment_output_file}") + + # Create FFmpeg input file for this segment + ffmpeg_input_file = f"ffmpeg_input_segment_{segment_index+1}.txt" + try: + with open(ffmpeg_input_file, 'w', encoding='utf-8') as f: + f.write("ffconcat version 1.0\n") + + for timestamp, img_path, duration in segment: + abs_img_path = os.path.abspath(img_path) + safe_img_path = abs_img_path.replace("'", "'\\''") + + f.write(f"file '{safe_img_path}'\n") + f.write(f"duration {duration:.6f}\n") + + # Add the last frame again for final duration to apply + _, last_img_path, _ = segment[-1] + abs_last_img_path = os.path.abspath(last_img_path) + safe_last_img_path = abs_last_img_path.replace("'", "'\\''") + f.write(f"file '{safe_last_img_path}'\n") + + # Run FFmpeg command + ffmpeg_command = [ + 'ffmpeg', + '-f', 'concat', + '-safe', '0', + '-i', ffmpeg_input_file, + '-c:v', 'libvpx-vp9', + '-crf', '30', + '-b:v', '0', + '-pix_fmt', 'yuv420p', + '-y', + segment_output_file + ] + + print("\nRunning FFmpeg command:") + print(" ".join(f"'{arg}'" if " " in arg else arg for arg in ffmpeg_command)) + + try: + result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True, encoding='utf-8') + print("\nFFmpeg output:") + print(result.stdout) + print(f"Segment {segment_index+1} finished successfully!") + print(f"Output video saved to: {segment_output_file}") + + except subprocess.CalledProcessError as e: + print(f"\nError running FFmpeg for segment {segment_index+1}!") + print(f"Return code: {e.returncode}") + print("FFmpeg stdout:") + print(e.stdout) + print("FFmpeg stderr:") + print(e.stderr) + except FileNotFoundError: + print("\nError: 'ffmpeg' command not found. 
+def process_zip_files():
+    """
+    Processes all zip files in the current directory, extracts them,
+    and creates videos from the extracted images.
+    """
+    # Collect all .trace.zip files in the current directory
+    zip_files = glob.glob('*.trace.zip')
+
+    if not zip_files:
+        print("No .trace.zip files found")
+        return
+
+    print(f"Found {len(zip_files)} zip files")
+
+    # Create the video directory if it does not exist
+    video_base_dir = os.path.join(os.getcwd(), 'video')
+    if not os.path.exists(video_base_dir):
+        os.makedirs(video_base_dir)
+
+    # Process each zip file
+    for zip_file in zip_files:
+        # File name without the .zip extension
+        base_name = zip_file[:-4]  # strip ".zip"
+        extract_dir = os.path.join(os.getcwd(), base_name)
+        video_output_dir = os.path.join(video_base_dir, base_name)
+
+        print(f"\nProcessing zip file: {zip_file}")
+
+        # Remove the extraction directory first if it already exists
+        if os.path.exists(extract_dir):
+            print(f"Removing existing directory: {extract_dir}")
+            shutil.rmtree(extract_dir)
+
+        # Create the extraction directory
+        os.makedirs(extract_dir)
+
+        # Create the video output directory
+        if not os.path.exists(video_output_dir):
+            os.makedirs(video_output_dir)
+
+        # Extract the archive
+        try:
+            print(f"Extracting to: {extract_dir}")
+            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
+                zip_ref.extractall(extract_dir)
+
+            # Process the images in the extracted directory
+            print(f"Entering directory: {extract_dir}")
+            output_video = os.path.join(video_output_dir, f"{base_name}_recording.webm")
+
+            # Path to the extracted screenshot frames
+            image_dir = os.path.join(extract_dir, "resources")
+
+            # Create the video(s), split into 30-second segments
+            create_video_from_images(image_dir, output_video, max_duration=30.0)
+
+        except Exception as e:
+            print(f"Error while processing {zip_file}: {str(e)}")
+
+# --- Main entry point ---
+if __name__ == "__main__":
+    process_zip_files()
\ No newline at end of file
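
For reference, the concat list that create_video_from_images() writes looks like this (paths illustrative); the last file entry is repeated because ffmpeg's concat demuxer otherwise ignores the trailing duration directive, which is what the "Add the last frame again" comment in the code is working around:

ffconcat version 1.0
file '/home/user/trace/235.trace/resources/page@a1b2-1713190000.000.jpeg'
duration 0.250000
file '/home/user/trace/235.trace/resources/page@a1b2-1713190000.250.jpeg'
duration 0.100000
file '/home/user/trace/235.trace/resources/page@a1b2-1713190000.250.jpeg'

Note also that process_zip_files() globs *.trace.zip in the current working directory and writes under <cwd>/video/, so as written the script should be run from the directory that holds the trace zips.
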