MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

MongoDBObject não sendo adicionado dentro de um loop rrd foreach casbah scala apache spark


Os cálculos nos RDDs são distribuídos pelo cluster. Você não pode atualizar uma variável que foi criada fora do fechamento da operação RDD de dentro do RDD. Eles estão basicamente em dois lugares diferentes:A variável é criada no driver Spark e acessada nos workers e deve ser tratada como somente leitura.

O Spark oferece suporte a acumuladores distribuídos que podem ser usados ​​neste caso:Spark Cumulators

Outra opção (a que eu prefiro) é transformar o fluxo de RDD no formato de dados desejado e usar o foreachRDD método para persisti-lo no armazenamento secundário. Esta seria uma maneira mais funcional de abordar o problema. Ficaria mais ou menos assim:
  val filteredStream = twitterStream.filter(entry => filters.exists(term => entry.getText.getStatus.contains(term)))
  val filteredStreamWithTs = filteredStream.map(x => ((DateTime.now.toString(), x)))
  filteredStreamWithTs.foreachRdd(rdd => // write to Mongo)