"""Convenience helpers around the Aliyun OSS SDK (``oss2``).

Credentials and defaults are read from the Django setting ``OSS``, a mapping
with the keys ``access_key``, ``secret``, ``bucket`` and ``endpoint``.
"""
import oss2
from django.conf import settings


def oss_bucket(bucket, endpoint=None):
    """Return an ``oss2.Bucket`` client.

    Args:
        bucket: Bucket name; a falsy value selects ``settings.OSS['bucket']``.
        endpoint: Endpoint; a falsy value selects ``settings.OSS['endpoint']``.
    """
    conf = settings.OSS
    auth = oss2.Auth(conf['access_key'], conf['secret'])
    bucket_name = bucket or conf['bucket']
    return oss2.Bucket(auth, endpoint or conf['endpoint'], bucket_name)


def oss_put(name, f, bucket=None, endpoint=None):
    """Upload ``f`` to the bucket under the key ``name``."""
    oss_bucket(bucket, endpoint).put_object(name, f)


def oss_get(name, bucket=None, endpoint=None):
    """Return the full content of object ``name``, or ``None`` if the key
    does not exist."""
    try:
        return oss_bucket(bucket, endpoint).get_object(name).read()
    except oss2.exceptions.NoSuchKey:
        return None


def oss_list(bucket=None, endpoint=None):
    """Return the keys from one ``list_objects()`` call on the bucket.

    NOTE(review): only the single page returned by ``list_objects()`` is
    used; presumably the service caps this (oss2 defaults to 100 keys), so
    larger buckets would need pagination -- confirm against callers.
    """
    # The endpoint is forwarded so a non-default endpoint is honoured here
    # the same way it is by every other helper in this module.
    listing = oss_bucket(bucket, endpoint).list_objects()
    return [obj.key for obj in listing.object_list]


def oss_delete(name, bucket=None, endpoint=None):
    """Delete the object stored under ``name``."""
    oss_bucket(bucket, endpoint).delete_object(name)


def oss_sign_url(name, method='GET', bucket=None, endpoint=None):
    """Return a signed URL for ``name`` valid for 24 hours.

    Args:
        name: Object key (``str``); it is encoded to bytes before signing.
        method: HTTP method the URL is signed for.
    """
    expires_seconds = 24 * 60 * 60
    return oss_bucket(bucket, endpoint).sign_url(
        method, name.encode(), expires_seconds)


def oss_has(name, bucket=None, endpoint=None):
    """Return ``True`` if an object named ``name`` exists, else ``False``."""
    # object_exists() only queries metadata; the previous get_object() call
    # opened a download stream for the body that was never read or closed.
    return oss_bucket(bucket, endpoint).object_exists(name)


def oss_stat(bucket=None, endpoint=None):
    """Return bucket statistics as a dict with the keys ``objects`` (object
    count) and ``size`` (storage size in bytes)."""
    stat = oss_bucket(bucket, endpoint).get_bucket_stat()
    return {
        'objects': stat.object_count,
        'size': stat.storage_size_in_bytes,
    }