PostgreSQL
 sql >> Base de Dados >  >> RDS >> PostgreSQL

Chaves primárias com Apache Spark


Escala :

Se tudo o que você precisa é de números exclusivos, você pode usar zipWithUniqueId e recriar DataFrame. Primeiro algumas importações e dados fictícios:
import sqlContext.implicits._
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, LongType}

val df = sc.parallelize(Seq(
    ("a", -1.0), ("b", -2.0), ("c", -3.0))).toDF("foo", "bar")

Extraia o esquema para uso posterior:
val schema = df.schema

Adicionar campo de identificação:
val rows = df.rdd.zipWithUniqueId.map{
   case (r: Row, id: Long) => Row.fromSeq(id +: r.toSeq)}

Criar DataFrame:
val dfWithPK = sqlContext.createDataFrame(
  rows, StructType(StructField("id", LongType, false) +: schema.fields))

A mesma coisa em Python :
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, LongType

row = Row("foo", "bar")
row_with_index = Row(*["id"] + df.columns)

df = sc.parallelize([row("a", -1.0), row("b", -2.0), row("c", -3.0)]).toDF()

def make_row(columns):
    def _make_row(row, uid):
        row_dict = row.asDict()
        return row_with_index(*[uid] + [row_dict.get(c) for c in columns])
    return _make_row

f = make_row(df.columns)

df_with_pk = (df.rdd
    .zipWithUniqueId()
    .map(lambda x: f(*x))
    .toDF(StructType([StructField("id", LongType(), False)] + df.schema.fields)))

Se você preferir um número consecutivo, pode substituir zipWithUniqueId com zipWithIndex mas é um pouco mais caro.

Diretamente com DataFrame API :

(universal Scala, Python, Java, R com praticamente a mesma sintaxe)

Anteriormente, eu perdi monotonicallyIncreasingId função que deve funcionar bem, desde que você não exija números consecutivos:
import org.apache.spark.sql.functions.monotonicallyIncreasingId

df.withColumn("id", monotonicallyIncreasingId).show()
// +---+----+-----------+
// |foo| bar|         id|
// +---+----+-----------+
// |  a|-1.0|17179869184|
// |  b|-2.0|42949672960|
// |  c|-3.0|60129542144|
// +---+----+-----------+

Embora útil monotonicallyIncreasingId é não determinístico. Não apenas os ids podem ser diferentes de execução para execução, mas sem truques adicionais não podem ser usados ​​para identificar linhas quando as operações subsequentes contêm filtros.

Observação :

Também é possível usar rowNumber função da janela:
from pyspark.sql.window import Window
from pyspark.sql.functions import rowNumber

w = Window().orderBy()
df.withColumn("id", rowNumber().over(w)).show()

Infelizmente:

WARN Window:Nenhuma partição definida para operação de janela! Mover todos os dados para uma única partição pode causar séria degradação do desempenho.

Portanto, a menos que você tenha uma maneira natural de particionar seus dados e garantir que a exclusividade não seja particularmente útil neste momento.