890 lines
52 KiB
Python
890 lines
52 KiB
Python
import openai
|
|
import mysql.connector
|
|
from mysql.connector import Error
|
|
import json
|
|
import logging
|
|
from datetime import datetime
|
|
import time
|
|
import decimal # For handling Decimal from DB
|
|
import os
|
|
from dotenv import load_dotenv
|
|
import re # Added for regex
|
|
|
|
load_dotenv()
|
|
|
|
# --- LLM Configuration ---
# Credentials and endpoints are read from the environment so that secrets do
# not have to live in the source file (a secrets manager works as well).
# The fallback key is a sentinel meaning "not configured".
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "YOUR_OPENAI_API_KEY_FALLBACK")
# Defaults to the public OpenAI endpoint; override for compatible gateways.
OPENAI_BASE_URL = os.getenv("OPENAI_BASE_URL", "https://api.openai.com/v1")
# Model identifiers for the generation and validation steps; both default to
# "gpt-4o" when the corresponding environment variable is not set.
LLM_MODEL_GENERATION = os.getenv("LLM_MODEL_GENERATION", "gpt-4o")
LLM_MODEL_VALIDATION = os.getenv("LLM_MODEL_VALIDATION", "gpt-4o")
|
|
|
|
# Configure logging: each run writes to its own timestamped log file and the
# same records are echoed to the console through a stream handler.
log_file_name = f'magento_main_pipeline_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log'
logging.basicConfig(
    handlers=[logging.FileHandler(log_file_name), logging.StreamHandler()],
    format='%(asctime)s - %(levelname)s - %(module)s - %(funcName)s - %(message)s',
    level=logging.INFO,
)
# Module-level logger used by every function in this file.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# --- Database Configuration ---
# Connection settings can be overridden through environment variables, in the
# same way as the LLM settings; the defaults keep the previous hard-coded
# values so existing setups continue to work unchanged.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_USER = os.environ.get("DB_USER", "root")
DB_PORT = int(os.environ.get("DB_PORT", "23306"))  # Must be an integer
# NOTE(review): a default password in source is convenient for local use only;
# set DB_PASS in the environment for any shared deployment.
DB_PASS = os.environ.get("DB_PASS", "1234567890")
DB_NAME = os.environ.get("DB_NAME", "magentodb")

# --- Magento Schema (loaded in main_workflow) ---
MAGENTO_SCHEMA_CONTENT = ""
TABLE_SAMPLES_CACHE_FILE = "table_samples_cache.txt"  # Cache file for table samples
TABLE_SAMPLES_CONTENT = ""  # To store sample rows

# --- System Prompt (Loaded in main_workflow) ---
SYSTEM_PROMPT_TEMPLATE = ""
|
|
|
|
|
|
def _clean_llm_json_response(response_content: str) -> str:
|
|
"""Strips markdown code fences from LLM JSON responses."""
|
|
clean_response = response_content.strip()
|
|
if clean_response.startswith("```json"):
|
|
clean_response = clean_response[7:-3].strip()
|
|
elif clean_response.startswith("```"):
|
|
clean_response = clean_response[3:-3].strip()
|
|
return clean_response
|
|
|
|
def _clean_llm_python_code_response(response_content: str) -> str:
|
|
"""Strips markdown code fences from LLM Python code responses."""
|
|
clean_code = response_content.strip()
|
|
if clean_code.startswith("```python"):
|
|
clean_code = clean_code[10:-3].strip() # Handles ```python\n ... ```
|
|
elif clean_code.startswith("```"):
|
|
clean_code = clean_code[3:-3].strip()
|
|
return clean_code
|
|
|
|
|
|
def get_table_names_from_schema(schema_content):
    """Extract the unique table names declared in a schema DDL string.

    Matches ``CREATE TABLE [IF NOT EXISTS] name`` case-insensitively, with the
    name optionally wrapped in backticks and any amount of whitespace between
    the keywords.

    Args:
        schema_content: Text containing ``CREATE TABLE`` statements.

    Returns:
        A list of unique table names in order of first appearance, so the
        result is the same on every run.
    """
    table_names = re.findall(
        r"CREATE\s+TABLE(?:\s+IF\s+NOT\s+EXISTS)?\s+`?(\w+)`?",
        schema_content,
        re.IGNORECASE,
    )
    # dict.fromkeys de-duplicates while preserving first-seen order, unlike set().
    unique_table_names = list(dict.fromkeys(table_names))
    logger.info(f"Extracted {len(unique_table_names)} table names from schema.")
    logger.debug(f"Table names: {unique_table_names}")
    return unique_table_names
|
|
|
|
def _fetch_and_format_table_samples(table_names, db_conn, cache_file_path):
    """Fetch up to 5 rows per table, format them, and cache the result on disk.

    For every table a section is produced containing a header line, the
    comma-separated column names and one comma-separated line per row
    (``None`` values are rendered as ``NULL``). A failure on one table is
    recorded in the output and does not stop the remaining tables.

    Args:
        table_names: Iterable of table names to sample.
        db_conn: Open database connection; must provide ``is_connected()`` and
            ``cursor(dictionary=True)``.
        cache_file_path: Path of the file the formatted samples are written to.

    Returns:
        The formatted samples for all tables as one string, or ``""`` when the
        connection is missing or not connected (nothing is cached then).
    """
    if not db_conn or not db_conn.is_connected():
        logger.warning("Database connection not available. Cannot fetch fresh table samples.")
        return ""

    logger.info(f"Fetching top 5 rows for {len(table_names)} tables...")
    sample_sections = []
    for table_name in table_names:
        cursor = None
        try:
            cursor = db_conn.cursor(dictionary=True)
            query = f"SELECT * FROM `{table_name}` LIMIT 5"  # Use backticks for table names
            logger.debug(f"Executing sample query for {table_name}: {query}")
            cursor.execute(query)
            rows = cursor.fetchall()

            section_lines = [f"\n--- Sample rows for table: {table_name} ---"]
            if rows:
                section_lines.append(", ".join(rows[0].keys()))
                # Convert all values to string, handling None
                section_lines.extend(
                    ", ".join("NULL" if value is None else str(value) for value in row.values())
                    for row in rows
                )
            else:
                section_lines.append("(No rows found or table is empty)")
            sample_sections.append("\n".join(section_lines) + "\n")
        except Error as e:
            logger.error(f"Error fetching samples for table {table_name}: {e}")
            sample_sections.append(f"\n--- Error fetching samples for table: {table_name}: {e} ---\n")
        except Exception as ex:  # Catch any other unexpected errors
            logger.error(f"Unexpected error fetching samples for table {table_name}: {ex}")
            sample_sections.append(f"\n--- Unexpected error for table: {table_name}: {ex} ---\n")
        finally:
            # Always release the cursor, including when execute/fetch failed.
            if cursor is not None:
                try:
                    cursor.close()
                except Exception as close_err:
                    logger.warning(f"Error closing cursor for table {table_name}: {close_err}")

    all_samples_str = "".join(sample_sections)

    try:
        with open(cache_file_path, "w", encoding="utf-8") as f:
            f.write(all_samples_str)
        logger.info(f"Table samples cached successfully to {cache_file_path}")
    except IOError as e:
        # A cache write failure is not fatal; the samples are still returned.
        logger.error(f"Failed to write table samples to cache file {cache_file_path}: {e}")

    return all_samples_str
