如何使用 boto3
查看 S3 中存储桶内的内容? (即做一个"ls"
)?
执行以下操作:
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('some/path/')
返回:
s3.Bucket(name='some/path/')
如何查看其内容?
查看内容的一种方法是:
for my_bucket_object in my_bucket.objects.all():
print(my_bucket_object)
这类似于“ls”,但它不考虑前缀文件夹约定,并将列出存储桶中的对象。由读者自行过滤掉作为键名一部分的前缀。
在 Python 2 中:
from boto.s3.connection import S3Connection
conn = S3Connection() # assumes boto.cfg setup
bucket = conn.get_bucket('bucket_name')
for obj in bucket.get_all_keys():
print(obj.key)
在 Python 3 中:
from boto3 import client
conn = client('s3') # again assumes boto.cfg setup, assume AWS S3
for key in conn.list_objects(Bucket='bucket_name')['Contents']:
print(key['Key'])
conn.list_objects(Bucket='bucket_name', Prefix='prefix_string')['Contents']
我假设您已经单独配置了身份验证。
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('bucket_name')
for file in my_bucket.objects.all():
print(file.key)
我的 s3 keys
utility function 本质上是@Hephaestus 答案的优化版本:
import boto3
s3_paginator = boto3.client('s3').get_paginator('list_objects_v2')
def keys(bucket_name, prefix='/', delimiter='/', start_after=''):
prefix = prefix[1:] if prefix.startswith(delimiter) else prefix
start_after = (start_after or prefix) if prefix.endswith(delimiter) else start_after
for page in s3_paginator.paginate(Bucket=bucket_name, Prefix=prefix, StartAfter=start_after):
for content in page.get('Contents', ()):
yield content['Key']
在我的测试(boto3 1.9.84)中,它比等效(但更简单)的代码要快得多:
import boto3
def keys(bucket_name, prefix='/', delimiter='/'):
prefix = prefix[1:] if prefix.startswith(delimiter) else prefix
bucket = boto3.resource('s3').Bucket(bucket_name)
return (_.key for _ in bucket.objects.filter(Prefix=prefix))
作为 S3 guarantees UTF-8 binary sorted results,在第一个函数中添加了 start_after
优化。
list()
中以返回文件列表。
Delimiter=delimiter
添加到 s3_paginator.paginate
调用以避免递归到“子目录”。
为了处理大型键列表(即当目录列表大于 1000 项时),我使用以下代码来累积具有多个列表的键值(即文件名)(感谢上面的 Amelio 第一行)。代码适用于 python3:
from boto3 import client
bucket_name = "my_bucket"
prefix = "my_key/sub_key/lots_o_files"
s3_conn = client('s3') # type: BaseClient ## again assumes boto.cfg setup, assume AWS S3
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter = "/")
if 'Contents' not in s3_result:
#print(s3_result)
return []
file_list = []
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
while s3_result['IsTruncated']:
continuation_key = s3_result['NextContinuationToken']
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Prefix=prefix, Delimiter="/", ContinuationToken=continuation_key)
for key in s3_result['Contents']:
file_list.append(key['Key'])
print(f"List count = {len(file_list)}")
return file_list
如果你想传递 ACCESS 和 SECRET 密钥(你不应该这样做,因为它不安全):
from boto3.session import Session
ACCESS_KEY='your_access_key'
SECRET_KEY='your_secret_key'
session = Session(aws_access_key_id=ACCESS_KEY,
aws_secret_access_key=SECRET_KEY)
s3 = session.resource('s3')
your_bucket = s3.Bucket('your_bucket')
for s3_file in your_bucket.objects.all():
print(s3_file.key)
#To print all filenames in a bucket
import boto3
s3 = boto3.client('s3')
def get_s3_keys(bucket):
"""Get a list of keys in an S3 bucket."""
resp = s3.list_objects_v2(Bucket=bucket)
for obj in resp['Contents']:
files = obj['Key']
return files
filename = get_s3_keys('your_bucket_name')
print(filename)
#To print all filenames in a certain directory in a bucket
import boto3
s3 = boto3.client('s3')
def get_s3_keys(bucket, prefix):
"""Get a list of keys in an S3 bucket."""
resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix)
for obj in resp['Contents']:
files = obj['Key']
print(files)
return files
filename = get_s3_keys('your_bucket_name', 'folder_name/sub_folder_name/')
print(filename)
更新:最简单的方法是使用awswrangler
import awswrangler as wr
wr.s3.list_objects('s3://bucket_name')
ls
。你会怎么做..只打印根目录中的文件
def get_s3_keys(bucket, prefix): resp = s3.list_objects_v2(Bucket=bucket, Prefix=prefix) return [obj['Key'] for obj in resp['Contents']]
一种更简洁的方式,而不是通过 for 循环进行迭代,您还可以只打印包含 S3 存储桶内所有文件的原始对象:
session = Session(aws_access_key_id=aws_access_key_id,aws_secret_access_key=aws_secret_access_key)
s3 = session.resource('s3')
bucket = s3.Bucket('bucket_name')
files_in_s3 = bucket.objects.all()
#you can print this iterable with print(list(files_in_s3))
files_in_s3
是“列表对象”。 Python中没有这样的东西。它是一个可迭代的,我不能让你的代码工作,因此被否决了。比我发现错误并看到您的观点但无法撤消我的反对意见。
import boto3
s3 = boto3.resource('s3')
## Bucket to use
my_bucket = s3.Bucket('city-bucket')
## List objects within a given prefix
for obj in my_bucket.objects.filter(Delimiter='/', Prefix='city/'):
print obj.key
输出:
city/pune.csv
city/goa.csv
city/
、city/pune.csv
和city/goa.csv
。
对象总结:
有两个标识符附加到 ObjectSummary:
桶名
钥匙
AWS S3 文档中有关对象键的更多信息:
对象键:创建对象时,指定键名,唯一标识存储桶中的对象。例如,在 Amazon S3 控制台(请参阅 AWS 管理控制台)中,当您突出显示存储桶时,会显示存储桶中的对象列表。这些名称是对象键。键的名称是 Unicode 字符序列,其 UTF-8 编码长度最多为 1024 个字节。 Amazon S3 数据模型是一个平面结构:您创建一个存储桶,存储桶存储对象。没有子桶或子文件夹的层次结构;但是,您可以像 Amazon S3 控制台那样使用键名前缀和分隔符来推断逻辑层次结构。 Amazon S3 控制台支持文件夹的概念。假设您的存储桶(管理员创建)有四个对象,其对象键如下: Development/Projects1.xls Finance/statement1.pdf Private/taxdocument.pdf s3-dg.pdf 参考:AWS S3:对象键
下面是一些示例代码,演示了如何获取存储桶名称和对象键。
例子:
import boto3
from pprint import pprint
def main():
def enumerate_s3():
s3 = boto3.resource('s3')
for bucket in s3.buckets.all():
print("Name: {}".format(bucket.name))
print("Creation Date: {}".format(bucket.creation_date))
for object in bucket.objects.all():
print("Object: {}".format(object))
print("Object bucket_name: {}".format(object.bucket_name))
print("Object key: {}".format(object.key))
enumerate_s3()
if __name__ == '__main__':
main()
因此,您要求在 boto3 中等效于 aws s3 ls
。这将列出所有顶级文件夹和文件。这是我能得到的最接近的;它只列出所有顶级文件夹。令人惊讶的是,如此简单的操作是多么困难。
import boto3
def s3_ls():
s3 = boto3.resource('s3')
bucket = s3.Bucket('example-bucket')
result = bucket.meta.client.list_objects(Bucket=bucket.name,
Delimiter='/')
for o in result.get('CommonPrefixes'):
print(o.get('Prefix'))
Prefix=
添加到 list_objects
以限制为该前缀。
这是一个简单的函数,它返回所有文件的文件名或具有某些类型的文件,例如“json”、“jpg”。
def get_file_list_s3(bucket, prefix="", file_extension=None):
"""Return the list of all file paths (prefix + file name) with certain type or all
Parameters
----------
bucket: str
The name of the bucket. For example, if your bucket is "s3://my_bucket" then it should be "my_bucket"
prefix: str
The full path to the the 'folder' of the files (objects). For example, if your files are in
s3://my_bucket/recipes/deserts then it should be "recipes/deserts". Default : ""
file_extension: str
The type of the files. If you want all, just leave it None. If you only want "json" files then it
should be "json". Default: None
Return
------
file_names: list
The list of file names including the prefix
"""
import boto3
s3 = boto3.resource('s3')
my_bucket = s3.Bucket(bucket)
file_objs = my_bucket.objects.filter(Prefix=prefix).all()
file_names = [file_obj.key for file_obj in file_objs if file_extension is not None and file_obj.key.split(".")[-1] == file_extension]
return file_names
我曾经这样做的一种方法:
import boto3
s3 = boto3.resource('s3')
bucket=s3.Bucket("bucket_name")
contents = [_.key for _ in bucket.objects.all() if "subfolders/ifany/" in _.key]
我就是这样做的,包括认证方法:
s3_client = boto3.client(
's3',
aws_access_key_id='access_key',
aws_secret_access_key='access_key_secret',
config=boto3.session.Config(signature_version='s3v4'),
region_name='region'
)
response = s3_client.list_objects(Bucket='bucket_name', Prefix=key)
if ('Contents' in response):
# Object / key exists!
return True
else:
# Object / key DOES NOT exist!
return False
这是解决方案
import boto3
s3=boto3.resource('s3')
BUCKET_NAME = 'Your S3 Bucket Name'
allFiles = s3.Bucket(BUCKET_NAME).objects.all()
for file in allFiles:
print(file.key)
在上述评论之一中对@Hephaeastus 的代码几乎没有修改,编写了以下方法来列出给定路径中的文件夹和对象(文件)。工作原理类似于 s3 ls 命令。
from boto3 import session
def s3_ls(profile=None, bucket_name=None, folder_path=None):
folders=[]
files=[]
result=dict()
bucket_name = bucket_name
prefix= folder_path
session = boto3.Session(profile_name=profile)
s3_conn = session.client('s3')
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Delimiter = "/", Prefix=prefix)
if 'Contents' not in s3_result and 'CommonPrefixes' not in s3_result:
return []
if s3_result.get('CommonPrefixes'):
for folder in s3_result['CommonPrefixes']:
folders.append(folder.get('Prefix'))
if s3_result.get('Contents'):
for key in s3_result['Contents']:
files.append(key['Key'])
while s3_result['IsTruncated']:
continuation_key = s3_result['NextContinuationToken']
s3_result = s3_conn.list_objects_v2(Bucket=bucket_name, Delimiter="/", ContinuationToken=continuation_key, Prefix=prefix)
if s3_result.get('CommonPrefixes'):
for folder in s3_result['CommonPrefixes']:
folders.append(folder.get('Prefix'))
if s3_result.get('Contents'):
for key in s3_result['Contents']:
files.append(key['Key'])
if folders:
result['folders']=sorted(folders)
if files:
result['files']=sorted(files)
return result
这列出了给定路径中的所有对象/文件夹。默认情况下,Folder_path 可以保留为 None 并且方法将列出存储桶根目录的直接内容。
也可以按如下方式进行:
csv_files = s3.list_objects_v2(s3_bucket_path)
for obj in csv_files['Contents']:
key = obj['Key']
使用 cloudpathlib
cloudpathlib
提供了一个方便的包装器,以便您可以使用简单的 pathlib
API 与 AWS S3(以及 Azure blob 存储、GCS 等)进行交互。您可以使用 pip install "cloudpathlib[s3]"
安装。
与 pathlib
一样,您可以使用 glob
或 iterdir
列出目录的内容。
这是一个包含公共 AWS S3 存储桶的示例,您可以复制并粘贴该存储桶以运行。
from cloudpathlib import CloudPath
s3_path = CloudPath("s3://ladi/Images/FEMA_CAP/2020/70349")
# list items with glob
list(
s3_path.glob("*")
)[:3]
#> [ S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0001_5a63d42e-27c6-448a-84f1-bfc632125b8e.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0002_a89f1b79-786f-4dac-9dcc-609fb1a977b1.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0003_02c30af6-911e-4e01-8c24-7644da2b8672.jpg')]
# list items with iterdir
list(
s3_path.iterdir()
)[:3]
#> [ S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0001_5a63d42e-27c6-448a-84f1-bfc632125b8e.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0002_a89f1b79-786f-4dac-9dcc-609fb1a977b1.jpg'),
#> S3Path('s3://ladi/Images/FEMA_CAP/2020/70349/DSC_0003_02c30af6-911e-4e01-8c24-7644da2b8672.jpg')]
由 reprexlite v0.4.2 创建于 2021-05-21 20:38:47 PDT
一个不错的选择也可能是从 lambda 函数运行 aws cli 命令
import subprocess
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)
def run_command(command):
command_list = command.split(' ')
try:
logger.info("Running shell command: \"{}\"".format(command))
result = subprocess.run(command_list, stdout=subprocess.PIPE);
logger.info("Command output:\n---\n{}\n---".format(result.stdout.decode('UTF-8')))
except Exception as e:
logger.error("Exception: {}".format(e))
return False
return True
def lambda_handler(event, context):
run_command('/opt/aws s3 ls s3://bucket-name')
我被困了整整一夜,因为我只想获取子文件夹下的文件数量,但它还在子文件夹本身的内容中返回了一个额外的文件,
经过研究,我发现这就是 s3 的工作方式,但我有一个场景,我从以下目录中的 redshift 卸载数据
s3://bucket_name/subfolder/<10 number of files>
当我使用
paginator.paginate(Bucket=price_signal_bucket_name,Prefix=new_files_folder_path+"/")
它只会返回 10 个文件,但是当我在 s3 存储桶本身上创建文件夹时,它也会返回子文件夹
结论
如果整个文件夹上传到 s3,则列出仅返回前缀下的文件但如果 fodler 是在 s3 存储桶本身上创建的,则使用 boto3 客户端列出它也将返回子文件夹和文件
mybucket.objects.filter(Prefix='foo/bar')
,它只会列出带有该前缀的对象。您还可以传递Delimiter
参数。object
作为变量名,因为它会影响全局类型object
。