HBase
 sql >> Base de Dados >  >> NoSQL >> HBase

Sincronização de dados de clusters HBase com ferramenta HashTable/SyncTable


A replicação (abordada neste artigo anterior do blog) foi lançada há algum tempo e está entre os recursos mais usados ​​do Apache HBase. Ter clusters replicando dados com diferentes pares é uma implantação muito comum, seja como uma estratégia de DR ou simplesmente como uma maneira perfeita de replicar dados entre ambientes de produção/preparação/desenvolvimento. Embora seja uma maneira eficiente de manter diferentes bancos de dados HBase sincronizados em uma latência de menos de um segundo, a replicação só opera sobre os dados ingeridos após o recurso ter sido habilitado. Isso significa que quaisquer dados pré-existentes em todos os clusters envolvidos na implantação de replicação ainda precisarão ser copiados entre os pares de alguma outra forma. Existem algumas ferramentas que podem ser usadas para sincronizar dados pré-existentes em diferentes clusters de mesmo nível. Snapshots, BulkLoad, CopyTable são exemplos bem conhecidos de tais ferramentas abordadas em postagens anteriores do blog Cloudera. Neste artigo, abordaremos HashTable/SyncTable, detalhando algumas de suas lógicas de implementação interna, os prós e contras de usá-lo e como ele se compara a algumas das outras técnicas de cópia de dados mencionadas acima.

HashTable/SyncTable em poucas palavras


HashTable/SyncTable é uma ferramenta implementada como dois trabalhos de redução de mapa que são executados como etapas individuais. Parece semelhante ao CopyTable ferramenta, que pode executar cópias de dados de tabela parciais ou inteiras. Ao contrário de CopyTable ele apenas copia dados divergentes entre clusters de destino, economizando recursos de rede e de computação durante o procedimento de cópia.

A primeira etapa a ser executada no processo é a HashTable trabalho de redução de mapa. Isso deve ser executado no cluster cujos dados devem ser copiados para o peer remoto, normalmente o cluster de origem. Um exemplo rápido de como executá-lo é mostrado abaixo, uma explicação detalhada de cada um dos parâmetros necessários é fornecida mais adiante neste artigo:
hbase org.apache.hadoop.hbase.mapreduce.HashTable --families=cf my-table /hashes/test-tbl…20/04/28 05:05:48 INFO mapreduce.Job:  map 100% reduce 100 %20/04/28 05:05:49 INFO mapreduce.Job:Job job_1587986840019_0001 concluído com sucesso20/04/28 05:05:49 INFO mapreduce.Job:Counters:68…File Input Format Counters Bytes Read=0File Output Format Counters Bytes Escrito=6811788

Uma vez que a HashTable a execução do trabalho com o comando acima foi concluída, alguns arquivos de saída foram gerados no hdfs de origem /hashes/my-table diretório:
hdfs dfs -ls -R /hashes/test-tbldrwxr-xr-x   - supergrupo raiz          0 2020-04-28 05:05 /hashes/test-tbl/hashes-rw-r--r--   2 raiz supergrupo          0 28-04-2020 05:05 /hashes/test-tbl/hashes/_SUCCESSdrwxr-xr-x   - supergrupo raiz          0 28-04-2020 05:05 /hashes/test-tbl/hashes/part-r-00000 -rw-r--r--   2 supergrupo raiz    6790909 28-04-2020 05:05 /hashes/test-tbl/hashes/part-r-00000/data-rw-r--r--   2 supergrupo raiz      20879 2020-04-28 05:05 /hashes/test-tbl/hashes/part-r-00000/index-rw-r--r--   2 supergrupo raiz         99 2020-04-28 05:04 /hashes/test- tbl/manifest-rw-r--r--   2 supergrupo raiz        153 ​​2020-04-28 05:04 /hashes/test-tbl/partitions

Eles são necessários como uma entrada para o SyncTable correr. SyncTable deve ser lançado no peer de destino. O comando abaixo executa SyncTable para a saída de HashTable do exemplo anterior. Ele usa o dryrun opção explicada mais adiante neste artigo:
hbase org.apache.hadoop.hbase.mapreduce.SyncTable --dryrun --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://source- cluster-active-nn/hashes/test-tbl test-tbl test-tbl…org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148HASHES_MATCHED=97146HASHES_NOT_MATCHED=2MATCHINGCELLS=17MATCHINGROWS=2RANGESNOTISSINGCELLS=1ROTARWSWISSDIFFSELLS=2SOURCEMSITISSELLSELLS=1 1

