ChatGPT解决这个技术问题 Extra ChatGPT

Can't pickle <type 'instancemethod'> when using multiprocessing Pool.map()

I'm trying to use multiprocessing's Pool.map() function to divide out work simultaneously. When I use the following code, it works fine:

import multiprocessing

def f(x):
    return x*x

def go():
    pool = multiprocessing.Pool(processes=4)        
    print pool.map(f, range(10))


if __name__== '__main__' :
    go()

However, when I use it in a more object-oriented approach, it doesn't work. The error message it gives is:

PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup
__builtin__.instancemethod failed

This occurs when the following is my main program:

import someClass

if __name__== '__main__' :
    sc = someClass.someClass()
    sc.go()

and the following is my someClass class:

import multiprocessing

class someClass(object):
    def __init__(self):
        pass

    def f(self, x):
        return x*x

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(self.f, range(10))

Anyone know what the problem could be, or an easy way around it?

if f is a nested function there is a similar error PicklingError: Can't pickle <class 'function'>: attribute lookup builtins.function failed

J
Jean-François Fabre

The problem is that multiprocessing must pickle things to sling them among processes, and bound methods are not picklable. The workaround (whether you consider it "easy" or not;-) is to add the infrastructure to your program to allow such methods to be pickled, registering it with the copy_reg standard library method.

For example, Steven Bethard's contribution to this thread (towards the end of the thread) shows one perfectly workable approach to allow method pickling/unpickling via copy_reg.


That's great - thank you. Seem to have progressed some way, anyhow: Using the code at pastebin.ca/1693348 I now get a RuntimeError: maximum recursion depth exceeded. I looked around and one forum post recommended increasing the maximum depth to 1500 (from the default 1000) but I had no joy there. To be honest, I can't see what part (of my code, at least) could be recursing out of control, unless for some reason the code is pickling and unpickling in a loop, due to slight changes I made in order to make Steven's code OO'd?
Your _pickle_method returns self._unpickle_method, a bound method; so of course pickle now tries to pickle THAT -- and it does as you've told it to: by calling _pickle_method, recursively. I.e. by OOing the code in this way, you have inevitably introduced infinite recursion. I suggest going back to Steven's code (and not worshipping at the altar of OO when not appropriate: many things in Python are best done in a more functional-way, and this is one).
For the super super lazy, see the only answer that bothered to post the actual non-mangled code...
Another way to fix / circumvent the pickling problem is using dill, see my answer stackoverflow.com/questions/8804830/…
M
Mike McKerns

All of these solutions are ugly because multiprocessing and pickling is broken and limited unless you jump outside the standard library.

If you use a fork of multiprocessing called pathos.multiprocesssing, you can directly use classes and class methods in multiprocessing's map functions. This is because dill is used instead of pickle or cPickle, and dill can serialize almost anything in python.

pathos.multiprocessing also provides an asynchronous map function… and it can map functions with multiple arguments (e.g. map(math.pow, [1,2,3], [4,5,6]))

See: What can multiprocessing and dill do together?

and: http://matthewrocklin.com/blog/work/2013/12/05/Parallelism-and-Serialization/

>>> import pathos.pools as pp
>>> p = pp.ProcessPool(4)
>>> 
>>> def add(x,y):
...   return x+y
... 
>>> x = [0,1,2,3]
>>> y = [4,5,6,7]
>>> 
>>> p.map(add, x, y)
[4, 6, 8, 10]
>>> 
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> 
>>> p.map(Test.plus, [t]*4, x, y)
[4, 6, 8, 10]
>>> 
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]

And just to be explicit, you can do exactly want you wanted to do in the first place, and you can do it from the interpreter, if you wanted to.

>>> import pathos.pools as pp
>>> class someClass(object):
...   def __init__(self):
...     pass
...   def f(self, x):
...     return x*x
...   def go(self):
...     pool = pp.ProcessPool(4)
...     print pool.map(self.f, range(10))
... 
>>> sc = someClass()
>>> sc.go()
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> 

Get the code here: https://github.com/uqfoundation/pathos


Can you please update this answer based on pathos.pp because pathos.multiprocessing doesn't exist anymore?
I'm the pathos author. The version you are referring to is several years old. Try the version on github, You can use pathos.pp or github.com/uqfoundation/ppft.
or github.com/uqfoundation/pathos. @SaheelGodhane: A new release is long overdue, but should be out shortly.
First pip install setuptools, then pip install git+https://github.com/uqfoundation/pathos.git@master. This will get the appropriate dependencies. A new release is nearly ready… now almost everything in pathos also runs on windows, and is 3.x compatible.
@Rika: Yes. blocking, iterative, and async maps are available.
d
deprecated

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)...


