MongoDB
 sql >> Base de Dados >  >> NoSQL >> MongoDB

Desempenho do MongoDB:executando agregações do MongoDB em secundários


As operações de agregação no MongoDB permitem processar registros de dados, agrupá-los e retornar seus resultados calculados. O MongoDB suporta três tipos de operações de agregação:
  1. Comandos de agregação de propósito único
  2. Reduzir mapa
  3. Funil de agregação

Você pode usar este documento de comparação do MongoDB para ver qual atende às suas necessidades.

Pipeline de agregação


O pipeline de agregação é uma estrutura do MongoDB que fornece agregação de dados por meio de um pipeline de processamento de dados. Ou seja, os documentos são enviados por meio de um pipeline de várias etapas, filtrando, agrupando e transformando os documentos em cada etapa. Ele fornece SQL “GROUP BY ….” tipo de construções para o MongoDB que são executadas no próprio banco de dados. A documentação de agregação fornece exemplos úteis de como criar esses pipelines.

Por que executar agregações no secundário?


Os pipelines de agregação são operações com uso intensivo de recursos – faz sentido descarregar tarefas de agregação para secundários de um conjunto de réplicas do MongoDB quando não há problema operar em dados ligeiramente obsoletos. Isso geralmente é verdade para operações em "lote", pois elas não esperam ser executadas nos dados mais recentes. Se a saída precisar ser gravada em uma coleção, os trabalhos de agregação serão executados apenas no primário, pois somente o primário é gravável no MongoDB.

Neste post, mostraremos como garantir que os pipelines de agregação sejam executados no secundário tanto do shell do mongo quanto do Java.
Execute pipelines de agregação no secundário do Mongo Shell e Java no MongoDBClick To Tweet
Observação:usamos o conjunto de dados de amostra fornecido pelo MongoDB em seu exemplo de agregação de CEPs para mostrar nossos exemplos. Você pode baixá-lo conforme instruído no exemplo.

Pipeline de agregação em conjuntos de réplicas

Shell MongoDB


Configurando a preferência de leitura como secundária faz o truque ao executar um trabalho de agregação do shell do mongo. Vamos tentar buscar todos os estados com população maior que 10 milhões (1ª agregação no exemplo de CEPs). Tanto o shell quanto o servidor estão executando o MongoDB versão 3.2.10.
mongo -u admin -p <pwd> --authenticationDatabase admin --host RS-repl0-0/server-1.servers.example.com:27017,server-2.servers.example.com:27017
RS-repl0-0:PRIMARY> use test
switched to db test
RS-repl0-0:PRIMARY> db.setSlaveOk() // Ok to run commands on a slave
RS-repl0-0:PRIMARY> db.getMongo().setReadPref('secondary') // Set read pref
RS-repl0-0:PRIMARY> db.getMongo().getReadPrefMode()
secondary
RS-repl0-0:PRIMARY> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }
{ "_id" : "TX", "totalPop" : 16984601 }

Uma olhada nos logs do MongoDB (com o log habilitado para comandos) no secundário mostra que a agregação realmente foi executada no secundário:
...
2016-12-05T06:20:14.783+0000 I COMMAND  [conn200] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } }, { 
$match: { totalPop: { $gte: 10000000.0 } } } ], cursor: {} } keyUpdates:0 writeConflicts:0 numYields:229 reslen:338 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquire
Count: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_command 49ms
...

Java


No driver Java do MongoDB, definir novamente a preferência de leitura resolve o problema. Aqui está um exemplo usando a versão do driver 3.2.2:
public class AggregationChecker {

    /*
     * Data and code inspired from:
     * https://docs.mongodb.com/v3.2/tutorial/aggregation-zip-code-data-set/#return-states-with-populations-above-10-million
     */
    private static final String MONGO_END_POINT = "mongodb://admin:[email protected]:27017,server-2.servers.example.com:27017/admin?replicaSet=RS-repl0-0";

    private static final String COL_NAME = "zips";
    private static final String DEF_DB = "test";

    public AggregationChecker() {
    }

    public static void main(String[] args) {
        AggregationChecker writer = new AggregationChecker();
        writer.aggregationJob();
    }

