123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106 |
- from minio import Minio
- from queue import Queue
- from minio.error import S3Error
- from utils.minioUtil.minioClientPool import MinioClientPool
- import threading
- import json
- import mimetypes
- class ThreadSafeMinioClient:
- def __init__(self, client_pool: MinioClientPool):
- self.client_pool: MinioClientPool = client_pool
- self.lock = threading.Lock()
- def upload_file(self, bucket_name, object_name, file_path):
- with self.lock:
- client = self.client_pool.get_client()
- try:
- content_type = self._get_content_type(file_path)
- result = client.fput_object(
- bucket_name, object_name, file_path, content_type=content_type)
- return result
- finally:
- self.client_pool.release_client(client)
- def download_file(self, bucket_name, object_name, file_path):
- with self.lock:
- client = self.client_pool.get_client()
- try:
- client.fget_object(bucket_name, object_name, file_path)
- finally:
- self.client_pool.release_client(client)
- def upload_files(self, bucket_name, items):
- # items should be a list of tuples (object_name, file_path)
- results = []
- with self.lock:
- client = self.client_pool.get_client()
- try:
- for object_name, file_path in items:
- content_type = self._get_content_type(file_path)
- result = client.fput_object(
- bucket_name, object_name, file_path, content_type=content_type)
- results.append(result)
- return results
- finally:
- self.client_pool.release_client(client)
- def download_files(self, bucket_name, items):
- # items should be a list of tuples (object_name, save_path)
- with self.lock:
- client = self.client_pool.get_client()
- try:
- for object_name, save_path in items:
- client.fget_object(bucket_name, object_name, save_path)
- finally:
- self.client_pool.release_client(client)
- def create_bucket(self, bucket_name):
- with self.lock:
- client = self.client_pool.get_client()
- try:
- if not self.bucket_exists(bucket_name):
- client.make_bucket(bucket_name)
- finally:
- self.client_pool.release_client(client)
- def delete_bucket(self, bucket_name):
- with self.lock:
- client = self.client_pool.get_client()
- try:
- if client.bucket_exists(bucket_name):
- # To delete a bucket, it must be empty
- client.remove_bucket(bucket_name)
- finally:
- self.client_pool.release_client(client)
- def get_buckets(self):
- with self.lock:
- client = self.client_pool.get_client()
- try:
- return client.list_buckets()
- finally:
- self.client_pool.release_client(client)
- def bucket_exists(self, bucket_name):
- # with self.lock:
- client = self.client_pool.get_client()
- try:
- return client.bucket_exists(bucket_name)
- except S3Error as err:
- return False
- finally:
- self.client_pool.release_client(client)
- def set_bucket_policy(self, bucket_name):
- client = self.client_pool.get_client()
- try:
- policy = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}' % (bucket_name, bucket_name)
- client.set_bucket_policy(bucket_name, policy)
- finally:
- self.client_pool.release_client(client)
- def _get_content_type(self, file_path):
- content_type, _ = mimetypes.guess_type(file_path)
- return content_type if content_type else 'application/octet-stream'
|