python---并发编程

# 并发编程(并发,并行,同步,异步)

# 通俗理解并发编程中的相关核心概念

核心概念:进程、线程和互斥锁

  • CPU 的作用
    • 计算机的核心是 CPU,它承担了所有的计算任务。它就像一座工厂,时刻在运行。
    • CPU 的核数
      • 假定工厂的电力有限,一次只能供给一个车间使用。也就是说,一个车间开工的时候,其他车间都必须停工。背后的含义就是,单核 CPU 一次只能运行一个任务。这个任务是什么呢?
  • 进程 就好比工厂的车间,它代表 CPU 所能处理的单个任务。
  • 任意时刻,CPU 总是运行一个进程,其他进程处于非运行状态。
  • 基于车间来聊:
    • 一个车间里,可以有很多工人。他们协同完成一个任务。
    • 线程 就好比车间里的工人。一个进程可以包括多个线程
  • 车间的空间是工人们共享的,比如许多房间是每个工人都可以进出的。这象征一个进程的内存空间是被线程共享的,每个线程都可以使用这些共享内存。
  • 基于进程空间可以被线程共享的角度 — 思考:
    • 每间房间的大小不同,有些房间最多只能容纳一个人,比如厕所。里面有人的时候,其他人就不能进去了。这代表一个线程使用某些共享内存时,其他线程必须等它结束,才能使用这一块内存。那么如何实现呢?
    • 一个防止他人进入的简单方法,就是门口加一把锁。先到的人锁上门,后到的人看到上锁,就在门口排队,等锁打开再进去。这就叫 **“互斥锁”**,其作用是防止多个线程同时读写某一块内存区域

# 进程

# 什么是进程

  • 广义定义:进程是一个具有一定独立功能的程序关于某个数据集合的一次运行活动。它是操作系统动态执行的基本单元,在传统的操作系统中,进程既是基本的分配单元,也是基本的执行单元。
  • 在操作系统中,每启动一个应用程序其实就是 OS 开启了一个进程且为进程分类对应的内存 / 资源,应用程序的执行也就是进程在执行。
  • 狭义定义:一个正在运行的应用程序在操作系统中被视为一个进程
  • 举例: 我们有 py1 文件中和 py2 文件,两个文件运行起来后是两个进程。

# 进程调度

  • 提问:
    • 进程就是计算机中正在运行的一个程序或者软件,并且在上述工厂案例中,我们说单个 CPU 一次只能运行一个任务,那么你有没有在电脑上一边聊微信一边听音乐一边打游戏的场景啊?why?
      • 是因为 CPU 在交替运行多个进程。
  • 要想多个进程交替运行,操作系统必须对这些进程进行调度,这个调度也不是随机进行的,而是需要遵循一定的法则,由此就有了进程的调度算法。
    • 目前已实现的调度算法有:先来先服务(FCFS)调度算法、短作业优先调度算法和时间片轮转法。不过被公认的一种比较好的进程调度算法是 "时间片轮转法"。
1
2
3
4
"时间片轮转法"调度算法的实施过程如下所述。
(1) os会创建多个就绪队列存储进程,并为各个队列赋予不同的优先级。第一个队列的优先级最高,第二个队列次之,以此类推。并且该算法赋予各个队列中进程执行时间片的大小也各不相同,在优先级愈高的队列中,为每个进程所规定的执行时间片就愈小。例如,第二个队列的时间片要比第一个队列的时间片长一倍
(2) 当一个新进程进入内存后,首先将它放入第一队列的末尾,排队等待调度。当轮到该进程执行时,如它能在该时间片内完成,便可准备撤离系统;如果它在一个时间片结束时尚未完成,调度程序便将该进程转入第二队列的末尾,再同样地排队等待调度执行;如果它在第二队列中运行一个时间片后仍未完成,再依次将它放入第三队列。
(3) 仅当第一队列空闲时,调度程序才调度第二队列中的进程运行;仅当第1(i-1)队列均空时,才会调度第i队列中的进程运行。如果os正在第i队列中为某进程服务时,又有新进程进入优先权较高的队列(第1~(i-1)中的任何一个队列),则此时新进程将抢占正在运行进程的服务,即由调度程序把正在运行的进程放回到第i队列的末尾,把处理机分配给新到的高优先权进程。

