176 lines
4.9 KiB
Python
Executable File
176 lines
4.9 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
import requests
|
|
import argparse
|
|
|
|
class SubCommand(object):
|
|
""" Base class of subcommand"""
|
|
help = ""
|
|
aliases = []
|
|
want_argv = False # Whether the command accepts extra arguments
|
|
|
|
envs = {
|
|
'prod': {
|
|
'server': 'https://themblem.com',
|
|
'token': '3ebd8c33-f46e-4b06-bda8-4c0f5f5eb530',
|
|
},
|
|
'dev': {
|
|
'server': 'https://dev.themblem.com',
|
|
'token': 'D91AB64B-C5CA-4B78-AC76-01722B8C8A5C',
|
|
},
|
|
}
|
|
|
|
def setup_args(self, parser):
|
|
pass
|
|
|
|
def do(self, args, argv):
|
|
"""Do command"""
|
|
print("Not implemented")
|
|
|
|
def get_env(self):
|
|
return self.envs[self.args.env]
|
|
|
|
def get_server(self):
|
|
return self.get_env()['server']
|
|
|
|
def make_headers(self):
|
|
return {
|
|
'Authorization': 'token ' + self.get_env()['token'],
|
|
}
|
|
|
|
class ActivateCommand(SubCommand):
|
|
name = "activate"
|
|
want_argv = True
|
|
help = "Activate code"
|
|
|
|
def setup_args(self, parser):
|
|
pass
|
|
|
|
def do(self, args, argv):
|
|
for i in argv:
|
|
print(i)
|
|
self.activate(i)
|
|
|
|
def activate(self, code):
|
|
server = self.get_server()
|
|
pk = self.get_pk(code)
|
|
url = f'{server}/api/v1/code/{pk}/'
|
|
r = requests.patch(url, headers=self.make_headers(), json={
|
|
'is_active': True,
|
|
})
|
|
print(r.text)
|
|
|
|
def get_pk(self, code):
|
|
server = self.get_server()
|
|
url = f'{server}/api/v1/code/?code=' + code
|
|
r = requests.get(url, headers=self.make_headers())
|
|
r = r.json()
|
|
return r['objects'][0]['id']
|
|
|
|
class GetScanDataCommand(SubCommand):
|
|
name = "get-scan-data"
|
|
want_argv = True
|
|
help = "Get scan data from api"
|
|
|
|
def setup_args(self, parser):
|
|
parser.add_argument("--output", "-o", required=True)
|
|
|
|
def do(self, args, argv):
|
|
os.makedirs(args.output, exist_ok=True)
|
|
for i in argv:
|
|
sd = self.get_scan_data(i)
|
|
with open(f"{args.output}/metadata.json", "w") as f:
|
|
f.write(json.dumps(sd, indent=2) + "\n")
|
|
with open(f"{args.output}/frame.jpg", "wb") as f:
|
|
f.write(self.get_image(sd['image']))
|
|
|
|
def get_scan_data(self, i):
|
|
server = self.get_server()
|
|
url = f'{server}/api/v1/scan-data/{i}/'
|
|
print(url)
|
|
r = requests.get(url, headers=self.make_headers())
|
|
return r.json()
|
|
|
|
def get_image(self, name):
|
|
server = self.get_server()
|
|
token = self.get_env()['token']
|
|
url = f'{server}/api/v1/oss-image/?token={token}&name={name}'
|
|
r = requests.get(url)
|
|
r.raise_for_status()
|
|
return r.content
|
|
|
|
def get_roi(self, code):
|
|
server = self.get_server()
|
|
token = self.get_env()['token']
|
|
url = f'{server}/api/v1/code-feature-roi/?token={token}&code={code}'
|
|
r = requests.get(url)
|
|
if r.status_code == 404:
|
|
return None
|
|
return r.content
|
|
|
|
class UploadRoiCommand(SubCommand):
|
|
name = "upload-roi"
|
|
want_argv = True
|
|
help = "Upload roi, filename should be {code}.jpg"
|
|
|
|
def setup_args(self, parser):
|
|
parser.add_argument("file", action="append")
|
|
|
|
def do(self, args, argv):
|
|
for f in args.file:
|
|
print(f)
|
|
url = self.get_server() + "/api/v1/code-feature-roi/"
|
|
print(url)
|
|
r = requests.post(url, headers=self.make_headers(), files={
|
|
f: open(f, 'rb'),
|
|
})
|
|
print(r.text)
|
|
|
|
class UserInfoCommand(SubCommand):
|
|
name = "userinfo"
|
|
want_argv = True
|
|
help = "Call the userinfo API"
|
|
|
|
def setup_args(self, parser):
|
|
pass
|
|
|
|
def do(self, args, argv):
|
|
server = self.get_server()
|
|
url = f'{server}/api/v1/userinfo/'
|
|
r = requests.get(url, headers=self.make_headers())
|
|
r.raise_for_status()
|
|
print(r.json())
|
|
|
|
def global_args(parser):
|
|
parser.add_argument("--env", "-E", help="Env", default="prod")
|
|
parser.add_argument("-D", "--debug", action="store_true",
|
|
help="Enable debug output")
|
|
|
|
def main():
|
|
parser = argparse.ArgumentParser()
|
|
global_args(parser)
|
|
subparsers = parser.add_subparsers(title="subcommands")
|
|
for c in SubCommand.__subclasses__():
|
|
cmd = c()
|
|
p = subparsers.add_parser(cmd.name, aliases=cmd.aliases,
|
|
help=cmd.help)
|
|
cmd.setup_args(p)
|
|
p.set_defaults(func=cmd.do, cmdobj=cmd, all=False)
|
|
args, argv = parser.parse_known_args()
|
|
if not hasattr(args, "cmdobj"):
|
|
parser.print_usage()
|
|
return 1
|
|
if args.debug:
|
|
logging.basicConfig(level=logging.DEBUG)
|
|
if argv and not args.cmdobj.want_argv:
|
|
raise Exception("Unrecognized arguments:\n" + argv[0])
|
|
args.cmdobj.args = args
|
|
r = args.func(args, argv)
|
|
return r
|
|
|
|
if __name__ == '__main__':
|
|
sys.exit(main())
|