《异步:多线程-多进程-协程与回调》

本文最后更新于:2024年8月11日 晚上

-

一般情况下,程序都是串行运行的,意味着你的A代码如果执行用时超长,比如调用Windows的程序API进行操作,或者请求某个服务器以1KB/s的速度下载内容,那么下一条代码B可能这辈子都无法运行了,在实际的业务中,我们当然不希望整个程序因为这若有若无的网速卡在那里,必须做点什么,因此把串行改并行是迫切的,于是==异步==的概念就浮出水面

异步

按照我的理解,==异步==是一个状态,他的具体实现方式有多线程多进程协程与回调,这些方法都能实现异步,但是具体的实现原理不同,因此根据代码状态选择一个合适的异步手段尤为重要,因为还没有深入了解,因此在此随意列举一二:

多线程

线程是对程序控制的最小单位,一个进程可以有多个现成,CPU每次处理任务也是一次处理一个线程的请求,那么我们可以在程序运行时让一个线程 挂起 或者 等待的时候执行另外一个线程,多个线程是在一个进程下的,为了防止冲突,使用 来确保每个线程的操作权限

比如:多个线程同时对一个变量或者一个文件进行写入操作,该听谁的?这个时候只有持有 写入锁的线程才有对这个变量的写入权限,其他线程的写入就得等待,当持有锁的进程写入完毕后,释放锁,第二个排队写入的线程就能拿到 写入锁然后进行写入操作

但是线程越多编程人员对程序的控制能力越发下降,因为锁的传递也越来越复杂,因此能不能把一些不需要对本地变量进行写入的操作直接开一个新的进程,而不干扰原来的进程呢?

==多进程==应运而生

多进程

每次执行一个任务,就开一个进程,这样就能避免锁的影响,而且能更好的使用多核CPU的能力,坏处就是资源占用会比较多,而且变量的传递也有些困难。

协程与回调

这是最朴素的方法了,与多线程和多进程不同,==协程==突出一格==协==字,当主程序运行中遇到一个需要等待的步骤,而下一个步骤并不十分需要这个返回的内容时,我们采用协程

例如,A步骤需要请求一个网络资源,下一个步骤B需要读取配置文件,两个步骤本身并不互相影响,那么当个A去请求网络资源时,把A当做协程==挂起==,让B直接读取配置文件,使得程序能够继续往下运行,当A请求完毕返回内容时,我们可以用一个==回调==程序去处理这些返回内容,这样就可以实现程序不被某一个预计需要大量时间的步骤给阻塞掉

但是使用协程必须要去这个步骤/函数/方法是可等待的Awaitable,网络请求request就是可等待的,计算1+1是即时的,是不可等待的。因此协程本身有一定的使用局限

实际情况

0

现在我们希望,读取一个文件夹内的word文件,然后计算每个文件的字数,求和,一个串行实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
from win32com.client import DispatchEx
from os import walk,path

origin_file = '.\all_doc'
walk_file = [x for _,_,x in walk(origin_file)][0] # 拿到所有doc文件的名称
all_word = 0
word = DispatchEx("Word.Application")
for file in walk_file:
word_count = 0
# 假设有一个判断是不是doc文件的方法
if end_with_doc(file):
doc = word.Documents.Open(path.join(path.abspath(origin_file),file))
doc.Saved = True
word_count = doc.ComputeStatistics(Statistic=0)
doc.Close(SaveChanges=0)
del doc
all_word += word_count
word.Quit()
print(f"所有的字数为{all_word}")

这个方法读取一个四页word文件的速度大概是1~3秒一个,如果是四十五十个那需要更多的时间,这是我们难以接受的,我们能否考虑出一个并行的办法,一次对多个word文件进行统计?

1

我似乎已经实现了一个多进程的方法,但是他的效果并不好,因为进程之间的变量是难以传递的,多进程通信又比较难,因此姑且放在这,有实例的人可以进行改进:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
from win32com.client import DispatchEx
from os import walk,path
from multiprocess import Pool

def count_word(path):
try:
word = DispatchEx("Word.Application")
doc = word.Documents.Open(path.join(path.abspath(origin_file),file))
doc.Saved = True
word_count = doc.ComputeStatistics(Statistic=0)
doc.Close(SaveChanges=0)
del doc
word.Quit()
except Exception as e:
prtin(f"发生错误{e}")
return word_count
def show(msg):
global all_word += msg


origin_file = '.\all_doc'
walk_file = [x for _,_,x in walk(origin_file)][0] # 拿到所有doc文件的名称
all_word = 0
# 初始化进程池-5
pool = Pool(5)
for file in walk_file:
if end_with_doc(file):
pool.apply_async(func=count_word,args=(
path.join(path.abspath(origin_file),file),)
callback=show)
pool.close()
pool.join()

