themblem/api/products/aliyun.py
2025-03-01 20:40:38 +00:00

49 lines
1.3 KiB
Python

import json
import oss2
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.request import CommonRequest
from django.conf import settings
def oss_bucket(bucketname):
    """Build an ``oss2.Bucket`` client from the Django ``settings.OSS`` mapping.

    Args:
        bucketname: Name of the bucket to open. Any falsy value (``None``,
            ``''``) selects the default bucket, ``settings.OSS['bucket']``.

    Returns:
        An ``oss2.Bucket`` created with the configured ``access_key`` /
        ``secret`` credentials and the configured ``endpoint``.
    """
    conf = settings.OSS
    credentials = oss2.Auth(conf['access_key'], conf['secret'])
    # Fall back to the configured default only when no name was supplied.
    target_name = bucketname if bucketname else conf['bucket']
    return oss2.Bucket(credentials, conf['endpoint'], target_name)
def oss_put(name, f, bucket=None):
    """Upload ``f`` to OSS under the object key ``name``.

    Args:
        name: Object key to write.
        f: Payload handed unchanged to ``Bucket.put_object``.
        bucket: Optional bucket name; ``None`` uses the default bucket
            resolved by ``oss_bucket``.

    Returns:
        None. The SDK's upload result is discarded.
    """
    target = oss_bucket(bucket)
    target.put_object(name, f)
def oss_get(name, bucket=None):
    """Fetch the full content of the object stored under ``name``.

    Args:
        name: Object key to read.
        bucket: Optional bucket name; ``None`` uses the default bucket
            resolved by ``oss_bucket``.

    Returns:
        Whatever ``read()`` on the ``get_object`` result yields, or ``None``
        when the SDK raises ``oss2.exceptions.NoSuchKey``.
    """
    try:
        response = oss_bucket(bucket).get_object(name)
        payload = response.read()
    except oss2.exceptions.NoSuchKey:
        # A missing key is reported as "no data" rather than as an error.
        return None
    return payload
def oss_list(bucket):
    """Return the keys of every object in ``bucket``.

    A single ``list_objects()`` call returns only one page of results (the
    SDK defaults to 100 keys), so larger buckets were silently truncated.
    ``oss2.ObjectIterator`` follows the pagination markers and yields every
    object.

    Args:
        bucket: Bucket name; a falsy value selects the default bucket
            resolved by ``oss_bucket``.

    Returns:
        A list of object keys, in the order the service lists them.
    """
    # No delimiter is passed, so the iterator yields only real objects
    # (no common-prefix "directory" entries).
    return [obj.key for obj in oss2.ObjectIterator(oss_bucket(bucket))]
def oss_delete(name, bucket):
    """Delete the object stored under ``name``.

    Args:
        name: Object key to remove.
        bucket: Bucket name; a falsy value selects the default bucket
            resolved by ``oss_bucket``. Unlike most helpers in this module,
            this argument has no default and must be passed explicitly.

    Returns:
        None. The SDK's delete result is discarded.
    """
    target = oss_bucket(bucket)
    target.delete_object(name)
def oss_sign_url(name, method='GET', bucket=None):
    """Create a signed URL for the object ``name``.

    Args:
        name: Object key as ``str``; it is encoded to bytes (default
            encoding of ``str.encode``) before being handed to the SDK.
        method: HTTP method the URL is signed for.
        bucket: Optional bucket name; ``None`` uses the default bucket
            resolved by ``oss_bucket``.

    Returns:
        The value returned by ``Bucket.sign_url``.
    """
    # Third argument of ``sign_url``: 24 * 60 * 60, i.e. one day when read
    # as seconds (the unit the oss2 API documents for ``expires``).
    one_day = 24 * 60 * 60
    target = oss_bucket(bucket)
    key = name.encode()
    return target.sign_url(method, key, one_day)
def oss_has(name, bucket=None):
    """Report whether an object exists under the key ``name``.

    The previous implementation called ``get_object``, which starts a full
    download and left the response stream unclosed, just to test existence.
    ``Bucket.object_exists`` performs a metadata-only check instead and
    returns a plain boolean.

    Args:
        name: Object key to look up.
        bucket: Optional bucket name; ``None`` uses the default bucket
            resolved by ``oss_bucket``.

    Returns:
        ``True`` if the object exists, ``False`` otherwise. Errors other
        than "object not found" (e.g. a missing bucket) still propagate.
    """
    return oss_bucket(bucket).object_exists(name)
def oss_stat(bucket=None):
    """Summarise a bucket's object count and storage size.

    Args:
        bucket: Optional bucket name; ``None`` uses the default bucket
            resolved by ``oss_bucket``.

    Returns:
        A dict with two entries: ``'objects'`` (the stat's ``object_count``)
        and ``'size'`` (the stat's ``storage_size_in_bytes``).
    """
    info = oss_bucket(bucket).get_bucket_stat()
    return dict(objects=info.object_count, size=info.storage_size_in_bytes)