PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

Pendure no script Python usando SQLAlchemy e multiprocessamento


Acredito que o TypeError vem de multiprocessing 's get .

Eu retirei todo o código DB do seu script. Dê uma olhada neste:
import multiprocessing
import sqlalchemy.exc

def do(kwargs):
    i = kwargs['i']
    print i
    raise sqlalchemy.exc.ProgrammingError("", {}, None)
    return i


pool = multiprocessing.Pool(processes=5)               # start 4 worker processes
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append) # evaluate "f(10)" asynchronously

# Use get or wait?
# r.get()
r.wait()

pool.close()
pool.join()
print results

Usando r.wait retorna o resultado esperado, mas usando r.get gera TypeError . Conforme descrito em documentos do python , use r.wait após um map_async .

Editar :eu tenho que alterar minha resposta anterior. Agora acredito que o TypeError vem do SQLAlchemy. Eu alterei meu script para reproduzir o erro.

Editar 2 :Parece que o problema é que multiprocessing.pool não funciona bem se algum trabalhador gerar uma exceção cujo construtor requer um parâmetro (consulte também aqui ).

Eu alterei meu script para destacar isso.
import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

class GoodExc(Exception):
    def __init__(self, a=None):
        '''Optional param in the constructor.'''
        self.a = a

def do(kwargs):
    i = kwargs['i']
    print i
    raise BadExc('a')
    # raise GoodExc('a')
    return i

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

No seu caso, dado que seu código gera uma exceção SQLAlchemy, a única solução que consigo pensar é pegar todas as exceções no do função e re-raise uma normal Exception em vez de. Algo assim:
import multiprocessing

class BadExc(Exception):
    def __init__(self, a):
        '''Non-optional param in the constructor.'''
        self.a = a

def do(kwargs):
    try:
        i = kwargs['i']
        print i
        raise BadExc('a')
        return i
    except Exception as e:
        raise Exception(repr(e))

pool = multiprocessing.Pool(processes=5)
results = []
arglist = []
for i in range(10):
    arglist.append({'i':i})
r = pool.map_async(do, arglist, callback=results.append)
try:
    # set a timeout in order to be able to catch C-c
    r.get(1e100)
except KeyboardInterrupt:
    pass
print results

Editar 3 :então, parece ser um bug com Python , mas as exceções adequadas no SQLAlchemy resolveriam isso:portanto, eu levantei o problema com o SQLAlchemy , também.

Como solução para o problema, acho que a solução no final da Edit 2 faria (envolvendo callbacks em try-except e re-raise).