# threadSafeMinioClient.py
import json
import mimetypes
import threading
from contextlib import contextmanager
from queue import Queue

from minio import Minio
from minio.error import S3Error

from utils.minioUtil.minioClientPool import MinioClientPool
  8. class ThreadSafeMinioClient:
  9. def __init__(self, client_pool: MinioClientPool):
  10. self.client_pool: MinioClientPool = client_pool
  11. self.lock = threading.Lock()
  12. def upload_file(self, bucket_name, object_name, file_path):
  13. with self.lock:
  14. client = self.client_pool.get_client()
  15. try:
  16. content_type = self._get_content_type(file_path)
  17. result = client.fput_object(
  18. bucket_name, object_name, file_path, content_type=content_type)
  19. return result
  20. finally:
  21. self.client_pool.release_client(client)
  22. def download_file(self, bucket_name, object_name, file_path):
  23. with self.lock:
  24. client = self.client_pool.get_client()
  25. try:
  26. client.fget_object(bucket_name, object_name, file_path)
  27. finally:
  28. self.client_pool.release_client(client)
  29. def upload_files(self, bucket_name, items):
  30. # items should be a list of tuples (object_name, file_path)
  31. results = []
  32. with self.lock:
  33. client = self.client_pool.get_client()
  34. try:
  35. for object_name, file_path in items:
  36. content_type = self._get_content_type(file_path)
  37. result = client.fput_object(
  38. bucket_name, object_name, file_path, content_type=content_type)
  39. results.append(result)
  40. return results
  41. finally:
  42. self.client_pool.release_client(client)
  43. def download_files(self, bucket_name, items):
  44. # items should be a list of tuples (object_name, save_path)
  45. with self.lock:
  46. client = self.client_pool.get_client()
  47. try:
  48. for object_name, save_path in items:
  49. client.fget_object(bucket_name, object_name, save_path)
  50. finally:
  51. self.client_pool.release_client(client)
  52. def create_bucket(self, bucket_name):
  53. with self.lock:
  54. client = self.client_pool.get_client()
  55. try:
  56. if not self.bucket_exists(bucket_name):
  57. client.make_bucket(bucket_name)
  58. finally:
  59. self.client_pool.release_client(client)
  60. def delete_bucket(self, bucket_name):
  61. with self.lock:
  62. client = self.client_pool.get_client()
  63. try:
  64. if client.bucket_exists(bucket_name):
  65. # To delete a bucket, it must be empty
  66. client.remove_bucket(bucket_name)
  67. finally:
  68. self.client_pool.release_client(client)
  69. def get_buckets(self):
  70. with self.lock:
  71. client = self.client_pool.get_client()
  72. try:
  73. return client.list_buckets()
  74. finally:
  75. self.client_pool.release_client(client)
  76. def bucket_exists(self, bucket_name):
  77. # with self.lock:
  78. client = self.client_pool.get_client()
  79. try:
  80. return client.bucket_exists(bucket_name)
  81. except S3Error as err:
  82. return False
  83. finally:
  84. self.client_pool.release_client(client)
  85. def set_bucket_policy(self, bucket_name):
  86. client = self.client_pool.get_client()
  87. try:
  88. policy = '{"Version":"2012-10-17","Statement":[{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetBucketLocation","s3:ListBucket"],"Resource":["arn:aws:s3:::%s"]},{"Effect":"Allow","Principal":{"AWS":["*"]},"Action":["s3:GetObject"],"Resource":["arn:aws:s3:::%s/*"]}]}' % (bucket_name, bucket_name)
  89. client.set_bucket_policy(bucket_name, policy)
  90. finally:
  91. self.client_pool.release_client(client)
  92. def _get_content_type(self, file_path):
  93. content_type, _ = mimetypes.guess_type(file_path)
  94. return content_type if content_type else 'application/octet-stream'