Queue: 应用于生产者-消费者模式的Python队列

queue 2.jpg
图片来源于网络

版权声明

© 著作权归作者所有
允许自由转载,但请保持署名和原文链接。 不允许商业用途、盈利行为及衍生盈利行为。

什么是Queue?

Queue是Python标准库中的线程安全的队列(FIFO)实现,提供了一个适用于多线程编程的先进先出的数据结构,即队列,用来在生产者和消费者()线程之间的信息传递。

为什么使用Queue,而不是list或者dict?

  • list或者dict是非线程安全的,Queue是线程安全的
  • 也即意味着:如果使用list或者dict,我们必须把它放到lock程序块中(acquire和release),以防止发生竞态条件。
  • 使用list或者dict,需要考虑线程同步的问题,即需要额外考虑wait和notify。
  • 生成者不能向满队列添加数据,如果使用list或者dict,需要额外的代码实现。
  • Queue则封装了Condition行为,wait() notify() acquire() release() 满队列问题等等,无须额外考虑。

先来了解一些概念

生产者-消费者模式(Producer-Consumer)

Producer-Consumer模式是多线程编程中最常用的设计模式。生产者负责生产数据,并将数据存入队列,消费负责消费数据,不断从队列中取数据来使用。这里面有两个条件:

  1. 必须满足线程互斥条件:任何时候最多只允许一个线程访问数据,其他线程必须等待。这称为线程互斥。

  2. 必须满足线程同步条件:线程同步是指线程之间所具有的一种制约关系,一个线程的执行依赖另一个线程的消息,当它没有得到另一个线程的消息时应等待,直到消息到达时才被唤醒。举个例子:在线程方式下,生产者和消费者各自是一个线程。生产者把数据写入队列,消费者从队列读出数据。当队列为空,消费者就阻塞等待(稍事休息);当队列满(达到最大长度),生产者就阻塞等待。

线程阻塞

线程阻塞通常是指一个线程在执行过程中暂停,以等待某个条件的触发。比如一个线程原子操作下,其他的线程都是阻塞状态的;比如input语句等待用户的输入,线程也是阻塞状态;比如当队列任务为空的时候,线程等待新的任务,这时候线程也是阻塞的。

线程安全

比如一个 ArrayList 类,在添加一个元素的时候,它可能会有两步来完成:

  1. 在 Items[Size] 的位置存放此元素;
  2. 增大 Size 的值。

在单线程运行的情况下,如果 Size = 0,添加一个元素后,此元素在位置 0,而且 Size=1;而如果是在多线程情况下,比如有两个线程,线程 A 先将元素1存放在位置 0。但是此时 CPU调度线程A暂停,线程B得到运行的机会。线程B向此 ArrayList 添加元素2,因为此时 Size 仍然等于0(注意,我们假设的是添加一个元素是要两个步骤,而线程A仅仅完成了步骤1),所以线程B也将元素存放在位置0。然后线程A和线程B都继续运行,都增加Size的值,结果Size等于2。 那好,我们来看看 ArrayList的情况,期望的元素应该有2个,而实际元素是在0位置,造成丢失元素,而且Size 等于 2。这就是“线程不安全”了。

原子操作

原子(atom)本意是“不能被进一步分割的最小粒子”,而原子操作(atomic operation)意为"不可被中断的一个或一系列操作"。如变量的赋值,不可能一个线程在赋值,到一半切到另外一个线程工作去了,这是原子操作。但是一些数据结构的操作,比如上述ArrayList的例子,添加元素是分成两个步骤的,所以必须要加锁。加锁后的操作就可以认为是原子的了。

举个小栗子来加深理解:

原来在银行办理业务是排队的形式的,虽然也是多线程(多个窗口),但是经常出现有些柜台人多、有些柜台人少,或者有些柜台办理完了,有些柜台还排着长队,这时候就需要人工来干预,很麻烦,效率不高。

现在的银行都是叫号系统,这是一个典型的生产者-消费者模式。

银行提供四排座椅(队列),每人手上拿一个号,先来先办,后来后办(先进先出 First in First out);

由叫号机(生产者)来打印一个号,来一个顾客打印一个号,完了塞到队尾;当然可以设置一个队列最大数,比如100人,超过100人在队列里,叫号机就不打印,直到队列有空闲位置。

银行开多个窗口(多个消费者线程)从队列里叫号,办完一个,再叫一个。办理的业务是原子性的(存或者取都在一个人手上完成,中间流程不可分割)。也不会同时有多个柜台操作你的帐户,所以是线程安全的。

