HBase
 sql >> Base de Dados >  >> NoSQL >> HBase

Como:usar o carregamento em massa do HBase e por quê


O Apache HBase tem tudo a ver com fornecer acesso aleatório, em tempo real, de leitura/gravação ao seu Big Data, mas como você obtém esses dados com eficiência no HBase em primeiro lugar? Intuitivamente, um novo usuário tentará fazer isso por meio das APIs do cliente ou usando um trabalho MapReduce com TableOutputFormat, mas essas abordagens são problemáticas, como você aprenderá abaixo. Em vez disso, o recurso de carregamento em massa do HBase é muito mais fácil de usar e pode inserir a mesma quantidade de dados mais rapidamente.

Esta postagem de blog apresentará os conceitos básicos do recurso de carregamento em massa, apresentará dois casos de uso e proporá dois exemplos.

Visão geral do carregamento em massa


Se você tiver algum desses sintomas, o carregamento em massa provavelmente é a escolha certa para você:
  • Você precisava ajustar suas MemStores para usar a maior parte da memória.
  • Você precisava usar WALs maiores ou ignorá-los completamente.
  • Suas filas de compactação e descarga estão na casa das centenas.
  • Seu GC está fora de controle porque suas inserções variam em MBs.
  • Sua latência sai do seu SLA quando você importa dados.

A maioria desses sintomas é comumente chamada de “dores de crescimento”. Usar o carregamento em massa pode ajudá-lo a evitá-los.

Na linguagem HBase, o carregamento em massa é o processo de preparar e carregar HFiles (formato de arquivo próprio do HBase) diretamente nos RegionServers, ignorando assim o caminho de gravação e eliminando totalmente esses problemas. Esse processo é semelhante ao ETL e se parece com isso:

1. Extraia os dados de uma fonte, normalmente arquivos de texto ou outro banco de dados. O HBase não gerencia essa parte do processo. Em outras palavras, você não pode dizer ao HBase para preparar HFiles lendo-os diretamente do MySQL - em vez disso, você precisa fazer isso por seus próprios meios. Por exemplo, você pode executar mysqldump em uma tabela e fazer upload dos arquivos resultantes para o HDFS ou simplesmente pegar seus arquivos de log Apache HTTP. De qualquer forma, seus dados precisam estar no HDFS antes da próxima etapa.

2. Transforme os dados em HFiles. Esta etapa requer um trabalho MapReduce e para a maioria dos tipos de entrada você terá que escrever o Mapeador você mesmo. O trabalho precisará emitir a chave de linha como a Chave e um KeyValue, um Put ou um Delete como o valor. O Redutor é tratado pelo HBase; você o configura usando HFileOutputFormat.configureIncrementalLoad() e ele faz o seguinte:
  • Inspeciona a tabela para configurar um particionador de pedido total
  • Carrega o arquivo de partições para o cluster e o adiciona ao DistributedCache
  • Define o número de tarefas de redução para corresponder ao número atual de regiões
  • Define a classe de chave/valor de saída para corresponder aos requisitos de HFileOutputFormat
  • Configura o redutor para executar a classificação apropriada (KeyValueSortReducer ou PutSortReducer)

Nesta fase, um HFile será criado por região na pasta de saída. Lembre-se de que os dados de entrada são quase completamente reescritos, portanto, você precisará de pelo menos duas vezes a quantidade de espaço em disco disponível do que o tamanho do conjunto de dados original. Por exemplo, para um mysqldump de 100 GB, você deve ter pelo menos 200 GB de espaço em disco disponível no HDFS. Você pode excluir o arquivo de despejo no final do processo.

3. Carregue os arquivos no HBase informando aos RegionServers onde encontrá-los. Este é o passo mais fácil. Requer o uso de LoadIncrementalHFiles (mais comumente conhecido como a ferramenta completebulkload) e, ao passar uma URL que localiza os arquivos no HDFS, ele carregará cada arquivo na região relevante por meio do RegionServer que o atende. Caso uma região tenha sido dividida após a criação dos arquivos, a ferramenta dividirá automaticamente o HFile de acordo com os novos limites. Esse processo não é muito eficiente, portanto, se sua tabela estiver sendo gravada por outros processos, é melhor carregar os arquivos assim que a etapa de transformação for concluída.

Aqui está uma ilustração desse processo. O fluxo de dados vai da fonte original para o HDFS, onde os RegionServers simplesmente moverão os arquivos para os diretórios de suas regiões.


