O problema está no seu código. Como você substitui uma tabela da qual está tentando ler, efetivamente oblitera todos os dados antes que o Spark possa realmente acessá-los.
Lembre-se que o Spark é preguiçoso. Quando você cria um
Dataset
O Spark busca os metadados necessários, mas não carrega os dados. Portanto, não há cache mágico que preservará o conteúdo original. Os dados serão carregados quando forem realmente necessários. Aqui está quando você executa write
ação e quando você começa a escrever não há mais dados a serem buscados. O que você precisa é algo assim:
- Criar um
Dataset
. -
Aplique as transformações necessárias e grave dados em uma tabela MySQL intermediária.
-
TRUNCATE
a entrada original eINSERT INTO ... SELECT
da tabela intermediária ouDROP
a tabela original eRENAME
mesa intermediária.
Uma abordagem alternativa, mas menos favorável, seria:
- Criar um
Dataset
. - Aplicar as transformações necessárias e gravar dados em uma tabela Spark persistente (
df.write.saveAsTable(...)
ou equivalente) TRUNCATE
a entrada original.- Leia os dados e salve (
spark.table(...).write.jdbc(...)
) - Solte a tabela Spark.
Não podemos enfatizar o suficiente que usar o
cache
do Spark / persist
não é o caminho a seguir. Mesmo com o conservador StorageLevel
(MEMORY_AND_DISK_2
/ MEMORY_AND_DISK_SER_2
) os dados em cache podem ser perdidos (falhas de nó), levando a erros de correção silenciosos.