|
|
|
|
def initialize_system_prompt(db_conn_for_samples, current_script_dir):
    """Build the global system prompt from the schema and table sample rows.

    Sample rows are read from the cache file inside ``current_script_dir``.
    On a cache miss they are fetched through ``db_conn_for_samples`` (which
    also writes the cache); if that is not possible a short placeholder text
    is used instead. The result is stored in ``SYSTEM_PROMPT_TEMPLATE``.

    Does nothing (apart from logging an error) when ``MAGENTO_SCHEMA_CONTENT``
    is empty.
    """
    global SYSTEM_PROMPT_TEMPLATE, MAGENTO_SCHEMA_CONTENT, TABLE_SAMPLES_CONTENT

    if not MAGENTO_SCHEMA_CONTENT:
        logger.error("Magento schema content is not loaded. Cannot initialize system prompt.")
        # Without a schema the prompt template is left untouched.
        return

    cache_path = os.path.join(current_script_dir, TABLE_SAMPLES_CACHE_FILE)

    try:
        with open(cache_path, "r", encoding="utf-8") as cache_file:
            TABLE_SAMPLES_CONTENT = cache_file.read()
        logger.info(f"Table samples loaded successfully from cache: {cache_path}")
    except FileNotFoundError:
        logger.info(f"Table samples cache file not found: {cache_path}. Attempting to fetch from DB.")
        if not (db_conn_for_samples and db_conn_for_samples.is_connected()):
            logger.warning("DB connection not available and cache miss. Proceeding without table samples in prompt.")
            TABLE_SAMPLES_CONTENT = " (DB connection not available for fetching samples and no cache found) "
        else:
            schema_tables = get_table_names_from_schema(MAGENTO_SCHEMA_CONTENT)
            if not schema_tables:
                logger.warning("No table names extracted from schema. Cannot fetch samples.")
                TABLE_SAMPLES_CONTENT = " (Could not extract table names to fetch samples) "
            else:
                TABLE_SAMPLES_CONTENT = _fetch_and_format_table_samples(schema_tables, db_conn_for_samples, cache_path)
    except Exception as e:
        # Any other read problem: keep going with an explanatory placeholder.
        logger.error(f"Error loading table samples from cache {cache_path}: {e}")
        TABLE_SAMPLES_CONTENT = f" (Error loading table samples from cache: {e}) "

    SYSTEM_PROMPT_TEMPLATE = f"""
You are an expert Magento 2 database analyst and Python programmer. Your task is to assist in creating a dataset of questions, SQL queries, and Python validation functions for a Magento 2 database.

**Database Schema:**
--- START OF FILE schema_nonempty.txt ---
{MAGENTO_SCHEMA_CONTENT}
--- END OF FILE schema_nonempty.txt ---

**Sample Data from Tables (Top 5 rows if available):**
--- START OF SAMPLE DATA ---
{TABLE_SAMPLES_CONTENT}
--- END OF SAMPLE DATA ---

**Key Magento Schema Characteristics & EAV Model:**
* **EAV (Entity-Attribute-Value):** Many entities (products, categories, customers) use EAV.
* Core entity table: e.g., `catalog_product_entity`.
* Attribute definition: `eav_attribute`.
* Value tables by data type: e.g., `catalog_product_entity_varchar`, `_int`, `_decimal`.
* To get an attribute value (e.g., product name), you typically JOIN `catalog_product_entity` with `eav_attribute` (to find the attribute_id for 'name') and then JOIN with `catalog_product_entity_varchar` using that attribute_id and the product's entity_id.
* **Store Scopes:** Data can be global (store_id=0 or admin), website-specific, or store-view-specific. Queries often need to specify `store_id`. `store_id = 0` is the admin/default scope for many attributes.
* **Product Types:** `catalog_product_entity.type_id` can be 'simple', 'configurable', 'virtual', 'bundle', 'downloadable', 'grouped'.
* **Inventory (MSI):** `inventory_source_item` manages stock per source (e.g., 'default'). `status = 1` (In Stock), `status = 0` (Out of Stock). `cataloginventory_stock_item` is the older/default system.
* **Order Workflow:** `quote` (cart) -> `sales_order` -> `sales_invoice`, `sales_shipment`, `sales_creditmemo`.
* **Flat/Grid Tables:** Tables like `sales_order_grid`, `customer_grid_flat` are denormalized for admin panel performance. Queries for direct user-facing info might use these, but detailed analysis often requires joining base tables.
* **Date/Time:** Timestamps are common (e.g., `created_at`, `updated_at`). Be mindful of timezones if applicable, though standard MySQL functions usually handle it.
* **Foreign Keys:** Pay attention to foreign key relationships for JOINs (e.g., `sales_order_item.order_id` -> `sales_order.entity_id`).

**Task-Specific Instructions (General):**
* Ensure SQL queries are compatible with MariaDB/MySQL.
* For EAV attributes, ensure you correctly identify the `entity_type_id` for the attribute (e.g., for 'catalog_product' from `eav_entity_type` WHERE entity_type_code = 'catalog_product') and the `attribute_code`.
* Use `store_id = 0` for admin/default scope attributes unless a specific store view is relevant.
* Aim for variety in questions: simple lookups, aggregations, joins, EAV traversals, date operations, DML (for operational tasks).
* Answers derived from queries should be strictly verifiable.
"""
|
|
|
|
def create_db_connection(host_name, port, user_name, user_password, db_name, max_retries=3):
    """Open a MySQL connection, retrying with exponential backoff.

    Up to ``max_retries`` attempts are made. After a failed attempt number
    ``k`` (1-based) the function sleeps ``2 ** k`` seconds before trying
    again, unless it was the last attempt.

    Returns:
        The connection object on success, or ``None`` when every attempt
        failed with a ``mysql.connector.Error``.
    """
    for attempt in range(1, max_retries + 1):
        try:
            logger.info(f"Attempting to connect to database {db_name} at {host_name}:{port} (Attempt {attempt}/{max_retries})")
            connection = mysql.connector.connect(
                host=host_name,
                port=int(port),  # the connector needs an integer port
                user=user_name,
                passwd=user_password,
                database=db_name,
                connection_timeout=180,
            )
        except Error as err:
            logger.error(f"Failed to connect to database (Attempt {attempt}/{max_retries}): {err}")
            if attempt >= max_retries:
                return None
            wait_time = 2 ** attempt
            logger.info(f"Waiting {wait_time} seconds before retrying...")
            time.sleep(wait_time)
        else:
            logger.info(f"MySQL Database connection successful to {db_name}")
            return connection
    return None
