Escala :
Se tudo o que você precisa é de números exclusivos, você pode usar
zipWithUniqueId
e recriar DataFrame. Primeiro algumas importações e dados fictícios:import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}
val df = sc.parallelize(Seq(
("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")
Extraia o esquema para uso posterior:
val schema = df.schema
Adicionar campo de identificação:
val rows = df.rdd.zipWithUniqueId.map{
case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}
Criar DataFrame:
val dfWithPK = sqlContext.createDataFrame(
rows, StructType(StructField("id", LongType, false) +: schema.fields))
A mesma coisa em Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType
row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)
df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()
def make_row(columns):
def _make_row(row, uid):
row_dict = row.asDict()
return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
return _make_row
f = make_row(df.columns)
df_with_pk = (df.rdd
.zipWithUniqueId()
.map(lambda x: f(*x))
.toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))
Se você preferir um número consecutivo, pode substituir
zipWithUniqueId
com zipWithIndex
mas é um pouco mais caro. Diretamente com
DataFrame
API :(universal Scala, Python, Java, R com praticamente a mesma sintaxe)
Anteriormente, eu perdi
monotonicallyIncreasingId
função que deve funcionar bem, desde que você não exija números consecutivos:import org.apache.spark.sql.functions.monotonicallyIncreasingId
df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar| id|
// +---+----+-----------+
// | a|-1.0|17179869184|
// | b|-2.0|42949672960|
// | c|-3.0|60129542144|
// +---+----+-----------+
Embora útil
monotonicallyIncreasingId
é não determinístico. Não apenas os ids podem ser diferentes de execução para execução, mas sem truques adicionais não podem ser usados para identificar linhas quando as operações subsequentes contêm filtros. Observação :
Também é possível usar
rowNumber
função da janela:from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber
w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()
Infelizmente:
WARN Window:Nenhuma partição definida para operação de janela! Mover todos os dados para uma única partição pode causar séria degradação do desempenho.
Portanto, a menos que você tenha uma maneira natural de particionar seus dados e garantir que a exclusividade não seja particularmente útil neste momento.