我正在尝试使用 multiprocessing
的 Pool.map()
函数同时分配工作。当我使用以下代码时,它工作正常:
import multiprocessing
def f(x):
return x*x
def go():
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
if __name__== '__main__' :
go()
但是,当我在更面向对象的方法中使用它时,它就不起作用了。它给出的错误信息是:
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed
当以下是我的主程序时会发生这种情况:
import someClass
if __name__== '__main__' :
sc = someClass.someClass()
sc.go()
以下是我的 someClass
类:
import multiprocessing
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(self.f, range(10))
任何人都知道问题可能是什么,或者解决它的简单方法?
PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed
问题是多处理必须腌制事物以将它们吊在进程之间,并且绑定的方法是不可腌制的。解决方法(无论您是否认为它“简单”;-)是将基础结构添加到您的程序中以允许对此类方法进行腌制,并将其注册到 copy_reg 标准库方法。
例如,Steven Bethard 对 this thread 的贡献(接近线程的末尾)展示了一种完全可行的方法来允许通过 copy_reg
进行方法酸洗/取消酸洗。
所有这些解决方案都很丑陋,因为除非您跳出标准库,否则多处理和酸洗会被破坏和限制。
如果您使用 multiprocessing
的分支 pathos.multiprocesssing
,您可以直接在多处理的 map
函数中使用类和类方法。这是因为使用 dill
而不是 pickle
或 cPickle
,并且 dill
几乎可以序列化 python 中的任何内容。
pathos.multiprocessing
还提供了一个异步映射函数……它可以 map
具有多个参数的函数(例如 map(math.pow, [1,2,3], [4,5,6])
)
请参阅:What can multiprocessing and dill do together?
和:http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/
>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>>
>>> def add(x,y):
... return x+y
...
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>>
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>>
>>> class Test(object):
... def plus(self, x, y):
... return x+y
...
>>> t = Test()
>>>
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>>
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
明确地说,您可以一开始就做您想做的事情,如果您愿意,您可以通过解释器来做。
>>> import pathos.pools as pp
>>> class someClass(object):
... def __init__(self):
... pass
... def f(self, x):
... return x*x
... def go(self):
... pool = pp.ProcessPool(4)
... print pool.map(self.f, range(10))
...
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>>
在此处获取代码:https://github.com/uqfoundation/pathos
pip install setuptools
,然后是 pip install git+https://github.com/uqfoundation/pathos.git@master
。这将获得适当的依赖关系。新版本几乎准备就绪……现在 pathos
中的几乎所有内容也可以在 Windows 上运行,并且与 3.x
兼容。
您还可以在 someClass()
中定义一个 __call__()
方法,该方法调用 someClass.go()
,然后将 someClass()
的实例传递给池。这个对象是可腌制的,它工作正常(对我来说)......
__call__()
举一个简单的例子吗?我认为您的答案可能是更清晰的答案-我正在努力理解这个错误,而且我第一次来看电话。顺便说一句,这个答案也有助于澄清多处理的作用:[stackoverflow.com/a/20789937/305883]
Steven Bethard 的解决方案有一些限制:
当您将类方法注册为函数时,每次方法处理完成时都会令人惊讶地调用您的类的析构函数。因此,如果您有 1 个类的实例调用其方法的 n 次,则成员可能会在 2 次运行之间消失,您可能会收到消息 malloc: *** error for object 0x...: pointer being freed was not allocated
(例如打开成员文件)或 pure virtual method called, terminate called without an active exception
(这意味着比成员的生命周期我使用的对象比我想象的要短)。我在处理大于池大小的 n 时得到了这个。这是一个简短的例子:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType
def _pickle_method(method):
func_name = method.im_func.__name__
obj = method.im_self
cls = method.im_class
return _unpickle_method, (func_name, obj, cls)
def _unpickle_method(func_name, obj, cls):
for cls in cls.mro():
try:
func = cls.__dict__[func_name]
except KeyError:
pass
else:
break
return func.__get__(obj, cls)
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multi-processing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __del__(self):
print "... Destructor"
def process_obj(self, index):
print "object %d" % index
return "results"
pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)
输出:
Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor
__call__
方法不是那么等效,因为 [None,...] 是从结果中读取的:
from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult
class Myclass(object):
def __init__(self, nobj, workers=cpu_count()):
print "Constructor ..."
# multiprocessing
pool = Pool(processes=workers)
async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
pool.close()
# waiting for all results
map(ApplyResult.wait, async_results)
lst_results=[r.get() for r in async_results]
print lst_results
def __call__(self, i):
self.process_obj(i)
def __del__(self):
print "... Destructor"
def process_obj(self, i):
print "obj %d" % i
return "result"
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once),
# **and** results are empty !
所以这两种方法都不令人满意......
None
,因为您对 __call__
的定义缺少 return
:它应该是 return self.process_obj(i)
。
您可以使用另一种快捷方式,尽管它可能效率低下,具体取决于您的类实例中的内容。
正如每个人所说的那样,问题在于 multiprocessing
代码必须腌制它发送到它已启动的子流程的东西,并且腌制器不执行实例方法。
但是,您可以不发送实例方法,而是将实际的类实例以及要调用的函数的名称发送到一个普通函数,然后使用 getattr
调用实例方法,从而在Pool
子流程。这类似于定义 __call__
方法,不同之处在于您可以调用多个成员函数。
从他的答案中窃取@EricH.的代码并对其进行一些注释(我重新输入了它,因此所有名称都发生了变化等等,出于某种原因,这似乎比剪切和粘贴更容易:-))以说明所有的魔力:
import multiprocessing
import os
def call_it(instance, name, args=(), kwargs=None):
"indirect caller for instance methods and multiprocessing"
if kwargs is None:
kwargs = {}
return getattr(instance, name)(*args, **kwargs)
class Klass(object):
def __init__(self, nobj, workers=multiprocessing.cpu_count()):
print "Constructor (in pid=%d)..." % os.getpid()
self.count = 1
pool = multiprocessing.Pool(processes = workers)
async_results = [pool.apply_async(call_it,
args = (self, 'process_obj', (i,))) for i in range(nobj)]
pool.close()
map(multiprocessing.pool.ApplyResult.wait, async_results)
lst_results = [r.get() for r in async_results]
print lst_results
def __del__(self):
self.count -= 1
print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)
def process_obj(self, index):
print "object %d" % index
return "results"
Klass(nobj=8, workers=3)
输出显示,确实,构造函数被调用一次(在原始 pid 中),而析构函数被调用 9 次(每个复制一次 = 2 或 3 次每个 pool-worker-process 需要,加上一次在原始过程)。这通常是可以的,就像在这种情况下一样,因为默认的pickler会复制整个实例并(半)秘密地重新填充它——在这种情况下,这样做:
obj = object.__new__(Klass)
obj.__dict__.update({'count':1})
——这就是为什么即使在三个工作进程中调用了八次析构函数,它每次都从 1 倒数到 0 ——当然,这样你仍然会遇到麻烦。如有必要,您可以提供自己的 __setstate__
:
def __setstate__(self, adict):
self.count = adict['count']
例如在这种情况下。
您还可以在 someClass()
中定义一个 __call__()
方法,该方法调用 someClass.go()
,然后将 someClass()
的实例传递给池。这个对象是可腌制的,它工作正常(对我来说)......
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def go(self):
p = Pool(4)
sc = p.map(self, range(4))
print sc
def __call__(self, x):
return self.f(x)
sc = someClass()
sc.go()
上面 parisjohn 的解决方案对我很有效。此外,代码看起来干净且易于理解。在我的例子中,有几个函数可以使用 Pool 调用,所以我在下面修改了 parisjohn 的代码。我使 __call__
能够调用多个函数,并且函数名称在 go()
的参数 dict 中传递:
from multiprocessing import Pool
class someClass(object):
def __init__(self):
pass
def f(self, x):
return x*x
def g(self, x):
return x*x+1
def go(self):
p = Pool(4)
sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
print sc
def __call__(self, x):
if x["func"]=="f":
return self.f(x["v"])
if x["func"]=="g":
return self.g(x["v"])
sc = someClass()
sc.go()
__call__
函数时遇到了问题,因为使用类对象的人可能会不小心执行不想要的操作。然而,有了这个,也许还有一个额外的检查“func”键是否存在并且 x 是 dict - 使它成为一个非常好的解决方案!
在这个简单的情况下,其中 someClass.f
没有从类继承任何数据并且没有将任何内容附加到该类,一个可能的解决方案是分离出 f
,因此它可以被腌制:
import multiprocessing
def f(x):
return x*x
class someClass(object):
def __init__(self):
pass
def go(self):
pool = multiprocessing.Pool(processes=4)
print pool.map(f, range(10))
一个可能很简单的解决方案是改用 multiprocessing.dummy
。这是多处理接口的基于线程的实现,在 Python 2.7 中似乎没有这个问题。我在这里没有很多经验,但是这个快速的导入更改允许我在类方法上调用 apply_async。
multiprocessing.dummy
上的一些好资源:
https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy
http://chriskiehl.com/article/parallelism-in-one-line/
为什么不使用单独的函数?
def func(*args, **kwargs):
return inst.method(args, kwargs)
print pool.map(func, arr)
我遇到了同样的问题,但发现有一个 JSON 编码器可用于在进程之间移动这些对象。
from pyVmomi.VmomiSupport import VmomiJSONEncoder
使用它来创建您的列表:
jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)
然后在映射函数中,使用它来恢复对象:
pfVmomiObj = json.loads(jsonSerialized)
更新:截至撰写本文时,namedTuples 是可选的(从 python 2.7 开始)
这里的问题是子进程无法导入对象的类 - 在这种情况下是类 P-,在多模型项目的情况下,类 P 应该可以在任何使用子进程的地方导入
一个快速的解决方法是通过将其影响到 globals() 使其可导入
globals()["P"] = P
pathos.multiprocessing
为我工作。
它有一个 pool
方法并序列化所有与 multiprocessing
不同的东西
import pathos.multiprocessing as mp
pool = mp.Pool(processes=2)
不定期副业成功案例分享
_pickle_method
返回self._unpickle_method
,一个绑定方法;所以当然 pickle 现在会尝试腌制 - 它会按照您的指示进行:通过递归调用_pickle_method
。即OO
以这种方式编写代码,不可避免地会引入无限递归。我建议回到 Steven 的代码(不要在不合适的时候崇拜 OO:Python 中的许多事情最好以更实用的方式完成,这就是其中之一)。