Como referência rápida, você pode substituir os parâmetros fornecidos em ambos os exemplos pelos valores reais do ambiente. O restante deste artigo abordará os detalhes de implementação com mais profundidade.

Por que duas etapas diferentes?


O principal objetivo desta ferramenta é identificar e copiar apenas os dados que estão faltando entre os dois clusters. Tabela de Hash funciona como um trabalho de fragmentação/indexação, analisando lotes de dados da tabela e gerando índices de hash para cada um desses lotes. Estes são os resultados escritos nos arquivos em /hashes/my-table hdfs passado como um dos parâmetros de trabalho. Como mencionado anteriormente, essa saída é exigida pelo SyncTable trabalho. SyncTable verifica a tabela de destino localmente nos mesmos tamanhos de lote usados ​​por HashTable, e também calcula valores de hash para esses lotes usando a mesma função usada por HashTable. Em seguida, ele compara o hash de lote local valor com o da HashTable saída. Se os valores de hash forem iguais, significa que todo o lote é idêntico nos dois clusters e nada precisa ser copiado nesse segmento. Caso contrário, ele abre uma varredura para o lote no cluster de origem, verificando se cada uma das células já existe no cluster de destino, copiando apenas as que divergem. Em conjuntos de dados esparsos e ligeiramente diferentes, isso resultaria em muito menos dados sendo copiados entre os dois clusters. Também exigiria que apenas um pequeno número de células fosse verificado na origem para verificar incompatibilidades.

Parâmetros obrigatórios


Tabela de Hash requer apenas dois parâmetros:o nome da tabela e o caminho de saída onde os hashes relacionados e outros arquivos de meta-informações serão gravados. SyncTable usa HashTable output dir como entrada, juntamente com os nomes das tabelas na origem e no cluster de destino, respectivamente. Como estamos usando HashTable /SyncTable para mover dados entre clusters remotos, sourcezkcluster a opção deve ser definida para SyncTable . Este deve ser o endereço de quorum do zookeeper do cluster de origem. Neste exemplo de artigo, também referenciamos diretamente o endereço do namenode ativo do cluster de origem, para que SyncTable lerá o arquivo de saída de hash diretamente do cluster de origem. Como alternativa, HashTable a saída pode ser copiada manualmente do cluster de origem para o cluster remoto (com distcp, por exemplo).
OBSERVAÇÃO:Trabalhar com clusters remotos em diferentes domínios kerberos só é compatível a partir do CDH 6.2.1.

Opções avançadas


Ambos HashTable e SyncTable oferecem opções opcionais extras que podem ser ajustadas para resultados ideais.

Tabela de hash permite filtrar os dados por chave de linha e hora de modificação, com startrow/starttime, stoprow/stoptime propriedades, respectivamente. O escopo do conjunto de dados também pode ser limitado por versões e famílias propriedades. O tamanho do lote A propriedade define o tamanho de cada parte que será hash. Isso impacta diretamente no desempenho da sincronização. Em casos de poucas incompatibilidades, definir valores de tamanho de lote maiores pode levar a um melhor desempenho, pois partes maiores do conjunto de dados seriam ignoradas sem a necessidade de verificação pelo SyncTable.

SyncTable fornece uma execução a seco opção que permite uma visualização das alterações a serem aplicadas no destino.

SyncTable o comportamento padrão é espelhar os dados de origem no lado do destino, portanto, qualquer célula adicional presente no destino, mas ausente na origem, acaba sendo excluída no lado do destino. Isso pode ser indesejável ao sincronizar clusters em uma configuração de replicação ativa-ativa e, nesses casos, doDeletes as opções podem ser alteradas para false, ignorando a replicação de exclusões no destino. Há também um doPuts semelhante sinalizador para casos em que células adicionais não devem ser inseridas no cluster de destino.

Analisando as saídas


Tabela de hash gera alguns arquivos com meta-informações para SyncTable, aqueles, no entanto, não são legíveis por humanos. Ele não realiza nenhuma alteração nos dados existentes, portanto, as informações relacionadas são de pouco interesse para o contexto do usuário.

