import glob
import os
import re
import shutil
import subprocess
import sys
import zipfile

# Matches the trailing "-<number>.jpeg" of a frame filename, where <number>
# is an integer or decimal timestamp. Compiled once because it is applied to
# every image file found.
_TIMESTAMP_RE = re.compile(r'-(\d+(?:\.\d+)?)\.jpeg$')


def extract_timestamp(filename):
    """Extract the float timestamp embedded in a frame filename.

    The timestamp is the number between the last hyphen and the ``.jpeg``
    extension, e.g. ``page@abc-1700000000.25.jpeg`` -> ``1700000000.25``.

    Args:
        filename (str): File name (not necessarily a full path).

    Returns:
        float | None: The parsed timestamp, or None when the name does not
        end in ``-<number>.jpeg``.
    """
    match = _TIMESTAMP_RE.search(filename)
    # The pattern only admits digits with an optional fractional part, so
    # float() cannot fail on the captured text.
    return float(match.group(1)) if match else None


def _frame_durations(timed_files, last_frame_duration, max_frame_duration):
    """Attach a display duration to each frame and drop over-long frames.

    A frame is shown until the next frame's timestamp; the final frame gets
    ``last_frame_duration``. Frames whose duration exceeds
    ``max_frame_duration`` are skipped (and reported).

    Returns:
        list[tuple[float, str, float]]: (timestamp, path, duration) triples.
    """
    frames = []
    last_index = len(timed_files) - 1
    for index, (timestamp, img_path) in enumerate(timed_files):
        if index < last_index:
            duration = timed_files[index + 1][0] - timestamp
            # Identical timestamps would give a zero-length frame, which the
            # concat list cannot express meaningfully; use a tiny duration.
            if duration <= 0:
                duration = 0.01
        else:
            duration = last_frame_duration

        if duration <= max_frame_duration:
            frames.append((timestamp, img_path, duration))
        else:
            print(f"Skipping frame with duration {duration:.2f}s > "
                  f"{max_frame_duration:g}s: {img_path}")
    return frames


def _split_into_segments(frames, max_duration):
    """Greedily group frames into segments of at most ``max_duration`` seconds.

    A single frame longer than ``max_duration`` still forms its own segment.
    """
    segments = []
    current = []
    current_duration = 0.0
    for frame in frames:
        duration = frame[2]
        if current and current_duration + duration > max_duration:
            # Current segment is full: close it and start a new one.
            segments.append(current)
            current = [frame]
            current_duration = duration
        else:
            current.append(frame)
            current_duration += duration
    if current:
        segments.append(current)
    return segments


def _merge_short_segments(segments, min_segment_duration):
    """Fold every non-first segment shorter than the minimum into its predecessor.

    The input lists are not modified; merged segments are new lists.
    """
    merged = []
    for index, segment in enumerate(segments):
        segment_duration = sum(duration for _, _, duration in segment)
        if merged and segment_duration < min_segment_duration:
            print(f"Merging segment {index+1} (duration: {segment_duration:.2f}s < "
                  f"{min_segment_duration:g}s) with previous segment")
            merged[-1].extend(segment)
        else:
            # Copy so that later extend() calls never mutate the caller's list.
            merged.append(list(segment))
    return merged


def _concat_quote(path):
    """Return the absolute form of ``path`` escaped for a single-quoted ffconcat entry."""
    return os.path.abspath(path).replace("'", "'\\''")


def _write_concat_file(list_path, segment):
    """Write an ffconcat list describing ``segment`` to ``list_path``."""
    with open(list_path, 'w', encoding='utf-8') as f:
        f.write("ffconcat version 1.0\n")
        for _, img_path, duration in segment:
            f.write(f"file '{_concat_quote(img_path)}'\n")
            f.write(f"duration {duration:.6f}\n")
        # Repeat the last frame so that its duration directive takes effect.
        f.write(f"file '{_concat_quote(segment[-1][1])}'\n")


