import json from django.core.management.base import BaseCommand, CommandError from django.utils import timezone from products.sendmsg import send_user_message from products.models import * class Command(BaseCommand): help = 'Check repeated QR verify request and send alert' def add_arguments(self, parser): parser.add_argument('code') def handle(self, *args, **options): code = options['code'] sc = SerialCode.objects.filter(code=code, is_active=True).first() if not sc: return trigger = False for a in AdminInfo.objects.filter(qr_verify_alert_rule__isnull=False): if self.check_one(a, sc): trigger = True if trigger: self.disable_code(sc) def check_one(self, admin, sc): rule = admin.qr_verify_alert_rule if not rule: return rule = json.loads(rule) time_window_seconds = rule.get('time_window_seconds') repeat_threshold = rule.get('repeat_threshold') if not time_window_seconds or not repeat_threshold: return start = timezone.now() - datetime.timedelta(seconds=time_window_seconds) records = ScanData.objects.filter(code=sc.code, datetime__gte=start) if records.count() >= repeat_threshold: self.alert_one(admin, sc, records) return True def disable_code(self, sc): sc.is_active = False sc.save() def alert_one(self, admin, sc, records): subject = "重复验证报警: 序列码 %s 最近已重复 %d 次验证" % (sc.code, records.count()) print(subject) lines = [ "序列码: %s" % sc.code, "租户: %s" % (sc.tenant.username if sc.tenant else ""), "产品: %s" % (sc.product.name if sc.product else ""), "", "近期验证记录:"] for r in records: lines.append("%s, %s, %s" % (r.datetime, r.location, r.ip)) lines.append("") lines.append("验证码已冻结") send_user_message(admin, subject, "\n".join(lines))