多线程是个坑爹的课题,有多线程就有锁。Python中有低级线程模块_thread、再封装一下就是threading、再来一下就是from concurrent.futures import ThreadPoolExecutor。其中涉及到的锁大概有Lock、RLock、Condition、Semaphore、BoundedSeamphore、Event。这么多看起来怪吓人的,其实这么多的锁全部只是由Lock演化出来的
 
Lock Lock = _thread._allocate_lock 这个是直接由最低级别的_thread直接引用过来的。创建锁的时候是处于未被锁定的状态。这应该是最容易被理解和使用的一种锁了。刚学习的时候铁定写过类似下面这种示例。本质原因是python中的原子操作是针对单条指令。而a+=1被翻译成了多条指令。执行过程中任何时刻可能被打断
1 2 3 4 5 6 7 8 9 10 11 a = 0  def  change ():    global  a     for  i in  range (100000 ):         a += 1  t = [threading.Thread(target=change) for  i in  range (100 )] [i.start() for  i in  t] [i.join() for  i in  t] print(a) 
 
解决方法就是所有线程共享同一个锁,虽然锁有很多种,但是几乎都是这样暴露给用户使用的  ,这是大家都知道的。一个锁都好理解。如果有两个锁就不是那么好理解了,比如这个有点装逼的例子。交叉打印Hello和World
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 lock_a = Lock() lock_b = Lock() def  hello ():    for  i in  range (5 ):         lock_a.acquire()         time.sleep(0.1 )         print("Hello" )         lock_b.release() def  world ():    for  i in  range (5 ):         lock_b.acquire()         time.sleep(0.1 )         print("World" )         lock_a.release() Thread(target=hello).start() Thread(target=world).start() 
 
RLock 可重入锁允许线程获得锁之后该线程可以无数次的再次获得锁。看起来没什么用,大概也确实没啥用吧,不过考虑一下下面这种场景
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 class  Change :    def  __init__ (self ):         self.a = 1          self.b = 1          self.lock = RLock()     def  adda (self ):         with  self.lock:             self.a += 1      def  addb (self ):         with  self.lock:             self.b += 1      def  addab (self ):         with  self.lock:             self.adda()             self.addb() 
 
你有一个类。其中adda改变a,addb改变b.还提供方法addab同时调用前两者。如果没有可重入锁那么你将不得不把两个函数粘贴到addab下面。它的实现原理是获取锁的时候根据get_ident得到当前线程标识。如果和当前锁的标识一样则仅仅是给值加1,否则就尝试去获取锁。简要代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 from  threading import  Lock, get_identclass  RLock :    def  __init__ (self ):         self.lock = Lock()         self._owner = None          self._count = 0      def  acquire (self ):         me = get_ident()         if  self._owner == me:             self._count += 1              return          rc = self.lock.acquire()         if  rc:             self._owner = me             self._count = 1      __enter__ = acquire     def  release (self ):         if  self._owner != get_ident():             raise  RuntimeError()         self._count -= 1          if  self._count == 0 :             self._owner = None              self.lock.release()     def  __exit__ (self, exc_type, exc_val, exc_tb ):         self.release() 
 
Condition 被称为条件变量,这个应该是很少被使用的。它允许传入一个锁,当满足条件的时候触发通知(可以看一下queue模块的实现,当get不到数据的时候就执行wait等待,新加入数据的时候执行notify通知唤醒),这应该是它被称为条件变量的原因(感觉都有点牵强😕),和Lock以及RLock不同。这个东东主要是用来通知其他线程的。大概有两个方法wait(阻塞直到被通知),notify(唤醒wait)。 实现的思路是所有的线程共享同一个锁A,同时有一个共用的锁列表。每当调用wait的时候生成一个新的锁放入锁列表,然后获得该锁,再释放锁A(因为wait必须在获得锁A才能调用,如果不释放则其他线程无法获得锁A)。最绝的来了,再次获得新生成的锁造成死锁  。由此该线程被挂起。直到其他获得锁A的线程执行notify操作。将wait的锁释放。 实现代码如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 from  collections import  dequeclass  Condition :    def  __init__ (self ):         self._lock = RLock()         self._waiters = deque()     def  __enter__ (self ):         return  self._lock.__enter__()     def  __exit__ (self, *args ):         return  self._lock.__exit__(*args)     def  wait (self ):         assert  get_ident() == self._lock._owner         waiter = Lock()         self._waiters.append(waiter)         waiter.acquire()         self._lock.release()         waiter.acquire()         self._lock.acquire()         del  waiter     def  notify (self ):         assert  get_ident() == self._lock._owner         for  waiter in  self._waiters.copy():             waiter.release()             self._waiters.remove(waiter) 
 
