如何在 Python 中使用线程?

我试图了解 Python 中的线程。我看过文档和示例,但是坦率地说,许多示例过于复杂,我难以理解它们。

您如何清楚地显示为多线程而划分的任务?

答案

自 2010 年提出这个问题以来,如何使用带有mappool 的 Python 进行简单的多线程处理已经有了真正的简化。

下面的代码来自一篇文章 / 博客文章,您绝对应该检出(没有从属关系)- 并行显示在一行中:更好的日常线程任务模型 。我将在下面进行总结 - 最终仅是几行代码:

from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)

这是以下内容的多线程版本:

results = []
for item in my_array:
    results.append(my_function(item))

描述

Map 是一个很棒的小功能,是轻松将并行性注入 Python 代码的关键。对于那些不熟悉的人,地图是从 Lisp 之类的功能语言中提炼出来的。它是将另一个功能映射到序列上的功能。

Map 为我们处理序列上的迭代,应用函数,并将所有结果存储在最后的方便列表中。

在此处输入图片说明


实作

map 函数的并行版本由以下两个库提供:multiprocessing,以及鲜为人知但同样出色的 step child:multiprocessing.dummy。

multiprocessing.dummy与多处理模块完全相同, 但是使用线程代替一个重要的区别 - 使用多个进程来执行 CPU 密集型任务; I / O 期间(和期间)使用线程 ):

multiprocessing.dummy 复制了多处理的 API,但仅不过是线程模块的包装器。

import urllib2
from multiprocessing.dummy import Pool as ThreadPool

urls = [
  'http://www.python.org',
  'http://www.python.org/about/',
  'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
  'http://www.python.org/doc/',
  'http://www.python.org/download/',
  'http://www.python.org/getit/',
  'http://www.python.org/community/',
  'https://wiki.python.org/moin/',
]

# Make the Pool of workers
pool = ThreadPool(4)

# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)

# Close the pool and wait for the work to finish
pool.close()
pool.join()

以及计时结果:

Single thread:   14.4 seconds
       4 Pool:   3.1 seconds
       8 Pool:   1.4 seconds
      13 Pool:   1.3 seconds

传递多个参数仅在 Python 3.3 和更高版本中才这样 ):

要传递多个数组:

results = pool.starmap(function, zip(list_a, list_b))

或传递一个常量和一个数组:

results = pool.starmap(function, zip(itertools.repeat(constant), list_a))

如果您使用的是 Python 的早期版本,则可以通过此变通方法( )传递多个参数。

(感谢user136036的有用评论。)

这是一个简单的示例:您需要尝试一些备用 URL 并返回第一个 URL 的内容以进行响应。

import Queue
import threading
import urllib2

# Called by each thread
def get_url(q, url):
    q.put(urllib2.urlopen(url).read())

theurls = ["http://google.com", "http://yahoo.com"]

q = Queue.Queue()

for u in theurls:
    t = threading.Thread(target=get_url, args = (q,u))
    t.daemon = True
    t.start()

s = q.get()
print s

在这种情况下,线程被用作简单的优化:每个子线程都在等待 URL 解析和响应,以便将其内容放入队列中。每个线程都是一个守护进程(如果主线程结束,则不会使进程继续运行 - 这比不常见);主线程启动所有子线程,在队列上进行get ,以等待它们之一完成put ,然后发出结果并终止(这将取消所有可能仍在运行的子线程,因为它们是守护程序线程)。

正确使用 Python 中的线程总是会与 I / O 操作相关联(因为 CPython 无论如何都不会使用多个内核来运行受 CPU 约束的任务,所以线程的唯一原因是在等待某些 I / O 时不会阻塞进程)。顺便说一句,队列几乎总是将工作分配到线程和 / 或收集工作结果的最佳方法,并且它们本质上是线程安全的,因此它们使您不必担心锁,条件,事件,信号量以及其他相互之间的关系。线程协调 / 通信概念。

注意 :对于 Python 中的实际并行化,您应该使用多处理模块来分叉并行执行的多个进程(由于全局解释器锁,Python 线程提供了交织,但实际上它们是串行执行的,而不是并行执行的,并且仅在交错 I / O 操作时很有用)。