This is much easier than the technique proposed by Alex Martelli, but you are limited to sending only one method per class to your multiprocessing pool.
One other detail to bear in mind is that it is only the object (class instance) that gets pickled, not the class itself. Therefore, if you have changed any class attributes from their default values these changes will not propagate to the different processes. The workaround is to make sure that everything your function needs is stored as an instance attribute.
@dorvak could you please show a simple example with __call__() ? I think your answer might be the cleaner one - I am struggling to understand this error, and first time I come to see call. By the way, also this answer help to clarify what multiprocessing does: [stackoverflow.com/a/20789937/305883]
Can you give an example of this?
There is a new answer posted (currently below this one) with example code for this.
E
Eric H.

Some limitations though to Steven Bethard's solution :

When you register your class method as a function, the destructor of your class is surprisingly called every time your method processing is finished. So if you have 1 instance of your class that calls n times its method, members may disappear between 2 runs and you may get a message malloc: *** error for object 0x...: pointer being freed was not allocated (e.g. open member file) or pure virtual method called, terminate called without an active exception (which means than the lifetime of a member object I used was shorter than what I thought). I got this when dealing with n greater than the pool size. Here is a short example :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

# --------- see Stenven's solution above -------------
from copy_reg import pickle
from types import MethodType

def _pickle_method(method):
    func_name = method.im_func.__name__
    obj = method.im_self
    cls = method.im_class
    return _unpickle_method, (func_name, obj, cls)

def _unpickle_method(func_name, obj, cls):
    for cls in cls.mro():
        try:
            func = cls.__dict__[func_name]
        except KeyError:
            pass
        else:
            break
    return func.__get__(obj, cls)


class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multi-processing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self.process_obj, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __del__(self):
        print "... Destructor"

    def process_obj(self, index):
        print "object %d" % index
        return "results"

pickle(MethodType, _pickle_method, _unpickle_method)
Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once)

Output:

Constructor ...
object 0
object 1
object 2
... Destructor
object 3
... Destructor
object 4
... Destructor
object 5
... Destructor
object 6
... Destructor
object 7
... Destructor
... Destructor
... Destructor
['results', 'results', 'results', 'results', 'results', 'results', 'results', 'results']
... Destructor

The __call__ method is not so equivalent, because [None,...] are read from the results :

from multiprocessing import Pool, cpu_count
from multiprocessing.pool import ApplyResult

class Myclass(object):

    def __init__(self, nobj, workers=cpu_count()):

        print "Constructor ..."
        # multiprocessing
        pool = Pool(processes=workers)
        async_results = [ pool.apply_async(self, (i,)) for i in range(nobj) ]
        pool.close()
        # waiting for all results
        map(ApplyResult.wait, async_results)
        lst_results=[r.get() for r in async_results]
        print lst_results

    def __call__(self, i):
        self.process_obj(i)

    def __del__(self):
        print "... Destructor"

    def process_obj(self, i):
        print "obj %d" % i
        return "result"

Myclass(nobj=8, workers=3)
# problem !!! the destructor is called nobj times (instead of once), 
# **and** results are empty !

So none of both methods is satisfying...


You get None back because your definition of __call__ is missing the return: it should be return self.process_obj(i).
@Eric I was getting the same error and I tried this solution, however I started getting new error as "cPickle.PicklingError: Can't pickle : attribute lookup builtin.function failed". Do you know what can be a probable reason behind it?
t
torek

There's another short-cut you can use, although it can be inefficient depending on what's in your class instances.

As everyone has said the problem is that the multiprocessing code has to pickle the things that it sends to the sub-processes it has started, and the pickler doesn't do instance-methods.

However, instead of sending the instance-method, you can send the actual class instance, plus the name of the function to call, to an ordinary function that then uses getattr to call the instance-method, thus creating the bound method in the Pool subprocess. This is similar to defining a __call__ method except that you can call more than one member function.

Stealing @EricH.'s code from his answer and annotating it a bit (I retyped it hence all the name changes and such, for some reason this seemed easier than cut-and-paste :-) ) for illustration of all the magic:

import multiprocessing
import os

def call_it(instance, name, args=(), kwargs=None):
    "indirect caller for instance methods and multiprocessing"
    if kwargs is None:
        kwargs = {}
    return getattr(instance, name)(*args, **kwargs)

