对象存储 S3
对象存储概念
对象存储将数据以"对象"形式存储,每个对象包含数据本身、元数据和唯一标识符(Key)。
对象存储 vs 块存储 vs 文件存储:
块存储:
数据以固定大小的块存储
需要文件系统格式化
适合:数据库、VM 磁盘
访问方式:iSCSI/FC/NVMe-oF
文件存储:
数据以文件和目录树组织
支持 POSIX 操作(open/read/write)
适合:共享文件、应用数据
访问方式:NFS/CIFS
对象存储:
数据以对象存储,扁平命名空间
通过 HTTP API 访问(S3/Swift,示例见下)
适合:非结构化数据、备份、大数据
优势:近乎无限的横向扩展、低成本、高可用
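为直观说明"扁平命名空间 + HTTP API"与文件/块存储在访问模型上的差别,下面给出一个最小示意(使用后文同样会用到的 boto3;其中 endpoint、密钥与 Bucket 名均为占位假设值,并非固定配置):

```python
# 最小示意:对象存储按 Key 读写完整对象,不需要挂载,也没有目录树和 POSIX 文件句柄
# (endpoint、密钥、Bucket 名均为占位值)
import boto3

s3 = boto3.client(
    's3',
    endpoint_url='http://rgw.example.com:7480',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
)

# 写:一次 PUT 写入整个对象;Key 中的 "/" 只是命名约定,不会创建真实目录
s3.put_object(Bucket='demo-bucket', Key='logs/2024/01/15/app.log', Body=b'hello object storage')

# 读:一次 GET 取回对象数据与元数据(对比文件存储的 open/seek/read)
resp = s3.get_object(Bucket='demo-bucket', Key='logs/2024/01/15/app.log')
print(resp['ContentLength'], resp['Body'].read())
```

完整的客户端封装见后文"S3 API 完整使用指南"。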
H3C 对象存储方案
Ceph RGW(推荐)
Ceph RADOS Gateway 提供 S3 兼容的对象存储接口:
```bash
# 部署 RGW
ceph orch apply rgw mystore \
--realm=myrealm \
--zone=myzone \
--placement="3 storage01 storage02 storage03"
# 创建用户
radosgw-admin user create \
--uid=myuser \
--display-name="My User" \
--email=user@example.com
# 获取访问密钥
radosgw-admin user info --uid=myuser | python3 -c "
import sys, json
d = json.load(sys.stdin)
keys = d['keys'][0]
print(f'Access Key: {keys[\"access_key\"]}')
print(f'Secret Key: {keys[\"secret_key\"]}')
"
# 创建 Bucket
aws s3 mb s3://my-bucket \
--endpoint-url http://rgw.example.com:7480
# 上传文件
aws s3 cp local-file.txt s3://my-bucket/path/file.txt \
--endpoint-url http://rgw.example.com:7480
```
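部署并创建用户后,可以先用一小段 Python 脚本做连通性自检,确认 RGW 端点可达、密钥可用(以下仅为示意,endpoint 与密钥应替换为上面 radosgw-admin 输出的实际值):

```python
# 连通性自检示意:确认 RGW 端点可达且密钥有效(endpoint、密钥均为占位值)
import boto3
from botocore.exceptions import ClientError, EndpointConnectionError

s3 = boto3.client(
    's3',
    endpoint_url='http://rgw.example.com:7480',
    aws_access_key_id='your-access-key',      # radosgw-admin user info 输出的 access_key
    aws_secret_access_key='your-secret-key',  # 对应的 secret_key
)

try:
    buckets = s3.list_buckets().get('Buckets', [])
    print('RGW 可达,当前 Bucket:', [b['Name'] for b in buckets])
except EndpointConnectionError:
    print('无法连接 RGW 端点,请检查 rgw_frontends 监听地址与防火墙')
except ClientError as e:
    print('端点可达但请求被拒绝,请检查密钥:', e.response['Error']['Code'])
```

确认连通后,再进行下面的配置优化与 API 开发。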
RGW 配置优化
```ini
# ceph.conf RGW 相关配置
[client.rgw.mystore]
# 监听地址和端口
rgw_frontends = beast endpoint=0.0.0.0:7480
# 性能优化
rgw_thread_pool_size = 512
rgw_max_chunk_size = 4194304 # 4MB 分片
# 缓存配置
rgw_cache_enabled = true
rgw_cache_lru_size = 10000
# 多站点同步(跨数据中心)
# rgw_zone = myzone
# rgw_zonegroup = myzonegroup
```
S3 API 完整使用指南
Python boto3 完整示例
```python
import boto3
import os
from botocore.config import Config
from botocore.exceptions import ClientError
class H3CObjectStorage:
"""H3C 对象存储客户端封装"""
def __init__(self, endpoint_url, access_key, secret_key, region='us-east-1'):
self.s3 = boto3.client(
's3',
endpoint_url=endpoint_url,
aws_access_key_id=access_key,
aws_secret_access_key=secret_key,
region_name=region,
config=Config(
signature_version='s3v4',
retries={'max_attempts': 3, 'mode': 'adaptive'},
max_pool_connections=50
)
)
def create_bucket(self, bucket_name, versioning=False):
"""创建 Bucket"""
try:
self.s3.create_bucket(Bucket=bucket_name)
if versioning:
self.s3.put_bucket_versioning(
Bucket=bucket_name,
VersioningConfiguration={'Status': 'Enabled'}
)
print(f"Bucket '{bucket_name}' 创建成功")
except ClientError as e:
if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
print(f"Bucket '{bucket_name}' 已存在")
else:
raise
def upload_file(self, local_path, bucket, key, metadata=None):
"""上传文件(自动选择分片上传)"""
from boto3.s3.transfer import TransferConfig
config = TransferConfig(
multipart_threshold=25 * 1024 * 1024,  # 25MB 以上启用分片上传
max_concurrency=10,
multipart_chunksize=25 * 1024 * 1024,  # 每个分片 25MB
use_threads=True
)
extra_args = {}
if metadata:
extra_args['Metadata'] = metadata
file_size = os.path.getsize(local_path)
print(f"上传 {local_path} ({file_size/1024/1024:.1f}MB) -> s3://{bucket}/{key}")
self.s3.upload_file(
local_path, bucket, key,
Config=config,
ExtraArgs=extra_args
)
print("上传完成")
def download_file(self, bucket, key, local_path):
"""下载文件"""
os.makedirs(os.path.dirname(local_path) or '.', exist_ok=True)  # 目标路径无目录部分时退回当前目录
self.s3.download_file(bucket, key, local_path)
print(f"下载完成: {local_path}")
def list_objects(self, bucket, prefix='', max_keys=1000):
"""列出对象(支持分页)"""
paginator = self.s3.get_paginator('list_objects_v2')
objects = []
for page in paginator.paginate(
Bucket=bucket,
Prefix=prefix,
PaginationConfig={'MaxItems': max_keys}
):
for obj in page.get('Contents', []):
objects.append({
'key': obj['Key'],
'size': obj['Size'],
'last_modified': obj['LastModified'],
'etag': obj['ETag'].strip('"')
})
return objects
def delete_object(self, bucket, key):
"""删除对象"""
self.s3.delete_object(Bucket=bucket, Key=key)
def delete_objects_by_prefix(self, bucket, prefix):
"""批量删除指定前缀的对象"""
objects = self.list_objects(bucket, prefix)
if not objects:
print("没有找到匹配的对象")
return
# 批量删除(每次最多 1000 个)
delete_list = [{'Key': obj['key']} for obj in objects]
for i in range(0, len(delete_list), 1000):
batch = delete_list[i:i+1000]
self.s3.delete_objects(
Bucket=bucket,
Delete={'Objects': batch}
)
print(f"已删除 {len(delete_list)} 个对象")
def generate_presigned_url(self, bucket, key, expiration=3600, method='get_object'):
"""生成预签名 URL"""
url = self.s3.generate_presigned_url(
method,
Params={'Bucket': bucket, 'Key': key},
ExpiresIn=expiration
)
return url
def set_lifecycle_policy(self, bucket, rules):
"""设置生命周期策略"""
self.s3.put_bucket_lifecycle_configuration(
Bucket=bucket,
LifecycleConfiguration={'Rules': rules}
)
def get_bucket_stats(self, bucket):
"""获取 Bucket 统计信息"""
paginator = self.s3.get_paginator('list_objects_v2')
total_size = 0
total_count = 0
for page in paginator.paginate(Bucket=bucket):
for obj in page.get('Contents', []):
total_size += obj['Size']
total_count += 1
return {
'object_count': total_count,
'total_size_bytes': total_size,
'total_size_gb': total_size / 1024**3
}
# 使用示例
storage = H3CObjectStorage(
endpoint_url='http://rgw.example.com:7480',
access_key='your-access-key',
secret_key='your-secret-key'
)
# 创建 Bucket
storage.create_bucket('data-lake', versioning=True)
# 上传文件
storage.upload_file(
'/data/dataset.tar.gz',
'data-lake',
'datasets/2024/dataset.tar.gz',
metadata={'source': 'production', 'version': '2024-01'}
)
# 生成下载链接(有效期 1 天)
url = storage.generate_presigned_url('data-lake', 'datasets/2024/dataset.tar.gz', 86400)
print(f"下载链接: {url}")
# 设置生命周期(30 天后转低频存储,90 天后归档,1 年后删除)
storage.set_lifecycle_policy('data-lake', [
{
'ID': 'archive-old-data',
'Status': 'Enabled',
'Filter': {'Prefix': 'logs/'},
'Transitions': [
{'Days': 30, 'StorageClass': 'STANDARD_IA'},
{'Days': 90, 'StorageClass': 'GLACIER'}
],
'Expiration': {'Days': 365}
}
])
```
对象存储最佳实践
命名规范
Bucket 命名:
- 全小写字母、数字、连字符
- 3-63 个字符
- 不能以连字符开头或结尾
示例:prod-data-lake, backup-2024, ml-training-data
Key(对象路径)命名:
- 使用 / 模拟目录结构
- 避免特殊字符(空格、中文等)
- 使用日期分区提升查询效率(生成方式可参考下方示例)
示例:
logs/2024/01/15/app.log
datasets/nlp/sentiment/train.jsonl
backups/mysql/2024-01-15/full.tar.gz
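下面是一段示意性的辅助代码,用于按上述规范粗略校验 Bucket 命名,并生成按日期分区的 Key(函数名为假设,仅供参考):

```python
import re
from datetime import datetime, timezone

# 对应上文规则:3-63 个字符,小写字母/数字/连字符,首尾不能是连字符
_BUCKET_RE = re.compile(r'^[a-z0-9][a-z0-9-]{1,61}[a-z0-9]$')

def is_valid_bucket_name(name):
    """粗略校验 Bucket 命名是否符合上述规范"""
    return bool(_BUCKET_RE.match(name))

def make_log_key(app, ts=None):
    """按日期分区生成对象 Key,例如 logs/2024/01/15/app.log"""
    ts = ts or datetime.now(timezone.utc)
    return f"logs/{ts:%Y/%m/%d}/{app}.log"

print(is_valid_bucket_name('prod-data-lake'))   # True
print(is_valid_bucket_name('Prod_Data'))        # False:包含大写字母与下划线
print(make_log_key('app'))                      # 例如 logs/2024/01/15/app.log
```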
性能优化
```python
# 并发上传多个文件
import concurrent.futures
import os
def upload_directory(storage, local_dir, bucket, prefix=''):
"""并发上传整个目录"""
files = []
for root, dirs, filenames in os.walk(local_dir):
for filename in filenames:
local_path = os.path.join(root, filename)
# 计算相对路径作为 S3 Key
rel_path = os.path.relpath(local_path, local_dir)
s3_key = os.path.join(prefix, rel_path).replace('\\', '/')
files.append((local_path, s3_key))
print(f"共 {len(files)} 个文件待上传")
with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
futures = {
executor.submit(storage.upload_file, local, bucket, key): (local, key)
for local, key in files
}
completed = 0
for future in concurrent.futures.as_completed(futures):
completed += 1
local, key = futures[future]
try:
future.result()
print(f"[{completed}/{len(files)}] 完成: {key}")
except Exception as e:
print(f"[{completed}/{len(files)}] 失败: {key} - {e}")
upload_directory(storage, '/data/training-dataset', 'ml-data', 'datasets/v2/')
```