Files
AI_Devlop/AI_Web_Scraper/ai_agent.py

437 lines
18 KiB
Python

import json
import os
from typing import List, Dict
from transformers import AutoModelForCausalLM, AutoTokenizer, AutoConfig, pipeline
from transformers.utils import logging as hf_logging
from langchain_community.llms import HuggingFacePipeline
from langchain.agents import initialize_agent, AgentType
from langchain.tools import Tool
from langchain.memory import ConversationBufferMemory
from web_scraper import WebScraper
from google_drive_uploader import GoogleDriveUploader, SimpleDriveSaver
class AIAgent:
def __init__(self, config_path='./config.json'):
self.config_path = config_path
with open(config_path, 'r') as f:
self.config = json.load(f)
self.model_path = self.config['model_local_path']
self.max_tokens = self.config['max_tokens']
self.temperature = self.config['temperature']
# 모델 로드
self.model = None
self.tokenizer = None
self.llm = None
self.load_model()
# 도구들 초기화
self.web_scraper = WebScraper(config_path)
self.drive_uploader = GoogleDriveUploader(config_path)
self.simple_saver = SimpleDriveSaver(self.config['data_storage']['drive_mount_path'])
# LangChain 도구 정의
self.tools = [
Tool(
name="WebScraper",
func=self.scrape_web,
description="웹사이트에서 정보를 수집합니다. URL을 입력하세요."
),
Tool(
name="GoogleDriveUploader",
func=self.upload_to_drive_api,
description="Google Drive API를 사용하여 데이터를 업로드합니다. 데이터와 파일명을 입력하세요."
),
Tool(
name="SimpleDriveSaver",
func=self.save_to_drive_simple,
description="마운트된 Google Drive에 데이터를 저장합니다. 데이터와 파일명을 입력하세요."
)
]
# 메모리
self.memory = ConversationBufferMemory(memory_key="chat_history", return_messages=True)
# 에이전트 초기화
self.agent = initialize_agent(
tools=self.tools,
llm=self.llm,
agent=AgentType.CONVERSATIONAL_REACT_DESCRIPTION,
memory=self.memory,
verbose=True
)
def load_model(self):
"""
Hugging Face 모델을 로드합니다.
- model_downloader가 가져온 로컬 스냅샷을 우선 사용
- 양자화/디바이스 맵은 가능한 한 보수적으로 설정하고, 실패 시 단계적 폴백
"""
# GPU 메모리 최적화 설정
os.environ["PYTORCH_CUDA_ALLOC_CONF"] = "expandable_segments:True"
# Transformers 로깅 레벨을 낮춰 config __repr__ 경로로 인한 예외를 피함
try:
hf_logging.set_verbosity_error()
except Exception:
pass
model_settings = self.config.get('model_settings', {})
use_quantization = bool(model_settings.get('use_quantization', False))
torch_dtype_cfg = str(model_settings.get('torch_dtype', 'auto')).lower()
# dtype 파싱
import torch
dtype = None
if torch_dtype_cfg in ("float16", "fp16", "half"):
dtype = torch.float16
elif torch_dtype_cfg in ("bfloat16", "bf16"):
dtype = torch.bfloat16
elif torch_dtype_cfg in ("float32", "fp32"):
dtype = torch.float32
else:
dtype = None # auto
# 로컬 스냅샷이 있으면 우선 사용, 없으면 모델 이름 사용
model_source = self.model_path if os.path.isdir(self.model_path) else self.config.get('model_name')
if not model_source:
raise RuntimeError("모델 경로/이름이 설정되지 않았습니다.")
# quantization 설정 (가능한 경우에만)
quant_args = {}
if use_quantization:
try:
from transformers import BitsAndBytesConfig
quant_args["quantization_config"] = BitsAndBytesConfig(
load_in_8bit=True,
llm_int8_enable_fp32_cpu_offload=True
)
print("8bit 양자화 적용")
except Exception as _:
# transformers/bitsandbytes 호환 문제 시 양자화 비활성화
print("bitsandbytes/transformers 호환 문제로 양자화를 비활성화합니다.")
quant_args = {}
# 메모리 제한/오프로딩 설정
mm_cfg = model_settings.get('max_memory', {}) if isinstance(model_settings.get('max_memory', {}), dict) else {}
# normalize memory strings to GiB (accelerate accepts both, but unify)
def _norm_mem(v):
if not isinstance(v, str):
return v
return v.replace('GB', 'GiB').replace('gb', 'GiB')
max_memory = {}
if 0 in mm_cfg or 'gpu' in mm_cfg:
max_memory[0] = _norm_mem(mm_cfg.get(0, mm_cfg.get('gpu', '30GiB')))
if 'cpu' in mm_cfg:
max_memory['cpu'] = _norm_mem(mm_cfg.get('cpu', '60GiB'))
offload_folder = os.path.join(os.path.dirname(self.config_path), 'offload')
os.makedirs(offload_folder, exist_ok=True)
# 1차 시도: device_map="auto" + max_memory 로 로드
try:
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
# config 사전 로드 후 리포의 quantization_config 키 제거 (MXFP4 등 회피)
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
if hasattr(cfg, 'quantization_config'):
try:
delattr(cfg, 'quantization_config')
except Exception:
setattr(cfg, 'quantization_config', None)
load_kwargs = dict(
device_map="auto",
low_cpu_mem_usage=True,
offload_folder=offload_folder,
offload_state_dict=True,
trust_remote_code=True,
config=cfg,
)
if dtype is not None:
load_kwargs["torch_dtype"] = dtype
if max_memory:
load_kwargs["max_memory"] = max_memory
# use_quantization=True면 8bit 우선 시도 (repo의 다른 양자화 경로 우회)
if use_quantization:
try:
from transformers import BitsAndBytesConfig
tmp = BitsAndBytesConfig(load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True)
if hasattr(tmp, 'get_loading_attributes'):
load_kwargs["quantization_config"] = tmp
print("8bit 양자화 적용 (1차 시도, bnb 신 API)")
else:
# 레거시 API 시도
load_kwargs["load_in_8bit"] = True
load_kwargs["llm_int8_enable_fp32_cpu_offload"] = True
print("8bit 양자화 적용 (1차 시도, 레거시 API)")
except Exception as _:
print("bitsandbytes 감지 실패: 비양자화로 1차 시도 진행")
self.model = AutoModelForCausalLM.from_pretrained(
model_source,
**load_kwargs
)
except Exception as e1:
print(f"device_map=auto 로드 실패: {e1}")
# 2a. 비양자화로 다시 auto+offload 시도 (오류가 bnb/버전이면 이 경로로 성공 가능)
try:
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
if hasattr(cfg, 'quantization_config'):
try:
delattr(cfg, 'quantization_config')
except Exception:
setattr(cfg, 'quantization_config', None)
retry_no_quant = dict(
device_map="auto",
low_cpu_mem_usage=True,
offload_folder=offload_folder,
offload_state_dict=True,
trust_remote_code=True,
config=cfg,
)
if dtype is not None:
retry_no_quant["torch_dtype"] = dtype
if max_memory:
retry_no_quant["max_memory"] = max_memory
self.model = AutoModelForCausalLM.from_pretrained(model_source, **retry_no_quant)
print("비양자화 재시도 성공")
except Exception as e_noq:
print(f"비양자화 재시도 실패: {e_noq}")
# 2b. 8-bit 양자화로 재시도 (가능 시)
tried_int8 = False
try:
from transformers import BitsAndBytesConfig
print("8bit 양자화로 재시도합니다...")
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
# config 재생성 및 quantization_config 제거
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
if hasattr(cfg, 'quantization_config'):
try:
delattr(cfg, 'quantization_config')
except Exception:
setattr(cfg, 'quantization_config', None)
retry_kwargs = dict(
device_map="auto",
low_cpu_mem_usage=True,
offload_folder=offload_folder,
offload_state_dict=True,
trust_remote_code=True,
config=cfg,
)
if dtype is not None:
retry_kwargs["torch_dtype"] = dtype
if max_memory:
retry_kwargs["max_memory"] = max_memory
tmp = BitsAndBytesConfig(load_in_8bit=True, llm_int8_enable_fp32_cpu_offload=True)
if hasattr(tmp, 'get_loading_attributes'):
retry_kwargs["quantization_config"] = tmp
else:
retry_kwargs["load_in_8bit"] = True
retry_kwargs["llm_int8_enable_fp32_cpu_offload"] = True
self.model = AutoModelForCausalLM.from_pretrained(model_source, **retry_kwargs)
tried_int8 = True
except Exception as e_int8:
print(f"8bit 재시도 실패: {e_int8}")
if not tried_int8:
print("CPU로 폴백합니다.")
try:
import torch, gc
torch.cuda.empty_cache()
gc.collect()
except Exception:
pass
# CPU 강제 로드 (config의 quantization_config 제거)
self.tokenizer = AutoTokenizer.from_pretrained(model_source, trust_remote_code=True)
cfg = AutoConfig.from_pretrained(model_source, trust_remote_code=True)
if hasattr(cfg, 'quantization_config'):
try:
delattr(cfg, 'quantization_config')
except Exception:
setattr(cfg, 'quantization_config', None)
self.model = AutoModelForCausalLM.from_pretrained(
model_source,
device_map={"": "cpu"},
torch_dtype=torch.float32,
low_cpu_mem_usage=False,
trust_remote_code=True,
config=cfg
)
# 파이프라인 생성
pad_id = self.tokenizer.eos_token_id if getattr(self.tokenizer, 'eos_token_id', None) is not None else None
pipe = pipeline(
"text-generation",
model=self.model,
tokenizer=self.tokenizer,
max_new_tokens=self.max_tokens,
temperature=self.temperature,
do_sample=True,
pad_token_id=pad_id
)
self.llm = HuggingFacePipeline(pipeline=pipe)
print("모델 로드 완료")
# 간단 검색: DuckDuckGo HTML 결과 파싱 (외부 API 불필요)
def _search_urls(self, query: str, k: int = 5) -> List[str]:
import requests
from bs4 import BeautifulSoup
q = query.strip().replace(' ', '+')
url = f"https://duckduckgo.com/html/?q={q}"
headers = {"User-Agent": self.config['web_scraping']['user_agent']}
try:
r = requests.get(url, headers=headers, timeout=20)
r.raise_for_status()
soup = BeautifulSoup(r.text, 'html.parser')
links = []
for a in soup.select('a.result__a'):
href = a.get('href')
if href and href.startswith('http'):
links.append(href)
if len(links) >= k:
break
return links
except Exception as e:
print(f"검색 실패({query}): {e}")
return []
def collect_information(self, topics: List[str]) -> List[Dict[str, str]]:
"""
주제별로 웹 검색 → 스크래핑 → 요약 → 저장까지 수행
반환: [{ topic, response }]
"""
results = []
for topic in topics:
urls = self._search_urls(topic, k=5)
collected = []
for u in urls[:5]:
data = self.web_scraper.scrape_website(u)
if data:
collected.append(data)
# 저장 (간단 저장 도구)
filename = f"{topic[:50].replace(' ', '_')}.json"
self.simple_saver.save_data_as_json(collected, filename)
# 간단 요약 생성
try:
snippet = "\n\n".join([d.get('title', '') + ": " + d.get('description', '') for d in collected[:3]])
prompt = f"""
다음 자료를 간결히 요약하고 핵심 포인트 3가지를 bullet로 정리하세요.
주제: {topic}
자료:
{snippet}
"""
summary = self.llm(prompt)
except Exception as e:
summary = f"요약 실패: {e}"
results.append({"topic": topic, "response": summary})
return results
def scrape_web(self, url):
"""
웹 스크래핑 도구 함수
"""
data = self.web_scraper.scrape_website(url)
if data:
return f"수집 완료: {data['title']} - {data['description'][:200]}..."
else:
return "수집 실패"
def upload_to_drive_api(self, data_and_filename):
"""
Google Drive API 업로드 도구 함수
"""
try:
# 간단한 파싱 (실제로는 더 정교하게)
parts = data_and_filename.split('|')
if len(parts) == 2:
data = json.loads(parts[0])
filename = parts[1]
else:
data = {"error": "잘못된 형식"}
filename = "error.json"
file_id = self.drive_uploader.upload_data_as_json(data, filename)
return f"업로드 완료: {file_id}"
except Exception as e:
return f"업로드 실패: {e}"
def save_to_drive_simple(self, data_and_filename):
"""
마운트된 Drive에 저장하는 도구 함수
"""
try:
parts = data_and_filename.split('|')
if len(parts) == 2:
data = json.loads(parts[0])
filename = parts[1]
else:
data = {"error": "잘못된 형식"}
filename = "error.json"
filepath = self.simple_saver.save_data_as_json(data, filename)
return f"저장 완료: {filepath}"
except Exception as e:
return f"저장 실패: {e}"
def run_agent(self, task_description):
"""
AI 에이전트를 실행합니다.
"""
try:
response = self.agent.run(task_description)
return response
except Exception as e:
print(f"에이전트 실행 실패: {e}")
return None
def generate_topics(self, num_topics=3):
"""
AI가 스스로 흥미로운 주제를 생성합니다.
"""
prompt = f"""
당신은 AI 연구원입니다. 현재 세계에서 가장 흥미롭고 조사할 가치가 있는 기술 및 과학 분야의 주제 {num_topics}개를 선정해주세요.
다음 기준을 고려하세요:
1. 최근 트렌드나 미래 지향적인 주제
2. 사회적 영향이 큰 주제
3. 기술 발전이 빠른 분야
4. AI와 관련된 주제 우선
각 주제는 구체적이고 조사하기 쉬운 형태로 제시해주세요.
예시: "양자 컴퓨팅의 최근 발전", "생성형 AI의 윤리적 문제"
주제 목록만 출력하고, 다른 설명은 하지 마세요.
형식: 각 줄에 하나의 주제
"""
try:
response = self.llm(prompt)
# 응답에서 주제들을 추출 (줄 단위로 분리)
topics = [line.strip() for line in response.split('\n') if line.strip() and not line.startswith(('1.', '2.', '3.', '-'))]
# 최대 num_topics개 반환
return topics[:num_topics]
except Exception as e:
print(f"주제 생성 실패: {e}")
# 기본 주제 반환
return ["AI 기술 동향", "머신러닝 응용", "딥러닝 최신 연구"]
def close(self):
self.web_scraper.close()
if __name__ == "__main__":
agent = AIAgent()
# 테스트용
topics = ["인공지능 최신 트렌드", "머신러닝 기초"]
results = agent.collect_information(topics)
print("수집 결과:", results)
agent.close()