|
|
|
|
def call_llm(prompt_messages, model_name, temperature=0.2, max_tokens=2048):
    """Call an OpenAI-compatible chat completion API and return the reply text.

    Args:
        prompt_messages: List of chat messages (the full conversation history).
        model_name: Model identifier passed to the API.
        temperature: Sampling temperature.
        max_tokens: Upper bound for the completion length.

    Returns:
        The stripped text of the first choice, or ``None`` when the API key is
        not configured, the client cannot be created, the request fails, or
        the response carries no text content.
    """
    if OPENAI_API_KEY == "YOUR_OPENAI_API_KEY_FALLBACK":
        logger.error("OpenAI API key is not configured. Please set the OPENAI_API_KEY environment variable or update the script.")
        return None

    # A fresh client is created for every call; no global client state is kept.
    try:
        client = openai.OpenAI(api_key=OPENAI_API_KEY, base_url=OPENAI_BASE_URL)
    except Exception as e:
        logger.error(f"Failed to initialize OpenAI client: {e}")
        return None

    try:
        logger.info(f"Calling LLM model: {model_name} with temperature {temperature}")
        # Serializing the whole history is expensive, so only do it when the
        # debug output will actually be emitted, and only once.
        if logger.isEnabledFor(logging.DEBUG):
            serialized_messages = json.dumps(prompt_messages, indent=2)
            if len(serialized_messages) < 2000:  # Avoid overly verbose logs for long histories
                logger.debug(f"LLM Request Messages: {serialized_messages}")
            else:
                logger.debug(f"LLM Request Messages: (History too long to log fully - {len(prompt_messages)} messages)")

        response = client.chat.completions.create(
            model=model_name,
            messages=prompt_messages,
            temperature=temperature,
            max_tokens=max_tokens,
        )

        if not response.choices:
            logger.error("LLM response contained no choices.")
            return None
        raw_content = response.choices[0].message.content
        if raw_content is None:
            logger.error("LLM response contained no text content.")
            return None
        content = raw_content.strip()

        # Some OpenAI-compatible servers omit usage data; a missing usage
        # block must not discard an otherwise valid completion.
        usage = getattr(response, "usage", None)
        if usage is not None:
            logger.info(f"LLM call successful. Tokens used: Completion={usage.completion_tokens}, Prompt={usage.prompt_tokens}, Total={usage.total_tokens}")
        else:
            logger.info("LLM call successful. Token usage was not reported by the API.")
        logger.debug(f"LLM Raw Response Content:\n{content}")
        return content
    except Exception as e:
        logger.error(f"Error calling LLM: {e}")
        return None
|
|
|
|
|
|
def _is_valid_initial_task(item):
    """Return True when *item* is a well-formed initial task object.

    A valid item is a dict with a string "original_question", a "task_type"
    of "query" or "operational_check", and a non-empty "preparatory_sql_list"
    whose entries are all strings starting with SELECT (case-insensitive,
    ignoring surrounding whitespace).
    """
    if not isinstance(item, dict):
        return False
    if "original_question" not in item or not isinstance(item["original_question"], str):
        return False
    if "task_type" not in item or item["task_type"] not in ("query", "operational_check"):
        return False
    if "preparatory_sql_list" not in item or not isinstance(item["preparatory_sql_list"], list):
        return False
    prep_sqls = item["preparatory_sql_list"]
    only_selects = all(
        isinstance(sql, str) and sql.strip().upper().startswith("SELECT") for sql in prep_sqls
    )
    return only_selects and bool(prep_sqls)


def generate_initial_tasks_and_prep_sql(conversation_history, num_tasks=10):
    """Step 1: ask the LLM for initial tasks with SELECT-only preparatory SQL.

    The request is appended to ``conversation_history``; on a successful call
    the assistant reply is appended too. When the call itself fails, the
    request that was just added is removed again.

    Returns:
        The parsed list of task dicts, or ``[]`` when the call fails, the
        reply is not valid JSON, or any item fails validation.
    """
    logger.info(f"Requesting LLM to generate {num_tasks} initial tasks and preparatory SQLs.")
    prompt_step1 = f"""
Based on the provided Magento 2 database schema, sample data, and its characteristics (from system prompt which is already part of our conversation history), generate a list of {num_tasks} diverse tasks. For each task:
1. Provide an **original_question** (string): This is the initial high-level question or operational intent.
2. Provide a **task_type** (string): Either "query" or "operational_check".
3. Provide a **preparatory_sql_list** (list of strings): A list of one or more **SELECT-ONLY SQL queries**.
* These SQLs are for **information gathering, pre-condition checking, or collecting data to answer a question.**
* For **"query" task_type**, these SQLs should aim to gather all necessary data to answer the `original_question`.
* For **"operational_check" task_type** (e.g., "Intent: Update product X's price" or "Intent: Cancel order Y"), these SQLs should **ONLY** check if the target entity exists, get its current state, or list potential entities. **ABSOLUTELY NO DML (UPDATE, INSERT, DELETE) should be generated in this list.**
* The results of these preparatory SQLs will be used by a subsequent LLM call to refine the question and assess the gathered information.

Format the output STRICTLY as a JSON list of objects. Each object must have "original_question", "task_type", and "preparatory_sql_list" keys.
Ensure the JSON is well-formed. Do not include any introductory text or markdown formatting around the JSON list itself.

**Example for an "operational_check" task:**
{{
"original_question": "Intent: Update the stock quantity of product with SKU 'TEST-SKU-XYZ' to 50 in the default source.",
"task_type": "operational_check",
"preparatory_sql_list": [
"SELECT entity_id, sku FROM catalog_product_entity WHERE sku = 'TEST-SKU-XYZ';",
"SELECT quantity, status FROM inventory_source_item WHERE sku = 'TEST-SKU-XYZ' AND source_code = 'default';"
]
}}

**Example for a "query" task:**
{{
"original_question": "What are the details (increment_id, status, grand_total) of the most recent order placed by customer_email 'test@example.com'?",
"task_type": "query",
"preparatory_sql_list": [
"SELECT entity_id FROM customer_entity WHERE email = 'test@example.com';",
"SELECT entity_id, increment_id, status, grand_total, created_at FROM sales_order WHERE customer_email = 'test@example.com' ORDER BY created_at DESC LIMIT 1;"
]
}}

Generate {num_tasks} new and distinct items.
"""
    conversation_history.append({"role": "user", "content": prompt_step1})

    response_content = call_llm(conversation_history, LLM_MODEL_GENERATION, temperature=0.7, max_tokens=3500)

    # Failed call: undo the prompt that was just queued and give up.
    if not response_content:
        logger.error("LLM call failed for generate_initial_tasks_and_prep_sql. Removing last user prompt from history.")
        if conversation_history and conversation_history[-1]["role"] == "user":
            conversation_history.pop()
        return []

    conversation_history.append({"role": "assistant", "content": response_content})

    try:
        generated_data = json.loads(_clean_llm_json_response(response_content))
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON from LLM response for initial tasks: {e}")
        logger.error(f"LLM Response Content (check for issues):\n{response_content}")
        return []

    # One malformed item rejects the whole batch.
    if not (isinstance(generated_data, list) and all(_is_valid_initial_task(task) for task in generated_data)):
        logger.error(f"LLM response was not a valid list of initial task objects or contained non-SELECT prep SQL. Content: {response_content}")
        return []

    total_items = len(generated_data)
    logger.info(f"Successfully parsed {total_items} initial tasks from LLM.")
    logger.info("--- LLM Generated Initial Tasks & Prep SQL ---")
    for position, task in enumerate(generated_data, start=1):
        logger.info(f" Item {position}/{total_items}:")
        logger.info(f" Original Question: {task['original_question']}")
        logger.info(f" Task Type: {task['task_type']}")
        for sql_position, prep_sql in enumerate(task['preparatory_sql_list'], start=1):
            logger.info(f" Prep SQL {sql_position}: {prep_sql}")
    logger.info("--- End of LLM Generated Initial Tasks & Prep SQL ---")
    return generated_data
