Você pode fazer isso com fast-csv obtendo os
headers
da definição do esquema que retornará as linhas analisadas como "objetos". Na verdade, você tem algumas incompatibilidades, então eu as marquei com correções:const fs = require('mz/fs');
const csv = require('fast-csv');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String, // <-- You have this as Number but it's a string
networth: Number,
tag: String,
stuff: String, // the empty field in the csv
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
console.log(headers);
await new Promise((resolve,reject) => {
let buffer = [],
counter = 0;
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
.on("error", reject)
.on("data", async doc => {
stream.pause();
buffer.push(doc);
counter++;
log(doc);
try {
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
stream.destroy(e);
}
stream.resume();
})
.on("end", async () => {
try {
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
resolve();
}
} catch(e) {
stream.destroy(e);
}
});
});
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Contanto que o esquema realmente esteja alinhado ao CSV fornecido, tudo bem. Estas são as correções que posso ver, mas se você precisar que os nomes dos campos reais sejam alinhados de maneira diferente, precisará ajustar. Mas havia basicamente um
Number
na posição onde há uma String
e essencialmente um campo extra, que estou presumindo ser o campo em branco no CSV. As coisas gerais são obter a matriz de nomes de campo do esquema e passá-la para as opções ao criar a instância do analisador csv:
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }))
Uma vez que você realmente faz isso, você obtém um "objeto" de volta em vez de uma matriz:
{
"serverid": "9",
"resetid": "1557",
"rank": "358",
"name": "286",
"land": "Mutantville",
"networth": "4368",
"tag": "2358026",
"stuff": "",
"gov": "M",
"gdi": "0",
"protection": "0",
"vacation": "0",
"alive": "1",
"deleted": "0"
}
Não se preocupe com os "tipos" porque o Mongoose irá converter os valores de acordo com o esquema.
O resto acontece dentro do manipulador para os
data
evento. Para máxima eficiência, estamos usando insertMany()
para gravar no banco de dados apenas uma vez a cada 10.000 linhas. Como isso realmente vai para o servidor e os processos depende da versão do MongoDB, mas 10.000 deve ser bastante razoável com base no número médio de campos que você importaria para uma única coleção em termos de "troca" para uso de memória e gravação de um pedido de rede razoável. Diminua o número, se necessário. As partes importantes são marcar essas chamadas como
async
funções e await
o resultado do insertMany()
antes de continuar. Também precisamos pause()
o fluxo e resume()
em cada item, caso contrário, corremos o risco de sobrescrever o buffer
de documentos a serem inseridos antes de serem realmente enviados. A pause()
e resume()
são necessários para colocar "back-pressure" no tubo, caso contrário os itens apenas continuam "saindo" e disparando os data
evento. Naturalmente, o controle para as 10.000 entradas exige que verifiquemos isso em cada iteração e na conclusão do fluxo para esvaziar o buffer e enviar quaisquer documentos restantes ao servidor.
Isso é realmente o que você deseja fazer, pois certamente não deseja disparar uma solicitação assíncrona para o servidor em "cada" iteração por meio de
data
evento ou essencialmente sem esperar que cada solicitação seja concluída. Você escapará sem verificar isso para "arquivos muito pequenos", mas para qualquer carga do mundo real, você certamente excederá a pilha de chamadas devido a chamadas assíncronas "em voo" que ainda não foram concluídas. FYI - um
package.json
usado. O mz
é opcional, pois é apenas uma Promise
modernizada biblioteca habilitada de bibliotecas "embutidas" de nó padrão que eu simplesmente estou acostumado a usar. O código é, obviamente, completamente intercambiável com o fs
módulo. {
"description": "",
"main": "index.js",
"dependencies": {
"fast-csv": "^2.4.1",
"mongoose": "^5.1.1",
"mz": "^2.7.0"
},
"keywords": [],
"author": "",
"license": "ISC"
}
Na verdade, com o Node v8.9.xe acima, podemos tornar isso muito mais simples com uma implementação de
AsyncIterator
através do stream-to-iterator
módulo. Ainda está em Iterator<Promise<T>>
mode, mas isso deve acontecer até que o Node v10.x se torne LTS estável:const fs = require('mz/fs');
const csv = require('fast-csv');
const streamToIterator = require('stream-to-iterator');
const { Schema } = mongoose = require('mongoose');
const uri = 'mongodb://localhost/test';
mongoose.Promise = global.Promise;
mongoose.set('debug', true);
const rankSchema = new Schema({
serverid: Number,
resetid: Number,
rank: Number,
name: String,
land: String,
networth: Number,
tag: String,
stuff: String, // the empty field
gov: String,
gdi: Number,
protection: Number,
vacation: Number,
alive: Number,
deleted: Number
});
const Rank = mongoose.model('Rank', rankSchema);
const log = data => console.log(JSON.stringify(data, undefined, 2));
(async function() {
try {
const conn = await mongoose.connect(uri);
await Promise.all(Object.entries(conn.models).map(([k,m]) => m.remove()));
let headers = Object.keys(Rank.schema.paths)
.filter(k => ['_id','__v'].indexOf(k) === -1);
//console.log(headers);
let stream = fs.createReadStream('input.csv')
.pipe(csv({ headers }));
const iterator = await streamToIterator(stream).init();
let buffer = [],
counter = 0;
for ( let docPromise of iterator ) {
let doc = await docPromise;
buffer.push(doc);
counter++;
if ( counter > 10000 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
}
if ( counter > 0 ) {
await Rank.insertMany(buffer);
buffer = [];
counter = 0;
}
} catch(e) {
console.error(e)
} finally {
process.exit()
}
})()
Basicamente, todo o tratamento de "eventos" de stream, pausa e retomada são substituídos por um simples
for
ciclo:const iterator = await streamToIterator(stream).init();
for ( let docPromise of iterator ) {
let doc = await docPromise;
// ... The things in the loop
}
Fácil! Isso é limpo na implementação posterior do nó com
for..await..of
quando se torna mais estável. Mas o acima funciona bem na versão especificada e acima.