MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Streaming de dados em tempo real com o MongoDB Change Streams


Recentemente, o MongoDB lançou um novo recurso a partir da versão 3.6, Change Streams. Isso lhe dá acesso instantâneo aos seus dados, o que ajuda você a se manter atualizado com as alterações de dados. No mundo de hoje, todo mundo quer notificações instantâneas em vez de receber depois de algumas horas ou minutos. Para alguns aplicativos, é fundamental enviar notificações em tempo real para todos os usuários inscritos para cada atualização. O MongoDB facilitou muito esse processo ao introduzir esse recurso. Neste artigo, aprenderemos sobre o fluxo de mudança do MongoDB e seus aplicativos com alguns exemplos.

Definindo fluxos de mudança


Os fluxos de mudança nada mais são do que o fluxo em tempo real de quaisquer alterações que ocorram no banco de dados ou na coleção ou mesmo nas implantações. Por exemplo, sempre que qualquer atualização (Insert, Update ou Delete) ocorre em uma coleção específica, o MongoDB aciona um evento de alteração com todos os dados que foram modificados.

Você pode definir fluxos de mudança em qualquer coleção como qualquer outro operador de agregação normal usando o operador $changeStream e o método watch(). Você também pode definir o fluxo de mudança usando o método MongoCollection.watch().

Exemplo
db.myCollection.watch()

Alterar recursos de streams


  • Filtrando Alterações

    Você pode filtrar as alterações para obter notificações de eventos apenas para alguns dados direcionados.

    Exemplo:
    pipeline = [
       {
         $match: { name: "Bob" }
       } ];
    changeStream = collection.watch(pipeline);

    Esse código garantirá que você receba atualizações apenas para registros com nome igual a Bob. Dessa forma, você pode escrever quaisquer pipelines para filtrar os fluxos de mudança.

  • Retomar fluxos de mudança

    Esse recurso garante que não haja perda de dados em caso de falhas. Cada resposta no fluxo contém o token de retomada que pode ser usado para reiniciar o fluxo a partir de um ponto específico. Para algumas falhas de rede frequentes, o driver mongodb tentará restabelecer a conexão com os assinantes usando o token de retomada mais recente. Embora, em caso de falha completa do aplicativo, o token de retomada deva ser mantido pelos clientes para retomar o fluxo.

  • Fluxos de mudança ordenados

    O MongoDB usa um relógio lógico global para ordenar todos os eventos de fluxo de mudança em todas as réplicas e fragmentos de qualquer cluster para que o receptor sempre receba as notificações na mesma ordem em que os comandos foram aplicados no banco de dados.

  • Eventos com documentos completos

    O MongoDB retorna a parte dos documentos correspondentes por padrão. Mas você pode modificar a configuração do fluxo de mudança para receber um documento completo. Para isso, passe { fullDocument:“updateLookup”} para o método watch.
    Exemplo:
    collection = db.collection("myColl")
    changeStream = collection.watch({ fullDocument: “updateLookup”})

  • Durabilidade

    Os fluxos de mudança notificarão apenas os dados confirmados para a maioria das réplicas. Isso garantirá que os eventos sejam gerados por dados de persistência majoritários, garantindo a durabilidade da mensagem.

  • Segurança/Controle de Acesso

    Os fluxos de mudança são muito seguros. Os usuários podem criar fluxos de mudança apenas nas coleções nas quais têm permissões de leitura. Você pode criar fluxos de mudança com base nas funções do usuário.
Vários noves Torne-se um DBA do MongoDB - Trazendo o MongoDB para a produçãoSaiba mais sobre o que você precisa saber para implantar, monitorar, gerenciar e dimensionar o MongoDBBaixe gratuitamente

Exemplo de fluxos de mudança


Neste exemplo, criaremos fluxos de mudança na coleção de ações para ser notificado quando qualquer preço de ação ultrapassar qualquer limite.

  • Configurar o cluster

    Para usar fluxos de mudança, primeiro precisamos criar um conjunto de réplicas. Execute o comando a seguir para criar um conjunto de réplicas de nó único.
    mongod --dbpath ./data --replSet “rs”

  • Insira alguns registros na coleção Ações
    var docs = [
     { ticker: "AAPL", price: 210 },
     { ticker: "AAPL", price: 260 },
     { ticker: "AAPL", price: 245 },
     { ticker: "AAPL", price: 255 },
     { ticker: "AAPL", price: 270 }
    ];
    db.Stocks.insert(docs)

  • Configurar o ambiente do nó e instalar dependências
    mkdir mongo-proj && cd mongo-proj
    npm init -y
    npm install mongodb --save

  • Assine as alterações

    Crie um arquivo index.js e coloque o código a seguir nele.
    const mongo = require("mongodb").MongoClient;
    mongo.connect("mongodb://localhost:27017/?replicaSet=rs0").then(client => {
     console.log("Connected to MongoDB server");
     // Select DB and Collection
     const db = client.db("mydb");
     const collection = db.collection("Stocks");
     pipeline = [
       {
         $match: { "fullDocument.price": { $gte: 250 } }
       }
     ];
     // Define change stream
     const changeStream = collection.watch(pipeline);
     // start listen to changes
     changeStream.on("change", function(event) {
       console.log(JSON.stringify(event));
     });
    });

    Agora execute este arquivo:
    node index.js

  • Insira um novo registro em db para receber uma atualização
    db.Stocks.insert({ ticker: “AAPL”, price: 280 })

    Agora verifique seu console, você receberá uma atualização do MongoDB.
    Exemplo de resposta:
    {
    "_id":{
    "_data":"825C5D51F70000000129295A1004E83608EE8F1B4FBABDCEE73D5BF31FC946645F696400645C5D51F73ACA83479B48DE6E0004"},
    "operationType":"insert",
    "clusterTime":"6655565945622233089",
    "fullDocument":{
    "_id":"5c5d51f73aca83479b48de6e",
    "ticker":"AAPL",
    "Price":300
    },
    "ns":{"db":"mydb","coll":"Stocks"},
    "documentKey":{"_id":"5c5d51f73aca83479b48de6e"}
    }

Aqui você pode alterar o valor do parâmetro operationType com as seguintes operações para ouvir diferentes tipos de alterações em uma coleção:
  • Inserir
  • Substituir (exceto ID exclusivo)
  • Atualizar
  • Excluir
  • Invalidar (sempre que o Mongo retornar um cursor inválido)

Outros modos de fluxos de alterações


Você pode iniciar fluxos de mudança em um banco de dados e implantação da mesma forma que em uma coleção. Esse recurso foi lançado a partir do MongoDB versão 4.0. Aqui estão os comandos para abrir um fluxo de mudança no banco de dados e nas implantações.
Against DB: db.watch()
Against deployment: Mongo.watch()

Conclusão


O MongoDB Change Streams simplifica a integração entre front-end e back-end de maneira contínua e em tempo real. Esse recurso pode ajudá-lo a usar o MongoDB para o modelo pubsub para que você não precise mais gerenciar implantações de Kafka ou RabbitMQ. Se seu aplicativo requer informações em tempo real, você deve verificar esse recurso do MongoDB. Espero que este post faça você começar com os fluxos de mudança do MongoDB.