|
|
|
|
def refine_question_and_assess_info(conversation_history, original_question, task_type, prep_sql_list, prep_sql_results_repr_list):
    """Step 2: have the LLM refine the question and assess the gathered data.

    NO FINAL SQL IS GENERATED HERE.

    Args:
        conversation_history: Mutable list of chat messages. The request and,
            on a successful call, the reply are appended; the request is
            removed again when the call fails.
        original_question: Question or operational intent from step 1.
        task_type: "query" or "operational_check".
        prep_sql_list: Preparatory SELECT statements that were executed.
        prep_sql_results_repr_list: Python reprs of the results, one per
            statement. A missing entry is replaced by a placeholder instead
            of raising ``IndexError``.

    Returns:
        The parsed JSON object, or ``None`` when ``task_type`` is unsupported,
        the call fails, the reply is not valid JSON, or required keys are
        missing or of the wrong type.
    """
    logger.info(f"Requesting LLM to refine question and assess info for: {original_question[:100]}...")

    if task_type == "query":
        output_keys_guidance = 'Return a single JSON object with: "revised_question", "llm_derived_answer", "revision_justification". The "llm_derived_answer" should be your attempt to answer the revised_question based *solely* on the provided prep_sql_results.'
    elif task_type == "operational_check":
        output_keys_guidance = 'Return a single JSON object with: "revised_question", "llm_feasibility_summary", "llm_feasibility" (boolean, true if the operation described in revised_question is feasible based on prep_sql_results, false otherwise), "revision_justification". The "llm_feasibility_summary" should state whether the operational intent in revised_question seems feasible (e.g., "Product exists and is active") or not (e.g., "Order not found"), based *solely* on prep_sql_results.'
    else:
        # No reply could pass validation for an unknown type, so do not spend
        # an LLM call or add useless turns to the conversation history.
        logger.error(f"Unsupported task_type '{task_type}' for refinement; expected 'query' or 'operational_check'.")
        return None

    prep_info_parts = []
    for i, sql in enumerate(prep_sql_list):
        if i < len(prep_sql_results_repr_list):
            result_repr = prep_sql_results_repr_list[i]
        else:
            logger.warning(f"No result available for preparatory SQL {i+1}; using a placeholder in the prompt.")
            result_repr = "(no result available for this query)"
        prep_info_parts.append(f"Preparatory SQL {i+1}: {sql}\n")
        prep_info_parts.append(f"Result {i+1} (Python repr):\n{result_repr}\n\n")
    prep_info_str = "".join(prep_info_parts)

    prompt_step2 = f"""
You are an expert Magento 2 database analyst.
You previously received an original question/task (as part of our ongoing conversation) and a list of preparatory SELECT SQL queries. Those SQLs have been executed.
Your current task is to:
1. Review the original question, the preparatory SQLs, and their execution results.
2. Generate a **revised_question** (string). This might be the same as the original if it's still perfectly valid, or it might be adjusted based on the findings (e.g., if an ID doesn't exist, or if more specific information was found).
3. Based on the **task_type** (see below) and the preparatory SQL results:
* If **task_type** is "query": Generate an **llm_derived_answer** (string). This should be a natural language answer to the `revised_question`, formulated *exclusively* from the data in `prep_sql_results`. If the results are insufficient, state that.
* If **task_type** is "operational_check": Generate an **llm_feasibility_summary** (string). This should summarize if the operational intent in `revised_question` appears feasible based *exclusively* on the `prep_sql_results` (e.g., "Product XYZ exists and current price is $50, so update is feasible." or "Order ABC not found, operation not feasible."). Also, generate an **llm_feasibility** (boolean): `true` if the summary indicates feasibility, `false` otherwise.
4. Provide a brief **revision_justification** (string) explaining why the question was or was not revised, and how the preparatory results informed your assessment or derived answer.

**Input Provided to You for this specific refinement task:**
* **Original Question (for context, you might have seen it or similar ones in our conversation):** {original_question}
* **Task Type (from previous step):** {task_type}
* **Preparatory SQLs and their Results for THIS task:**
{prep_info_str}

**Output Format:**
{output_keys_guidance}
Ensure the JSON is well-formed and contains only the specified keys. Provide the JSON object directly.

**Example for "query" task_type:**
{{
"revised_question": "What is the status and grand_total of order increment_id '100000005'?",
"llm_derived_answer": "Order '100000005' has status 'complete' and grand_total '125.50'.",
"revision_justification": "Original question was specific. Prep SQL confirmed order existence and fetched details. Answer derived directly from prep results."
}}

**Example for "operational_check" task_type:**
{{
"revised_question": "Intent: Update stock for SKU 'ABC-123' from 10 to 5.",
"llm_feasibility_summary": "Product SKU 'ABC-123' exists and current stock is 10. The update is feasible.",
"llm_feasibility": true,
"revision_justification": "Prep SQLs confirmed product existence and current stock level, matching conditions for the intended operation."
}}
"""
    conversation_history.append({"role": "user", "content": prompt_step2})
    response_content = call_llm(conversation_history, LLM_MODEL_GENERATION, temperature=0.2, max_tokens=1500)

    if not response_content:
        logger.error("LLM call failed for refine_question_and_assess_info. Removing last user prompt from history.")
        if conversation_history and conversation_history[-1]["role"] == "user":
            conversation_history.pop()
        return None

    conversation_history.append({"role": "assistant", "content": response_content})
    try:
        clean_response = _clean_llm_json_response(response_content)  # Use helper
        refined_data = json.loads(clean_response)
    except json.JSONDecodeError as e:
        logger.error(f"Failed to parse JSON from LLM response for refined assessment: {e}")
        logger.error(f"LLM Response Content (check for issues):\n{response_content}")
        return None

    base_keys_valid = (
        isinstance(refined_data, dict)
        and isinstance(refined_data.get("revised_question"), str)
        and isinstance(refined_data.get("revision_justification"), str)
    )

    type_specific_keys_valid = False
    if base_keys_valid:
        if task_type == "query":
            type_specific_keys_valid = isinstance(refined_data.get("llm_derived_answer"), str)
        else:  # "operational_check"; other values were rejected above
            type_specific_keys_valid = (
                isinstance(refined_data.get("llm_feasibility_summary"), str)
                and isinstance(refined_data.get("llm_feasibility"), bool)
            )

    if not (base_keys_valid and type_specific_keys_valid):
        logger.error(f"LLM response for refined assessment had missing or invalid keys for task_type '{task_type}'. Content: {response_content}")
        return None

    logger.info("Successfully parsed refined question and assessment from LLM.")
    logger.info(f" Revised Question: {refined_data['revised_question']}")
    if task_type == "query":
        logger.info(f" LLM Derived Answer: {refined_data['llm_derived_answer']}")
    else:
        logger.info(f" LLM Feasibility Summary: {refined_data['llm_feasibility_summary']}")
        logger.info(f" LLM Feasibility: {refined_data['llm_feasibility']}")
    logger.info(f" Revision Justification: {refined_data['revision_justification']}")
    return refined_data
|
|
|
|
def _close_connection_quietly(connection, q_num):
    """Close a connection opened inside this module, logging instead of raising."""
    if connection is None:
        return
    try:
        connection.close()
    except Exception as close_err:
        logger.warning(f"Error closing temporary DB connection for Q{q_num}: {close_err}")


def _to_serializable(value):
    """Convert Decimal to str and datetime to 'YYYY-MM-DD HH:MM:SS'; pass others through."""
    if isinstance(value, decimal.Decimal):
        return str(value)
    if isinstance(value, datetime):
        return value.strftime('%Y-%m-%d %H:%M:%S')
    return value


