Redis
 sql >> Base de Dados >  >> NoSQL >> Redis

Redis no Spark:tarefa não serializável


No Spark, as funções em RDD s (como map aqui) são serializados e enviados aos executores para processamento. Isso implica que todos os elementos contidos nessas operações devem ser serializáveis.

A conexão Redis aqui não é serializável, pois abre conexões TCP para o banco de dados de destino que está vinculada à máquina em que foi criada.

A solução é criar essas conexões nos executores, no contexto de execução local. Existem poucas maneiras de fazer isso. Dois que vêm à mente são:
  • rdd.mapPartitions :permite processar uma partição inteira de uma só vez e, portanto, amortizar o custo de criação de conexões)
  • Gerenciadores de conexões singleton:crie a conexão uma vez por executor

mapPartitions é mais fácil, pois tudo o que requer é uma pequena alteração na estrutura do programa:
val perhit = perhitFile.mapPartitions{partition => 
    val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
    val res = partition.map{ x =>
        ...
        val refStr = r.hmget(...) // use r to process the local data
    }
    r.close // take care of resources
    res
}

Um gerenciador de conexões singleton pode ser modelado com um objeto que contém uma referência lenta a uma conexão (observação:uma referência mutável também funcionará).
object RedisConnection extends Serializable {
   lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}

Este objeto pode ser usado para instanciar 1 conexão por JVM de trabalho e é usado como um Serializable objeto em um encerramento de operação.
val perhit = perhitFile.map{x => 
    val param = f(x)
    val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
    }
}

A vantagem de usar o objeto singleton é menos sobrecarga, pois as conexões são criadas apenas uma vez pela JVM (em oposição a 1 por partição RDD)

Há também algumas desvantagens:
  • a limpeza de conexões é complicada (gancho de desligamento/temporizadores)
  • é preciso garantir a segurança de thread de recursos compartilhados

(*) código fornecido para fins de ilustração. Não compilado ou testado.