# 并发与并行

  • 通过进程之间的调度,也就是进程之间的切换,我们用户感知到的好像是两个视频文件同时在播放,或者音乐和游戏同时在进行,那就让我们来看一下什么叫做并发和并行。
  • 无论是并行还是并发,在用户看来都是’同时’运行的,不管是进程还是线程,都只是一个任务而已,真实干活的是 cpu,而一个 cpu 同一时刻只能执行一个任务。
  • ** 并行:** 同时运行,只有具备多个 cpu 才能实现并行
  • ** 并发:** 是伪并行,即看起来是同时运行。

并发的关键是你有处理多个任务的能力,不一定要同时。

并行的关键是你有同时处理多个任务的能力。

所以它们最关键的点就是:是否是『同时』。

# 进程的状态

在程序运行的过程中,由于被操作系统的调度算法控制,程序会进入几个状态:就绪,运行、阻塞和终止。

  • 就绪 (Ready) 状态
    • 进程已经准备好,已分配到所需资源 / 内存。
  • 执行 / 运行(Running)状态
    • 进程处于就绪状态被调度后,进程进入执行状态
  • 阻塞 (Blocked) 状态
    • 正在执行的进程由于某些事件(I/O 请求,input, 申请缓存区失败)而暂时无法运行,进程受到阻塞,则进入就绪状态等待系统调用
  • 终止状态
    • 进程结束,或出现错误,或被系统终止,进入终止状态。无法再执行

# 同步和异步

  • 举个例子来说,你去商场买手机的时候正好口渴了。
    • 同步的意思就是说,你和店员说你看上这部手机了,然后店员去仓库拿货,你在店里等待店员回来后再去买水喝。
    • 异步呢,异步的意思就是在店员去仓库拿货的时候,你趁机去买水喝,然后喝完水后,刚好店员也带着你的新手机回来了。
  • 使用方法的调用来举例:
    • 同步方法调用一旦开始,调用者必须等到方法调用返回后,才能继续该方法后续的行为代码。
    • 异步方法调用更像一个消息传递,一旦调用开始,该方法调用就会立即返回,调用者就可以继续后续的操作。而异步方法通常会在另外一个线程 / 进程中,“真实” 地执行着。整个过程,不会阻碍调用者的工作

注意:同步和异步针对是 cup 遇到阻塞操作时,所产生的不同行为!

思考:异步操作是基于并行的还是基于并发的?

  • 异步可以是基于并行的也可以是基于并发的,但是大部分情况下是基于并发的。
    • 基于并发是指,在方法调用开始的时候,启动另一个进程 / 线程执行方法后序的操作,而当前的进程 / 线程执行该方法内部的操作。则当前进程 / 线程和启动的另一个新的进程 / 线程是基于 cpu 的调度算法,调度执行的。
    • 基于并行是指,在多核情况下,如果应用程序执行过程中设计到的计算量特别大,则相关的运算操作启动的进程 / 线程会在另一个 cup 中启动,这样可以实现真正的并行

# Python 进程的实现

# multiprocessing 包

multiprocess 是 python 中管理进程的包。 之所以叫 multi 是取自 multiple 的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块,提供的子模块非常多。

# Process 模块

Process 模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。

之前我们说过,运行一个 py 文件就相当于启动了一个进程,这个进程我们成为 **“主进程”**

而在主进程对应的 py 文件中,可以通过 Process 模块创建另一个进程,这个进程是基于主进程创建的,因此可以被称为 **“子进程”**

当有了两个进程后,我们其实就可以实现异步机制了!

具体实现过程:

1. 导入模块:from multiprocessing import Process

2. 基于 Process 创建一个子进程对象 (当前运行的整个 py 文件表示主进程),然后可以基于 target 参数将外部的一个函数注册到该子进程中

3. 基于 start () 方法启动创建好的子进程

1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
def func():
print('我是绑定给子进程的一组任务!')

if __name__ == '__main__':
print('主进程开始执行!')
#创建一个进程p,给该进程绑定一组任务
p = Process(target=func)
#启动创建好的进程
p.start()

print('主进程执行结束!')