Casos de uso


Carregamento do conjunto de dados original: Todos os usuários migrando de outro armazenamento de dados devem considerar este caso de uso. Primeiro, você precisa fazer o exercício de projetar o esquema da tabela e, em seguida, criar a própria tabela, pré-dividida. Os pontos de divisão devem levar em consideração a distribuição de chave de linha e o número de RegionServers. Eu recomendo ler a apresentação do meu colega Lars George sobre design de esquema avançado para qualquer caso de uso sério.

A vantagem aqui é que é muito mais rápido gravar os arquivos diretamente do que passar pelo caminho de gravação do RegionServer (gravando no MemStore e no WAL) e, eventualmente, liberar, compactar e assim por diante. Isso também significa que você não precisa ajustar seu cluster para uma carga de trabalho pesada de gravação e depois ajustá-lo novamente para sua carga de trabalho normal.

Carga incremental: Digamos que você tenha algum conjunto de dados atualmente sendo servido pelo HBase, mas agora você precisa importar mais dados em lote de um terceiro ou você tem um trabalho noturno que gera alguns gigabytes que você precisa inserir. Provavelmente não é tão grande quanto o conjunto de dados que o HBase já está servindo, mas pode afetar o percentil 95 da sua latência. Percorrer o caminho de gravação normal terá o efeito adverso de acionar mais liberações e compactações durante a importação do que o normal. Esse estresse de E/S adicional competirá com suas consultas sensíveis à latência.

Exemplos


Você pode usar os exemplos a seguir em seu próprio cluster Hadoop, mas as instruções são fornecidas para a VM Cloudera QuickStart, que é um cluster de nó único, sistema operacional convidado e dados de amostra e exemplos integrados a um dispositivo de máquina virtual para sua área de trabalho.

Depois de iniciar a VM, informe-a, por meio da interface da Web que será aberta automaticamente, para implantar o CDH e, em seguida, certifique-se de que o serviço HBase também seja iniciado.

Carregador em massa TSV integrado

O HBase é fornecido com um trabalho MR que pode ler um arquivo de valores separados por delimitador e enviar diretamente para uma tabela HBase ou criar HFiles para carregamento em massa. Aqui vamos:
  1. Receba os dados de amostra e faça upload para o HDFS.
  2. Execute a tarefa ImportTsv para transformar o arquivo em vários HFiles de acordo com uma tabela pré-configurada.
  3. Prepare e carregue os arquivos no HBase.

A primeira etapa é abrir um console e usar o seguinte comando para obter dados de amostra:
curl -O
https://people.apache.org/~jdcryans/word_count.csv

Eu criei este arquivo executando uma contagem de palavras no manuscrito original desta postagem do blog e, em seguida, exibindo o resultado no formato csv, sem títulos de coluna. Agora, faça o upload do arquivo para o HDFS:
hdfs dfs -put word_count.csv

A parte de extração do carregamento em massa agora está concluída, você precisa transformar o arquivo. Primeiro você precisa projetar a mesa. Para manter as coisas simples, chame de “wordcount” – as chaves de linha serão as próprias palavras e a única coluna conterá a contagem, em uma família que chamaremos de “f”. A melhor prática ao criar uma tabela é dividi-la de acordo com a distribuição de chaves de linha, mas para este exemplo criaremos apenas cinco regiões com pontos de divisão distribuídos uniformemente pelo espaço de chaves. Abra o shell hbase:
hbase shell

E execute o seguinte comando para criar a tabela:
create 'wordcount', {NAME => 'f'},   {SPLITS => ['g', 'm', 'r', 'w']}

Os quatro pontos de divisão gerarão cinco regiões, onde a primeira região começa com uma chave de linha vazia. Para obter melhores pontos de divisão, você também pode fazer uma análise rápida para ver como as palavras são realmente distribuídas, mas vou deixar isso para você.

Se você apontar o navegador da sua VM para http://localhost:60010/, verá nossa tabela recém-criada e suas cinco regiões, todas atribuídas ao RegionServer.

Agora é hora de fazer o trabalho pesado. Invocar o jar do HBase na linha de comando com o script “hadoop” mostrará uma lista de ferramentas disponíveis. O que queremos chama-se importtsv e tem o seguinte uso:
hadoop jar /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-security.jar importtsv
 ERROR: Wrong number of arguments: 0
 Usage: importtsv -Dimporttsv.columns=a,b,c  

