Esta postagem do blog foi publicada no Hortonworks.com antes da fusão com a Cloudera. Alguns links, recursos ou referências podem não ser mais precisos.
Em 2016, publicamos a segunda versão v1.0.1 do Spark HBase Connector (SHC). Neste blog, vamos passar pelos principais recursos que implementamos este ano.
Suporte ao codificador Phoenix
O SHC pode ser usado para gravar dados no cluster HBase para processamento posterior. Ele suporta serialização Avro para dados de entrada e saída e padroniza para uma serialização personalizada usando um mecanismo de codificação nativo simples. Ao ler os dados de entrada, o SHC envia filtros para o HBase para varreduras eficientes de dados. Dada a popularidade dos dados do Phoenix no HBase, parece natural oferecer suporte aos dados do Phoenix como entrada para o HBase, além dos dados do Avro. Além disso, o padrão para a codificação binária nativa simples parece suscetível a alterações futuras e é um risco para os usuários que gravam dados do SHC no HBase. Por exemplo, com o SHC avançando, a compatibilidade com versões anteriores precisa ser tratada adequadamente. Portanto, o padrão, o SHC precisa mudar para um formato mais padrão e bem testado, como o Phoenix.
Para o suporte de chave composta, antes desse recurso, o comprimento do valor de cada dimensão precisava ser fixo – com exceção da última dimensão da chave composta. Esta limitação foi removida pelo codificador Phoenix. Atualmente, se os usuários escolherem Phoenix como codificador de dados, eles não precisarão especificar o comprimento de cada parte da chave composta no catálogo.
Como Phoenix é o codificador padrão, a única mudança para os usuários é que, se quiserem usar PrimitiveType como codificador de dados, eles precisam especificar “tableCoder”:”PrimitiveType” em seus catálogos para notificar o SHC de que desejam usar PrimitiveType. de Phoenix como “tableCoder”.
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”chave”,
|”colunas”:{
|”col0″:{“cf”:”rowkey”, “col”:”chave”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf2″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf3″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf4″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf5″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf6″, “col”:”col6 ″, “type”:”smallint”},
|”col7″:{“cf”:”cf7″, “col”:”col7″, “type”:”string”},
|”col8″:{“cf”:”cf8″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
Conexões do Cache Spark HBase
O SHC não armazenava em cache objetos de conexão para o HBase antes. Especificamente, a chamada para ‘ConnectionFactory.createConnection’ era feita sempre que o SHC precisava visitar as tabelas e regiões do HBase. Os usuários podem ver isso simplesmente observando os logs do executor e observando as conexões do zookeeper sendo estabelecidas para cada solicitação. Na documentação da interface Connection, ele diz que a criação da conexão é uma operação pesada e as implementações de conexão são thread-safe. Portanto, para processos de longa duração, seria muito útil para o SHC manter uma conexão em cache. Com esse recurso, o SHC diminui drasticamente o número de conexões criadas e melhora muito seu desempenho no processo.
Suporte a famílias de colunas duplicadas
O SHC oferece suporte a famílias de colunas duplicadas. Agora os usuários podem definir seus catálogos assim:
def catalog =s”””{
|”table”:{“namespace”:”default”, “name”:”table1″, “tableCoder”:”PrimitiveType”},
|”rowkey ”:”chave”,
|”colunas”:{
|”col0″:{“cf”:”rowkey”, “col”:”chave”, “type”:”string”} ,
|”col1″:{“cf”:”cf1″, “col”:”col1″, “type”:”boolean”},
|”col2″:{“cf”:”cf1″, “col”:”col2″, “type”:”double”},
|”col3″:{“cf”:”cf1″, “col”:”col3″, “type” :”float”},
|”col4″:{“cf”:”cf2″, “col”:”col4″, “type”:”int”},
|”col5″:{“cf”:”cf2″, “col”:”col5″, “type”:”bigint”},
|”col6″:{“cf”:”cf3″, “col”:”col6 ″, “tipo”:”pequeno”},
|”col7″:{“cf”:”cf3″, “col”:”col7″, “tipo”:”string”},
|”col8″:{“cf”:”cf3″, “col”:”col8″, “type”:”tinyint”}
|}
|}”””.stripMargin
Na definição do catálogo acima, as colunas ‘col0’, ‘col1’ e ‘col2’ têm a mesma família de colunas ‘cf1’.
Usar a API Spark UnhandledFilters
A SHC também implementou a API Spark unhandledFilters, que é uma otimização eficaz. Essa API informa ao Spark sobre os filtros que o SHC não está implementando em vez de retornar todos os filtros. O comportamento anterior, nesse caso, era reaplicar todos os filtros assim que os dados fossem extraídos no Spark. Isso deve ser idempotente, portanto, não altera nenhum dado, mas pode ser caro se os filtros forem complicados.
Comunidade SHC
A comunidade SHC é maior e mais influente do que há um ano. Em 2016, demos palestras no Hadoop Summit e no encontro HBase/Spark, e escrevemos blogs detalhados. Com o aumento do número de usuários do SHC, estamos recebendo um número maior de perguntas de usuários. Estamos muito felizes em ver o aumento da adoção do SHC e, se você tiver alguma ideia sobre como melhorá-lo ainda mais, envie-nos comentários por meio do Hortonworks Community Connection.
AGRADECIMENTO
Queremos agradecer a equipe da Bloomberg por nos orientar neste trabalho e também nos ajudar a validar este trabalho. Também queremos agradecer à comunidade HBase por fornecer seus comentários e melhorar isso. Por fim, este trabalho aproveitou as lições das integrações anteriores do Spark HBase e queremos agradecer a seus desenvolvedores por pavimentar o caminho.
REFERÊNCIA:
SHC: https://github.com/hortonworks-spark/shc
Apache HBase: https://hbase.apache.org/
Apache Spark: http://spark.apache.org/
Apache Phoenix: https://phoenix.apache.org/