SyncTable é o passo que realmente aplica as modificações no alvo e pode ser importante rever o seu resumo antes de alterar os dados do cluster alvo (veja dryrun opção mencionada acima). Ele publica alguns contadores relevantes no final da execução de redução do mapa. Observando os valores do exemplo acima, podemos ver que havia 97148 partições com hash (relatado por LOTES contador), que SyncTable detectou divergências em apenas dois deles (de acordo com o HASHES_MATCHED e HASHES_NOT_MACTHED contadores). Além disso, nas duas partições com hashes diferentes,  17 células mais de 2 linhas correspondiam (conforme relatado por MATCHING_CELLS e MATCHING_ROWS, respectivamente), mas também havia 2 linhas divergindo, nessas duas partições (de acordo com RANGESNOTMATCHED e ROWSWITDIFFS ). Por fim, CÉLULAS DE FONTE e CÉLULAS DESLIGADAS diga-nos em detalhes se as células estavam presentes apenas no cluster de origem ou de destino. Neste exemplo, o cluster de origem tinha uma célula que não estava no destino, mas o destino também tinha uma célula que não estava na origem. Desde SyncTable foi executado sem especificar dryrun opção e configuração doDeletes opção para falso , o trabalho excluiu a célula extra no cluster de destino e adicionou a célula extra localizada na origem ao cluster de destino. Assumindo sem gravações acontecer em ambos os clusters, uma execução subsequente da mesma SyncTable comando no cluster de destino não mostraria diferenças:
hbase org.apache.hadoop.hbase.mapreduce.SyncTable --sourcezkcluster=zk1.example.com,zk2.example.com,zk3.example.com:2181:/hbase hdfs://nn:9000/hashes /test-tbl test-tbl test-tbl…org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148HASHES_MATCHED=97148…

Cenários aplicáveis

Sincronização de dados


À primeira vista, HashTable/SyncTable pode parecer sobrepor-se ao CopyTable ferramenta, mas ainda existem cenários específicos em que qualquer uma das ferramentas seria mais adequada. Como primeiro exemplo de comparação, usando HashTable/SyncTable para um carregamento inicial de uma tabela contendo 100.004 linhas e um tamanho total de dados de 5,17 GB, foram necessários alguns minutos apenas para SyncTable completar:
...20/04/29 03:48:00 INFO mapreduce.Job:Executando job:job_1587985272792_001120/04/29 03:48:09 INFO mapreduce.Job:Job job_1587985272792_0011 running in uber mode :false20/04/ 29 03:48:09 INFO mapreduce.Job:  map 0% reduce 0%20/04/29 03:54:08 INFO mapreduce.Job:  map 100% reduce 0%20/04/29 03:54:09 INFO mapreduce .Job:Job job_1587985272792_0011 completed successfully…org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148EMPTY_BATCHES=97148HASHES_NOT_MATCHED=97148RANGESNOTMATCHED=97148ROWSWITHDIFFS=100004TARGETMISSINGCELLS=749589TARGETMISSINGROWS=100004

Mesmo em um conjunto de dados tão pequeno, CopyTable executado mais rápido (aproximadamente 3 minutos, enquanto SyncTable levou 6 minutos para copiar todo o conjunto de dados):
...20/04/29 05:12:07 INFO mapreduce.Job:Executando job:job_1587986840019_000520/04/29 05:12:24 INFO mapreduce.Job:Job job_1587986840019_0005 running in uber mode :false20/04/ 29 05:12:24 INFO mapreduce.Job:  map 0% reduce 0%20/04/29 05:13:16 INFO mapreduce.Job:  map 25% reduce 0%20/04/29 05:13:49 INFO mapreduce .Trabalho:  mapear 50% reduzir 0%20/04/29 05:14:37 INFO mapreduce.Job:  mapear 75% reduzir 0%20/04/29 05:15:14 INFO mapreduce.Job:  mapear 100% reduzir 0 %20/04/29 05:15:14 INFO mapreduce.Job:Job job_1587986840019_0005 completed successfully…HBase CountersBYTES_IN_REMOTE_RESULTS=2787236791BYTES_IN_RESULTS=5549784428MILLIS_BETWEEN_NEXTS=130808NOT_SERVING_REGION_EXCEPTION=0NUM_SCANNER_RESTARTS=0NUM_SCAN_RESULTS_STALE=0REGIONS_SCANNED=4REMOTE_RPC_CALLS=1334REMOTE_RPC_RETRIES=0ROWS_FILTERED=0ROWS_SCANNED=100004RPC_CALLS=2657RPC_RETRIES=0 
Agora vamos usar as duas ferramentas novamente para lidar com diferenças esparsas no conjunto de dados. O teste-tbl A tabela usada em todos esses exemplos tem quatro regiões no cluster de origem. Depois que todo o conjunto de dados original foi copiado para o cluster de destino no exemplo anterior, adicionamos quatro linhas adicionais apenas no lado da origem, uma para cada uma das regiões existentes, e executamos HashTable/SyncTable novamente para sincronizar ambos os agrupamentos:
20/04/29 05:29:23 INFO mapreduce.Job:Executando job:job_1587985272792_001320/04/29 05:29:39 INFO mapreduce.Job:Job job_1587985272792_0013 running in uber mode :false20/04/29 05:29:39 INFO mapreduce.Job:  map 0% reduz 0%20/04/29 05:29:53 INFO mapreduce.Job:  map 50% reduz 0%20/04/29 05:30:42 INFO mapreduce.Job:map 100% reduce 0%20/04/29 05:30:42 INFO mapreduce.Job:Job job_1587985272792_0013 concluído com sucesso…org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148HASHES_MATCHED=97144HASHES_NOTWS=MATCHED=4MATCHINGC 5RANGESNOTMATCHED=4ROWWITHDIFFS=4TARGETMISSINGCELLS=4TARGETMISSINGROWS=4

