Eu resolvi meu problema. O motivo das contagens inconsistentes foi o MongoDefaultPartitioner que envolve MongoSamplePartitioner que usa amostragem aleatória. Para ser honesto, este é um padrão bastante estranho para mim. Eu pessoalmente preferiria ter um particionador lento, mas consistente. Os detalhes das opções do particionador podem ser encontrados nas opções de configuração oficiais documentação.
código:
val df = spark.read
.format("com.mongodb.spark.sql.DefaultSource")
.option("uri", "mongodb://127.0.0.1/enron_mail.messages")
.option("partitioner", "spark.mongodb.input.partitionerOptions.MongoPaginateBySizePartitioner ")
.load()