Skriv över data selektivt med Delta Lake

Azure Databricks använder Delta Lake-funktioner för att stödja två olika alternativ för selektiva överskrivningar:

  • Alternativet replaceWhere ersätter atomiskt alla poster som matchar ett angivet predikat.
  • Du kan ersätta kataloger med data baserat på hur tabeller partitioneras med dynamiska partitionsöverskrivningar.

För de flesta åtgärder rekommenderar Databricks att du använder replaceWhere för att ange vilka data som ska skrivas över.

Viktigt!

Om data har skrivits över av misstag kan du använda återställning för att ångra ändringen.

Godtycklig selektiv överskrivning med replaceWhere

Du kan selektivt skriva över endast de data som matchar ett godtyckligt uttryck.

Kommentar

SQL kräver Databricks Runtime 12.2 LTS eller senare.

Följande kommando ersätter atomiskt händelser i januari i måltabellen, som partitioneras av start_date, med data i replace_data:

Python

(replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")
)

Scala

replace_data.write
  .mode("overwrite")
  .option("replaceWhere", "start_date >= '2017-01-01' AND end_date <= '2017-01-31'")
  .save("/tmp/delta/events")

SQL

INSERT INTO TABLE events REPLACE WHERE start_date >= '2017-01-01' AND end_date <= '2017-01-31' SELECT * FROM replace_data

Den här exempelkoden skriver ut data i replace_data, verifierar att alla rader matchar predikatet och utför en atomisk ersättning med hjälp av overwrite semantik. Om några värden i åtgärden ligger utanför villkoret misslyckas den här åtgärden med ett fel som standard.

Du kan ändra det här beteendet till overwrite värden inom predikatintervallet och insert poster som ligger utanför det angivna intervallet. Om du vill göra det inaktiverar du begränsningskontrollen genom att ange spark.databricks.delta.replaceWhere.constraintCheck.enabled false med någon av följande inställningar:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.constraintCheck.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.constraintCheck.enabled=false

Äldre beteende

Äldre standardbeteende hade replaceWhere skriv över data som endast matchar ett predikat över partitionskolumner. Med den här äldre modellen skulle följande kommando atomiskt ersätta månaden januari i måltabellen, som partitioneras av date, med data i df:

Python

(df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("replaceWhere", "birthDate >= '2017-01-01' AND birthDate <= '2017-01-31'")
  .save("/tmp/delta/people10m")

Om du vill återgå till det gamla beteendet kan du inaktivera spark.databricks.delta.replaceWhere.dataColumns.enabled flaggan:

Python

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", False)

Scala

spark.conf.set("spark.databricks.delta.replaceWhere.dataColumns.enabled", false)

SQL

SET spark.databricks.delta.replaceWhere.dataColumns.enabled=false

Dynamisk partitionsöverskrivning

Viktigt!

Den här funktionen finns som allmänt tillgänglig förhandsversion.

Databricks Runtime 11.3 LTS och senare stöder dynamisk partitionsöverskrivningsläge för partitionerade tabeller. För tabeller med flera partitioner stöder Databricks Runtime 11.3 LTS och nedan endast dynamiska partitionsöverskrivningar om alla partitionskolumner är av samma datatyp.

I läget för dynamisk partitionsöverskrivning skriver åtgärderna över alla befintliga data i varje logisk partition som skrivningen checkar in nya data för. Alla befintliga logiska partitioner som skrivningen inte innehåller data för förblir oförändrade. Det här läget gäller endast när data skrivs i överskrivningsläge: antingen INSERT OVERWRITE i SQL eller en DataFrame-skrivning med df.write.mode("overwrite").

Konfigurera läget för dynamisk partitionsöverskrivning genom att ange Spark-sessionskonfigurationen spark.sql.sources.partitionOverwriteMode till dynamic. Du kan också aktivera detta genom att ange DataFrameWriter alternativet partitionOverwriteMode till dynamic. Om det finns åsidosätter det frågespecifika alternativet det läge som definierats i sessionskonfigurationen. Standardvärdet för partitionOverwriteMode är static.

Viktigt!

Kontrollera att de data som skrivs med dynamisk partitionsöverskrivning endast berör de förväntade partitionerna. En enskild rad i den felaktiga partitionen kan leda till att en hel partition oavsiktligt skrivs över.

I följande exempel visas hur du använder dynamiska partitionsöverskrivningar:

SQL

SET spark.sql.sources.partitionOverwriteMode=dynamic;
INSERT OVERWRITE TABLE default.people10m SELECT * FROM morePeople;

Python

(df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")
)

Scala

df.write
  .mode("overwrite")
  .option("partitionOverwriteMode", "dynamic")
  .saveAsTable("default.people10m")

Kommentar

  • Dynamisk partitionsöverskrivning står i konflikt med alternativet replaceWhere för partitionerade tabeller.
    • Om dynamisk partitionsöverskrivning är aktiverat i Spark-sessionskonfigurationen och replaceWhere tillhandahålls som ett DataFrameWriter alternativ skriver Delta Lake över data enligt replaceWhere uttrycket (frågespecifika alternativ åsidosätter sessionskonfigurationer).
    • Du får ett felmeddelande om DataFrameWriter alternativen har både dynamisk partitionsöverskrivning och replaceWhere aktiverat.
  • Du kan inte ange overwriteSchema som true när du använder dynamisk partitionsöverskrivning.