HBase
 sql >> Base de Dados >  >> NoSQL >> HBase

Spark-on-HBase:conector HBase baseado em DataFrame

Esta postagem do blog foi publicada no Hortonworks.com antes da fusão com a Cloudera. Alguns links, recursos ou referências podem não ser mais precisos.

Temos o orgulho de anunciar a prévia técnica do Spark-HBase Connector, desenvolvido pela Hortonworks em parceria com a Bloomberg.

O conector Spark-HBase aproveita a API de fonte de dados (SPARK-3247) introduzida no Spark-1.2.0. Ele preenche a lacuna entre o armazenamento de chave HBase simples e consultas SQL relacionais complexas e permite que os usuários executem análises de dados complexas sobre o HBase usando o Spark. Um HBase DataFrame é um Spark DataFrame padrão e é capaz de interagir com quaisquer outras fontes de dados, como Hive, ORC, Parquet, JSON, etc.

Plano de fundo


Existem vários conectores Spark HBase de código aberto disponíveis como pacotes Spark, como projetos independentes ou no tronco HBase.

O Spark mudou para as APIs Dataset/DataFrame, que fornecem otimização de plano de consulta integrada. Agora, os usuários finais preferem usar a interface baseada em DataFrames/Datasets.

O conector HBase no tronco HBase tem um suporte avançado no nível RDD, por exemplo. BulkPut, etc, mas seu suporte a DataFrame não é tão rico. O conector de tronco HBase depende do HadoopRDD padrão com TableInputFormat integrado ao HBase tem algumas limitações de desempenho. Além disso, o BulkGet executado no driver pode ser um único ponto de falha.

Existem algumas outras implementações alternativas. Use o Spark-SQL-on-HBase como um exemplo. Ele aplica técnicas de otimização personalizadas muito avançadas, incorporando seu próprio plano de otimização de consulta dentro do mecanismo Spark Catalyst padrão, envia o RDD para o HBase e executa tarefas complicadas, como agregação parcial, dentro do coprocessador HBase. Essa abordagem é capaz de atingir alto desempenho, mas é difícil de manter devido à sua complexidade e à rápida evolução do Spark. Também permitir que código arbitrário seja executado dentro de um coprocessador pode representar riscos de segurança.

O Spark-on-HBase Connector (SHC) foi desenvolvido para superar esses possíveis gargalos e pontos fracos. Ele implementa a API Spark Datasource padrão e aproveita o mecanismo Spark Catalyst para otimização de consultas. Em paralelo, o RDD é construído do zero em vez de usar TableInputFormat para obter alto desempenho. Com este RDD personalizado, todas as técnicas críticas podem ser aplicadas e totalmente implementadas, como remoção de partição, remoção de coluna, empilhamento de predicado e localidade de dados. O design torna a manutenção muito fácil, ao mesmo tempo em que alcança um bom equilíbrio entre desempenho e simplicidade.

Arquitetura


Presumimos que o Spark e o HBase sejam implantados no mesmo cluster e os executores do Spark estejam localizados junto aos servidores da região, conforme ilustrado na figura abaixo.



Figura 1. Arquitetura do conector Spark-on-HBase

Em um nível alto, o conector trata Scan e Get de maneira semelhante, e ambas as ações são executadas nos executores. O driver processa a consulta, agrega scans/gets com base nos metadados da região e gera tarefas por região. As tarefas são enviadas para os executores preferenciais co-localizados com o servidor da região e são executadas em paralelo nos executores para obter melhor localidade e simultaneidade dos dados. Se uma região não retiver os dados necessários, esse servidor de região não receberá nenhuma tarefa. Uma tarefa pode consistir em vários Scans e BulkGets, e as solicitações de dados por uma tarefa são recuperadas de apenas um servidor de região, e esse servidor de região também será a preferência de localidade para a tarefa. Observe que o driver não está envolvido na execução do trabalho real, exceto nas tarefas de agendamento. Isso evita que o motorista seja o gargalo.

Catálogo de Tabelas


Para trazer a tabela HBase como uma tabela relacional para o Spark, definimos um mapeamento entre as tabelas HBase e Spark, chamado Table Catalog. Existem duas partes críticas deste catálogo. Uma é a definição de rowkey e a outra é o mapeamento entre a coluna da tabela no Spark e a família de colunas e o qualificador de coluna no HBase. Consulte a seção Uso para obter detalhes.

Suporte nativo do Avro


O conector suporta o formato Avro nativamente, pois é uma prática muito comum persistir dados estruturados no HBase como uma matriz de bytes. O usuário pode persistir o registro Avro diretamente no HBase. Internamente, o esquema Avro é convertido em um tipo de dados nativo do Spark Catalyst automaticamente. Observe que ambas as partes de valor-chave em uma tabela HBase podem ser definidas no formato Avro. Consulte os exemplos/casos de teste no repositório para uso exato.

Empilhamento de Predicado


