clean code

yuyr 2025-06-17 09:53:40 +00:00
parent a5b060d4f8
commit 17f1b1f8c7
13 changed files with 1377 additions and 22877 deletions

@@ -1,364 +0,0 @@
import os
import random
import json

import mysql.connector
from openai import OpenAI
from dotenv import load_dotenv

# --- Configuration ---
load_dotenv()

MYSQL_CONFIG = {
    "host": "localhost",
    "port": "23306",
    "user": "mcpuser",
    "password": "StrongPass123!",
    "database": "magentodb"
}

OPENAI_CONFIG = {
    "api_key": os.getenv("OPENAI_API_KEY"),
    "base_url": os.getenv("OPENAI_BASE_URL"),
    "model": "gpt-4o"
}

# --- Prompt Template ---
# This is a carefully engineered prompt to guide the LLM's output.
PROMPT_TEMPLATE = """
You are an expert database analyst and a creative test case designer for e-commerce web applications.
Your goal is to generate realistic administrative tasks that can be solved by a Web Agent navigating an admin panel.
I will provide you with the following context:
1. **Full Database Schema**: A list of `CREATE TABLE` statements for the core tables of a Magento e-commerce platform.
2. **Sampled Data**: A JSON object containing 5 random rows of data from 5 randomly selected core tables. This data is REAL and should be used to inspire specific, answerable questions.
## Your Task
Based on the provided schema and sample data, create a JSON object containing a single key, "questions", which holds an array of up to 10 unique task objects.
### Requirements for Each Question:
- **Web Agent Solvable**: The task must represent a realistic action an administrator would perform in a web UI (e.g., "Find all orders for customer X", "Update the stock for product Y", "Approve a pending review").
- **Grounded in Data**: The questions should be specific, using names, IDs, or values from the provided **Sampled Data** to make them concrete.
- **Utilize Schema**: You can formulate questions that require joining tables, even if not all tables were sampled. The full schema is your guide.
### Output Format
The final output MUST be a single, valid JSON object. Do not include any other text, explanations, or markdown formatting like ```json.
The JSON object must have one key: "questions", containing a JSON array of task objects.
Each object in the array must contain exactly three keys: `question`, `answer`, and `sql`.
- **`question`**: (string) A natural language description of the task for a web agent.
- **`answer`**: (string, integer, float, or list) The precise and concise answer to the question, derived by running the SQL query against the database.
- **`sql`**: (string) The exact, runnable MySQL query that was used to find the answer.
### Output Format Example
```json
{{
  "questions": [
    {{
      "question": "What is the email address for customer with ID 5?",
      "answer": "customer5@example.com",
      "sql": "SELECT email FROM customer_entity WHERE entity_id = 5;"
    }},
    {{
      "question": "Find the total quantity of item with SKU 'ABC-123' in the cart.",
      "answer": 3,
      "sql": "SELECT SUM(qty) FROM quote_item WHERE sku = 'ABC-123';"
    }}
  ]
}}
```
---
### Full Database Schema
{schema_context}
---
### Sampled Data
Here is the sample data from randomly selected tables. Use this to make your questions specific.
{sampled_data_str}
---
Now, generate the JSON object based on these instructions.
"""

# This is a carefully engineered prompt to verify the LLM's own output.
SEMANTIC_VERIFICATION_PROMPT_TEMPLATE = """
You are a meticulous data verifier. Your task is to determine if a given "answer" is semantically correct and accurately supported by the "SQL query result".
I will provide you with a JSON object containing:
1. `question`: The original question asked.
2. `sql`: The SQL query used to find the answer.
3. `answer`: The answer generated by a previous AI.
4. `sql_result`: The actual data returned by executing the SQL query.
## Your Task
Carefully analyze the `sql_result` and compare it to the `answer`. The match should be semantic, not just a simple substring match. For example, if the question is "How many products are in stock?", an answer of "5" should be verifiable from the SQL result which might be `[(5,)]`.
### Requirements:
- Respond with a single JSON object.
- Do not include any other text, explanations, or markdown formatting.
- The JSON object must have exactly two keys:
  - `is_match`: (boolean) `true` if the `answer` is fully and accurately supported by the `sql_result`, otherwise `false`.
  - `reason`: (string) A brief explanation for your decision. If it's a mismatch, explain why (e.g., "The answer is 'John Doe' but the result contains 'Jane Doe'", "The answer is a count but the result is a list of names").
---
### Verification Data
{task_data_json}
---
Now, provide your verification as a JSON object.
"""


def get_db_connection():
    """Establishes a connection to the MySQL database."""
    try:
        conn = mysql.connector.connect(**MYSQL_CONFIG)
        return conn
    except mysql.connector.Error as err:
        print(f"Error connecting to MySQL: {err}")
        return None


def get_full_schema(cursor, tables):
    """Fetches the CREATE TABLE statements for all core tables."""
    schema_parts = []
    for table_name in tables:
        try:
            cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
            result = cursor.fetchone()
            if result:
                schema_parts.append(result[1])  # result[1] is the CREATE TABLE statement
        except mysql.connector.Error as err:
            print(f"Warning: Could not get schema for table {table_name}: {err}")
    return "\n\n".join(schema_parts)


def get_random_tables_and_samples(cursor, tables, num_tables=5, num_samples=5):
    """Selects random tables and samples random rows from them."""
    selected_tables = random.sample(tables, num_tables)
    sampled_data = {}
    for table_name in selected_tables:
        try:
            # Use ORDER BY RAND() for random sampling. Can be slow on very large tables.
            query = f"SELECT * FROM `{table_name}` ORDER BY RAND() LIMIT {num_samples}"
            cursor.execute(query)
            rows = cursor.fetchall()
            if not rows:
                sampled_data[table_name] = []
                continue
            columns = [desc[0] for desc in cursor.description]
            # Convert rows (tuples) to a list of dictionaries
            sampled_rows = []
            for row in rows:
                row_dict = {}
                for i, col_value in enumerate(row):
                    # Handle bytes by decoding, fall back to string representation
                    if isinstance(col_value, bytes):
                        try:
                            row_dict[columns[i]] = col_value.decode('utf-8')
                        except UnicodeDecodeError:
                            row_dict[columns[i]] = str(col_value)
                    else:
                        row_dict[columns[i]] = col_value
                sampled_rows.append(row_dict)
            sampled_data[table_name] = sampled_rows
        except mysql.connector.Error as err:
            print(f"Warning: Could not sample data from table {table_name}: {err}")
            sampled_data[table_name] = f"Error: {err}"
    return sampled_data
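

# Editorial sketch, not part of the original file: ORDER BY RAND() sorts the
# entire table, which the comment above already flags as slow. One common
# alternative is sampling by random offsets; `sample_rows_by_offset` is a
# hypothetical helper shown only for illustration.
def sample_rows_by_offset(cursor, table_name, num_samples=5):
    """Sample rows via random OFFSETs instead of sorting the whole table."""
    cursor.execute(f"SELECT COUNT(*) FROM `{table_name}`")
    total = cursor.fetchone()[0]
    rows = []
    for offset in random.sample(range(total), min(num_samples, total)):
        cursor.execute(f"SELECT * FROM `{table_name}` LIMIT 1 OFFSET {offset}")
        rows.append(cursor.fetchone())
    return rows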
def generate_questions(client, schema_context, sampled_data):
    """Generates questions by calling the OpenAI API."""
    if not client:
        raise ValueError("OpenAI client not provided.")
    sampled_data_str = json.dumps(sampled_data, indent=2, default=str)
    prompt = PROMPT_TEMPLATE.format(
        schema_context=schema_context,
        sampled_data_str=sampled_data_str
    )
    try:
        response = client.chat.completions.create(
            model=OPENAI_CONFIG["model"],
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content
        data = json.loads(content)
        # The prompt asks for {"questions": [...]}, so we extract the list.
        if isinstance(data, dict) and "questions" in data and isinstance(data["questions"], list):
            return data["questions"]
        elif isinstance(data, list):
            # Fallback in case the model returns a list directly
            print("Warning: Model returned a raw list instead of an object with a 'questions' key.")
            return data
        else:
            print(f"Warning: Failed to find a 'questions' list in the model's output. Got: {content}")
            return None
    except Exception as e:
        print(f"Error calling OpenAI API or parsing JSON: {e}")
        return None


def semantic_validate_tasks(tasks, client):
    """
    Uses an LLM to semantically validate if the task's answer matches the SQL result.
    """
    if not tasks:
        return []
    final_validated_tasks = []
    print("\nPerforming semantic validation with GPT-4o...")
    for task in tasks:
        # Prepare data for the prompt, including the SQL result
        task_data_for_prompt = {
            "question": task["question"],
            "sql": task["sql"],
            "answer": task["answer"],
            "sql_result": task["sql_result"]
        }
        task_data_json = json.dumps(task_data_for_prompt, indent=2, default=str)
        prompt = SEMANTIC_VERIFICATION_PROMPT_TEMPLATE.format(task_data_json=task_data_json)
        try:
            print(f" - Verifying question: \"{task['question'][:80]}...\"")
            response = client.chat.completions.create(
                model=OPENAI_CONFIG["model"],
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # We want deterministic validation
                response_format={"type": "json_object"},
            )
            content = response.choices[0].message.content
            verification_result = json.loads(content)
            if verification_result.get("is_match") is True:
                # Task is valid. Rename sql_result for the final output.
                print(f" - Validation PASSED.")
                task['sql_execute_result'] = task.pop('sql_result')
                final_validated_tasks.append(task)
            else:
                reason = verification_result.get('reason', 'No reason provided.')
                print(f" - Validation FAILED. Filtering task.")
                print(f" - Reason: {reason}")
                print(f" - Question: {task['question']}")
                print(f" - Expected Answer: {json.dumps(task['answer'], default=str)}")
                print(f" - SQL: {task['sql']}")
                sql_result_str = json.dumps(task['sql_result'], indent=2, default=str)
                print(f" - SQL Result: {sql_result_str}")
        except Exception as e:
            print(f" - An error occurred during semantic validation for task, filtering it out: {e}")
            print(f" - Question: {task.get('question', 'N/A')}")
            print(f" - SQL: {task.get('sql', 'N/A')}")
    return final_validated_tasks


def main():
    """Main function to run the script."""
    # 1. Load the list of core tables
    try:
        with open('core_tables.json', 'r') as f:
            core_tables = json.load(f)
    except FileNotFoundError:
        print("Error: core_tables.json not found. Please create it.")
        return

    # 2. Connect to the database
    conn = get_db_connection()
    if not conn:
        return
    cursor = conn.cursor()

    # 3. Setup OpenAI Client
    if not OPENAI_CONFIG["api_key"]:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return
    client = OpenAI(api_key=OPENAI_CONFIG["api_key"], base_url=OPENAI_CONFIG["base_url"])

    try:
        # 4. Get full schema context
        print("Fetching full database schema...")
        schema_context = get_full_schema(cursor, core_tables)

        # 5. Get random samples and print them
        print("Sampling data from 5 random tables...")
        sampled_data = get_random_tables_and_samples(cursor, core_tables, num_tables=5, num_samples=5)
        print(f"Sampled from tables: {list(sampled_data.keys())}")
        print("\n--- Sampled Data ---")
        print(json.dumps(sampled_data, indent=2, default=str))
        print("---------------------\n")

        # 6. Generate questions using the LLM
        print("Generating questions with GPT-4o...")
        generated_tasks = generate_questions(client, schema_context, sampled_data)

        # 7. Initial validation (SQL execution and substring check)
        pre_validated_tasks = []
        if generated_tasks:
            print("\nPerforming initial validation (SQL execution and substring match)...")
            for task in generated_tasks:
                if not isinstance(task, dict) or not all(k in task for k in ['sql', 'answer', 'question']):
                    print(f"Filtering task due to malformed structure or missing keys: {task}")
                    continue
                try:
                    cursor.execute(task['sql'])
                    sql_result = cursor.fetchall()
                    answer_str = str(task['answer'])
                    result_str = str(sql_result)
                    if answer_str in result_str:
                        task['sql_result'] = sql_result  # Attach result for the next validation step
                        pre_validated_tasks.append(task)
                    else:
                        print(f"Filtering task: Answer '{answer_str}' not found in SQL result.")
                        print(f" - Question: {task['question']}")
                        print(f" - SQL: {task['sql']}")
                        print(f" - Result: {result_str[:250]}...")
                except mysql.connector.Error as err:
                    print(f"Filtering task due to SQL error: {err}")
                    print(f" - Question: {task['question']}")
                    print(f" - SQL: {task['sql']}")
                except Exception as e:
                    print(f"An unexpected error occurred during initial validation for task {task}: {e}")

        # 8. Semantic validation using LLM
        validated_tasks = semantic_validate_tasks(pre_validated_tasks, client)

        # 9. Print the final JSON output
        if validated_tasks:
            print("\n--- Final Validated Tasks ---")
            print(json.dumps(validated_tasks, indent=2, default=str))
        else:
            print("Failed to generate any valid tasks after all validation steps.")
    finally:
        # 10. Close the database connection
        if conn.is_connected():
            cursor.close()
            conn.close()
            print("\nDatabase connection closed.")


if __name__ == "__main__":
    main()
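
The two-stage design above exists because the substring pre-check in step 7 is deliberately crude. A minimal illustration of its failure modes (editorial sketch, not part of the original file):

from decimal import Decimal

# A correct answer can be filtered: "52.0" is not a substring of the raw result.
print("52.0" in str([(Decimal('52'),)]))  # False
# A wrong answer can slip through: "5" is a substring of "15".
print("5" in str([(15,)]))                # True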

@@ -1,370 +0,0 @@
import os
import random
import json

import mysql.connector
from openai import OpenAI
from dotenv import load_dotenv

# --- Configuration ---
load_dotenv()

MYSQL_CONFIG = {
    "host": "localhost",
    "port": "23306",
    "user": "mcpuser",
    "password": "StrongPass123!",
    "database": "magentodb"
}

OPENAI_CONFIG = {
    "api_key": os.getenv("OPENAI_API_KEY"),
    "base_url": os.getenv("OPENAI_BASE_URL"),
    "model": "gpt-4o"
}

# --- Prompt Template ---
# This is a carefully engineered prompt to guide the LLM's output.
PROMPT_TEMPLATE = """
You are an expert database analyst and a creative test case designer for e-commerce web applications.
Your goal is to generate realistic administrative tasks that can be solved by a Web Agent navigating an admin panel.
I will provide you with the following context:
1. **Full Database Schema**: A list of `CREATE TABLE` statements for the core tables of a Magento e-commerce platform.
2. **Sampled Data**: A JSON object containing 5 random rows of data from 5 randomly selected core tables. This data is REAL and should be used to inspire specific, answerable questions.
## Your Task
Based on the provided schema and sample data, create a JSON object containing a single key, "questions", which holds an array of up to 10 unique task objects.
### Requirements for Each Question:
- **Web Agent Solvable**: The task must represent a realistic action an administrator would perform in a web UI (e.g., "Find all orders for customer X", "Update the stock for product Y", "Approve a pending review").
- **Grounded in Data**: The questions should be specific, using names, IDs, or values from the provided **Sampled Data** to make them concrete.
- **Utilize Schema**: You can formulate questions that require joining tables, even if not all tables were sampled. The full schema is your guide.
### Output Format
The final output MUST be a single, valid JSON object. Do not include any other text, explanations, or markdown formatting like ```json.
The JSON object must have one key: "questions", containing a JSON array of task objects.
Each object in the array must contain exactly three keys: `question`, `answer`, and `sql`.
- **`question`**: (string) A natural language description of the task for a web agent.
- **`answer`**: (string, integer, float, or list) The precise and concise answer to the question, derived by running the SQL query against the database.
- **`sql`**: (string) The exact, runnable MySQL query that was used to find the answer.
### Output Format Example
```json
{{
  "questions": [
    {{
      "question": "What is the email address for customer with ID 5?",
      "answer": "customer5@example.com",
      "sql": "SELECT email FROM customer_entity WHERE entity_id = 5;"
    }},
    {{
      "question": "Find the total quantity of item with SKU 'ABC-123' in the cart.",
      "answer": 3,
      "sql": "SELECT SUM(qty) FROM quote_item WHERE sku = 'ABC-123';"
    }}
  ]
}}
```
---
### Full Database Schema
{schema_context}
---
### Sampled Data
Here is the sample data from randomly selected tables. Use this to make your questions specific.
{sampled_data_str}
---
Now, generate the JSON object based on these instructions.
"""

# This is a new prompt to evaluate results and generate a corrected answer.
SEMANTIC_EVALUATION_PROMPT_TEMPLATE = """
You are a precise data analyst. Your task is to evaluate if a SQL query's result adequately answers a given natural language question. If it does, you must formulate a concise, natural-language answer.
I will provide you with a JSON object containing:
1. `question`: The original question asked.
2. `sql`: The SQL query that was executed.
3. `sql_result`: The actual data returned by executing the SQL query.
## Your Task
1. **Analyze**: Determine if the `sql_result` contains the necessary information to definitively answer the `question`.
2. **Respond**: Based on your analysis, generate a JSON object with one of two structures.
### Case 1: The question CAN be answered
If the `sql_result` provides a clear answer, respond with:
```json
{{
  "can_answer": true,
  "new_answer": "..."
}}
```
- `can_answer`: (boolean) Must be `true`.
- `new_answer`: (string, integer, float, or list) A concise, human-readable answer derived *only* from the `sql_result`. For example, if the result is `[(52.00,)]`, the answer can be "52.00" or 52.00.
### Case 2: The question CANNOT be answered
If the `sql_result` is empty, irrelevant, or insufficient to answer the question, respond with:
```json
{{
  "can_answer": false,
  "reason": "..."
}}
```
- `can_answer`: (boolean) Must be `false`.
- `reason`: (string) A brief explanation for why the question cannot be answered from the given data (e.g., "The query returned no results.", "The result contains internal IDs, not the requested customer names.").
---
### Evaluation Data
{task_data_json}
---
Now, provide your evaluation as a JSON object.
"""


def get_db_connection():
    """Establishes a connection to the MySQL database."""
    try:
        conn = mysql.connector.connect(**MYSQL_CONFIG)
        return conn
    except mysql.connector.Error as err:
        print(f"Error connecting to MySQL: {err}")
        return None


def get_full_schema(cursor, tables):
    """Fetches the CREATE TABLE statements for all core tables."""
    schema_parts = []
    for table_name in tables:
        try:
            cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
            result = cursor.fetchone()
            if result:
                schema_parts.append(result[1])  # result[1] is the CREATE TABLE statement
        except mysql.connector.Error as err:
            print(f"Warning: Could not get schema for table {table_name}: {err}")
    return "\n\n".join(schema_parts)


def get_random_tables_and_samples(cursor, tables, num_tables=5, num_samples=5):
    """Selects random tables and samples random rows from them."""
    selected_tables = random.sample(tables, num_tables)
    sampled_data = {}
    for table_name in selected_tables:
        try:
            # Use ORDER BY RAND() for random sampling. Can be slow on very large tables.
            query = f"SELECT * FROM `{table_name}` ORDER BY RAND() LIMIT {num_samples}"
            cursor.execute(query)
            rows = cursor.fetchall()
            if not rows:
                sampled_data[table_name] = []
                continue
            columns = [desc[0] for desc in cursor.description]
            # Convert rows (tuples) to a list of dictionaries
            sampled_rows = []
            for row in rows:
                row_dict = {}
                for i, col_value in enumerate(row):
                    # Handle bytes by decoding, fall back to string representation
                    if isinstance(col_value, bytes):
                        try:
                            row_dict[columns[i]] = col_value.decode('utf-8')
                        except UnicodeDecodeError:
                            row_dict[columns[i]] = str(col_value)
                    else:
                        row_dict[columns[i]] = col_value
                sampled_rows.append(row_dict)
            sampled_data[table_name] = sampled_rows
        except mysql.connector.Error as err:
            print(f"Warning: Could not sample data from table {table_name}: {err}")
            sampled_data[table_name] = f"Error: {err}"
    return sampled_data


def generate_questions(client, schema_context, sampled_data):
    """Generates questions by calling the OpenAI API."""
    if not client:
        raise ValueError("OpenAI client not provided.")
    sampled_data_str = json.dumps(sampled_data, indent=2, default=str)
    prompt = PROMPT_TEMPLATE.format(
        schema_context=schema_context,
        sampled_data_str=sampled_data_str
    )
    try:
        response = client.chat.completions.create(
            model=OPENAI_CONFIG["model"],
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content
        data = json.loads(content)
        # The prompt asks for {"questions": [...]}, so we extract the list.
        if isinstance(data, dict) and "questions" in data and isinstance(data["questions"], list):
            return data["questions"]
        elif isinstance(data, list):
            # Fallback in case the model returns a list directly
            print("Warning: Model returned a raw list instead of an object with a 'questions' key.")
            return data
        else:
            print(f"Warning: Failed to find a 'questions' list in the model's output. Got: {content}")
            return None
    except Exception as e:
        print(f"Error calling OpenAI API or parsing JSON: {e}")
        return None


def evaluate_and_refine_tasks(tasks, client):
    """
    Uses an LLM to evaluate if a SQL result answers the question and refines the answer.
    """
    if not tasks:
        return []
    final_validated_tasks = []
    print("\nPerforming semantic evaluation and answer refinement with GPT-4o...")
    for task in tasks:
        # Prepare data for the prompt, excluding the original 'answer'
        task_data_for_prompt = {
            "question": task["question"],
            "sql": task["sql"],
            "sql_result": task["sql_result"]
        }
        task_data_json = json.dumps(task_data_for_prompt, indent=2, default=str)
        prompt = SEMANTIC_EVALUATION_PROMPT_TEMPLATE.format(task_data_json=task_data_json)
        try:
            print(f" - Evaluating question: \"{task['question'][:80]}...\"")
            response = client.chat.completions.create(
                model=OPENAI_CONFIG["model"],
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # We want deterministic evaluation
                response_format={"type": "json_object"},
            )
            content = response.choices[0].message.content
            evaluation_result = json.loads(content)
            if evaluation_result.get("can_answer") is True and "new_answer" in evaluation_result:
                # Task is valid. Update the answer with the refined one from the LLM.
                task['answer'] = evaluation_result['new_answer']
                task['sql_execute_result'] = task.pop('sql_result')
                final_validated_tasks.append(task)
                print(f" - Evaluation PASSED. New answer: {json.dumps(task['answer'])}")
            else:
                reason = evaluation_result.get('reason', 'No reason provided.')
                print(f" - Evaluation FAILED. Filtering task.")
                print(f" - Reason: {reason}")
                print(f" - Question: {task['question']}")
                print(f" - Original Answer: {json.dumps(task['answer'], default=str)}")
                print(f" - SQL: {task['sql']}")
                sql_result_str = json.dumps(task['sql_result'], indent=2, default=str)
                print(f" - SQL Result: {sql_result_str}")
        except Exception as e:
            print(f" - An error occurred during semantic evaluation for task, filtering it out: {e}")
            print(f" - Question: {task.get('question', 'N/A')}")
            print(f" - SQL: {task.get('sql', 'N/A')}")
    return final_validated_tasks
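

# Editorial sketch, not part of the original file: the contract this function
# expects from the model, shown on a hypothetical Magento order. All values
# below are invented for illustration.
EXAMPLE_EVALUATION_INPUT = {
    "question": "What is the grand total of order 100000001?",
    "sql": "SELECT grand_total FROM sales_order WHERE increment_id = '100000001';",
    "sql_result": [["52.0000"]],  # tuples serialize as JSON arrays via default=str
}
EXAMPLE_EVALUATION_OUTPUT = {"can_answer": True, "new_answer": "52.0000"}  # Case 1
# An empty sql_result should instead produce:
# {"can_answer": False, "reason": "The query returned no results."}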
def main():
    """Main function to run the script."""
    # 1. Load the list of core tables
    try:
        with open('core_tables.json', 'r') as f:
            core_tables = json.load(f)
    except FileNotFoundError:
        print("Error: core_tables.json not found. Please create it.")
        return

    # 2. Connect to the database
    conn = get_db_connection()
    if not conn:
        return
    cursor = conn.cursor()

    # 3. Setup OpenAI Client
    if not OPENAI_CONFIG["api_key"]:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return
    client = OpenAI(api_key=OPENAI_CONFIG["api_key"], base_url=OPENAI_CONFIG["base_url"])

    try:
        # 4. Get full schema context
        print("Fetching full database schema...")
        schema_context = get_full_schema(cursor, core_tables)

        # 5. Get random samples and print them
        print("Sampling data from 5 random tables...")
        sampled_data = get_random_tables_and_samples(cursor, core_tables, num_tables=5, num_samples=5)
        print(f"Sampled from tables: {list(sampled_data.keys())}")
        print("\n--- Sampled Data ---")
        print(json.dumps(sampled_data, indent=2, default=str))
        print("---------------------\n")

        # 6. Generate questions using the LLM
        print("Generating questions with GPT-4o...")
        generated_tasks = generate_questions(client, schema_context, sampled_data)

        # 7. Execute SQL for all generated tasks
        tasks_for_evaluation = []
        if generated_tasks:
            print("\nExecuting SQL for generated tasks...")
            for task in generated_tasks:
                if not isinstance(task, dict) or not all(k in task for k in ['sql', 'answer', 'question']):
                    print(f"Filtering task due to malformed structure or missing keys: {task}")
                    continue
                try:
                    cursor.execute(task['sql'])
                    sql_result = cursor.fetchall()
                    task['sql_result'] = sql_result
                    tasks_for_evaluation.append(task)
                except mysql.connector.Error as err:
                    print(f"Filtering task due to SQL error: {err}")
                    print(f" - Question: {task['question']}")
                    print(f" - SQL: {task['sql']}")
                except Exception as e:
                    print(f"An unexpected error occurred during SQL execution for task {task}: {e}")

        # 8. Semantic evaluation and answer refinement
        validated_tasks = evaluate_and_refine_tasks(tasks_for_evaluation, client)

        # 9. Print the final JSON output
        if validated_tasks:
            print("\n--- Final Validated Tasks ---")
            print(json.dumps(validated_tasks, indent=2, default=str))
        else:
            print("Failed to generate any valid tasks after all validation steps.")
    finally:
        # 10. Close the database connection
        if conn.is_connected():
            cursor.close()
            conn.close()
            print("\nDatabase connection closed.")


if __name__ == "__main__":
    main()
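
Neither stage in this script retries on transient API failures (rate limits, timeouts); any exception permanently filters the task. A minimal backoff wrapper one could layer in, assuming only the standard library (editorial sketch, not part of the original file):

import time

def with_retries(call, attempts=3, base_delay=2.0):
    """Run `call()` with exponential backoff; re-raise after the final attempt."""
    for attempt in range(attempts):
        try:
            return call()
        except Exception:
            if attempt == attempts - 1:
                raise
            time.sleep(base_delay * (2 ** attempt))

# Hypothetical usage inside generate_questions:
# response = with_retries(lambda: client.chat.completions.create(...))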

@@ -1,408 +0,0 @@
import os
import random
import json
import argparse

import mysql.connector
from openai import OpenAI
from dotenv import load_dotenv

# --- Configuration ---
load_dotenv()

MYSQL_CONFIG = {
    "host": "localhost",
    "port": "23306",
    "user": "mcpuser",
    "password": "StrongPass123!",
    "database": "magentodb"
}

OPENAI_CONFIG = {
    "api_key": os.getenv("OPENAI_API_KEY"),
    "base_url": os.getenv("OPENAI_BASE_URL"),
    "model": "gpt-4o"
}

# --- Prompt Template ---
# This is a carefully engineered prompt to guide the LLM's output.
PROMPT_TEMPLATE = """
You are an expert database analyst and a creative test case designer for e-commerce web applications.
Your goal is to generate realistic administrative tasks that can be solved by a Web Agent navigating an admin panel.
I will provide you with the following context:
1. **Full Database Schema**: A list of `CREATE TABLE` statements for the core tables of a Magento e-commerce platform.
2. **Sampled Data**: A JSON object containing 5 random rows of data from 5 randomly selected core tables. This data is REAL and should be used to inspire specific, answerable questions.
## Your Task
Based on the provided schema and sample data, create a JSON object containing a single key, "questions", which holds an array of up to 10 unique task objects.
### Requirements for Each Question:
- **Web Agent Solvable**: The task must represent a realistic action an administrator would perform in a web UI (e.g., "Find all orders for customer X", "Update the stock for product Y", "Approve a pending review").
- **Grounded in Data**: The questions should be specific, using names, IDs, or values from the provided **Sampled Data** to make them concrete.
- **Utilize Schema**: You can formulate questions that require joining tables, even if not all tables were sampled. The full schema is your guide.
### Output Format
The final output MUST be a single, valid JSON object. Do not include any other text, explanations, or markdown formatting like ```json.
The JSON object must have one key: "questions", containing a JSON array of task objects.
Each object in the array must contain exactly three keys: `question`, `answer`, and `sql`.
- **`question`**: (string) A natural language description of the task for a web agent.
- **`answer`**: (string, integer, float, or list) The precise and concise answer to the question, derived by running the SQL query against the database.
- **`sql`**: (string) The exact, runnable MySQL query that was used to find the answer.
### Output Format Example
```json
{{
  "questions": [
    {{
      "question": "What is the email address for customer with ID 5?",
      "answer": "customer5@example.com",
      "sql": "SELECT email FROM customer_entity WHERE entity_id = 5;"
    }},
    {{
      "question": "Find the total quantity of item with SKU 'ABC-123' in the cart.",
      "answer": 3,
      "sql": "SELECT SUM(qty) FROM quote_item WHERE sku = 'ABC-123';"
    }}
  ]
}}
```
---
### Full Database Schema
{schema_context}
---
### Sampled Data
Here is the sample data from randomly selected tables. Use this to make your questions specific.
{sampled_data_str}
---
Now, generate the JSON object based on these instructions.
"""

# This is a new prompt to evaluate results and generate a corrected answer.
SEMANTIC_EVALUATION_PROMPT_TEMPLATE = """
You are a precise data analyst. Your task is to evaluate if a SQL query's result adequately answers a given natural language question. If it does, you must formulate a concise, natural-language answer.
I will provide you with a JSON object containing:
1. `question`: The original question asked.
2. `sql`: The SQL query that was executed.
3. `sql_result`: The actual data returned by executing the SQL query.
## Your Task
1. **Analyze**: Determine if the `sql_result` contains the necessary information to definitively answer the `question`.
2. **Respond**: Based on your analysis, generate a JSON object with one of two structures.
### Case 1: The question CAN be answered
If the `sql_result` provides a clear answer, respond with:
```json
{{
  "can_answer": true,
  "new_answer": "..."
}}
```
- `can_answer`: (boolean) Must be `true`.
- `new_answer`: (string, integer, float, or list) A concise, human-readable answer derived *only* from the `sql_result`. For example, if the result is `[(52.00,)]`, the answer can be "52.00" or 52.00.
### Case 2: The question CANNOT be answered
If the `sql_result` is empty, irrelevant, or insufficient to answer the question, respond with:
```json
{{
  "can_answer": false,
  "reason": "..."
}}
```
- `can_answer`: (boolean) Must be `false`.
- `reason`: (string) A brief explanation for why the question cannot be answered from the given data (e.g., "The query returned no results.", "The result contains internal IDs, not the requested customer names.").
---
### Evaluation Data
{task_data_json}
---
Now, provide your evaluation as a JSON object.
"""


def get_db_connection():
    """Establishes a connection to the MySQL database."""
    try:
        conn = mysql.connector.connect(**MYSQL_CONFIG)
        return conn
    except mysql.connector.Error as err:
        print(f"Error connecting to MySQL: {err}")
        return None


def get_full_schema(cursor, tables):
    """Fetches the CREATE TABLE statements for all core tables."""
    schema_parts = []
    for table_name in tables:
        try:
            cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
            result = cursor.fetchone()
            if result:
                schema_parts.append(result[1])  # result[1] is the CREATE TABLE statement
        except mysql.connector.Error as err:
            print(f"Warning: Could not get schema for table {table_name}: {err}")
    return "\n\n".join(schema_parts)


def get_random_tables_and_samples(cursor, tables, num_tables=5, num_samples=5):
    """Selects random tables and samples random rows from them."""
    selected_tables = random.sample(tables, num_tables)
    sampled_data = {}
    for table_name in selected_tables:
        try:
            # Use ORDER BY RAND() for random sampling. Can be slow on very large tables.
            query = f"SELECT * FROM `{table_name}` ORDER BY RAND() LIMIT {num_samples}"
            cursor.execute(query)
            rows = cursor.fetchall()
            if not rows:
                sampled_data[table_name] = []
                continue
            columns = [desc[0] for desc in cursor.description]
            # Convert rows (tuples) to a list of dictionaries
            sampled_rows = []
            for row in rows:
                row_dict = {}
                for i, col_value in enumerate(row):
                    # Handle bytes by decoding, fall back to string representation
                    if isinstance(col_value, bytes):
                        try:
                            row_dict[columns[i]] = col_value.decode('utf-8')
                        except UnicodeDecodeError:
                            row_dict[columns[i]] = str(col_value)
                    else:
                        row_dict[columns[i]] = col_value
                sampled_rows.append(row_dict)
            sampled_data[table_name] = sampled_rows
        except mysql.connector.Error as err:
            print(f"Warning: Could not sample data from table {table_name}: {err}")
            sampled_data[table_name] = f"Error: {err}"
    return sampled_data


def generate_questions(client, schema_context, sampled_data):
    """Generates questions by calling the OpenAI API."""
    if not client:
        raise ValueError("OpenAI client not provided.")
    sampled_data_str = json.dumps(sampled_data, indent=2, default=str)
    prompt = PROMPT_TEMPLATE.format(
        schema_context=schema_context,
        sampled_data_str=sampled_data_str
    )
    try:
        response = client.chat.completions.create(
            model=OPENAI_CONFIG["model"],
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content
        data = json.loads(content)
        # The prompt asks for {"questions": [...]}, so we extract the list.
        if isinstance(data, dict) and "questions" in data and isinstance(data["questions"], list):
            return data["questions"]
        elif isinstance(data, list):
            # Fallback in case the model returns a list directly
            print("Warning: Model returned a raw list instead of an object with a 'questions' key.")
            return data
        else:
            print(f"Warning: Failed to find a 'questions' list in the model's output. Got: {content}")
            return None
    except Exception as e:
        print(f"Error calling OpenAI API or parsing JSON: {e}")
        return None


def load_existing_tasks(filepath):
    """Loads tasks from a JSON file if it exists."""
    if not os.path.exists(filepath):
        return []
    try:
        with open(filepath, 'r') as f:
            content = f.read()
            if not content:  # Handle empty file
                return []
            return json.loads(content)
    except (json.JSONDecodeError, FileNotFoundError):
        print(f"Warning: Could not read or parse {filepath}. Starting with an empty list.")
        return []
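

# Editorial sketch, not part of the original file: the main loop below rewrites
# the output file after every round, so a crash mid-write could corrupt it. An
# atomic variant (hypothetical helper) writes a temp file and renames it:
def save_tasks_atomically(tasks, filepath):
    """Write tasks to a temp file, then rename over the target in one step."""
    tmp_path = filepath + ".tmp"
    with open(tmp_path, 'w') as f:
        json.dump(tasks, f, indent=2, default=str)
    os.replace(tmp_path, filepath)  # atomic rename on the same filesystem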
def evaluate_and_refine_tasks(tasks, client):
    """
    Uses an LLM to evaluate if a SQL result answers the question and refines the answer.
    """
    if not tasks:
        return []
    final_validated_tasks = []
    print("\nPerforming semantic evaluation and answer refinement with GPT-4o...")
    for task in tasks:
        # Prepare data for the prompt, excluding the original 'answer'
        task_data_for_prompt = {
            "question": task["question"],
            "sql": task["sql"],
            "sql_result": task["sql_result"]
        }
        task_data_json = json.dumps(task_data_for_prompt, indent=2, default=str)
        prompt = SEMANTIC_EVALUATION_PROMPT_TEMPLATE.format(task_data_json=task_data_json)
        try:
            print(f" - Evaluating question: \"{task['question'][:80]}...\"")
            response = client.chat.completions.create(
                model=OPENAI_CONFIG["model"],
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # We want deterministic evaluation
                response_format={"type": "json_object"},
            )
            content = response.choices[0].message.content
            evaluation_result = json.loads(content)
            if evaluation_result.get("can_answer") is True and "new_answer" in evaluation_result:
                # Task is valid. Update the answer with the refined one from the LLM.
                task['answer'] = evaluation_result['new_answer']
                task['sql_execute_result'] = task.pop('sql_result')
                final_validated_tasks.append(task)
                print(f" - Evaluation PASSED. New answer: {json.dumps(task['answer'])}")
            else:
                reason = evaluation_result.get('reason', 'No reason provided.')
                print(f" - Evaluation FAILED. Filtering task.")
                print(f" - Reason: {reason}")
                print(f" - Question: {task['question']}")
                print(f" - Original Answer: {json.dumps(task['answer'], default=str)}")
                print(f" - SQL: {task['sql']}")
                sql_result_str = json.dumps(task['sql_result'], indent=2, default=str)
                print(f" - SQL Result: {sql_result_str}")
        except Exception as e:
            print(f" - An error occurred during semantic evaluation for task, filtering it out: {e}")
            print(f" - Question: {task.get('question', 'N/A')}")
            print(f" - SQL: {task.get('sql', 'N/A')}")
    return final_validated_tasks


def main():
    """Main function to run the script."""
    parser = argparse.ArgumentParser(description="Generate and validate e-commerce admin tasks.")
    parser.add_argument(
        "--target-count",
        type=int,
        required=True,
        help="The total number of questions to generate."
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default="generated_tasks.json",
        help="The file to save the generated tasks to (in JSON format)."
    )
    args = parser.parse_args()

    # Load existing tasks from the output file
    all_tasks = load_existing_tasks(args.output_file)
    print(f"Found {len(all_tasks)} existing valid tasks in '{args.output_file}'.")

    # Connect to DB and set up client
    conn = get_db_connection()
    if not conn:
        return
    cursor = conn.cursor()

    if not OPENAI_CONFIG["api_key"]:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return
    client = OpenAI(api_key=OPENAI_CONFIG["api_key"], base_url=OPENAI_CONFIG["base_url"])

    try:
        # Load core tables and schema once
        try:
            with open('core_tables.json', 'r') as f:
                core_tables = json.load(f)
        except FileNotFoundError:
            print("Error: core_tables.json not found. Please create it.")
            return

        print("Fetching full database schema...")
        schema_context = get_full_schema(cursor, core_tables)

        # Start the generation loop
        round_num = 1
        while len(all_tasks) < args.target_count:
            print(f"\n--- Starting Generation Round {round_num} ---")
            print(f"Goal: {args.target_count} | Current: {len(all_tasks)} | Needed: {args.target_count - len(all_tasks)}")

            # Get random samples for this round
            print("Sampling data from 5 random tables...")
            sampled_data = get_random_tables_and_samples(cursor, core_tables, num_tables=5, num_samples=5)

            # Generate questions
            print("Generating questions with GPT-4o...")
            generated_tasks = generate_questions(client, schema_context, sampled_data)

            # Execute SQL for generated tasks
            tasks_for_evaluation = []
            if generated_tasks:
                print("\nExecuting SQL for generated tasks...")
                for task in generated_tasks:
                    if not isinstance(task, dict) or not all(k in task for k in ['sql', 'answer', 'question']):
                        print(f"Filtering task due to malformed structure: {task}")
                        continue
                    try:
                        cursor.execute(task['sql'])
                        sql_result = cursor.fetchall()
                        task['sql_result'] = sql_result
                        tasks_for_evaluation.append(task)
                    except mysql.connector.Error as err:
                        print(f"Filtering task due to SQL error: {err} on SQL: {task['sql']}")

            # Perform semantic evaluation and get validated tasks
            validated_tasks = evaluate_and_refine_tasks(tasks_for_evaluation, client)

            # Append new tasks and save to file
            if validated_tasks:
                all_tasks.extend(validated_tasks)
                with open(args.output_file, 'w') as f:
                    json.dump(all_tasks, f, indent=2, default=str)
                print("\n--- Round Summary ---")
                print(f"Generated {len(validated_tasks)} new valid tasks in this round.")
                print(f"Progress: {len(all_tasks)} / {args.target_count} tasks.")
            else:
                print("\n--- Round Summary ---")
                print("No new valid tasks were generated in this round. Retrying...")
            round_num += 1
    finally:
        # Close the database connection
        if conn.is_connected():
            cursor.close()
            conn.close()
            print("\nDatabase connection closed.")

    print(f"\nTarget of {args.target_count} tasks reached. Final output saved to {args.output_file}.")


if __name__ == "__main__":
    main()
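
Because every round samples tables independently, long runs can accumulate near-duplicate questions; nothing above deduplicates across rounds. A cheap guard one could apply before `all_tasks.extend(validated_tasks)` (editorial sketch, hypothetical helper):

def filter_duplicate_questions(new_tasks, existing_tasks):
    """Drop tasks whose question text already appears (case-insensitively)."""
    seen = {t["question"].strip().lower() for t in existing_tasks}
    return [t for t in new_tasks if t["question"].strip().lower() not in seen]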

@@ -1,437 +0,0 @@
import os
import random
import json
import argparse

import mysql.connector
from openai import OpenAI
from dotenv import load_dotenv

# --- Configuration ---
load_dotenv()

MYSQL_CONFIG = {
    "host": "localhost",
    "port": "23306",
    "user": "mcpuser",
    "password": "StrongPass123!",
    "database": "magentodb"
}

OPENAI_CONFIG = {
    "api_key": os.getenv("OPENAI_API_KEY"),
    "base_url": os.getenv("OPENAI_BASE_URL"),
    "model": "gpt-4o"
}

# --- Prompt Template ---
# This is a carefully engineered prompt to guide the LLM's output.
PROMPT_TEMPLATE = """
You are an expert database analyst and a creative test case designer for e-commerce web applications.
Your goal is to generate realistic administrative tasks that can be solved by a Web Agent navigating an admin panel.
I will provide you with the following context:
1. **Full Database Schema**: A list of `CREATE TABLE` statements for the core tables of a Magento e-commerce platform.
2. **Sampled Data**: A JSON object containing 5 random rows of data from 5 randomly selected core tables. This data is REAL and should be used to inspire specific, answerable questions.
## Your Task
Based on the provided schema and sample data, create a JSON object containing a single key, "questions", which holds an array of up to 10 unique task objects.
### Requirements for Each Question:
- **Web Agent Solvable**: The task must represent a realistic action an administrator would perform in a web UI (e.g., "Find all orders for customer X", "Update the stock for product Y", "Approve a pending review").
- **Grounded in Data**: The questions should be specific, using names, IDs, or values from the provided **Sampled Data** to make them concrete.
- **Utilize Schema**: You can formulate questions that require joining tables, even if not all tables were sampled. The full schema is your guide.
### Output Format
The final output MUST be a single, valid JSON object. Do not include any other text, explanations, or markdown formatting like ```json.
The JSON object must have one key: "questions", containing a JSON array of task objects.
Each object in the array must contain exactly three keys: `question`, `answer`, and `sql`.
- **`question`**: (string) A natural language description of the task for a web agent.
- **`answer`**: (string, integer, float, or list) The precise and concise answer to the question, derived by running the SQL query against the database.
- **`sql`**: (string) The exact, runnable MySQL query that was used to find the answer.
### Output Format Example
```json
{{
  "questions": [
    {{
      "question": "What is the email address for customer with ID 5?",
      "answer": "customer5@example.com",
      "sql": "SELECT email FROM customer_entity WHERE entity_id = 5;"
    }},
    {{
      "question": "Find the total quantity of item with SKU 'ABC-123' in the cart.",
      "answer": 3,
      "sql": "SELECT SUM(qty) FROM quote_item WHERE sku = 'ABC-123';"
    }}
  ]
}}
```
---
### Full Database Schema
{schema_context}
---
### Sampled Data
Here is the sample data from randomly selected tables. Use this to make your questions specific.
{sampled_data_str}
---
Now, generate the JSON object based on these instructions.
"""

# This is a new prompt to evaluate results and generate a corrected answer.
SEMANTIC_EVALUATION_PROMPT_TEMPLATE = """
You are a precise data analyst. Your task is to evaluate if a SQL query's result adequately answers a given natural language question. You will then either refine the answer, or completely rephrase the question if the result set is large.
I will provide you with a JSON object containing:
1. `question`: The original question asked.
2. `sql`: The SQL query that was executed.
3. `sql_result`: The actual data returned by executing the SQL query.
4. `row_count`: The number of rows in `sql_result`.
## Your Task
Analyze the inputs and respond with a JSON object. You have three cases. The `new_answer` field MUST always be an array of strings.
### Case 1: Large Result Set (Question Transformation)
If `row_count` is greater than 10 AND the original `question` does NOT already ask for a count (e.g., it is not phrased like "How many..."), you must transform the question.
Respond with:
```json
{{
  "can_answer": true,
  "new_question": "How many items were found?",
  "new_answer": ["42"]
}}
```
- `can_answer`: (boolean) Must be `true`.
- `new_question`: (string) A rephrased question that asks for the quantity of items. For example, if the original question was "List all products", the new question should be "How many products were found?".
- `new_answer`: (array of strings) An array containing the `row_count` as a single string element.
### Case 2: Standard Answer (No Transformation)
If Case 1 does not apply, but the `sql_result` still provides a clear answer to the original `question`, respond with:
```json
{{
  "can_answer": true,
  "new_answer": ["value1", "value2", ...]
}}
```
- `can_answer`: (boolean) Must be `true`.
- `new_answer`: (array of strings) An array containing all the essential parts of the answer extracted from `sql_result`. Every value from the result set that contributes to the answer should be included as a string in the array. This ensures answer completeness.
  - **Example 1**: If `question` is "What is the status of order 123?" and `sql_result` is `[["processing"]]`, `new_answer` should be `["processing"]`.
  - **Example 2**: If `question` is "List emails for pending customers" and `sql_result` is `[["test@a.com"], ["test@b.com"]]`, `new_answer` should be `["test@a.com", "test@b.com"]`.
  - **Example 3**: If `question` is "Get product name and price for SKU 'XYZ'" and `sql_result` is `[["My Product", 19.99]]`, `new_answer` should be `["My Product", "19.99"]`.
### Case 3: The question CANNOT be answered
If the `sql_result` is empty, irrelevant, or insufficient to answer the question, respond with:
```json
{{
  "can_answer": false,
  "reason": "..."
}}
```
- `can_answer`: (boolean) Must be `false`.
- `reason`: (string) A brief explanation for why the question cannot be answered.
---
### Evaluation Data
{task_data_json}
---
Now, provide your evaluation as a JSON object.
"""
def get_db_connection():
    """Establishes a connection to the MySQL database."""
    try:
        conn = mysql.connector.connect(**MYSQL_CONFIG)
        return conn
    except mysql.connector.Error as err:
        print(f"Error connecting to MySQL: {err}")
        return None


def get_full_schema(cursor, tables):
    """Fetches the CREATE TABLE statements for all core tables."""
    schema_parts = []
    for table_name in tables:
        try:
            cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
            result = cursor.fetchone()
            if result:
                schema_parts.append(result[1])  # result[1] is the CREATE TABLE statement
        except mysql.connector.Error as err:
            print(f"Warning: Could not get schema for table {table_name}: {err}")
    return "\n\n".join(schema_parts)


def get_random_tables_and_samples(cursor, tables, num_tables=5, num_samples=5):
    """Selects random tables and samples random rows from them."""
    selected_tables = random.sample(tables, num_tables)
    sampled_data = {}
    for table_name in selected_tables:
        try:
            # Use ORDER BY RAND() for random sampling. Can be slow on very large tables.
            query = f"SELECT * FROM `{table_name}` ORDER BY RAND() LIMIT {num_samples}"
            cursor.execute(query)
            rows = cursor.fetchall()
            if not rows:
                sampled_data[table_name] = []
                continue
            columns = [desc[0] for desc in cursor.description]
            # Convert rows (tuples) to a list of dictionaries
            sampled_rows = []
            for row in rows:
                row_dict = {}
                for i, col_value in enumerate(row):
                    # Handle bytes by decoding, fall back to string representation
                    if isinstance(col_value, bytes):
                        try:
                            row_dict[columns[i]] = col_value.decode('utf-8')
                        except UnicodeDecodeError:
                            row_dict[columns[i]] = str(col_value)
                    else:
                        row_dict[columns[i]] = col_value
                sampled_rows.append(row_dict)
            sampled_data[table_name] = sampled_rows
        except mysql.connector.Error as err:
            print(f"Warning: Could not sample data from table {table_name}: {err}")
            sampled_data[table_name] = f"Error: {err}"
    return sampled_data


def generate_questions(client, schema_context, sampled_data):
    """Generates questions by calling the OpenAI API."""
    if not client:
        raise ValueError("OpenAI client not provided.")
    sampled_data_str = json.dumps(sampled_data, indent=2, default=str)
    prompt = PROMPT_TEMPLATE.format(
        schema_context=schema_context,
        sampled_data_str=sampled_data_str
    )
    try:
        response = client.chat.completions.create(
            model=OPENAI_CONFIG["model"],
            messages=[
                {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                {"role": "user", "content": prompt}
            ],
            temperature=0.7,
            response_format={"type": "json_object"},
        )
        content = response.choices[0].message.content
        data = json.loads(content)
        # The prompt asks for {"questions": [...]}, so we extract the list.
        if isinstance(data, dict) and "questions" in data and isinstance(data["questions"], list):
            return data["questions"]
        elif isinstance(data, list):
            # Fallback in case the model returns a list directly
            print("Warning: Model returned a raw list instead of an object with a 'questions' key.")
            return data
        else:
            print(f"Warning: Failed to find a 'questions' list in the model's output. Got: {content}")
            return None
    except Exception as e:
        print(f"Error calling OpenAI API or parsing JSON: {e}")
        return None


def load_existing_tasks(filepath):
    """Loads tasks from a JSON file if it exists."""
    if not os.path.exists(filepath):
        return []
    try:
        with open(filepath, 'r') as f:
            content = f.read()
            if not content:  # Handle empty file
                return []
            return json.loads(content)
    except (json.JSONDecodeError, FileNotFoundError):
        print(f"Warning: Could not read or parse {filepath}. Starting with an empty list.")
        return []


def evaluate_and_refine_tasks(tasks, client):
    """
    Uses an LLM to evaluate if a SQL result answers the question and refines the answer.
    """
    if not tasks:
        return []
    final_validated_tasks = []
    print("\nPerforming semantic evaluation and answer refinement with GPT-4o...")
    for task in tasks:
        # Prepare data for the prompt, excluding the original 'answer'
        task_data_for_prompt = {
            "question": task["question"],
            "sql": task["sql"],
            "sql_result": task["sql_result"],
            "row_count": task["row_count"]
        }
        task_data_json = json.dumps(task_data_for_prompt, indent=2, default=str)
        prompt = SEMANTIC_EVALUATION_PROMPT_TEMPLATE.format(task_data_json=task_data_json)
        try:
            print(f" - Evaluating question: \"{task['question'][:80]}...\"")
            response = client.chat.completions.create(
                model=OPENAI_CONFIG["model"],
                messages=[
                    {"role": "system", "content": "You are a helpful assistant designed to output JSON."},
                    {"role": "user", "content": prompt}
                ],
                temperature=0.0,  # We want deterministic evaluation
                response_format={"type": "json_object"},
            )
            content = response.choices[0].message.content
            evaluation_result = json.loads(content)
            if evaluation_result.get("can_answer") is True and "new_answer" in evaluation_result:
                # Task is valid. Update the answer with the refined one from the LLM.
                task['answer'] = evaluation_result['new_answer']
                # If the LLM provides a new question, update it.
                if 'new_question' in evaluation_result:
                    task['question'] = evaluation_result['new_question']
                    print(f" - Question was rephrased: \"{task['question']}\"")
                task['sql_execute_result'] = task.pop('sql_result')
                task.pop('row_count', None)  # Clean up temp key
                final_validated_tasks.append(task)
                print(f" - Evaluation PASSED. New answer: {json.dumps(task['answer'])}")
            else:
                reason = evaluation_result.get('reason', 'No reason provided.')
                print(f" - Evaluation FAILED. Filtering task.")
                print(f" - Reason: {reason}")
                print(f" - Question: {task['question']}")
                print(f" - SQL: {task['sql']}")
                sql_result_str = json.dumps(task['sql_result'], indent=2, default=str)
                print(f" - SQL Result: {sql_result_str}")
        except Exception as e:
            print(f" - An error occurred during semantic evaluation for task, filtering it out: {e}")
            print(f" - Question: {task.get('question', 'N/A')}")
            print(f" - SQL: {task.get('sql', 'N/A')}")
    return final_validated_tasks


def main():
    """Main function to run the script."""
    parser = argparse.ArgumentParser(description="Generate and validate e-commerce admin tasks.")
    parser.add_argument(
        "--target-count",
        type=int,
        required=True,
        help="The total number of questions to generate."
    )
    parser.add_argument(
        "--output-file",
        type=str,
        default="generated_tasks.json",
        help="The file to save the generated tasks to (in JSON format)."
    )
    args = parser.parse_args()

    # Load existing tasks from the output file
    all_tasks = load_existing_tasks(args.output_file)
    print(f"Found {len(all_tasks)} existing valid tasks in '{args.output_file}'.")

    # Connect to DB and set up client
    conn = get_db_connection()
    if not conn:
        return
    cursor = conn.cursor()

    if not OPENAI_CONFIG["api_key"]:
        print("Error: OPENAI_API_KEY environment variable not set.")
        return
    client = OpenAI(api_key=OPENAI_CONFIG["api_key"], base_url=OPENAI_CONFIG["base_url"])

    try:
        # Load core tables and schema once
        try:
            with open('core_tables.json', 'r') as f:
                core_tables = json.load(f)
        except FileNotFoundError:
            print("Error: core_tables.json not found. Please create it.")
            return

        print("Fetching full database schema...")
        schema_context = get_full_schema(cursor, core_tables)

        # Start the generation loop
        round_num = 1
        while len(all_tasks) < args.target_count:
            print(f"\n--- Starting Generation Round {round_num} ---")
            print(f"Goal: {args.target_count} | Current: {len(all_tasks)} | Needed: {args.target_count - len(all_tasks)}")

            # Get random samples for this round
            print("Sampling data from 5 random tables...")
            sampled_data = get_random_tables_and_samples(cursor, core_tables, num_tables=5, num_samples=5)

            # Generate questions
            print("Generating questions with GPT-4o...")
            generated_tasks = generate_questions(client, schema_context, sampled_data)

            # Execute SQL for generated tasks
            tasks_for_evaluation = []
            if generated_tasks:
                print("\nExecuting SQL for generated tasks...")
                for task in generated_tasks:
                    if not isinstance(task, dict) or not all(k in task for k in ['sql', 'answer', 'question']):
                        print(f"Filtering task due to malformed structure: {task}")
                        continue
                    try:
                        cursor.execute(task['sql'])
                        sql_result = cursor.fetchall()
                        # Create a new dict for evaluation, excluding the original 'answer'.
                        tasks_for_evaluation.append({
                            'question': task['question'],
                            'sql': task['sql'],
                            'sql_result': sql_result,
                            'row_count': len(sql_result)
                        })
                    except mysql.connector.Error as err:
                        print(f"Filtering task due to SQL error: {err} on SQL: {task['sql']}")

            # Perform semantic evaluation and get validated tasks
            validated_tasks = evaluate_and_refine_tasks(tasks_for_evaluation, client)

            # Append new tasks and save to file
            if validated_tasks:
                all_tasks.extend(validated_tasks)
                with open(args.output_file, 'w') as f:
                    json.dump(all_tasks, f, indent=2, default=str)
                print("\n--- Round Summary ---")
                print(f"Generated {len(validated_tasks)} new valid tasks in this round.")
                print(f"Progress: {len(all_tasks)} / {args.target_count} tasks.")
            else:
                print("\n--- Round Summary ---")
                print("No new valid tasks were generated in this round. Retrying...")
            round_num += 1
    finally:
        # Close the database connection
        if conn.is_connected():
            cursor.close()
            conn.close()
            print("\nDatabase connection closed.")

    print(f"\nTarget of {args.target_count} tasks reached. Final output saved to {args.output_file}.")


if __name__ == "__main__":
    main()
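
Throughout these scripts, `default=str` is what keeps `json.dump` from crashing on the `Decimal` and `datetime` values that mysql-connector returns; it also means answers end up compared as text downstream. A quick demonstration (editorial sketch, not part of the original files):

import json
from decimal import Decimal
from datetime import datetime

row = (Decimal("19.99"), datetime(2025, 6, 17, 9, 53, 40))
print(json.dumps(row, default=str))  # ["19.99", "2025-06-17 09:53:40"]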

@@ -1,246 +0,0 @@
import os
import random
import json
import mysql.connector
from openai import OpenAI
from dotenv import load_dotenv
# --- Configuration ---
load_dotenv()
MYSQL_CONFIG = {
"host": "localhost",
"port": "23306",
"user": "mcpuser",
"password": "StrongPass123!",
"database": "magentodb"
}
OPENAI_CONFIG = {
"api_key": os.getenv("OPENAI_API_KEY"),
"base_url": os.getenv("OPENAI_BASE_URL"),
"model": "gpt-4o"
}
# --- Prompt Template ---
# This is a carefully engineered prompt to guide the LLM's output.
PROMPT_TEMPLATE = """
You are an expert database analyst and a creative test case designer for e-commerce web applications.
Your goal is to generate realistic administrative tasks that can be solved by a Web Agent navigating an admin panel.
I will provide you with the following context:
1. **Full Database Schema**: A list of `CREATE TABLE` statements for the core tables of a Magento e-commerce platform.
2. **Sampled Data**: A JSON object containing 5 random rows of data from 5 randomly selected core tables. This data is REAL and should be used to inspire specific, answerable questions.
## Your Task
Based on the provided schema and sample data, create a JSON array of up to 10 unique questions.
### Requirements for Each Question:
- **Web Agent Solvable**: The task must represent a realistic action an administrator would perform in a web UI (e.g., "Find all orders for customer X", "Update the stock for product Y", "Approve a pending review").
- **Grounded in Data**: The questions should be specific, using names, IDs, or values from the provided **Sampled Data** to make them concrete.
- **Utilize Schema**: You can formulate questions that require joining tables, even if not all tables were sampled. The full schema is your guide.
### Output Format
The final output MUST be a single, valid JSON array of objects. Do not include any other text, explanations, or markdown formatting like ```json.
Each object in the array must contain exactly three keys: `question`, `answer`, and `sql`.
- **`question`**: (string) A natural language description of the task for a web agent.
- **`answer`**: (string, integer, float, or list) The precise and concise answer to the question, derived by running the SQL query against the database.
- **`sql`**: (string) The exact, runnable MySQL query that was used to find the answer.
---
### Full Database Schema
{schema_context}
---
### Sampled Data
Here is the sample data from randomly selected tables. Use this to make your questions specific.
{sampled_data_str}
---
Now, generate the JSON array based on these instructions.
"""
def get_db_connection():
"""Establishes a connection to the MySQL database."""
try:
conn = mysql.connector.connect(**MYSQL_CONFIG)
return conn
except mysql.connector.Error as err:
print(f"Error connecting to MySQL: {err}")
return None
def get_full_schema(cursor, tables):
"""Fetches the CREATE TABLE statements for all core tables."""
schema_parts = []
for table_name in tables:
try:
cursor.execute(f"SHOW CREATE TABLE `{table_name}`")
result = cursor.fetchone()
if result:
schema_parts.append(result[1]) # result[1] is the CREATE TABLE statement
except mysql.connector.Error as err:
print(f"Warning: Could not get schema for table {table_name}: {err}")
return "\n\n".join(schema_parts)
def get_random_tables_and_samples(cursor, tables, num_tables=5, num_samples=5):
"""Selects random tables and samples random rows from them."""
selected_tables = random.sample(tables, num_tables)
sampled_data = {}
for table_name in selected_tables:
try:
# Use ORDER BY RAND() for random sampling. Can be slow on very large tables.
query = f"SELECT * FROM `{table_name}` ORDER BY RAND() LIMIT {num_samples}"
cursor.execute(query)
rows = cursor.fetchall()
if not rows:
sampled_data[table_name] = []
continue
columns = [desc[0] for desc in cursor.description]
# Convert rows (tuples) to a list of dictionaries
sampled_rows = []
for row in rows:
row_dict = {}
for i, col_value in enumerate(row):
# Handle bytes by decoding, fall back to string representation
if isinstance(col_value, bytes):
try:
row_dict[columns[i]] = col_value.decode('utf-8')
except UnicodeDecodeError:
row_dict[columns[i]] = str(col_value)
else:
row_dict[columns[i]] = col_value
sampled_rows.append(row_dict)
sampled_data[table_name] = sampled_rows
except mysql.connector.Error as err:
print(f"Warning: Could not sample data from table {table_name}: {err}")
sampled_data[table_name] = f"Error: {err}"
return sampled_data
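# As the comment above notes, ORDER BY RAND() scans and sorts the whole table.
# A common workaround is to jump to a random spot in the primary-key range
# instead. A sketch, assuming a dense integer key (the pk name is an
# assumption; not every Magento table has one, so keep the RAND() fallback):
def sample_rows_keyed(cursor, table_name, pk="entity_id", num_samples=5):
    cursor.execute(f"SELECT MIN(`{pk}`), MAX(`{pk}`) FROM `{table_name}`")
    lo, hi = cursor.fetchone()
    if lo is None:
        return []
    start = random.randint(lo, hi)
    cursor.execute(
        f"SELECT * FROM `{table_name}` WHERE `{pk}` >= %s ORDER BY `{pk}` LIMIT %s",
        (start, num_samples),
    )
    return cursor.fetchall()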
def generate_questions(schema_context, sampled_data):
"""Generates questions by calling the OpenAI API."""
if not OPENAI_CONFIG["api_key"]:
raise ValueError("OPENAI_API_KEY environment variable not set.")
client = OpenAI(api_key=OPENAI_CONFIG["api_key"], base_url=OPENAI_CONFIG["base_url"])
sampled_data_str = json.dumps(sampled_data, indent=2, default=str)
prompt = PROMPT_TEMPLATE.format(
schema_context=schema_context,
sampled_data_str=sampled_data_str
)
try:
response = client.chat.completions.create(
model=OPENAI_CONFIG["model"],
messages=[
{"role": "system", "content": "You are a helpful assistant designed to output JSON."},
{"role": "user", "content": prompt}
],
temperature=0.7,
)
content = response.choices[0].message.content
return json.loads(content)
except Exception as e:
print(f"Error calling OpenAI API: {e}")
return None
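# json.loads(content) above assumes the model honors the "no markdown"
# instruction. In practice replies sometimes arrive wrapped in ```json fences,
# so a defensive parse step is cheap insurance -- a sketch (the helper name
# is ours):
def parse_llm_json(content):
    text = content.strip()
    if text.startswith("```"):
        # Drop the opening fence line and any trailing closing fence.
        text = text.split("\n", 1)[1] if "\n" in text else ""
        text = text.rstrip()
        if text.endswith("```"):
            text = text[:-3]
    try:
        return json.loads(text)
    except json.JSONDecodeError:
        return None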
def main():
"""Main function to run the script."""
# 1. Load the list of core tables
try:
with open('core_tables.json', 'r') as f:
core_tables = json.load(f)
except FileNotFoundError:
print("Error: core_tables.json not found. Please create it.")
return
# 2. Connect to the database
conn = get_db_connection()
if not conn:
return
cursor = conn.cursor()
try:
# 3. Get full schema context
print("Fetching full database schema...")
schema_context = get_full_schema(cursor, core_tables)
# 4. Get random samples and print them
print("Sampling data from 5 random tables...")
sampled_data = get_random_tables_and_samples(cursor, core_tables, num_tables=5, num_samples=5)
print(f"Sampled from tables: {list(sampled_data.keys())}")
print("\n--- Sampled Data ---")
print(json.dumps(sampled_data, indent=2, default=str))
print("---------------------\n")
# 5. Generate questions using the LLM
print("Generating questions with GPT-4o...")
generated_tasks = generate_questions(schema_context, sampled_data)
# 6. Validate and filter the generated tasks
validated_tasks = []
if generated_tasks:
print("\nValidating generated tasks...")
for task in generated_tasks:
# Basic validation for task structure
if not isinstance(task, dict) or not all(k in task for k in ['sql', 'answer', 'question']):
print(f"Filtering task due to malformed structure or missing keys: {task}")
continue
try:
# Execute the SQL query from the task
cursor.execute(task['sql'])
sql_result = cursor.fetchall()
# Convert both answer and result to string for flexible substring matching
answer_str = str(task['answer'])
result_str = str(sql_result)
# If the answer exists in the result, the task is valid
if answer_str in result_str:
validated_tasks.append(task)
else:
# Log tasks that are filtered because the answer doesn't match
print(f"Filtering task: Answer '{answer_str}' not found in SQL result.")
print(f" - Question: {task['question']}")
print(f" - SQL: {task['sql']}")
# Showing a snippet of a large result is helpful for debugging
print(f" - Result: {result_str[:250]}...")
except mysql.connector.Error as err:
# Log tasks that are filtered due to SQL errors
print(f"Filtering task due to SQL error: {err}")
print(f" - Question: {task['question']}")
print(f" - SQL: {task['sql']}")
except Exception as e:
print(f"An unexpected error occurred during validation for task {task}: {e}")
# 7. Print the final JSON output
if validated_tasks:
print("\n--- Generated and Validated Tasks ---")
print(json.dumps(validated_tasks, indent=2))
else:
print("Failed to generate any valid tasks.")
finally:
# 8. Close the database connection
if conn.is_connected():
cursor.close()
conn.close()
print("\nDatabase connection closed.")
if __name__ == "__main__":
main()

View File

@@ -9,8 +9,10 @@ from dotenv import load_dotenv
 # --- Configuration ---
 load_dotenv()
+server_address = "localhost"
+
 MYSQL_CONFIG = {
-    "host": "localhost",
+    "host": server_address,
     "port": "23306",
     "user": "mcpuser",
     "password": "StrongPass123!",
     "database": "magentodb"

File diff suppressed because it is too large

File diff suppressed because it is too large

File diff suppressed because it is too large

View File

@@ -0,0 +1,3 @@
mysql-connector==2.2.9
openai
python-dotenv

random_sample/tes1.json Normal file (1088 lines)

File diff suppressed because it is too large

random_sample/test1.json Normal file (282 lines)
View File

@@ -0,0 +1,282 @@
[
{
"question": "What is the total income amount for orders completed on February 14, 2022, in store 1?",
"sql": "SELECT total_income_amount FROM sales_order_aggregated_created WHERE period = '2022-02-14' AND store_id = 1 AND order_status = 'complete';",
"answer": [
"240.0000"
],
"sql_execute_result": [
[
"240.0000"
]
]
},
{
"question": "Find the email address for the shipping address with entity ID 197.",
"sql": "SELECT email FROM sales_order_address WHERE entity_id = 197;",
"answer": [
"janesmith456@yahoo.com"
],
"sql_execute_result": [
[
"janesmith456@yahoo.com"
]
]
},
{
"question": "What is the name of the product with ID 16 that was a bestseller in March 2023?",
"sql": "SELECT product_name FROM sales_bestsellers_aggregated_monthly WHERE product_id = 16 AND period = '2023-03-01';",
"answer": [
"Dual Handle Cardio Ball"
],
"sql_execute_result": [
[
"Dual Handle Cardio Ball"
],
[
"Dual Handle Cardio Ball"
]
]
},
{
"question": "What is the value associated with the attribute option ID 80?",
"sql": "SELECT value FROM eav_attribute_option_value WHERE option_id = 80;",
"answer": [
"Men"
],
"sql_execute_result": [
[
"Men"
]
]
},
{
"question": "Find the percentage rating for the review with ID 219.",
"sql": "SELECT percent FROM rating_option_vote WHERE review_id = 219;",
"answer": [
"100"
],
"sql_execute_result": [
[
100
]
]
},
{
"question": "What is the total shipping amount for orders completed on July 1, 2022, in store 0?",
"sql": "SELECT total_shipping_amount FROM sales_order_aggregated_created WHERE period = '2022-07-01' AND store_id = 0 AND order_status = 'complete';",
"answer": [
"15.0000"
],
"sql_execute_result": [
[
"15.0000"
]
]
},
{
"question": "What is the product price for the 'Zoe Tank-S-Yellow' that was a bestseller in January 2023?",
"sql": "SELECT product_price FROM sales_bestsellers_aggregated_monthly WHERE product_name = 'Zoe Tank-S-Yellow' AND period = '2023-01-01';",
"answer": [
"29.0000"
],
"sql_execute_result": [
[
"29.0000"
],
[
"29.0000"
]
]
},
{
"question": "Find the total quantity ordered for orders that were canceled on February 24, 2023, in store 1.",
"sql": "SELECT total_qty_ordered FROM sales_order_aggregated_created WHERE period = '2023-02-24' AND store_id = 1 AND order_status = 'canceled';",
"answer": [
"5.0000"
],
"sql_execute_result": [
[
"5.0000"
]
]
},
{
"question": "What is the region associated with the sales order address with entity ID 228?",
"sql": "SELECT region FROM sales_order_address WHERE entity_id = 228;",
"answer": [
"Massachusetts"
],
"sql_execute_result": [
[
"Massachusetts"
]
]
},
{
"question": "Find the rating position for the product 'Sinbad Fitness Tank-M-Blue' in October 2022.",
"sql": "SELECT rating_pos FROM sales_bestsellers_aggregated_monthly WHERE product_name = 'Sinbad Fitness Tank-M-Blue' AND period = '2022-10-01';",
"answer": [
"5",
"2"
],
"sql_execute_result": [
[
5
],
[
2
]
]
},
{
"question": "What is the ISO-3 code for the country with ISO-2 code 'VC'?",
"sql": "SELECT iso3_code FROM directory_country WHERE iso2_code = 'VC';",
"answer": [
"VCT"
],
"sql_execute_result": [
[
"VCT"
]
]
},
{
"question": "How many orders were completed on 2022-01-17 in the default store?",
"sql": "SELECT orders_count FROM sales_order_aggregated_created WHERE period = '2022-01-17' AND store_id = 0 AND order_status = 'complete';",
"answer": [
"2"
],
"sql_execute_result": [
[
2
]
]
},
{
"question": "Find the total quantity ordered for the product 'Gobi HeatTec\u00ae Tee-XS-Orange' in April 2023.",
"sql": "SELECT qty_ordered FROM sales_bestsellers_aggregated_monthly WHERE product_name = 'Gobi HeatTec&reg; Tee-XS-Orange' AND period = '2023-04-01';",
"answer": [
"4.0000"
],
"sql_execute_result": [
[
"2.0000"
],
[
"2.0000"
]
]
},
{
"question": "What is the product price for 'Cora Parachute Pant-29-Blue' in April 2023?",
"sql": "SELECT product_price FROM sales_bestsellers_aggregated_monthly WHERE product_name = 'Cora Parachute Pant-29-Blue' AND period = '2023-04-01';",
"answer": [
"60.0000"
],
"sql_execute_result": [
[
"60.0000"
],
[
"60.0000"
]
]
},
{
"question": "List all countries that have an ISO-2 code starting with 'F'.",
"sql": "SELECT country_id FROM directory_country WHERE iso2_code LIKE 'F%';",
"answer": [
"FI",
"FJ",
"FK",
"FM",
"FO",
"FR"
],
"sql_execute_result": [
[
"FI"
],
[
"FJ"
],
[
"FK"
],
[
"FM"
],
[
"FO"
],
[
"FR"
]
]
},
{
"question": "What is the rating position of 'Lando Gym Jacket-XS-Green' in May 2022?",
"sql": "SELECT rating_pos FROM sales_bestsellers_aggregated_monthly WHERE product_name = 'Lando Gym Jacket-XS-Green' AND period = '2022-05-01';",
"answer": [
"5",
"18"
],
"sql_execute_result": [
[
5
],
[
18
]
]
},
{
"question": "How many products have a value of '2' for attribute ID 136 in the default store?",
"sql": "SELECT COUNT(*) FROM catalog_product_entity_int WHERE attribute_id = 136 AND store_id = 0 AND value = 2;",
"answer": [
"2038"
],
"sql_execute_result": [
[
2038
]
]
},
{
"question": "Find the period for the order with ID 1003.",
"sql": "SELECT period FROM sales_order_aggregated_created WHERE id = 1003;",
"answer": [
"2023-01-13"
],
"sql_execute_result": [
[
"2023-01-13"
]
]
},
{
"question": "What is the total income amount for orders on 2022-09-23 in store ID 1?",
"sql": "SELECT total_income_amount FROM sales_order_aggregated_created WHERE period = '2022-09-23' AND store_id = 1;",
"answer": [
"210.0000"
],
"sql_execute_result": [
[
"210.0000"
]
]
},
{
"question": "Find the sequence value for the latest shipment entry.",
"sql": "SELECT sequence_value FROM sequence_shipment_1 ORDER BY sequence_value DESC LIMIT 1;",
"answer": [
"3"
],
"sql_execute_result": [
[
3
]
]
}
]
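Since each entry records both the answer and the sql_execute_result, a file
like test1.json can be re-checked against a live database at any time. A
minimal re-validation sketch, reusing the connection settings from the
scripts above (the file path and the cell-level comparison are our choices):

import json
import mysql.connector

conn = mysql.connector.connect(
    host="localhost", port=23306, user="mcpuser",
    password="StrongPass123!", database="magentodb",
)
cursor = conn.cursor()
with open("random_sample/test1.json") as f:
    tasks = json.load(f)
for task in tasks:
    cursor.execute(task["sql"])
    cells = {str(cell) for row in cursor.fetchall() for cell in row}
    answers = task["answer"] if isinstance(task["answer"], list) else [task["answer"]]
    ok = all(str(a) in cells for a in answers)
    print("OK " if ok else "FAIL", "-", task["question"])
cursor.close()
conn.close()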

View File

@@ -1,5 +1,5 @@
 # Forward the website's MySQL port to the local machine, to keep the connection stable
 autossh -M 0 -f -N -o "ServerAliveInterval 30" \
     -o "ServerAliveCountMax 3" \
-    -L 23306:localhost:23306 yuyr@g14_jump2
+    -L 23306:localhost:23306 yuyr@g14
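
Before running the generators, it is worth confirming that the forwarded
port is actually accepting connections. A quick probe (this only checks the
TCP tunnel, not that MySQL itself answers; port and host match the config
above):

import socket

try:
    with socket.create_connection(("localhost", 23306), timeout=5):
        print("Tunnel port 23306 is accepting connections.")
except OSError as err:
    print("Tunnel check failed:", err)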