因为进程间的变量传递我还没学会,而且比较复杂,Quece方法只允许变量同时被一个进程调用,而我们希望word程序被多个进程调用,因此在每一个统计进程里面加上了调用word的方法,而且新进程里的错误在主进程的控制面板看不到的,只能用try-except办法来捕获并debug,对于小程序没有问题, 大程序也许得寻找别的错误发现方案

综上呢,每一次调用 count都得开一个word,非常拖慢速度,因此这个异步方法的速度没有上一个快

我们找到了一种优化方法,可以大幅提升代码效率,内容放在下一节中

2

这个方法本身不适用==协程与回调==,因为方法本身不是可等待的

3

尝试使用多线程解决问题,实际上多线程应该是最好的方案:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
from os import path, walk
import pythoncom
import threading
from win32com.client import DispatchEx

def single_thread(file):
# 必须加上这个内容,似乎是防止文件占用混乱
pythoncom.CoInitialize() # 这个方法在pycharm上会报WARN
word = DispatchEx("Word.Application")
# 这里声明为全局变量,这样三个分线程都会从这个主线程的文件池中取出文件来处理
# 因为线程在微观上也是串行处理的,不用担心多个线程拿到同一个文件的问题
global get_all_file_name
# 当文件池取空时,跳出while循环,关闭word后台
while len(get_all_file_name) > 0:
doc = word.Documents.Open(get_all_file_name.pop())
doc.Saved = True
word_count = doc.ComputeStatistics(Statistic=0)
img_count = doc.InlineShapes.Count
doc.Close(SaveChanges=0)
del doc
else:
word.Quit()
pythoncom.CoUninitialize()
return img_count,word_count

file_path = './all_doc'
# 通过一个复杂方法来拿到给定目录下的所有文件的绝对路径,似乎也可以使用path.abspath()来完成
get_all_file_name = [path.join(file_path,filename) for _,_,filename in
walk(file_path)][0]
# 循环载入线程,假设要开启三个线程
create_thread = []
for _ in range(3):
create_thread.append(threading.Thread(target=single_thread))
# 遍历激活每个线程
for one_thread in create_thread:
threads.start()
# 主线程等待多线程结束
for wait_thread in create_thread:
wait_thread.join()

这个多线程方法比较完善,第一版的时候我们曾经考虑过根据进程数生成一个包含多个DispatchEx(...)对象的列表,然后以传参的形式传入以供调用,后来发现似乎不是很需要,考虑到线程是顺序处理的,说不定甚至只用一个word对象就行,也许读者可以自行实践一下,看看是否有效率上的提升。

更进一步

经过不懈的公关和努力,我们成功的在==多进程==上实现了word的字数统计。具体可以参考上一个代码块,这个是单独的写法,我们后面把他封装成类以供主程序调用,你也可以在此基础上进行DIY,我们接下来讨论一下多进程上的word应用调用和关闭问题。

原来的多进程,因为进程间变量传递机制比较复杂,因此我们采用了一个办法来解决问题——既然传不进去,要不直接在进程里面生成这个变量不就行了?来看实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
from os import path, walk
import pythoncom
import threading
from win32com.client import DispatchEx

def proccess_word(path):
doc = word_for_each_proccess.Documents.Open(path)
doc.Saved = True
word_count = doc.ComputeStatistics(Statistic=0)
img_count = doc.InlineShapes.Count
doc.Close(SaveChanges=0)
del doc
return img_count,word_count

# 构造一个初始化函数,在进程池创建进程时会载入这个函数,因此里面的全局变量就是该进程的全局变量,可以被直接调用
def init()
global word_for_each_proccess
word_for_each_proccess = DispatchEx("Word.Application")

# 一个通知回调,用来告知用户结果,结果也会载入rep中
def show(msg):
if msg is not None:
print(msg)

if __name__ == "__main__":
all_file_path = [...]
pool = Pool(5,initializer=init)
# 用来接收处理结果
rep = []
# 循环载入任务
for file in all_file_path:
rep.append(pool.apply_async(func=proccess_word),args=(file,),
callback=show)
pool.close()
pool.join()
# 解析获得处理结果
for i in rep:
print(i.get())

这个方法已经能解决word本身的调用,但是也随之而来了一个问题,那就是,我们如何结束这个在分进程里开启的进程?诚然,在进程任务处理完毕后,会自动释放掉,但是这个进程拉起的word进程可不会释放掉,我们得在分进程释放掉之前告知分进程关掉word进程,使用word.Quit()方法自然是不二的选择。

我们也曾经想过使用获取进程ppid然后杀掉的方法,但是这个方法需要调用cmd的指令,额外增加代码量,我们更希望能直接使用现成的Quit方法。

