HBase
 sql >> Base de Dados >  >> NoSQL >> HBase

Serialização de mensagens robusta no Apache Kafka usando o Apache Avro, parte 1


No Apache Kafka, aplicativos Java chamados produtores gravam mensagens estruturadas em um cluster Kafka (composto de brokers). Da mesma forma, aplicativos Java chamados consumidores leem essas mensagens do mesmo cluster. Em algumas organizações, existem diferentes grupos encarregados de redigir e gerenciar os produtores e consumidores. Nesses casos, um dos principais pontos problemáticos pode estar na coordenação do formato de mensagem acordado entre produtores e consumidores.

Este exemplo demonstra como usar o Apache Avro para serializar registros que são produzidos para o Apache Kafka enquanto permite a evolução de esquemas e atualização não síncrona de aplicativos produtores e consumidores.

Serialização e desserialização

Um registro Kafka (anteriormente chamado de mensagem) consiste em uma chave, um valor e cabeçalhos. Kafka não tem conhecimento da estrutura dos dados na chave e valor dos registros. Ele os trata como matrizes de bytes. Mas os sistemas que leem registros do Kafka se preocupam com os dados desses registros. Então você precisa produzir dados em um formato legível. O formato de dados que você usa deve
  • Seja compacto
  • Seja rápido para codificar e decodificar
  • Permitir evolução
  • Permitir que sistemas upstream (aqueles que gravam em um cluster Kafka) e sistemas downstream (aqueles que lêem do mesmo cluster Kafka) atualizem para esquemas mais recentes em momentos diferentes

JSON, por exemplo, é autoexplicativo, mas não é um formato de dados compacto e é lento para analisar. Avro é uma estrutura de serialização rápida que cria uma saída relativamente compacta. Mas para ler os registros Avro, você precisa do esquema com o qual os dados foram serializados.

Uma opção é armazenar e transferir o esquema com o próprio registro. Isso é bom em um arquivo em que você armazena o esquema uma vez e o usa para um grande número de registros. Armazenar o esquema em cada registro Kafka, no entanto, adiciona uma sobrecarga significativa em termos de espaço de armazenamento e utilização da rede. Uma outra opção é ter um conjunto acordado de mapeamentos de esquema de identificador e fazer referência aos esquemas por seus identificadores no registro.

Do objeto ao registro Kafka e vice-versa

Os aplicativos do produtor não precisam converter dados diretamente em matrizes de bytes. KafkaProducer é uma classe genérica que precisa que seu usuário especifique tipos de chave e valor. Em seguida, os produtores aceitam instâncias de ProducerRecord que têm os mesmos parâmetros de tipo. A conversão do objeto para a matriz de bytes é feita por um serializador. Kafka fornece alguns serializadores primitivos:por exemplo, IntegerSerializer , ByteArraySerializer , StringSerializer . No lado do consumidor, desserializadores semelhantes convertem matrizes de bytes em um objeto com o qual o aplicativo pode lidar.

Portanto, faz sentido conectar-se ao nível Serializer e Deserializer e permitir que os desenvolvedores de aplicativos produtores e consumidores usem a interface conveniente fornecida pelo Kafka. Embora as versões mais recentes do Kafka permitam ExtendedSerializers e ExtendedDeserializers para acessar os cabeçalhos, decidimos incluir o identificador de esquema na chave e valor dos registros Kafka em vez de adicionar cabeçalhos de registro.

Avro Essentials

Avro é uma estrutura de serialização de dados (e chamada de procedimento remoto). Ele usa um documento JSON chamado esquema para descrever estruturas de dados. A maior parte do uso do Avro é por meio de GenericRecord ou subclasses de SpecificRecord. Classes Java geradas a partir de esquemas Avro são subclasses deste último, enquanto o primeiro pode ser usado sem conhecimento prévio da estrutura de dados com a qual se trabalha.

Quando dois esquemas atendem a um conjunto de regras de compatibilidade, os dados gravados com um esquema (chamado de esquema de gravação) podem ser lidos como se tivessem sido gravados com o outro (chamado de esquema de leitor). Os esquemas têm uma forma canônica que possui todos os detalhes irrelevantes para a serialização, como comentários, retirados para auxiliar na verificação de equivalência.

