Python multiprocessing PicklingError: Can't pickle <type 'function'>
I am sorry that I can't reproduce the error with a simpler example, and my code is too complicated to post. If I run the program in an IPython shell instead of regular Python, things work out well.
I looked up some previous notes on this problem. They were all caused by using a pool to call a function defined within a class function. But that is not the case for me.
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/usr/lib64/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib64/python2.7/multiprocessing/pool.py", line 313, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
I would appreciate any help.
UPDATE: The function I pickle is defined at the top level of the module, though it calls a function that contains a nested function. i.e., f() calls g(), which calls h(), which has a nested function i(), and I am calling pool.apply_async(f). f(), g(), and h() are all defined at the top level. I tried a simpler example with this pattern, and it works.
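That pattern is indeed fine: a nested function only causes trouble if it is itself passed to the pool. A minimal Python 3 sketch (the names f/g/h/i mirror the description above; the return values and the run_demo helper are made up for illustration) shows that only the top-level f ever crosses the task queue:

```python
import multiprocessing as mp

def h():
    def i():
        # Nested function: created fresh inside the worker process,
        # so it is never pickled and never crosses the task queue.
        return 1
    return i()

def g():
    return h()

def f():
    # Only f itself is pickled when passed to apply_async.
    return g()

def run_demo():
    with mp.Pool(2) as pool:
        return pool.apply_async(f).get()

if __name__ == '__main__':
    print(run_demo())  # prints 1
```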
Here is a list of what can be pickled. In particular, functions are only picklable if they are defined at the top level of a module.
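You can verify that rule directly with the pickle module, with no pool involved. In the sketch below (Python 3; the top_level name is made up for illustration), pickle stores a top-level function by its qualified name, while a lambda has no importable name:

```python
import pickle

def top_level(x):
    return x + 1

# A top-level function pickles fine: pickle records only its
# qualified name and re-imports it on the other side.
data = pickle.dumps(top_level)
assert pickle.loads(data)(1) == 2

# A lambda (or nested function) has no importable name, so the
# attribute lookup fails and pickling raises an error.
try:
    pickle.dumps(lambda x: x + 1)
    raised = False
except (pickle.PicklingError, AttributeError, TypeError):
    raised = True
assert raised
```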
This code snippet:
import multiprocessing as mp

class Foo():
    @staticmethod
    def work(self):
        pass

if __name__ == '__main__':
    pool = mp.Pool()
    foo = Foo()
    pool.apply_async(foo.work)
    pool.close()
    pool.join()
yields an error almost identical to the one you posted:
Exception in thread Thread-2:
Traceback (most recent call last):
  File "/usr/lib/python2.7/threading.py", line 552, in __bootstrap_inner
    self.run()
  File "/usr/lib/python2.7/threading.py", line 505, in run
    self.__target(*self.__args, **self.__kwargs)
  File "/usr/lib/python2.7/multiprocessing/pool.py", line 315, in _handle_tasks
    put(task)
PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
The problem is that the pool methods all use an mp.SimpleQueue to pass tasks to the worker processes. Everything that goes through the mp.SimpleQueue must be picklable, and foo.work is not picklable since it is not defined at the top level of the module.
It can be fixed by defining a function at the top level which calls foo.work():
def work(foo):
    foo.work()
pool.apply_async(work, args=(foo,))
Notice that foo is picklable, since Foo is defined at the top level and foo.__dict__ is picklable.
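A complete, runnable version of this workaround (ported to Python 3 syntax; Foo.work is changed to return a value instead of pass, and the run_demo helper is made up, both so the round trip is observable) might look like:

```python
import multiprocessing as mp

class Foo():
    def work(self):
        return "done"

def work(foo):
    # Top-level wrapper: picklable by name, unlike the bound method foo.work.
    return foo.work()

def run_demo():
    with mp.Pool(2) as pool:
        # The Foo instance is pickled via its __dict__, and the
        # top-level work function is pickled by name.
        return pool.apply_async(work, args=(Foo(),)).get()

if __name__ == '__main__':
    print(run_demo())  # prints done
```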
I'd use pathos.multiprocessing instead of multiprocessing. pathos.multiprocessing is a fork of multiprocessing that uses dill. dill can serialize almost anything in Python, so you are able to send a lot more around in parallel. The pathos fork also has the ability to work directly with multiple-argument functions, as you need for class methods.
>>> from pathos.multiprocessing import ProcessingPool as Pool
>>> p = Pool(4)
>>> class Test(object):
...     def plus(self, x, y):
...         return x+y
...
>>> t = Test()
>>> x = [0, 1, 2, 3]
>>> y = [4, 5, 6, 7]
>>> p.map(t.plus, x, y)
[4, 6, 8, 10]
>>>
>>> class Foo(object):
...     @staticmethod
...     def work(self, x):
...         return x+1
...
>>> f = Foo()
>>> p.apipe(f.work, f, 100)
<processing.pool.ApplyResult object at 0x10504f8d0>
>>> res = _
>>> res.get()
101
Get pathos (and if you like, dill) here: https://github.com/uqfoundation
As others have said, multiprocessing can only transfer Python objects to worker processes which can be pickled. If you cannot reorganize your code as described by unutbu, you can use dill's extended pickling/unpickling capabilities for transferring data (especially code data) as I show below.
This solution requires only the installation of dill and no other libraries such as pathos:
import os
from multiprocessing import Pool
import dill

def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    return fun(*args)

def apply_async(pool, fun, args):
    payload = dill.dumps((fun, args))
    return pool.apply_async(run_dill_encoded, (payload,))

if __name__ == "__main__":
    pool = Pool(processes=5)

    # async execution of lambda
    jobs = []
    for i in range(10):
        job = apply_async(pool, lambda a, b: (a, b, a * b), (i, i + 1))
        jobs.append(job)

    for job in jobs:
        print job.get()
    print

    # async execution of static method
    class O(object):
        @staticmethod
        def calc():
            return os.getpid()

    jobs = []
    for i in range(10):
        job = apply_async(pool, O.calc, ())
        jobs.append(job)

    for job in jobs:
        print job.get()
I have found that I can also generate exactly that error output on a perfectly working piece of code by attempting to use the profiler on it.
Note that this was on Windows (where the forking is a bit less elegant).
I was running:
python -m profile -o output.pstats <script>
And found that removing the profiling removed the error and placing the profiling restored it. Was driving me batty too because I knew the code used to work. I was checking to see if something had updated pool.py... then had a sinking feeling and eliminated the profiling and that was it.
Posting here for the archives in case anybody else runs into it.
This solution requires only the installation of dill and no other libraries such as pathos:
import dill

def apply_packed_function_for_map((dumped_function, item, args, kwargs),):
    """
    Unpack dumped function as target function and call it with arguments.

    :param (dumped_function, item, args, kwargs):
        a tuple of dumped function and its arguments
    :return:
        result of target function
    """
    target_function = dill.loads(dumped_function)
    res = target_function(item, *args, **kwargs)
    return res

def pack_function_for_map(target_function, items, *args, **kwargs):
    """
    Pack function and arguments to object that can be sent from one
    multiprocessing.Process to another. The main problem is:
        «multiprocessing.Pool.map*» or «apply*»
        cannot use class methods or closures.
    It solves this problem with «dill».
    It works with target function as argument, dumps it («with dill»)
    and returns dumped function with arguments of target function.
    For more performance we dump only target function itself
    and don't dump its arguments.
    How to use (pseudo-code):

        ~>>> import multiprocessing
        ~>>> images = [...]
        ~>>> pool = multiprocessing.Pool(100500)
        ~>>> features = pool.map(
        ~...     *pack_function_for_map(
        ~...         super(Extractor, self).extract_features,
        ~...         images,
        ~...         type='png',
        ~...         **options,
        ~...     )
        ~... )
        ~>>>

    :param target_function:
        function, that you want to execute like target_function(item, *args, **kwargs).
    :param items:
        list of items for map
    :param args:
        positional arguments for target_function(item, *args, **kwargs)
    :param kwargs:
        named arguments for target_function(item, *args, **kwargs)
    :return: tuple(function_wrapper, dumped_items)
        It returns a tuple with
        * function wrapper, that unpacks and calls target function;
        * list of packed target function and its arguments.
    """
    dumped_function = dill.dumps(target_function)
    dumped_items = [(dumped_function, item, args, kwargs) for item in items]
    return apply_packed_function_for_map, dumped_items
It also works for numpy arrays.
Can't pickle <type 'function'>: attribute lookup __builtin__.function failed
This error will also come if you have any built-in function inside the model object that was passed to the async job.
So make sure to check that the model objects being passed don't have built-in functions. (In our case we were using the FieldTracker() function of django-model-utils inside the model to track a certain field.) Here is the link to the relevant GitHub issue.
Building on @rocksportrocker's solution, it would also make sense to dill the results when sending and receiving them.
import dill
import itertools

def run_dill_encoded(payload):
    fun, args = dill.loads(payload)
    res = fun(*args)
    res = dill.dumps(res)
    return res

def dill_map_async(pool, fun, args_list,
                   as_tuple=True,
                   **kw):
    if as_tuple:
        args_list = ((x,) for x in args_list)

    it = itertools.izip(
        itertools.cycle([fun]),
        args_list)
    it = itertools.imap(dill.dumps, it)
    return pool.map_async(run_dill_encoded, it, **kw)

if __name__ == '__main__':
    import multiprocessing as mp
    import sys, os

    p = mp.Pool(4)
    res = dill_map_async(p, lambda x: [sys.stdout.write('%s\n' % os.getpid()), x][-1],
                         [lambda x: x + 1] * 10,)
    res = res.get(timeout=100)
    res = map(dill.loads, res)
    print(res)