Podemos ver isso com apenas quatro incompatibilidade de partições, SyncTable foi consideravelmente mais rápido (aproximadamente um minuto para ser concluído). Usando CopyTable para realizar esta mesma sincronização apresentou os seguintes resultados:
20/04/29 08:32:38 INFO mapreduce.Job:Executando job:job_1587986840019_000820/04/29 08:32:52 INFO mapreduce.Job:Job job_1587986840019_0008 running in uber mode :false20/04/29 08:32:52 INFO mapreduce.Job:  mapreduce 0% reduzir 0%20/04/29 08:33:38 INFO mapreduce.Job:  mapreduce 25% reduzir 0%20/04/29 08:34:15 INFO mapreduce.Job:mapreduce 50% reduz 0%20/04/29 08:34:48 INFO mapreduce.Job:  mapreduce 75% reduz 0%20/04/29 08:35:31 INFO mapreduce.Job:  mapreduce 100% reduz 0%20/ 04/29 08:35:32 INFO mapreduce.Job:Job job_1587986840019_0008 completed successfully…HBase CountersBYTES_IN_REMOTE_RESULTS=2762547723BYTES_IN_RESULTS=5549784600MILLIS_BETWEEN_NEXTS=340672NOT_SERVING_REGION_EXCEPTION=0NUM_SCANNER_RESTARTS=0NUM_SCAN_RESULTS_STALE=0REGIONS_SCANNED=4REMOTE_RPC_CALLS=1323REMOTE_RPC_RETRIES=0ROWS_FILTERED=0ROWS_SCANNED=100008RPC_CALLS=2657RPC_RETRIES=0

Copiar tabela levou a mesma quantidade de tempo para sincronizar as tabelas como ao copiar todo o conjunto de dados, embora houvesse apenas quatro células para copiar. Isso ainda estava bom para esse conjunto de dados muito pequeno e com um cluster ocioso, mas em casos de uso de produção com conjuntos de dados maiores e onde o cluster de destino também pode estar em uso por muitos aplicativos cliente gravando dados nele, CopyTable degradação de desempenho em comparação com SyncTable seria ainda maior.

Vale a pena mencionar que também existem ferramentas/recursos adicionais que podem ser usados ​​em combinação para um carregamento inicial de um cluster de destino (o destino não possui dados), como exportação de snapshots, carregamento em massa ou até mesmo uma cópia direta do original diretórios de tabela do cluster de origem. Para carregamentos iniciais com grandes somas de dados para copiar, tirar um instantâneo da tabela e usar o ExportSnapshot ferramenta superará ferramentas de cópia online como SyncTable ou Copiar tabela.

Verificando a integridade da replicação


