Files
AI_Devlop/AI_Web_Scraper/event_logger.py

182 lines
6.8 KiB
Python

import os
import json
import uuid
import datetime as _dt
from typing import Any, Dict, Optional
# Module-wide logger singleton: starts as None, is assigned by
# init_from_config() and read back through get_logger().
_LOGGER_INSTANCE: Optional["EventLogger"] = None
class EventLogger:
    """Lightweight JSONL + console event logger for the app.

    Every call to :meth:`log_event` builds one record and
    (a) prints a one-line summary to stdout when the console threshold allows
    INFO output, and (b) appends the record as a single JSON line to
    ``<log_dir>/run_<run_id>.jsonl`` when file logging is enabled.

    Avoids exposing LLM chain-of-thought by default; can be opted-in via
    config/env.  This class only stores ``show_llm_thoughts``,
    ``preview_saved_files`` and ``preview_limit``; it does not act on them
    itself.
    """

    # Console severities, lowest to highest.
    _LEVELS = ("DEBUG", "INFO", "WARN", "ERROR")
    # Common spellings mapped onto the canonical names above.
    _LEVEL_ALIASES = {"WARNING": "WARN"}

    def __init__(self, log_dir: str = "./logs", enable_file: bool = True,
                 show_llm_thoughts: bool = False, console_level: str = "INFO",
                 preview_saved_files: bool = False, preview_limit: int = 500):
        """Create a logger and, if file logging is on, its output directory.

        Args:
            log_dir: Directory that receives the ``run_<run_id>.jsonl`` file.
            enable_file: When false, nothing is written to disk and
                ``file_path`` stays ``None``.
            show_llm_thoughts: Opt-in flag stored for consumers of the logger.
            console_level: Minimum severity for console output
                (DEBUG/INFO/WARN/ERROR, case-insensitive; WARNING is accepted
                as WARN).
            preview_saved_files: Flag stored for consumers of the logger.
            preview_limit: Number stored for consumers of the logger.
        """
        self.log_dir = log_dir
        self.enable_file = enable_file
        self.show_llm_thoughts = show_llm_thoughts
        self.console_level = console_level.upper()
        self.preview_saved_files = preview_saved_files
        self.preview_limit = preview_limit
        # Timestamp plus a short random suffix keeps run ids unique even when
        # two runs start within the same second.
        self.run_id = _dt.datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6]
        self.file_path = None
        if enable_file:
            os.makedirs(log_dir, exist_ok=True)
            self.file_path = os.path.join(log_dir, f"run_{self.run_id}.jsonl")

    def _now(self) -> str:
        """Return the current local time as an ISO-8601 string (seconds precision)."""
        return _dt.datetime.now().isoformat(timespec='seconds')

    def _normalize_level(self, level: Any) -> str:
        """Upper-case *level* and map known aliases to their canonical name."""
        name = str(level).upper()
        return self._LEVEL_ALIASES.get(name, name)

    def _console_enabled(self, level: str) -> bool:
        """Return True when *level* is at or above the console threshold.

        Unknown level names (for either argument or threshold) enable output,
        so a misconfiguration never hides events.
        """
        try:
            wanted = self._LEVELS.index(self._normalize_level(level))
            threshold = self._LEVELS.index(self._normalize_level(self.console_level))
        except ValueError:
            return True
        return wanted >= threshold

    @staticmethod
    def _to_json(rec: Dict[str, Any]) -> str:
        """Serialize *rec* to one JSON line without losing the event.

        ``default=str`` stringifies values json cannot encode (sets, custom
        objects).  Non-string dict keys and circular references still raise,
        so in that case every non-primitive top-level value is stringified.
        """
        try:
            return json.dumps(rec, ensure_ascii=False, default=str)
        except (TypeError, ValueError):
            primitives = (str, int, float, bool, type(None))
            safe = {k: (v if isinstance(v, primitives) else str(v)) for k, v in rec.items()}
            return json.dumps(safe, ensure_ascii=False)

    def log_event(self, event: str, message: Optional[str] = None, **fields: Any) -> None:
        """Record one event on the console and/or in the JSONL file.

        Args:
            event: Short event name, stored under ``"event"``.
            message: Optional human-readable text; omitted from the record
                when empty.
            **fields: Extra key/value pairs merged into the record.  On the
                console each value is shortened to 120 characters and
                ``None`` values are skipped.
        """
        rec: Dict[str, Any] = {
            "ts": self._now(),
            "run_id": self.run_id,
            "event": event,
        }
        if message:
            rec["message"] = message
        rec.update(fields)

        # console (pretty one-liner)
        if self._console_enabled("INFO"):
            parts = [f"[{rec['ts']}] {event}"]
            if message:
                parts.append(f"{message}")
            kv = " ".join(
                f"{k}={str(v)[:120]}" for k, v in fields.items() if v is not None
            )
            if kv:
                parts.append(kv)
            print(" | ".join(parts))

        # JSONL file
        if self.enable_file and self.file_path:
            try:
                payload = self._to_json(rec)
                with open(self.file_path, "a", encoding="utf-8") as f:
                    f.write(payload + "\n")
            except Exception:
                # Deliberate best-effort: logging must never crash the app.
                pass
