No Spark, as funções em
RDD
s (como map
aqui) são serializados e enviados aos executores para processamento. Isso implica que todos os elementos contidos nessas operações devem ser serializáveis. A conexão Redis aqui não é serializável, pois abre conexões TCP para o banco de dados de destino que está vinculada à máquina em que foi criada.
A solução é criar essas conexões nos executores, no contexto de execução local. Existem poucas maneiras de fazer isso. Dois que vêm à mente são:
rdd.mapPartitions
:permite processar uma partição inteira de uma só vez e, portanto, amortizar o custo de criação de conexões)- Gerenciadores de conexões singleton:crie a conexão uma vez por executor
mapPartitions
é mais fácil, pois tudo o que requer é uma pequena alteração na estrutura do programa:val perhit = perhitFile.mapPartitions{partition =>
val r = new RedisClient("192.168.1.101", 6379) // create the connection in the context of the mapPartition operation
val res = partition.map{ x =>
...
val refStr = r.hmget(...) // use r to process the local data
}
r.close // take care of resources
res
}
Um gerenciador de conexões singleton pode ser modelado com um objeto que contém uma referência lenta a uma conexão (observação:uma referência mutável também funcionará).
object RedisConnection extends Serializable {
lazy val conn: RedisClient = new RedisClient("192.168.1.101", 6379)
}
Este objeto pode ser usado para instanciar 1 conexão por JVM de trabalho e é usado como um
Serializable
objeto em um encerramento de operação. val perhit = perhitFile.map{x =>
val param = f(x)
val refStr = RedisConnection.conn.hmget(...) // use RedisConnection to get a connection to the local data
}
}
A vantagem de usar o objeto singleton é menos sobrecarga, pois as conexões são criadas apenas uma vez pela JVM (em oposição a 1 por partição RDD)
Há também algumas desvantagens:
- a limpeza de conexões é complicada (gancho de desligamento/temporizadores)
- é preciso garantir a segurança de thread de recursos compartilhados
(*) código fornecido para fins de ilustração. Não compilado ou testado.