MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Salve um CSV muito grande no mongoDB usando o mongoose


Bem-vindo ao streaming. O que você realmente quer é um "fluxo de eventos" que processe sua entrada "um pedaço de cada vez" e, é claro, idealmente por um delimitador comum, como o caractere "nova linha" que você está usando no momento.

Para coisas realmente eficientes, você pode adicionar o uso do MongoDB "Bulk API" inserções para tornar seu carregamento o mais rápido possível sem consumir toda a memória da máquina ou ciclos de CPU.

Não defendendo, pois existem várias soluções disponíveis, mas aqui está uma lista que utiliza a line- pacote de fluxo de entrada para simplificar a parte do "terminador de linha".

Definições de esquema apenas por "exemplo":
var LineInputStream = require("line-input-stream"),
    fs = require("fs"),
    async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

var entrySchema = new Schema({},{ strict: false })

var Entry = mongoose.model( "Schema", entrySchema );

var stream = LineInputStream(fs.createReadStream("data.txt",{ flags: "r" }));

stream.setDelimiter("\n");

mongoose.connection.on("open",function(err,conn) { 

    // lower level method, needs connection
    var bulk = Entry.collection.initializeOrderedBulkOp();
    var counter = 0;

    stream.on("error",function(err) {
        console.log(err); // or otherwise deal with it
    });

    stream.on("line",function(line) {

        async.series(
            [
                function(callback) {
                    var row = line.split(",");     // split the lines on delimiter
                    var obj = {};             
                    // other manipulation

                    bulk.insert(obj);  // Bulk is okay if you don't need schema
                                       // defaults. Or can just set them.

                    counter++;

                    if ( counter % 1000 == 0 ) {
                        stream.pause();
                        bulk.execute(function(err,result) {
                            if (err) callback(err);
                            // possibly do something with result
                            bulk = Entry.collection.initializeOrderedBulkOp();
                            stream.resume();
                            callback();
                        });
                    } else {
                        callback();
                    }
               }
           ],
           function (err) {
               // each iteration is done
           }
       );

    });

    stream.on("end",function() {

        if ( counter % 1000 != 0 )
            bulk.execute(function(err,result) {
                if (err) throw err;   // or something
                // maybe look at result
            });
    });

});

Portanto, geralmente a interface "stream" "decompõe a entrada" para processar "uma linha de cada vez". Isso impede que você carregue tudo de uma vez.

As partes principais são a "Bulk Operations API" do MongoDB. Isso permite "enfileirar" muitas operações ao mesmo tempo antes de enviar para o servidor. Portanto, neste caso com o uso de um "módulo", as gravações são enviadas apenas por 1000 entradas processadas. Você pode realmente fazer qualquer coisa até o limite de 16 MB BSON, mas mantenha-o gerenciável.

Além das operações sendo processadas em massa, há um "limitador" adicional em vigor do async biblioteca. Não é realmente necessário, mas isso garante que essencialmente não mais do que o "limite de módulo" de documentos estejam em processo a qualquer momento. As "inserções" gerais de lote não têm nenhum custo de E/S além da memória, mas as chamadas "executar" significam que a E/S está sendo processada. Então, esperamos em vez de enfileirar mais coisas.

Certamente existem soluções melhores que você pode encontrar para dados do tipo CSV de "processamento de fluxo", o que parece ser. Mas, em geral, isso fornece os conceitos de como fazer isso de maneira eficiente de memória sem consumir ciclos de CPU também.