ChatGPT解决这个技术问题 Extra ChatGPT

如何使用带有多个参数的多处理 pool.map

在 Python multiprocessing 库中,是否存在支持多个参数的 pool.map 变体?

import multiprocessing

text = "test"

def harvester(text, case):
    X = case[0]
    text + str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    pool.map(harvester(text, case), case, 1)
    pool.close()
    pool.join()
令我惊讶的是,我不能让 partiallambda 这样做。我认为这与将函数传递给子进程(通过 pickle)的奇怪方式有关。
@senderle:这是 Python 2.6 中的一个错误,但已在 2.7 中修复:bugs.python.org/issue5228
只需将 pool.map(harvester(text,case),case, 1) 替换为:pool.apply_async(harvester(text,case),case, 1)
@Syrtis_Major,请不要编辑有效扭曲先前给出的答案的 OP 问题。将 return 添加到 harvester() 会使 @senderie 的响应变得不准确。这对未来的读者没有帮助。
我想说简单的解决方案是将所有参数打包在一个元组中,然后在执行的函数中解包。当我需要将复杂的多个 args 发送到由进程池执行的 func 时,我这样做了。

j
jfs

是否有支持多个参数的 pool.map 变体?

Python 3.3 包括 pool.starmap() method

#!/usr/bin/env python3
from functools import partial
from itertools import repeat
from multiprocessing import Pool, freeze_support

def func(a, b):
    return a + b

def main():
    a_args = [1,2,3]
    second_arg = 1
    with Pool() as pool:
        L = pool.starmap(func, [(1, 1), (2, 1), (3, 1)])
        M = pool.starmap(func, zip(a_args, repeat(second_arg)))
        N = pool.map(partial(func, b=second_arg), a_args)
        assert L == M == N

if __name__=="__main__":
    freeze_support()
    main()

对于旧版本:

#!/usr/bin/env python2
import itertools
from multiprocessing import Pool, freeze_support

def func(a, b):
    print a, b

def func_star(a_b):
    """Convert `f([1,2])` to `f(1,2)` call."""
    return func(*a_b)

def main():
    pool = Pool()
    a_args = [1,2,3]
    second_arg = 1
    pool.map(func_star, itertools.izip(a_args, itertools.repeat(second_arg)))

if __name__=="__main__":
    freeze_support()
    main()

输出

1 1
2 1
3 1

请注意此处如何使用 itertools.izip()itertools.repeat()

由于 the bug mentioned by @unutbu 您不能在 Python 2.6 上使用 functools.partial() 或类似功能,因此应显式定义简单包装函数 func_star()。另见the workaround suggested by uptimebox


F.:您可以像这样解压缩 func_star 签名中的参数元组:def func_star((a, b))。当然,这只适用于固定数量的参数,但如果这是他唯一的情况,它更具可读性。
@Space_C0wb0y:f((a,b)) 语法在 py3k 中已被弃用和删除。而且这里没有必要。
也许更 Pythonic:func = lambda x: func(*x) 而不是定义包装函数
@zthomas.nc 这个问题是关于如何支持多处理 pool.map 的多个参数。如果想知道如何通过多处理在不同的 Python 进程中调用方法而不是函数,请提出一个单独的问题(如果一切都失败了,您始终可以创建一个全局函数来包装类似于上述 func_star() 的方法调用)
我希望有starstarmap
s
senderle

对此的答案取决于版本和情况。对于最新版本的 Python(自 3.3 起),最通用的答案首先由 J.F. Sebastian 在下面描述。1它使用 Pool.starmap 方法,该方法接受一系列参数元组。然后它会自动解包每个元组的参数并将它们传递给给定的函数:

import multiprocessing
from itertools import product

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with multiprocessing.Pool(processes=3) as pool:
        results = pool.starmap(merge_names, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

对于早期版本的 Python,您需要编写一个辅助函数来显式解包参数。如果您想使用 with,您还需要编写一个包装器来将 Pool 变成一个上下文管理器。 (感谢 muon 指出这一点。)

import multiprocessing
from itertools import product
from contextlib import contextmanager

def merge_names(a, b):
    return '{} & {}'.format(a, b)

def merge_names_unpack(args):
    return merge_names(*args)

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(merge_names_unpack, product(names, repeat=2))
    print(results)

# Output: ['Brown & Brown', 'Brown & Wilson', 'Brown & Bartlett', ...

在更简单的情况下,使用固定的第二个参数,您也可以使用 partial,但仅限于 Python 2.7+。

import multiprocessing
from functools import partial
from contextlib import contextmanager

@contextmanager
def poolcontext(*args, **kwargs):
    pool = multiprocessing.Pool(*args, **kwargs)
    yield pool
    pool.terminate()

def merge_names(a, b):
    return '{} & {}'.format(a, b)

if __name__ == '__main__':
    names = ['Brown', 'Wilson', 'Bartlett', 'Rivera', 'Molloy', 'Opie']
    with poolcontext(processes=3) as pool:
        results = pool.map(partial(merge_names, b='Sons'), names)
    print(results)

# Output: ['Brown & Sons', 'Wilson & Sons', 'Bartlett & Sons', ...

1. 这在很大程度上受到了他的回答的启发,而他的回答可能应该被接受。但由于这个卡在顶部,似乎最好为未来的读者改进它。


在我看来,在这种情况下 RAW_DATASET 应该是一个全局变量?虽然我希望 partial_harvester 在每次调用harvester() 时更改 case 的值。如何做到这一点?
这里最重要的是将 =RAW_DATASET 默认值分配给 case。否则 pool.map 会混淆多个参数。
我很困惑,您的示例中的 text 变量发生了什么?为什么 RAW_DATASET 似乎通过了两次。我想你可能有错字?
不知道为什么使用 with .. as .. 给了我 AttributeError: __exit__,但如果我只调用 pool = Pool(); 然后手动关闭 pool.close()(python2.7)就可以了
@muon,很好的收获。看起来 Pool 对象直到 Python 3.3 才成为上下文管理器。我添加了一个返回 Pool 上下文管理器的简单包装函数。
P
Peter Mortensen

我认为下面会更好:

def multi_run_wrapper(args):
   return add(*args)

def add(x,y):
    return x+y

if __name__ == "__main__":
    from multiprocessing import Pool
    pool = Pool(4)
    results = pool.map(multi_run_wrapper,[(1,2),(2,3),(3,4)])
    print results

输出

[3, 5, 7]

最简单的解决方案。有一个小的优化;删除包装函数并直接在 add 中解压缩 args,它适用于任意数量的参数:def add(args): (x,y) = args
您也可以使用 lambda 函数而不是定义 multi_run_wrapper(..)
嗯...事实上,使用 lambda 不起作用,因为 pool.map(..) 试图腌制给定的函数
如果要将 add 的结果存储在列表中,如何使用它?
请在获取 results = pool.map(...) 后添加 pool.close()pool.join(),否则这可能会永远运行
u
user136036

Python 3.3+pool.starmap(): 结合使用

from multiprocessing.dummy import Pool as ThreadPool 

def write(i, x):
    print(i, "---", x)

a = ["1","2","3"]
b = ["4","5","6"] 

pool = ThreadPool(2)
pool.starmap(write, zip(a,b)) 
pool.close() 
pool.join()

结果:

1 --- 4
2 --- 5
3 --- 6

如果您愿意,还可以 zip() 更多参数:zip(a,b,c,d,e)

如果您希望将常量值作为参数传递:

import itertools

zip(itertools.repeat(constant), a)

如果你的函数应该返回一些东西:

results = pool.starmap(write, zip(a,b))

这给出了一个带有返回值的列表。


这是一个与 2011 年 @JFSebastian 的答案几乎完全相同的答案(有 60 多票)。
不。首先,它删除了许多不必要的东西,并明确指出它适用于 python 3.3+,适用于寻求简单而干净的答案的初学者。作为一个初学者,我自己花了一些时间才弄清楚(是的,JFSebastians 的帖子),这就是为什么我写我的帖子来帮助其他初学者,因为他的帖子只是说“有星图”但没有解释它 - 这个这就是我的帖子的意图。所以绝对没有理由用两次反对票来抨击我。
l
lpd11

如何接受多个参数:

def f1(args):
    a, b, c = args[0] , args[1] , args[2]
    return a+b+c

if __name__ == "__main__":
    import multiprocessing
    pool = multiprocessing.Pool(4) 

    result1 = pool.map(f1, [ [1,2,3] ])
    print(result1)

整洁优雅。
我不明白为什么我必须一直滚动到这里才能找到最佳答案。
从字面上看,这个答案应该是最重要的。
尽管如此,还是需要做出解释。例如,想法/要点是什么?它使用哪些语言功能,为什么?请通过 editing (changing) your answer 回复,而不是在评论中(没有“编辑:”、“更新:”或类似内容 - 答案应该看起来好像是今天写的)。
P
Peter Mortensen

J.F. Sebastian's answer 中了解了 itertools 后,我决定更进一步,编写一个处理并行化的 parmap 包,在 Python 2.7 和 Python 3.2 中提供 mapstarmap 函数(以及后来的) 可以接受任意数量的个位置参数。

安装

pip install parmap

如何并行化:

import parmap
# If you want to do:
y = [myfunction(x, argument1, argument2) for x in mylist]
# In parallel:
y = parmap.map(myfunction, mylist, argument1, argument2)

# If you want to do:
z = [myfunction(x, y, argument1, argument2) for (x,y) in mylist]
# In parallel:
z = parmap.starmap(myfunction, mylist, argument1, argument2)

# If you want to do:
listx = [1, 2, 3, 4, 5, 6]
listy = [2, 3, 4, 5, 6, 7]
param = 3.14
param2 = 42
listz = []
for (x, y) in zip(listx, listy):
        listz.append(myfunction(x, y, param1, param2))
# In parallel:
listz = parmap.starmap(myfunction, zip(listx, listy), param1, param2)

我已将 parmap 上传到 PyPI 和 a GitHub repository

例如,这个问题可以回答如下:

import parmap

def harvester(case, text):
    X = case[0]
    text+ str(X)

if __name__ == "__main__":
    case = RAW_DATASET  # assuming this is an iterable
    parmap.map(harvester, case, "test", chunksize=1)

P
Peter Mortensen

multiprocessing 有一个名为 pathos 的分支(注意:使用 GitHub 上的版本),它不需要 starmap——地图函数反映了 Python 地图的 API,因此地图可以接受多个参数。

使用 pathos,您通常还可以在解释器中进行多处理,而不是卡在 __main__ 块中。 Pathos 即将发布,经过一些温和的更新——主要是转换为 Python 3.x。

  Python 2.7.5 (default, Sep 30 2013, 20:15:49)
  [GCC 4.2.1 (Apple Inc. build 5566)] on darwin
  Type "help", "copyright", "credits" or "license" for more information.
  >>> def func(a,b):
  ...     print a,b
  ...
  >>>
  >>> from pathos.multiprocessing import ProcessingPool
  >>> pool = ProcessingPool(nodes=4)
  >>> pool.map(func, [1,2,3], [1,1,1])
  1 1
  2 1
  3 1
  [None, None, None]
  >>>
  >>> # also can pickle stuff like lambdas
  >>> result = pool.map(lambda x: x**2, range(10))
  >>> result
  [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
  >>>
  >>> # also does asynchronous map
  >>> result = pool.amap(pow, [1,2,3], [4,5,6])
  >>> result.get()
  [1, 32, 729]
  >>>
  >>> # or can return a map iterator
  >>> result = pool.imap(pow, [1,2,3], [4,5,6])
  >>> result
  <processing.pool.IMapIterator object at 0x110c2ffd0>
  >>> list(result)
  [1, 32, 729]

pathos 有多种方法可以让您获得 starmap 的确切行为。

>>> def add(*x):
...   return sum(x)
...
>>> x = [[1,2,3],[4,5,6]]
>>> import pathos
>>> import numpy as np
>>> # use ProcessPool's map and transposing the inputs
>>> pp = pathos.pools.ProcessPool()
>>> pp.map(add, *np.array(x).T)
[6, 15]
>>> # use ProcessPool's map and a lambda to apply the star
>>> pp.map(lambda x: add(*x), x)
[6, 15]
>>> # use a _ProcessPool, which has starmap
>>> _pp = pathos.pools._ProcessPool()
>>> _pp.starmap(add, x)
[6, 15]
>>>

我想指出,这并没有解决原始问题中的结构。 [[1,2,3], [4,5,6]] 将使用星图解包到 [pow(1,2,3), pow(4,5,6)],而不是 [pow(1,4) , pow(2,5), pow(3, 6)]。如果您无法很好地控制传递给您的函数的输入,您可能需要先重组它们。
@Scott:啊,我没注意到...... 5 年前。我会做一个小更新。谢谢。
应该压缩输入向量。比转置和数组更容易理解,你不觉得吗?
数组转置虽然可能不太清晰,但应该更便宜。
P
Peter Mortensen

Python 2 的更好解决方案:

from multiprocessing import Pool
def func((i, (a, b))):
    print i, a, b
    return a + b
pool = Pool(3)
pool.map(func, [(0,(1,2)), (1,(2,3)), (2,(3, 4))])

输出

2 3 4

1 2 3

0 1 2

out[]:

[3, 5, 7]


P
Peter Mortensen

另一种方法是将列表列表传递给单参数例程:

import os
from multiprocessing import Pool

def task(args):
    print "PID =", os.getpid(), ", arg1 =", args[0], ", arg2 =", args[1]

pool = Pool()

pool.map(task, [
        [1,2],
        [3,4],
        [5,6],
        [7,8]
    ])

然后可以用自己喜欢的方法构造一个参数列表。


这是一种简单的方法,但您需要更改原始功能。更重要的是,有时会回忆别人可能无法修改的功能。
我会说这坚持 Python zen。应该有一种而且只有一种明显的方法可以做到这一点。如果碰巧你是调用函数的作者,那么你应该使用这个方法,其他情况我们可以使用imotai的方法。
我的选择是使用一个元组,然后立即将它们作为第一行中的第一件事展开。
“参数列表” 是什么意思(似乎难以理解)?最好通过 editing (changing) your answer 回复,而不是在评论中(without“Edit:”、“Update:”或类似的 - 答案应该看起来好像是今天写的)。
P
Peter Mortensen

更好的方法是使用 decorator 而不是手动编写 wrapper function。尤其是当您有很多要映射的函数时,装饰器将通过避免为每个函数编写包装器来节省您的时间。通常装饰函数是不可提取的,但是我们可以使用 functools 来绕过它。可以找到更多讨论here

这是示例:

def unpack_args(func):
    from functools import wraps
    @wraps(func)
    def wrapper(args):
        if isinstance(args, dict):
            return func(**args)
        else:
            return func(*args)
    return wrapper

@unpack_args
def func(x, y):
    return x + y

然后你可以用压缩参数映射它:

np, xlist, ylist = 2, range(10), range(10)
pool = Pool(np)
res = pool.map(func, zip(xlist, ylist))
pool.close()
pool.join()

当然,如其他答案中所述,您可以始终在 Python 3 (>=3.3) 中使用 Pool.starmap


结果不符合预期:[0,2,4,6,8,10,12,14,16,18] 我希望:[0,1,2,3,4,5,6,7,8, 9,1,2,3,4,5,6,7,8,9,10,2,3,4,5,6,7,8,9,10,11, ...
@TedoVrbanec 结果应该是 [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]。如果您想要后者,您可以使用 itertools.product 而不是 zip
starmap 是我正在寻找的答案。
M
M. Toya

您可以使用以下两个函数,以避免为每个新函数编写包装器:

import itertools
from multiprocessing import Pool

def universal_worker(input_pair):
    function, args = input_pair
    return function(*args)

def pool_args(function, *args):
    return zip(itertools.repeat(function), zip(*args))

将函数 function 与参数列表 arg_0arg_1arg_2 一起使用,如下所示:

pool = Pool(n_core)
list_model = pool.map(universal_worker, pool_args(function, arg_0, arg_1, arg_2)
pool.close()
pool.join()

A
Alex Klibisz

另一个简单的替代方法是将函数参数包装在一个元组中,然后将应该在元组中传递的参数也包装起来。在处理大量数据时,这可能并不理想。我相信它会为每个元组制作副本。

from multiprocessing import Pool

def f((a,b,c,d)):
    print a,b,c,d
    return a + b + c +d

if __name__ == '__main__':
    p = Pool(10)
    data = [(i+0,i+1,i+2,i+3) for i in xrange(10)]
    print(p.map(f, data))
    p.close()
    p.join()

以某种随机顺序给出输出:

0 1 2 3
1 2 3 4
2 3 4 5
3 4 5 6
4 5 6 7
5 6 7 8
7 8 9 10
6 7 8 9
8 9 10 11
9 10 11 12
[6, 10, 14, 18, 22, 26, 30, 34, 38, 42]

确实如此,仍在寻找更好的方法:(
c
cdahms

这是另一种方法,恕我直言,比提供的任何其他答案都更简单和优雅。

这个程序有一个函数,它接受两个参数,将它们打印出来并打印总和:

import multiprocessing

def main():

    with multiprocessing.Pool(10) as pool:
        params = [ (2, 2), (3, 3), (4, 4) ]
        pool.starmap(printSum, params)
    # end with

# end function

def printSum(num1, num2):
    mySum = num1 + num2
    print('num1 = ' + str(num1) + ', num2 = ' + str(num2) + ', sum = ' + str(mySum))
# end function

if __name__ == '__main__':
    main()

输出是:

num1 = 2, num2 = 2, sum = 4
num1 = 3, num2 = 3, sum = 6
num1 = 4, num2 = 4, sum = 8

有关更多信息,请参阅 python 文档:

https://docs.python.org/3/library/multiprocessing.html#module-multiprocessing.pool

特别是一定要检查 starmap 功能。

我正在使用 Python 3.6,我不确定这是否适用于较旧的 Python 版本

为什么在文档中没有这样一个非常直接的例子,我不确定。


P
Peter Mortensen

从 Python 3.4.4 开始,您可以使用 multiprocessing.get_context() 获取上下文对象以使用多个启动方法:

import multiprocessing as mp

def foo(q, h, w):
    q.put(h + ' ' + w)
    print(h + ' ' + w)

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,'hello', 'world'))
    p.start()
    print(q.get())
    p.join()

或者你只是简单地替换

pool.map(harvester(text, case), case, 1)

和:

pool.apply_async(harvester(text, case), case, 1)

r
roj4s

在官方文档中声明它仅支持一个可迭代参数。在这种情况下,我喜欢使用 apply_async。在你的情况下,我会这样做:

from multiprocessing import Process, Pool, Manager

text = "test"
def harvester(text, case, q = None):
 X = case[0]
 res = text+ str(X)
 if q:
  q.put(res)
 return res


def block_until(q, results_queue, until_counter=0):
 i = 0
 while i < until_counter:
  results_queue.put(q.get())
  i+=1

if __name__ == '__main__':
 pool = multiprocessing.Pool(processes=6)
 case = RAW_DATASET
 m = Manager()
 q = m.Queue()
 results_queue = m.Queue() # when it completes results will reside in this queue
 blocking_process = Process(block_until, (q, results_queue, len(case)))
 blocking_process.start()
 for c in case:
  try:
   res = pool.apply_async(harvester, (text, case, q = None))
   res.get(timeout=0.1)
  except:
   pass
 blocking_process.join()

您的意思是 c 而不是 case ,对吧?:res = pool.apply_async(harvester, (text, case, q = None))
c
cgnorthcutt

这里有很多答案,但似乎没有一个提供适用于任何版本的 Python 2/3 兼容代码。如果您希望您的代码正常工作,这将适用于任一 Python 版本:

# For python 2/3 compatibility, define pool context manager
# to support the 'with' statement in Python 2
if sys.version_info[0] == 2:
    from contextlib import contextmanager
    @contextmanager
    def multiprocessing_context(*args, **kwargs):
        pool = multiprocessing.Pool(*args, **kwargs)
        yield pool
        pool.terminate()
else:
    multiprocessing_context = multiprocessing.Pool

之后,你可以使用常规的 Python 3 方式进行多处理,但你喜欢。例如:

def _function_to_run_for_each(x):
       return x.lower()
with multiprocessing_context(processes=3) as pool:
    results = pool.map(_function_to_run_for_each, ['Bob', 'Sue', 'Tim'])    print(results)

将在 Python 2 或 Python 3 中工作。


J
Jaime
text = "test"

def unpack(args):
    return args[0](*args[1:])

def harvester(text, case):
    X = case[0]
    text+ str(X)

if __name__ == '__main__':
    pool = multiprocessing.Pool(processes=6)
    case = RAW_DATASET
    # args is a list of tuples 
    # with the function to execute as the first item in each tuple
    args = [(harvester, text, c) for c in case]
    # doing it this way, we can pass any function
    # and we don't need to define a wrapper for each different function
    # if we need to use more than one
    pool.map(unpack, args)
    pool.close()
    pool.join()

A
A. Nodar

这是我用来将多个参数传递给 pool.imap 派生中使用的单参数函数的例程示例:

from multiprocessing import Pool

# Wrapper of the function to map:
class makefun:
    def __init__(self, var2):
        self.var2 = var2
    def fun(self, i):
        var2 = self.var2
        return var1[i] + var2

# Couple of variables for the example:
var1 = [1, 2, 3, 5, 6, 7, 8]
var2 = [9, 10, 11, 12]

# Open the pool:
pool = Pool(processes=2)

# Wrapper loop
for j in range(len(var2)):
    # Obtain the function to map
    pool_fun = makefun(var2[j]).fun

    # Fork loop
    for i, value in enumerate(pool.imap(pool_fun, range(len(var1))), 0):
        print(var1[i], '+' ,var2[j], '=', value)

# Close the pool
pool.close()

A
Aenaon

这可能是另一种选择。诀窍在于 wrapper 函数返回另一个传递给 pool.map 的函数。下面的代码读取一个输入数组,并为其中的每个(唯一)元素返回该元素在数组中出现的次数(即计数),例如,如果输入是

np.eye(3) = [ [1. 0. 0.]
              [0. 1. 0.]
              [0. 0. 1.]]

然后 0 出现 6 次, 1 出现 3 次

import numpy as np
from multiprocessing.dummy import Pool as ThreadPool
from multiprocessing import cpu_count


def extract_counts(label_array):
    labels = np.unique(label_array)
    out = extract_counts_helper([label_array], labels)
    return out

def extract_counts_helper(args, labels):
    n = max(1, cpu_count() - 1)
    pool = ThreadPool(n)
    results = {}
    pool.map(wrapper(args, results), labels)
    pool.close()
    pool.join()
    return results

def wrapper(argsin, results):
    def inner_fun(label):
        label_array = argsin[0]
        counts = get_label_counts(label_array, label)
        results[label] = counts
    return inner_fun

def get_label_counts(label_array, label):
    return sum(label_array.flatten() == label)

if __name__ == "__main__":
    img = np.ones([2,2])
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
           
    img = np.eye(3)
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")
    
    img = np.random.randint(5, size=(3, 3))
    out = extract_counts(img)
    print('input array: \n', img)
    print('label counts: ', out)
    print("========")

你应该得到:

input array: 
 [[1. 1.]
 [1. 1.]]
label counts:  {1.0: 4}
========
input array: 
 [[1. 0. 0.]
 [0. 1. 0.]
 [0. 0. 1.]]
label counts:  {0.0: 6, 1.0: 3}
========
input array: 
 [[4 4 0]
 [2 4 3]
 [2 3 1]]
label counts:  {0: 1, 1: 1, 2: 2, 3: 2, 4: 3}
========

D
Dipankar Biswas
import time
from multiprocessing import Pool


def f1(args):
    vfirst, vsecond, vthird = args[0] , args[1] , args[2]
    print(f'First Param: {vfirst}, Second value: {vsecond} and finally third value is: {vthird}')
    pass


if __name__ == '__main__':
    p = Pool()
    result = p.map(f1, [['Dog','Cat','Mouse']])
    p.close()
    p.join()
    print(result)

一个解释将是有序的。例如,想法/要点是什么?请通过 editing (changing) your answer 回复,而不是在评论中(没有“编辑:”、“更新:”或类似内容 - 答案应该看起来好像是今天写的)。
P
Peter Mortensen

将所有参数存储为元组数组。

该示例通常说您将函数称为:

def mainImage(fragCoord: vec2, iResolution: vec3, iTime: float) -> vec3:

而是传递一个元组并解压缩参数:

def mainImage(package_iter) -> vec3:
    fragCoord = package_iter[0]
    iResolution = package_iter[1]
    iTime = package_iter[2]

预先使用循环构建元组:

package_iter = []
iResolution = vec3(nx, ny, 1)
for j in range((ny-1), -1, -1):
    for i in range(0, nx, 1):
        fragCoord: vec2 = vec2(i, j)
        time_elapsed_seconds = 10
        package_iter.append((fragCoord, iResolution, time_elapsed_seconds))

然后通过传递元组数组使用 map 执行所有操作:

array_rgb_values = []

with concurrent.futures.ProcessPoolExecutor() as executor:
    for val in executor.map(mainImage, package_iter):
        fragColor = val
        ir = clip(int(255* fragColor.r), 0, 255)
        ig = clip(int(255* fragColor.g), 0, 255)
        ib = clip(int(255* fragColor.b), 0, 255)

        array_rgb_values.append((ir, ig, ib))

我知道 Python 有 *** 用于解包,但我还没有尝试过。

与低级多处理库相比,使用高级库并发期货也更好。


P
Peter Mortensen

对于 Python 2,你可以使用这个技巧

def fun(a, b):
    return a + b

pool = multiprocessing.Pool(processes=6)
b = 233
pool.map(lambda x:fun(x, b), range(1000))

为什么 b=233。违背了问题的目的