def _run_ffmpeg(list_path, output_file, segment_number):
    """Encode the frames listed in ``list_path`` into a VP9 WebM at ``output_file``.

    Failures are reported on stdout rather than raised, so that the remaining
    segments are still attempted.
    """
    ffmpeg_command = [
        'ffmpeg',
        '-f', 'concat',
        '-safe', '0',  # the list uses absolute paths
        '-i', list_path,
        '-c:v', 'libvpx-vp9',
        '-crf', '30',
        '-b:v', '0',
        '-pix_fmt', 'yuv420p',
        '-y',
        output_file,
    ]
    print("\nRunning FFmpeg command:")
    print(" ".join(f"'{arg}'" if " " in arg else arg for arg in ffmpeg_command))

    try:
        # errors='replace' keeps undecodable bytes in FFmpeg's output from
        # raising UnicodeDecodeError after an otherwise successful run.
        result = subprocess.run(ffmpeg_command, check=True, capture_output=True,
                                text=True, encoding='utf-8', errors='replace')
    except subprocess.CalledProcessError as e:
        print(f"\nError running FFmpeg for segment {segment_number}!")
        print(f"Return code: {e.returncode}")
        print("FFmpeg stdout:")
        print(e.stdout)
        print("FFmpeg stderr:")
        print(e.stderr)
    except FileNotFoundError:
        print("\nError: 'ffmpeg' command not found. Make sure FFmpeg is installed and in your system's PATH.")
    else:
        print("\nFFmpeg output:")
        print(result.stdout)
        print(f"Segment {segment_number} finished successfully!")
        print(f"Output video saved to: {output_file}")


def create_video_from_images(image_folder, output_video_file,
                             default_last_frame_duration=0.1, max_duration=5.0,
                             max_frame_duration=5.0, min_segment_duration=10.0):
    """Create WebM video(s) from timestamped JPEG images in a folder.

    Files matching ``page@*.jpeg`` are ordered by the timestamp in their name
    and each is displayed until the next one. The frames are split into
    segments of at most ``max_duration`` seconds; segments shorter than
    ``min_segment_duration`` are then merged into the preceding segment. When
    more than one segment remains, outputs are named
    ``<name>_part<N><ext>``; otherwise ``output_video_file`` is used as is.

    Progress and errors are reported with ``print``; FFmpeg failures do not
    raise.

    Args:
        image_folder (str): Path to the folder containing JPEG images.
        output_video_file (str): Path for the output WebM video file.
        default_last_frame_duration (float): Duration (in seconds) to display
            the last frame.
        max_duration (float): Maximum duration (in seconds) of a segment
            before a new one is started.
        max_frame_duration (float): Frames that would be displayed longer
            than this many seconds are dropped.
        min_segment_duration (float): Segments (other than the first) shorter
            than this many seconds are merged into the previous segment.
            Note that with the defaults this is larger than ``max_duration``,
            so all segments end up merged into one.

    Returns:
        None
    """
    print(f"Scanning folder: {image_folder}")

    search_pattern = os.path.join(image_folder, 'page@*.jpeg')
    image_files = glob.glob(search_pattern)
    if not image_files:
        print(f"Error: No JPEG files matching pattern '{search_pattern}' found.")
        return
    print(f"Found {len(image_files)} matching image files.")

    # Pair every image with its timestamp, skipping names we cannot parse.
    timed_files = []
    for img_path in image_files:
        timestamp = extract_timestamp(os.path.basename(img_path))
        if timestamp is not None:
            timed_files.append((timestamp, img_path))
        else:
            print(f"Warning: Could not extract timestamp from {os.path.basename(img_path)}. Skipping.")

    if not timed_files:
        print("Error: No files with valid timestamps found.")
        return

    # Chronological order (ties broken by path).
    timed_files.sort()
    print(f"Processing {len(timed_files)} files with valid timestamps.")

    frames = _frame_durations(timed_files, default_last_frame_duration,
                              max_frame_duration)
    if not frames:
        print("Error: No valid frames left after filtering.")
        return

    segments = _split_into_segments(frames, max_duration)
    merged_segments = _merge_short_segments(segments, min_segment_duration)
    print(f"Final segments after merging: {len(merged_segments)} (from original {len(segments)})")

    for segment_index, segment in enumerate(merged_segments):
        segment_number = segment_index + 1
        if len(merged_segments) > 1:
            base_name, extension = os.path.splitext(output_video_file)
            segment_output_file = f"{base_name}_part{segment_number}{extension}"
        else:
            segment_output_file = output_video_file

        print(f"\nProcessing segment {segment_number}/{len(merged_segments)} -> {segment_output_file}")

        # Temporary ffconcat list, written to the current working directory.
        ffmpeg_input_file = f"ffmpeg_input_segment_{segment_number}.txt"
        try:
            _write_concat_file(ffmpeg_input_file, segment)
            _run_ffmpeg(ffmpeg_input_file, segment_output_file, segment_number)
        finally:
            # Always remove the temporary list, even if FFmpeg failed.
            if os.path.exists(ffmpeg_input_file):
                os.remove(ffmpeg_input_file)
                print(f"Cleaned up temporary file: {ffmpeg_input_file}")