上面例子说了,我们通过主进程创建的子进程是异步执行的,那么我们就验证一下,并且看一下子进程和主进程来看看是否是父子关系。

		os.getpid() 获取自己进程的ID号

		s.getppid() 获取自己进程的父进程的ID号
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
import os
from multiprocessing import Process
from time import sleep
def func():
print('我是子进程!')
sleep(1)
print('子进程ID号:',os.getpid())
print('该子进程的父进程ID号:',os.getppid())

if __name__ == '__main__':
print('主进程开始执行!主进程的ID号:',os.getpid())
#创建一个进程p,给该进程绑定一组任务
p = Process(target=func)
#启动创建好的进程
p.start()
print('主进程执行结束!')
  • 如何手动给注册在子线程中的函数传递指定的参数?
    • 通过 args 传递参数
1
2
3
4
5
6
7
8
9
10
11
12
from multiprocessing import Process
def func(num1,num2):
print('我是绑定给子进程的一组任务!',num1,num2)

if __name__ == '__main__':
print('主进程开始执行!')
#创建一个进程p,给该进程绑定一组任务
p = Process(target=func,args=(123,456))
#启动创建好的进程
p.start()

print('主进程执行结束!')
  • 使用进程实现异步效果:
    • 同步效果:
1
2
3
4
5
6
7
8
9
10
11
12
import time
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)

if __name__ == "__main__":
start = time.time()
urls = ['www.1.com','www.2.com','www.3.com']
for url in urls:
get_request(url)
print('总耗时:',time.time()-start)
  • # 异步效果:

1
2
3
4
5
6
7
8
9
10
11
12
13
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)

if __name__ == "__main__":
urls = ['www.1.com','www.2.com','www.3.com']
for url in urls:
#创建了三个进程,表示三组任务
p = Process(target=get_request,args=(url,))
p.start()
  • join 方法的使用
    • 思考:如果有时候,主进程需要使用子进程运行后的结果,则必须保证主进程等待子进程运行结束后在结束,如何实现呢?
      • 主进程会在加上 join 的地方等待(也就是阻塞住),会等待子进程执行完之后,再继续往后执行主进程 join 后序的部分
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
import time

def func():
print('子进程正在执行......')
time.sleep(2)
print('子进程执行结束!')

if __name__ == '__main__':
print('主进程正在执行......')
p = Process(target=func)
p.start()
print('主进程执行结束!')
#发现:主进程执行结束后,子进程才执行结束!
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
from multiprocessing import Process
import time
ticketNum = 10 #全部的车票
def func(num):
print('我是子进程,我要购买%d张票!'%num)
global ticketNum
ticketNum -= num
time.sleep(2)

if __name__ == '__main__':
p = Process(target=func,args=(3,))
p.start()
#主进程在子进程结束之后在结束
p.join() #只有当子进程结束后,join的调用结束,才会执行join后续的操作
print('目前剩余车票数量为:',ticketNum) #输出结果依然是10
#进程和进程之间是完全独立。两个进程对应的是两块独立的内存空间,每一个进程只可以访问自己内存空间里的数据。
  • 如果主进程的查询结果是在 2s 中后才出现的,则 join 生效了。但是查询结果为什么是这样的呢?
    • 首先,ticketNum = 10 这个变量是存在于主进程中的,然后再 func 函数中 ticketNum 则是将全局变量 ticketNum 的值拷贝到了子进程中的 ticketNum 变量中,因此在 func 中的减法操作只能作用在子进程的变量中。最终,最后一行主进程打印的 ticketNum 则是原来主进程未发生变量的值。
  • 如何解决?(自己可以尝试文件共享)
    • 进程通信机制,管道,信号量等 (没必要掌握,日后用不到)
  • 继续思考:一个子进程函数的返回值如何被主进程获取?
  • 总结:进程之间的数据是隔离的,也就是数据不共享
  • 那怎么样开启多个进程呢?
    • 需求是:所有的子进程异步执行,然后所有的子进程全部执行完之后,在结束主进程,怎么搞?
      • 重点理解下两组代码的不同之处:(两种不同位置调用 join 的区别)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)

if __name__ == "__main__":
start = time.time()
urls = ['www.1.com','www.2.com','www.3.com']
for url in urls:
#创建了三个进程,表示三组任务
p = Process(target=get_request,args=(url,))
p.start()
#主进程每创建一个子进程就要执行一次join操作,就要等待创建好的子进程执行结束后才可以继续创建下一个子进程。
p.join() #整个异步效果消失了
print('总耗时:',time.time()-start)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)