def execute_sql_and_get_results(db_conn, sql_query, question_text, q_num):
    """Execute one SQL statement and return its results. Handles SELECT and DML.

    Statements starting with UPDATE/INSERT/DELETE are treated as DML and those
    starting with CREATE/ALTER/DROP as DDL; both are committed. Everything
    else is treated as a SELECT and its rows are fetched.

    If ``db_conn`` is missing or disconnected, a replacement connection is
    opened for this call only. The caller never receives it, so it is closed
    before returning rather than leaked.

    Args:
        db_conn: Database connection to use (may be ``None``).
        sql_query: SQL text to execute.
        question_text: Question the SQL belongs to (used for logging only).
        q_num: Question number (used for logging only).

    Returns:
        For SELECT: a list of row dicts with Decimal and datetime values
        converted to strings. For DML/DDL: a status string with the affected
        row count. On failure: a string starting with ``"Error:"``.
    """
    logger.info(f"Attempting to execute SQL for Question {q_num}: {question_text[:100]}...")
    logger.debug(f"Full SQL for Q{q_num}: {sql_query}")

    # True once db_conn refers to a connection created inside this function.
    opened_here = False

    # Ensure connection is active
    try:
        if db_conn is None or not db_conn.is_connected():
            logger.warning(f"DB connection lost or not available for Q{q_num}. Attempting to reconnect...")
            db_conn = create_db_connection(DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME)
            opened_here = db_conn is not None
            if db_conn is None or not db_conn.is_connected():
                logger.error(f"Failed to re-establish DB connection for Q{q_num}.")
                if opened_here:
                    _close_connection_quietly(db_conn, q_num)
                return "Error: Database connection lost and could not be re-established."
    except Exception as e:  # Catch broader exceptions if is_connected() fails
        logger.error(f"Error checking DB connection status for Q{q_num}: {e}. Attempting to reconnect...")
        if opened_here:
            # The failing connection was created above; release it first.
            _close_connection_quietly(db_conn, q_num)
        db_conn = create_db_connection(DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME)
        opened_here = db_conn is not None
        if db_conn is None:
            logger.error(f"Failed to re-establish DB connection for Q{q_num} after check error.")
            return "Error: Database connection check failed and could not be re-established."

    cursor = None
    raw_results = []
    # Normalize SQL query for DML check (uppercase, remove leading/trailing whitespace)
    normalized_sql_query = sql_query.strip().upper()
    is_dml = normalized_sql_query.startswith(("UPDATE", "INSERT", "DELETE"))
    # CREATE, ALTER, DROP are DDL, not typically expected here but could be 'operational'
    is_ddl = normalized_sql_query.startswith(("CREATE", "ALTER", "DROP"))

    query_start_time = time.time()
    try:
        cursor = db_conn.cursor(dictionary=True)
        logger.debug(f"Cursor created for Q{q_num}.")

        cursor.execute(sql_query)
        logger.debug(f"SQL executed for Q{q_num}.")

        if is_dml:
            db_conn.commit()
            raw_results = f"Rows affected: {cursor.rowcount}"
            logger.info(f"DML query for Q{q_num} committed. {raw_results}")
        elif is_ddl:
            db_conn.commit()  # Some DDL might need commit or are auto-committed
            raw_results = f"DDL statement executed. Rows affected: {cursor.rowcount}"  # rowcount might be -1 or 0 for DDL
            logger.info(f"DDL query for Q{q_num} executed. {raw_results}")
        else:  # It's a SELECT query
            # Convert Decimal/datetime to strings for JSON serializability
            # and consistent LLM input.
            raw_results = [
                {column: _to_serializable(value) for column, value in row.items()}
                for row in cursor.fetchall()
            ]
            logger.info(f"SELECT query for Q{q_num} fetched {len(raw_results)} rows.")
    except Error as e:
        logger.error(f"Error executing SQL for Q{q_num}: {e}\nSQL: {sql_query}")
        if db_conn and db_conn.is_connected() and (is_dml or is_ddl):  # Only rollback DML/DDL on error
            try:
                db_conn.rollback()
                logger.info(f"Rolled back transaction for Q{q_num} due to error.")
            except Error as rb_err:
                logger.error(f"Error during rollback for Q{q_num}: {rb_err}")
        return f"Error: {str(e)}"  # Return error string
    finally:
        try:
            if cursor is not None:
                cursor.close()
                logger.debug(f"Cursor closed for Q{q_num}.")
        finally:
            if opened_here:
                _close_connection_quietly(db_conn, q_num)

    query_duration = time.time() - query_start_time
    logger.info(f"SQL for Q{q_num} processed in {query_duration:.2f}s.")
    return raw_results
