# unitTest.py (2.5 KB)

import os

from minioClientPool import MinioClientPool
from threadSafeMinioClient import ThreadSafeMinioClient
  3. # Example Usage
  4. if __name__ == "__main__":
  5. # Upload a file
  6. bucket_name = 'test'
  7. test_bucket_name="test-xzy"
  8. pool = MinioClientPool('192.168.50.234:6900', '6VkF2ul6X7udr7RLsG2W', 'jtBuqZ80biRWQf6sbwzDQJwHtEBicPjkZBtvjTrA')
  9. ts_minio_client = ThreadSafeMinioClient(pool)
  10. # Check if a bucket exists
  11. exists = ts_minio_client.bucket_exists(test_bucket_name)
  12. print("Bucket exists:", exists)
  13. # Create a new bucket if it doesn't exist
  14. if not exists:
  15. ts_minio_client.create_bucket(test_bucket_name)
  16. print("Bucket create successfully:",test_bucket_name)
  17. else :
  18. ts_minio_client.delete_bucket(test_bucket_name)
  19. print("Bucket delete successfully:",test_bucket_name)
  20. # List all buckets
  21. buckets = ts_minio_client.get_buckets()
  22. print("Buckets:", [bucket.name for bucket in buckets])
  23. # Delete a bucket
  24. # Make sure to handle this carefully, as it requires the bucket to be empty
  25. ts_minio_client.delete_bucket('your-old-bucket')
  26. object_name = 'new-example-object.txt'
  27. file_path = 'data/upload/path-to-your-local-file.txt' # Path to the local file to upload
  28. upload_result = ts_minio_client.upload_file(test_bucket_name, object_name, file_path)
  29. print(f"File uploaded successfully: {upload_result}")
  30. # List of files to upload (object_name, file_path)
  31. files_to_upload = [
  32. ('new-example-object1.txt', r'data/upload/path-to-your-local-file1.txt'),
  33. ('new-example-object2.txt', r'data/upload/path-to-your-local-file2.txt')
  34. ]
  35. # Upload files
  36. upload_results = ts_minio_client.upload_files(test_bucket_name, files_to_upload)
  37. print("Files uploaded successfully:")
  38. for result in upload_results:
  39. print(result)
  40. # Download a file
  41. save_path = r'data/download/local-example-object.txt' # The path where you want to save the file
  42. ts_minio_client.download_file(test_bucket_name, object_name, save_path)
  43. print(f"File downloaded successfully and saved to {save_path}")
  44. # List of files to download (object_name, local_save_path)
  45. files_to_download = [
  46. ('new-example-object1.txt', r'data/download/local-example-object1.txt'),
  47. ('new-example-object1.txt', r'data/download/local-example-object2.txt')
  48. ]
  49. # Download files
  50. ts_minio_client.download_files(test_bucket_name, files_to_download)
  51. print("Files downloaded successfully.")