themblem/api/products/models.py
2025-10-29 21:27:29 +00:00

475 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import os
import json
import uuid
import datetime
import random
from django.conf import settings
from django.utils import timezone
from django.contrib.auth.models import User
from django.db import models
class AdminInfo(models.Model):
    """Extra per-administrator data attached to a Django auth ``User``."""

    user = models.ForeignKey(
        User,
        on_delete=models.CASCADE,
        related_name="admininfo",
    )
    # Optional mobile number; unique whenever it is set.
    mobile = models.CharField(
        verbose_name="手机号",
        max_length=128,
        blank=True,
        null=True,
        unique=True,
    )
    # Free-form text; its format is not defined in this class.
    qr_verify_alert_rule = models.TextField(blank=True, null=True)

    def __str__(self):
        """Return the username of the linked auth user."""
        owner = self.user
        return owner.username
class GlobalConfig(models.Model):
    """Named configuration entry whose value is stored as JSON text."""

    name = models.CharField(
        verbose_name="配置名称",
        max_length=128,
        unique=True,
        db_index=True,
    )
    value = models.TextField(
        verbose_name="配置内容",
        null=True,
    )

    def __str__(self):
        """Return the configuration name."""
        return self.name

    def get_value(self):
        """Return ``value`` decoded from JSON, or ``None`` when it is empty or unset."""
        raw = self.value
        return json.loads(raw) if raw else None
class Tenant(models.Model):
    """Tenant account: login data, display data and a sequence-number counter."""

    username = models.CharField(
        verbose_name="用户名",
        max_length=128,
        db_index=True,
        unique=True,
    )
    display_name = models.CharField(
        verbose_name="显示名称",
        max_length=128,
        blank=True,
        null=True,
    )
    # Optional mobile number; unique whenever it is set.
    mobile = models.CharField(
        verbose_name="手机号",
        max_length=128,
        blank=True,
        null=True,
        unique=True,
    )
    password = models.CharField(
        verbose_name="密码",
        max_length=256,
    )
    password_reset_code = models.CharField(
        max_length=128,
        blank=True,
        null=True,
        db_index=True,
    )
    password_reset_code_expire = models.DateTimeField(blank=True, null=True)
    welcome_page_config = models.TextField(blank=True, null=True)
    # Next sequence number handed out by alloc_seq_nums().
    next_seq_num = models.IntegerField(default=1)
    service_qr_file = models.ForeignKey(
        'AssetFile',
        verbose_name="客服二维码文件",
        related_name='tenant_service_qrs',
        on_delete=models.PROTECT,
        blank=True,
        null=True,
    )

    def alloc_seq_nums(self, n=1):
        """Reserve ``n`` consecutive sequence numbers and return the first one.

        Advances ``next_seq_num`` by ``n`` and saves the instance.

        NOTE(review): this is a read-modify-write on the in-memory value;
        nothing here guards against concurrent callers -- confirm that
        callers serialize access.
        """
        first = self.next_seq_num
        self.next_seq_num = first + n
        self.save()
        return first

    def __str__(self):
        """Return the tenant's username."""
        return self.username

    @classmethod
    def authenticate(cls, username, password):
        """Return the tenant whose username and stored password both match, else ``None``.

        NOTE(review): ``password`` is compared directly against the stored
        column value -- confirm how callers hash it before calling.
        """
        matches = Tenant.objects.filter(username=username, password=password)
        return matches.first()
class AuthToken(models.Model):
    """API token owned by an admin or a tenant, with a sliding expiry time."""

    token = models.CharField(
        verbose_name="API Token",
        max_length=128,
        primary_key=True,
    )
    admin = models.ForeignKey(
        AdminInfo,
        on_delete=models.CASCADE,
        related_name="tokens",
        blank=True,
        null=True,
    )
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="tokens",
        blank=True,
        null=True,
    )
    # An unset expiry is treated as "not expired" by validate().
    expire = models.DateTimeField(null=True)

    def validate(self):
        """Return ``False`` for an expired token; otherwise refresh the expiry and return ``True``."""
        deadline = self.expire
        still_valid = not deadline or deadline >= timezone.now()
        if still_valid:
            self.update_expire()
        return still_valid

    def update_expire(self):
        """Push ``expire`` forward to now + ``settings.TOKEN_EXPIRE_MINUTES`` and save.

        The stored expiry is only ever extended, never shortened; the instance
        is saved in either case.
        """
        candidate = timezone.now() + datetime.timedelta(minutes=settings.TOKEN_EXPIRE_MINUTES)
        keep_current = self.expire and candidate <= self.expire
        if not keep_current:
            self.expire = candidate
        self.save()
