trace_synthesis/trace_action_extract_url.py
2025-04-15 22:44:08 +08:00

202 lines
7.5 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
This script opens Playwright trace files in the Trace Viewer and extracts, for each recorded action, its description and the page URL before and after the action.
"""
import argparse
import asyncio
import base64
import collections
import glob
import json
import os
import re
from playwright.async_api import async_playwright
# Playwright Trace Viewer (https://playwright.dev/python/docs/trace-viewer) opens the recorded trace file in a browser.
# The downloaded trace files must first be served over HTTP so the viewer can fetch them.
k_http_base = "http://localhost:8123" # Change this to the address of your HTTP file server
# Trace Viewer URL template; it is filled with (k_http_base, trace file path).
k_trace_url = "https://trace.playwright.dev/?trace={}/{}"
# Processing sometimes fails due to timeouts or other reasons, so it is meant to be retried a few times.
# If it still fails, you can rerun with headless mode disabled.
k_retry = 3
def _derive_action_uid(action_repr, idx):
    """Return an identifier for an action based on its Trace Viewer title.

    The first rule whose keyword appears in the title decides the result. If
    that rule's pattern does not match, or no rule applies, the index-based
    fallback ``action_{idx}`` is returned.
    """
    fallback_uid = f"action_{idx}"
    if action_repr.startswith("Keyboard.type"):
        # Keyboard input: use the typed text.
        keyboard_match = re.search(r"Keyboard\.type\(\"(.+?)\"\)", action_repr)
        return f"keyboard_{keyboard_match.group(1)}" if keyboard_match else fallback_uid
    if "get_by_test_id" in action_repr:
        # Test-id locator: use the first test id as-is.
        test_ids = re.findall(r"get_by_test_id\(\"(.+?)\"\)", action_repr)
        return test_ids[0] if test_ids else fallback_uid
    if "get_by_role" in action_repr:
        # Role locator: combine the role and the accessible name.
        role_match = re.search(r"get_by_role\(\"(.+?)\", name=\"(.+?)\"", action_repr)
        return f"{role_match.group(1)}_{role_match.group(2)}" if role_match else fallback_uid
    if "get_by_label" in action_repr:
        # Label locator.
        label_match = re.search(r"get_by_label\(\"(.+?)\"\)", action_repr)
        return f"label_{label_match.group(1)}" if label_match else fallback_uid
    if "get_by_text" in action_repr:
        # Text locator.
        text_match = re.search(r"get_by_text\(\"(.+?)\"\)", action_repr)
        return f"text_{text_match.group(1)}" if text_match else fallback_uid
    return fallback_uid


async def _read_snapshot_url(page, tab_name):
    """Click the snapshot tab labelled *tab_name* and return the address-bar text."""
    await page.locator(f'div.tabbed-pane-tab-label:text("{tab_name}")').click()
    return await page.locator('div.browser-frame-address-bar').text_content()


async def process_trace(trace_file, page):
    """Open one trace in the Trace Viewer and collect before/after URLs per action.

    Args:
        trace_file: Path of the trace relative to ``k_http_base``; it is
            substituted into ``k_trace_url``.
        page: Playwright page used to drive the Trace Viewer.

    Returns:
        A 3-tuple ``(processed_annotation, processed_snapshots,
        processed_screenshots)``. ``processed_annotation`` is a list of dicts
        with keys ``action_uid``, ``idx``, ``action_repr``, ``before`` and
        ``after`` (the last two each hold a ``url``). The other two lists are
        always empty; they are returned so callers can keep unpacking three
        values.
    """
    trace_url = k_trace_url.format(k_http_base, trace_file)
    print(f"Processing {trace_url}...")
    await page.goto(trace_url)
    # Fixed 10 s pause before touching the viewer (presumably to let the trace load).
    await page.wait_for_timeout(10000)

    processed_annotation = []
    processed_snapshots = []
    processed_screenshots = []

    action_mapping = collections.defaultdict(list)
    action_repr_mapping = {}
    action_idx_mapping = {}
    action_uids = []  # uids in order of first appearance

    action_titles = page.locator(".action-title")
    await action_titles.first.wait_for(timeout=10000)
    for idx in range(await action_titles.count()):
        action = action_titles.nth(idx)
        # text_content() may return None; treat that as an empty title
        # instead of failing on the string checks below.
        action_repr = (await action.text_content()) or ""
        print(f"action_repr: {action_repr}")

        # Skip only the locator bookkeeping calls; every other action is recorded.
        if action_repr.startswith(("Locator.count", "Locator.all")):
            continue

        action_uid = _derive_action_uid(action_repr, idx)
        if action_uid not in action_mapping:
            action_uids.append(action_uid)
        action_mapping[action_uid].append(action)
        # NOTE(review): when several actions share a uid, the repr/idx stored
        # here are those of the LAST occurrence, while the URLs below are read
        # after clicking the FIRST occurrence. Kept as-is; confirm the intent.
        action_repr_mapping[action_uid] = action_repr
        action_idx_mapping[action_uid] = idx

    for action_uid in action_uids:
        await action_mapping[action_uid][0].click()
        print('\n')
        await page.locator('div.tabbed-pane-tab-label').get_by_text("Action", exact=True).click()

        before_url = await _read_snapshot_url(page, "Before")
        print(f'before url {before_url}')
        action_repr = action_repr_mapping[action_uid]
        print(f'action_repr: {action_repr}')
        after_url = await _read_snapshot_url(page, "After")
        print(f'after url {after_url}')

        processed_annotation.append(
            {
                "action_uid": action_uid,
                "idx": action_idx_mapping[action_uid],
                "action_repr": action_repr,
                "before": {
                    "url": before_url,
                },
                "after": {
                    "url": after_url,
                },
            }
        )

    print(f"{len(processed_annotation)} actions found.")
    return processed_annotation, processed_snapshots, processed_screenshots
async def main(trace_files, args):
    """Process each trace in headless Chromium and write its annotations as JSON.

    For every trace file, ``process_trace`` is attempted up to ``k_retry``
    times, each time on a fresh page. On success the annotations are written
    to ``<args.output_dir>/<trace file name>.content.json``. A trace that
    fails every attempt is reported and skipped.

    Args:
        trace_files: Trace file paths. Each is passed to ``process_trace``,
            and its last ``/``-separated component names the output file.
        args: Parsed command-line arguments; only ``args.output_dir`` is read.
    """
    async with async_playwright() as p:
        p.selectors.set_test_id_attribute("data-pw-testid-buckeye")
        browser = await p.chromium.launch(headless=True)
        try:
            context = await browser.new_context(viewport={"width": 1280, "height": 1080})
            for trace_file in trace_files:
                success = False
                for attempt in range(k_retry):
                    page = await context.new_page()
                    try:
                        processed_annotation, _, _ = await process_trace(trace_file, page)
                        os.makedirs(args.output_dir, exist_ok=True)
                        filename = trace_file.split('/')[-1]
                        output_path = os.path.join(args.output_dir, f"{filename}.content.json")
                        with open(output_path, "w") as f:
                            json.dump(processed_annotation, f)
                        success = True
                    except Exception as e:
                        # Best-effort boundary: report the failure and move on
                        # to the next attempt (or the next trace).
                        print(e)
                        if attempt + 1 < k_retry:
                            print("Retrying...")
                    finally:
                        # Close the page even if the attempt is cancelled or interrupted.
                        await page.close()
                    if success:
                        break
                if not success:
                    print(f"Failed to process {trace_file}")
        finally:
            await browser.close()
if __name__ == "__main__":
    # Command line: a glob pattern selecting trace files, and the directory
    # that receives one "<name>.content.json" per trace.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_pattern", type=str, required=True)
    parser.add_argument("--output_dir", type=str, required=True)
    args = parser.parse_args()

    # Keep only traces whose output file does not exist yet, so a rerun
    # resumes where the previous run stopped.
    trace_files = [
        candidate
        for candidate in glob.glob(args.input_pattern)
        if not os.path.exists(
            os.path.join(args.output_dir, f"{candidate.split('/')[-1]}.content.json")
        )
    ]
    print(f"total trace number {len(trace_files)}")
    asyncio.run(main(trace_files, args))