O conector recupera apenas as colunas necessárias do servidor regional para reduzir a sobrecarga da rede e evitar o processamento redundante no mecanismo Spark Catalyst. Os filtros HBase padrão existentes são usados ​​para executar push-down de predicado sem alavancar a capacidade do coprocessador. Como o HBase não reconhece o tipo de dados, exceto a matriz de bytes, e a inconsistência de ordem entre os tipos primitivos Java e a matriz de bytes,  precisamos pré-processar a condição do filtro antes de definir o filtro na operação Scan para evitar qualquer perda de dados. Dentro do servidor de região, os registros que não correspondem à condição de consulta são filtrados.

Remoção de Partições


Ao extrair a chave de linha dos predicados, dividimos o Scan/BulkGet em vários intervalos não sobrepostos, apenas os servidores da região que possuem os dados solicitados executarão o Scan/BulkGet. Atualmente, a remoção de partição é executada na primeira dimensão das chaves de linha. Por exemplo, se uma chave de linha for “key1:key2:key3”, a remoção de partição será baseada apenas em “key1”. Observe que as condições WHERE precisam ser definidas com cuidado. Caso contrário, a remoção da partição pode não ter efeito. Por exemplo, WHERE rowkey1> “abc” OR column =“xyz” (onde rowkey1 é a primeira dimensão da rowkey e column é uma coluna hbase regular) resultará em uma varredura completa, pois temos que cobrir todos os intervalos porque do OU lógica.

Localidade dos dados


Quando um executor Spark é co-localizado com servidores de região do HBase, a localidade de dados é obtida identificando o local do servidor de região e faz o melhor esforço para colocar a tarefa com o servidor de região. Cada executor executa Scan/BulkGet por parte dos dados colocados no mesmo host.

Digitalizar e obter em massa


Esses dois operadores são expostos aos usuários especificando WHERE CLAUSE, por exemplo, WHERE column> x e column para varredura e coluna WHERE =x esquecer. As operações são realizadas nos executores, e o driver apenas constrói essas operações. Internamente, eles são convertidos em scan e/ou get, e Iterator[Row] é retornado ao mecanismo de catalisador para processamento da camada superior.

Uso


O seguinte ilustra o procedimento básico sobre como usar o conector. Para obter mais detalhes e casos de uso avançados, como suporte a Avro e chave composta, consulte os exemplos no repositório.

1) Defina o catálogo para o mapeamento do esquema:
[code language="scala"]def catalog =s"""{        |"table":{"namespace":"default", "name":"table1"},        |"rowkey":"key" ,        |"columns":{          |"col0":{"cf":"rowkey", "col":"key", "type":"string"},          |"col1":{"cf":"cf1 ", "col":"col1", "type":"boolean"},          |"col2":{"cf":"cf2", "col":"col2", "type":"double"}, |"col3":{"cf":"cf3", "col":"col3", "type":"float"},          |"col4":{"cf":"cf4", "col":" col4", "type":"int"},          |"col5":{"cf":"cf5", "col":"col5", "type":"bigint"},          |"col6":{" cf":"cf6", "col":"col6", "type":"smallint"},          |"col7":{"cf":"cf7", "col":"col7", "type":"string"},          |"col8":{"cf":"cf8", "col":"col8", "type":"tinyint"}        |}      |}""".stripMargin[/code] 


2) Prepare os dados e preencha a tabela HBase:
case class HBaseRecord(col0:String, col1:Boolean, col2:Double, col3:Float, col4:Int,       col5:Long, col6:Short, col7:String, col8:Byte)

object HBaseRecord {def apply(i:Int, t:String):HBaseRecord ={ val s =s”””row${“%03d”.format(i)}”””       HBaseRecord(s, i % 2 ==0, i.toDouble, i.toFloat,  i, i.toLong, i.toShort,  s"String$i:$t",      i.toByte) }}

val data =(0 a 255).map { i =>  HBaseRecord(i, “extra”)}

sc.parallelize(data).toDF.write.options(
 Map(HBaseTableCatalog.tableCatalog -> catalog, HBaseTableCatalog.newTable -> “5”))
 .format(“org.apache.spark. sql.execution.datasources.hbase”)
 .save()
 
3) Carregue o DataFrame:
def withCatalog(cat:String):DataFrame ={
 sqlContext
 .read
 .options(Map(HBaseTableCatalog.tableCatalog->cat))
 .format( “org.apache.spark.sql.execution.datasources.hbase”)
 .load()
}

val df =withCatalog(catalog)

4) Consulta integrada ao idioma:
val s =df.filter((($”col0″ <=“linha050″ &&$”col0”> “linha040”) ||
 $”col0″ ===“linha005” ||
 $”col0″ ===“linha020” ||
 $”col0″ === “r20” ||
 $”col0″ <=“linha005”) &&
 ($”col4″ ===1 ||
 $”col4″ ===42))
 .select(“col0”, “col1”, “col4”)
s .exposição

