MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Sink Kafka Stream para MongoDB usando PySpark Structured Streaming


Encontrei uma solução. Como não consegui encontrar o driver Mongo certo para Streaming Estruturado, trabalhei em outra solução. Agora, uso a conexão direta com o mongoDb e uso "foreach(...)" em vez de foreachbatch(. ..). Meu código se parece com isso no arquivo testSpark.py:
....
import pymongo
from pymongo import MongoClient

local_url = "mongodb://localhost:27017"


def write_machine_df_mongo(target_df):

    cluster = MongoClient(local_url)
    db = cluster["test_db"]
    collection = db.test1

    post = {
            "machine_id": target_df.machine_id,
            "proc_type": target_df.proc_type,
            "sensor1_id": target_df.sensor1_id,
            "sensor2_id": target_df.sensor2_id,
            "time": target_df.time,
            "sensor1_val": target_df.sensor1_val,
            "sensor2_val": target_df.sensor2_val,
            }

    collection.insert_one(post)

machine_df.writeStream\
    .outputMode("append")\
    .foreach(write_machine_df_mongo)\
    .start()