# ai-email-assistant/src/graph/nodes/parse_output.py
#
# 142 lines
# 4.2 KiB
# Python

import json
import re
from src.graph.state import EmailGenerationState
from src.models.email import EmailDraft
from src.models.errors import ParseError
def parse_output_node(state: EmailGenerationState) -> EmailGenerationState:
    """Parse the raw LLM response held in ``state`` into an ``EmailDraft``.

    The text in ``state["llm_output"].content`` is stripped of Markdown code
    fences and decoded as JSON; when strict decoding fails, the line-based
    ``_fallback_parse`` is tried instead. The ``subject`` and ``body`` fields
    are required, then passed through ``_validate_subject`` and
    ``_validate_body`` before the draft is built.

    Failures are reported through ``state`` rather than raised:

    * ``MISSING_LLM_OUTPUT`` - ``state`` has no (truthy) ``llm_output``.
    * ``PARSE_ERROR`` - the response is not a JSON object, or ``subject`` /
      ``body`` is missing, empty, or not a string. ``state["trace_meta"]``
      then carries the first 500 characters of the raw output.
    * ``PARSING_ERROR`` - any other exception raised while parsing.

    Args:
        state: Graph state; ``llm_output`` is read from it.

    Returns:
        The same ``state`` object, with ``email_draft`` set on success or
        ``error`` / ``error_code`` set on failure.
    """
    try:
        llm_output = state.get("llm_output")
        if not llm_output:
            state["error"] = "LLM output is required for parsing"
            state["error_code"] = "MISSING_LLM_OUTPUT"
            return state

        # NOTE(review): assumes ``content`` is a plain string - confirm for
        # providers that may return structured content.
        content = _clean_json_content(llm_output.content.strip())

        try:
            parsed_data = json.loads(content)
        except json.JSONDecodeError:
            parsed_data = _fallback_parse(content)

        if not isinstance(parsed_data, dict):
            raise ParseError("Response is not a JSON object", content)

        # A JSON ``null`` (or any non-string) must be reported as a missing
        # field, not crash on ``.strip()`` with an opaque AttributeError.
        raw_subject = parsed_data.get("subject")
        raw_body = parsed_data.get("body")
        subject = raw_subject.strip() if isinstance(raw_subject, str) else ""
        body = raw_body.strip() if isinstance(raw_body, str) else ""
        short_reasoning = parsed_data.get("short_reasoning", "")
        used_chunks = parsed_data.get("used_chunks", [])

        if not subject:
            raise ParseError("Subject is required", content)
        if not body:
            raise ParseError("Body is required", content)

        subject = _validate_subject(subject)
        body = _validate_body(body)

        # Sanitization removes spam markers and can leave nothing behind;
        # an empty subject is as unusable as a missing one.
        if not subject:
            raise ParseError("Subject is empty after sanitization", content)

        state["email_draft"] = EmailDraft(
            subject=subject,
            body=body,
            short_reasoning=short_reasoning,
            used_chunks=used_chunks if isinstance(used_chunks, list) else [],
        )
        return state
    except ParseError as e:
        state["error"] = e.message
        state["error_code"] = "PARSE_ERROR"
        state["trace_meta"] = {"raw_output": e.raw_output[:500], "details": e.details}
        return state
    except Exception as e:
        state["error"] = f"Output parsing error: {e}"
        state["error_code"] = "PARSING_ERROR"
        return state
def _clean_json_content(content: str) -> str:
content = re.sub(r"^```json\s*", "", content)
content = re.sub(r"\s*```$", "", content)
content = re.sub(r"^```\s*", "", content)
content = content.strip()
return content
def _fallback_parse(content: str) -> dict:
lines = content.split("\n")
result = {}
current_key = None
current_value = []
for line in lines:
line = line.strip()
if ":" in line and line.startswith('"') and line.count('"') >= 4:
if current_key:
result[current_key] = "\n".join(current_value)
parts = line.split(":", 1)
current_key = parts[0].strip('"').strip()
current_value = [parts[1].strip().strip(",").strip('"')]
elif current_key:
current_value.append(line.strip(",").strip('"'))
if current_key:
result[current_key] = "\n".join(current_value)
return result
def _validate_subject(subject: str) -> str:
if len(subject) > 80:
words = subject.split()
truncated = []
char_count = 0
for word in words:
if char_count + len(word) + 1 <= 77:
truncated.append(word)
char_count += len(word) + 1
else:
break
subject = " ".join(truncated) + "..."
spam_patterns = [
r"(!{2,})",
r"(СКИДКА|АКЦИЯ|СРОЧНО|БЕСПЛАТНО)",
r"(\$|\€|\₽)",
]
for pattern in spam_patterns:
subject = re.sub(pattern, "", subject, flags=re.IGNORECASE)
return subject.strip()
def _validate_body(body: str) -> str:
if len(body) > 2000:
body = body[:1950] + "..."
required_elements = {"greeting": False, "company_mention": False, "cta": False}
greetings = ["добрый день", "здравствуйте", "приветствую"]
if any(greeting in body.lower() for greeting in greetings):
required_elements["greeting"] = True
if "консоль" in body.lower():
required_elements["company_mention"] = True
cta_phrases = ["звонок", "демо", "встреча", "обсудить", "покажу"]
if any(phrase in body.lower() for phrase in cta_phrases):
required_elements["cta"] = True
return body.strip()