Introdução
O Python é usado extensivamente entre engenheiros de dados e cientistas de dados para resolver todos os tipos de problemas, desde pipelines ETL/ELT até a construção de modelos de aprendizado de máquina. O Apache HBase é um sistema de armazenamento de dados eficaz para muitos fluxos de trabalho, mas acessar esses dados especificamente por meio do Python pode ser uma dificuldade. Para profissionais de dados que desejam usar dados armazenados no HBase, o recente projeto upstream “hbase-connectors” pode ser usado com o PySpark para operações básicas.
Nesta série de blogs, explicaremos como configurar o PySpark e o HBase juntos para uso básico do Spark, bem como para trabalhos mantidos no CDSW. Para aqueles que não estão familiarizados com o CDSW, é uma plataforma de ciência de dados corporativa segura e de autoatendimento para cientistas de dados gerenciarem seus próprios pipelines de análise, acelerando assim os projetos de aprendizado de máquina da exploração à produção. Para obter mais informações sobre o CDSW, visite a página do produto Cloudera Data Science Workbench.
Neste post, várias operações serão explicadas e demonstradas junto com a saída de exemplo. Para contextualizar, todas as operações de exemplo nesta postagem de blog específica são executadas com uma implantação de CDSW.
Pré-requisitos:
- Tenha um cluster CDP com HBase e Spark
- Se você for seguir exemplos via CDSW, precisará dele instalado – Instalando o Cloudera Data Science Workbench
- O Python 3 é instalado em cada nó no mesmo caminho
Configuração:
Primeiro, o HBase e o Spark precisam ser configurados juntos para que as consultas do Spark SQL funcionem corretamente. Para isso, existem duas partes:primeiro, configure os HBase Region Servers através do Cloudera Manager; e segundo, certifique-se de que o tempo de execução do Spark tenha ligações HBase. Uma observação a ser lembrada, porém, é que o Cloudera Manager já configura algumas variáveis de configuração e de ambiente para apontar automaticamente o Spark para o HBase para você. No entanto, a primeira etapa de configuração de consultas Spark SQL é comum em todos os tipos de implantação em clusters CDP, mas a segunda é um pouco diferente dependendo do tipo de implantação.
Configurando servidores de região HBase
- Vá para Cloudera Manager e selecione o serviço HBase.
- Procure por “ambiente de servidor de região”
- Adicione uma nova variável de ambiente usando o snippet de configuração avançada do ambiente RegionServer (válvula de segurança):
- Chave:HBASE_CLASSPATH
- Valor:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar:/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib /hbase-spark-protocol-shaded.jar:/opt/cloudera/parcels/CDH/jars/scala-library-2.11.12.jar
Certifique-se de usar os números de versão apropriados.
- Reinicie os servidores de região.
Depois de seguir as etapas acima, siga as etapas abaixo, dependendo se você deseja uma implantação CDSW ou não CDSW.
Adicionando ligações HBase ao Spark Runtime em implantações não CDSW
Para implantar o shell ou usar o spark-submit corretamente, use os comandos a seguir para garantir que o spark tenha as associações HBase corretas.
pyspark –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded. jar
spark-submit –jars /opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol- shaded.jar
Adicionando HBase Bindings ao Spark Runtime em implantações de CDSW
Para configurar o CDSW com HBase e PySpark, há algumas etapas que você precisa seguir.
1) Verifique se o Python 3 está instalado em cada nó do cluster e anote o caminho para ele
2) Faça um novo projeto no CDSW e use um modelo PySpark
3) Abra o Projeto, vá em Configurações -> Motor -> Variáveis de Ambiente.
4) Defina PYSPARK3_DRIVER_PYTHON e PYSPARK3_PYTHON para o caminho onde o Python está instalado em seus nós de cluster (Caminho observado na Etapa 1).
Abaixo está uma amostra de como deve ficar.
5) Em seu projeto, vá em Files -> spark-defaults.conf e abra-o no Workbench
6) Copie e cole a linha abaixo nesse arquivo e verifique se ela está salva antes de iniciar uma nova sessão.
spark.jars=/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark.jar,/opt/cloudera/parcels/CDH/lib/hbase_connectors/lib/hbase-spark-protocol-shaded.jar
Neste ponto, o CDSW agora está configurado para executar trabalhos do PySpark no HBase! O restante desta postagem do blog se refere a algumas operações de exemplo em uma implantação de CDSW.
Exemplo de operações
Operações de venda
Há duas maneiras de inserir e atualizar linhas no HBase. O primeiro e mais recomendado método é construir um catálogo, que é um esquema que mapeará as colunas de uma tabela HBase para um dataframe PySpark enquanto especifica o nome da tabela e o namespace. Construir esse formato JSON definido pelo usuário é o método preferido, pois também pode ser usado com outras operações. Para obter mais informações sobre catálogos, consulte esta documentação http://hbase.apache.org/book.html#_define_catalog. O segundo método está usando um parâmetro de mapeamento específico chamado “hbase.columns.mapping”, que apenas recebe uma string de pares de valores-chave.
- Usando catálogos
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() tableCatalog = ''.join("""{ "table":{"namespace":"default", "name":"tblEmployee", "tableCoder":"PrimitiveType"}, "rowkey":"key", "columns":{ "key":{"cf":"rowkey", "col":"key", "type":"int"}, "empId":{"cf":"personal","col":"empId","type":"string"}, "empName":{"cf":"personal", "col":"empName", "type":"string"}, "empState":{"cf":"personal", "col":"empWeight", "type":"string"} } }""".split()) employee = [(10, 'jonD', 'Jon Daniels', 'CA'), (6, 'billR', 'Bill Robert', 'FL')] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empState=x[3])) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .options(catalog=tableCatalog, newTable=5) \ .option("hbase.spark.use.hbasecontext", False) \ .save() # newTable refers to the NumberOfRegions which has to be > 3
Verifique se uma nova tabela chamada “tblEmployee” foi criada no HBase simplesmente abrindo o shell do HBase e executando o seguinte comando:
scan 'tblEmployee', {'LIMIT' => 2}
O uso de catálogos também permite carregar tabelas HBase facilmente. Isso será discutido em um capítulo futuro.
- Usando hbase.columns.mapping
Ao escrever o PySpark Dataframe, uma opção chamada “hbase.columns.mapping” pode ser adicionada para incluir uma string que mapeia as colunas corretamente. Esta opção só permite inserir linhas em tabelas existentes.
No shell do HBase, vamos primeiro criar uma tabela create 'tblEmployee2', 'personal'
Agora no PySpark vamos inserir 2 linhas usando “hbase.columns.mapping”
from pyspark.sql import Row from pyspark.sql import SparkSession spark = SparkSession\ .builder\ .appName("SampleApplication")\ .getOrCreate() employee = [(10, 'jonD', 'Jon Daniels', 170.7), (6, 'billR', 'Bill Robert', 200.1)] employeeRDD = spark.sparkContext.parallelize(employee) employeeMap = employeeRDD.map(lambda x: Row(key=int(x[0]), empId=x[1], empName=x[2], empWeight=float(x[3]))) employeeDF = spark.createDataFrame(employeeMap) employeeDF.write.format("org.apache.hadoop.hbase.spark") \ .option("hbase.columns.mapping", "key INTEGER :key, empId STRING personal:empId, empName STRING personal:empName, empWeight FLOAT personal:empWeight") \ .option("hbase.table", "tblEmployee2") \ .option("hbase.spark.use.hbasecontext", False) \ .save()
Novamente, apenas verifique se uma nova tabela chamada “tblEmployee2” possui essas novas linhas.
scan 'tblEmployee2', {'LIMIT' => 2}
Isso completa nossos exemplos de como inserir linhas por meio do PySpark em tabelas do HBase. Na próxima parte, discutirei as operações de obtenção e verificação, PySpark SQL e algumas soluções de problemas. Até lá, você deve obter um cluster CDP e trabalhar com esses exemplos.