if __name__ == "__main__":
start = time.time()
urls = ['www.1.com','www.2.com','www.3.com']
ps = [] #存储创建好的多个子进程
for url in urls: #三个子进程创建且启动了,将三个子进程存储到了ps列表中
#创建了三个进程,表示三组任务
p = Process(target=get_request,args=(url,))
p.start()
ps.append(p)

for p in ps: #主进程等待三个子进程结束后再结束
p.join()

print('总耗时:',time.time()-start)

进程创建的第二种方式(继承)

  • 自定义类继承 Process
  • 重写父类的 run 方法
  • 在类外部调用 start 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from multiprocessing import Process
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
#重写父类的run方法
def run(self): #该方法就是用来表示当前进程对象执行的任务
print('当前%s子进程正在执行',self.name)
def func(self):
print('我就是一个普通的实例方法!')

if __name__ == '__main__':
p = MyProcess('bobo')
p.start() #启动进程,相当于是在调用对象的run方法

# 守护进程

那么如果有一天我们的需求是我的主进程结束了,由我主进程创建的那些子进程必须跟着结束,怎么办?守护进程就来了!

主进程创建守护进程后:

其一:守护进程会在主进程代码执行结束后就终止

其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children

语法:

子进程对象.daemon = True

注意:主进程代码运行结束,守护进程随即终止

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
import time
from multiprocessing import Process
def get_request(url):
print('正在请求网址的数据:',url)
time.sleep(2)
print('请求结束:',url)

if __name__ == "__main__":
start = time.time()
p = Process(target=get_request,args=('www.1.com',))
# 将当前p这个子进程设置为了守护进程
p.daemon = True #该操作必须放置在子进程启动操作之前
p.start()

print('主进程执行结束')

# 进程同步 (锁)

通过刚刚的学习,我们千方百计实现了程序的异步,让多个任务可以同时在几个进程中并发处理,但是它们之间的运行没有顺序,一旦开启也不受我们控制。

尽管并发编程让我们能更加充分的利用计算机的资源,但是也给我们带来了新的问题:进程之间数据不共享,但是共享同一套文件系统,所以访问同一个文件是没有问题的,要是对同一文件进行读写操作呢?要知道共享带来的是竞争,竞争带来的结果就是错乱,如何控制,就是加锁处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import os
import time
import random
from multiprocessing import Process

def work(n):
print('%s: %s is running' %(n,os.getpid()))
time.sleep(random.random())
print('%s:%s is done' %(n,os.getpid()))

if __name__ == '__main__':
for i in range(5):
p=Process(target=work,args=(i,))
p.start()

通过结果可以看出两个问题:

	问题一:每个进程中work函数的第一个打印不一定会是按照我们for循环的0-4的顺序来打印

	问题二:每个work进程中有两个打印,但是所有进程中第一个打印的顺序和第二个打印顺序完全不同且无规律,说明我们一个进程中的程序的执行顺序都混乱了。

问题的解决方法,第二个问题加锁来解决,第一个问题是没有办法解决的,因为进程开到了内核,有操作系统来决定进程的调度,我们自己控制不了

加锁流程:

1.导包:from multiprocessing import Lock

2.加锁:lock.acquire()

3.解锁:lock.release()

接下来,我们以模拟抢票为例,来看看数据安全的重要性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
#在当前目录下创建一个文件(db)
#文件db的内容:{"count":1}表示的是余票数量
from multiprocessing import Process
import time,json,random
def search():#查询db文件中的余票数量
fp = open('./db.txt','r')
dic = json.load(fp) #反序列化,将文件中的json数据转成python字典对象
print('剩余车票数量为:',dic['count'])

def get(): #负责抢票,一次只能购买一张票
fp = open('./db.txt', 'r')
dic = json.load(fp)
time.sleep(0.1)
if dic['count'] > 0:#还有剩余车票
time.sleep(0.2)
dic['count'] -= 1 #一次只能购买一张票
time.sleep(0.1)
#购买车票后,余票数量发生了变化,将最新的余票数量在写回到db文件中进行存储
json.dump(dic,open('./db.txt','w'))
print('购票成功!')
def task():
search() #先查询
get() #后购买

if __name__ == '__main__':
for i in range(3):#创建三个子进程
p = Process(target=task)
p.start()

