MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Importar CSV usando o esquema do Mongoose


Você pode fazer isso com fast-csv obtendo os headers da definição do esquema que retornará as linhas analisadas como "objetos". Na verdade, você tem algumas incompatibilidades, então eu as marquei com correções:
const fs = require('mz/fs');
const csv = require('fast-csv');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,         // <-- You have this as Number but it's a string
  networth: Number,
  tag: String,
  stuff: String,        // the empty field in the csv
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    console.log(headers);

    await new Promise((resolve,reject) => {

      let buffer = [],
          counter = 0;

      let stream = fs.createReadStream('input.csv')
        .pipe(csv({ headers }))
        .on("error", reject)
        .on("data", async doc => {
          stream.pause();
          buffer.push(doc);
          counter++;
          log(doc);
          try {
            if ( counter > 10000 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
            }
          } catch(e) {
            stream.destroy(e);
          }

          stream.resume();

        })
        .on("end", async () => {
          try {
            if ( counter > 0 ) {
              await Rank.insertMany(buffer);
              buffer = [];
              counter = 0;
              resolve();
            }
          } catch(e) {
            stream.destroy(e);
          }
        });

    });


  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }


})()

Contanto que o esquema realmente esteja alinhado ao CSV fornecido, tudo bem. Estas são as correções que posso ver, mas se você precisar que os nomes dos campos reais sejam alinhados de maneira diferente, precisará ajustar. Mas havia basicamente um Number na posição onde há uma String e essencialmente um campo extra, que estou presumindo ser o campo em branco no CSV.

As coisas gerais são obter a matriz de nomes de campo do esquema e passá-la para as opções ao criar a instância do analisador csv:
let headers = Object.keys(Rank.schema.paths)
  .filter(k => ['_id','__v'].indexOf(k) === -1);

let stream = fs.createReadStream('input.csv')
  .pipe(csv({ headers }))

Uma vez que você realmente faz isso, você obtém um "objeto" de volta em vez de uma matriz:
{
  "serverid": "9",
  "resetid": "1557",
  "rank": "358",
  "name": "286",
  "land": "Mutantville",
  "networth": "4368",
  "tag": "2358026",
  "stuff": "",
  "gov": "M",
  "gdi": "0",
  "protection": "0",
  "vacation": "0",
  "alive": "1",
  "deleted": "0"
}

Não se preocupe com os "tipos" porque o Mongoose irá converter os valores de acordo com o esquema.

O resto acontece dentro do manipulador para os data evento. Para máxima eficiência, estamos usando insertMany() para gravar no banco de dados apenas uma vez a cada 10.000 linhas. Como isso realmente vai para o servidor e os processos depende da versão do MongoDB, mas 10.000 deve ser bastante razoável com base no número médio de campos que você importaria para uma única coleção em termos de "troca" para uso de memória e gravação de um pedido de rede razoável. Diminua o número, se necessário.

As partes importantes são marcar essas chamadas como async funções e await o resultado do insertMany() antes de continuar. Também precisamos pause() o fluxo e resume() em cada item, caso contrário, corremos o risco de sobrescrever o buffer de documentos a serem inseridos antes de serem realmente enviados. A pause() e resume() são necessários para colocar "back-pressure" no tubo, caso contrário os itens apenas continuam "saindo" e disparando os data evento.

Naturalmente, o controle para as 10.000 entradas exige que verifiquemos isso em cada iteração e na conclusão do fluxo para esvaziar o buffer e enviar quaisquer documentos restantes ao servidor.

Isso é realmente o que você deseja fazer, pois certamente não deseja disparar uma solicitação assíncrona para o servidor em "cada" iteração por meio de data evento ou essencialmente sem esperar que cada solicitação seja concluída. Você escapará sem verificar isso para "arquivos muito pequenos", mas para qualquer carga do mundo real, você certamente excederá a pilha de chamadas devido a chamadas assíncronas "em voo" que ainda não foram concluídas.

FYI - um package.json usado. O mz é opcional, pois é apenas uma Promise modernizada biblioteca habilitada de bibliotecas "embutidas" de nó padrão que eu simplesmente estou acostumado a usar. O código é, obviamente, completamente intercambiável com o fs módulo.
{
  "description": "",
  "main": "index.js",
  "dependencies": {
    "fast-csv": "^2.4.1",
    "mongoose": "^5.1.1",
    "mz": "^2.7.0"
  },
  "keywords": [],
  "author": "",
  "license": "ISC"
}

Na verdade, com o Node v8.9.xe acima, podemos tornar isso muito mais simples com uma implementação de AsyncIterator através do stream-to-iterator módulo. Ainda está em Iterator<Promise<T>> mode, mas isso deve acontecer até que o Node v10.x se torne LTS estável:
const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');

const { Schema } = mongoose = require('mongoose');

const uri = 'mongodb://localhost/test';

mongoose.Promise = global.Promise;
mongoose.set('debug', true);

const rankSchema = new Schema({
  serverid: Number,
  resetid: Number,
  rank: Number,
  name: String,
  land: String,
  networth: Number,
  tag: String,
  stuff: String,        // the empty field
  gov: String,
  gdi: Number,
  protection: Number,
  vacation: Number,
  alive: Number,
  deleted: Number
});

const Rank = mongoose.model('Rank', rankSchema);

const log = data => console.log(JSON.stringify(data, undefined, 2));

(async function() {

  try {
    const conn = await mongoose.connect(uri);

    await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));

    let headers = Object.keys(Rank.schema.paths)
      .filter(k => ['_id','__v'].indexOf(k) === -1);

    //console.log(headers);

    let stream = fs.createReadStream('input.csv')
      .pipe(csv({ headers }));

    const iterator = await streamToIterator(stream).init();

    let buffer = [],
        counter = 0;

    for ( let docPromise of iterator ) {
      let doc = await docPromise;
      buffer.push(doc);
      counter++;

      if ( counter > 10000 ) {
        await Rank.insertMany(buffer);
        buffer = [];
        counter = 0;
      }
    }

    if ( counter > 0 ) {
      await Rank.insertMany(buffer);
      buffer = [];
      counter = 0;
    }

  } catch(e) {
    console.error(e)
  } finally {
    process.exit()
  }

})()

Basicamente, todo o tratamento de "eventos" de stream, pausa e retomada são substituídos por um simples for ciclo:
const iterator = await streamToIterator(stream).init();

for ( let docPromise of iterator ) {
  let doc = await docPromise;
  // ... The things in the loop
}

Fácil! Isso é limpo na implementação posterior do nó com for..await..of quando se torna mais estável. Mas o acima funciona bem na versão especificada e acima.