482 lines
20 KiB
Python
482 lines
20 KiB
Python
import json
|
|
import os
|
|
from typing import List, Dict
|
|
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig, pipeline
|
|
from transformers.utils import logging as hf_logging
|
|
from langchain_community.llms import HuggingFacePipeline
|
|
from langchain.agents import initialize_agent, AgentType
|
|
from langchain.tools import Tool
|
|
from langchain.memory import ConversationBufferMemory
|
|
from web_scraper import WebScraper
|
|
from google_drive_uploader import GoogleDriveUploader, SimpleDriveSaver
|
|
|
|
class AIAgent:
|
|
def __init__(self, config_path='./config.json'):
|
|
self.config_path = config_path
|
|
with open(config_path, 'r') as f:
|
|
self.config = json.load(f)
|
|
|
|
self.model_path = self.config['model_local_path']
|
|
self.max_tokens = self.config['max_tokens']
|
|
self.temperature = self.config['temperature']
|
|
|
|
# 모델 로드
|
|
self.model = None
|
|
self.tokenizer = None
|
|
self.llm = None
|
|
self.load_model()
|
|
|
|
# 도구들 초기화
|
|
self.web_scraper = WebScraper(config_path)
|
|
self.simple_saver = SimpleDriveSaver(self.config['data_storage']['drive_mount_path'])
|
|
|
|
# Google Drive API는 선택 사항: 자격증명 파일이 존재할 때만 활성화
|
|
self.drive_uploader = None
|
|
try:
|
|
drive_folder_id = self.config.get('google_drive_folder_id', '').strip()
|
|
creds_path = self.config.get('google_credentials_path', '').strip()
|
|
has_valid_folder = bool(drive_folder_id) and drive_folder_id != 'YOUR_GOOGLE_DRIVE_FOLDER_ID'
|
|
has_creds_file = bool(creds_path) and __import__('os').path.isfile(creds_path)
|
|
if has_valid_folder and has_creds_file:
|
|
self.drive_uploader = GoogleDriveUploader(config_path)
|
|
else:
|
|
reason = []
|
|
if not has_valid_folder:
|
|
reason.append('folder_id 미설정')
|
|
if not has_creds_file:
|
|
reason.append('credentials 파일 없음')
|
|
print(f"Google Drive API 비활성화 ({', '.join(reason)}) — SimpleDriveSaver만 사용")
|
|
except Exception as e:
|
|
# 어떤 이유로든 초기화 실패 시 API 도구는 비활성화하고 계속 진행
|
|
print(f"Google Drive API 초기화 실패: {e} — SimpleDriveSaver만 사용")
|
|
|
|
# LangChain 도구 정의
|
|
self.tools = [
|
|
Tool(
|
|
name="WebScraper",
|
|
func=self.scrape_web,
|
|
description="웹사이트에서 정보를 수집합니다. URL을 입력하세요."
|
|
),
|
|
Tool(
|
|
name="SimpleDriveSaver",
|
|
func=self.save_to_drive_simple,
|
|
description="마운트된 Google Drive에 데이터를 저장합니다. 데이터와 파일명을 입력하세요."
|
|
)
|
|
]
|
|
|
|
# Google Drive API 도구는 사용 가능할 때만 추가
|
|
if self.drive_uploader is not None:
|
|
self.tools.append(
|
|
Tool(
|
|
name="GoogleDriveUploader",
|
|
func=self.upload_to_drive_api,
|
|
description="Google Drive API를 사용하여 데이터를 업로드합니다. 데이터(JSON)|파일명 형식으로 입력하세요."
|
|
)
|
|
)
|
|
|
|
# 메모리
|
|
self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
|
|
|
|
# 에이전트 초기화
|
|
self.agent = initialize_agent(
|
|
tools=self.tools,
|
|
llm=self.llm,
|
|
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
|
|
memory=self.memory,
|
|
verbose=True
|
|
)
|
|
|
|
def load_model(self):
|
|
"""
|
|
Hugging Face 모델을 로드합니다.
|
|
- model_downloader가 가져온 로컬 스냅샷을 우선 사용
|
|
- 양자화/디바이스 맵은 가능한 한 보수적으로 설정하고, 실패 시 단계적 폴백
|
|
"""
|
|
# GPU 메모리 최적화 설정
|
|
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
|
|
# Transformers 로깅 레벨을 낮춰 config __repr__ 경로로 인한 예외를 피함
|
|
try:
|
|
hf_logging.set_verbosity_error()
|
|
except Exception:
|
|
pass
|
|
# 안전한 __repr__ 패치로 f-string 로깅 중 예외 회피
|
|
try:
|
|
from transformers.configuration_utils import PretrainedConfig as _HFPC
|
|
_orig_repr = getattr(_HFPC, '__repr__', None)
|
|
def _safe_repr(self):
|
|
try:
|
|
# 가능한 경우 최소 정보만 반환
|
|
name = getattr(self, '__class__', type('X', (), {})).__name__
|
|
model_type = getattr(self, 'model_type', 'unknown')
|
|
return f"{name}(model_type={model_type})"
|
|
except Exception:
|
|
return f"PretrainedConfig(unknown)"
|
|
if _orig_repr is not None and getattr(_HFPC.__repr__, '_aiws_safe', None) is None:
|
|
_safe_repr._aiws_safe = True
|
|
_HFPC.__repr__ = _safe_repr
|
|
except Exception:
|
|
pass
|
|
|
|
model_settings = self.config.get('model_settings', {})
|
|
use_quantization = bool(model_settings.get('use_quantization', False))
|
|
# 양자화 비트/오프로딩 옵션
|
|
try:
|
|
quant_bits = int(model_settings.get('quantization_bits', 8))
|
|
except Exception:
|
|
quant_bits = 8
|
|
cpu_offload = bool(model_settings.get('cpu_offload', False))
|
|
torch_dtype_cfg = str(model_settings.get('torch_dtype', 'auto')).lower()
|
|
|
|
# dtype 파싱
|
|
import torch
|
|
dtype = None
|
|
if torch_dtype_cfg in ("float16", "fp16", "half"):
|
|
dtype = torch.float16
|
|
elif torch_dtype_cfg in ("bfloat16", "bf16"):
|
|
dtype = torch.bfloat16
|
|
elif torch_dtype_cfg in ("float32", "fp32"):
|
|
dtype = torch.float32
|
|
else:
|
|
dtype = None # auto
|
|
|
|
# 로컬 스냅샷이 있으면 우선 사용, 없으면 모델 이름 사용
|
|
model_source = self.model_path if os.path.isdir(self.model_path) else self.config.get('model_name')
|
|
if not model_source:
|
|
raise RuntimeError("모델 경로/이름이 설정되지 않았습니다.")
|
|
|
|
# (이전) quant_args 경로 제거: load_kwargs에서 직접 처리
|
|
|
|
# 메모리 제한/오프로딩 설정
|
|
mm_cfg = model_settings.get('max_memory', {}) if isinstance(model_settings.get('max_memory', {}), dict) else {}
|
|
# normalize memory strings to GiB (accelerate accepts both, but unify)
|
|
def _norm_mem(v):
|
|
if not isinstance(v, str):
|
|
return v
|
|
return v.replace('GB', 'GiB').replace('gb', 'GiB')
|
|
max_memory = {}
|
|
if 0 in mm_cfg or 'gpu' in mm_cfg:
|
|
max_memory[0] = _norm_mem(mm_cfg.get(0, mm_cfg.get('gpu', '30GiB')))
|
|
if 'cpu' in mm_cfg:
|
|
max_memory['cpu'] = _norm_mem(mm_cfg.get('cpu', '60GiB'))
|
|
offload_folder = os.path.join(os.path.dirname(self.config_path), 'offload')
|
|
os.makedirs(offload_folder, exist_ok=True)
|
|
|
|
# 1차 시도: device_map="auto" + max_memory 로 로드
|
|
try:
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
|
|
# config 사전 로드 후 리포의 quantization_config 키 제거 (MXFP4 등 회피)
|
|
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
|
|
if hasattr(cfg, 'quantization_config'):
|
|
try:
|
|
delattr(cfg, 'quantization_config')
|
|
except Exception:
|
|
setattr(cfg, 'quantization_config', None)
|
|
load_kwargs = dict(
|
|
device_map="auto",
|
|
low_cpu_mem_usage=True,
|
|
offload_folder=offload_folder,
|
|
offload_state_dict=True,
|
|
trust_remote_code=True,
|
|
config=cfg,
|
|
)
|
|
if dtype is not None:
|
|
load_kwargs["torch_dtype"] = dtype
|
|
if max_memory:
|
|
load_kwargs["max_memory"] = max_memory
|
|
|
|
# use_quantization=True면 4bit 우선, 아니면 8bit 레거시 플래그 사용
|
|
if use_quantization:
|
|
if quant_bits == 4:
|
|
try:
|
|
from transformers import BitsAndBytesConfig
|
|
load_kwargs["quantization_config"] = BitsAndBytesConfig(
|
|
load_in_4bit=True,
|
|
bnb_4bit_quant_type="nf4",
|
|
bnb_4bit_use_double_quant=True,
|
|
bnb_4bit_compute_dtype=__import__('torch').bfloat16
|
|
)
|
|
print("4bit 양자화 적용 (bnb nf4)")
|
|
except Exception as _:
|
|
load_kwargs["load_in_8bit"] = True
|
|
if cpu_offload:
|
|
load_kwargs["llm_int8_enable_fp32_cpu_offload"] = True
|
|
print("4bit 미지원 → 8bit(레거시)로 폴백")
|
|
else:
|
|
load_kwargs["load_in_8bit"] = True
|
|
if cpu_offload:
|
|
load_kwargs["llm_int8_enable_fp32_cpu_offload"] = True
|
|
print("8bit 양자화 적용 (레거시 플래그)")
|
|
|
|
self.model = AutoModelForCausalLM.from_pretrained(
|
|
model_source,
|
|
**load_kwargs
|
|
)
|
|
except Exception as e1:
|
|
print(f"device_map=auto 로드 실패: {e1}")
|
|
|
|
# 2a. 비양자화로 다시 auto+offload 시도 (오류가 bnb/버전이면 이 경로로 성공 가능)
|
|
try:
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
|
|
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
|
|
if hasattr(cfg, 'quantization_config'):
|
|
try:
|
|
delattr(cfg, 'quantization_config')
|
|
except Exception:
|
|
setattr(cfg, 'quantization_config', None)
|
|
retry_no_quant = dict(
|
|
device_map="auto",
|
|
low_cpu_mem_usage=True,
|
|
offload_folder=offload_folder,
|
|
offload_state_dict=True,
|
|
trust_remote_code=True,
|
|
config=cfg,
|
|
)
|
|
if dtype is not None:
|
|
retry_no_quant["torch_dtype"] = dtype
|
|
if max_memory:
|
|
retry_no_quant["max_memory"] = max_memory
|
|
self.model = AutoModelForCausalLM.from_pretrained(model_source, **retry_no_quant)
|
|
print("비양자화 재시도 성공")
|
|
except Exception as e_noq:
|
|
print(f"비양자화 재시도 실패: {e_noq}")
|
|
|
|
# 2b. 양자화로 재시도 (4bit 우선, 아니면 8bit)
|
|
loaded = False
|
|
try:
|
|
print("양자화로 재시도합니다...")
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
|
|
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
|
|
if hasattr(cfg, 'quantization_config'):
|
|
try:
|
|
delattr(cfg, 'quantization_config')
|
|
except Exception:
|
|
setattr(cfg, 'quantization_config', None)
|
|
retry_kwargs = dict(
|
|
device_map="auto",
|
|
low_cpu_mem_usage=True,
|
|
offload_folder=offload_folder,
|
|
offload_state_dict=True,
|
|
trust_remote_code=True,
|
|
config=cfg,
|
|
)
|
|
if dtype is not None:
|
|
retry_kwargs["torch_dtype"] = dtype
|
|
if max_memory:
|
|
retry_kwargs["max_memory"] = max_memory
|
|
if quant_bits == 4:
|
|
from transformers import BitsAndBytesConfig
|
|
retry_kwargs["quantization_config"] = BitsAndBytesConfig(
|
|
load_in_4bit=True,
|
|
bnb_4bit_quant_type="nf4",
|
|
bnb_4bit_use_double_quant=True,
|
|
bnb_4bit_compute_dtype=__import__('torch').bfloat16
|
|
)
|
|
else:
|
|
retry_kwargs["load_in_8bit"] = True
|
|
if cpu_offload:
|
|
retry_kwargs["llm_int8_enable_fp32_cpu_offload"] = True
|
|
|
|
self.model = AutoModelForCausalLM.from_pretrained(model_source, **retry_kwargs)
|
|
loaded = True
|
|
except Exception as e_q:
|
|
print(f"양자화 재시도 실패: {e_q}")
|
|
|
|
if not loaded:
|
|
print("CPU로 폴백합니다.")
|
|
try:
|
|
import torch, gc
|
|
torch.cuda.empty_cache()
|
|
gc.collect()
|
|
except Exception:
|
|
pass
|
|
|
|
# CPU 강제 로드 (config의 quantization_config 제거)
|
|
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
|
|
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
|
|
if hasattr(cfg, 'quantization_config'):
|
|
try:
|
|
delattr(cfg, 'quantization_config')
|
|
except Exception:
|
|
setattr(cfg, 'quantization_config', None)
|
|
self.model = AutoModelForCausalLM.from_pretrained(
|
|
model_source,
|
|
device_map={"": "cpu"},
|
|
torch_dtype=torch.float32,
|
|
low_cpu_mem_usage=False,
|
|
trust_remote_code=True,
|
|
config=cfg
|
|
)
|
|
|
|
# 파이프라인 생성
|
|
pad_id = self.tokenizer.eos_token_id if getattr(self.tokenizer, 'eos_token_id', None) is not None else None
|
|
pipe = pipeline(
|
|
"text-generation",
|
|
model=self.model,
|
|
tokenizer=self.tokenizer,
|
|
max_new_tokens=self.max_tokens,
|
|
temperature=self.temperature,
|
|
do_sample=True,
|
|
pad_token_id=pad_id
|
|
)
|
|
self.llm = HuggingFacePipeline(pipeline=pipe)
|
|
print("모델 로드 완료")
|
|
|
|
# 간단 검색: DuckDuckGo HTML 결과 파싱 (외부 API 불필요)
|
|
def _search_urls(self, query: str, k: int = 5) -> List[str]:
|
|
import requests
|
|
from bs4 import BeautifulSoup
|
|
q = query.strip().replace(' ', '+')
|
|
url = f"https://duckduckgo.com/html/?q={q}"
|
|
headers = {"User-Agent": self.config['web_scraping']['user_agent']}
|
|
try:
|
|
r = requests.get(url, headers=headers, timeout=20)
|
|
r.raise_for_status()
|
|
soup = BeautifulSoup(r.text, 'html.parser')
|
|
links = []
|
|
for a in soup.select('a.result__a'):
|
|
href = a.get('href')
|
|
if href and href.startswith('http'):
|
|
links.append(href)
|
|
if len(links) >= k:
|
|
break
|
|
return links
|
|
except Exception as e:
|
|
print(f"검색 실패({query}): {e}")
|
|
return []
|
|
|
|
def collect_information(self, topics: List[str]) -> List[Dict[str, str]]:
|
|
"""
|
|
주제별로 웹 검색 → 스크래핑 → 요약 → 저장까지 수행
|
|
반환: [{ topic, response }]
|
|
"""
|
|
results = []
|
|
for topic in topics:
|
|
urls = self._search_urls(topic, k=5)
|
|
collected = []
|
|
for u in urls[:5]:
|
|
data = self.web_scraper.scrape_website(u)
|
|
if data:
|
|
collected.append(data)
|
|
|
|
# 저장 (간단 저장 도구)
|
|
filename = f"{topic[:50].replace(' ', '_')}.json"
|
|
self.simple_saver.save_data_as_json(collected, filename)
|
|
|
|
# 간단 요약 생성
|
|
try:
|
|
snippet = "\n\n".join([d.get('title', '') + ": " + d.get('description', '') for d in collected[:3]])
|
|
prompt = f"""
|
|
다음 자료를 간결히 요약하고 핵심 포인트 3가지를 bullet로 정리하세요.
|
|
주제: {topic}
|
|
|
|
자료:
|
|
{snippet}
|
|
"""
|
|
summary = self.llm(prompt)
|
|
except Exception as e:
|
|
summary = f"요약 실패: {e}"
|
|
|
|
results.append({"topic": topic, "response": summary})
|
|
|
|
return results
|
|
|
|
def scrape_web(self, url):
|
|
"""
|
|
웹 스크래핑 도구 함수
|
|
"""
|
|
data = self.web_scraper.scrape_website(url)
|
|
if data:
|
|
return f"수집 완료: {data['title']} - {data['description'][:200]}..."
|
|
else:
|
|
return "수집 실패"
|
|
|
|
def upload_to_drive_api(self, data_and_filename):
|
|
"""
|
|
Google Drive API 업로드 도구 함수
|
|
"""
|
|
try:
|
|
# 간단한 파싱 (실제로는 더 정교하게)
|
|
parts = data_and_filename.split('|')
|
|
if len(parts) == 2:
|
|
data = json.loads(parts[0])
|
|
filename = parts[1]
|
|
else:
|
|
data = {"error": "잘못된 형식"}
|
|
filename = "error.json"
|
|
|
|
file_id = self.drive_uploader.upload_data_as_json(data, filename)
|
|
return f"업로드 완료: {file_id}"
|
|
except Exception as e:
|
|
return f"업로드 실패: {e}"
|
|
|
|
def save_to_drive_simple(self, data_and_filename):
|
|
"""
|
|
마운트된 Drive에 저장하는 도구 함수
|
|
"""
|
|
try:
|
|
parts = data_and_filename.split('|')
|
|
if len(parts) == 2:
|
|
data = json.loads(parts[0])
|
|
filename = parts[1]
|
|
else:
|
|
data = {"error": "잘못된 형식"}
|
|
filename = "error.json"
|
|
|
|
filepath = self.simple_saver.save_data_as_json(data, filename)
|
|
return f"저장 완료: {filepath}"
|
|
except Exception as e:
|
|
return f"저장 실패: {e}"
|
|
|
|
def run_agent(self, task_description):
|
|
"""
|
|
AI 에이전트를 실행합니다.
|
|
"""
|
|
try:
|
|
response = self.agent.run(task_description)
|
|
return response
|
|
except Exception as e:
|
|
print(f"에이전트 실행 실패: {e}")
|
|
return None
|
|
|
|
def generate_topics(self, num_topics=3):
|
|
"""
|
|
AI가 스스로 흥미로운 주제를 생성합니다.
|
|
"""
|
|
prompt = f"""
|
|
당신은 AI 연구원입니다. 현재 세계에서 가장 흥미롭고 조사할 가치가 있는 기술 및 과학 분야의 주제 {num_topics}개를 선정해주세요.
|
|
|
|
다음 기준을 고려하세요:
|
|
1. 최근 트렌드나 미래 지향적인 주제
|
|
2. 사회적 영향이 큰 주제
|
|
3. 기술 발전이 빠른 분야
|
|
4. AI와 관련된 주제 우선
|
|
|
|
각 주제는 구체적이고 조사하기 쉬운 형태로 제시해주세요.
|
|
예시: "양자 컴퓨팅의 최근 발전", "생성형 AI의 윤리적 문제"
|
|
|
|
주제 목록만 출력하고, 다른 설명은 하지 마세요.
|
|
형식: 각 줄에 하나의 주제
|
|
"""
|
|
|
|
try:
|
|
response = self.llm(prompt)
|
|
# 응답에서 주제들을 추출 (줄 단위로 분리)
|
|
topics = [line.strip() for line in response.split('\n') if line.strip() and not line.startswith(('1.', '2.', '3.', '-'))]
|
|
# 최대 num_topics개 반환
|
|
return topics[:num_topics]
|
|
except Exception as e:
|
|
print(f"주제 생성 실패: {e}")
|
|
# 기본 주제 반환
|
|
return ["AI 기술 동향", "머신러닝 응용", "딥러닝 최신 연구"]
|
|
|
|
def close(self):
|
|
self.web_scraper.close()
|
|
|
|
if __name__ == "__main__":
|
|
agent = AIAgent()
|
|
# 테스트용
|
|
topics = ["인공지능 최신 트렌드", "머신러닝 기초"]
|
|
results = agent.collect_information(topics)
|
|
print("수집 결과:", results)
|
|
agent.close()
|