Agradecemos a Pengyu Wang, desenvolvedor de software da FINRA, pela permissão para republicar esta postagem.
As tabelas salgadas do Apache HBase com pré-divisão são uma solução HBase comprovadamente eficaz para fornecer distribuição uniforme de carga de trabalho entre RegionServers e evitar pontos de acesso durante gravações em massa. Neste design, uma chave de linha é feita com uma chave lógica mais sal no início. Uma maneira de gerar sal é calculando n (número de regiões) módulo no código hash da chave de linha lógica (data, etc).
Chaves da linha de sal
Por exemplo, uma tabela que aceita carregamento de dados diariamente pode usar chaves de linha lógicas começando com uma data e queremos pré-dividir essa tabela em 1.000 regiões. Neste caso, esperamos gerar 1.000 sais diferentes. O sal pode ser gerado, por exemplo, como:
StringUtils.leftPad(Integer.toString(Math.abs(keyCore.hashCode() % 1000)), 3, "0") + "|" + logicalKey logicalKey = 2015-04-26|abc rowKey = 893|2015-04-26|abc
A saída de
hashCode()
com módulo fornece aleatoriedade para valor de sal de “000” a “999”. Com essa transformação de chave, a tabela é pré-dividida nos limites de sal à medida que é criada. Isso fará com que os volumes de linha sejam distribuídos uniformemente ao carregar os HFiles com o MapReduce bulkload. Garante que as chaves de linha com o mesmo sal caiam na mesma região. Em muitos casos de uso, como arquivamento de dados, você precisa varrer ou copiar os dados em um determinado intervalo de chave lógica (intervalo de datas) usando o trabalho MapReduce. Os trabalhos MapReduce de tabela padrão são configurados fornecendo o
Scan
instância com atributos de intervalo de chaves. Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); scan.setStartRow(Bytes.toBytes("2015-04-26")); scan.setStopRow(Bytes.toBytes("2015-04-27")); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, TableInputFormat.class ); …
No entanto, a configuração de tal trabalho torna-se um desafio para tabelas pré-divididas salgadas. As chaves de linha de início e parada serão diferentes para cada região porque cada uma tem um sal exclusivo. E não podemos especificar vários intervalos para um
Scan
instância. Para resolver este problema, precisamos ver como a tabela MapReduce funciona. Geralmente, a estrutura MapReduce cria uma tarefa de mapa para ler e processar cada divisão de entrada. Cada divisão é gerada em
InputFormat
base de classe, pelo método getSplits()
. No trabalho MapReduce da tabela HBase,
TableInputFormat
é usado como InputFormat
. Dentro da implementação, o getSplits()
é substituído para recuperar as chaves de linha de início e fim do Scan
instância. Como as teclas de linha inicial e final abrangem várias regiões, o intervalo é dividido pelos limites da região e retorna a lista de TableSplit
objetos que cobrem o intervalo de chaves de varredura. Em vez de ser baseado no bloco HDFS, TableSplit
s são baseados na região. Sobrescrevendo o getSplits()
método, podemos controlar o TableSplit
. Criando TableInputFormat personalizado
Para alterar o comportamento do
getSplits()
método, uma classe personalizada que estende TableInputFormat
É necessário. O propósito de getSplits()
aqui é para cobrir o intervalo de chave lógica em cada região, construir seu intervalo de chave de linha com seu sal exclusivo. A classe HTable fornece o método getStartEndKeys()
que retorna as chaves de linha inicial e final para cada região. De cada chave de início, analise o sal correspondente para a região. Pair keys = table.getStartEndKeys(); for (int i = 0; i < keys.getFirst().length; i++) { // The first 3 bytes is the salt, for the first region, start key is empty, so apply “000” if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } … }
A configuração do trabalho passa o intervalo de chaves lógicas
TableInputFormat
recupera a chave de início e parada de Scan
instância. Como não podemos usar Scan
em nosso trabalho MapReduce, podemos usar Configuration
em vez disso, passar essas duas variáveis e apenas a chave lógica de início e parada é boa o suficiente (uma variável pode ser uma data ou outra informação comercial). O getSplits()
método tem JobContext
argumento, A instância de configuração pode ser lida como context.getConfiguration()
. No driver MapReduce:
Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27");
Em
Custom TableInputFormat
:@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); String scanStart = conf.get("logical.scan.start"); String scanStop = conf.get("logical.scan.stop"); … }
Reconstruir o intervalo de chaves salgadas por região
Agora que temos o salt e a chave de início/parada lógica para cada região, podemos reconstruir o intervalo de chave de linha real.
byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop);
Criando um TableSplit para cada região
Com o intervalo de chave de linha, agora podemos inicializar
TableSplit
instância para a região. List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { … byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); }
Mais uma coisa a ser observada é a localidade de dados. A estrutura usa informações de localização em cada divisão de entrada para atribuir uma tarefa de mapa em seu host local. Para nosso
TableInputFormat
, usamos o método getTableRegionLocation()
para recuperar o local da região que atende à chave de linha. Este local é então passado para o
TableSplit
construtor. Isso garantirá que o mapeador que processa a divisão da tabela esteja no mesmo servidor de região. Um método, chamado DNS.reverseDns()
, requer o endereço do servidor de nomes HBase. Este atributo é armazenado na configuração “hbase.nameserver.address
“. this.nameServer = context.getConfiguration().get("hbase.nameserver.address", null); … public String getTableRegionLocation(HTable table, byte[] rowKey) throws IOException { HServerAddress regionServerAddress = table.getRegionLocation(rowKey).getServerAddress(); InetAddress regionAddress = regionServerAddress.getInetSocketAddress().getAddress(); String regionLocation; try { regionLocation = reverseDNS(regionAddress); } catch (NamingException e) { regionLocation = regionServerAddress.getHostname(); } return regionLocation; } protected String reverseDNS(InetAddress ipAddress) throws NamingException { String hostName = this.reverseDNSCacheMap.get(ipAddress); if (hostName == null) { hostName = Strings.domainNamePointerToHostName(DNS.reverseDns(ipAddress, this.nameServer)); this.reverseDNSCacheMap.put(ipAddress, hostName); } return hostName; }
Um código completo de
getSplits
ficará assim:@Override public List getSplits(JobContext context) throws IOException { conf = context.getConfiguration(); table = getHTable(conf); if (table == null) { throw new IOException("No table was provided."); } // Get the name server address and the default value is null. this.nameServer = conf.get("hbase.nameserver.address", null); String scanStart = conf.get("region.scan.start"); String scanStop = conf.get("region.scan.stop"); Pair keys = table.getStartEndKeys(); if (keys == null || keys.getFirst() == null || keys.getFirst().length == 0) { throw new RuntimeException("At least one region is expected"); } List splits = new ArrayList(keys.getFirst().length); for (int i = 0; i < keys.getFirst().length; i++) { String regionLocation = getTableRegionLocation(table, keys.getFirst()[i]); String regionSalt = null; if (keys.getFirst()[i].length == 0) { regionSalt = "000"; } else { regionSalt = Bytes.toString(keys.getFirst()[i]).substring(0, 3); } byte[] startRowKey = Bytes.toBytes(regionSalt + "|" + scanStart); byte[] endRowKey = Bytes.toBytes(regionSalt + "|" + scanStop); InputSplit split = new TableSplit(table.getTableName(), startRowKey, endRowKey, regionLocation); splits.add(split); } log.info("Total table splits: " + splits.size()); return splits; }
Use o TableInoutFormat personalizado no driver MapReduce
Agora precisamos substituir o
TableInputFormat
class com a compilação personalizada que usamos para a configuração do trabalho MapReduce da tabela. Configuration conf = getConf(); conf = HBaseConfiguration.addHbaseResources(conf); HTableInterface status_table = new HTable(conf, status_tablename); conf.set("logical.scan.start", "2015-04-26"); conf.set("logical.scan.stop", "2015-04-27"); Scan scan = new Scan(); scan.setCaching(1000); scan.setCacheBlocks(false); scan.setBatch(1000); scan.setMaxVersions(1); /* Setup the table mapper job */ TableMapReduceUtil.initTableMapperJob( tablename, scan, DataScanMapper.class, ImmutableBytesWritable.class, KeyValue.class, job, true, MultiRangeTableInputFormat.class );
A abordagem do
TableInputFormat
personalizado fornece um recurso de varredura eficiente e escalável para tabelas HBase que são projetadas para usar salt para uma carga de dados balanceada. Como a varredura pode ignorar qualquer chave de linha não relacionada, independentemente do tamanho da tabela, a complexidade da varredura é limitada apenas ao tamanho dos dados de destino. Na maioria dos casos de uso, isso pode garantir um tempo de processamento relativamente consistente à medida que a tabela cresce.