加锁后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
from multiprocessing import Process
import time,json,random
from multiprocessing import Lock #进程锁
def search():#查询db文件中的余票数量
fp = open('./db.txt','r')
dic = json.load(fp) #反序列化,将文件中的json数据转成python字典对象
print('剩余车票数量为:',dic['count'])

def get(): #负责抢票,一次只能购买一张票
fp = open('./db.txt', 'r')
dic = json.load(fp)
time.sleep(0.1)
if dic['count'] > 0:#还有剩余车票
time.sleep(0.2)
dic['count'] -= 1 #一次只能购买一张票
time.sleep(0.1)
#购买车票后,余票数量发生了变化,将最新的余票数量在写回到db文件中进行存储
json.dump(dic,open('./db.txt','w'))
print('购票成功!')
def task(lock):
lock.acquire() #上锁
search() #先查询
get() #后购买
lock.release() #解锁

if __name__ == '__main__':
lock = Lock() #创建了一把进程锁
for i in range(3):#创建三个子进程
p = Process(target=task,args=(lock,))
p.start()

# 线程

# 基本概念

** 线程:** 线程是操作系统能够进行运算调度的最小单位(车间里的工人),它被包含在进程之中,线程是进程中的实际运作单位。一个进程中可以并发多个线程,每条线程并行执行不同的任务。

注意:

1.同一个进程内的多个线程是共享该进程的资源的,不同进程内的线程资源肯定是隔离的

2.创建线程的开销比创建进程的开销要小的多

3.每一个进程中至少会包含有一个线程,该线程叫做"主线程"

思考:多线程可以实现并行吗?

  1. 在 CPU 比较繁忙,资源不足的时候(开启了很多进程),操作系统只为一个含有多线程的进程分配仅有的 CPU 资源,这些线程就会为自己尽量多抢时间片,这就是通过多线程实现并发,线程之间会竞争 CPU 资源争取执行机会。
  2. 在 CPU 资源比较充足的时候,一个进程内的多线程,可以被分配到不同的 CPU 资源,这就是通过多线程实现并行。
  3. 至于多线程实现的是并发还是并行?上面所说,所写多线程可能被分配到一个 CPU 内核中执行,也可能被分配到不同 CPU 执行,分配过程是操作系统所为,不可人为控制。所有,如果有人问我我所写的多线程是并发还是并行的?我会说,都有可能。
  4. 不管并发还是并行,都提高了程序对 CPU 资源的利用率,最大限度地利用 CPU 资源。

# Python 实现线程

# python 线程模块的选择

Python 提供了几个用于多线程编程的模块,包括 thread、threading 和 Queue 等。thread 和 threading 模块允许程序员创建和管理线程。thread 模块提供了基本的线程和锁的支持,threading 提供了更高级别、功能更强的线程管理的功能。Queue 模块允许用户创建一个可以用于多个线程之间共享数据的队列数据结构。

由于更高级别的 threading 模块更为先进,对线程的支持更为完善,因此推荐大家使用该模块!

# Threading 模块
  • 我们先简单应用一下 threading 模块来看看并发效果:
1
2
3
4
5
6
7
8
from threading import Thread
def func(num):
print('num的值是:',num)

if __name__ == '__main__':
#创建好了一个子线程(在主线程中创建)
t = Thread(target=func,args=(1,))
t.start()
1
2
3
4
5
6
7
8
9
10
11
from threading import Thread
import time
def func(num):
time.sleep(1)
print('num的值是:',num)

if __name__ == '__main__':
for i in range(3):
#创建好了一个子线程(在主线程中创建)
t = Thread(target=func,args=(1,))
t.start()
  • 线程的两种创建方式
1
2
3
4
5
6
7
8
9
10
11
12
13
14
from threading import Thread
import time
class MyThread(Thread):
def __init__(self):
super().__init__()

def run(self):
print('当前子线程正在执行')
time.sleep(1)
print('当前子线程执行结束')

for i in range(3):
t = MyThread() #创建线程对象
t.start() #启动线程对象
  • join () 方法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread
import time
class MyThread(Thread):
def __init__(self):
super().__init__()

def run(self):
print('当前子线程正在执行')
time.sleep(2)
print('当前子线程执行结束')

if __name__ == '__main__':
start = time.time()

