""" This script processes the trace files and extracts the dom_content, screenshots and other information. """ import argparse import asyncio import base64 import collections import glob import json import os import re from playwright.async_api import async_playwright # Playwright Trace Viewer (https://playwright.dev/python/docs/trace-viewer) opens the recorded trace file in a browser. # You need to first serve the downloaded trace files via HTTP. k_http_base = "http://localhost:8123" # Change this to your http file service address k_trace_url = "https://trace.playwright.dev/?trace={}/{}" # Sometime the process will fail due to timeout or other reasons. We retry for a few times. If the process still fails, you can rerun without headless mode. k_retry = 3 async def process_trace(trace_file, page): trace_url = k_trace_url.format(k_http_base, trace_file) print(f"Processing {trace_url}...") await page.goto(trace_url) await page.wait_for_timeout(10000) # await page.screenshot(path="screenshot.png", full_page=True) # print("screenshot taken") processed_annotation = [] processed_snapshots = [] processed_screenshots = [] action_mapping = collections.defaultdict(list) action_repr_mapping = collections.defaultdict() action_idx_mapping = collections.defaultdict() action_uids = [] await page.locator(".action-title").first.wait_for(timeout=10000) for idx in range(await page.locator(".action-title").count()): action = page.locator(".action-title").nth(idx) action_repr = await action.text_content() print(f"action_repr: {action_repr}") # 为每个动作生成唯一ID,使用索引作为后备方案 action_uid = f"action_{idx}" # 尝试从不同模式的动作中提取更有意义的ID if action_repr.startswith("Keyboard.type"): # 从键盘输入动作中提取信息 keyboard_match = re.search(r"Keyboard\.type\(\"(.+?)\"\)", action_repr) if keyboard_match: action_uid = f"keyboard_{keyboard_match.group(1)}" elif "get_by_test_id" in action_repr: # 原有的提取测试ID的逻辑 test_id_match = re.findall(r"get_by_test_id\(\"(.+?)\"\)", action_repr) if test_id_match: action_uid = test_id_match[0] elif "get_by_role" in action_repr: # 提取基于角色的选择器 role_match = re.search(r"get_by_role\(\"(.+?)\", name=\"(.+?)\"", action_repr) if role_match: action_uid = f"{role_match.group(1)}_{role_match.group(2)}" elif "get_by_label" in action_repr: # 提取基于标签的选择器 label_match = re.search(r"get_by_label\(\"(.+?)\"\)", action_repr) if label_match: action_uid = f"label_{label_match.group(1)}" elif "get_by_text" in action_repr: # 提取基于文本的选择器 text_match = re.search(r"get_by_text\(\"(.+?)\"\)", action_repr) if text_match: action_uid = f"text_{text_match.group(1)}" # 只跳过非必要的定位器操作 if action_repr.startswith("Locator.count") or action_repr.startswith("Locator.all"): continue # 记录所有动作 if action_uid not in action_mapping: action_uids.append(action_uid) action_mapping[action_uid].append(action) action_repr_mapping[action_uid] = action_repr action_idx_mapping[action_uid] = idx # print(f"action_uid: {action_uid}") for action_uid in action_uids: error = [] action_seq = action_mapping[action_uid] await action_seq[0].click() print('\n') await page.locator('div.tabbed-pane-tab-label').get_by_text("Action", exact=True).click() await page.locator('div.tabbed-pane-tab-label:text("Before")').click() # path = f'output/screenshots/{action_uid}_before.png' # await page.screenshot(path=path, full_page=True) locator = page.locator('div.browser-frame-address-bar') before_url = await locator.text_content() print(f'before url {before_url}') action_repr = action_repr_mapping[action_uid] print(f'action_repr: {action_repr}') await page.locator('div.tabbed-pane-tab-label:text("After")').click() # path = f'output/screenshots/{action_uid}_after.png' # await page.screenshot(path=path, full_page=True) locator = page.locator('div.browser-frame-address-bar') after_url = await locator.text_content() print(f'after url {after_url}') action_idx = action_idx_mapping[action_uid] processed_annotation.append( { "action_uid": action_uid, "idx": action_idx, "action_repr": action_repr, "before": { "url": before_url, }, "after": { "url": after_url, } } ) print(f"{len(processed_annotation)} actions found.") return processed_annotation, processed_snapshots, processed_screenshots async def main(trace_files, args): async with async_playwright() as p: # print(f"Launching browser, trace_files: {trace_files}, args: {args}") p.selectors.set_test_id_attribute("data-pw-testid-buckeye") browser = await p.chromium.launch(headless=True) context = await browser.new_context(viewport={"width": 1280, "height": 1080}) for trace_file in trace_files: success = False for _ in range(1): page = await context.new_page() try: ( processed_annotation, processed_snapshots, processed_screenshots, ) = await process_trace(trace_file, page) output_dir = os.path.join(args.output_dir) if not os.path.exists(output_dir): os.makedirs(output_dir) filename = trace_file.split('/')[-1] with open( os.path.join( output_dir, f"{filename}.content.json", ), "w", ) as f: json.dump(processed_annotation, f) success = True except Exception as e: print(e) print("Retrying...") await page.close() if success: break if not success: print(f"Failed to process {trace_file}") await browser.close() if __name__ == "__main__": args = argparse.ArgumentParser() args.add_argument("--input_pattern", type=str, required=True) args.add_argument("--output_dir", type=str, required=True) args = args.parse_args() trace_files = [] for trace_file in glob.glob(args.input_pattern): filename = trace_file.split('/')[-1] output_dir = os.path.join(args.output_dir) path = os.path.join( output_dir, f"{filename}.content.json", ) # print(f"path: {path}") if not os.path.exists(path): trace_files.append(trace_file) print(f"total trace number {len(trace_files)}") asyncio.run(main(trace_files, args))