Upload Large File

python实现大文件切片上传。有段时间没学习一下新东西了,最近偶尔在学习一下并行和分布式的东西。大文件上传的方式有点并行处理的那味道。所以就抽时间写了一点python实现大文件上传,代码写的比较简陋,大家感兴趣的可以自己实现一个。大部分上传文件其实是在前端实现的,但是原理都差不多,知道懂原理实现就很简单。

背景

文件上传这个需求在哪都有,网盘,上传各种文件等等。在文件比较大的时候有一些比较容易遇到的问题:

  1. 超时。因为前端或者后端代理都有限制最长请求时间,一旦超时文件就没办法长上传成功。
  2. 大小限制。一般后端对上传文件大小会有一定限制。

当然我们可以通过调整最大请求时间和大小限制来解决这个问题,但是会对服务器造成负载。上传失败重来这玩意有点头大。

文件切片

我们可以通过把文件切分成若干个小文件,并且把这些小文件并行的上传到后台,这样的话会节省大量的时间。当上传完成之后,后台会对这些若干个小文件进行合并。这个有点类似分布式(HDFS)的储存方法。

我们可以把文件读取成bytes格式,然后对这些bytes进行切片:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
import os

def split(filename):
CHUNSIZE = 1024 * 1024
file_size = os.stat(filename).st_size
f = open(filename, 'rb')
data = f.read()
f.close()

print("file size: ", file_size)

cursor = 0
chunk_list = []
while cursor <= file_size:
chunk_list.append(data[cursor:cursor + CHUNSIZE])
cursor = cursor + CHUNSIZE

print("chunk number: ", len(chunk_list))

我们可以根据网络状态,文件大小等因素来决定CHUNSIZE的大小。为了方便实现,我这里直接固定了切片的大小。在上传到后台的时候我们需要考虑切片文件的顺序,不然可能会导致文件发生错误,当然这个是后台需要实现的功能,可以稍后讨论。为了让资源得到充分利用我们可以使用多线程进行上传,如果是前端可以采用异步的方式。在这里我用的是Thread的方式实现多线程上传。

文件合并

文件合并常见的方式有:

  1. 上传完所有的小文件之后发送一个请求来合并文件,请求的时候可以传递合并之后文件的名字。
  2. 后台记录切片的index,当达到末尾的时候自动合并。

在这里我实现的是第一种方法(因为懒/(ㄒoㄒ)/~~)。

先简单介绍一个文件合并的原理,先把所有切分的文件根据bytes的格式读取,然后把这些读取的bytes按照顺序写入到新的文件中:

1
2
3
4
f = open('./xx.tar.bz2', 'ab')
for chunk in chunk_list:
f.write(chunk)
f.close()

当把所有的文件都写入到文件之后就成功把这个文件合并了。注意:ab表示appending bytes。

文件秒传

文件秒传是指如果后台已经有一个一样的文件,那么就没必要再次上传,直接返回上传成功即可。这个功能还是比较使用的,比如某盘的资源共享等等。

妙传的关键在于文件的唯一标识。我们可以根据文件的二进制编码进行计算hash值,用来当作文件的唯一标识。通常使用的算法是md5。对于md5算法,我们也有两种不同的方式去计算文件的hahs值。第一种是直接计算。

1
2
3
4
5
import hashlib
f = open(filename, 'rb').read()
md5 = hashlib.md5(f)
hash = md5.hexdigest()
print(hash)

第二种根据每行数据进行update hash值:

1
2
3
4
5
6
7
import hashlib
md5 = hashlib.md5()
with open(filename, 'rb') as lines:
for line in lines:
md5.update(line)
hash = md5.hexdigest()
print(hash)

总结

总体代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import os
import time
import json
import hashlib
import requests
from threading import Thread


class Upload():
def __init__(self, filename, url, CHUNSIZE=1024 * 1024 * 30) -> None:
self.filename = filename
self.url = url
self.md5 = ""
self.chunk_list = []
self.CHUNSIZE = CHUNSIZE


def md5_small(self):
f = open(self.filename, 'rb').read()
md5 = hashlib.md5(f)
self.md5 = md5.hexdigest()
print(md5.hexdigest())


def md5_large(self):
md5 = hashlib.md5()

with open(self.filename, 'rb') as lines:
for line in lines:
md5.update(line)

self.md5 = md5.hexdigest()
print(md5.hexdigest())


def split(self):
file_size = os.stat(self.filename).st_size
f = open(self.filename, 'rb')
data = f.read()
f.close()

print("file size: ", file_size)

cursor = 0
while cursor <= file_size:
self.chunk_list.append(data[cursor:cursor + self.CHUNSIZE])
cursor = cursor + self.CHUNSIZE

print("chunk number: ", len(self.chunk_list))


def async_upload(self, chunks, offset, filename="upt"):
for index, chunk in enumerate(chunks):
data = {
'index': index + offset,
'chunk': chunk.decode('latin-1'),
'filename': filename
}

response = requests.get(
url='{}/upload'.format(self.url),
data=json.dumps(data),
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
)

print(response)


def merge(self):
"""
merge is not belong to this part
"""
data = {
"path": "upt",
"filename": "xx.tar.bz2"
}
response = requests.get(
url='{}/merge'.format(self.url),
data=json.dumps(data),
headers = {
'Accept': 'application/json',
'Content-Type': 'application/json'
}
)

print(response)


def upload(self):
TREADNUM = 3
CHUNKSIZE = len(self.chunk_list)
PERSIZE = int(CHUNKSIZE / TREADNUM)
threads = []

for i in range(0, CHUNKSIZE, PERSIZE):
t = Thread(target=self.async_upload, args=[self.chunk_list[i : i + PERSIZE], i])
t.start()
threads.append(t)

while True:
for t in threads:
if t.is_alive():
continue
break

print("Finish")


if __name__ == '__main__':
upload = Upload('comp-books-master.zip', 'http://127.0.0.1:5000')
upload.split()
upload.upload()
# upload.merge()

后台模拟代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
import os
import glob
from flask import Flask, request

app = Flask(__name__)
CURR = os.getcwd()

@app.route('/upload', methods=["POST","GET"])
def upload():

data = request.get_json()
index = data['index']
chunk = data['chunk']
filename = data['filename']

if not os.path.exists(os.path.join(CURR, filename)):
os.mkdir(os.path.join(CURR, filename))

f = open(os.path.join(CURR, filename, str(index)), 'ab')
f.write(chunk.encode('latin-1'))
f.close()

return "SUCCESS", 200


@app.route('/merge', methods=["POST","GET"])
def meger():
data = request.get_json()
mp = data['path']
filename = data['filename']

f = open(filename, 'ab')
for file in os.listdir(os.path.join(CURR, mp)):
tf = open(os.path.join(CURR, mp, file), 'rb')
f.write(tf.read())
tf.close()

f.close()

return "SUCCESS", 200


if __name__ == '__main__':
app.run(debug=True)
----- End Thanks for reading-----