ts = []
for i in range(3):
t = MyThread() #创建线程对象
t.start() #启动线程对象
ts.append(t)
for t in ts:
t.join()
print('总耗时:',time.time()-start)
  • 线程内存数据共享:
1
2
3
4
5
6
7
8
9
10
from threading import Thread
import time
def work():
global n
n = 0 #将全局变量修改为了0
if __name__ == '__main__':
n = 1 #全局变量
t = Thread(target=work)
t.start()
print(n) #在进程中输出全局变量的值就是线程修改后的结果为0
  • 守护线程
    • 无论是进程还是线程,都遵循:守护 xx 会在主 xx 运行完毕后被销毁,不管守护 xx 时候被执行结束。
1
2
3
4
5
6
7
8
9
10
from threading import Thread
import time
def work():
time.sleep(1)
print('子线程正在执行!')
if __name__ == '__main__':
t = Thread(target=work)
t.daemon = True #当前的子线程设置为了守护线程
t.start()
print('主线程结束!')
  • 多线程使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from threading import Thread
import time
class MyThread(Thread):
def __init__(self):
super().__init__()

def run(self):
print('当前子线程正在执行')
time.sleep(2)
print('当前子线程执行结束')

if __name__ == '__main__':
start = time.time()

ts = []
for i in range(3):
t = MyThread() #创建线程对象
t.start() #启动线程对象
ts.append(t)
for t in ts:
t.join()
print('总耗时:',time.time()-start)

# 线程的 GIL 锁(大致了解)

首先,一些语言(java、c++、c)是支持同一个进程中的多个线程是可以应用多核 CPU 的,也就是我们会听到的现在 4 核 8 核这种多核 CPU 技术的厉害之处。

那么我们之前说过应用多进程的时候如果有共享数据是不是会出现数据不安全的问题啊,就是多个进程同时一个文件中去抢这个数据,大家都把这个数据改了,但是还没来得及去更新到原来的文件中,就被其他进程也计算了,导致数据不安全的问题啊,所以我们是不是通过加锁可以解决啊,多线程大家想一下是不是一样的,并发执行也会有这个数据安全的问题。如何解决呢?

但是 python 最早期的时候对于多线程也加锁,但是 python 比较极端的加了一个 GIL 全局解释锁,锁的是整个线程,而不是线程里面的某些数据操作,也就是说每次只能有一个线程使用 cpu,也就说多线程用不了多核实现并行。

但是这个并不是 python 语言的问题,是 CPython 解释器的特性,在 Cpython 里面就是没办法用多核,这是 python 的弊病,历史问题,虽然众多 python 团队的大神在致力于改变这个情况,但是暂没有解决。

# GIL 介绍

在同一个进程中只有一个线程可以获取 cpu 的使用权限,那么其他的线程就必须等待该线程的 cpu 使用权消失后才能使用 cpu, 即使多个线程直接不会相互影响,在同一个进程下也只有一个线程使用 cpu,这样的机制称为全局解释器锁(GIL)。每一个 Python 线程,在 CPython 解释器中执行时,都会先锁住自己的线程,阻止别的线程执行。

GIL 的优点:

1、避免了大量的加锁解锁的繁琐操作

2、使数据更加安全,解决多线程间的数据完整性和状态同步

缺点:

		多核处理器的效果退化成单核处理器,只能并发不能并行

# GIL 与多线程

有了 GIL 的存在,虽然可以保证数据的安全,但是同一时刻同一进程中只有一个线程被执行

听到这里,有的同学立马质问:进程可以利用多核,但是开销大,而 python 的多线程开销小,但却无法利用多核优势,也就是说 python 没用了,php 才是最好的语言?

别着急啊,我们还没讲完:要解决这个问题,我们需要在几个点上达成一致:

1
2
3
4
5
1. cpu到底是用来做计算的,还是用来做I/O的?

2.cpu,意味着可以有多个核并行完成计算,所以多核提升的是计算性能

3. 每个cpu一旦遇到I/O阻塞,仍然需要等待,所以多核对I/O操作没什么用处

就好比是:

	一个工人相当于cpu,那么计算相当于工人在干活,I/O阻塞相当于为工人干活提供所需原材料的过程,工人干活的过程中如果没有原材料了,则工人干活的过程需要停止,直到等待原材料的到来。