5) Consulta SQL:
df.registerTempTable(“table”)
sqlContext.sql(“select count(col1) from table”).show

Configurando o pacote Spark


Os usuários podem usar o conector Spark-on-HBase como um pacote Spark padrão. Para incluir o pacote em seu aplicativo Spark, use:

spark-shell, pyspark ou spark-submit

> $SPARK_HOME/bin/spark-shell –packages zhzhan:shc:0.0.11-1.6.1-s_2.10

Os usuários também podem incluir o pacote como dependência em seu arquivo SBT. O formato é o spark-package-name:version

spDependencies +=“zhzhan/shc:0.0.11-1.6.1-s_2.10”

Executando em cluster seguro


Para executar em um cluster habilitado para Kerberos, o usuário deve incluir jars relacionados ao HBase no caminho de classe, pois a recuperação e a renovação do token HBase são feitas pelo Spark e são independentes do conector. Em outras palavras, o usuário precisa iniciar o ambiente da maneira normal, seja através do kinit ou fornecendo principal/keytab. Os exemplos a seguir mostram como executar em um cluster seguro com o modo yarn-client e yarn-cluster. Observe que SPARK_CLASSPATH deve ser definido para ambos os modos, e o jar de exemplo é apenas um espaço reservado para o Spark.

export SPARK_CLASSPATH=/usr/hdp/current/hbase-client/lib/hbase-common.jar:/usr/hdp/current/hbase-client/lib/hbase-client.jar:/usr/hdp/current/hbase- client/lib/hbase-server.jar:/usr/hdp/current/hbase-client/lib/hbase-protocol.jar:/usr/hdp/current/hbase-client/lib/guava-12.0.1.jar

Suponha que hrt_qa seja uma conta headless, o usuário pode usar o seguinte comando para kinit:

kinit -k -t /tmp/hrt_qa.headless.keytab hrt_qa

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-client –packages zhzhan:shc:0.0.11- 1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark-client/lib/spark-examples-1.6.1.2.4.2. 0-106-hadoop2.7.1.2.4.2.0-106.jar

/usr/hdp/current/spark-client/bin/spark-submit –class org.apache.spark.sql.execution.datasources.hbase.examples.HBaseSource –master yarn-cluster –files /etc/hbase/conf/hbase -site.xml –packages zhzhan:shc:0.0.11-1.6.1-s_2.10 –num-executors 4 –driver-memory 512m –executor-memory 512m –executor-cores 1 /usr/hdp/current/spark- client/lib/spark-examples-1.6.1.2.4.2.0-106-hadoop2.7.1.2.4.2.0-106.jar

Juntando tudo


Acabamos de dar uma rápida visão geral de como o HBase oferece suporte ao Spark no nível do DataFrame. Com a API DataFrame, os aplicativos Spark podem trabalhar com dados armazenados na tabela HBase tão facilmente quanto qualquer dado armazenado em outras fontes de dados. Com esse novo recurso, os dados nas tabelas do HBase podem ser facilmente consumidos por aplicativos Spark e outras ferramentas interativas, por exemplo. os usuários podem executar uma consulta SQL complexa em cima de uma tabela HBase dentro do Spark, realizar uma junção de tabela no Dataframe ou integrar-se ao Spark Streaming para implementar um sistema mais complicado.

O que vem a seguir?


Atualmente, o conector está hospedado no repositório Hortonworks e publicado como um pacote Spark. Está em processo de migração para o tronco do Apache HBase. Durante a migração, identificamos alguns bugs críticos no tronco do HBase, e eles serão corrigidos junto com a mesclagem. O trabalho da comunidade é rastreado pelo guarda-chuva HBase JIRA HBASE-14789, incluindo HBASE-14795 e HBASE-14796  para otimizar a arquitetura de computação subjacente para Scan e BulkGet,  HBASE-14801 para fornecer interface de usuário JSON para facilidade de uso, HBASE-15336 para o caminho de gravação do DataFrame, HBASE-15334 para suporte a Avro, HBASE-15333  para oferecer suporte a tipos primitivos Java, como short, int, long, float e double etc., HBASE-15335 para oferecer suporte a chave de linha composta e HBASE-15572 para adicionar semântica de carimbo de data/hora opcional. Estamos ansiosos para produzir uma versão futura do conector que torne o conector ainda mais fácil de trabalhar.

Reconhecimento


Queremos agradecer a Hamel Kothari, Sudarshan Kadambi e a equipe da Bloomberg por nos orientar neste trabalho e também nos ajudar a validar este trabalho. Também queremos agradecer à comunidade HBase por fornecer seus comentários e melhorar isso. Por fim, este trabalho aproveitou as lições das integrações anteriores do Spark HBase e queremos agradecer a seus desenvolvedores por pavimentar o caminho.

Referência:


SHC:https://github.com/hortonworks/shc-release

Pacote Spark:http://spark-packages.org/package/zhzhan/shc

Apache HBase: https://hbase.apache.org/

Apache Spark:http://spark.apache.org/