46 lines
1.4 KiB
Python
46 lines
1.4 KiB
Python
import oss2
|
|
from django.conf import settings
|
|
|
|
def oss_bucket(bucket, endpoint=None):
|
|
oss = settings.OSS
|
|
auth = oss2.Auth(oss['access_key'], oss['secret'])
|
|
bname = bucket or oss['bucket']
|
|
bucket = oss2.Bucket(auth, endpoint or oss['endpoint'], bname)
|
|
return bucket
|
|
|
|
def oss_put(name, f, bucket=None, endpoint=None):
|
|
oss_bucket(bucket, endpoint).put_object(name, f)
|
|
|
|
def oss_get(name, bucket=None, endpoint=None):
|
|
try:
|
|
return oss_bucket(bucket, endpoint).get_object(name).read()
|
|
except oss2.exceptions.NoSuchKey:
|
|
return None
|
|
|
|
def oss_list(bucket=None, endpoint=None):
|
|
ret = []
|
|
for obj in oss_bucket(bucket).list_objects().object_list:
|
|
ret.append(obj.key)
|
|
return ret
|
|
|
|
def oss_delete(name, bucket=None, endpoint=None):
|
|
oss_bucket(bucket, endpoint).delete_object(name)
|
|
|
|
def oss_sign_url(name, method='GET', bucket=None, endpoint=None):
|
|
return oss_bucket(bucket, endpoint).sign_url(method, name.encode(), 24 * 60 * 60)
|
|
|
|
def oss_has(name, bucket=None, endpoint=None):
|
|
try:
|
|
obj = oss_bucket(bucket, endpoint).get_object(name)
|
|
return True
|
|
except oss2.exceptions.NoSuchKey:
|
|
return False
|
|
|
|
def oss_stat(bucket=None, endpoint=None):
|
|
bucket = oss_bucket(bucket, endpoint)
|
|
stat = bucket.get_bucket_stat()
|
|
return {
|
|
'objects': stat.object_count,
|
|
'size': stat.storage_size_in_bytes,
|
|
}
|