themblem/api/products/aichat.py
2025-10-29 21:27:29 +00:00

491 lines
18 KiB
Python
Executable File
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
"""
AI Chat Module for 徵象防伪验证平台
基于 Moonshot Kimi K2 API 的智能客服聊天系统
"""
import os
import json
import time
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
from openai import OpenAI
from django.conf import settings
@dataclass
class ToolCall:
"""工具调用数据结构"""
id: str
type: str
function: Dict[str, Any]
@dataclass
class ToolResult:
"""工具执行结果"""
tool_call_id: str
content: str
class AIChatService:
"""AI聊天服务主类"""
def __init__(self, api_key: str = None, base_url: str = None, chat_type: str = 'platform', product_id: int = None):
"""初始化AI聊天服务
Args:
api_key: Moonshot API密钥
base_url: API基础URL
chat_type: 聊天类型 ('platform''product')
product_id: 产品ID当chat_type为'product'时使用
"""
self.api_key = api_key or getattr(settings, 'KIMI_API_KEY', None)
self.base_url = base_url or getattr(settings, 'KIMI_API_URL', None)
if not self.api_key:
raise ValueError("KIMI_API_KEY is required")
# 聊天类型和产品ID
self.chat_type = chat_type
self.product_id = product_id
# 验证参数
if chat_type == 'product' and not product_id:
raise ValueError("当chat_type为'product'必须提供product_id")
self.client = OpenAI(
api_key=self.api_key,
base_url=self.base_url
)
self.model = "kimi-k2-0711-preview"
self.conversation_history: List[Dict[str, str]] = []
self.tools = self._define_tools()
# 添加系统提示词
self._add_system_message()
def _define_tools(self) -> List[Dict[str, Any]]:
"""定义可用工具"""
return [
{
"type": "function",
"function": {
"name": "start_qr_scan",
"description": "启动二维码扫描,暂停对话等待扫描结果",
"parameters": {
"type": "object",
"properties": {},
"required": []
}
}
},
{
"type": "function",
"function": {
"name": "search_knowledge_base",
"description": "从平台知识库中检索相关信息",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "搜索查询"},
"max_results": {"type": "integer", "description": "最大结果数", "default": 3}
},
"required": ["query"]
}
}
}
]
def _add_system_message(self):
"""添加系统提示词"""
if self.chat_type == 'platform':
system_prompt = """你是一个专业的徵象防伪验证平台AI客服助手。
平台介绍:
徵象是由广州市诚投科技有限公司开发的AI驱动的智能防伪平台通过多模态特征识别构建新一代防伪验证体系实现从物理防伪到数字认证的全链路保护。系统采用ISO 12931国际防伪标准已获取国家发明专利(证书编号:CN 115222000 B)。
服务范围:
1. 商品防伪验证
2. 证件安全验证
3. 工业品防伪
4. 品牌数字化服务
服务原则:
1. 严谨谦虚,基于知识库和真实数据回答,绝不捏造事实
2. 优先使用官方信息和知识库内容回答
3. 对于不确定或超出知识范围的信息,明确告知用户"我不确定""需要进一步确认"
4. 如果知识库中没有相关信息,诚实告知用户,不要推测或编造
5. 引导用户使用相关功能和服务
6. 当用户需要验证产品真伪时,主动提供二维码扫描功能
可用工具:
- start_qr_scan: 启动二维码扫描功能,用于产品防伪验证
- search_knowledge_base: 从平台知识库中检索相关信息,用于回答专业问题
回答要求:
- 严格基于知识库检索结果和真实数据回答
- 如果知识库中没有相关信息,请明确告知用户
- 对于专业问题,优先使用知识库搜索工具获取准确信息
- 保持严谨谦虚的态度,不夸大、不推测、不编造
- 如果用户需要验证产品,请使用二维码扫描工具"""
elif self.chat_type == 'product':
system_prompt = f"""你是一个专业的徵象防伪验证平台产品客服助手专门为产品ID {self.product_id} 提供客服服务。
平台介绍:
徵象是由广州市诚投科技有限公司开发的AI驱动的智能防伪平台通过多模态特征识别构建新一代防伪验证体系实现从物理防伪到数字认证的全链路保护。
当前服务产品:
- 产品ID: {self.product_id}
- 服务类型: 产品专属客服
服务原则:
1. 严谨谦虚,基于知识库和真实数据回答,绝不捏造事实
2. 优先使用产品相关信息和知识库内容回答
3. 对于不确定或超出知识范围的信息,明确告知用户"我不确定""需要进一步确认"
4. 如果知识库中没有相关信息,诚实告知用户,不要推测或编造
5. 引导用户使用相关功能和服务
6. 当用户需要验证产品真伪时,主动提供二维码扫描功能
可用工具:
- start_qr_scan: 启动二维码扫描功能,用于产品防伪验证
- search_knowledge_base: 从平台知识库中检索相关信息,用于回答专业问题
回答要求:
- 严格基于知识库检索结果和真实数据回答
- 如果知识库中没有相关信息,请明确告知用户
- 对于专业问题,优先使用知识库搜索工具获取准确信息
- 保持严谨谦虚的态度,不夸大、不推测、不编造
- 如果用户需要验证产品,请使用二维码扫描工具"""
self.conversation_history.append({
"role": "system",
"content": system_prompt
})
def _execute_tool(self, tool_call: ToolCall) -> ToolResult:
"""执行工具调用
Args:
tool_call: 工具调用信息
Returns:
工具执行结果
"""
# Debug logging
print(f"🔧 执行工具调用: {tool_call.function}")
function_name = tool_call.function.name if hasattr(tool_call.function, 'name') else tool_call.function.get('name', '')
print(f"🔧 工具名称: {function_name}")
if function_name == "start_qr_scan":
return self._execute_qr_scan(tool_call.id)
elif function_name == "search_knowledge_base":
return self._execute_search_knowledge_base(tool_call.id, tool_call.function)
else:
return ToolResult(
tool_call_id=tool_call.id,
content=f"未知工具: {function_name}"
)
def _execute_qr_scan(self, tool_call_id: str) -> ToolResult:
"""执行二维码扫描(模拟实现)
Args:
tool_call_id: 工具调用ID
Returns:
扫描结果
"""
print("\n🔍 启动二维码扫描...")
print("📱 请在客户端打开相机进行扫描...")
# 模拟扫描过程
for i in range(5, 0, -1):
print(f"⏳ 扫描中... {i}秒后完成")
time.sleep(1)
print("✅ 扫描完成!")
print("🎯 验证结果:通过")
print("🏷️ 二维码标签:真品")
# 返回验证结果
result = {
"status": "pass",
"message": "二维码验证通过,产品为真品",
"verification_time": time.strftime("%Y-%m-%d %H:%M:%S")
}
return ToolResult(
tool_call_id=tool_call_id,
content=json.dumps(result, ensure_ascii=False)
)
def _execute_search_knowledge_base(self, tool_call_id: str, function_data) -> ToolResult:
"""执行知识库搜索工具"""
print(f"\n🔍 开始执行知识库搜索工具...")
print(f"📊 工具调用ID: {tool_call_id}")
print(f"📋 函数数据: {function_data}")
try:
# 解析函数参数
print(f"🔧 解析函数参数...")
if hasattr(function_data, 'arguments'):
arguments = function_data.arguments
print(f"✅ 从function_data.arguments获取参数: {arguments}")
else:
arguments = function_data.get('arguments', '{}')
print(f"✅ 从function_data.get获取参数: {arguments}")
# 解析JSON参数
print(f"🔧 解析JSON参数...")
if isinstance(arguments, str):
import json
try:
args = json.loads(arguments)
print(f"✅ JSON解析成功: {args}")
except json.JSONDecodeError as je:
print(f"❌ JSON解析失败: {je}")
return ToolResult(
tool_call_id=tool_call_id,
content=f"参数格式错误: {str(je)}"
)
else:
args = arguments
print(f"✅ 参数已经是字典格式: {args}")
query = args.get('query', '')
max_results = args.get('max_results', 3)
print(f"📝 解析结果: query='{query}', max_results={max_results}")
if not query:
print(f"❌ 搜索查询为空")
return ToolResult(
tool_call_id=tool_call_id,
content="搜索查询不能为空"
)
# 检查依赖
print(f"🔧 检查RAG服务依赖...")
try:
from .rag_service import CachedLangChainRAG
print(f"✅ RAG服务导入成功")
except ImportError as ie:
print(f"❌ RAG服务导入失败: {ie}")
return ToolResult(
tool_call_id=tool_call_id,
content=f"RAG服务不可用: {str(ie)}"
)
# 创建RAG服务实例
print(f"🔧 创建RAG服务实例...")
try:
rag_service = CachedLangChainRAG()
print(f"✅ RAG服务实例创建成功")
except Exception as e:
print(f"❌ RAG服务实例创建失败: {e}")
import traceback
traceback.print_exc()
return ToolResult(
tool_call_id=tool_call_id,
content=f"RAG服务初始化失败: {str(e)}"
)
# 根据聊天类型确定租户ID
tenant_id = self.product_id if self.chat_type == 'product' else None
print(f"🏢 租户ID: {tenant_id}")
# 获取增强上下文
print(f"🔍 开始知识库搜索...")
try:
context = rag_service.get_enhanced_context(query, max_results, tenant_id)
print(f"✅ 知识库搜索成功,上下文长度: {len(context)}")
return ToolResult(
tool_call_id=tool_call_id,
content=context
)
except Exception as e:
print(f"❌ 知识库搜索失败: {e}")
import traceback
traceback.print_exc()
return ToolResult(
tool_call_id=tool_call_id,
content=f"知识库搜索执行失败: {str(e)}"
)
except Exception as e:
print(f"❌ 知识库搜索工具执行异常: {e}")
import traceback
traceback.print_exc()
return ToolResult(
tool_call_id=tool_call_id,
content=f"知识库搜索工具异常: {str(e)}"
)
def chat(self, user_message: str) -> str:
"""发送消息给AI并获取回复
Args:
user_message: 用户消息
Returns:
AI回复内容
"""
# 添加用户消息到历史
self.conversation_history.append({
"role": "user",
"content": user_message
})
try:
# 调用Moonshot API
response = self.client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
tools=self.tools,
tool_choice="auto",
temperature=0.7,
max_tokens=2000
)
assistant_message = response.choices[0].message
# 检查是否有工具调用
if assistant_message.tool_calls:
# 执行工具调用
tool_results = []
for tool_call in assistant_message.tool_calls:
# 将function对象转换为字典格式
function_dict = {
'name': tool_call.function.name,
'arguments': tool_call.function.arguments
}
tool_call_obj = ToolCall(
id=tool_call.id,
type=tool_call.type,
function=function_dict
)
result = self._execute_tool(tool_call_obj)
tool_results.append(result)
# 将工具结果发送给AI获取最终回复
tool_messages = []
for result in tool_results:
tool_messages.append({
"role": "tool",
"content": result.content,
"tool_call_id": result.tool_call_id
})
# 添加助手消息和工具结果到历史
self.conversation_history.append({
"role": "assistant",
"content": assistant_message.content or "",
"tool_calls": [
{
"id": tc.id,
"type": tc.type,
"function": {
"name": tc.function.name,
"arguments": tc.function.arguments
}
} for tc in assistant_message.tool_calls
]
})
self.conversation_history.extend(tool_messages)
# 获取最终回复
final_response = self.client.chat.completions.create(
model=self.model,
messages=self.conversation_history,
temperature=0.7,
max_tokens=2000
)
final_message = final_response.choices[0].message.content
# 添加最终回复到历史
self.conversation_history.append({
"role": "assistant",
"content": final_message
})
return final_message
else:
# 没有工具调用,直接返回回复
content = assistant_message.content or "抱歉,我没有获取到有效回复。"
# 添加助手回复到历史
self.conversation_history.append({
"role": "assistant",
"content": content
})
return content
except Exception as e:
error_msg = f"AI服务调用失败: {str(e)}"
print(f"{error_msg}")
return error_msg
def get_conversation_history(self) -> List[Dict[str, str]]:
"""获取对话历史
Returns:
对话历史列表
"""
return self.conversation_history.copy()
def clear_history(self):
"""清空对话历史"""
self.conversation_history = []
self._add_system_message()
def print_history(self):
"""打印对话历史"""
print("\n" + "="*50)
print("对话历史:")
print("="*50)
for i, message in enumerate(self.conversation_history):
role = message["role"]
content = message["content"]
if role == "system":
print(f"🤖 系统: {content[:100]}...")
elif role == "user":
print(f"👤 用户: {content}")
elif role == "assistant":
print(f"🤖 AI助手: {content}")
elif role == "tool":
print(f"🔧 工具结果: {content}")
if i < len(self.conversation_history) - 1:
print("-" * 30)
print("="*50)
if __name__ == "__main__":
"""直接运行时的简单测试"""
print("🧪 AI聊天库测试模式")
print("=" * 30)
print("这是一个库模块,建议使用以下方式运行:")
print("1. Django管理命令: python manage.py chat")
print("2. 作为库导入: from products.aichat import AIChatService")
print("=" * 30)
# 简单的功能测试
try:
service = AIChatService()
print("✅ AIChatService 初始化成功")
print(f"📝 系统提示词长度: {len(service.conversation_history[0]['content'])} 字符")
print(f"🔧 可用工具数量: {len(service.tools)}")
except Exception as e:
print(f"❌ 初始化失败: {e}")