Outro uso comum de HashTable/SyncTable é para monitorar o estado de replicação entre clusters, ao solucionar possíveis problemas de replicação. Nesse cenário, ela funciona como uma alternativa à ferramenta VerifyReplication. Normalmente, ao verificar o estado entre dois clusters, não há incompatibilidades ou um problema parcial temporário fez com que uma pequena parte do conjunto de dados maior ficasse fora de sincronia. No ambiente de teste que usamos para nosso exemplo anterior, deve haver 100.008 linhas com valores correspondentes em ambos os clusters. A execução do SyncTable no cluster de destino com a opção dryrun nos permitirá identificar quaisquer diferenças:
20/05/04 10:47:25 INFO mapreduce.Job:Executando job:job_1588611199158_0004…20/05/04 10:48:48 INFO mapreduce.Job:  map 100% reduce 0%20/05/04 10 :48:48 INFO mapreduce.Job:Job job_1588611199158_0004 completed successfully…HBase CountersBYTES_IN_REMOTE_RESULTS=3753476784BYTES_IN_RESULTS=5549784600ROWS_SCANNED=100008…org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148HASHES_MATCHED=97148...Unlike SyncTable we must run a ferramenta VerifyReplication no cluster de origem. Passamos o id do peer como um de seus parâmetros para que ele possa encontrar o cluster remoto a ser verificado para comparação:20/05/04 11:01:58 INFO mapreduce.Job:Running job:job_1588611196128_0001…20/05/04 11:04:39 INFO mapreduce.Job:  map 100% reduce 0%20/05/04 11:04:39 INFO mapreduce.Job:Job job_1588611196128_0001 concluído com sucesso…HBase CountersBYTES_IN_REMOTE_RESULTS=2761955495BYTES_IN_RESULTS=5549784600…org. .replication.VerifyReplication$Verifier$CountersGOODROWS=100008...

Sem diferenças, o SyncTable encontra todos os hashes correspondentes entre as partições de origem e de destino e, portanto, evita a necessidade de verificar novamente o cluster de origem remoto. O VerifyReplication realiza uma comparação uma a uma para cada célula em ambos os clusters que já podem ter um alto custo de rede mesmo ao lidar com conjuntos de dados tão pequenos.

Adicionando uma linha extra no cluster de origem e realizando as verificações novamente. Com VerifyReplication:
20/05/05 11:14:05 INFO mapreduce.Job:Executando job:job_1588611196128_0004…20/05/05 11:16:32 INFO mapreduce.Job:  map 100% reduce 0%20/05/05 11 :16:32 INFO mapreduce.Job:Job job_1588611196128_0004 concluído com sucesso…org.apache.hadoop.hbase.mapreduce.replication.VerifyReplication$Verifier$CountersBADROWS=1GOODROWS=100008ONLY_IN_SOURCE_TABLE_ROWS=1…

Antes de podermos usar SyncTable, temos que regenerar hashes na fonte com HashTable novamente, pois há uma nova célula agora:
20/05/04 11:31:48 INFO mapreduce.Job:Executando job:job_1588611196128_0003…20/05/04 11:33:15 INFO mapreduce.Job:Job job_1588611196128_0003 concluído com sucesso...

Agora SyncTable:
20/05/07 05:47:51 INFO mapreduce.Job:Executando job:job_1588611199158_0014…20/05/07 05:49:20 INFO mapreduce.Job:Job job_1588611199158_0014 concluído com sucesso org.apache.hadoop.hbase.mapreduce.SyncTable$SyncMapper$CounterBATCHES=97148HASHES_NOT_MATCHED=97148MATCHINGCELLS=749593MATCHINGROWS=100008RANGESMATCHED=97147RANGESNOTMATCHED=1ROWSWITHDIFFS=1TARGETpre>MISSINGCELLS=1TARGETMISSINGROWS=1TARGETMISSINGROWS=1 
Podemos ver o aumento no tempo de execução devido à varredura adicional e comparação de células entre os dois clusters remotos. Enquanto isso, o tempo de execução do VerifyReplication mostrou pouca variação.

Conclusão


HashTable/SyncTable é uma ferramenta valiosa para mover dados, ao lidar com incompatibilidades esparsas entre dois conjuntos de dados de clusters. Ele faz uso de particionamento de dados e hash para detectar com eficiência diferenças nos intervalos dos dois conjuntos de dados, reduzindo o número de células a serem verificadas ao comparar dados dos dois clusters, evitando também colocações desnecessárias de valores já existentes no cluster de destino. No entanto, não é uma bala de prata, como demonstrado em alguns dos exemplos acima, embora possa parecer sobrepor em função a CopyTable ferramenta, o último pode oferecer melhor desempenho ao lidar com um intervalo sequencial maior de células incompatíveis. Nesse sentido, HashTable/SyncTable seria mais aplicável nos casos em que ambos os clusters já possuem alguns dados residentes ou em casos em que uma configuração de replicação existente foi interrompida pela indisponibilidade temporária de um dos pares.

Artigos relacionados:

https://blog.cloudera.com/apache-hbase-replication-overview/

https://blog.cloudera.com/approaches-to-backup-and-disaster-recovery-in-hbase/

https://blog.cloudera.com/online-apache-hbase-backups-with-copytable/

https://blog.cloudera.com/introduction-to-apache-hbase-snapshots/