import os import json import uuid import datetime as _dt from typing import Any, Dict, Optional _LOGGER_INSTANCE = None class EventLogger: """ Lightweight JSONL + console event logger for the app. Avoids exposing LLM chain-of-thought by default; can be opted-in via config/env. """ def __init__(self, log_dir: str = "./logs", enable_file: bool = True, show_llm_thoughts: bool = False, console_level: str = "INFO", preview_saved_files: bool = False, preview_limit: int = 500): self.log_dir = log_dir self.enable_file = enable_file self.show_llm_thoughts = show_llm_thoughts self.console_level = console_level.upper() self.preview_saved_files = preview_saved_files self.preview_limit = preview_limit self.run_id = _dt.datetime.now().strftime("%Y%m%d_%H%M%S") + "_" + uuid.uuid4().hex[:6] self.file_path = None if enable_file: os.makedirs(log_dir, exist_ok=True) self.file_path = os.path.join(log_dir, f"run_{self.run_id}.jsonl") def _now(self) -> str: return _dt.datetime.now().isoformat(timespec='seconds') def _console_enabled(self, level: str) -> bool: order = ["DEBUG", "INFO", "WARN", "ERROR"] try: return order.index(level) >= order.index(self.console_level) except Exception: return True def log_event(self, event: str, message: Optional[str] = None, **fields: Any) -> None: rec: Dict[str, Any] = { "ts": self._now(), "run_id": self.run_id, "event": event, } if message: rec["message"] = message if fields: rec.update(fields) # console (pretty one-liner) if self._console_enabled("INFO"): kv = " ".join( f"{k}={str(v)[:120]}" for k, v in fields.items() if v is not None ) line = f"[{rec['ts']}] {event}" if message: line += f" | {message}" if kv: line += f" | {kv}" print(line) # JSONL file if self.enable_file and self.file_path: try: with open(self.file_path, "a", encoding="utf-8") as f: f.write(json.dumps(rec, ensure_ascii=False) + "\n") except Exception: # Do not crash on logging errors pass def init_from_config(config: Dict[str, Any]) -> EventLogger: global _LOGGER_INSTANCE lg = config.get("logging", {}) if isinstance(config, dict) else {} log_dir = lg.get("log_dir", "./logs") enable_file = bool(lg.get("log_to_file", True)) console_level = str(lg.get("console_level", "INFO")).upper() # env override for showing LLM thoughts env_flag = os.environ.get("AIWS_SHOW_THOUGHTS") show_llm_thoughts = bool(lg.get("show_thoughts", False)) or (str(env_flag).lower() in ("1", "true", "yes")) from os import environ preview_files = bool(lg.get("preview_saved_files", False)) or (str(environ.get("AIWS_LOG_FILE_PREVIEW")).lower() in ("1", "true", "yes")) preview_limit = int(lg.get("preview_limit", 500)) if str(lg.get("preview_limit", "")).isdigit() else 500 _LOGGER_INSTANCE = EventLogger( log_dir=log_dir, enable_file=enable_file, show_llm_thoughts=show_llm_thoughts, console_level=console_level, preview_saved_files=preview_files, preview_limit=preview_limit, ) _LOGGER_INSTANCE.log_event("run_start", message="Application run started") return _LOGGER_INSTANCE def get_logger() -> Optional[EventLogger]: return _LOGGER_INSTANCE # LangChain callback handler try: from langchain.callbacks.base import BaseCallbackHandler except Exception: # pragma: no cover - fallback for newer versions try: from langchain_core.callbacks.base import BaseCallbackHandler # type: ignore except Exception: BaseCallbackHandler = object # minimal fallback class LangChainEventsHandler(BaseCallbackHandler): def __init__(self, logger: EventLogger): super().__init__() self.logger = logger # Chains def on_chain_start(self, serialized, inputs, **kwargs): name = serialized.get("name") if isinstance(serialized, dict) else str(serialized) self.logger.log_event("chain_start", name=name, inputs=_truncate(inputs)) def on_chain_end(self, outputs, **kwargs): self.logger.log_event("chain_end", outputs=_truncate(outputs)) # LLMs def on_llm_start(self, serialized, prompts, **kwargs): if self.logger.show_llm_thoughts: self.logger.log_event("llm_start", prompts=_truncate(prompts)) else: self.logger.log_event("llm_start", message="prompt issued", prompt_count=len(prompts) if prompts else 0) def on_llm_end(self, response, **kwargs): try: if self.logger.show_llm_thoughts: texts = [g[0].text for g in response.generations] # type: ignore[attr-defined] self.logger.log_event("llm_end", outputs=_truncate(texts)) else: # token/length only when possible usage = getattr(response, 'llm_output', None) or {} self.logger.log_event("llm_end", message="llm completed", meta=_truncate(usage)) except Exception: self.logger.log_event("llm_end") # Tools def on_tool_start(self, serialized, input_str, **kwargs): name = serialized.get("name") if isinstance(serialized, dict) else str(serialized) self.logger.log_event("tool_start", name=name, input=_truncate(input_str)) def on_tool_end(self, output, **kwargs): self.logger.log_event("tool_end", output=_truncate(output)) # Agent actions def on_agent_action(self, action, **kwargs): try: self.logger.log_event( "agent_action", tool=getattr(action, 'tool', None), tool_input=_truncate(getattr(action, 'tool_input', None)), log=_truncate(getattr(action, 'log', None)) if self.logger.show_llm_thoughts else None, ) except Exception: self.logger.log_event("agent_action") def on_agent_finish(self, finish, **kwargs): try: out = getattr(finish, 'return_values', {}).get('output') self.logger.log_event("agent_finish", output=_truncate(out)) except Exception: self.logger.log_event("agent_finish") def _truncate(obj: Any, limit: int = 800) -> Any: try: s = obj if isinstance(obj, str) else json.dumps(obj, ensure_ascii=False) return s if len(s) <= limit else (s[:limit] + "…") except Exception: try: s = str(obj) return s if len(s) <= limit else (s[:limit] + "…") except Exception: return None