MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Desempenho do MongoDB:executando operações de redução de mapa do MongoDB em secundários


Map-reduce é talvez a mais versátil das operações de agregação que o MongoDB suporta.

Map-Reduce é um modelo de programação popular que se originou no Google para processar e agregar grandes volumes de dados em paralelo. Uma discussão detalhada sobre Map-Reduce está fora do escopo deste artigo, mas essencialmente é um processo de agregação de várias etapas. As duas etapas mais importantes são o estágio de mapeamento (processar cada documento e emitir resultados) e o estágio de redução (coleciona os resultados emitidos durante o estágio de mapeamento).

O MongoDB oferece suporte a três tipos de operações de agregação:Map-Reduce, pipeline de agregação e comandos de agregação de propósito único. Você pode usar este documento de comparação do MongoDB para ver qual atende às suas necessidades. https://scalegrid.io/blog/mongodb-performance-running-mongodb-map-reduce-operations-on-secondaries/

No meu último post, vimos, com exemplos, como executar pipelines de agregação em secundários. Neste post, veremos como executar tarefas Map-Reduce nas réplicas secundárias do MongoDB.

Redução de mapa do MongoDB


O MongoDB suporta a execução de tarefas Map-Reduce nos servidores de banco de dados. Isso oferece a flexibilidade de escrever tarefas de agregação complexas que não são tão fáceis de executar por meio de pipelines de agregação. O MongoDB permite que você escreva funções customizadas de mapeamento e redução em Javascript que podem ser passadas para o banco de dados via shell Mongo ou qualquer outro cliente. Em conjuntos de dados grandes e em constante crescimento, pode-se até considerar a execução de tarefas Map-Reduce incrementais para evitar o processamento de dados mais antigos todas as vezes.

Historicamente, os métodos map e reduce costumavam ser executados em um contexto de thread único. No entanto, essa limitação foi removida na versão 2.4.

Por que executar tarefas Map-Reduce no secundário?


Assim como outros trabalhos de agregação, o Map-Reduce também é um trabalho em "lote" com uso intensivo de recursos, portanto, é uma boa opção para execução em réplicas somente leitura. As ressalvas ao fazê-lo são:

1) Não há problema em usar dados um pouco obsoletos. Ou você pode ajustar a preocupação de gravação para garantir que as réplicas estejam sempre sincronizadas com o primário. Esta segunda opção assume que ter um impacto no desempenho de gravação é aceitável.

2) A saída do trabalho Map-Reduce não deve ser gravada em outra coleção no banco de dados, mas retornada ao aplicativo (ou seja, sem gravações no banco de dados).

Vamos ver como fazer isso por meio de exemplos, tanto do shell mongo quanto do driver Java.

Redução de mapa em conjuntos de réplicas

Conjunto de dados


