Acredito que o
TypeError
vem de multiprocessing
's get
. Eu retirei todo o código DB do seu script. Dê uma olhada neste:
import multiprocessing
import sqlalchemy.exc
def do(kwargs):
i = kwargs['i']
print i
raise sqlalchemy.exc.ProgrammingError("", {}, None)
return i
pool = multiprocessing.Pool(processes=5) # start 4 worker processes
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously
# Use get or wait?
# r.get()
r.wait()
pool.close()
pool.join()
print results
Usando
r.wait
retorna o resultado esperado, mas usando r.get
gera TypeError
. Conforme descrito em documentos do python
, use r.wait
após um map_async
. Editar :eu tenho que alterar minha resposta anterior. Agora acredito que o
TypeError
vem do SQLAlchemy. Eu alterei meu script para reproduzir o erro. Editar 2 :Parece que o problema é que
multiprocessing.pool
não funciona bem se algum trabalhador gerar uma exceção cujo construtor requer um parâmetro (consulte também aqui
). Eu alterei meu script para destacar isso.
import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
class GoodExc(Exception):
def __init__(self, a=None):
'''Optional param in the constructor.'''
self.a = a
def do(kwargs):
i = kwargs['i']
print i
raise BadExc('a')
# raise GoodExc('a')
return i
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
No seu caso, dado que seu código gera uma exceção SQLAlchemy, a única solução que consigo pensar é pegar todas as exceções no
do
função e re-raise uma normal Exception
em vez de. Algo assim:import multiprocessing
class BadExc(Exception):
def __init__(self, a):
'''Non-optional param in the constructor.'''
self.a = a
def do(kwargs):
try:
i = kwargs['i']
print i
raise BadExc('a')
return i
except Exception as e:
raise Exception(repr(e))
pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
# set a timeout in order to be able to catch C-c
r.get(1e100)
except KeyboardInterrupt:
pass
print results
Editar 3 :então, parece ser um bug com Python , mas as exceções adequadas no SQLAlchemy resolveriam isso:portanto, eu levantei o problema com o SQLAlchemy , também.
Como solução para o problema, acho que a solução no final da Edit 2 faria (envolvendo callbacks em try-except e re-raise).