def process_zip_files(trace_dir='trace2', video_dir='video2',
                      extract_root='extract', max_duration=30.0):
    """Extract every ``*.trace.zip`` archive and render its frames to video.

    Each archive ``<trace_dir>/<name>.zip`` is unpacked into
    ``<extract_root>/<name>`` (replacing any previous extraction), and the
    images in its ``resources`` sub-folder are rendered to
    ``<video_dir>/<name>/<name>_recording.webm``. An error in one archive is
    reported and does not stop the others.

    Args:
        trace_dir (str): Folder searched for ``*.trace.zip`` files.
        video_dir (str): Base folder for the generated videos.
        extract_root (str): Base folder for the extracted archives.
        max_duration (float): Passed to ``create_video_from_images``.

    Returns:
        None
    """
    zip_files = sorted(glob.glob(os.path.join(trace_dir, '*.trace.zip')))
    if not zip_files:
        print("没有找到任何.trace.zip文件")
        return

    print(f"找到{len(zip_files)}个zip文件")

    video_base_dir = os.path.abspath(video_dir)
    os.makedirs(video_base_dir, exist_ok=True)

    for zip_file in zip_files:
        # Archive name without directory and without the ".zip" extension;
        # basename() handles both "/" and the platform's own separator.
        base_name = os.path.splitext(os.path.basename(zip_file))[0]
        extract_dir = os.path.join(os.path.abspath(extract_root), base_name)
        video_output_dir = os.path.join(video_base_dir, base_name)

        print(f"\n处理zip文件: {zip_file}")

        # Per-archive boundary: report the failure and carry on with the
        # next archive instead of aborting the whole batch.
        try:
            # Start from a clean extraction directory.
            if os.path.exists(extract_dir):
                print(f"删除已存在的目录: {extract_dir}")
                shutil.rmtree(extract_dir)
            os.makedirs(extract_dir)
            os.makedirs(video_output_dir, exist_ok=True)

            print(f"解压文件到: {extract_dir}")
            with zipfile.ZipFile(zip_file, 'r') as zip_ref:
                zip_ref.extractall(extract_dir)

            print(f"进入目录: {extract_dir}")
            output_video = os.path.join(video_output_dir, f"{base_name}_recording.webm")
            image_dir = os.path.join(extract_dir, 'resources')

            create_video_from_images(image_dir, output_video, max_duration=max_duration)
        except Exception as e:
            print(f"处理{zip_file}时出错: {e}")


# --- Script entry point ---
if __name__ == "__main__":
    process_zip_files()