Inserções seriais
A maneira mais fácil seria fazer inserções dentro de um
Sink.foreach
. Supondo que você tenha usado a geração de código de esquema e supondo ainda que sua tabela seja chamada de "NumberTable"
//Tables file was auto-generated by the schema code generation
import Tables.{Numbertable, NumbertableRow}
val numberTableDB = Database forConfig "NumberTableConfig"
Podemos escrever uma função que faça a inserção
def insertIntoDb(num : Int) =
numberTableDB run (Numbertable += NumbertableRow(num))
E essa função pode ser colocada no Sink
val insertSink = Sink[Int] foreach insertIntoDb
Source(0 to 100) runWith insertSink
Inserções em lote
Você pode estender ainda mais a metodologia Sink agrupando N inserções por vez:
def batchInsertIntoDb(nums : Seq[Int]) =
numberTableDB run (Numbertable ++= nums.map(NumbertableRow.apply))
val batchInsertSink = Sink[Seq[Int]] foreach batchInsertIntoDb
Este coletor em lote pode ser alimentado por um
Flow
que faz o agrupamento de lotes:val batchSize = 10
Source(0 to 100).via(Flow[Int].grouped(batchSize))
.runWith(batchInsertSink)