class MiniProgramContent(models.Model):
    """JSON-encoded mini-program content, at most one row per tenant."""

    # NOTE(review): a unique ForeignKey acts like a one-to-one link; it is left
    # as-is here so the existing reverse accessor keeps its current shape.
    tenant = models.ForeignKey(Tenant, null=True, unique=True, related_name="mini_program_content", on_delete=models.CASCADE)
    content = models.TextField(default='')

    def get_content(self):
        """Return ``content`` decoded from JSON, or ``{}`` when it cannot be decoded.

        Only decoding failures are swallowed (invalid JSON, a non-string value,
        or excessively deep nesting); unrelated exceptions such as
        ``KeyboardInterrupt`` now propagate instead of being hidden.
        """
        try:
            return json.loads(self.content)
        except (TypeError, ValueError, RecursionError):
            # json.JSONDecodeError is a ValueError subclass; the default ''
            # content lands here as well.
            return {}
class Product(models.Model):
    """Product owned by a tenant; names are unique within one tenant."""

    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="products",
    )
    name = models.CharField(verbose_name="名称", max_length=128)
    description = models.TextField(verbose_name="备注")
    # Optional product information page.
    article = models.ForeignKey(
        'Article',
        verbose_name="产品信息页面",
        on_delete=models.CASCADE,
        related_name="products",
        blank=True,
        null=True,
    )
    # Optional template file for the product information page.
    template_asset = models.ForeignKey(
        'AssetFile',
        verbose_name="产品信息页面模板文件",
        on_delete=models.PROTECT,
        related_name="template_of",
        blank=True,
        null=True,
    )

    def __str__(self):
        """Return ``"<tenant username> - <product name>"``."""
        owner_name = self.tenant.username
        return "%s - %s" % (owner_name, self.name)

    class Meta:
        unique_together = ('tenant', 'name')
class ProductProperty(models.Model):
    """Named property of a product, carrying optional text, rich text or a file."""

    product = models.ForeignKey(
        Product,
        verbose_name="产品",
        on_delete=models.CASCADE,
        related_name="properties",
    )
    name = models.CharField(verbose_name="名称", max_length=128)
    text = models.TextField(verbose_name="纯文本内容(可选)", blank=True, null=True)
    richtext = models.TextField(verbose_name="图文内容(可选)", blank=True, null=True)
    file = models.ForeignKey(
        'AssetFile',
        verbose_name="关联文件",
        on_delete=models.PROTECT,
        related_name='product_properties',
        blank=True,
        null=True,
    )
    memo = models.TextField(verbose_name="备注", blank=True, null=True)

    class Meta:
        # A property name may appear only once per product.
        unique_together = ('product', 'name')

    def __str__(self):
        """Return ``"<product name> - <property name>"``."""
        parent_name = self.product.name
        return "%s - %s" % (parent_name, self.name)
class Article(models.Model):
    """Article page, optionally owned by a tenant."""

    title = models.CharField(
        verbose_name="标题",
        max_length=128,
        blank=True,
        null=True,
    )
    body = models.TextField()
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="articles",
        blank=True,
        null=True,
    )
    # Free-form text; its format is not defined in this class.
    options = models.TextField(blank=True, default='')
    url = models.TextField(blank=True, null=True)
    # Flags the article as part of the platform knowledge base.
    is_platform_knowledge_base = models.BooleanField(
        verbose_name="是否为平台知识库",
        default=False,
    )
class AssetFile(models.Model):
    """Uploaded file record, identified by a uuid plus its original filename."""

    filename = models.TextField(verbose_name="文件名")
    uuid = models.CharField(max_length=256)
    mime_type = models.CharField(max_length=256, default="application/octet-stream")
    tenant = models.ForeignKey(Tenant, related_name="assets", on_delete=models.CASCADE, null=True, blank=True)
    datetime = models.DateTimeField(auto_now_add=True, verbose_name="创建日期")

    def get_name(self):
        """Return ``"<uuid>-<filename>"``."""
        return "%s-%s" % (self.uuid, self.filename)

    def is_image(self):
        """Return ``True`` when the MIME type or the file extension indicates an image.

        Always returns a real ``bool`` (the previous code fell through and
        returned ``None`` for non-images), and the extension is compared
        case-insensitively so names such as ``PHOTO.JPG`` are recognised.
        """
        if self.mime_type.startswith("image/"):
            return True
        _, ext = os.path.splitext(self.filename)
        return ext.lower() in ('.png', '.jpg', '.gif', '.jpeg')
