EDITAR 27-01-2018:
Acontece que esse problema está relacionado ao DirectRunner. Se você executar o mesmo pipeline usando o DataflowRunner, deverá obter lotes de até 1.000 registros. O DirectRunner sempre cria bundles de tamanho 1 após uma operação de agrupamento.
Resposta original:
Eu me deparei com o mesmo problema ao gravar em bancos de dados em nuvem usando o JdbcIO do Apache Beam. O problema é que, embora o JdbcIO suporte a gravação de até 1.000 registros em um lote, nunca o vi gravar mais de 1 linha por vez (tenho que admitir:isso sempre estava usando o DirectRunner em um ambiente de desenvolvimento).
Portanto, adicionei um recurso ao JdbcIO onde você pode controlar o tamanho dos lotes agrupando seus dados e gravando cada grupo como um lote. Abaixo está um exemplo de como usar esse recurso com base no exemplo original WordCount do Apache Beam.
p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
// Count words in input file(s)
.apply(new CountWords())
// Format as text
.apply(MapElements.via(new FormatAsTextFn()))
// Make key-value pairs with the first letter as the key
.apply(ParDo.of(new FirstLetterAsKey()))
// Group the words by first letter
.apply(GroupByKey.<String, String> create())
// Get a PCollection of only the values, discarding the keys
.apply(ParDo.of(new GetValues()))
// Write the words to the database
.apply(JdbcIO.<String> writeIterable()
.withDataSourceConfiguration(
JdbcIO.DataSourceConfiguration.create(options.getJdbcDriver(), options.getURL()))
.withStatement(INSERT_OR_UPDATE_SQL)
.withPreparedStatementSetter(new WordCountPreparedStatementSetter()));
A diferença com o método de gravação normal do JdbcIO é o novo método
writeIterable()
que recebe um PCollection<Iterable<RowT>>
como entrada em vez de PCollection<RowT>
. Cada Iterable é gravado como um lote no banco de dados. A versão do JdbcIO com esta adição pode ser encontrada aqui:https://github.com/olavloite/beam/blob/JdbcIOIterableWrite/sdks/java/io/jdbc/src/main/java /org/apache/beam/sdk/io/jdbc/JdbcIO.java
Todo o projeto de exemplo contendo o exemplo acima pode ser encontrado aqui:https://github.com/ olavloite/spanner-beam-example
(Há também um pull request pendente no Apache Beam para incluir isso no projeto)