def _env_flag(name: str) -> bool:
    """Return True when env var *name* is set to 1/true/yes (case-insensitive)."""
    return str(os.environ.get(name)).lower() in ("1", "true", "yes")


def init_from_config(config: Dict[str, Any]) -> EventLogger:
    """Build the module-wide EventLogger from the app config and return it.

    Reads the optional ``config["logging"]`` mapping with these keys:
    ``log_dir``, ``log_to_file``, ``console_level``, ``show_thoughts``,
    ``preview_saved_files`` and ``preview_limit``.  The environment variables
    ``AIWS_SHOW_THOUGHTS`` and ``AIWS_LOG_FILE_PREVIEW`` (values 1/true/yes)
    can switch the two corresponding flags on even when the config leaves
    them off.

    Side effects: replaces the module-level ``_LOGGER_INSTANCE`` and emits a
    ``run_start`` event through the new logger.

    Args:
        config: Application configuration.  A non-dict value, a missing
            ``logging`` key, or a ``logging`` entry that is not a dict
            (e.g. ``None`` from an empty config section) all fall back to
            the defaults.

    Returns:
        The newly created logger.
    """
    global _LOGGER_INSTANCE

    # dict.get's default only applies when the key is absent, so an explicit
    # null/non-dict "logging" entry must be handled separately.
    section = config.get("logging") if isinstance(config, dict) else None
    lg = section if isinstance(section, dict) else {}

    log_dir = lg.get("log_dir", "./logs")
    enable_file = bool(lg.get("log_to_file", True))
    console_level = str(lg.get("console_level", "INFO")).upper()

    # env override for showing LLM thoughts
    show_llm_thoughts = bool(lg.get("show_thoughts", False)) or _env_flag("AIWS_SHOW_THOUGHTS")
    preview_files = bool(lg.get("preview_saved_files", False)) or _env_flag("AIWS_LOG_FILE_PREVIEW")

    # isdecimal() (unlike isdigit()) only accepts characters int() can parse,
    # so the conversion below cannot raise.
    raw_limit = lg.get("preview_limit", 500)
    preview_limit = int(raw_limit) if str(raw_limit).isdecimal() else 500

    _LOGGER_INSTANCE = EventLogger(
        log_dir=log_dir,
        enable_file=enable_file,
        show_llm_thoughts=show_llm_thoughts,
        console_level=console_level,
        preview_saved_files=preview_files,
        preview_limit=preview_limit,
    )
    _LOGGER_INSTANCE.log_event("run_start", message="Application run started")
    return _LOGGER_INSTANCE
def get_logger() -> Optional[EventLogger]:
    """Return the current module-wide logger.

    The result is whatever ``_LOGGER_INSTANCE`` holds at call time, which is
    ``None`` until a logger has been installed.
    """
    current = _LOGGER_INSTANCE
    return current
# LangChain callback handler
# Resolve the base class from whichever package layout is importable: first
# `langchain.callbacks.base`, then `langchain_core.callbacks.base`.  If both
# imports fail, `object` is used instead so the handler class that follows
# can still be defined.
try:
    from langchain.callbacks.base import BaseCallbackHandler
except Exception:  # pragma: no cover - fallback for newer versions
    try:
        from langchain_core.callbacks.base import BaseCallbackHandler  # type: ignore
    except Exception:
        BaseCallbackHandler = object  # minimal fallback