class CodeBatch(models.Model):
    """Batch of serial codes sharing a prefix, scan settings and an optional tenant."""

    tenant = models.ForeignKey(Tenant, related_name="batches", null=True, on_delete=models.CASCADE)
    description = models.TextField(null=True, blank=True, verbose_name='备注')
    qr_angle = models.FloatField(default=0.0, verbose_name="网线角度")
    qr_angle_allowed_error = models.FloatField(default=1.0, verbose_name="角度验证误差范围")
    feature_comparison_threshold = models.FloatField(default=0.0,
                                                     verbose_name="特征对比阈值(0=禁用)")
    datetime = models.DateTimeField(auto_now_add=True, verbose_name="创建日期")
    code_prefix = models.CharField(max_length=64, verbose_name="序列码前缀")
    # Number of random digits appended after code_prefix by gen_code().
    num_digits = models.IntegerField(default=10, verbose_name="尾号长度")
    is_active = models.BooleanField(default=True, verbose_name="已激活")
    name = models.CharField(null=True, max_length=255, db_index=True, verbose_name="名称")
    scan_redirect_url = models.TextField(null=True, blank=True,
                                         verbose_name="自定义扫码重定向URL可选")
    enable_auto_torch = models.BooleanField(default=False, verbose_name="自动打开闪光灯")
    camera_sensitivity = models.FloatField(default=1.0, verbose_name="摄像头灵敏度")
    mini_prog_entry_path = models.TextField(null=True, blank=True, default='/pages/index/index',
                                            verbose_name="小程序入口路径(可选)")
    mini_prog_entry_env_version = models.CharField(max_length=128, null=True, blank=True, default='release',
                                                   verbose_name="小程序入口环境版本(可选), 默认release。如为trial则进入体验版")

    def gen_code(self, n):
        """Create ``n`` additional serial codes in this batch.

        Each code is ``code_prefix`` followed by a random ``num_digits``-digit
        number. Codes are bulk-inserted with ``ignore_conflicts=True``, so
        duplicates are silently dropped; the loop therefore re-counts the
        stored codes and keeps generating until the target is reached.
        When the batch has a tenant, sequence numbers are reserved from it in
        chunks of at most 10000.

        NOTE(review): if every remaining code for this prefix/length already
        exists, the count never grows and the loop does not terminate.
        """
        low = 10 ** (self.num_digits - 1)
        high = 10 ** self.num_digits - 1
        tenant = self.tenant
        target = self.codes.count() + n
        while True:
            rem = target - self.codes.count()
            if rem <= 0:
                break
            batchsize = min(rem, 10000)
            seq_num = tenant.alloc_seq_nums(batchsize) if tenant else None
            scs = []
            for _ in range(batchsize):
                code = "%s%d" % (self.code_prefix, random.randint(low, high))
                scs.append(SerialCode(batch=self, code=code, tenant=tenant, seq_num=seq_num))
                # Compare against None so that a starting value of 0 still advances.
                if seq_num is not None:
                    seq_num += 1
            SerialCode.objects.bulk_create(scs, ignore_conflicts=True)

    def __str__(self):
        """Return a short description, falling back to the primary key on error."""
        try:
            return f"{self.pk} {self.qr_angle} {self.tenant} - {self.description}"
        except Exception:
            # Best-effort only: never let a broken relation break repr/admin pages.
            return f"{self.pk}"