A linha de comando que vamos usar é a seguinte:
hadoop jar   /usr/lib/hbase/hbase-0.94.6-cdh4.3.0-
security.jar importtsv
-Dimporttsv.separator=,
-Dimporttsv.bulk.output=output
-Dimporttsv.columns=HBASE_ROW_KEY,f:count wordcount word_count.csv

Aqui está um resumo dos diferentes elementos de configuração:
  • -Dimporttsv.separator=, especifica que o separador é uma vírgula.
  • -Dimporttsv.bulk.output=saída é um caminho relativo para onde os HFiles serão gravados. Como seu usuário na VM é “cloudera” por padrão, isso significa que os arquivos estarão em /user/cloudera/output. Ignorar esta opção fará com que o trabalho seja gravado diretamente no HBase.
  • -Dimporttsv.columns=HBASE_ROW_KEY,f:count é uma lista de todas as colunas contidas neste arquivo. A chave de linha precisa ser identificada usando a string HBASE_ROW_KEY em maiúsculas; caso contrário, ele não iniciará o trabalho. (Decidi usar o qualificador “count”, mas pode ser qualquer outra coisa.)

O trabalho deve ser concluído em um minuto, devido ao pequeno tamanho de entrada. Observe que cinco Redutores estão em execução, um por região. Aqui está o resultado no HDFS:
-rw-r--r--   3 cloudera cloudera         4265   2013-09-12 13:13 output/f/2c0724e0c8054b70bce11342dc91897b
-rw-r--r--   3 cloudera cloudera         3163   2013-09-12 13:14 output/f/786198ca47ae406f9be05c9eb09beb36
-rw-r--r--   3 cloudera cloudera         2487   2013-09-12 13:14 output/f/9b0e5b2a137e479cbc978132e3fc84d2
-rw-r--r--   3 cloudera cloudera         2961   2013-09-12 13:13 output/f/bb341f04c6d845e8bb95830e9946a914
-rw-r--r--   3 cloudera cloudera         1336   2013-09-12 13:14 output/f/c656d893bd704260a613be62bddb4d5f

Como você pode ver, os arquivos atualmente pertencem ao usuário “cloudera”. Para carregá-los, precisamos alterar o proprietário para “hbase” ou o HBase não terá permissão para mover os arquivos. Execute o seguinte comando:
sudo -u hdfs hdfs dfs -chown -R   hbase:hbase/user/cloudera/output

Para a etapa final, precisamos usar a ferramenta completebulkload para apontar onde estão os arquivos e quais tabelas estamos carregando:
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output wordcount

Voltando ao shell do HBase, você pode executar o comando count que mostrará quantas linhas foram carregadas. Se você esqueceu de chown, o comando irá travar.

Trabalho de RM personalizado

O carregador em massa TSV é bom para prototipagem, mas como ele interpreta tudo como strings e não suporta a manipulação de campos no momento da transformação, você acabará tendo que escrever seu próprio trabalho de MR. Meu colega James Kinley, que trabalha como arquiteto de soluções na Europa, escreveu um trabalho que usaremos em nosso próximo exemplo. Os dados do trabalho contêm mensagens públicas do Facebook e do Twitter relacionadas às finais da NBA de 2010 (jogo 1) entre Lakers e Celtics. Você pode encontrar o código aqui. (A VM Quick Start vem com git e maven instalados para que você possa clonar o repositório nela.)

Olhando para a classe Driver, os bits mais importantes são os seguintes:
job.setMapOutputKeyClass(ImmutableBytesWritable.class);
    job.setMapOutputValueClass(KeyValue.class);
…
	// Auto configure partitioner and reducer
    HFileOutputFormat.configureIncrementalLoad(job, hTable);

Primeiro, seu Mapeador precisa gerar um ImmutableBytesWritable que contenha a chave de linha e o valor de saída pode ser um KeyValue, um Put ou um Delete. O segundo trecho mostra como configurar o Redutor; na verdade, é completamente tratado pelo HFileOutputFormat. configureIncrementalLoad() conforme descrito na seção “Transformar” anteriormente.

A classe HBaseKVMapper contém apenas o Mapper que respeita a chave e os valores de saída configurados:
public class HBaseKVMapper extends
   Mapper<LongWritable,   Text, ImmutableBytesWritable,