Para ilustração, usaremos um conjunto de dados bastante simples:um despejo diário de registros de transações de um varejista. Uma entrada de exemplo se parece com:
RS-replica-0:PRIMARY> use test
switched to db test
RS-replica-0:PRIMARY> show tables
txns
RS-replica-0:PRIMARY> db.txns.findOne()
{
    "_id" : ObjectId("584a3b71cdc1cb061957289b"),
    "custid" : "cust_66",
    "txnval" : 100,
    "items" : [{"sku": sku1", "qty": 1, "pr": 100}, ...],
...
}

Em nossos exemplos, calcularemos o gasto total de um determinado cliente naquele dia. Assim, dado nosso esquema, os métodos map e reduce ficarão assim:
var mapFunction = function() { emit(this.custid, this.txnval); } // Emit the custid and txn value from each record
var reduceFunction = function(key, values) { return Array.sum(values); } // Sum all the txn values for a given custid

Com nosso esquema estabelecido, vamos ver o Map-Reduce em ação.

MongoDB Shell


Para garantir que um trabalho Map-Reduce seja executado no secundário, a preferência de leitura deve ser definida como secundário . Como dissemos acima, para que um Map-Reduce seja executado em um secundário, a saída do resultado deve ser inline (Na verdade, esse é o único valor de saída permitido em secundários). Vamos ver como isso funciona.
$ mongo -u admin -p pwd --authenticationDatabase admin --host RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
MongoDB shell version: 3.2.10
connecting to: RS-replica-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017/test
2016-12-09T08:15:19.347+0000 I NETWORK  [thread1] Starting new replica set monitor for server-1.servers.example.com:27017,server-2.servers.example.com:27017
2016-12-09T08:15:19.349+0000 I NETWORK  [ReplicaSetMonitorWatcher] starting
RS-replica-0:PRIMARY> db.setSlaveOk()
RS-replica-0:PRIMARY> db.getMongo().setReadPref('secondary')
RS-replica-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-replica-0:PRIMARY> var mapFunc = function() { emit(this.custid, this.txnval); }
RS-replica-0:PRIMARY> var reduceFunc = function(key, values) { return Array.sum(values); }
RS-replica-0:PRIMARY> db.txns.mapReduce(mapFunc, reduceFunc, {out: { inline: 1 }})
{
    "results" : [
        {
            "_id" : "cust_0",
            "value" : 72734
        },
        {
            "_id" : "cust_1",
            "value" : 67737
        },
...
    ]
    "timeMillis" : 215,
    "counts" : {
        "input" : 10000,
        "emit" : 10000,
        "reduce" : 909,
        "output" : 101
    },
    "ok" : 1

}

Uma espiada nos logs no secundário confirma que o trabalho realmente foi executado no secundário.
...
2016-12-09T08:17:24.842+0000 D COMMAND  [conn344] mr ns: test.txns
2016-12-09T08:17:24.843+0000 I COMMAND  [conn344] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:17:24.865+0000 I COMMAND  [conn344] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:17:25.063+0000 I COMMAND  [conn344] command test.txns command: mapReduce { mapreduce: "txns", map: function () { emit(this.custid, this.txnval); }, reduce: function (key, values) { return Array.sum(values); }, out: { inline: 1.0 } } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:78 reslen:4233 locks:{ Global: { acquireCount: { r: 366 } }, Database: { acquireCount: { r: 3, R: 180 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_command 220ms
...

Java


Agora vamos tentar executar um trabalho Map-Reduce nas réplicas de leitura de um aplicativo Java. No driver Java do MongoDB, definir a preferência de leitura funciona. A saída é embutida por padrão, portanto, nenhum parâmetro adicional precisa ser passado. Aqui está um exemplo usando a versão do driver 3.2.2:
public class MapReduceExample {

    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-replica-0";
    private static final String COL_NAME = "txns";
    private static final String DEF_DB = "test";

    public MapReduceExample() { }

    public static void main(String[] args) {
        MapReduceExample writer = new MapReduceExample();
        writer.mapReduce();
    }

    public static final String mapfunction = "function() { emit(this.custid, this.txnval); }";
    public static final String reducefunction = "function(key, values) { return Array.sum(values); }";

    private void mapReduce() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        MongoDatabase database = client.getDatabase(DEF_DB);
        MongoCollection collection = database.getCollection(COL_NAME);
        MapReduceIterable iterable = collection.mapReduce(mapfunction, reducefunction); // inline by default
        MongoCursor cursor = iterable.iterator();
        while (cursor.hasNext()) {
           Document result = cursor.next();
           printer("Customer: " + result.getString("_id") + ", Total Txn value: " + result.getDouble("value"));
        }
        printer("Done...");
    }
...
}

Como é evidente nos logs, o trabalho foi executado no secundário:
...
2016-12-09T08:32:31.419+0000 D COMMAND  [conn371] mr ns: test.txns
2016-12-09T08:32:31.420+0000 I COMMAND  [conn371] command test.$cmd command: listCollections { listCollections: 1, filter: { name: "txns" }, cursor: {} } keyUpdates:0 writeConflicts:0 numYields:0 reslen:150 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 1, R: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 0ms
2016-12-09T08:32:31.444+0000 I COMMAND  [conn371] query test.system.js planSummary: EOF ntoreturn:0 ntoskip:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:0 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 6 } }, Database: { acquireCount: { r: 2, R: 1 } }, Collection: { acquireCount: { r: 2 } } } 0ms
2016-12-09T08:32:31.890+0000 I COMMAND  [conn371] command test.txns command: mapReduce { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { inline: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true } planSummary: COUNT keyUpdates:0 writeConflicts:0 numYields:156 reslen:4331 locks:{ Global: { acquireCount: { r: 722 } }, Database: { acquireCount: { r: 3, R: 358 } }, Collection: { acquireCount: { r: 3 } } } protocol:op_query 470ms
...

MongoDB Map-Reduce em clusters fragmentados


O MongoDB suporta Map-Reduce em clusters fragmentados, tanto quando uma coleção fragmentada é a entrada quanto quando é a saída de um trabalho Map-Reduce. No entanto, o MongoDB atualmente não suporta a execução de tarefas de redução de mapa em secundários de um cluster fragmentado. Portanto, mesmo que a opção de saída está definido como inline , os trabalhos Map-Reduce sempre serão executados nos primários de um cluster fragmentado. Esse problema está sendo rastreado por meio deste bug do JIRA.

A sintaxe de execução de um trabalho Map-Reduce em um cluster fragmentado é a mesma de um conjunto de réplicas. Portanto, os exemplos fornecidos na seção acima são válidos. Se o exemplo Java acima for executado em um cluster fragmentado, as mensagens de log aparecerão nos primários indicando que o comando foi executado lá.
...
2016-11-24T08:46:30.828+0000 I COMMAND  [conn357] command test.$cmd command: mapreduce.shardedfinish { mapreduce.shardedfinish: { mapreduce: "txns", map: function() { emit(this.custid, this.txnval); }, reduce: function(key, values) { return Array.sum(values); }, out: { in
line: 1 }, query: null, sort: null, finalize: null, scope: null, verbose: true, $queryOptions: { $readPreference: { mode: "secondary" } } }, inputDB: "test", shardedOutputCollection: "tmp.mrs.txns_1479977190_0", shards: { Shard-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 123, timing: { mapTime: 51, emitLoop: 116, reduceTime: 9, mode: "mixed", total: 123 }, counts: { input: 9474, emit: 9474, reduce: 909, output: 101 }, ok: 1.0, $gleS
tats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { result: "tmp.mrs.txns_1479977190_0", timeMillis: 71, timing:
 { mapTime: 8, emitLoop: 63, reduceTime: 4, mode: "mixed", total: 71 }, counts: { input: 1526, emit: 1526, reduce: 197, output: 101 }, ok: 1.0, $gleStats: { lastOpTime: Timestamp 1479977190000|103, electionId: ObjectId('7fffffff0000000000000001') } } }, shardCounts: { Sha
rd-0/primary.shard0.example.com:27017,secondary.shard0.example.com:27017: { input: 9474, emit: 9474, reduce: 909, output: 101 }, Shard-1/primary.shard1.example.com:27017,secondary.shard1.example.com:27017: { inpu
t: 1526, emit: 1526, reduce: 197, output: 101 } }, counts: { emit: 11000, input: 11000, output: 202, reduce: 1106 } } keyUpdates:0 writeConflicts:0 numYields:0 reslen:4368 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acqu
ireCount: { r: 1 } } } protocol:op_command 115ms
2016-11-24T08:46:30.830+0000 I COMMAND  [conn46] CMD: drop test.tmp.mrs.txns_1479977190_0
...

Visite nossa página do produto MongoDB para saber sobre nossa extensa lista de recursos.