class SerialCode(models.Model):
    """Single serial code belonging to a batch, optionally bound to a tenant and a product."""

    seq_num = models.IntegerField(
        verbose_name="流水号",
        db_index=True,
        null=True,
    )
    code = models.CharField(
        verbose_name="序列码",
        max_length=128,
        db_index=True,
        unique=True,
    )
    batch = models.ForeignKey(
        CodeBatch,
        on_delete=models.CASCADE,
        related_name="codes",
    )
    is_active = models.BooleanField(verbose_name="已激活", default=False)
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="codes",
        null=True,
    )
    product = models.ForeignKey(
        Product,
        verbose_name="产品",
        on_delete=models.CASCADE,
        related_name="batches",
        blank=True,
        null=True,
    )
    datetime = models.DateTimeField(verbose_name="创建日期", auto_now_add=True)

    class Meta:
        # A sequence number may appear only once per tenant.
        unique_together = ('tenant', 'seq_num')

    def get_qr_url(self):
        """Return the mini-program entry URL for this code.

        The ``tenant`` query parameter is the tenant's primary key, or an
        empty string when the code has no tenant.
        """
        owner = self.tenant
        tenant_id = '' if not owner else str(owner.pk)
        return 'https://emblem.hondcloud.com/api/mini-prog-entry/?code=%s&tenant=%s' % (self.code, tenant_id)

    def __str__(self):
        """Return the code together with its sequence number and batch."""
        return f"{self.code} (# {self.seq_num} {self.batch})"
class SerialCodeBatchOpRecord(models.Model):
    """Record of one bulk operation applied to a range of serial codes."""

    # First and last sequence number covered by the operation.
    seq_num_begin = models.BigIntegerField(verbose_name="开始流水号", null=True)
    seq_num_end = models.BigIntegerField(verbose_name="结束流水号", null=True)
    code_samples = models.TextField(verbose_name="序列码摘要", null=True)
    batch_size = models.IntegerField(verbose_name='序列码数量')
    datetime = models.DateTimeField(verbose_name="操作时间", auto_now_add=True)
    op_name = models.CharField(verbose_name="操作类型", max_length=128)
    product_name = models.TextField(verbose_name="绑定产品名称", null=True)
    product_page_name = models.TextField(verbose_name="页面名称", null=True)
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="code_batch_ops",
    )
class ConsumerInfo(models.Model):
    """End consumer, identified by the (platform, username) pair."""

    create_time = models.DateTimeField(auto_now_add=True)
    username = models.CharField(
        verbose_name="用户名",
        max_length=128,
        blank=True,
        null=True,
    )
    platform = models.CharField(
        max_length=128,
        default='wechat',
        blank=True,
        null=True,
    )
    ip = models.CharField(
        verbose_name="IP",
        max_length=128,
        blank=True,
        null=True,
    )

    class Meta:
        # One record per username on each platform.
        unique_together = ('platform', 'username')
def get_consumer(username, ip, platform='wechat', gender=None):
    """Return the consumer for ``(platform, username)``, creating it on first sight.

    ``ip`` and ``gender`` are applied only when the record is newly created;
    an existing record is returned untouched.
    """
    consumer, is_new = ConsumerInfo.objects.get_or_create(platform=platform, username=username)
    if not is_new:
        return consumer
    consumer.ip = ip
    # NOTE(review): ConsumerInfo declares no ``gender`` field in this file, so
    # this only sets a plain attribute on the returned instance -- confirm intent.
    consumer.gender = gender
    consumer.save()
    return consumer
class ScanData(models.Model):
    """One scan/verification attempt together with its captured context."""

    consumer = models.ForeignKey(
        ConsumerInfo,
        on_delete=models.CASCADE,
        related_name="scans",
        null=True,
    )
    product = models.ForeignKey(
        Product,
        on_delete=models.CASCADE,
        related_name="scans",
        null=True,
    )
    tenant = models.ForeignKey(Tenant, on_delete=models.CASCADE, null=True)
    datetime = models.DateTimeField(verbose_name="时间", auto_now_add=True)
    ip = models.CharField(verbose_name="IP地址", max_length=64)
    location = models.CharField(verbose_name='位置', max_length=128, null=True)
    message = models.TextField(verbose_name='信息', null=True)
    # Stored as text; the encoding/meaning is not defined in this class.
    image = models.TextField()
    # Verification result of the scan.
    succeeded = models.BooleanField(verbose_name="验证结果", default=True)
    code = models.TextField(verbose_name="序列号", null=True)
    phone_model = models.TextField(verbose_name="手机型号", null=True)
    client_log = models.TextField(verbose_name="采集日志", blank=True, null=True)
    labels = models.TextField(verbose_name="标签", blank=True, default="")
class SystemLog(models.Model):
    """Timestamped free-text log line stored in the database."""

    # Set automatically when the row is created.
    datetime = models.DateTimeField(
        auto_now_add=True,
        db_index=True,
    )
    log = models.TextField()