|
|
|
|
|
|
def generate_validation_function(conversation_history, revised_question, task_type,
|
|
prep_sql_list_str,
|
|
prep_sql_results_repr_list_str,
|
|
llm_assessment_from_step2_str
|
|
):
|
|
"""
|
|
Step 3: LLM generates Python validation function based on ongoing conversation.
|
|
- For "query": creates validate_query_answer(user_answer) with hardcoded expected answer.
|
|
- For "operational_check": creates validate_operational_state(db_connection) with hardcoded DB checks.
|
|
"""
|
|
logger.info(f"Requesting LLM to generate validation function for: {revised_question[:100]} (Type: {task_type})")
|
|
|
|
prompt_core_context = f"""
|
|
**Context Provided to You (for informing the validation logic you will create, building upon our conversation):**
|
|
1. **Revised Question/Operational Intent (from a previous turn in our conversation):**
|
|
```
|
|
{revised_question}
|
|
```
|
|
2. **Task Type:** "{task_type}"
|
|
3. **Preparatory SELECT SQL List (that led to the assessment below):**
|
|
```sql
|
|
{prep_sql_list_str}
|
|
```
|
|
4. **Preparatory SQL Execution Results (Python string repr of a list of results):**
|
|
```python
|
|
{prep_sql_results_repr_list_str}
|
|
```
|
|
"""
|
|
|
|
if task_type == "query":
|
|
prompt_step3_template = f"""
|
|
You are an expert Magento 2 database analyst and Python programmer, continuing our session.
|
|
Your task is to create a Python validation function `validate_query_answer(user_answer)`.
|
|
This function will take a `user_answer` (string) and compare it against an **expected answer** that you will determine and **hardcode** into the function.
|
|
|
|
{prompt_core_context}
|
|
5. **LLM's Derived Answer (this was your answer in a previous turn and should be the basis for your hardcoded expected answer):**
|
|
```
|
|
{llm_assessment_from_step2_str}
|
|
```
|
|
|
|
**Your Task for "query" type (based on our discussion):**
|
|
Create a Python function `validate_query_answer(user_answer)`:
|
|
* The function should **hardcode the expected answer** based on the "LLM's Derived Answer" provided above. This might involve storing the exact string, or if it's numerical/structured, parsing and storing it appropriately.
|
|
* It compares the input `user_answer` to this hardcoded expected answer.
|
|
* Return `(is_valid, message)`:
|
|
* `is_valid` (boolean): `True` if `user_answer` matches the hardcoded expected answer (allow for some flexibility like case-insensitivity or stripping whitespace for strings, or numerical tolerance if applicable).
|
|
* `message` (string): Explaining the outcome (e.g., "User answer matches expected.", "User answer 'X' does not match expected 'Y'.").
|
|
* The function must be self-contained (standard imports like `json`, `decimal` are ok if needed for handling the expected answer). It does **NOT** use a database connection.
|
|
|
|
**Example `validate_query_answer` structure:**
|
|
```python
|
|
import decimal # if needed
|
|
|
|
def validate_query_answer(user_answer):
|
|
# Based on LLM's Derived Answer: "The total number of customers is 157."
|
|
expected_answer_str = "The total number of customers is 157."
|
|
# Or, for numerical: expected_count = 157
|
|
|
|
# Simple string comparison (you can make this more robust)
|
|
if isinstance(user_answer, str) and user_answer.strip().lower() == expected_answer_str.strip().lower():
|
|
return True, "User answer matches the expected answer."
|
|
else:
|
|
# Attempt to extract number if question implies a number
|
|
try:
|
|
# This is just an example, adapt based on actual derived answer format
|
|
user_num_part = ''.join(filter(str.isdigit, user_answer))
|
|
expected_num_part = ''.join(filter(str.isdigit, expected_answer_str))
|
|
if user_num_part and expected_num_part and int(user_num_part) == int(expected_num_part):
|
|
return True, f"User answer contains the correct numerical part '{{user_num_part}}' matching expected."
|
|
except ValueError:
|
|
pass # Failed to parse numbers
|
|
|
|
return False, f"User answer '{{user_answer}}' does not sufficiently match the expected: '{{expected_answer_str}}'."
|
|
```
|
|
|
|
Now, provide *only* the Python code for `validate_query_answer(user_answer)` based on the specific inputs given.
|
|
"""
|
|
elif task_type == "operational_check":
|
|
prompt_step3_template = f"""
|
|
You are an expert Magento 2 database analyst and Python programmer, continuing our session.
|
|
Your task is to create a Python validation function `validate_operational_state(db_connection)`.
|
|
This function will use the provided `db_connection` to perform **new, hardcoded SELECT queries** to verify that the database state aligns with an expected condition or feasibility assessment derived in our conversation.
|
|
|
|
{prompt_core_context}
|
|
5. **LLM's Feasibility Summary (this was your summary in a previous turn and describes the state your function should verify):**
|
|
```
|
|
{llm_assessment_from_step2_str}
|
|
```
|
|
|
|
**Your Task for "operational_check" type (based on our discussion):**
|
|
Create a Python function `validate_operational_state(db_connection)`:
|
|
* The function must contain **hardcoded SELECT SQL query/queries** that you design. These queries should aim to re-verify the conditions described in the "LLM's Feasibility Summary" and the "Revised Operational Intent".
|
|
* It uses the `db_connection` to execute these hardcoded SQLs.
|
|
* It then analyzes the results of its own SQLs to determine if the database state is as expected.
|
|
* Return `(is_valid, message)`:
|
|
* `is_valid` (boolean): `True` if the database state (queried by your hardcoded SQLs) matches the expected conditions.
|
|
* `message` (string): Explaining the outcome (e.g., "Verified: Product SKU 'XYZ' exists and is active.", "Verification failed: Order 123 status is 'shipped', not 'pending' as expected for the check.").
|
|
* The function must be self-contained (standard imports, `mysql.connector.Error` for db errors are ok).
|
|
* Handle potential errors during its own database operations. If `db_connection` is `None` or unusable, it should return `(False, "DB connection not available for validation.")`.
|
|
|
|
**Example `validate_operational_state` structure:**
|
|
```python
|
|
from mysql.connector import Error # If you need to catch DB errors
|
|
|
|
def validate_operational_state(db_connection):
|
|
# Revised Intent: "Check if product SKU 'ABC' is in stock (status=1) at source 'default'."
|
|
# LLM Feasibility Summary: "Product SKU 'ABC' exists. Its stock status at 'default' is 1 (In Stock)."
|
|
|
|
if not db_connection or not db_connection.is_connected():
|
|
return False, "Database connection not available for validation."
|
|
|
|
sku_to_check = "ABC" # Hardcoded based on context
|
|
source_to_check = "default" # Hardcoded
|
|
expected_status = 1 # Hardcoded
|
|
|
|
try:
|
|
cursor = db_connection.cursor(dictionary=True)
|
|
# Hardcoded SQL to re-verify the state
|
|
query = f"SELECT status FROM inventory_source_item WHERE sku = %s AND source_code = %s"
|
|
cursor.execute(query, (sku_to_check, source_to_check))
|
|
result = cursor.fetchone()
|
|
cursor.close()
|
|
|
|
if result:
|
|
if result['status'] == expected_status:
|
|
return True, f"Validation successful: SKU '{{sku_to_check}}' at source '{{source_to_check}}' has status '{{expected_status}}'."
|
|
else:
|
|
return False, f"Validation failed: SKU '{{sku_to_check}}' at source '{{source_to_check}}' has status '{{result['status']}}', expected '{{expected_status}}'."
|
|
else:
|
|
return False, f"Validation failed: SKU '{{sku_to_check}}' not found at source '{{source_to_check}}' during validation check."
|
|
except Error as e:
|
|
return False, f"Database error during validation: {{e}}"
|
|
except Exception as ex:
|
|
return False, f"Unexpected error during validation: {{ex}}"
|
|
|
|
```
|
|
Now, provide *only* the Python code for the function (`validate_query_answer` or `validate_operational_state`) based on the specific inputs given.
|
|
"""
|
|
else:
|
|
logger.error(f"Unknown task_type '{task_type}' for generating validation function prompt.")
|
|
return "# Error: Unknown task_type for validation function generation."
|
|
|
|
filled_prompt = prompt_step3_template
|
|
conversation_history.append({"role": "user", "content": filled_prompt})
|
|
|
|
validation_function_code = call_llm(conversation_history, LLM_MODEL_VALIDATION, temperature=0.1, max_tokens=2500)
|
|
|
|
if validation_function_code:
|
|
conversation_history.append({"role": "assistant", "content": validation_function_code})
|
|
clean_code = _clean_llm_python_code_response(validation_function_code) # Use helper
|
|
logger.info(f"Successfully generated validation function code for task type '{task_type}'.")
|
|
return clean_code
|
|
else:
|
|
logger.error(f"Failed to generate validation function code for task type '{task_type}'. Removing last user prompt from history.")
|
|
if conversation_history and conversation_history[-1]["role"] == "user":
|
|
conversation_history.pop()
|
|
return "# LLM failed to generate validation function or an error occurred."
|
|
|
|
|
|
def _close_db_connection(db_connection) -> bool:
    """Close *db_connection* if it is currently open.

    Returns:
        True when an open connection was closed, False when there was nothing
        to close (``None`` or already disconnected).
    """
    if db_connection and db_connection.is_connected():
        db_connection.close()
        return True
    return False


