大文件分段上传

您所在的位置:网站首页 天翼云盘上传文件夹在哪找到 大文件分段上传

大文件分段上传

2024-07-04 16:32| 来源: 网络整理| 查看: 265

前提条件 开通对象存储服务。 在天翼云对象存储控制台获取Access Key、Secret Key和外网访问控制的endpoint。 安装Python环境,下载对应版本的ZOS官方Python SDK (本文以sdk_python3.X.tar为例),请参见开发者文档。 环境配置 解压sdk_python3.X.tar,在解压出的sdk文件夹下,打开cmd导入需要的包。 pip3 install -r requirements.txt 把sdk文件夹中的service-2.sns.sdk-extras.json和service-2.s3.sdk-extras.json文件放到“C:\Users\你的用户名.aws\models\s3\2006-03-01”文件夹(如果不存在该文件,可自行创建)下。 分段上传 操作场景 大型文件上传:当需要上传大型文件时,使用分段上传可以更高效地完成任务。相比于一次性上传整个文件,分段上传允许在网络故障或其他原因导致上传中断时,只需要重新传输中断的片段,而不需要重新上传整个文件。 网络不稳定环境:在网络连接不稳定或带宽有限的环境中,分段上传可以降低因网络中断或超时导致整个文件上传失败的风险。当上传过程中发生中断,只需要重新上传中断的片段,而不需要重新上传整个文件。 文件大小不确定:可以在需要上传的文件大小还不确定的情况下开始上传,这种场景在视频监控等行业应用中比较常见。 操作步骤 创建py文件,并引入boto3包的session模块。 from boto3.sessionimport Session 配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。 access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint 获取对象存储服务的操作客户端。 session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url) 开启分块上传,获取uploadID。 # 在’’中填入要上传对象的桶名(必须已存在)和对象名。 dict_cmp = s3_client.create_multipart_upload(Bucket='?', Key='?') uploadID = dict_cmp['UploadId'] # 记录uploadID,用于后续上传分块。 分块读取要上传的文件。 # 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open("此处输入你的文件地址", 'rb') as f: while True: chunk = f.read(5 * 1024 * 1024) # 此处输入分块大小,本示例是5M,分块大小最低为5M if not chunk: break chunks.append(chunk) 上传所有分块并记录各个分块的ETag和编号。 # 设置列表,用于记录上传分块的ETag和编号。 parts = [] i = 0 for chunk in chunks: i += 1 # 上传分块 dict_up = s3_client.upload_part(Bucket='?', Body=chunk, Key='?', PartNumber=i, UploadId=uploadID) # 记录该分块的ETag和编号 part = { 'ETag': dict_up['ETag'], 'PartNumber': i, } parts.append(part) 完成所有分块的拼接,存入ZOS。 # 完成所有分块拼接后,上传成功。 s3_client.complete_multipart_upload( Bucket='?', Key='?', MultipartUpload={ 'Parts': parts }, UploadId=uploadID, ) 整体代码 from boto3.session import Session access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint key = '?' # 传入桶内后的文件名 bucketName = '?' # 要传入文件的桶名 try: session = Session(access_key, secret_key) s3_client = session.client('s3', endpoint_url=url) # 开启分块上传,获取uploadID dict_cmp = s3_client.create_multipart_upload(Bucket=bucketName, Key=key) uploadID = dict_cmp['UploadId']#记录uploadID,用于后续上传分块。 filePos = "此处输入文件路径" size = 5 * 1024 * 1024 #此处输入分块大小,本示例是5M,分块大小最低为5M # 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open(filePos, 'rb') as f: while True: chunk = f.read(size) if not chunk: break chunks.append(chunk) # 设置列表,用于收集上传分块的ETag和编号。 parts = [] i = 0 for chunk in chunks: i += 1 # 上传分块 dict_up = s3_client.upload_part(Bucket=bucketName, Body=chunk, Key=key, PartNumber=i, UploadId=uploadID) # 记录该分块的ETag和编号 part = { 'ETag': dict_up['ETag'], 'PartNumber': i, } parts.append(part) # 完成整个拼接传入 s3_client.complete_multipart_upload( Bucket=bucketName, Key=key, MultipartUpload={ 'Parts': parts }, UploadId=uploadID, ) print('文件上传成功!') except BaseException: print("上传异常,请检查各个参数后重试。") finally: # 关闭文件流 f.close() 追加上传 操作场景