class Counter(models.Model):
    """Timestamped count sample for a named metric, optionally scoped to a tenant."""

    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name='counters',
        null=True,
    )
    name = models.CharField(max_length=128, db_index=True)
    # Text form of the metric parameters.
    params = models.TextField(null=True)
    datetime = models.DateTimeField(auto_now_add=True, db_index=True)
    count = models.IntegerField()
class UserMessage(models.Model):
    """Message addressed to an admin or a tenant."""

    sender = models.TextField(verbose_name='发送自', null=True)
    datetime = models.DateTimeField(verbose_name="时间", auto_now_add=True)
    read = models.BooleanField(verbose_name='已读', default=False)
    # Both recipient links are nullable.
    admin = models.ForeignKey(
        AdminInfo,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
    )
    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="messages",
        null=True,
    )
    subject = models.TextField(verbose_name='标题')
    content = models.TextField(verbose_name='内容')
    content_type = models.CharField(max_length=128, default='text/plain')
def send_message(user, subject, content, sender=None):
    """Create and save a ``UserMessage`` addressed to ``user``.

    ``user`` is stored in the ``admin`` field when it is an ``AdminInfo``
    instance, otherwise in the ``tenant`` field. The previous code assigned
    ``receiver_admin`` / ``receiver_tenant``, which are not model fields, so
    the recipient was never persisted.
    """
    m = UserMessage(sender=sender, subject=subject, content=content)
    if isinstance(user, AdminInfo):
        m.admin = user
    else:
        m.tenant = user
    m.save()
def params_repr(params):
    """Serialize a mapping as consecutive ``key=value;`` segments, in iteration order."""
    pieces = []
    for key, value in params.items():
        pieces.append(str(key) + "=" + str(value) + ";")
    return "".join(pieces)
def count(tenant, name, params=None, count=1):
    """Record one ``Counter`` sample for metric ``name``.

    ``params`` is an optional mapping serialized with ``params_repr``;
    omitting it is equivalent to passing an empty mapping. The default is
    ``None`` rather than a shared mutable ``{}``, and the unused lookup of the
    previous sample (one wasted query per call) has been dropped.
    """
    ps = params_repr(params or {})
    Counter.objects.create(tenant=tenant, name=name, params=ps, count=count)
def get_sum(name):
    """Return the sum of ``count`` over all samples named ``name`` (0 when there are none)."""
    samples = Counter.objects.filter(name=name)
    totals = samples.aggregate(models.Sum('count'))
    total = totals['count__sum']
    return total if total else 0