但是,如果您只是在寻找交织(或者正在执行尽管可以使用全局解释器锁定,但是可以并行化的 I / O 操作),那么就可以从线程模块开始。作为一个非常简单的示例,让我们考虑通过并行求和子范围来求和一个大范围的问题:

import threading

class SummingThread(threading.Thread):
     def __init__(self,low,high):
         super(SummingThread, self).__init__()
         self.low=low
         self.high=high
         self.total=0

     def run(self):
         for i in range(self.low,self.high):
             self.total+=i


thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join()  # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result

请注意,以上示例是一个非常愚蠢的示例,因为它完全没有 I / O,并且由于全局解释器锁定,尽管在CPython 中是交错执行的(带有上下文切换的额外开销),但仍将串行执行。

像其他提到的一样,由于GIL ,CPython 只能将线程用于 I / O 等待。

如果您想从多个内核中受益于 CPU 绑定任务,请使用multiprocessing

from multiprocessing import Process

def f(name):
    print 'hello', name

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

仅需注意:线程不需要队列。

这是我能想到的最简单的示例,其中显示了 10 个进程同时运行。

import threading
from random import randint
from time import sleep


def print_number(number):

    # Sleeps a random 1 to 10 seconds
    rand_int_var = randint(1, 10)
    sleep(rand_int_var)
    print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"

thread_list = []

for i in range(1, 10):

    # Instantiates the thread
    # (i) does not make a sequence, so (i,)
    t = threading.Thread(target=print_number, args=(i,))
    # Sticks the thread in a list so that it remains accessible
    thread_list.append(t)

# Starts threads
for thread in thread_list:
    thread.start()

# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
    thread.join()

# Demonstrates that the main process waited for threads to complete
print "Done"

Alex Martelli 的回答对我有所帮助。但是,这是我认为更有用的修改版本(至少对我而言)。

更新:在 Python 2 和 Python 3 中均可使用

try:
    # For Python 3
    import queue
    from urllib.request import urlopen
except:
    # For Python 2 
    import Queue as queue
    from urllib2 import urlopen

import threading

worker_data = ['http://google.com', 'http://yahoo.com', 'http://bing.com']

# Load up a queue with your data. This will handle locking
q = queue.Queue()
for url in worker_data:
    q.put(url)

# Define a worker function
def worker(url_queue):
    queue_full = True
    while queue_full:
        try:
            # Get your data off the queue, and do some work
            url = url_queue.get(False)
            data = urlopen(url).read()
            print(len(data))

        except queue.Empty:
            queue_full = False

# Create as many threads as you want
thread_count = 5
for i in range(thread_count):
    t = threading.Thread(target=worker, args = (q,))
    t.start()

我发现这非常有用:创建与内核一样多的线程,并让它们执行(大量)任务(在这种情况下,调用 Shell 程序):

import Queue
import threading
import multiprocessing
import subprocess

q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
    q.put(i)

def worker():
    while True:
        item = q.get()
        # Execute a task: call a shell program and wait until it completes
        subprocess.call("echo " + str(item), shell=True)
        q.task_done()

cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
     t = threading.Thread(target=worker)
     t.daemon = True
     t.start()

q.join() # Block until all tasks are done

给定一个函数f ,使它像这样线程化:

import threading
threading.Thread(target=f).start()

将参数传递给f

threading.Thread(target=f, args=(a,b,c)).start()

Python 3 具有启动并行任务的功能 。这使我们的工作更加轻松。

它具有线程池进程池

以下提供了一个见解:

ThreadPoolExecutor 示例

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()

对我来说,线程的完美示例是监视异步事件。看这段代码。

# thread_test.py
import threading
import time

class Monitor(threading.Thread):
    def __init__(self, mon):
        threading.Thread.__init__(self)
        self.mon = mon

    def run(self):
        while True:
            if self.mon[0] == 2:
                print "Mon = 2"
                self.mon[0] = 3;

您可以通过打开IPython会话并执行以下操作来使用此代码:

>>> from thread_test import Monitor
>>> a = [0]
>>> mon = Monitor(a)
>>> mon.start()
>>> a[0] = 2
Mon = 2
>>>a[0] = 2
Mon = 2

等一下

>>> a[0] = 2
Mon = 2