通过普通上传创建的对象,用户无法在原对象上进行追加写操作,如果对象内容发生了改变,只能重新上传同名对象来进行修改。这在日志、视频监控等数据复写较频繁的场景中使用不方便。所以可以通过追加上传的方式来只上传增加部分的内容,增强扩展性,提高文件的上传效率。

操作步骤

创建py文件,并引入boto3包的session模块。

from boto3.sessionimport Session

配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。

access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint

获取对象存储服务的操作客户端。

session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url)

读取文件并记录文件md5值。

with open('输入要上传文件的路径', 'rb') as file: data = file.read() md5 = hashlib.md5(data).digest() md5 = base64.b64encode(md5)

上传对象。 (1)首次上传

s3_client.put_object(Bucket='输入要传入的桶名', Metadata=dict(m1='m1'), Body=data, Key='输入存入后对象的键值', ContentMD5=str(md5, 'utf-8'), Append=True,# 开启追加上传 AppendPosition=0)# 指定追加上传开始的位置

(2)追加上传

# 获取需要追加上传的对象信息 response = s3_client.head_object(Bucket="bucketblocks", Key="mac.txt") # 获取当前对象长度 appendPos = response['ContentLength'] s3_client.put_object(Bucket='输入要传入的桶名', Metadata=dict(m1='m1'), Body=data, Key='输入存入后对象的键值', ContentMD5=str(md5, 'utf-8'), Append=True,# 开启追加上传 AppendPosition=appendPos)# 指定追加上传开始的位置 整体代码 from boto3.session import Session import hashlib import base64 access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint # 上传文件桶的桶名 bname = '输入要传入的桶名' # 文件在桶内存储的key值 key = '输入存入后对象的键值' # 追加上传开始的位置,默认为0 appendPos = 0 try: # 读取文件并记录文件md5值 with open('输入要上传文件的路径', 'rb') as file: data = file.read() md5 = hashlib.md5(data).digest() md5 = base64.b64encode(md5) # 获取对象存储服务的操作客户端 session = Session(access_key, secret_key) s3_client = session.client("s3", endpoint_url=url) # 获取桶内所有对象信息 response = s3_client.list_objects(Bucket=bname) # 遍历所有对象键值 for obj in response['Contents']: # 桶中存在与待上传对象同key的对象,需要判断该对象能否进行追加上传 if obj['Key'] == key: res = s3_client.head_object(Bucket=bname, Key=key) if res['ResponseMetadata']['HTTPHeaders']['x-rgw-object-type'] != 'Appendable': print('该对象不是Appendable类型,无法进行追加上传.') raise Exception #桶内对象可以进行追加上传,更新追加上传开始的位置 appendPos = res['ContentLength'] # 完成追加上传 s3_client.put_object(Bucket=bname, Metadata=dict(m1='m1'), Body=data, Key=key, ContentMD5=str(md5, 'utf-8'), Append=True, AppendPosition=appendPos) if appendPos == 0: print('初次上传成功.') else: print('追加上传成功.') except BaseException: print("参数有问题,请修改后重试.") finally: # 关闭文件流 file.close() 断点续传 操作场景 大文件上传:当需要上传大文件时,如视频、备份文件或软件包等,由于上传时间较长,在上传过程中可能会遇到各种问题,例如网络中断、上传工具崩溃或用户手动中断等。断点续传的操作场景允许用户在上传中断后,能够从中断点继续上传,而不需要重新上传整个文件。 网络不稳定:在网络环境不稳定的情况下,如移动网络或低带宽连接,大文件的上传过程可能会中断或超时。通过断点续传,在宽带不稳定的情况下,上传仍可恢复并继续进行。 长时间上传:某些上传任务可能需要较长的时间才能完成,这可能会增加上传过程中意外中断的风险。断点续传可以将上传任务分段处理,并保存上传进度信息,以便在上传中断后能够恢复并继续上传。 客户端或服务端故障:在上传过程中,当客户端或服务端发生故障时,断点续传允许用户重新连接并从中断点继续上传,而不会对上传任务造成重大影响。 操作步骤 创建py文件,并引入boto3包的session模块。 from boto3.sessionimport Session 配置用于访问对象存储服务的凭证Access Key、Secret Key和外网访问地址endpoint。 access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint 获取对象存储服务的操作客户端。 session =Session(access_key, secret_key) s3_client =session.client('s3', endpoint_url=url) 查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。 response = s3_client.list_multipart_uploads(Bucket='输入对象传入的桶名') uploadInfo = response['Uploads'] 根据上一步得到的UploadId和Key来获取当前对象已经上传完成的分段信息。 response = s3_client.list_parts(Bucket='输入对象传入的桶名', Key='Key值', UploadId='UploadId值') finishedPartsInfo = response['Parts'] # 设置列表,用于记录当前已经完成上传分块的编号。 finishedPartNumbers = [] # 记录分块的大小。 partSize = 0 # 设置列表,用于收集上传分块的ETag和编号。 parts = [] for finishedPart in finishedPartsInfo: finishedPartNumbers.append(finishedPart['PartNumber']) if partSize < finishedPart['Size']: partSize = finishedPart['Size'] part = { 'ETag': finishedPart['ETag'], 'PartNumber': finishedPart['PartNumber'] } parts.append(part) 上传对象中未完成的分段,并记录各分段的ETag信息。 # 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open("此处输入你的文件地址", 'rb') as f: while True: chunk = f.read(5*1024*1024) # 此处输入分块大小,本示例是5M,分块大小最低为5M if not chunk: break chunks.append(chunk) i = 0 for chunk in chunks: i += 1 # 当该分块已经完成上传时,跳过。 if i in finishedPartNumbers: continue # 上传分块 response = s3_client.upload_part(Bucket=bucketName, Body=chunk, Key=key, PartNumber=i, UploadId='UploadId值') # 记录该分块的ETag和编号 part = { 'ETag': response['ETag'], 'PartNumber': i, } parts.append(part) 完成所有分块的拼接,存入ZOS。 # 完成所有分块拼接后,上传成功。 s3_client.complete_multipart_upload( Bucket=bucketName, Key=key, MultipartUpload={ 'Parts': parts }, UploadId=uploadID, ) 整体代码 from boto3.session import Session access_key = "此处输入你的Access Key" # 这里输入你的Access Key secret_key = "此处输入你的Secret Key" # 这里输入你的Secret Key url = "此处输入你的endpoint" # 这里输入你的endpoint bucketName = '?' # 要传入文件的桶名 filePos = "?" # 待上传文件所在位置 try: session = Session(access_key, secret_key) s3_client = session.client('s3', endpoint_url=url) # 查询桶中还没有完成或放弃的分块上传,获取需要续传对象的UploadId和Key。 response = s3_client.list_multipart_uploads(Bucket=bucketName) uploadInfo = response['Uploads'] if uploadInfo is None: print(bucketName + '桶内没有需要断点上传的对象。') raise BaseException # 获取需要续传的对象的UploadId和Key,这里以第一个对象为例 uploadId = uploadInfo[0]['UploadId'] key = uploadInfo[0]['Key'] # 获取已经完成上传的分块信息 response = s3_client.list_parts(Bucket=bucketName, Key=key, UploadId=uploadId) finishedPartsInfo = response['Parts'] # 设置列表,用于记录当前已经完成上传分块的编号。 finishedPartNumbers = [] # 记录分块的大小。 partSize = 0 # 设置列表,用于收集上传分块的ETag和编号。 parts = [] for finishedPart in finishedPartsInfo: finishedPartNumbers.append(finishedPart['PartNumber']) if partSize < finishedPart['Size']: partSize = finishedPart['Size'] part = { 'ETag': finishedPart['ETag'], 'PartNumber': finishedPart['PartNumber'] } parts.append(part) # 设置列表,用于收集文件的所有分块 chunks = [] # 分块读取文件 with open(filePos, 'rb') as f: while True: chunk = f.read(partSize) # 此处输入分块大小,本示例是5M,分块大小最低为5M if not chunk: break chunks.append(chunk) i = 0 for chunk in chunks: i += 1 # 当该分块已经完成上传时,跳过。 if i in finishedPartNumbers: continue # 上传分块 response = s3_client.upload_part(Bucket=bucketName, Body=chunk, Key=key, PartNumber=i, UploadId=uploadId) # 记录该分块的ETag和编号 part = { 'ETag': response['ETag'], 'PartNumber': i, } parts.append(part) # 完成整个拼接传入 s3_client.complete_multipart_upload( Bucket=bucketName, Key=key, MultipartUpload={ 'Parts': parts }, UploadId=uploadId ) print('文件上传成功!') except BaseException: print("上传异常,请检查各个参数后重试。") finally: f.close()


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3