Mysql
 sql >> Base de Dados >  >> RDS >> Mysql

Inserção em massa de JdbcIO do Google Dataflow (Apache beam) no banco de dados mysql


EDITAR 27-01-2018:

Acontece que esse problema está relacionado ao DirectRunner. Se você executar o mesmo pipeline usando o DataflowRunner, deverá obter lotes de até 1.000 registros. O DirectRunner sempre cria bundles de tamanho 1 após uma operação de agrupamento.

Resposta original:

Eu me deparei com o mesmo problema ao gravar em bancos de dados em nuvem usando o JdbcIO do Apache Beam. O problema é que, embora o JdbcIO suporte a gravação de até 1.000 registros em um lote, nunca o vi gravar mais de 1 linha por vez (tenho que admitir:isso sempre estava usando o DirectRunner em um ambiente de desenvolvimento).

Portanto, adicionei um recurso ao JdbcIO onde você pode controlar o tamanho dos lotes agrupando seus dados e gravando cada grupo como um lote. Abaixo está um exemplo de como usar esse recurso com base no exemplo original WordCount do Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
    // Count words in input file(s)
    .apply(new CountWords())
    // Format as text
    .apply(MapElements.via(new FormatAsTextFn()))
    // Make key-value pairs with the first letter as the key
    .apply(ParDo.of(new FirstLetterAsKey()))
    // Group the words by first letter
    .apply(GroupByKey.<String, String> create())
    // Get a PCollection of only the values, discarding the keys
    .apply(ParDo.of(new GetValues()))
    // Write the words to the database
    .apply(JdbcIO.<String> writeIterable()
            .withDataSourceConfiguration(
                JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
            .withStatement(INSERT_OR_UPDATE_SQL)
            .withPreparedStatementSetter(new WordCountPreparedStatementSetter()));

A diferença com o método de gravação normal do JdbcIO é o novo método writeIterable() que recebe um PCollection<Iterable<RowT>> como entrada em vez de PCollection<RowT> . Cada Iterable é gravado como um lote no banco de dados.

A versão do JdbcIO com esta adição pode ser encontrada aqui:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java

Todo o projeto de exemplo contendo o exemplo acima pode ser encontrado aqui:https://github.com/ olavloite/spanner-beam-example

(Há também um pull request pendente no Apache Beam para incluir isso no projeto)