202 lines
7.5 KiB
Python
202 lines
7.5 KiB
Python
"""
|
||
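This script processes Playwright trace files and extracts per-action information. Currently only the action text and the before/after URLs are recorded; the snapshot and screenshot lists are returned empty.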
|
||
"""
|
||
|
||
import argparse
|
||
import asyncio
|
||
import base64
|
||
import collections
|
||
import glob
|
||
import json
|
||
import os
|
||
import re
|
||
|
||
from playwright.async_api import async_playwright
|
||
|
||
# The Playwright Trace Viewer (https://playwright.dev/python/docs/trace-viewer) opens a recorded trace file in a browser.
|
||
# The downloaded trace files must first be served over HTTP so that the viewer can load them.
|
||
k_http_base = "http://localhost:8123" # Change this to the address of your HTTP file server
|
||
# Trace-viewer URL template; process_trace fills it with (k_http_base, trace file path).
k_trace_url = "https://trace.playwright.dev/?trace={}/{}"
|
||
|
||
# Processing sometimes fails because of timeouts or other reasons, so each trace is meant to be retried a few times.
# If it still fails, rerun without headless mode.
# NOTE(review): confirm that the retry loop in main() actually uses k_retry.
|
||
k_retry = 3
|
||
|
||
|
||
# Patterns used to derive a readable uid from an action title. They are
# compiled once here instead of on every loop iteration.
_KEYBOARD_TYPE_RE = re.compile(r"Keyboard\.type\(\"(.+?)\"\)")
_TEST_ID_RE = re.compile(r"get_by_test_id\(\"(.+?)\"\)")
_ROLE_RE = re.compile(r"get_by_role\(\"(.+?)\", name=\"(.+?)\"")
_LABEL_RE = re.compile(r"get_by_label\(\"(.+?)\"\)")
_TEXT_RE = re.compile(r"get_by_text\(\"(.+?)\"\)")


def _derive_action_uid(action_repr, idx):
    """Return a uid for an action title, falling back to ``action_<idx>``.

    Only the first applicable selector kind is tried, in the order keyboard
    input, test id, role, label, text. If that kind's pattern does not match,
    the index-based fallback is used and no other kind is attempted.
    """
    fallback = f"action_{idx}"
    if action_repr.startswith("Keyboard.type"):
        match = _KEYBOARD_TYPE_RE.search(action_repr)
        return f"keyboard_{match.group(1)}" if match else fallback
    if "get_by_test_id" in action_repr:
        match = _TEST_ID_RE.search(action_repr)
        return match.group(1) if match else fallback
    if "get_by_role" in action_repr:
        match = _ROLE_RE.search(action_repr)
        return f"{match.group(1)}_{match.group(2)}" if match else fallback
    if "get_by_label" in action_repr:
        match = _LABEL_RE.search(action_repr)
        return f"label_{match.group(1)}" if match else fallback
    if "get_by_text" in action_repr:
        match = _TEXT_RE.search(action_repr)
        return f"text_{match.group(1)}" if match else fallback
    return fallback


async def process_trace(trace_file, page):
    """Open one trace in the Playwright trace viewer and annotate its actions.

    The viewer URL is built from ``k_trace_url``, ``k_http_base`` and
    ``trace_file``. Every ``.action-title`` row is read, rows starting with
    ``Locator.count`` or ``Locator.all`` are skipped, and the remaining rows
    are grouped by a derived uid. For each uid the first row of the group is
    clicked and the address-bar text of the "Before" and "After" tabs is
    recorded.

    Args:
        trace_file: Path of the trace relative to ``k_http_base``.
        page: Playwright page used to drive the trace viewer.

    Returns:
        A tuple ``(processed_annotation, processed_snapshots,
        processed_screenshots)``. ``processed_annotation`` is a list of dicts
        with the keys ``action_uid``, ``idx``, ``action_repr``, ``before`` and
        ``after``. The other two lists are always empty.
    """
    trace_url = k_trace_url.format(k_http_base, trace_file)
    print(f"Processing {trace_url}...")
    await page.goto(trace_url)

    # Fixed pause kept from the original flow (presumably gives the viewer
    # time to load the trace - TODO confirm it is still required).
    await page.wait_for_timeout(10000)

    # Snapshot/screenshot extraction is not implemented; the empty lists keep
    # the shape of the return value stable for callers.
    processed_annotation = []
    processed_snapshots = []
    processed_screenshots = []

    action_mapping = collections.defaultdict(list)
    action_repr_mapping = {}
    action_idx_mapping = {}
    action_uids = []  # uids in order of first appearance

    action_titles = page.locator(".action-title")
    await action_titles.first.wait_for(timeout=10000)
    for idx in range(await action_titles.count()):
        action = action_titles.nth(idx)
        # text_content() may return None; treat that as an empty title
        # instead of crashing on the string operations below.
        action_repr = (await action.text_content()) or ""
        print(f"action_repr: {action_repr}")

        # Skip only the non-essential locator bookkeeping calls.
        if action_repr.startswith(("Locator.count", "Locator.all")):
            continue

        action_uid = _derive_action_uid(action_repr, idx)

        if action_uid not in action_mapping:
            action_uids.append(action_uid)
        action_mapping[action_uid].append(action)
        # NOTE(review): when several rows share a uid, the repr and idx of the
        # LAST row are kept while the FIRST row is the one clicked below.
        # Confirm this is intended.
        action_repr_mapping[action_uid] = action_repr
        action_idx_mapping[action_uid] = idx

    address_bar = page.locator('div.browser-frame-address-bar')
    for action_uid in action_uids:
        # Select the action so that the viewer shows its snapshots.
        await action_mapping[action_uid][0].click()

        print('\n')

        await page.locator('div.tabbed-pane-tab-label').get_by_text("Action", exact=True).click()

        await page.locator('div.tabbed-pane-tab-label:text("Before")').click()
        before_url = await address_bar.text_content()
        print(f'before url {before_url}')

        action_repr = action_repr_mapping[action_uid]
        print(f'action_repr: {action_repr}')

        await page.locator('div.tabbed-pane-tab-label:text("After")').click()
        after_url = await address_bar.text_content()
        print(f'after url {after_url}')

        processed_annotation.append(
            {
                "action_uid": action_uid,
                "idx": action_idx_mapping[action_uid],
                "action_repr": action_repr,
                "before": {
                    "url": before_url,
                },
                "after": {
                    "url": after_url,
                },
            }
        )

    print(f"{len(processed_annotation)} actions found.")
    return processed_annotation, processed_snapshots, processed_screenshots
|
||
|
||
|
||
async def main(trace_files, args):
    """Process every trace file in headless Chromium and save its annotations.

    Each trace is attempted up to ``k_retry`` times, every attempt in a fresh
    page. On success the annotation list returned by ``process_trace`` is
    written to ``<args.output_dir>/<trace file name>.content.json``. A trace
    that fails on every attempt is reported and skipped.

    Args:
        trace_files: Iterable of trace file paths. Each path is passed to
            ``process_trace``; its last ``/``-separated component names the
            output file.
        args: Parsed command-line namespace; only ``args.output_dir`` is read.
    """
    async with async_playwright() as p:
        p.selectors.set_test_id_attribute("data-pw-testid-buckeye")
        browser = await p.chromium.launch(headless=True)
        try:
            context = await browser.new_context(viewport={"width": 1280, "height": 1080})
            output_dir = os.path.join(args.output_dir)
            for trace_file in trace_files:
                success = False
                for attempt in range(1, k_retry + 1):
                    page = await context.new_page()
                    try:
                        processed_annotation, _, _ = await process_trace(trace_file, page)
                        # Serialize before opening the file so that a
                        # serialization error cannot leave a truncated output
                        # that a later run would mistake for a finished trace.
                        payload = json.dumps(processed_annotation)
                        os.makedirs(output_dir, exist_ok=True)
                        filename = trace_file.split("/")[-1]
                        with open(os.path.join(output_dir, f"{filename}.content.json"), "w") as f:
                            f.write(payload)
                        success = True
                    except Exception as e:
                        # Best-effort batch run: report the failure and move on
                        # to the next attempt or the next trace.
                        print(e)
                        if attempt < k_retry:
                            print("Retrying...")
                    finally:
                        await page.close()
                    if success:
                        break
                if not success:
                    print(f"Failed to process {trace_file}")
        finally:
            await browser.close()
|
||
|
||
|
||
if __name__ == "__main__":
    # Command-line interface: a glob pattern of trace files in, a directory of
    # JSON annotation files out.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input_pattern", type=str, required=True)
    parser.add_argument("--output_dir", type=str, required=True)
    args = parser.parse_args()

    # Keep only the traces whose "<name>.content.json" output does not exist
    # yet, so that an interrupted run can be resumed.
    trace_files = [
        candidate
        for candidate in glob.glob(args.input_pattern)
        if not os.path.exists(
            os.path.join(
                os.path.join(args.output_dir),
                f"{candidate.rsplit('/', 1)[-1]}.content.json",
            )
        )
    ]

    print(f"total trace number {len(trace_files)}")
    asyncio.run(main(trace_files, args))
|