Есть ли способ для работников в многопроцессорной работе .Pool apply_async, чтобы поймать ошибки и продолжить?

При использовании multiprocessing.Pool apply_async() что происходит с разрывом кода? Это включает, я думаю, только исключения, но могут быть и другие вещи, из-за которых рабочие функции терпят неудачу.

import multiprocessing as mp
pool = mp.Pool(mp.cpu_count())
for f in files:
 pool.apply_async(workerfunct, args=(*args), callback=callbackfunct)

Как я понимаю сейчас, процесс/работник не работает (все остальные процессы продолжаются), и все, что прошло после сброшенной ошибки, не выполняется, EVEN, если я поймаю ошибку с помощью try/except.

В качестве примера, как правило, я хотел бы исключить ошибки и поставить значение по умолчанию и/или распечатать сообщение об ошибке, и код будет продолжен. Если моя функция обратного вызова включает запись в файл, это делается со значениями по умолчанию.

Этот ответчик немного написал об этом:

Я подозреваю, что причина, по которой вы не видите, что что-то случится с вашим примером кода, связана с тем, что все вызовы вашей рабочей функции терпят неудачу. Если рабочая функция не работает, обратный вызов никогда не будет выполнен. Сбой вообще не сообщается, если вы не попытаетесь извлечь результат из объекта AsyncResult, возвращаемого вызовом apply_async. Однако, поскольку вы не сохраняете ни один из этих объектов, вы никогда не узнаете, что произошли сбои. Если бы я был вами, я бы попытался использовать pool.apply во время тестирования, чтобы вы видели ошибки, как только они появятся.

1 ответ

Если вы используете Python 3.2+, вы можете использовать error_callback аргумент ключевого слова для обработки исключений, возникающих у работников.

pool.apply_async(workerfunct, args=(*args), callback=callbackfunct, error_callback=handle_error)

handle_error будет вызываться с объектом исключения в качестве аргумента.

Если вы этого не сделаете, вы должны обернуть все свои рабочие функции в try/except, чтобы обеспечить выполнение callback. (Я думаю, у вас сложилось впечатление, что это не будет работать из моего ответа в этом другом вопросе, но это не так. Извините!):

def workerfunct(*args):
 try:
 # Stuff
 except Exception as e:
 # Do something here, maybe return e?
pool.apply_async(workerfunct, args=(*args), callback=callbackfunct)

Вы также можете использовать функцию обертки, если вы не можете/не хотите изменять функцию, которую вы действительно хотите вызвать:

def wrapper(func, *args):
 try:
 return func(*args)
 except Exception as e:
 return e
pool.apply_async(wrapper, args=(workerfunct, *args), callback=callbackfunct)

licensed under cc by-sa 3.0 with attribution.