def _execute_preparatory_sqls(db_connection, preparatory_sql_list, item_num):
    """Run all preparatory SQL statements of one task item.

    Before each statement the connection is checked; if it is missing or
    disconnected a single reconnect is attempted. When the reconnect fails,
    the statement at hand and every remaining one are recorded with an error
    string so the result lists stay aligned with ``preparatory_sql_list``.

    Args:
        db_connection: Connection to start with (may be ``None``).
        preparatory_sql_list: SQL strings to execute, in order.
        item_num: 1-based item number, used for log messages and query labels.

    Returns:
        A tuple ``(db_connection, actual_results, results_repr)`` where
        ``db_connection`` is the connection to keep using afterwards (``None``
        if it was lost and could not be re-established), ``actual_results``
        holds the raw results and ``results_repr`` their ``repr()`` strings.
    """
    actual_results = []
    results_repr = []

    for prep_sql_idx, prep_sql in enumerate(preparatory_sql_list):
        logger.info(f" Prep SQL {prep_sql_idx+1}: {prep_sql}")

        if not db_connection or not db_connection.is_connected():
            logger.warning(f"DB connection lost before prep SQL for item {item_num}. Attempting reconnect...")
            db_connection = create_db_connection(DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME)
            if not db_connection:
                err_msg = "Error: DB connection lost and failed to reconnect during prep SQL execution."
                # Pad one error entry per statement that could not be run.
                remaining = len(preparatory_sql_list) - prep_sql_idx
                actual_results.extend([err_msg] * remaining)
                results_repr.extend([repr(err_msg)] * remaining)
                logger.error(f"Failed to reconnect. Skipping rest of prep SQLs for item {item_num}.")
                return None, actual_results, results_repr

        current_prep_result = execute_sql_and_get_results(
            db_connection, prep_sql, f"Prep Q{item_num}.{prep_sql_idx+1}", item_num
        )
        actual_results.append(current_prep_result)
        results_repr.append(repr(current_prep_result))
        if isinstance(current_prep_result, str) and current_prep_result.startswith("Error:"):
            logger.warning(f" Prep SQL {prep_sql_idx+1} for item {item_num} resulted in error: {current_prep_result}")

    return db_connection, actual_results, results_repr


def main_workflow():
    """Main orchestrator for the multi-step QA generation (SELECT-only focus).

    Steps performed:
      1. Load the schema file that sits next to this script into
         ``MAGENTO_SCHEMA_CONTENT`` and open the database connection.
      2. Initialise the system prompt and ask the LLM for initial tasks with
         their preparatory SQL.
      3. For every task: run the preparatory SQL, let the LLM refine the
         question / assess the results, optionally generate a validation
         function, and append one JSON record to a timestamped ``.jsonl`` file.

    The function returns ``None``. Setup failures are logged and end the
    workflow early; any exception raised while processing items is logged and
    the database connection is closed in all cases.
    """
    logger.info("=================================================")
    logger.info("=== Magento QA Gen (SELECT-Only Info Gathering & Validation) ===")
    logger.info("=================================================")

    global MAGENTO_SCHEMA_CONTENT, SYSTEM_PROMPT_TEMPLATE  # Ensure SYSTEM_PROMPT_TEMPLATE is global
    schema_file_name = "curated_schema.txt"
    script_dir = ""
    try:
        script_dir = os.path.dirname(os.path.abspath(__file__))
        schema_file_path = os.path.join(script_dir, schema_file_name)
        with open(schema_file_path, "r", encoding="utf-8") as f:
            MAGENTO_SCHEMA_CONTENT = f.read()
        logger.info(f"Schema loaded successfully from {schema_file_path}")
    except FileNotFoundError:
        # Report the file that was actually looked up, not a stale name.
        logger.error(f"{schema_file_name} not found at {schema_file_path}. Exiting.")
        return
    except Exception as e:
        logger.error(f"Error loading schema file: {e}. Exiting.")
        return

    # This connection is used for table samples and for every preparatory SQL.
    db_connection_main_loop = create_db_connection(DB_HOST, DB_PORT, DB_USER, DB_PASS, DB_NAME)
    if not db_connection_main_loop:
        logger.error("Initial DB connection failed. Needed for samples and prep SQL. Exiting.")
        return

    initialize_system_prompt(db_connection_main_loop, script_dir)
    if not SYSTEM_PROMPT_TEMPLATE:  # SYSTEM_PROMPT_TEMPLATE is populated by initialize_system_prompt
        logger.error("System prompt initialization failed. Exiting.")
        _close_db_connection(db_connection_main_loop)
        return

    # Initialize conversation history with the system prompt
    conversation_history = [{"role": "system", "content": SYSTEM_PROMPT_TEMPLATE}]

    # Step 1: Generate Initial Tasks and Preparatory SQL
    logger.info("--- Starting Step 1: Generate Initial Tasks and Preparatory SQL ---")
    initial_tasks = generate_initial_tasks_and_prep_sql(conversation_history, num_tasks=5)
    if not initial_tasks:
        logger.error("No initial tasks generated by LLM. Exiting.")
        _close_db_connection(db_connection_main_loop)
        return
    logger.info(f"Step 1 completed. Received {len(initial_tasks)} initial tasks.")

    output_filename = f"magento_qa_info_gathering_{datetime.now().strftime('%Y%m%d_%H%M%S')}.jsonl"

    # Parse the delay once, up front, so a malformed value cannot abort the
    # run after the first item has already been processed.
    raw_delay = os.environ.get("LLM_CALL_DELAY_SECONDS", "5")
    try:
        llm_call_delay = max(0, int(raw_delay))
    except ValueError:
        logger.warning(f"Invalid LLM_CALL_DELAY_SECONDS value {raw_delay!r}; falling back to 5 seconds.")
        llm_call_delay = 5

    try:
        with open(output_filename, "w", encoding="utf-8") as outfile:
            for i, task_data in enumerate(initial_tasks):
                item_num = i + 1
                logger.info(f"\nProcessing Item {item_num}/{len(initial_tasks)}: \"{task_data['original_question'][:100]}...\"")

                original_question = task_data["original_question"]
                task_type = task_data["task_type"]
                preparatory_sql_list = task_data["preparatory_sql_list"]

                logger.info(f" Executing {len(preparatory_sql_list)} preparatory SQLs for Item {item_num}...")
                # The helper hands back the connection to keep using: the same
                # object, a fresh reconnect, or None if it could not reconnect.
                (
                    db_connection_main_loop,
                    prep_sql_actual_results_list,
                    prep_sql_results_repr_list,
                ) = _execute_preparatory_sqls(db_connection_main_loop, preparatory_sql_list, item_num)
                logger.info(f" Finished executing preparatory SQLs for Item {item_num}.")

                # Fields shared by the partial (failure) and the full record.
                record = {
                    "item_number": item_num,
                    "original_question": original_question,
                    "task_type": task_type,
                    "preparatory_sql_list": preparatory_sql_list,
                    "preparatory_sql_actual_results_preview": [str(r)[:200] for r in prep_sql_actual_results_list],
                }

                logger.info(f" Starting Step 2: Refine Question and Assess Info for Item {item_num}...")
                llm_assessment_data = refine_question_and_assess_info(
                    conversation_history, original_question, task_type, preparatory_sql_list, prep_sql_results_repr_list
                )

                if not llm_assessment_data:
                    logger.error(f"Failed to get assessment from LLM for Item {item_num}. Skipping validation and saving partial.")
                    record["status"] = "Failed at LLM assessment step"
                    outfile.write(json.dumps(record) + "\n")
                    outfile.flush()
                    continue

                revised_question = llm_assessment_data["revised_question"]
                revision_justification = llm_assessment_data["revision_justification"]
                llm_assessment_from_step2_value = ""
                llm_feasibility_flag = None

                if task_type == "query":
                    llm_assessment_from_step2_value = llm_assessment_data.get("llm_derived_answer", "Error: LLM did not provide derived answer.")
                elif task_type == "operational_check":
                    llm_assessment_from_step2_value = llm_assessment_data.get("llm_feasibility_summary", "Error: LLM did not provide feasibility_summary.")
                    llm_feasibility_flag = llm_assessment_data.get("llm_feasibility", False)  # Default to False if missing

                validation_function_code = "# Validation function not generated."
                if task_type == "query" or (task_type == "operational_check" and llm_feasibility_flag is True):
                    logger.info(f" Starting Step 3: Generate Validation Function for Item {item_num}...")
                    time.sleep(1)

                    prep_sql_list_str_for_prompt = "\n".join(preparatory_sql_list)
                    prep_sql_results_repr_list_str_for_prompt = "[\n" + ",\n".join(f" {r}" for r in prep_sql_results_repr_list) + "\n]"

                    current_validation_code = generate_validation_function(
                        conversation_history, revised_question, task_type,
                        prep_sql_list_str_for_prompt,
                        prep_sql_results_repr_list_str_for_prompt,
                        llm_assessment_from_step2_value
                    )
                    if not current_validation_code or "# LLM failed" in current_validation_code:
                        logger.warning(f" Validation function generation failed or was incomplete for Item {item_num}.")
                    # Keep whatever the generator returned; only an empty
                    # result is replaced by an explicit failure marker.
                    validation_function_code = (
                        current_validation_code
                        or "# LLM failed to generate validation function or returned empty."
                    )
                elif task_type == "operational_check" and llm_feasibility_flag is False:
                    logger.info(f" Skipping Step 3 (Validation Function Generation) for Item {item_num} because llm_feasibility is False.")
                    validation_function_code = "# Validation function generation skipped as operation was deemed not feasible."

                record["full_preparatory_sql_actual_results_repr"] = prep_sql_results_repr_list
                record["revised_question"] = revised_question
                record["revision_justification"] = revision_justification
                if task_type == "query":
                    record["llm_derived_answer_for_validation_func_gen"] = llm_assessment_from_step2_value
                elif task_type == "operational_check":
                    record["llm_feasibility_summary_for_validation_func_gen"] = llm_assessment_from_step2_value
                    record["llm_feasibility"] = llm_feasibility_flag  # Record the boolean feasibility

                record["python_validation_function"] = validation_function_code

                # Flush after every record so a crash keeps the finished items.
                outfile.write(json.dumps(record) + "\n")
                outfile.flush()
                logger.info(f"Record {item_num} written to {output_filename}")

                if i < len(initial_tasks) - 1:
                    logger.info(f"Waiting {llm_call_delay} seconds before next item (and its LLM calls)...")
                    time.sleep(llm_call_delay)

    except Exception as e:
        logger.error(f"A critical error occurred in the main workflow: {e}", exc_info=True)
    finally:
        if _close_db_connection(db_connection_main_loop):
            logger.info("Main database connection closed at the end of the workflow.")
        elif db_connection_main_loop is None:
            logger.info("Main database connection was lost and not re-established.")
    logger.info(f"Workflow finished. Log file: {log_file_name}")
