60 lines
2.1 KiB
Python
60 lines
2.1 KiB
Python
import json
|
|
from django.core.management.base import BaseCommand, CommandError
|
|
from django.utils import timezone
|
|
from products.sendmsg import send_user_message
|
|
from products.models import *
|
|
|
|
class Command(BaseCommand):
    """Detect repeated QR verification of one serial code and alert admins.

    The command takes a single serial ``code``. For every ``AdminInfo`` row
    that has a ``qr_verify_alert_rule``, it counts the ``ScanData`` rows for
    that code inside the rule's time window. Each admin whose threshold is
    reached is sent a message, and if at least one rule fired the serial
    code is deactivated.

    A rule is a JSON object with two keys, both required::

        {"time_window_seconds": 600, "repeat_threshold": 5}
    """

    help = 'Check repeated QR verify request and send alert'

    def add_arguments(self, parser):
        """Register the single positional ``code`` argument."""
        parser.add_argument('code')

    def handle(self, *args, **options):
        """Run every admin's rule against the given code.

        Returns silently when no *active* ``SerialCode`` matches, so running
        the command again for an already-disabled code is a no-op.
        """
        code = options['code']
        sc = SerialCode.objects.filter(code=code, is_active=True).first()
        if not sc:
            return

        triggered = False
        for admin in AdminInfo.objects.filter(qr_verify_alert_rule__isnull=False):
            # Deliberately no short-circuit: every admin whose rule matches
            # must receive an alert, not just the first one.
            if self.check_one(admin, sc):
                triggered = True

        if triggered:
            self.disable_code(sc)

    def check_one(self, admin, sc):
        """Evaluate one admin's rule against ``sc`` and alert on a match.

        Args:
            admin: object exposing ``qr_verify_alert_rule`` (JSON text).
            sc: serial-code object; only ``sc.code`` is used for the lookup.

        Returns:
            ``True`` if the threshold was reached and an alert was sent,
            ``False`` otherwise (including a missing or unusable rule).
        """
        limits = self._parse_rule(admin)
        if limits is None:
            return False
        window, threshold = limits

        start = timezone.now() - window
        records = ScanData.objects.filter(code=sc.code, datetime__gte=start)
        if records.count() < threshold:
            return False

        self.alert_one(admin, sc, records)
        return True

    def _parse_rule(self, admin):
        """Return ``(window, threshold)`` from the admin's rule, or ``None``.

        ``window`` is a ``timedelta`` and ``threshold`` a positive number.
        A rule that is empty or lacks either key is skipped silently; a rule
        that is present but unusable (bad JSON, wrong types, non-positive
        values) is reported on stderr and skipped, so one broken rule cannot
        abort the check for the remaining admins.
        """
        # Imported here because the module itself never imports ``datetime``;
        # the name would otherwise only exist if the wildcard import from
        # ``products.models`` happened to provide it.
        from datetime import timedelta

        raw = admin.qr_verify_alert_rule
        if not raw:
            return None

        try:
            rule = json.loads(raw)
        except (TypeError, ValueError) as exc:
            # json.JSONDecodeError is a ValueError subclass.
            return self._warn_bad_rule(admin, "invalid JSON (%s)" % (exc,))
        if not isinstance(rule, dict):
            return self._warn_bad_rule(admin, "rule is not a JSON object")

        time_window_seconds = rule.get('time_window_seconds')
        repeat_threshold = rule.get('repeat_threshold')
        if not time_window_seconds or not repeat_threshold:
            return None

        try:
            window = timedelta(seconds=time_window_seconds)
        except (TypeError, ValueError, OverflowError) as exc:
            return self._warn_bad_rule(
                admin, "bad time_window_seconds (%s)" % (exc,))
        if window <= timedelta(0):
            return self._warn_bad_rule(
                admin, "time_window_seconds must be positive")

        # ``not (x > 0)`` rather than ``x <= 0`` so that NaN is rejected too.
        if (not isinstance(repeat_threshold, (int, float))
                or not repeat_threshold > 0):
            return self._warn_bad_rule(
                admin, "repeat_threshold must be a positive number")

        return window, repeat_threshold

    def _warn_bad_rule(self, admin, reason):
        """Report an unusable rule on stderr; always returns ``None``."""
        self.stderr.write(
            "Skipping qr_verify_alert_rule of %s: %s" % (admin, reason))
        return None

    def disable_code(self, sc):
        """Mark the serial code inactive and persist it."""
        sc.is_active = False
        sc.save()

    def alert_one(self, admin, sc, records):
        """Build the repeated-verification report and send it to ``admin``.

        Args:
            admin: recipient, passed through to ``send_user_message``.
            sc: serial code; ``code``, ``tenant`` and ``product`` are read.
            records: iterable of scan rows exposing ``datetime``,
                ``location`` and ``ip``.
        """
        # Materialise once so the count in the subject always matches the
        # rows listed in the body, and the query is not executed twice.
        rows = list(records)

        subject = "重复验证报警: 序列码 %s 最近已重复 %d 次验证" % (sc.code, len(rows))
        self.stdout.write(subject)

        lines = [
            "序列码: %s" % sc.code,
            "租户: %s" % (sc.tenant.username if sc.tenant else ""),
            "产品: %s" % (sc.product.name if sc.product else ""),
            "",
            "近期验证记录:",
        ]
        lines.extend(
            "%s, %s, %s" % (r.datetime, r.location, r.ip) for r in rows)
        # NOTE: the body announces the freeze, but this method does not
        # perform it; handle() calls disable_code() after all alerts.
        lines.append("")
        lines.append("验证码已冻结")

        send_user_message(admin, subject, "\n".join(lines))
|