De acordo com o erro, você já tem uma string, (você já fez
df.selectExpr("CAST(value AS STRING)")
), então você deve tentar obter o evento Row como um String
, e não um Array[Byte]
Comece alterando
val valueStr = new String(record.getAs[Array[Byte]]("value"))
para
val valueStr = record.getAs[String]("value")
Entendo que você já tenha um cluster para executar o código Spark, mas sugiro que você ainda examine o Conector de pia Kafka Connect Mongo para que você não precise escrever e manter seu próprio gravador Mongo no código Spark.
Ou você pode gravar conjuntos de dados Spark diretamente no mongo