Verarbeiten von fehlerhaften Datensätzen und Dateien

Azure Databricks bietet eine Reihe von Optionen für den Umgang mit Dateien, die fehlerhafte Datensätze enthalten. Beispiele für fehlerhafte Daten:

  • Unvollständige oder beschädigte Datensätze: Treten hauptsächlich in textbasierten Dateiformaten wie JSON und CSV auf. Beispiele: ein JSON-Datensatz ohne schließende Klammer oder ein CSV-Datensatz, der nicht über dieselbe Spaltenanzahl wie die Kopfzeile oder der erste Datensatz der CSV-Datei verfügt.
  • Nicht übereinstimmende Datentypen: Der Wert für eine Spalte weist nicht den angegebenen oder abgeleiteten Datentyp auf.
  • Ungültige Feldnamen: Dies kann in allen Dateiformaten auftreten, wenn der in der Datei oder dem Datensatz angegebene Spaltenname eine andere Groß-/Kleinschreibung aufweist als das angegebene oder abgeleitete Schema.
  • Beschädigte Dateien: Eine Datei kann nicht gelesen werden, möglicherweise aufgrund von beschädigten Metadaten oder Daten in Binärdateitypen wie Avro, Parquet oder ORC. In seltenen Fällen kann dies durch langanhaltende vorübergehende Fehler im zugrunde liegenden Speichersystem verursacht werden.
  • Fehlende Dateien: Eine Datei wurde während der Abfrageanalyse erkannt wurde und ist zur Verarbeitungszeit nicht mehr vorhanden.

Verwenden Sie badRecordsPath

Wenn Sie badRecordsPath festlegen, zeichnet der angegebene Pfad Ausnahmen für fehlerhafte Datensätze oder Dateien auf, die beim Laden von Daten aufgetreten sind.

Neben beschädigten Datensätzen und Dateien werden Fehler, die auf gelöschte Dateien, Netzwerkverbindungsausnahmen, E/A-Ausnahmen usw. hinweisen, ignoriert und im badRecordsPath aufgezeichnet.

Hinweis

Bei der Verwendung der Option badRecordsPath in einer dateibasierten Datenquelle gibt es einige wichtige Einschränkungen:

  • Sie ist nicht transaktional und kann zu inkonsistenten Ergebnissen führen.
  • Vorübergehende Fehler werden als Ausfälle behandelt.

Eingabedatei kann nicht gefunden werden

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .format("parquet").load("/input/parquetFile")

// Delete the input parquet file '/input/parquetFile'
dbutils.fs.rm("/input/parquetFile")

df.show()

Da die Eingabedatei im obigen Beispiel mit df.show() nicht gefunden werden kann, erstellt Spark eine Ausnahmedatei im JSON-Format, um den Fehler aufzuzeichnen. /tmp/badRecordsPath/20170724T101153/bad_files/xyz ist beispielsweise der Pfad der Ausnahmedatei. Diese Datei befindet sich im angegebenen badRecordsPath-Verzeichnis /tmp/badRecordsPath. 20170724T101153 ist die Erstellungszeit dieses DataFrameReader-Objekts. bad_files ist der Ausnahmetyp. xyz ist eine Datei, die einen JSON-Datensatz enthält, der den Pfad der fehlerhaften Datei und die Ausnahme-/Ursachenmeldung enthält.

Eingabedatei enthält fehlerhaften Datensatz

// Creates a json file containing both parsable and corrupted records
Seq("""{"a": 1, "b": 2}""", """{bad-record""").toDF().write.format("text").save("/tmp/input/jsonFile")

val df = spark.read
  .option("badRecordsPath", "/tmp/badRecordsPath")
  .schema("a int, b int")
  .format("json")
  .load("/tmp/input/jsonFile")

df.show()

In diesem Beispiel enthält der Datenrahmen nur den ersten analysierbaren Datensatz ({"a": 1, "b": 2}). Der zweite fehlerhafte Datensatz ({bad-record) wird in der Ausnahmedatei aufgezeichnet, bei der es sich um eine unter /tmp/badRecordsPath/20170724T114715/bad_records/xyz gespeicherte JSON-Datei handelt. Die Ausnahmedatei enthält den fehlerhaften Datensatz, den Pfad der Datei, die den Datensatz enthält, und die Ausnahme-/Ursachenmeldung. Nachdem Sie die Ausnahmedateien gefunden haben, können Sie diese Dateien mit einem JSON-Reader verarbeiten.