class Klass(object):
    def __init__(self, nobj, workers=multiprocessing.cpu_count()):
        print "Constructor (in pid=%d)..." % os.getpid()
        self.count = 1
        pool = multiprocessing.Pool(processes = workers)
        async_results = [pool.apply_async(call_it,
            args = (self, 'process_obj', (i,))) for i in range(nobj)]
        pool.close()
        map(multiprocessing.pool.ApplyResult.wait, async_results)
        lst_results = [r.get() for r in async_results]
        print lst_results

    def __del__(self):
        self.count -= 1
        print "... Destructor (in pid=%d) count=%d" % (os.getpid(), self.count)

    def process_obj(self, index):
        print "object %d" % index
        return "results"

Klass(nobj=8, workers=3)

The output shows that, indeed, the constructor is called once (in the original pid) and the destructor is called 9 times (once for each copy made = 2 or 3 times per pool-worker-process as needed, plus once in the original process). This is often OK, as in this case, since the default pickler makes a copy of the entire instance and (semi-) secretly re-populates it—in this case, doing:

obj = object.__new__(Klass)
obj.__dict__.update({'count':1})

—that's why even though the destructor is called eight times in the three worker processes, it counts down from 1 to 0 each time—but of course you can still get into trouble this way. If necessary, you can provide your own __setstate__:

    def __setstate__(self, adict):
        self.count = adict['count']

in this case for instance.


This is by far the best answer for this problem, as it's the easiest to apply to the non-pickle-able default behaviour
C
CDspace

You could also define a __call__() method inside your someClass(), which calls someClass.go() and then pass an instance of someClass() to the pool. This object is pickleable and it works fine (for me)...

class someClass(object):
   def __init__(self):
       pass
   def f(self, x):
       return x*x

   def go(self):
      p = Pool(4)
      sc = p.map(self, range(4))
      print sc

   def __call__(self, x):   
     return self.f(x)

sc = someClass()
sc.go()

m
martineau

The solution from parisjohn above works fine with me. Plus the code looks clean and easy to understand. In my case there are a few functions to call using Pool, so I modified parisjohn's code a bit below. I made __call__ to be able to call several functions, and the function names are passed in the argument dict from go():

from multiprocessing import Pool
class someClass(object):
    def __init__(self):
        pass
    
    def f(self, x):
        return x*x
    
    def g(self, x):
        return x*x+1    

    def go(self):
        p = Pool(4)
        sc = p.map(self, [{"func": "f", "v": 1}, {"func": "g", "v": 2}])
        print sc

    def __call__(self, x):
        if x["func"]=="f":
            return self.f(x["v"])
        if x["func"]=="g":
            return self.g(x["v"])        

sc = someClass()
sc.go()

I had an issue with using a __call__ function like that since someone using the class object might accidentally perform an operation not wanted. However with this, and perhaps an additional check that "func" key exist and that x is dict - makes it a very very nice solution!
m
mhh

In this simple case, where someClass.f is not inheriting any data from the class and not attaching anything to the class, a possible solution would be to separate out f, so it can be pickled:

import multiprocessing


def f(x):
    return x*x


class someClass(object):
    def __init__(self):
        pass

    def go(self):
        pool = multiprocessing.Pool(processes=4)       
        print pool.map(f, range(10))

D
David Parks

A potentially trivial solution to this is to switch to using multiprocessing.dummy. This is a thread based implementation of the multiprocessing interface that doesn't seem to have this problem in Python 2.7. I don't have a lot of experience here, but this quick import change allowed me to call apply_async on a class method.

A few good resources on multiprocessing.dummy:

https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.dummy

http://chriskiehl.com/article/parallelism-in-one-line/


0
0script0

Why not to use separate func?

def func(*args, **kwargs):
    return inst.method(args, kwargs)

print pool.map(func, arr)

A
Alexey Vazhnov

I ran into this same issue but found out that there is a JSON encoder that can be used to move these objects between processes.

from pyVmomi.VmomiSupport import VmomiJSONEncoder

Use this to create your list:

jsonSerialized = json.dumps(pfVmomiObj, cls=VmomiJSONEncoder)

Then in the mapped function, use this to recover the object:

pfVmomiObj = json.loads(jsonSerialized)

r
rachid el kedmiri

Update: as of the day of this writing, namedTuples are pickable (starting with python 2.7)

The issue here is the child processes aren't able to import the class of the object -in this case, the class P-, in the case of a multi-model project the Class P should be importable anywhere the child process get used

a quick workaround is to make it importable by affecting it to globals()

globals()["P"] = P

d
devil in the detail

pathos.multiprocessing worked for me.

It has a pool method and serializes everything unlike multiprocessing

import pathos.multiprocessing as mp
pool = mp.Pool(processes=2)