De acordo com o erro, você já tem uma string, (você já fez
df.selectExpr("CAST(value AS STRING)") ), então você deve tentar obter o evento Row como um String , e não um Array[Byte] Comece alterando
val valueStr = new String(record.getAs[Array[Byte]]("value"))
para
val valueStr = record.getAs[String]("value")
Entendo que você já tenha um cluster para executar o código Spark, mas sugiro que você ainda examine o Conector de pia Kafka Connect Mongo para que você não precise escrever e manter seu próprio gravador Mongo no código Spark.
Ou você pode gravar conjuntos de dados Spark diretamente no mongo