VersionedSchema e SchemaProvider

Como mencionado anteriormente, precisamos de um mapeamento um-para-um entre esquemas e seus identificadores. Às vezes é mais fácil se referir a esquemas por nomes. Quando um esquema compatível é criado, ele pode ser considerado uma próxima versão do esquema. Assim, podemos nos referir a esquemas com um nome, par de versões. Vamos chamar o esquema, seu identificador, nome e versão juntos de VersionedSchema . Esse objeto pode conter metadados adicionais que o aplicativo requer.
public class VersionedSchema {
  private final int id;
  private final String name;
  private final int version;
  private final Schema schema;

  public VersionedSchema(int id, String name, int version, Schema schema) {
    this.id = id;
    this.name = name;
    this.version = version;
    this.schema = schema;
  }

  public String getName() {
    return name;
  }

  public int getVersion() {
    return version;
  }

  public Schema getSchema() {
    return schema;
  }
    
  public int getId() {
    return id;
  }
}

SchemaProvider objetos podem procurar as instâncias de VersionedSchema .
public interface SchemaProvider extends AutoCloseable {
  public VersionedSchema get(int id);
  public VersionedSchema get(String schemaName, int schemaVersion);
  public VersionedSchema getMetadata(Schema schema);
}

A forma como essa interface é implementada é abordada em “Implementing a Schema Store” em uma postagem futura do blog.

Serializando dados genéricos


Ao serializar um registro, primeiro precisamos descobrir qual Schema usar. Cada registro tem um getSchema método. Mas descobrir o identificador do esquema pode ser demorado. Geralmente é mais eficiente definir o esquema no momento da inicialização. Isso pode ser feito diretamente por identificador ou por nome e versão. Além disso, ao produzir vários tópicos, podemos querer definir esquemas diferentes para tópicos diferentes e descobrir o esquema a partir do nome do tópico fornecido como parâmetro para o método serialize(T, String) . Essa lógica é omitida em nossos exemplos por questões de brevidade e simplicidade.

private VersionedSchema getSchema(T data, String topic) {
  return schemaProvider.getMetadata( data.getSchema());
}

Com o esquema em mãos, precisamos armazená-lo em nossa mensagem. Serializar o ID como parte da mensagem nos dá uma solução compacta, pois toda a mágica acontece no Serializador/Desserializador. Ele também permite uma integração muito fácil com outros frameworks e bibliotecas que já suportam Kafka e permite que o usuário use seu próprio serializador (como o Spark).

Usando essa abordagem, primeiro escrevemos o identificador do esquema nos primeiros quatro bytes.
private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {
    try (DataOutputStream os = new DataOutputStream(stream)) {
    os.writeInt(id);
  }
}

Em seguida, podemos criar um DatumWriter e serializar o objeto.
private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {
  BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(stream, null);
  DatumWriter<T> datumWriter = new GenericDatumWriter<>(schema);
  datumWriter.write(data, encoder);
  encoder.flush();
}

Juntando tudo isso, implementamos um serializador de dados genérico.

public class KafkaAvroSerializer<T extends GenericContainer> implements Serializer<T> {

  private SchemaProvider schemaProvider;

  @Override
  public void configure(Map<String, ?> configs, boolean isKey) {
    schemaProvider = SchemaUtils.getSchemaProvider(configs);
  }

  @Override
  public byte[] serialize(String topic, T data) {
    try (ByteArrayOutputStream stream = new ByteArrayOutputStream()) {
      VersionedSchema schema = getSchema(data, topic);
   
      writeSchemaId(stream, schema.getId());
      writeSerializedAvro(stream, data, schema.getSchema());
      return stream.toByteArray();
    } catch (IOException e) {
      throw new RuntimeException("Could not serialize data", e);
    }
  }

  private void writeSchemaId(ByteArrayOutputStream stream, int id) throws IOException {...}

  private void writeSerializedAvro(ByteArrayOutputStream stream, T data, Schema schema) throws IOException {...}

  private VersionedSchema getSchema(T data, String topic) {...}