    private void aggregationJob() {
        printer("Initializing...");
        Builder options = MongoClientOptions.builder().readPreference(ReadPreference.secondary());
        MongoClientURI uri = new MongoClientURI(MONGO_END_POINT, options);
        MongoClient client = new MongoClient(uri);
        try {
            final DB db = client.getDB(DEF_DB);
            final DBCollection coll = db.getCollection(COL_NAME);
            // Avg city pop by state: https://docs.mongodb.com/manual/tutorial/aggregation-zip-code-data-set/#return-average-city-population-by-state
            Iterable iterable = coll.aggregate(
                    Arrays.asList(
                            new BasicDBObject("$group", new BasicDBObject("_id", new BasicDBObject("state", "$state").append("city", "$city")).append("pop",
                                    new BasicDBObject("$sum", "$pop"))),
                                    new BasicDBObject("$group", new BasicDBObject("_id", "$_id.state").append("avgCityPop", new BasicDBObject("$avg", "$pop"))))).results();

            for (DBObject entry : iterable) {
                printer(entry.toString());
            }
        } finally {
            client.close();
        }
        printer("Done...");
    }
...
}

Logs no secundário:
...
2016-12-01T10:54:18.667+0000 I COMMAND  [conn4113] command test.zips command: aggregate { aggregate: "zipcodes", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } }, { $group: { _id: "$_id.state", avgCityPop: { $avg: "$pop" } } } ] } keyUpdates:0 writeConflicts:0 numYields:229 reslen:2149 locks:{ Global: { acquireCount: { r: 466 } }, Database: { acquireCount: { r: 233 } }, Collection: { acquireCount: { r: 233 } } } protocol:op_query 103ms
...

Nenhuma operação foi registrada no primário.

Pipeline de agregação em clusters fragmentados


Os pipelines de agregação têm suporte em clusters fragmentados. O comportamento detalhado é explicado na documentação. Em termos de implementação, há pouca diferença entre o conjunto de réplicas e o cluster fragmentado ao usar um pipeline de agregação.
Como configurar um pipeline de agregação em clusters fragmentados no MongoDBClick To Tweet

Shell MongoDB


Antes de importar dados para o cluster fragmentado, habilite a fragmentação na coleção.
mongos> sh.enableSharding("test")
mongos> sh.shardCollection("test.zips", { "_id" : "hashed" } )

Depois disso, as operações são as mesmas do conjunto de réplicas:
mongos> db.setSlaveOk()
mongos> db.getMongo().setReadPref('secondary')
mongos> db.getMongo().getReadPrefMode()
secondary
mongos> db.zips.aggregate( [
...    { $group: { _id: "$state", totalPop: { $sum: "$pop" } } },
...    { $match: { totalPop: { $gte: 10*1000*1000 } } }
... ] )
{ "_id" : "TX", "totalPop" : 16984601 }
{ "_id" : "PA", "totalPop" : 11881643 }
{ "_id" : "CA", "totalPop" : 29754890 }
{ "_id" : "FL", "totalPop" : 12686644 }
{ "_id" : "NY", "totalPop" : 17990402 }
{ "_id" : "OH", "totalPop" : 10846517 }
{ "_id" : "IL", "totalPop" : 11427576 }

Logs de um dos secundários:
...
2016-12-02T05:46:24.627+0000 I COMMAND  [conn242] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44258973083 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:46:24.641+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: "$state", totalPop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44258973083 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:51 reslen:1601 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 13ms
...

Java


O mesmo código aplicável no conjunto de réplicas funciona bem com um cluster fragmentado. Basta substituir a cadeia de conexão do conjunto de réplicas pela do cluster fragmentado. Os logs de um secundário indicam que o trabalho foi realmente executado nos secundários:
...
2016-12-02T05:39:12.339+0000 I COMMAND  [conn130] command test.zips command: aggregate { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:44228970872 keyUpdates:0 writeConflicts:0 numYields:0 reslen:115 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 0ms
2016-12-02T05:39:12.371+0000 I COMMAND  [conn131] getmore test.zips query: { aggregate: "zips", pipeline: [ { $group: { _id: { state: "$state", city: "$city" }, pop: { $sum: "$pop" } } } ], fromRouter: true, cursor: { batchSize: 0 } } planSummary: PIPELINE_PROXY cursorid:44228970872 ntoreturn:0 keysExamined:0 docsExamined:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:112 nreturned:12902 reslen:741403 locks:{ Global: { acquireCount: { r: 230 } }, Database: { acquireCount: { r: 115 } }, Collection: { acquireCount: { r: 115 } } } 30ms
...

Este conteúdo foi útil? Informe-nos twittando para nós @scaledgridio e, como sempre, se você tiver alguma dúvida, informe-nos nos comentários abaixo. Óh, e! Não se esqueça de conferir nossos produtos de hospedagem MongoDB que podem economizar até 40% em custos de hospedagem MongoDB® de longo prazo.