import base64 import io import json import re from pathlib import Path from typing import Any from PIL import Image from agent.prompts import * from browser_env import ( Action, ActionTypes, ObservationMetadata, StateInfo, action2str, ) HTML_TEMPLATE = """ {body} """ def get_render_action( action: Action, observation_metadata: dict[str, ObservationMetadata], action_set_tag: str, ) -> str: """Parse the predicted actions for rendering purpose. More comprehensive information""" match action_set_tag: case "id_html_tree": text_meta_data = observation_metadata["text"] if action["element_id"] in text_meta_data["obs_nodes_info"]: node_content = text_meta_data["obs_nodes_info"][ action["element_id"] ]["text"] else: node_content = "No match found" action_str = f"
{action['raw_prediction']}
" action_str += f"
{repr(action)}
" action_str += f"
{action2str(action, action_set_tag, node_content)}
" case "id_html_nasc_tree": text_meta_data = observation_metadata["text"] if action["element_id"] in text_meta_data["obs_nodes_info"]: node_content = text_meta_data["obs_nodes_info"][ action["element_id"] ]["text"] else: node_content = "No match found" action_str = f"
{action['raw_prediction']}
" action_str += f"
{repr(action)}
" action_str += f"
{action2str(action, action_set_tag, node_content)}
" case "id_accessibility_tree": text_meta_data = observation_metadata["text"] if action["element_id"] in text_meta_data["obs_nodes_info"]: node_content = text_meta_data["obs_nodes_info"][ action["element_id"] ]["text"] else: node_content = "No match found" action_str = f"
{action['raw_prediction']}
" action_str += f"
{repr(action)}
" action_str += f"
{action2str(action, action_set_tag, node_content)}
" case "playwright": action_str = action["pw_code"] case _: raise ValueError(f"Unknown action type {action['action_type']}") return action_str def get_action_description( action: Action, observation_metadata: dict[str, ObservationMetadata], action_set_tag: str, prompt_constructor: PromptConstructor | None, ) -> str: """Generate the text version of the predicted actions to store in action history for prompt use. May contain hint information to recover from the failures""" match action_set_tag: case "id_html_tree": # old_op_prompt = "Website: %s; Thinking process: %s; Html segment: %s; Operation: %s; Result: %s" op_prompt = "Html segment: %s; Operation: %s;" text_meta_data = observation_metadata["text"] node_info = text_meta_data["obs_nodes_info"] result = 'Operation Success' if action["action_type"] in [ ActionTypes.CLICK, ActionTypes.HOVER, ActionTypes.TYPE, ]: action_name = str(action["action_type"]).split(".")[1].lower() if action["element_id"] in node_info: node_content = node_info[action["element_id"]]["text"] node_content = " ".join(node_content.split()[1:]) action["label"] = node_info[action["element_id"]]["label"] action_str = action2str( action, action_set_tag, node_content ) else: action_str = "None" result = f"Cannot find the corresponding tag. Attempt to perfom \"{action_name}\" on element \"[{action['element_id']}]\" but no matching element found. Please check the observation more carefully." else: if ( action["action_type"] == ActionTypes.NONE and prompt_constructor is not None ): text = action["answer"] if text is not None and text.count("#Record#") > 0: action_str = text else: action_str = "None" result = f'Operation invalid. The format was incorrect. Ensure that the action is wrapped inside a pair of # and seperate arguments within spaces as follows: #action# arg1 arg2 ....' else: action_str = action2str(action, action_set_tag, "") # action_str = op_prompt % ( # prompt_constructor.state["url"], # prompt_constructor.state["intention"], # prompt_constructor.state["segment"], # action_str, # result, # ) action_str = op_prompt % ( prompt_constructor.state["segment"], action_str, ) case "id_html_nasc_tree": op_prompt = "%s #HTML Segment: %s" text_meta_data = observation_metadata["text"] node_info = text_meta_data["obs_nodes_info"] result = 'Operation Success' if action["action_type"] in [ ActionTypes.CLICK, ActionTypes.HOVER, ActionTypes.TYPE, ]: action_name = str(action["action_type"]).split(".")[1].lower() if action["element_id"] in node_info: node_content = node_info[action["element_id"]]["text"] node_content = " ".join(node_content.split()[1:]) action["label"] = node_info[action["element_id"]]["label"] action_str = action2str( action, action_set_tag, node_content ) else: action_str = "None" result = f"Cannot find the corresponding tag. Attempt to perfom \"{action_name}\" on element \"[{action['element_id']}]\" but no matching element found. Please check the observation more carefully." else: if ( action["action_type"] == ActionTypes.NONE and prompt_constructor is not None ): text = action["answer"] if text is not None and text.count("record") > 0: action_str = text else: action_str = "None" result = f'Operation invalid. The format was incorrect. Ensure that the action is wrapped inside a pair of # and seperate arguments within spaces as follows: #action# arg1 arg2 ....' else: action_str = action2str(action, action_set_tag, "") action_str = op_prompt % ( action_str, prompt_constructor.state["segment"], ) case "id_accessibility_tree": text_meta_data = observation_metadata["text"] if action["action_type"] in [ ActionTypes.CLICK, ActionTypes.HOVER, ActionTypes.TYPE, ]: action_name = str(action["action_type"]).split(".")[1].lower() if action["element_id"] in text_meta_data["obs_nodes_info"]: node_content = text_meta_data["obs_nodes_info"][ action["element_id"] ]["text"] node_content = " ".join(node_content.split()[1:]) action_str = action2str( action, action_set_tag, node_content ) else: action_str = f"Attempt to perfom \"{action_name}\" on element \"[{action['element_id']}]\" but no matching element found. Please check the observation more carefully." else: if ( action["action_type"] == ActionTypes.NONE and prompt_constructor is not None ): action_splitter = prompt_constructor.instruction[ "meta_data" ]["action_splitter"] action_str = f'The previous prediction you issued was "{action["raw_prediction"]}". However, the format was incorrect. Ensure that the action is wrapped inside a pair of {action_splitter} and enclose arguments within [] as follows: {action_splitter}action [arg] ...{action_splitter}.' else: action_str = action2str(action, action_set_tag, "") case "playwright": action_str = action["pw_code"] case _: raise ValueError(f"Unknown action type {action['action_type']}") return action_str class RenderHelper(object): """Helper class to render text and image observations and meta data in the trajectory""" def __init__( self, config_file: str, result_dir: str, action_set_tag: str ) -> None: with open(config_file, "r") as f: _config = json.load(f) _config_str = "" for k, v in _config.items(): _config_str += f"{k}: {v}\n" _config_str = f"
{_config_str}
\n" task_id = _config["task_id"] self.action_set_tag = action_set_tag self.render_file = open( Path(result_dir) / f"render_{task_id}.html", "a+" ) self.render_file.truncate(0) # write init template self.render_file.write(HTML_TEMPLATE.format(body=f"{_config_str}")) self.render_file.read() self.render_file.flush() def render( self, action: Action, state_info: StateInfo, meta_data: dict[str, Any], render_screenshot: bool = False, ) -> None: """Render the trajectory""" # text observation observation = state_info["observation"] text_obs = observation["text"] info = state_info["info"] new_content = f"

New Page

\n" new_content += f"

URL: {state_info['info']['page'].url}

\n" new_content += f"
{text_obs}
\n" if render_screenshot: # image observation img_obs = observation["image"] image = Image.fromarray(img_obs) byte_io = io.BytesIO() image.save(byte_io, format="PNG") byte_io.seek(0) image_bytes = base64.b64encode(byte_io.read()) image_str = image_bytes.decode("utf-8") new_content += f"\n" # meta data new_content += f"
{meta_data['action_history'][-1]}
\n" # action action_str = get_render_action( action, info["observation_metadata"], action_set_tag=self.action_set_tag, ) # with yellow background action_str = f"
{action_str}
" new_content += f"{action_str}\n" # add new content self.render_file.seek(0) html = self.render_file.read() html_body = re.findall(r"(.*?)", html, re.DOTALL)[0] html_body += new_content html = HTML_TEMPLATE.format(body=html_body) self.render_file.seek(0) self.render_file.truncate() self.render_file.write(html) self.render_file.flush() def close(self) -> None: self.render_file.close()