Python 线程池/进程池 内存管理

concurrent.futures 线程池/进程池内存管理

起因

之前进行爬虫任务的时候遇到了这么一个需求,1G 内存的机器跑爬虫,爬取一个网站的数据,之前使用的是这样一种方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 futures = list()
with ThreadPool(max_workers=20) as exc:
for tr in table.select("tr"):

# 取线程执行结果
future = exc.submit(self.get_asn, tr.strings)
futures.append(future)

# 使用 as_completed 异步方式处理完成任务的线程
for future in as_completed(futures):
result = future.result()
# 拼接 asn.json 的 path
file_path = result["asn"] + ".json"
asn_file = os.path.join(self.base_data_path, file_path)

with open(asn_file, "w", encoding="utf8") as f:
f.write(json.dumps(result))

使用了 concurrent.futures 的 ThreadPoolExecutor submit 方法,因为开启了20个线程同时爬取,连接网站的速度还很快,任务很快就被处理完成了,可以看到我时完成一个任务的时候就进行写文件了操作,但是2分钟后很快程序就终止了,监控程序发现时由于程序内存占用达到 80% 被系统 Kill 掉了。

  • 为什么内存会爆呢?监控内存显示,程序处理完任务之后,内存并没有被立刻释放,而是有很长时间延迟之后才被释放( 在此吐槽 python GC)

改进方法:

参考 https://stackoverflow.com/questions/34770169/using-concurrent-futures-without-running-out-of-ram

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 允许同时进行的工作数
MAX_JOBS_IN_QUEUE = 1000


tr_list = table.select("tr")
tr_left = len(tr_list) - 1 # <----
tr_iter = iter(tr_list) # <------
jobs = dict()

with ThreadPool(max_workers=20) as exc:
while tr_left:
print("#" * 100, "TASK: {} <===> JOB: {}".format(tr_left, len(jobs)))
for tr in tr_iter:
# 取线程执行结果
job = exc.submit(self.get_asn, tr.strings)
jobs[job] = tr
if len(jobs) > MAX_JOBS_IN_QUEUE:
break # limit the job submission for now job

# 使用 as_completed 异步方式处理任务线程
for job in as_completed(jobs):
tr_left -= 1 # one down - many to go... <---
result = job.result()
# 从字典中删除结果,因为我们不需要存储它
del jobs[job]

# 拼接 asn.json 的 path
file_path = result["asn"] + ".json"
asn_file = os.path.join(self.base_data_path, file_path)

with open(asn_file, "w", encoding="utf8") as f:
f.write(json.dumps(result))
break

改进后内存在切换 html 爬取的时候,会偶尔会升高一下,最高 65%,平均在 35% 左右。

  • ProcessPoolExecutor 进程同理
收藏文章
表情删除后不可恢复,是否删除
取消
确定
图片正在上传,请稍后...
评论内容为空!
还没有评论,快来抢沙发吧!