trace_synthesis/video_gen.py
yuyr a84d51a101 1. 增加r1生成综合策略代码和输出;
2. 增加tasks;
3. 增加analysis部分,对策略进行归纳分类,然后进行评测。
2025-04-17 17:40:15 +08:00

257 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import glob
import re
import subprocess
import sys
import zipfile
import shutil
def extract_timestamp(filename):
"""Extracts the float timestamp from the filename."""
# Use regex for robustness, matching the pattern before '.jpeg'
# It looks for one or more digits, optionally followed by a dot and more digits,
# right before the .jpeg extension, preceded by a hyphen.
match = re.search(r'-(\d+(\.\d+)?)\.jpeg$', filename)
if match:
try:
return float(match.group(1))
except ValueError:
return None
return None
def create_video_from_images(image_folder, output_video_file, default_last_frame_duration=0.1, max_duration=5.0):
"""
Creates a WebM video from timestamped JPEG images in a folder.
Args:
image_folder (str): Path to the folder containing JPEG images.
output_video_file (str): Path for the output WebM video file.
default_last_frame_duration (float): Duration (in seconds) to display the last frame.
max_duration (float): Maximum duration (in seconds) for a single output video.
If total duration exceeds this, multiple videos will be created.
"""
print(f"Scanning folder: {image_folder}")
search_pattern = os.path.join(image_folder, 'page@*.jpeg')
image_files = glob.glob(search_pattern)
if not image_files:
print(f"Error: No JPEG files matching pattern '{search_pattern}' found.")
return
print(f"Found {len(image_files)} matching image files.")
# Extract timestamps and store as (timestamp, full_path) tuples
timed_files = []
for img_path in image_files:
timestamp = extract_timestamp(os.path.basename(img_path))
if timestamp is not None:
timed_files.append((timestamp, img_path))
else:
print(f"Warning: Could not extract timestamp from {os.path.basename(img_path)}. Skipping.")
if not timed_files:
print("Error: No files with valid timestamps found.")
return
# Sort files chronologically based on timestamp
timed_files.sort()
print(f"Processing {len(timed_files)} files with valid timestamps.")
# 第一步计算所有图片的持续时间并过滤掉持续时间超过10秒的图片
filtered_files_with_duration = []
for i in range(len(timed_files)):
timestamp, img_path = timed_files[i]
# 计算每帧的持续时间
if i < len(timed_files) - 1:
next_timestamp, _ = timed_files[i+1]
duration = next_timestamp - timestamp
# 防止零或负值
if duration <= 0:
duration = 0.01
else:
# 最后一帧的持续时间
duration = default_last_frame_duration
# 过滤掉持续时间超过5秒的图片
if duration <= 5.0:
filtered_files_with_duration.append((timestamp, img_path, duration))
else:
print(f"Skipping frame with duration {duration:.2f}s > 10s: {img_path}")
if not filtered_files_with_duration:
print("Error: No valid frames left after filtering.")
return
# 按照最大时长分段
segments = []
current_segment = []
current_segment_duration = 0.0
for timestamp, img_path, duration in filtered_files_with_duration:
# 检查添加此帧是否会超过最大时长
if current_segment_duration + duration > max_duration and current_segment:
# 当前段已满,开始新段
segments.append(current_segment)
current_segment = [(timestamp, img_path, duration)]
current_segment_duration = duration
else:
# 添加到当前段
current_segment.append((timestamp, img_path, duration))
current_segment_duration += duration
# 添加最后一段(如果非空)
if current_segment:
segments.append(current_segment)
# 第二步合并短于3秒的段到前一个段
merged_segments = []
for i, segment in enumerate(segments):
# 计算段的总时长
segment_duration = sum(duration for _, _, duration in segment)
# 如果段小于10秒且不是第一个段则合并到前一个段
if segment_duration < 10.0 and i > 0:
print(f"Merging segment {i+1} (duration: {segment_duration:.2f}s < 3s) with previous segment")
merged_segments[-1].extend(segment)
else:
merged_segments.append(segment)
print(f"Final segments after merging: {len(merged_segments)} (from original {len(segments)})")
# 处理每个段
for segment_index, segment in enumerate(merged_segments):
# 生成输出文件名
if len(merged_segments) > 1:
# 提取基本名称和扩展名
base_name, extension = os.path.splitext(output_video_file)
segment_output_file = f"{base_name}_part{segment_index+1}{extension}"
else:
segment_output_file = output_video_file
print(f"\nProcessing segment {segment_index+1}/{len(merged_segments)} -> {segment_output_file}")
# 创建FFmpeg输入文件
ffmpeg_input_file = f"ffmpeg_input_segment_{segment_index+1}.txt"
try:
with open(ffmpeg_input_file, 'w', encoding='utf-8') as f:
f.write("ffconcat version 1.0\n")
for timestamp, img_path, duration in segment:
abs_img_path = os.path.abspath(img_path)
safe_img_path = abs_img_path.replace("'", "'\\''")
f.write(f"file '{safe_img_path}'\n")
f.write(f"duration {duration:.6f}\n")
# 再次添加最后一帧以应用最终持续时间
_, last_img_path, _ = segment[-1]
abs_last_img_path = os.path.abspath(last_img_path)
safe_last_img_path = abs_last_img_path.replace("'", "'\\''")
f.write(f"file '{safe_last_img_path}'\n")
# 运行FFmpeg命令
ffmpeg_command = [
'ffmpeg',
'-f', 'concat',
'-safe', '0',
'-i', ffmpeg_input_file,
'-c:v', 'libvpx-vp9',
'-crf', '30',
'-b:v', '0',
'-pix_fmt', 'yuv420p',
'-y',
segment_output_file
]
print("\nRunning FFmpeg command:")
print(" ".join(f"'{arg}'" if " " in arg else arg for arg in ffmpeg_command))
try:
result = subprocess.run(ffmpeg_command, check=True, capture_output=True, text=True, encoding='utf-8')
print("\nFFmpeg output:")
print(result.stdout)
print(f"Segment {segment_index+1} finished successfully!")
print(f"Output video saved to: {segment_output_file}")
except subprocess.CalledProcessError as e:
print(f"\nError running FFmpeg for segment {segment_index+1}!")
print(f"Return code: {e.returncode}")
print("FFmpeg stdout:")
print(e.stdout)
print("FFmpeg stderr:")
print(e.stderr)
except FileNotFoundError:
print("\nError: 'ffmpeg' command not found. Make sure FFmpeg is installed and in your system's PATH.")
finally:
# 清理临时输入文件
if os.path.exists(ffmpeg_input_file):
os.remove(ffmpeg_input_file)
print(f"Cleaned up temporary file: {ffmpeg_input_file}")
def process_zip_files():
"""
Processes all zip files in the current directory, extracts them,
and creates videos from the extracted images.
"""
# 获取当前目录下所有的zip文件
zip_files = glob.glob('trace2/*.trace.zip')
if not zip_files:
print("没有找到任何.trace.zip文件")
return
print(f"找到{len(zip_files)}个zip文件")
# 创建video目录如果不存在
video_base_dir = os.path.join(os.getcwd(), 'video2')
if not os.path.exists(video_base_dir):
os.makedirs(video_base_dir)
# 处理每个zip文件
for zip_file in zip_files:
# 获取不带.zip扩展名的文件名
base_name = zip_file[:-4] # 移除.zip
base_name = base_name.split("/")[-1]
extract_dir = os.path.join(os.getcwd(), "extract", base_name)
video_output_dir = os.path.join(video_base_dir, base_name)
print(f"\n处理zip文件: {zip_file}")
# 如果解压目录已存在,先删除
if os.path.exists(extract_dir):
print(f"删除已存在的目录: {extract_dir}")
shutil.rmtree(extract_dir)
# 创建解压目录
os.makedirs(extract_dir)
# 创建视频输出目录
if not os.path.exists(video_output_dir):
os.makedirs(video_output_dir)
# 解压文件
try:
print(f"解压文件到: {extract_dir}")
with zipfile.ZipFile(zip_file, 'r') as zip_ref:
zip_ref.extractall(extract_dir)
# 进入解压目录并处理图像
print(f"进入目录: {extract_dir}")
output_video = os.path.join(video_output_dir, f"{base_name}_recording.webm")
image_dir = extract_dir + "/resources/"
# 调用视频创建函数
create_video_from_images(image_dir, output_video, max_duration=30.0)
except Exception as e:
print(f"处理{zip_file}时出错: {str(e)}")
# --- 主程序入口 ---
if __name__ == "__main__":
process_zip_files()