2025-03-01 20:40:38 +00:00

60 lines
2.1 KiB
Python

import json
from django.core.management.base import BaseCommand, CommandError
from django.utils import timezone
from products.sendmsg import send_user_message
from products.models import *
class Command(BaseCommand):
    """Check one serial code for repeated QR verification and raise alerts.

    For the serial code given on the command line, every ``AdminInfo`` row
    that has a ``qr_verify_alert_rule`` is evaluated.  A rule is a JSON
    object with the keys ``time_window_seconds`` and ``repeat_threshold``.
    When the number of ``ScanData`` rows for the code inside the time window
    reaches the threshold, the admin is alerted via ``send_user_message``
    and, after all admins were evaluated, the serial code is deactivated.
    """

    help = 'Check repeated QR verify request and send alert'

    def add_arguments(self, parser):
        """Register the single positional ``code`` argument."""
        parser.add_argument('code')

    def handle(self, *args, **options):
        """Evaluate every admin's alert rule for the requested serial code.

        Does nothing when no active ``SerialCode`` matches ``options['code']``.
        """
        code = options['code']
        sc = SerialCode.objects.filter(code=code, is_active=True).first()
        if not sc:
            # No active serial code with this value: nothing to check.
            return

        triggered = False
        # Deliberately a full loop rather than any(): check_one() sends the
        # alert as a side effect, so it must run for every admin even after
        # the first rule has fired.
        for admin in AdminInfo.objects.filter(qr_verify_alert_rule__isnull=False):
            if self.check_one(admin, sc):
                triggered = True

        if triggered:
            self.disable_code(sc)

    def check_one(self, admin, sc):
        """Apply one admin's alert rule to ``sc`` and alert if it fires.

        Args:
            admin: object exposing ``qr_verify_alert_rule`` (a JSON string).
            sc: the serial code being checked; only ``sc.code`` is read here.

        Returns:
            ``True`` when the rule fired and an alert was sent, otherwise
            ``False`` (missing, malformed, or disabled rule, or threshold not
            reached).
        """
        # Local import: this module does not import ``datetime`` itself, and
        # relying on ``from products.models import *`` to leak the name is
        # fragile (it breaks if models exports the class or nothing at all).
        from datetime import timedelta

        raw_rule = admin.qr_verify_alert_rule
        if not raw_rule:
            return False

        admin_ref = getattr(admin, 'pk', admin)

        # A single broken rule must not abort the check for the other admins.
        try:
            rule = json.loads(raw_rule)
        except (TypeError, ValueError) as exc:
            self.stderr.write(
                "Skipping invalid qr_verify_alert_rule for admin %s: %s"
                % (admin_ref, exc)
            )
            return False
        if not isinstance(rule, dict):
            self.stderr.write(
                "Skipping qr_verify_alert_rule for admin %s: not a JSON object"
                % (admin_ref,)
            )
            return False

        try:
            # ``or 0`` maps a missing key, null, 0 and "" to "rule disabled".
            window_seconds = float(rule.get('time_window_seconds') or 0)
            threshold = float(rule.get('repeat_threshold') or 0)
            start = timezone.now() - timedelta(seconds=window_seconds)
        except (TypeError, ValueError, OverflowError) as exc:
            self.stderr.write(
                "Skipping qr_verify_alert_rule for admin %s: bad value (%s)"
                % (admin_ref, exc)
            )
            return False

        # Written as a positive test so NaN is rejected as well; zero and
        # negative values mean the rule is not usable.
        if not (window_seconds > 0 and threshold > 0):
            return False

        records = ScanData.objects.filter(code=sc.code, datetime__gte=start)
        if records.count() >= threshold:
            self.alert_one(admin, sc, records)
            return True
        return False

    def disable_code(self, sc):
        """Mark the serial code as inactive and persist it."""
        sc.is_active = False
        sc.save()

    def alert_one(self, admin, sc, records):
        """Send the repeated-verification alert for ``sc`` to ``admin``.

        Args:
            admin: recipient passed through to ``send_user_message``.
            sc: serial code; ``code``, ``tenant`` and ``product`` are read.
            records: iterable of scan records exposing ``datetime``,
                ``location`` and ``ip``.
        """
        # Materialize once so the count in the subject and the rows listed in
        # the body come from the same snapshot (and a queryset is hit once).
        rows = list(records)

        subject = "重复验证报警: 序列码 %s 最近已重复 %d 次验证" % (sc.code, len(rows))
        self.stdout.write(subject)

        lines = [
            "序列码: %s" % sc.code,
            "租户: %s" % (sc.tenant.username if sc.tenant else ""),
            "产品: %s" % (sc.product.name if sc.product else ""),
            "",
            "近期验证记录:",
        ]
        lines.extend(
            "%s, %s, %s" % (row.datetime, row.location, row.ip) for row in rows
        )
        lines.append("")
        lines.append("验证码已冻结")
        send_user_message(admin, subject, "\n".join(lines))