3.7. queue — 线程安全的 fifo 队列 | 数据结构 |《python 3 标准库实例教程》| python 技术论坛-380玩彩网官网入口
目的:提供线程安全的fifo实现
queue
模块提供了一个适用于多线程编程的先进先出(fifo)数据结构。它可以用来安全地在生产者和消费者线程之间传递消息或其他数据。锁是调用者来处理的,所有多个线程能够安全且容易的使用同样的queue
实例工作。queue
的大小(它包含的元素的数量)可能会受到限制,以调节内存的适用或处理。
注意
这个讨论假定你已经理解了队列的一般性质。如果您不了解,您可能需要阅读一些参考资料,然后再继续。
基本的 fifo 队列
queue
类实现了一个基本的先进先出的容器。使用 put()
将元素添加到序列的一端,并适用 get()
从另一端移除。
queue_fifo.py
import queue
q = queue.queue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
本示例使用单个线程来说明将元素按照插入顺序从队列中删除。
$ python3 queue_fifo.py
0 1 2 3 4
lifo queue (后进先出型队列)
与标准 fifo (先进先出型队列)队列 queue
相反, lifoqueue
使用的是后进先出的模式(与普通的栈结构类似)。
queue_lifo.py
import queue
q = queue.lifoqueue()
for i in range(5):
q.put(i)
while not q.empty():
print(q.get(), end=' ')
print()
最后一次被 put
进队列的数据会首先被 get
出来,看执行结果:
$ python3 queue_lifo.py
4 3 2 1 0
priority queue (优先队列)
有时,我们想要以队列中数据的某些属性为序进行处理。比如,工资部门想要打印出的工作名单可能与开发者所想的不一样。这时使用 priorityqueue
就比较好,它内部进行的排序方式是以其数据而定,而不是数据添加到队列中时的顺序。
queue_priority.py
import functools
import queue
import threading
@functools.total_ordering
class job:
def __init__(self, priority, description):
self.priority = priority
self.description = description
print('new job:', description)
return
def __eq__(self, other):
try:
return self.priority == other.priority
except attributeerror:
return notimplemented
def __lt__(self, other):
try:
return self.priority < other.priority
except attributeerror:
return notimplemented
q = queue.priorityqueue()
q.put(job(3, 'mid-level job'))
q.put(job(10, 'low-level job'))
q.put(job(1, 'important job'))
def process_job(q):
while true:
next_job = q.get()
print('processing job:', next_job.description)
q.task_done()
workers = [
threading.thread(target=process_job, args=(q,)),
threading.thread(target=process_job, args=(q,)),
]
for w in workers:
w.setdaemon(true)
w.start()
q.join()
例子中使用了多个线程来取出工作信息,当 get()
发生时,所取出的数据都是基于各个数据的优先级(例子中是 priority 属性)来返回的。消费者线程运行时,其处理队列中的数据的顺序取决于线程上下文切换。
$ python3 queue_priority.py
new job: mid-level job
new job: low-level job
new job: important job
processing job: important job
processing job: mid-level job
processing job: low-level job
基于线程的播客客户端
本播客客户端代码用于演示 queue
类如何同多个线程一起工作。本程序会读取一个或多个 rss 源,并从每个源里获取出5个最新的广播数据添加到下载队列中,之后使用线程以并行的方式同时进行多个下载,其结构的部署正好演示了 queue
模块的使用。
首先,创建一些后续所需要参数,通常情况下,这些参数应该来自于用户输入(比如用户输入自己喜欢的源或者从数据库中获取)。本例使用硬编码的值来进行演示。
fetch_podcasts.py
from queue import queue
import threading
import time
import urllib
from urllib.parse import urlparse
import feedparser
# 定义一些全局变量
num_fetch_threads = 2
enclosure_queue = queue()
# 真实场景不会使用硬编码数据。
feed_urls = [
'http://talkpython.fm/episodes/rss',
]
def message(s):
print('{}: {}'.format(threading.current_thread().name, s))
download_enclosures()
函数会在工作线程中运行,同时使用 urllib
来进行下载。
def download_enclosures(q):
"""
本函数是一个工作线程函数。
本函数会不断处理从队列中取出的数据。
这些守护线程会被放入一个无限循环中,只有当主线程结束时才会退出。
"""
while true:
message('looking for the next enclosure')
url = q.get()
filename = url.rpartition('/')[-1]
message('downloading {}'.format(filename))
response = urllib.request.urlopen(url)
data = response.read()
# 将所下载的文件保存在当前目录中
message('writing to {}'.format(filename))
with open(filename, 'wb') as outfile:
outfile.write(data)
q.task_done()
一旦定义了线程的目标函数( target 参数),就可以开始这个线程了。当 download_enclosure()
函数执行到 url = q.get()
时,就会阻塞并且等待队列里有什么数据返回。也就是说在队列中有任何数据前,启动线程也是安全的。
# 设置一些线程来获取广播数据
for i in range(num_fetch_threads):
worker = threading.thread(
target=download_enclosures,
args=(enclosure_queue,),
name='worker-{}'.format(i),
)
worker.setdaemon(true)
worker.start()
下一步我们使用 feedparser
模块来取得其中的内容,并把广播数据的 url 塞入队列。只要一有 url 被塞入队列,其中的一个工作线程就会获取出它并开始下载。这个循环会持续不断地添加数据进队列直到这个源被榨干,同时工作线程也会不断轮流从队列中获取出 url 并进行下载。
# 下载源的内容并将其中的 url 塞入队列。
for url in feed_urls:
response = feedparser.parse(url, agent='fetch_podcasts.py')
for entry in response['entries'][:5]:
for enclosure in entry.get('enclosures', []):
parsed_url = urlparse(enclosure['url'])
message('queuing {}'.format(
parsed_url.path.rpartition('/')[-1]))
enclosure_queue.put(enclosure['url'])
唯一不要忘记做的,就是等待队列再次被榨干,请使用 join()
。
# 等待队列被榨干,也就是我们已经下载了所有要下载的东西。
message('*** main thread waiting')
enclosure_queue.join()
message('*** done')
运行的话会出现类似的打印信息
$ python3 fetch_podcasts.py
worker-0: looking for the next enclosure
worker-1: looking for the next enclosure
mainthread: queuing turbogears-and-the-future-of-python-web-frameworks.mp3
mainthread: queuing continuum-scientific-python-and-the-business-of-open-source.mp3
mainthread: queuing openstack-cloud-computing-built-on-python.mp3
mainthread: queuing pypy.js-pypy-python-in-your-browser.mp3
mainthread: queuing machine-learning-with-python-and-scikit-learn.mp3
mainthread: *** main thread waiting
worker-0: downloading turbogears-and-the-future-of-python-web-frameworks.mp3
worker-1: downloading continuum-scientific-python-and-the-business-of-open-source.mp3
worker-0: looking for the next enclosure
worker-0: downloading openstack-cloud-computing-built-on-python.mp3
worker-1: looking for the next enclosure
worker-1: downloading pypy.js-pypy-python-in-your-browser.mp3
worker-0: looking for the next enclosure
worker-0: downloading machine-learning-with-python-and-scikit-learn.mp3
worker-1: looking for the next enclosure
worker-0: looking for the next enclosure
mainthread: *** done
实际输出取决于 rss 源的内容。
参考
- 中的
- -- 维基百科解读队列
- -- 维基百科解读先进,先出的数据结构
- -- 一个用于编译 rss 和 atom 源的模块,由 mark pilgrim 创建,kurt mckee 维护。
本译文仅用于学习和交流目的,转载请务必注明文章译者、出处、和本文链接
我们的翻译工作遵照 cc 协议,如果我们的工作有侵犯到您的权益,请及时联系380玩彩网官网入口。