IT story

Python 멀티 프로세싱 PicklingError : 피클 할 수 없습니다

hot-time 2020. 5. 6. 21:03
반응형

Python 멀티 프로세싱 PicklingError : 피클 할 수 없습니다


더 간단한 예제로 오류를 재현 할 수 없어서 죄송합니다. 코드가 너무 복잡하여 게시 할 수 없습니다. 일반 파이썬 대신 IPython 쉘에서 프로그램을 실행하면 문제가 해결됩니다.

이 문제에 대한 이전 메모를 찾아 보았습니다. 풀을 사용하여 클래스 함수 내에 정의 된 함수를 호출했기 때문에 모두 발생했습니다. 그러나 이것은 나에게 해당되지 않습니다.

Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

도움을 주시면 감사하겠습니다.

업데이트 : 피클 링 된 기능은 모듈의 최상위 레벨에서 정의됩니다. 중첩 된 함수가 포함 된 함수를 호출하지만. 즉, f()전화 g()통화 h()중첩 된 기능을 가지고 i(), 나는 부르고있다 pool.apply_async(f). f(), g(), h()모든 상단 레벨에서 정의됩니다. 이 패턴으로 더 간단한 예제를 시도했지만 작동합니다.


다음은 절임 가능한 항목의 목록입니다 . 특히 함수는 모듈의 최상위 수준에서 정의 된 경우에만 선택할 수 있습니다.

이 코드 조각 :

import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':   
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()

게시 한 것과 거의 동일한 오류가 발생합니다.

Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

문제는 pool모두 메소드를 사용하여 mp.SimpleQueue작업을 작업자 프로세스에 전달한다는 것입니다. 통과하는 모든 항목은 선택 가능 mp.SimpleQueue해야하며 foo.work모듈의 최상위 레벨에 정의되어 있지 않으므로 선택 가능하지 않습니다.

최상위 레벨에서 함수를 정의하여 해결할 수 있습니다 foo.work().

def work(foo):
    foo.work()

pool.apply_async(work,args=(foo,))

공지 foo이후, pickable입니다 Foo최고 수준에서 정의된다 foo.__dict__picklable입니다.


내가 사용하는 것 pathos.multiprocesssing대신에, multiprocessing. 사용 pathos.multiprocessing하는 포크입니다 . 파이썬에서 거의 모든 것을 직렬화 할 수 있으므로 병렬로 더 많이 보낼 수 있습니다. 당신은 클래스 메소드를 위해 필요로하는 포크는 또한 여러 인자 기능을 직접 작업 할 수있는 능력을 가지고 있습니다.multiprocessingdilldillpathos

>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...   def plus(self, x, y): 
...     return x+y
... 
>>> t = Test()
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>> 
>>> class Foo(object):
...   @staticmethod
...   def work(self, x):
...     return x+1
... 
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101

Get pathos (and if you like, dill) here: https://github.com/uqfoundation


As others have said multiprocessing can only transfer Python objects to worker processes which can be pickled. If you cannot reorganize your code as described by unutbu, you can use dills extended pickling/unpickling capabilities for transferring data (especially code data) as I show below.

This solution requires only the installation of dill and no other libraries as pathos:

import os
from multiprocessing import Pool

import dill


def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)


def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))


if __name__ == "__main__":

    pool = Pool(processes=5)

    # asyn execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method

    class O(object):

        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()

I have found that I can also generate exactly that error output on a perfectly working piece of code by attempting to use the profiler on it.

Note that this was on Windows (where the forking is a bit less elegant).

I was running:

python -m profile -o output.pstats <script> 

And found that removing the profiling removed the error and placing the profiling restored it. Was driving me batty too because I knew the code used to work. I was checking to see if something had updated pool.py... then had a sinking feeling and eliminated the profiling and that was it.

Posting here for the archives in case anybody else runs into it.


This solution requires only the installation of dill and no other libraries as pathos

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res


def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png'
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like  target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returs a tuple with
            * function wrapper, that unpack and call target function;
            * list of packed target function and its' arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items

It also works for numpy arrays.


Can't pickle <type 'function'>: attribute lookup __builtin__.function failed

This error will also come if you have any inbuilt function inside the model object that was passed to the async job.

So make sure to check the model objects that are passed doesn't have inbuilt functions. (In our case we were using FieldTracker() function of django-model-utils inside the model to track a certain field). Here is the link to relevant GitHub issue.


Building on @rocksportrocker solution, It would make sense to dill when sending and RECVing the results.

import dill
import itertools
def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys,os
    p = mp.Pool(4)
    res = dill_map_async(p, lambda x:[sys.stdout.write('%s\n'%os.getpid()),x][-1],
                  [lambda x:x+1]*10,)
    res = res.get(timeout=100)
    res = map(dill.loads,res)
    print(res)

참고URL : https://stackoverflow.com/questions/8804830/python-multiprocessing-picklingerror-cant-pickle-type-function

반응형