|
|
|
|
if __name__ == "__main__":
    # Refuse to start without a usable API key: an unset (placeholder) or
    # empty key would make every LLM call in the workflow fail.
    if not OPENAI_API_KEY or OPENAI_API_KEY == "YOUR_OPENAI_API_KEY_FALLBACK":
        print("CRITICAL: OpenAI API key is not set. Please set the OPENAI_API_KEY environment variable or update the script.")
        logger.critical("OpenAI API key is not set. Please set the OPENAI_API_KEY environment variable or update the script.")
        # Exit with a non-zero status so shells and schedulers can detect the
        # misconfiguration instead of seeing a "successful" run.
        raise SystemExit(1)

    main_workflow()
    logger.info(f"Workflow finished. Log file: {log_file_name}")
|
|
|
|
"""
|
|
**Before Running:**
|
|
|
|
1. **`curated_schema.txt`**: **CRITICAL:** Place your curated Magento schema file, `curated_schema.txt`, in the same directory as this Python script. The script loads it at startup and exits if the file is missing.
|
|
2. **OpenAI API Key:**
|
|
* **Best Method:** Set it as an environment variable: `export OPENAI_API_KEY="sk-..."`
|
|
* Alternatively, replace `"YOUR_OPENAI_API_KEY_FALLBACK"` in the script, but this is less secure.
|
|
3. **`OPENAI_BASE_URL`**: If you are using a proxy or a non-standard OpenAI endpoint, update this. Otherwise, the default `https://api.openai.com/v1` should work for official OpenAI.
|
|
4. **Database Credentials:** Ensure `DB_HOST`, `DB_PORT`, `DB_USER`, `DB_PASS`, `DB_NAME` are correct.
|
|
5. **Install Libraries:** `pip install openai mysql-connector-python pandas python-dotenv`
|
|
6. **LLM Models:** `LLM_MODEL_GENERATION` and `LLM_MODEL_VALIDATION` default to `gpt-4o` and can be overridden through the environment variables of the same names. You might want to use a cheaper model such as `gpt-3.5-turbo` for `LLM_MODEL_VALIDATION` to save costs/time if its quality is sufficient for generating the validation functions.
|
|
7. **Rate Limiting:** The `time.sleep(5)` (now configurable via `LLM_CALL_DELAY_SECONDS` env var, defaulting to 5) is a very basic rate limiter. If you have higher API limits or make fewer calls, you can adjust this.
|
|
|
|
**Key Improvements in this Version:**
|
|
|
|
* **Schema Loading:** The script now explicitly loads `curated_schema.txt` and incorporates its content into the system prompt.
|
|
* **Environment Variables:** Encourages using environment variables for API keys.
|
|
* **Robust LLM JSON Parsing:** Added more cleaning for the JSON response from the LLM.
|
|
* **Error Handling:** More `try-except` blocks, especially around LLM calls and database operations.
|
|
* **DML Handling in `execute_sql_and_get_results`:**
|
|
* Detects DML (UPDATE, INSERT, DELETE) and DDL (CREATE, ALTER, DROP).
|
|
* Commits transactions for DML/DDL.
|
|
* Rolls back DML/DDL on error.
|
|
* Returns "Rows affected: X" for DML.
|
|
* **Stringification of Results:** `datetime` and `decimal.Decimal` objects from `SELECT` queries are converted to strings before being passed to `repr()` for the LLM prompt. This makes the LLM's job of understanding the "SQL Execution Result" string easier and more consistent.
|
|
* **Logging:** Enhanced logging for better traceability, including logging the prompts sent to the LLM (at DEBUG level) and token usage.
|
|
* **Connection Management:** Improved checks for database connection status and attempts to reconnect if lost.
|
|
* **Clearer Prompts:** Refined prompts for the LLM, especially for generating validation functions, to be more explicit about input formats and expected output.
|
|
* **Configurable LLM Call Delay:** Added an environment variable `LLM_CALL_DELAY_SECONDS` for easier adjustment of the delay between LLM calls.
|
|
* **Port as Integer:** Ensured `DB_PORT` is cast to an integer for `mysql.connector.connect`.
|
|
* **Full Result for LLM:** The `full_preparatory_sql_actual_results_repr` field (the `repr()` of every preparatory SQL result, exactly as shown to the LLM) is now included in the JSONL, which can be useful for debugging or if you need to re-prompt the LLM for a specific validation function.
|
|
|
|
This script is now much more robust and production-ready for your task. Remember to monitor the LLM costs and API rate limits.
|
|
|
|
""" |