class LangChainEventsHandler(BaseCallbackHandler):
    """Callback handler that forwards LangChain lifecycle events to an EventLogger.

    Chain, tool and agent events are logged with their payloads shortened by
    ``_truncate``.  Prompts, completions and the agent's ``log`` text are only
    recorded when ``logger.show_llm_thoughts`` is true; otherwise just a
    prompt count / ``llm_output`` metadata is logged.
    """

    def __init__(self, logger: EventLogger):
        """Store the EventLogger that receives every event."""
        super().__init__()
        self.logger = logger

    @staticmethod
    def _component_name(serialized, kwargs):
        """Best-effort display name for the chain/tool that is starting.

        Preference order: ``serialized["name"]``, the ``name`` keyword
        argument, the last element of ``serialized["id"]``, and finally
        ``str(serialized)`` for non-dict values.  Returns ``None`` when
        nothing usable is available (instead of the string ``"None"``).
        NOTE(review): newer LangChain releases are expected to pass
        ``serialized=None`` together with a ``name`` kwarg - confirm against
        the installed version.
        """
        if isinstance(serialized, dict):
            declared = serialized.get("name")
            if declared:
                return declared
        run_name = kwargs.get("name")
        if run_name:
            return run_name
        if isinstance(serialized, dict):
            ident = serialized.get("id")
            if isinstance(ident, (list, tuple)) and ident:
                return str(ident[-1])
            return None
        return None if serialized is None else str(serialized)

    # Chains
    def on_chain_start(self, serialized, inputs, **kwargs):
        """Log a ``chain_start`` event with the chain name and truncated inputs."""
        name = self._component_name(serialized, kwargs)
        self.logger.log_event("chain_start", name=name, inputs=_truncate(inputs))

    def on_chain_end(self, outputs, **kwargs):
        """Log a ``chain_end`` event with truncated outputs."""
        self.logger.log_event("chain_end", outputs=_truncate(outputs))

    # LLMs
    def on_llm_start(self, serialized, prompts, **kwargs):
        """Log ``llm_start``; prompt text is included only when thoughts are enabled."""
        if self.logger.show_llm_thoughts:
            self.logger.log_event("llm_start", prompts=_truncate(prompts))
        else:
            self.logger.log_event("llm_start", message="prompt issued", prompt_count=len(prompts) if prompts else 0)

    def on_llm_end(self, response, **kwargs):
        """Log ``llm_end`` with generated text (opt-in) or ``llm_output`` metadata."""
        try:
            if self.logger.show_llm_thoughts:
                # Skip empty generation lists so one of them cannot discard
                # the texts of all the others via IndexError.
                texts = [g[0].text for g in response.generations if g]  # type: ignore[attr-defined]
                self.logger.log_event("llm_end", outputs=_truncate(texts))
            else:
                # token/length only when possible
                usage = getattr(response, 'llm_output', None) or {}
                self.logger.log_event("llm_end", message="llm completed", meta=_truncate(usage))
        except Exception:
            # Best-effort: still record that the call finished.
            self.logger.log_event("llm_end")

    # Tools
    def on_tool_start(self, serialized, input_str, **kwargs):
        """Log a ``tool_start`` event with the tool name and truncated input."""
        name = self._component_name(serialized, kwargs)
        self.logger.log_event("tool_start", name=name, input=_truncate(input_str))

    def on_tool_end(self, output, **kwargs):
        """Log a ``tool_end`` event with truncated output."""
        self.logger.log_event("tool_end", output=_truncate(output))

    # Agent actions
    def on_agent_action(self, action, **kwargs):
        """Log the chosen tool and its input; the reasoning ``log`` is opt-in."""
        try:
            self.logger.log_event(
                "agent_action",
                tool=getattr(action, 'tool', None),
                tool_input=_truncate(getattr(action, 'tool_input', None)),
                log=_truncate(getattr(action, 'log', None)) if self.logger.show_llm_thoughts else None,
            )
        except Exception:
            self.logger.log_event("agent_action")

    def on_agent_finish(self, finish, **kwargs):
        """Log ``agent_finish`` with the truncated ``output`` return value."""
        try:
            # `or {}` also covers a return_values attribute that is None.
            out = (getattr(finish, 'return_values', None) or {}).get('output')
            self.logger.log_event("agent_finish", output=_truncate(out))
        except Exception:
            self.logger.log_event("agent_finish")
def _truncate(obj: Any, limit: int = 800) -> Any:
    """Return a string form of *obj* cut to at most *limit* characters.

    Strings are used as-is; anything else is rendered with ``json.dumps``
    (non-ASCII kept), falling back to ``str(obj)`` when it is not JSON
    serializable.  When the text is longer than *limit*, the first *limit*
    characters are kept and ``"..."`` is appended so readers can tell the
    value was shortened (the marker is not counted against *limit*).

    Returns:
        The possibly shortened string, or ``None`` if no string form of
        *obj* could be produced.  Never raises.
    """
    marker = "..."

    def clip(text: str) -> str:
        return text if len(text) <= limit else text[:limit] + marker

    try:
        return clip(obj if isinstance(obj, str) else json.dumps(obj, ensure_ascii=False))
    except Exception:
        # Not JSON-serializable (or otherwise unusable): try plain str().
        try:
            return clip(str(obj))
        except Exception:
            # Logging helper: give up quietly rather than raise.
            return None