Python용 AWS SDK인 boto3를 사용하여 AWS를 빠르게 시작하십시오. Boto3를 사용하면 Python 애플리케이션, 라이브러리 또는 스크립트를 Amazon S3, Amazon EC2, Amazon DynamoDB 등 AWS 서비스와 쉽게 통합할 수 있습니다.
- abort_multipart_upload
- can_paginate
- close
- complete_multipart_upload
- copy
- copy_object
- create_bucket
- create_multipart_upload
- create_session
- delete_bucket
- delete_bucket_analytics_configuration
- delete_bucket_cors
- delete_bucket_encryption
- delete_bucket_intelligent_tiering_configuration
- delete_bucket_inventory_configuration
- delete_bucket_lifecycle
- delete_bucket_metrics_configuration
- delete_bucket_ownership_controls
- delete_bucket_policy
- delete_bucket_replication
- delete_bucket_tagging
- delete_bucket_website
- delete_object
- delete_object_tagging
- delete_objects
- delete_public_access_block
- download_file
- download_fileobj
- generate_presigned_post
- generate_presigned_url
- get_bucket_accelerate_configuration
- get_bucket_acl
- get_bucket_analytics_configuration
- get_bucket_cors
- get_bucket_encryption
- get_bucket_intelligent_tiering_configuration
- get_bucket_inventory_configuration
- get_bucket_lifecycle
- get_bucket_lifecycle_configuration
- get_bucket_location
- get_bucket_logging
- get_bucket_metrics_configuration
- get_bucket_notification
- get_bucket_notification_configuration
- get_bucket_ownership_controls
- get_bucket_policy
- get_bucket_policy_status
- get_bucket_replication
- get_bucket_request_payment
- get_bucket_tagging
- get_bucket_versioning
- get_bucket_website
- get_object
- get_object_acl
- get_object_attributes
- get_object_legal_hold
- get_object_lock_configuration
- get_object_retention
- get_object_tagging
- get_object_torrent
- get_paginator
- get_public_access_block
- get_waiter
- head_bucket
- head_object
- list_bucket_analytics_configurations
- list_bucket_intelligent_tiering_configurations
- list_bucket_inventory_configurations
- list_bucket_metrics_configurations
- list_buckets
- list_directory_buckets
- list_multipart_uploads
- list_object_versions
- list_objects
- list_objects_v2
- list_parts
- put_bucket_accelerate_configuration
- put_bucket_acl
- put_bucket_analytics_configuration
- put_bucket_cors
- put_bucket_encryption
- put_bucket_intelligent_tiering_configuration
- put_bucket_inventory_configuration
- put_bucket_lifecycle
- put_bucket_lifecycle_configuration
- put_bucket_logging
- put_bucket_metrics_configuration
- put_bucket_notification
- put_bucket_notification_configuration
- put_bucket_ownership_controls
- put_bucket_policy
- put_bucket_replication
- put_bucket_request_payment
- put_bucket_tagging
- put_bucket_versioning
- put_bucket_website
- put_object
- put_object_acl
- put_object_legal_hold
- put_object_lock_configuration
- put_object_retention
- put_object_tagging
- put_public_access_block
- restore_object
- select_object_content
- upload_file
- upload_fileobj
- upload_part
- upload_part_copy
- write_get_object_response
Key Exists
import boto3
import botocore
s3 = boto3.resource('s3')
s3.Object('my-bucket', 'dootdoot.jpg').load()
except botocore.exceptions.ClientError as e:
if e.response['Error']['Code'] == "404":
# The object does not exist.
# Something else has gone wrong.
# The object does exist.
asyncio로 boto3 실행 방법
Threadpool 기반의 MultiThread로 실행해야 함.
from pprint import pprint
from concurrent.futures import ThreadPoolExecutor
import asyncio
import boto3
_executor = ThreadPoolExecutor(10)
async def execute(region):
print(f"call the region {region}")
loop = asyncio.get_running_loop()
ec2 = boto3.client("ec2", region_name=region)
response = await loop.run_in_executor(_executor, ec2.describe_instances)
return response
async def main():
ec2 = boto3.client("ec2", region_name="ap-northeast-2")
regions = [region['RegionName'] for region in ec2.describe_regions()['Regions']]
task_list = [asyncio.ensure_future(execute(region)) for region in regions]
done, pending = await asyncio.wait(task_list)
results = [d.result() for d in done]
return results
if __name__ == "__main__":
loop = asyncio.get_event_loop()
result = loop.run_until_complete(main())
Cloudflare R2 Example
import boto3
s3 = boto3.client(
service_name ="s3",
endpoint_url = 'https://<accountid>',
aws_access_key_id = '<access_key_id>',
aws_secret_access_key = '<access_key_secret>'
region_name="<location>", # Must be one of: wnam, enam, weur, eeur, apac, auto
# Get object information
object_information = s3.head_object(Bucket=<R2_BUCKET_NAME>, Key=<FILE_KEY_NAME>)
# Upload/Update single file
s3.upload_fileobj(io.BytesIO(file_content), <R2_BUCKET_NAME>, <FILE_KEY_NAME>)
# Delete object
s3.delete_object(Bucket=<R2_BUCKET_NAME>, Key=<FILE_KEY_NAME>)