def get_history_counts(tenant, name, since, interval_secs, total_secs=None, params=None):
    """Bucket the ``Counter`` samples of metric ``name`` into fixed-width intervals.

    Args:
        tenant: restrict to this tenant when truthy.
        name: metric name to match.
        since: start of the window (inclusive).
        interval_secs: bucket width in seconds.
        total_secs: window length in seconds; when falsy the window runs
            until the current time.
        params: optional mapping; when given, only samples whose stored
            ``params`` equals ``params_repr(params)`` are counted.

    Returns:
        A list of ``(bucket_start, summed_count)`` tuples, one per bucket.
    """
    dt = datetime.timedelta(seconds=interval_secs)
    if total_secs:
        until = since + datetime.timedelta(seconds=total_secs)
    else:
        # timezone.now() matches the rest of this module and yields an aware
        # or naive value according to settings, so it can be subtracted from
        # ``since`` the same way the stored sample timestamps are.
        until = timezone.now()
        total_secs = int((until - since).total_seconds())
    q = Counter.objects.filter(name=name, datetime__gte=since, datetime__lt=until)
    if tenant:
        q = q.filter(tenant=tenant)
    if params:
        q = q.filter(params=params_repr(params))
    q = q.order_by('pk')
    npoints = int(total_secs // interval_secs) + 1
    intervals = [0] * npoints
    for rec in q:
        # Coerce to int so a float interval_secs still gives a usable index.
        idx = int(int((rec.datetime - since).total_seconds()) // interval_secs)
        if idx >= npoints:
            break
        intervals[idx] += rec.count
    return [(since + i * dt, x) for i, x in enumerate(intervals)]
def log(*log):
    """Store the space-joined string forms of all arguments as one ``SystemLog`` row."""
    message = " ".join(map(str, log))
    SystemLog.objects.create(log=message)
class Job(models.Model):
    """Background job record with status, progress and output."""

    admin = models.ForeignKey(AdminInfo, null=True, related_name="jobs", on_delete=models.CASCADE)
    tenant = models.ForeignKey(Tenant, null=True, related_name="jobs", on_delete=models.CASCADE)
    name = models.CharField(max_length=128, null=True)
    status = models.CharField(max_length=64, default='created')
    progress = models.FloatField(default=0.0)
    message = models.TextField(default='')
    create_time = models.DateTimeField(auto_now_add=True)
    # Refreshed automatically on every save.
    update_time = models.DateTimeField(auto_now=True)
    output = models.TextField(null=True)

    def update(self, status, progress=None, message=None):
        """Set ``status`` (and optionally ``progress`` / ``message``) and save.

        ``None`` means "leave unchanged". Explicit values such as ``0.0`` or
        ``''`` are now applied; the previous truthiness test silently ignored
        them, so progress could never be reset to zero.
        """
        self.status = status
        if progress is not None:
            self.progress = progress
        if message is not None:
            self.message = message
        self.save()

    def __str__(self):
        """Return the job name, or a primary-key based label when the name is unset."""
        # ``name`` is nullable and __str__ must always return a string.
        if self.name is not None:
            return self.name
        return f"Job {self.pk}"
class MiniProgram(models.Model):
    """Mini-program credentials, keyed by a unique app id."""

    name = models.CharField(verbose_name='名称', max_length=128)
    app_id = models.CharField(
        verbose_name='App ID',
        max_length=128,
        db_index=True,
        unique=True,
    )
    app_secret = models.CharField(verbose_name='App Secret', max_length=128)
class MiniProgramIntegration(models.Model):
    """Tenant integration identified by a unique token, optionally scoped to a product or batch."""

    tenant = models.ForeignKey(
        Tenant,
        on_delete=models.CASCADE,
        related_name="integrations",
    )
    token = models.CharField(
        max_length=128,
        db_index=True,
        unique=True,
    )
    # Both scopes are nullable.
    product_scope = models.ForeignKey(
        Product,
        on_delete=models.CASCADE,
        related_name="integrations",
        null=True,
    )
    batch_scope = models.ForeignKey(
        CodeBatch,
        on_delete=models.CASCADE,
        related_name="integrations",
        null=True,
    )
    webhook_url = models.TextField(null=True)
class SmsVerifiedAction(models.Model):
    """Admin action that carries a verification code and a commit state."""

    admin = models.ForeignKey(
        AdminInfo,
        on_delete=models.CASCADE,
        related_name="sms_verified_actions",
    )
    action = models.TextField(verbose_name="操作")
    request_time = models.DateTimeField(verbose_name="创建日期", auto_now_add=True)
    verify_code = models.TextField(verbose_name="验证码")
    committed = models.BooleanField(verbose_name="已提交", default=False)
    commit_time = models.DateTimeField(
        verbose_name="提交日期",
        blank=True,
        null=True,
    )

    def __str__(self):
        """Return the admin's username followed by the action text."""
        return f"{self.admin.user.username} {self.action}"
class EstorArchiveRecord(models.Model):
    """Archive record keyed by the SHA-1 of a filename."""

    filename_sha1 = models.CharField(
        max_length=128,
        primary_key=True,
    )
    # NOTE(review): ``max_length`` on a TextField is reportedly not enforced at
    # the database level -- confirm whether a CharField was intended.
    filename = models.TextField(max_length=128)
    data_sha1 = models.TextField(null=True)
    create_time = models.DateTimeField(auto_now_add=True)
class CameraRule(models.Model):
    """Camera settings rule associated with a phone model string."""

    name = models.CharField(verbose_name="规则名称", max_length=128)
    # Phone model string this rule refers to; how it is matched is not shown here.
    model_text = models.TextField(verbose_name="机型字符串")
    zoom = models.IntegerField(verbose_name="放大倍数", default=4)
    use_web_view = models.BooleanField(
        verbose_name="使用web-viewiPhone微距",
        default=False,
    )
    disabled = models.BooleanField(verbose_name="禁用", default=False)
    notes = models.TextField(verbose_name="备注信息", null=True)

    def __str__(self):
        """Return the rule name."""
        return self.name
class FrameLabel(models.Model):
    """Labels attached to a frame identified by its unique OSS path."""

    oss_path = models.TextField(
        verbose_name="OSS路径",
        db_index=True,
        unique=True,
    )
    labels = models.TextField(verbose_name="标签")
    create_time = models.DateTimeField(verbose_name="创建日期", auto_now_add=True)
    update_time = models.DateTimeField(verbose_name="更新日期", auto_now=True)

    def __str__(self):
        """Return the OSS path."""
        return self.oss_path
class ABTest(models.Model):
    """A/B experiment definition with a unique name and a text spec."""

    name = models.CharField(max_length=128, verbose_name="名称", db_index=True, unique=True)
    description = models.TextField(verbose_name="描述")
    test_object = models.CharField(max_length=128, verbose_name="实验对象")
    create_time = models.DateTimeField(auto_now_add=True, verbose_name="创建日期")
    update_time = models.DateTimeField(auto_now=True, verbose_name="更新日期")
    enabled = models.BooleanField(default=True, verbose_name="是否启用")
    spec = models.TextField(verbose_name="实验配置")

    def __str__(self):
        """Return the experiment name followed by its test object.

        The previous implementation read ``self.category``, which is not
        declared on this model and raised ``AttributeError``.
        """
        return f"{self.name} {self.test_object}"
class ABTestSample(models.Model):
    """One data sample collected for an A/B experiment."""

    abtest = models.ForeignKey(
        ABTest,
        on_delete=models.CASCADE,
        related_name="samples",
    )
    data = models.TextField(verbose_name="实验数据")
    create_time = models.DateTimeField(verbose_name="创建日期", auto_now_add=True)
    update_time = models.DateTimeField(verbose_name="更新日期", auto_now=True)

    def __str__(self):
        """Return the primary key, experiment name and creation time."""
        return f"{self.pk} {self.abtest.name} {self.create_time}"
class DataLabel(models.Model):
    """Uniquely named data label with a description."""

    name = models.CharField(
        verbose_name="名称",
        max_length=128,
        db_index=True,
        unique=True,
    )
    create_time = models.DateTimeField(verbose_name="创建日期", auto_now_add=True)
    update_time = models.DateTimeField(verbose_name="更新日期", auto_now=True)
    description = models.TextField(verbose_name="描述")

    def __str__(self):
        """Return the label name with its creation and update timestamps."""
        return f"{self.name}: created {self.create_time} updated {self.update_time}"
class ChatSession(models.Model):
    """Chat session with a unique id, optionally linked to a tenant, product and consumer."""

    session_id = models.CharField(
        verbose_name="会话ID",
        max_length=128,
        db_index=True,
        unique=True,
    )
    tenant = models.ForeignKey(
        Tenant,
        verbose_name="租户",
        on_delete=models.CASCADE,
        related_name="chat_sessions",
        blank=True,
        null=True,
    )
    product = models.ForeignKey(
        Product,
        verbose_name="产品",
        on_delete=models.CASCADE,
        related_name="chat_sessions",
        blank=True,
        null=True,
    )
    consumer = models.ForeignKey(
        ConsumerInfo,
        verbose_name="消费者",
        on_delete=models.CASCADE,
        related_name="chat_sessions",
        blank=True,
        null=True,
    )
    created_at = models.DateTimeField(verbose_name="创建时间", auto_now_add=True)
class ChatMessage(models.Model):
    """Single message inside a chat session."""

    # Allowed values for ``message_type`` as (stored value, display label).
    MESSAGE_TYPE_CHOICES = [
        ('text', '文本'),
        ('image', '图片'),
        ('file', '文件'),
        ('system', '系统消息'),
        ('ai_response', 'AI回复'),
    ]
    # Allowed values for ``role`` as (stored value, display label).
    ROLE_CHOICES = [
        ('user', '用户'),
        ('assistant', '助手'),
        ('system', '系统'),
    ]

    session = models.ForeignKey(
        ChatSession,
        verbose_name="会话",
        on_delete=models.CASCADE,
        related_name="messages",
    )
    role = models.CharField(
        verbose_name="角色",
        max_length=20,
        choices=ROLE_CHOICES,
    )
    message_type = models.CharField(
        verbose_name="消息类型",
        max_length=20,
        choices=MESSAGE_TYPE_CHOICES,
        default='text',
    )
    text_content = models.TextField(verbose_name="消息内容")
    # Optional OSS bucket and path of an attached file.
    file_oss_bucket = models.TextField(verbose_name="文件OSS桶", blank=True, null=True)
    file_oss_path = models.TextField(verbose_name="文件OSS路径", blank=True, null=True)
    created_at = models.DateTimeField(verbose_name="发送时间", auto_now_add=True)