Object Storage (S3)

Object Storage Concepts

Object storage keeps data as "objects": each object contains the data itself, its metadata, and a unique identifier (the key).

Object storage vs. block storage vs. file storage:

Block storage:
  Data is stored in fixed-size blocks
  Needs a file system formatted on top
  Suited for: databases, VM disks
  Access: iSCSI/FC/NVMe

File storage:
  Data is organized as files in a directory tree
  Supports POSIX operations (open/read/write)
  Suited for: shared files, application data
  Access: NFS/CIFS

Object storage:
  Data is stored as objects in a flat namespace
  Accessed over an HTTP API (S3/Swift)
  Suited for: unstructured data, backups, big data
  Strengths: near-unlimited scale-out, low cost, high availability
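
The object model is easiest to see through the S3 API itself. Below is a minimal sketch (the endpoint, credentials, and bucket name are placeholders) that stores an object with custom metadata and then reads that metadata back:

python
import boto3

# Placeholders: substitute your own RGW endpoint and keys.
s3 = boto3.client(
    's3',
    endpoint_url='http://rgw.example.com:7480',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
)

# An object = data + metadata + key, addressed in a flat namespace.
s3.put_object(
    Bucket='demo-bucket',
    Key='reports/2024/q1.csv',   # the "/" is just part of the key string
    Body=b'date,revenue\n2024-01-01,100\n',
    Metadata={'department': 'finance'},
)

# head_object returns size and metadata without downloading the data.
head = s3.head_object(Bucket='demo-bucket', Key='reports/2024/q1.csv')
print(head['ContentLength'], head['Metadata'])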

H3C Object Storage Options

Ceph RGW (Recommended)

Ceph RADOS Gateway (RGW) provides an S3-compatible object storage interface:

bash
# Deploy RGW
ceph orch apply rgw mystore \
    --realm=myrealm \
    --zone=myzone \
    --placement="3 storage01 storage02 storage03"

# Create a user
radosgw-admin user create \
    --uid=myuser \
    --display-name="My User" \
    --email=user@example.com

# Extract the access keys
radosgw-admin user info --uid=myuser | python3 -c "
import sys, json
d = json.load(sys.stdin)
keys = d['keys'][0]
print(f'Access Key: {keys[\"access_key\"]}')
print(f'Secret Key: {keys[\"secret_key\"]}')
"

# Create a bucket
aws s3 mb s3://my-bucket \
    --endpoint-url http://rgw.example.com:7480

# Upload a file
aws s3 cp local-file.txt s3://my-bucket/path/file.txt \
    --endpoint-url http://rgw.example.com:7480
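
With the keys printed by radosgw-admin above, a quick smoke test from Python confirms the gateway is reachable and the credentials work (endpoint and keys are placeholders):

python
import boto3

# Placeholders: use the endpoint and the access/secret keys created above.
s3 = boto3.client(
    's3',
    endpoint_url='http://rgw.example.com:7480',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
)

# A successful ListBuckets round-trip proves both connectivity and auth.
for b in s3.list_buckets()['Buckets']:
    print(b['Name'], b['CreationDate'])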

RGW Configuration Tuning

ini
# RGW settings in ceph.conf
[client.rgw.mystore]
# Listen address and port
rgw_frontends = beast endpoint=0.0.0.0:7480

# Performance tuning
rgw_thread_pool_size = 512
rgw_max_chunk_size = 4194304  # 4 MB chunks

# Cache settings
rgw_cache_enabled = true
rgw_cache_lru_size = 10000

# Multi-site sync (across data centers)
# rgw_zone = myzone
# rgw_zonegroup = myzonegroup
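
The same options can also be pushed into the cluster configuration store at runtime instead of editing ceph.conf. A sketch using subprocess, assuming the ceph CLI is on PATH and the daemon name matches the section above:

python
import subprocess

def set_rgw_option(option, value, who='client.rgw.mystore'):
    """Write an option to the cluster config store, then read it back."""
    # `ceph config set` persists the value in the monitors' config store.
    subprocess.run(['ceph', 'config', 'set', who, option, str(value)], check=True)
    out = subprocess.run(['ceph', 'config', 'get', who, option],
                         check=True, capture_output=True, text=True)
    print(f"{option} = {out.stdout.strip()}")

set_rgw_option('rgw_thread_pool_size', 512)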

Complete S3 API Usage Guide

Complete Python boto3 Example

python
import boto3
import os
from botocore.config import Config
from botocore.exceptions import ClientError

class H3CObjectStorage:
    """H3C 对象存储客户端封装"""
    
    def __init__(self, endpoint_url, access_key, secret_key, region='us-east-1'):
        self.s3 = boto3.client(
            's3',
            endpoint_url=endpoint_url,
            aws_access_key_id=access_key,
            aws_secret_access_key=secret_key,
            region_name=region,
            config=Config(
                signature_version='s3v4',
                retries={'max_attempts': 3, 'mode': 'adaptive'},
                max_pool_connections=50
            )
        )
    
    def create_bucket(self, bucket_name, versioning=False):
        """创建 Bucket"""
        try:
            self.s3.create_bucket(Bucket=bucket_name)
            if versioning:
                self.s3.put_bucket_versioning(
                    Bucket=bucket_name,
                    VersioningConfiguration={'Status': 'Enabled'}
                )
            print(f"Bucket '{bucket_name}' 创建成功")
        except ClientError as e:
            if e.response['Error']['Code'] == 'BucketAlreadyOwnedByYou':
                print(f"Bucket '{bucket_name}' 已存在")
            else:
                raise
    
    def upload_file(self, local_path, bucket, key, metadata=None):
        """上传文件(自动选择分片上传)"""
        from boto3.s3.transfer import TransferConfig
        
        config = TransferConfig(
            multipart_threshold=25 * 1024 * 1024,   # multipart above 25 MB
            max_concurrency=10,
            multipart_chunksize=25 * 1024 * 1024,   # 25 MB parts
            use_threads=True
        )
        
        extra_args = {}
        if metadata:
            extra_args['Metadata'] = metadata
        
        file_size = os.path.getsize(local_path)
        print(f"上传 {local_path} ({file_size/1024/1024:.1f}MB) -> s3://{bucket}/{key}")
        
        self.s3.upload_file(
            local_path, bucket, key,
            Config=config,
            ExtraArgs=extra_args
        )
        print("上传完成")
    
    def download_file(self, bucket, key, local_path):
        """下载文件"""
        os.makedirs(os.path.dirname(local_path), exist_ok=True)
        self.s3.download_file(bucket, key, local_path)
        print(f"下载完成: {local_path}")
    
    def list_objects(self, bucket, prefix='', max_keys=1000):
        """列出对象(支持分页)"""
        paginator = self.s3.get_paginator('list_objects_v2')
        objects = []
        
        for page in paginator.paginate(
            Bucket=bucket,
            Prefix=prefix,
            PaginationConfig={'MaxItems': max_keys}
        ):
            for obj in page.get('Contents', []):
                objects.append({
                    'key': obj['Key'],
                    'size': obj['Size'],
                    'last_modified': obj['LastModified'],
                    'etag': obj['ETag'].strip('"')
                })
        
        return objects
    
    def delete_object(self, bucket, key):
        """删除对象"""
        self.s3.delete_object(Bucket=bucket, Key=key)
    
    def delete_objects_by_prefix(self, bucket, prefix):
        """批量删除指定前缀的对象"""
        objects = self.list_objects(bucket, prefix)
        if not objects:
            print("没有找到匹配的对象")
            return
        
        # Bulk delete (at most 1000 keys per request)
        delete_list = [{'Key': obj['key']} for obj in objects]
        for i in range(0, len(delete_list), 1000):
            batch = delete_list[i:i+1000]
            self.s3.delete_objects(
                Bucket=bucket,
                Delete={'Objects': batch}
            )
        print(f"已删除 {len(delete_list)} 个对象")
    
    def generate_presigned_url(self, bucket, key, expiration=3600, method='get_object'):
        """生成预签名 URL"""
        url = self.s3.generate_presigned_url(
            method,
            Params={'Bucket': bucket, 'Key': key},
            ExpiresIn=expiration
        )
        return url
    
    def set_lifecycle_policy(self, bucket, rules):
        """设置生命周期策略"""
        self.s3.put_bucket_lifecycle_configuration(
            Bucket=bucket,
            LifecycleConfiguration={'Rules': rules}
        )
    
    def get_bucket_stats(self, bucket):
        """获取 Bucket 统计信息"""
        paginator = self.s3.get_paginator('list_objects_v2')
        total_size = 0
        total_count = 0
        
        for page in paginator.paginate(Bucket=bucket):
            for obj in page.get('Contents', []):
                total_size += obj['Size']
                total_count += 1
        
        return {
            'object_count': total_count,
            'total_size_bytes': total_size,
            'total_size_gb': total_size / 1024**3
        }


# Usage example
storage = H3CObjectStorage(
    endpoint_url='http://rgw.example.com:7480',
    access_key='your-access-key',
    secret_key='your-secret-key'
)

# Create a bucket
storage.create_bucket('data-lake', versioning=True)

# Upload a file
storage.upload_file(
    '/data/dataset.tar.gz',
    'data-lake',
    'datasets/2024/dataset.tar.gz',
    metadata={'source': 'production', 'version': '2024-01'}
)

# Generate a download link (valid for 1 day)
url = storage.generate_presigned_url('data-lake', 'datasets/2024/dataset.tar.gz', 86400)
print(f"下载链接: {url}")

# Lifecycle: move to STANDARD_IA at 30 days, GLACIER at 90 days, expire after 1 year
storage.set_lifecycle_policy('data-lake', [
    {
        'ID': 'archive-old-data',
        'Status': 'Enabled',
        'Filter': {'Prefix': 'logs/'},
        'Transitions': [
            {'Days': 30, 'StorageClass': 'STANDARD_IA'},
            {'Days': 90, 'StorageClass': 'GLACIER'}
        ],
        'Expiration': {'Days': 365}
    }
])
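
A presigned URL is an ordinary HTTP(S) link, so any client can use it without S3 credentials until it expires. A minimal sketch that streams the url generated above to disk (the local path is arbitrary):

python
import shutil
import urllib.request

# No S3 credentials needed; authorization is embedded in the URL itself.
with urllib.request.urlopen(url) as resp, open('/tmp/dataset.tar.gz', 'wb') as f:
    shutil.copyfileobj(resp, f)   # stream to disk instead of buffering in memory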

Object Storage Best Practices

Naming Conventions

Bucket naming:
  - Lowercase letters, digits, and hyphens only
  - 3-63 characters
  - Must not start or end with a hyphen
  Examples: prod-data-lake, backup-2024, ml-training-data

Key (object path) naming:
  - Use / to simulate a directory hierarchy
  - Avoid special characters (spaces, non-ASCII characters, etc.)
  - Partition keys by date so prefix listings stay fast (see the sketch below)
  Examples:
    logs/2024/01/15/app.log
    datasets/nlp/sentiment/train.jsonl
    backups/mysql/2024-01-15/full.tar.gz
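
One way to apply the date-partition rule consistently is to generate keys from a single helper. build_log_key below is a hypothetical illustration, not part of the wrapper class above:

python
from datetime import datetime, timezone

def build_log_key(app, filename, when=None):
    """Build a date-partitioned key such as logs/2024/01/15/myapp/app.log."""
    when = when or datetime.now(timezone.utc)
    # %Y/%m/%d produces the year/month/day prefix used in the examples above
    return f"logs/{when:%Y/%m/%d}/{app}/{filename}"

print(build_log_key('myapp', 'app.log'))   # e.g. logs/2024/01/15/myapp/app.log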

Performance Optimization

python
# Upload multiple files concurrently
import concurrent.futures
import os

def upload_directory(storage, local_dir, bucket, prefix=''):
    """并发上传整个目录"""
    files = []
    for root, dirs, filenames in os.walk(local_dir):
        for filename in filenames:
            local_path = os.path.join(root, filename)
            # Use the path relative to local_dir as the S3 key
            rel_path = os.path.relpath(local_path, local_dir)
            s3_key = os.path.join(prefix, rel_path).replace('\\', '/')
            files.append((local_path, s3_key))
    
    print(f"共 {len(files)} 个文件待上传")
    
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = {
            executor.submit(storage.upload_file, local, bucket, key): (local, key)
            for local, key in files
        }
        
        completed = 0
        for future in concurrent.futures.as_completed(futures):
            completed += 1
            local, key = futures[future]
            try:
                future.result()
                print(f"[{completed}/{len(files)}] 完成: {key}")
            except Exception as e:
                print(f"[{completed}/{len(files)}] 失败: {key} - {e}")

upload_directory(storage, '/data/training-dataset', 'ml-data', 'datasets/v2/')
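
After a bulk upload it is worth spot-checking integrity. For objects uploaded in a single part, the S3/RGW ETag is the MD5 of the content, so a local hash comparison works; multipart ETags use a '<hash>-<parts>' format and are skipped here. verify_etag is a hypothetical helper built on the wrapper above:

python
import hashlib

def verify_etag(storage, bucket, key, local_path):
    """Compare a local file's MD5 against the object's ETag (single-part only)."""
    head = storage.s3.head_object(Bucket=bucket, Key=key)
    etag = head['ETag'].strip('"')
    if '-' in etag:
        # Multipart ETag: not a plain content MD5, skip the comparison.
        print(f"{key}: multipart ETag, skipping MD5 check")
        return None
    md5 = hashlib.md5()
    with open(local_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            md5.update(chunk)
    ok = (md5.hexdigest() == etag)
    print(f"{key}: {'OK' if ok else 'MISMATCH'}")
    return ok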
