31 lines
968 B
Python
31 lines
968 B
Python
import os
|
|
from app.core import config as app_config
|
|
import boto3
|
|
from botocore.client import Config
|
|
|
|
# --- Connection settings -------------------------------------------------
# The URL scheme follows the MINIO_SECURE flag from the application config.
if app_config.MINIO_SECURE:
    scheme = 'https'
else:
    scheme = 'http'

# The host comes from the environment (falling back to localhost), while
# the port is read from the application config.
endpoint = f"{scheme}://{os.getenv('MINIO_ENDPOINT', 'localhost')}:{app_config.MINIO_PORT}"

# Target bucket for the test upload; overridable via MINIO_IMAGE_BUCKET.
bucket = os.getenv('MINIO_IMAGE_BUCKET', 'images')

# Credentials are taken from the environment; both fall back to
# 'minioadmin' when the corresponding variable is unset.
access_key = os.getenv('MINIO_ACCESS_KEY', 'minioadmin')
secret_key = os.getenv('MINIO_SECRET_KEY', 'minioadmin')
|
|
|
|
# S3 client aimed at the endpoint assembled from the MINIO_* settings
# (rather than boto3's default endpoint), using Signature Version 4
# request signing and a fixed region name.
s3 = boto3.client(
    's3',
    region_name='us-east-1',
    config=Config(signature_version='s3v4'),
    endpoint_url=endpoint,
    aws_access_key_id=access_key,
    aws_secret_access_key=secret_key,
)
|
|
|
|
# Smoke test: enumerate the buckets visible to these credentials, then
# upload 'test_minio.py' (resolved relative to the current working
# directory) into the configured bucket under the same key. Any failure
# is reported on stdout instead of propagating, so the script always
# runs to completion.
try:
    # Step 1: list buckets.
    response = s3.list_buckets()
    print('Buckets:', [entry['Name'] for entry in response['Buckets']])

    # Step 2: upload the test file, streamed from a binary file handle.
    with open('test_minio.py', 'rb') as source_file:
        s3.upload_fileobj(source_file, bucket, 'test_minio.py')
    print(f'Archivo subido a bucket {bucket} correctamente.')
except Exception as exc:
    print('Error:', exc)
|