PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

Como configurar um túnel SSH no Google Cloud Dataflow para um servidor de banco de dados externo?


Problema resolvido ! Eu não posso acreditar que passei dois dias inteiros nisso... Eu estava olhando completamente na direção errada.

O problema não era com alguma configuração de rede do Dataflow ou GCP e, pelo que sei...

é verdade.

O problema estava claro no meu código:apenas o problema foi revelado apenas em um ambiente distribuído. Eu cometi o erro de abrir o túnel do processador principal do pipeline, em vez dos trabalhadores. Portanto, o túnel SSH estava ativo, mas não entre os trabalhadores e o servidor de destino, apenas entre o pipeline principal e o destino!

Para corrigir isso, tive que alterar meu DoFn solicitante para envolver a execução da consulta com o túnel:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

como você pode ver, eu tive que substituir alguns bits da biblioteca pysql_beam.

Por fim, cada trabalhador abre seu próprio túnel para cada solicitação. Provavelmente é possível otimizar esse comportamento, mas é suficiente para minhas necessidades.