import json
import mimetypes
import threading
from contextlib import contextmanager
from queue import Queue

from minio import Minio
from minio.error import S3Error

from utils.minioUtil.minioClientPool import MinioClientPool


class ThreadSafeMinioClient:
    """Wrapper that runs MinIO operations on clients borrowed from a pool.

    Every operation checks a client out of ``client_pool`` and returns it
    when done, even if the operation raises.  Most methods additionally
    hold ``self.lock`` for the whole operation, so those methods never run
    concurrently with each other on the same wrapper instance.
    ``bucket_exists`` and ``set_bucket_policy`` do not take the lock.
    """

    def __init__(self, client_pool: MinioClientPool):
        """Store the pool to borrow clients from and create the lock."""
        self.client_pool: MinioClientPool = client_pool
        # Non-reentrant: a method holding it must not call another method
        # that also acquires it.
        self.lock = threading.Lock()

    @contextmanager
    def _borrow_client(self):
        """Yield a pooled client and always hand it back to the pool."""
        client = self.client_pool.get_client()
        try:
            yield client
        finally:
            self.client_pool.release_client(client)

    @staticmethod
    def _exists_or_false(client, bucket_name) -> bool:
        """Return ``client.bucket_exists(...)``, mapping ``S3Error`` to False."""
        try:
            return client.bucket_exists(bucket_name)
        except S3Error:
            return False

    def upload_file(self, bucket_name, object_name, file_path):
        """Upload one local file and return the client's upload result.

        The content type is guessed from ``file_path``.  Exceptions raised
        by the client propagate to the caller.
        """
        with self.lock, self._borrow_client() as client:
            return client.fput_object(
                bucket_name,
                object_name,
                file_path,
                content_type=self._get_content_type(file_path),
            )

    def download_file(self, bucket_name, object_name, file_path):
        """Download one object to ``file_path``.  Returns ``None``."""
        with self.lock, self._borrow_client() as client:
            client.fget_object(bucket_name, object_name, file_path)

    def upload_files(self, bucket_name, items):
        """Upload several files with a single borrowed client.

        ``items`` should be a list of ``(object_name, file_path)`` tuples.
        Returns the list of per-file upload results, in input order.  If an
        upload raises, the exception propagates and no list is returned.
        """
        with self.lock, self._borrow_client() as client:
            return [
                client.fput_object(
                    bucket_name,
                    object_name,
                    file_path,
                    content_type=self._get_content_type(file_path),
                )
                for object_name, file_path in items
            ]

    def download_files(self, bucket_name, items):
        """Download several objects with a single borrowed client.

        ``items`` should be a list of ``(object_name, save_path)`` tuples.
        Returns ``None``.
        """
        with self.lock, self._borrow_client() as client:
            for object_name, save_path in items:
                client.fget_object(bucket_name, object_name, save_path)

    def create_bucket(self, bucket_name):
        """Create ``bucket_name`` unless the existence check reports it.

        An ``S3Error`` from the existence check is treated as "does not
        exist", so creation is still attempted in that case.
        """
        with self.lock, self._borrow_client() as client:
            # Check on the client we already hold instead of calling
            # self.bucket_exists(), which would check out a second client
            # from the pool while this one is still borrowed.
            if not self._exists_or_false(client, bucket_name):
                client.make_bucket(bucket_name)

    def delete_bucket(self, bucket_name):
        """Remove ``bucket_name`` if it exists.

        Unlike ``create_bucket``, an ``S3Error`` from the existence check
        is not caught here and propagates to the caller.
        """
        with self.lock, self._borrow_client() as client:
            if client.bucket_exists(bucket_name):
                # To delete a bucket, it must be empty
                client.remove_bucket(bucket_name)

    def get_buckets(self):
        """Return whatever the client's ``list_buckets()`` returns."""
        with self.lock, self._borrow_client() as client:
            return client.list_buckets()

    def bucket_exists(self, bucket_name):
        """Return whether the bucket exists; ``S3Error`` yields ``False``.

        This method does not acquire ``self.lock``.
        """
        with self._borrow_client() as client:
            return self._exists_or_false(client, bucket_name)

    def set_bucket_policy(self, bucket_name):
        """Apply a policy allowing any principal to list and read the bucket.

        The policy grants ``s3:GetBucketLocation`` and ``s3:ListBucket`` on
        the bucket and ``s3:GetObject`` on every object in it.  This method
        does not acquire ``self.lock``.
        """
        policy = {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": ["*"]},
                    "Action": ["s3:GetBucketLocation", "s3:ListBucket"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}"],
                },
                {
                    "Effect": "Allow",
                    "Principal": {"AWS": ["*"]},
                    "Action": ["s3:GetObject"],
                    "Resource": [f"arn:aws:s3:::{bucket_name}/*"],
                },
            ],
        }
        with self._borrow_client() as client:
            # Compact separators keep the serialized document free of
            # whitespace; json.dumps guarantees it is well-formed JSON.
            client.set_bucket_policy(
                bucket_name, json.dumps(policy, separators=(",", ":"))
            )

    def _get_content_type(self, file_path):
        """Guess the MIME type from the path, defaulting to a binary type."""
        content_type, _ = mimetypes.guess_type(file_path)
        return content_type if content_type else 'application/octet-stream'