这样的话,只有一种办法,那就是在传入变量上动手脚,因为每一次都得从主进程的==文件池==中传入一个文件位置,让分进程去读取和处理,那么我们能否设想一种状态,即传入一个特殊的==毒药==去结束这个word进程?显然是可以的,尤其是我们的==文件池==是提纯过的(上述代码可能没有体现这个筛选过程,实际上我们使用了正则表达式来寻找我们需要的doc/docx文件),那这个==毒药==的投放就相当方便,看实例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
...这里只显示部分改动的代码...
def proccess_word(path):
if path == "####":
word_for_each_proccess.Quit()
else:
doc = word_for_each_proccess.Documents.Open(path)
doc.Saved = True
word_count = doc.ComputeStatistics(Statistic=0)
img_count = doc.InlineShapes.Count
doc.Close(SaveChanges=0)
del doc
return img_count,word_count
....
all_file_path.append("####")
for file in all_file_path:
rep.append(pool.apply_async(func=proccess_word),args=(file,),
callback=show)

我们使用一个几乎不可能有文件使用的名字####来作为==毒药==去给这个进程下毒,因为任务是按顺序派送的,只有到前面正常的文件都被处理完了,才会开始派送这个毒药,接受到这个特殊的字符串后分进程知道自己寿命已到,会调用Quit方法去了结掉word进程,这样一来word后台就能自动退出了。

但,一个更大的问题在于,这里只有一个毒药,如果有多个分进程(这是显然的),那么一个毒药显然不够用,我们自然可以添加更多,但是,分进程本身不像多线程那样顺序处理,他们是完全并行的,也就是说,完全有可能把多个毒药派送给一个进程,只要这个进程处理的速度够快,他就能源源不断的进入等待状态,进程池也就不断调用他去处理这个看起来是文件的毒药,这就坏了事了,分进程不可能不断的执行Quit指令,会报错。

怎么办呢?

你可能认为,那就多放一点毒药,总有机会毒死不止一个进程,一方面处理一个字符串的if判断速度本身极快,一旦出现==重复投毒==就会报错,风险极高,另一方面,就算我们在分进程里面加判断,判断到word还活着才执行裁决quit,那也得在文件尾部加不知道多少个####才能保证==致死率==,10个?100个?1000个?没有数,而且文件池里的文件数量和多进程数量也有关系,有时是3个有时是5个甚至更多分进程,==剂量==就很难把握,我们必须想一种办法让分进程能高效的服用毒药……或者说,既然他们处理的速度够快(往往远小于1秒),那就先派送毒药,再让他们服下,如何?

1
2
3
4
5
6
7
8
9
...省略冗余代码...
from time import sleep
def proccess_word(path):
if path == "####":
sleep(1)
word_for_each_proccess.Quit()
else:
....
...

我们只是单纯的加了一个睡眠函数,领到药之后不能马上处理,因为要沉睡1秒(你觉得不妥可以设置更高,但是会损坏文件处理速度,这个沉睡时间自然会加到处理时间上),而if判断只需很短的一瞬,因此你可以认为,进程池给分进程发了一份==毒药==,但是分进程先小睡一会儿,没有立即释放,进程池发现还有一份毒药,又往下派送给别的分进程,这个执行时间也非常短,因此所有的分进程都拿到了药,1秒才过去,他们都同时服用了==毒药==,最后进程池发现没有任务派送了,因此结束了所有的分进程,世界一片和平!撒花!

后记

经过实际的测试呢(200多数据的大样本)也能安全的退出word程序,1秒钟完全足够,因此只需要给==文件池==加入等同于进程数量的毒药即可。

最终的实现代码我觉得有点丑陋,尤其是看到了分线程的优势后,还是决定不再使用分进程来处理这个问题,经过实际的测试,分线程在小样本(<60份4~5页的文件)上处理会比串行和分进程快(大约是10%,也就1~2秒),再往上的样本数量分线程与多进程实际上不分仲伯(当然大部分情况下分线程比多进程快个1~5秒),少部分情况下分线程大幅领先,再加上分线程能更好的访问处理结果,也不需要==毒药的传递==等等机制,因此代码更加整洁优雅。

当然,上面提供的代码只是一个范本,实际上我的实用代码还结合具体业务进行了特化,不过这也不是读者需要关心的部分,相比你们一定能领悟其中的个中精神,然后结合实际开发出自己的代码!

就这样,我跟异步的斗争又暂时告一段落了,但我想我们很快又会再见的。

这就是我跟异步斗争的故事。

参考资料

因为查阅的文档太多了,时间又有点久,难以具体分辨挑选精品文档,就全放在这吧:

[似乎是进程池问题]

01 02-initializer 03-pool 04 05 官方文档 06
StackOverFlow-Initalizer如何使用?全英文
曾经考虑过如何杀死pool内的进程

[协程]

04


《异步:多线程-多进程-协程与回调》
https://qlozin.top/2022/11/05/多线程多进程与协程/
作者
QLozan
发布于
2022年11月6日
更新于
2024年8月11日
许可协议