如果你的工厂干的大多数任务都要有准备原材料的过程(I/O 密集型),那么你有再多的工人,意义也不大。

反过来讲,如果你的工厂原材料都齐全,那当然是工人越多,效率越高。

结论:

对计算来说,cpu 越多越好,但是对于 I/O 来说,再多的 cpu 也没用

当然对运行一个程序来说,随着 cpu 的增多执行效率肯定会有所提高(不管提高幅度多大,总会有所提高),这是因为一个程序基本上不会是纯计算或者纯 I/O,所以我们只能相对的去看一个程序到底是计算密集型还是 I/O 密集型,从而进一步分析 python 的多线程到底有无用武之地。

计算密集型案例:多进程效率高

os.cpu_count ():查看计算机的核数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
from multiprocessing import Process
from threading import Thread
import time,os
def word(): #计算密集型任务
res = 1
for i in range(1000000):
res *= i

if __name__ == '__main__':
print('当前计算机的核数为:',os.cpu_count())
start = time.time()
ps = []
for i in range(100):
#使用多进程来处理计算密集型任务(利用多核优势【并行】)
p = Process(target=word) #执行耗时1s
# p = Thread(target=word) #执行耗时2s

p.start()
ps.append(p)
for p in ps:
p.join()
print('总耗时:',time.time()-start)

密集 IO 型:多线程效率高

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
from multiprocessing import Process
from threading import Thread
import time,os
def word(): #IO密集型任务
time.sleep(2)
print('------------------')

if __name__ == '__main__':
print('当前计算机的核数为:',os.cpu_count())
start = time.time()
ps = []
for i in range(999):
# p = Process(target=word) #执行耗时11s
p = Thread(target=word) #执行耗时2s

p.start()
ps.append(p)
for p in ps:
p.join()
print('总耗时:',time.time()-start)

# 同步锁

  • 什么是同步锁?
    • 一个进程中的一个线程只能使用一个 cpu。要保证一个线程对应的某些操作在一段时间内被完整的直接完毕所加入的锁就是同步锁。
  • 为什么用同步锁?
    • 因为有可能当一个线程在使用 cpu 时,该线程下的程序可能会遇到 io 阻塞操作,那么 cpu 就会切到别的线程上去,这样就有可能会影响到该程序结果的完整性。
  • 使用:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
#未上锁
from threading import Thread,Lock
import time,random
def work():
global n
temp = n
time.sleep(random.random())
n = temp - 1

if __name__ == '__main__':
n = 10 #全局变量
ts = []
for i in range(10):
t = Thread(target=work)
t.start()
ts.append(t)
for t in ts:
t.join()

print('全局变量n的值为:',n)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
#上锁
from threading import Thread,Lock
import time,random
def work():
global n
lock.acquire() #上锁
temp = n
time.sleep(random.random())
n = temp - 1
lock.release() #解锁

if __name__ == '__main__':
n = 10 #全局变量
ts = []
lock = Lock()
for i in range(10):
t = Thread(target=work)
t.start()
ts.append(t)
for t in ts:
t.join()

print('全局变量n的值为:',n)

总结:保护不同的数据就应该加不同的锁。 GIL 与 Lock 是两把锁,保护的数据不一样,前者是解释器级别的(当然保护的就是解释器级别的数据,比如垃圾回收的数据),后者是保护用户自己开发的应用程序的数据,很明显 GIL 不负责这件事,只能用户自定义加锁处理,即 Lock

# 线程池

线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
from multiprocessing.dummy import Pool #导入了线程池模块
import time
urls = ['www.1.com','www.2.com','www.3.com','www.4.com','www.5.com']
def get_reqeust(url):
print('正在请求数据:',url)
time.sleep(2)
print('请求结束:',url)
start = time.time()
#创建一个线程池,开启了5个线程
pool = Pool(5)
#可以利用线程池中三个线程不断的去处理5个任务
pool.map(get_reqeust,urls)
#get_reqeust函数调用的次数取决urls列表元素的个数
#get_requests每次执行都会接收urls列表中的一个元素作为参数

print('总耗时:',time.time()-start)
pool.close() #释放线程池

python---并发编程
https://rofgd.github.io/2020/10/17/python---并发编程/
作者
ReadPond
发布于
2020年10月17日
许可协议