如果都办理完了,叫号机和柜台都陷入了等待状态,打一会儿瞌睡,线程阻塞,直到有新的顾客来办理业务。

三种形式的Queue:

FIFO队列

fifo.gif
图片来源于网络

class Queue.Queue(maxsize=0)
FIFO即First in First Out,先进先出。Queue提供了一个基本的FIFO容器,使用方法很简单,maxsize是个整数,指明了队列中能存放的数据个数的上限。一旦达到上限,插入会导致阻塞,直到队列中的数据被消费掉。如果maxsize小于或者等于0,队列大小没有限制。

LIFO队列

ExecutionStack.gif
图片来源于网络

class Queue.LifoQueue(maxsize=0)
LIFO即Last in First Out,后进先出。与栈的类似,使用也很简单,maxsize用法同上

Priority队列

class Queue.PriorityQueue(maxsize=0)
构造一个优先队列。优先级级别越低的越先出来,maxsize用法同上。

常用方法

创建一个队列

# maxsize 表示队列长度,小于1表示队列长度无限。
import Queue
q = Queue.Queue(maxsize=10)

将一个值放入队列

# put(item[,block[,timeout]])
# 参数item为必选参数
# block 为可选参数,默认为True
# 如果队列为空且block=True,put()使得调用线程阻塞,直到空出一个数据单元。
# 如果队列为空且block=False,put()将抛出Full异常。
# 将10插入队尾
q.put(10)

将一个值从队列中取出

# get([block[,timeout]])
# 参数block为可选参数,默认为True
# 如果队列为空且block=True,get()使得调用线程阻塞,直到有新数据产生。
# 如果队列为空且block=False,get()将抛出Empty异常。
# 从对列头部取出一个数据
q.get()

获取队列的大小

q.qsize()

判断队列是否为空

# 队列为空返回True,反之返回False
q.empty()

判断队列是否已满

# 队列已满返回True,反之返回False
q.full()

task_done()

意味着之前入队的一个任务已经完成。由队列的消费者线程调用。每一个get()调用得到一个任务,接下来的task_done()调用告诉队列该任务已经处理完毕。

如果当前一个join()正在阻塞,它将在队列中的所有任务都处理完时恢复执行(即每一个由put()调用入队的任务都有一个对应的task_done()调用)。

join()

阻塞调用线程,直到队列中的所有任务被处理掉。

只要有数据被加入队列,未完成的任务数就会增加。当消费者线程调用task_done()(意味着有消费者取得任务并完成任务),未完成的任务数就会减少。当未完成的任务数降到0,join()解除阻塞。

其他

q.put_nowait(item) == q.put(item,False)
q.get_nowait() == q.get(False)

一个最简单的生产者-消费者模式

# coding: utf-8
# filename: queue.py
# author: caord@showwant.com

import time
import Queue
import threading

class Producer(threading.Thread):
    def __init__(self, thread_name, queue):
        threading.Thread.__init__(self, name=thread_name)
        self.data = queue

    def run(self):
        for i in range(20):
            print('%s:%s is producing %d to the queue!' % (time.ctime(), self.getName(), i))
            self.data.put(i)
            time.sleep(1)
        print('%s: %s finished!' % (time.ctime(), self.getName()))
        time.sleep(10)
        for i in range(10):
            self.data.put(i)


class Consumer(threading.Thread):
    def __init__(self, thread_name, queue):
        threading.Thread.__init__(self, name=thread_name)
        self.data = queue

    def run(self):
        while 1:
            try:
                num = self.data.get()
                print('%s: %s is consuming. %d in the queue is consumed!' % (time.ctime(), self.getName(), num))
            except:
                print('%s: %s finished!' % (time.ctime(), self.getName()))
            #break


def main():
    queue = Queue.Queue(maxsize=20)
    producer = Producer('producer', queue)
    consumer = Consumer('consumer', queue)
    producer.start()
    consumer.start()
    producer.join()
    consumer.join()
    print('All thread finished')


if __name__ == '__main__':
    main()

标签: python, 队列, queue

已有 7 条评论

  1. 葫芦岛网站建设 葫芦岛网站建设

    这个好,正需要,帮我解决了问题,感谢!

  2. 压力机 压力机

    您好,您的网站做的很不错,很漂亮,我已经收藏了,方便我随时访问.

  3. 板机 板机

    感谢分享!!

  4. 加气加油 加气加油

    感谢分享

  5. 板机 板机

    感谢博主分享的文章

  6. 小卷 小卷

    谢谢博主的分享

  7. 头条 头条

    文章不错支持一下吧

添加新评论