这个东东吧其实不太适合被直接使用。但是网上还是有人硬生生凑了一个消费者生产者的例子出来做演示
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 import  threadingimport  randomimport  timeclass  Producer (threading.Thread ):    def  __init__ (self, integers, condition ):         super (Producer, self).__init__()         self.integers = integers         self.condition = condition     def  run (self ):         for  i in  range (10 ):             integer = random.randint(0 , 256 )             with  self.condition:                 self.integers.append(integer)                 self.condition.notify()                 time.sleep(0.5 ) class  Consumer (threading.Thread ):    def  __init__ (self, integers, condition ):         super (Consumer, self).__init__()         self.integers = integers         self.condition = condition     def  run (self ):         with  self.condition:             while  True :                 while  self.integers:                     integer = self.integers.pop()                     print(integer)                 self.condition.wait() integers = [] condition = Condition() Producer(integers, condition).start() Consumer(integers, condition).start() 
 
这个例子有个地方需要注意。notify和wait并不是一一对应的。不代表执行了notify,wait部分一定会被执行。所以如果消费者部分这样写会造成部分没有被处理
1 2 3 4 5 6 7 8 9 10 11 12 13 class  Consumer (threading.Thread ):    def  __init__ (self, integers, condition ):         super (Consumer, self).__init__()         self.integers = integers         self.condition = condition     def  run (self ):         with  self.condition:             while  True :                 self.condition.wait()                 integer = self.integers.pop()                 print(integer) 
 
Semaphore & BoundedSeamphore 上面先介绍Condition是有原因的,因为多线程信号量和事件都是基于它生成的。信号量常用于限制对有限资源的访问。比如你有1000个线程,如果每一个线程都去创建数据库连接,那么数据库可能会崩。或者爬虫创建对网站的连接,太过凶残并不好,这个时候用信号量来处理就不错。
这里如果直接使用Lock来构建。那么一个锁同时只允许一个线程来获得显然是不达不到要求的。祭出刚才构建的Condition
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 class  Semaphore :         def  __init__ (self, value=1  ):         self._lock = Condition()         self.value = value     def  __enter__ (self ):         with  self._lock:             if  self.value == 0 :                 self._lock.wait()             else :                 self.value -= 1      def  __exit__ (self, exc_type, exc_val, exc_tb ):         self.value += 1          self._lock.notify() class  BoundedSemaphore (Semaphore ):    def  __init__ (self, value=1  ):         super (BoundedSemaphore, self).__init__(value=value) 
 
Event Event和Condition差不多。区别就是上面提到的,使用Condition,如果在单线程,那么当notify的时候是无法触发wait的,再调用wait线程会被阻塞。但是Event就支持在单线程使用。  提供两个主要方法,set和wait。但是它并不需要先获得锁。使用好像更方便一点,实现如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 class  Event :    def  __init__ (self ):         self._cond = Condition()         self._flag = False      def  set (self ):         with  self._cond:             self._flag = True              self._cond.notify()     def  clear (self ):         with  self._cond:             self._flag = False      def  wait (self ):         with  self._cond:             signaled = self._flag             if  not  signaled:                 signaled = self._cond.wait()             return  signaled 
 
如果把刚才那个生产者消费者换到它上面来,大概是这个样子了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 import  threadingimport  randomimport  timeclass  Producer (threading.Thread ):    def  __init__ (self, integers, condition ):         super (Producer, self).__init__()         self.integers = integers         self.condition = condition     def  run (self ):         for  i in  range (10 ):             integer = random.randint(0 , 256 )             self.integers.append(integer)             self.condition.set ()             time.sleep(0.5 ) class  Consumer (threading.Thread ):    def  __init__ (self, integers, condition ):         super (Consumer, self).__init__()         self.integers = integers         self.condition = condition     def  run (self ):         while  True :             while  self.integers:                 integer = self.integers.pop()                 print(integer)             self.condition.wait() integers = [] condition = Event() Producer(integers, condition).start() Consumer(integers, condition).start() 
 
当然,没啥必要用这些比较低级的东东。能用通用高级数据结构就直接用,这样大家都比较好理解,总结就是只是给共享资源加锁用Lock、限制资源访问使用BoundedSeamphore,唤醒相关线程使用Event