
Threading pool similar to the multiprocessing Pool?

Is there a Pool class for worker threads, similar to the multiprocessing module's Pool class?

I like, for example, the easy way to parallelize a map function:

import multiprocessing

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

However, I would like to do it without the overhead of creating new processes.

I know about the GIL. However, in my use case, the function will be an IO-bound C function for which the Python wrapper will release the GIL before the actual function call.

Do I have to write my own threading pool?

Here's something that looks promising over in the Python Cookbook: Recipe 576519: Thread pool with same API as (multi)processing.Pool (Python)
Nowadays it's built-in: from multiprocessing.pool import ThreadPool.
Can you elaborate on "I know about the GIL. However, in my use case, the function will be an IO-bound C function for which the Python wrapper will release the GIL before the actual function call"?

Martin

I just found out that there actually is a thread-based Pool interface in the multiprocessing module; however, it is somewhat hidden and not properly documented.

It can be imported via

from multiprocessing.pool import ThreadPool

It is implemented using a dummy Process class wrapping a Python thread. This thread-based Process class can be found in multiprocessing.dummy, which is mentioned briefly in the docs. This dummy module supposedly provides the whole multiprocessing interface based on threads.
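
For instance, the question's map example carries over almost verbatim (a minimal sketch; time.sleep stands in here for the IO-bound, GIL-releasing C call):

from multiprocessing.pool import ThreadPool
import time

def long_running_func(p):
    time.sleep(0.1)  # stand-in for the IO-bound C call that releases the GIL
    return p

pool = ThreadPool(4)
xs = pool.map(long_running_func, range(100))
pool.close()
pool.join()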


That's awesome. I had a problem creating ThreadPools outside the main thread; you can use them from a child thread once created, though. I put in an issue for it: bugs.python.org/issue10015
I don't get why this class has no documentation. Such helper classes are so important nowadays.
@Wernight: it isn't public primarily because nobody has offered a patch that provides it (or something similar) as threading.ThreadPool, including documentation and tests. It would indeed be a good battery to include in the standard library, but it won't happen if nobody writes it. One nice advantage of this existing implementation in multiprocessing, is that it should make any such threading patch much easier to write (docs.python.org/devguide)
@daniel.gindi: multiprocessing.dummy.Pool/multiprocessing.pool.ThreadPool are the same thing, and are both thread pools. They mimic the interface of a process pool, but they are implemented entirely in terms of threading. Reread the docs, you got it backwards.
@daniel.gindi: Read further: "multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module." multiprocessing in general is about processes, but to allow switching between processes and threads, they (mostly) replicated the multiprocessing API in multiprocessing.dummy, but backed with threads, not processes. The goal is to allow you to do import multiprocessing.dummy as multiprocessing to change process-based code to thread-based.
asmeurer

In Python 3 you can use concurrent.futures.ThreadPoolExecutor, i.e.:

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

See the docs for more info and examples.
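
For example, with a context manager (my_function below is just a placeholder for your own callable):

from concurrent.futures import ThreadPoolExecutor

def my_function(x):
    return x * x  # placeholder work

with ThreadPoolExecutor(max_workers=10) as executor:
    future = executor.submit(my_function, 2)
    print(future.result())                            # 4
    print(list(executor.map(my_function, range(5))))  # [0, 1, 4, 9, 16]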


In order to use the backported futures module on Python 2, run sudo pip install futures.
It's the most efficient and fastest way for multiprocessing.
What is the difference between using ThreadPoolExecutor and multiprocessing.dummy.Pool?
concurrent.futures, as of Python 3.9 / the beginning of 3.10, is a very problematic library. It looks like it's overrun by bugs that aren't getting proper fixes. Perhaps the whole premise of this library was bad. I'm more familiar with the process-based part of this library, where there's no end to reasons why the pool would hang up forever, swallow errors and misbehave in other ways. I would stay away from this library as much as possible.
martineau

Yes, and it seems to have (more or less) the same API.

import multiprocessing
from multiprocessing.pool import ThreadPool

def worker(lnk):
    ...

def start_process():
    ...

if PROCESS:
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = ThreadPool(processes=POOL_SIZE, initializer=start_process)

pool.map(worker, inputs)

Import path for ThreadPool is different from Pool. Correct import is from multiprocessing.pool import ThreadPool.
Strangely this is not a documented API, and multiprocessing.pool is only briefly mentioned as providing AsyncResult. But it is available in 2.x and 3.x.
This is what I was looking for. It's just a single import line and a small change to my existing pool line and it works perfectly.
jfunez

For something very simple and lightweight (slightly modified from here):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

To support callbacks on task completion, you can just add the callback to the task tuple.
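
For instance, a wrapper task can carry the callback so the Worker class above stays unchanged (a sketch; CallbackThreadPool and the callback keyword are illustrative names, not part of the original recipe):

class CallbackThreadPool(ThreadPool):
    """Same pool, but an optional callback receives each task's result."""
    def add_task(self, func, *args, **kargs):
        callback = kargs.pop('callback', None)
        def task():
            result = func(*args, **kargs)
            if callback is not None:
                callback(result)
        self.tasks.put((task, (), {}))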


How can the threads ever join if they unconditionally loop forever?
@JosephGarvin I've tested it, and the threads keep blocking on an empty queue (since the call to Queue.get() is blocking) till the program ends, after which they are terminated automatically.
@JosephGarvin, good question. Queue.join() will actually join the task queue, not worker threads. So, when queue is empty, wait_completion returns, program ends, and threads are reaped by the OS.
If all of this code is wrapped up into a neat function it doesn't seem to be stopping threads even when the queue is empty and pool.wait_completion() returns. The result is that threads just keep building.
g00glen00b

To use a thread pool in Python, you can use this import:

from multiprocessing.dummy import Pool as ThreadPool

and then use it like this:

pool = ThreadPool(threads)
results = pool.map(service, tasks)
pool.close()
pool.join()
return results

Here, threads is the number of threads that you want, and tasks is the list of tasks that map to the service function.
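
For example, a self-contained version of the same pattern (the squaring service below is just a placeholder for real work):

from multiprocessing.dummy import Pool as ThreadPool

def service(task):
    return task * task  # placeholder work

tasks = list(range(10))

pool = ThreadPool(4)
results = pool.map(service, tasks)
pool.close()
pool.join()
print(results)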


Thanks, that is a great suggestion! From the docs: multiprocessing.dummy replicates the API of multiprocessing but is no more than a wrapper around the threading module. One correction: I think you want to say that the pool API is (function, iterable).
We missed the .close() and .join() calls and that causes .map() to finish before all the threads are finished. Just a warning.
forumulator

Here's the result I finally ended up using. It's a modified version of the classes by dgorissen above.

File: threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    """ Thread executing tasks from a given tasks queue. The thread can be
        signalled to exit.
    """
    _TIMEOUT = 2
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

To use the pool:

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()

Annotation for other readers: This code is Python 3 (shebang #!/usr/bin/python3)
Why do you use for i, d in enumerate(delays): and then ignore the i value?
@martineau - probably just a relic from development where they wanted to print i during a run.
Why is create_task there? What is it for?
I can't believe an answer with 4 votes on SO is the way to do thread pooling in Python. The ThreadPool in the official Python distribution is still broken? What am I missing?
Kashif

Yes, there is a threading pool similar to the multiprocessing Pool; however, it is somewhat hidden and not properly documented. You can import it in the following way:

from multiprocessing.pool import ThreadPool

Here is a simple example:

def test_multithread_stringio_read_csv(self):
    # see gh-11786
    # note: BytesIO comes from io; self.read_csv and tm are provided by the
    # (pandas-style) test harness this snippet is taken from
    max_row_range = 10000
    num_files = 100

    bytes_to_df = [
        '\n'.join(
            ['%d,%d,%d' % (i, i, i) for i in range(max_row_range)]
        ).encode() for j in range(num_files)]
    files = [BytesIO(b) for b in bytes_to_df]

    # read all files in many threads
    pool = ThreadPool(8)
    results = pool.map(self.read_csv, files)
    first_result = results[0]

    for result in results:
        tm.assert_frame_equal(first_result, result)

imo this should be the accepted answer
pelos

Another way is to add the work to the thread pool's queue:

import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:  # cpus: desired number of threads
    for i in range(10):
        a = executor.submit(my_function, arg1, arg2)  # my_function is a placeholder; submit takes the callable first, then its arguments

unbeli

The overhead of creating the new processes is minimal, especially when it's just 4 of them. I doubt this is a performance hot spot of your application. Keep it simple, optimize where you have to and where profiling results point to.
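
If in doubt, it's easy to measure; this sketch simply times creating and tearing down a four-worker process pool on your machine:

import multiprocessing
import time

if __name__ == '__main__':
    start = time.perf_counter()
    with multiprocessing.Pool(4) as pool:
        pass  # workers are spawned here and terminated on exit
    print('Pool(4) startup/teardown: %.3f s' % (time.perf_counter() - start))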


If the questioner is under Windows (which I do not believe he specified), then I think that process spinup can be a significant expense. At least it is on the projects that I have been recently doing. :-)
crizCraig

There is no built-in thread-based pool. However, it can be very quick to implement a producer/consumer queue with the Queue class.

From: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
    t = Thread(target=worker)
    t.daemon = True
    t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done

This is no longer the case with the concurrent.futures module.
I don't think this is true at all anymore. from multiprocessing.pool import ThreadPool
jagadeesh pulamarasetti

If you don't mind executing someone else's code, here's mine:

Note: There is a lot of extra code you may want to remove [added for better clarification and to demonstrate how it works]

Note: Python naming conventions were used for method names and variable names instead of camelCase.

Working procedure:

The MultiThread class starts the requested number of SingleThread instances, sharing a lock, a work queue, an exit flag and a results list with them. Work is added through MultiThread, which takes care of the locking; each SingleThread then processes the work queue, acquiring the lock around queue access. Once your work is done, you can destroy all threads via the shared boolean exit flag. The work itself can be anything: it can even import a module automatically (uncomment the import line) and call into it with the given arguments. Results are appended to the shared results list and can be retrieved with get_results.

Code:

import threading
import queue


class SingleThread(threading.Thread):
    def __init__(self, name, work_queue, lock, exit_flag, results):
        threading.Thread.__init__(self)
        self.name = name
        self.work_queue = work_queue
        self.lock = lock
        self.exit_flag = exit_flag
        self.results = results

    def run(self):
        # print("Coming %s with parameters %s", self.name, self.exit_flag)
        while not self.exit_flag:
            # print(self.exit_flag)
            self.lock.acquire()
            if not self.work_queue.empty():
                work = self.work_queue.get()
                module, operation, args, kwargs = work.module, work.operation, work.args, work.kwargs
                self.lock.release()
                print("Processing : " + operation + " with parameters " + str(args) + " and " + str(kwargs) + " by " + self.name + "\n")
                # module = __import__(module_name)
                result = str(getattr(module, operation)(*args, **kwargs))
                print("Result : " + result + " for operation " + operation + " and input " + str(args) + " " + str(kwargs))
                self.results.append(result)
            else:
                self.lock.release()
        # process_work_queue(self.work_queue)

class MultiThread:
    def __init__(self, no_of_threads):
        self.exit_flag = bool_instance()
        self.queue_lock = threading.Lock()
        self.threads = []
        self.work_queue = queue.Queue()
        self.results = []
        for index in range(0, no_of_threads):
            thread = SingleThread("Thread" + str(index+1), self.work_queue, self.queue_lock, self.exit_flag, self.results)
            thread.start()
            self.threads.append(thread)

    def add_work(self, work):
        self.queue_lock.acquire()
        self.work_queue.put(work)
        self.queue_lock.release()

    def destroy(self):
        self.exit_flag.value = True
        for thread in self.threads:
            thread.join()

    def get_results(self):
        return self.results


class Work:
    def __init__(self, module, operation, args, kwargs={}):
        self.module = module
        self.operation = operation
        self.args = args
        self.kwargs = kwargs


class SimpleOperations:
    def sum(self, *args):
        return sum([int(arg) for arg in args])

    @staticmethod
    def mul(a, b, c=0):
        return int(a) * int(b) + int(c)


class bool_instance:
    def __init__(self, value=False):
        self.value = value

    def __setattr__(self, key, value):
        if key != "value":
            raise AttributeError("Only value can be set!")
        if not isinstance(value, bool):
            raise AttributeError("Only True/False can be set!")
        self.__dict__[key] = value
        # super.__setattr__(key, bool(value))

    def __bool__(self):
        return self.value

if __name__ == "__main__":
    multi_thread = MultiThread(5)
    multi_thread.add_work(Work(SimpleOperations(), "mul", [2, 3], {"c":4}))
    while True:
        data_input = input()
        if data_input == "":
            pass
        elif data_input == "break":
            break
        else:
            work = data_input.split()
            multi_thread.add_work(Work(SimpleOperations(), work[0], work[1:], {}))
    multi_thread.destroy()
    print(multi_thread.get_results())