Isso ainda não foi publicado, mas no branch master da Alpakka,
MongoSource.apply
recebe um parâmetro de tipo:object MongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Portanto, com a próxima versão 0.18 do Alpakka, você poderá fazer o seguinte:
val source: Source[TodoMongo, NotUsed] = MongoSource[TodoMongo](todoCollection.find())
Observe que
source
aqui assume que todoCollection.find()
retorna um Observable[TodoMongo]
; ajuste os tipos conforme necessário. Enquanto isso, você pode simplesmente adicionar o código acima manualmente. Por exemplo:
package akka.stream.alpakka.mongodb.scaladsl
import akka.NotUsed
import akka.stream.alpakka.mongodb.ObservableToPublisher
import akka.stream.scaladsl.Source
import org.mongodb.scala.Observable
object MyMongoSource {
def apply[T](query: Observable[T]): Source[T, NotUsed] =
Source.fromPublisher(ObservableToPublisher(query))
}
Observe que
MyMongoSource
é definido para residir no akka.stream.alpakka.mongodb.scaladsl
pacote (como MongoSource
), porque ObservableToPublisher
é uma classe privada de pacote. Você usaria MyMongoSource
da mesma forma que você usaria MongoSource
:val source: Source[TodoMongo, NotUsed] = MyMongoSource[TodoMongo](todoCollection.find())