  @Override
  public void close() {
    try {
      schemaProvider.close();
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}

Desserializando dados genéricos

A desserialização pode funcionar com um único esquema (os dados do esquema foram gravados), mas você pode especificar um esquema de leitor diferente. O esquema do leitor deve ser compatível com o esquema com o qual os dados foram serializados, mas não precisa ser equivalente. Por esse motivo, introduzimos nomes de esquema. Agora podemos especificar que queremos ler dados com uma versão específica de um esquema. No momento da inicialização, lemos as versões de esquema desejadas por nome de esquema e armazenamos metadados em readerSchemasByName para acesso rápido. Agora podemos ler cada registro escrito com uma versão compatível do esquema como se tivesse sido escrito com a versão especificada.
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  this.schemaProvider = SchemaUtils.getSchemaProvider(configs);
  this.readerSchemasByName = SchemaUtils.getVersionedSchemas(configs, schemaProvider);
}

Quando um registro precisa ser desserializado, primeiro lemos o identificador do esquema do gravador. Isso permite pesquisar o esquema do leitor por nome. Com ambos os esquemas disponíveis, podemos criar um GeneralDatumReader e leia o registro.

@Override
public GenericData.Record deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {

    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);

    VersionedSchema readerSchema =
        readerSchemasByName.get(writerSchema.getName());
    GenericData.Record avroRecord = readAvroRecord(stream,
        writerSchema.getSchema(), readerSchema.getSchema());
    return avroRecord;
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private int readSchemaId(InputStream stream ) throws IOException {
  try(DataInputStream is = new DataInputStream(stream)) {
    return is.readInt();
  }
}

private GenericData.Record readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<Object> datumReader = new GenericDatumReader<>(writerSchema,
      readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  GenericData.Record record = new GenericData.Record(readerSchema);
  datumReader.read(record, decoder);
  return record;
}

Lidando com Registros Específicos

Na maioria das vezes, há uma classe que queremos usar para nossos registros. Essa classe geralmente é gerada a partir de um esquema Avro. O Apache Avro fornece ferramentas para gerar código Java a partir de esquemas. Uma dessas ferramentas é o plugin Avro Maven. As classes geradas têm o esquema a partir do qual foram geradas disponível em tempo de execução. Isso torna a serialização e a desserialização mais simples e eficazes. Para serialização, podemos usar a classe para descobrir o identificador de esquema a ser usado.

@Override
public void configure(Map<String, ?> configs, boolean isKey) {
  String className = configs.get(isKey ? KEY_RECORD_CLASSNAME : VALUE_RECORD_CLASSNAME).toString();
  try (SchemaProvider schemaProvider = SchemaUtils.getSchemaProvider(configs)) {
    Class<?> recordClass = Class.forName(className);
    Schema writerSchema = new
        SpecificData(recordClass.getClassLoader()).getSchema(recordClass);
    this.writerSchemaId = schemaProvider.getMetadata(writerSchema).getId();
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

Assim, não precisamos da lógica para determinar o esquema do tópico e dos dados. Usamos o esquema disponível na classe de registro para gravar registros.

Da mesma forma, para desserialização, o esquema do leitor pode ser encontrado na própria classe. A lógica de desserialização se torna mais simples, porque o esquema do leitor é fixo no momento da configuração e não precisa ser consultado pelo nome do esquema.
@Override
public T deserialize(String topic, byte[] data) {
  try (ByteArrayInputStream stream = new ByteArrayInputStream(data)) {
    int schemaId = readSchemaId(stream);
    VersionedSchema writerSchema = schemaProvider.get(schemaId);
    return readAvroRecord(stream, writerSchema.getSchema(), readerSchema);
  } catch (IOException e) {
    throw new RuntimeException(e);
  }
}

private T readAvroRecord(InputStream stream, Schema writerSchema, Schema readerSchema) throws IOException {
  DatumReader<T> datumReader = new SpecificDatumReader<>(writerSchema, readerSchema);
  BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(stream, null);
  return datumReader.read(null, decoder);
}

Leitura Adicional


Para obter mais informações sobre compatibilidade de esquema, consulte a especificação Avro para resolução de esquema.

Para obter mais informações sobre formulários canônicos, consulte a especificação Avro para Parsing Canonical Form for Schemas.

Próxima vez…


A Parte 2 mostrará uma implementação de um sistema para armazenar as definições do esquema Avro.