KeyValue> {

Para executá-lo, você precisará compilar o projeto usando o maven e pegar os arquivos de dados seguindo os links no README. (Ele também contém o script de shell para criar a tabela.) Antes de iniciar o trabalho, não se esqueça de enviar os arquivos para o HDFS e configurar seu classpath para estar ciente do HBase porque você não usará seu jar desta vez :
export HADOOP_CLASSPATH=$HADOOP_CLASSPATH:/etc/hbase/conf/:/usr/lib/hbase/*

Você poderá iniciar o trabalho usando uma linha de comando semelhante a esta:
hadoop jar hbase-examples-0.0.1-SNAPSHOT.jar
com.cloudera.examples.hbase.bulkimport.Driver -libjars
/home/cloudera/.m2/repository/joda-time/joda-time/2.1/joda-time-2.1.jar,
/home/cloudera/.m2/repository/net/sf/opencsv/opencsv/2.3/opencsv-2.3.jar
RowFeeder\ for\ Celtics\ and\ Lakers\ Game\ 1.csv output2 NBAFinal2010

Como você pode ver, as dependências do trabalho devem ser adicionadas separadamente. Por fim, você pode carregar os arquivos primeiro alterando seu proprietário e, em seguida, executando a ferramenta completebulkload:
sudo -u hdfs hdfs dfs -chown -R hbase:hbase/user/cloudera/output2
hbase org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles output2 NBAFinal2010

Potenciais problemas


Dados excluídos recentemente reaparecendo. Esse problema ocorre quando um Delete é inserido por meio de um carregamento em massa e é compactado principalmente enquanto o Put correspondente ainda está em um MemStore. Os dados serão considerados excluídos quando o Delete estiver em um HFile, mas, uma vez removido durante a compactação, o Put ficará visível novamente. Se você tiver esse caso de uso, considere configurar suas famílias de colunas para manter as células excluídas com KEEP_DELETED_CELLS no shell ou HColumnDescriptor.setKeepDeletedCells().

Dados carregados em massa não podem ser substituídos por outro carregamento em massa. Esse problema ocorre quando dois HFiles carregados em massa carregados em momentos diferentes tentam gravar um valor diferente na mesma célula, o que significa que eles têm a mesma chave de linha, família, qualificador e carimbo de data/hora. O resultado é que o primeiro valor inserido será retornado em vez do segundo. Este bug será corrigido no HBase 0.96.0 e CDH 5 (a próxima versão principal do CDH) e o trabalho está sendo feito no HBASE-8521 para o branch 0.94 e CDH 4.

O carregamento em massa aciona grandes compactações. Esse problema surge quando você está fazendo carregamentos em massa incrementais e há arquivos carregados em massa suficientes para acionar uma compactação menor (o limite padrão é 3). Os HFiles são carregados com um número de sequência definido como 0 para que sejam selecionados primeiro quando o RegionServer estiver selecionando arquivos para uma compactação e, devido a um bug, ele também selecionará todos os arquivos restantes. Esse problema afetará seriamente aqueles que já possuem grandes regiões (vários GBs) ou que carregam em massa com frequência (a cada poucas horas ou menos), pois muitos dados serão compactados. O HBase 0.96.0 tem a correção apropriada e o CDH 5 também; O HBASE-8521 corrige o problema na versão 0.94, pois os HFiles carregados em massa agora recebem um número de sequência adequado. HBASE-8283 pode ser habilitado com hbase.hstore.useExploringCompation após 0.94.9 e CDH 4.4.0 para mitigar este problema sendo apenas um algoritmo de seleção de compactação mais inteligente.

Dados carregados em massa não são replicados . Como o carregamento em massa ignora o caminho de gravação, o WAL não é gravado como parte do processo. A replicação funciona lendo os arquivos WAL para que não veja os dados carregados em massa – e o mesmo vale para as edições que usam Put.setWriteToWAL(true). Uma maneira de lidar com isso é enviar os arquivos brutos ou os HFiles para o outro cluster e fazer o outro processamento lá.

Conclusão


O objetivo desta postagem no blog foi apresentar os conceitos básicos do carregamento em massa do Apache HBase. Explicamos como o processo é como fazer ETL e que é muito melhor para conjuntos de big data do que usar a API normal, pois ignora o caminho de gravação. Os dois exemplos foram incluídos para mostrar como arquivos TSV simples podem ser carregados em massa no HBase e como escrever seu próprio Mapper para outros formatos de dados.

Agora você pode tentar fazer o mesmo usando uma interface gráfica de usuário via Hue.