GeoSpark-Fehler „Nicht definierte Funktion“ bei DBConnect

Problem

Sie versuchen, die GeoSpark-Funktion st_geofromwkt mit st_geofromwkt zu verwenden, und erhalten eine Apache Spark Fehlermeldung.

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'st_geomfromwkt'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

Dieser Beispielcode schlägt mit dem Fehler fehl, wenn er mit DBConnect verwendet wird.

val sc = spark.sparkContext
sc.setLogLevel("DEBUG")

val sqlContext = spark.sqlContext
spark.sparkContext.addJar("~/jars/geospark-sql_2.3-1.2.0.jar")
spark.sparkContext.addJar("~/jars/geospark-1.2.0.jar")

GeoSparkSQLRegistrator.registerAll(sqlContext)
println(spark.sessionState.functionRegistry.listFunction)

spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

Ursache

DBConnect unterstützt keine automatische Synchronisierung clientseitiger UDFs mit dem Server.

Lösung

Sie können eine benutzerdefinierte Hilfsprogramm-JAR-Datei mit Code verwenden, der die benutzerdefinierte Funktion im Cluster mithilfe der SparkSessionExtensions-Klasse registriert.

  1. Erstellen Sie eine Hilfsprogramm-JAR-Datei, die GeoSpark-Funktionen mithilfe von SparkSessionExtensions registriert. Diese Definition der Hilfsprogrammklasse kann in eine Hilfsprogramm-JAR-Datei integriert werden.

    package com.databricks.spark.utils
    
    import org.apache.spark.sql.SparkSessionExtensions
    import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
    
    class GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {
      def apply(e: SparkSessionExtensions): Unit = {
        e.injectCheckRule(spark => {
          println("INJECTING UDF")
          GeoSparkSQLRegistrator.registerAll(spark)
          _ => Unit
        })
      }
    }
    
  2. Kopieren Sie die GeoSpark-JAR-Dateien und Ihre Hilfsprogramm-JAR-Datei in DBFS unter dbfs:/databricks/geospark-extension-jars/ .

  3. Erstellen Sie ein Initialisskript ( set_geospark_extension_jar.sh ), das die JAR-Dateien vom DBFS-Speicherort in den Spark-Klassenpfad kopiert und spark.sql.extensions auf die Hilfsprogrammklasse festlegt.

    dbutils.fs.put(
        "dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh",
        """#!/bin/sh
          |sleep 10s
          |# Copy the extension and GeoSpark dependency jars to /databricks/jars.
          |cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar,geospark_1_2_0.jar} /databricks/jars/
          |# Set the extension.
          |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
          |[driver] {
          |    "spark.sql.extensions" = "com.databricks.spark.utils.GeoSparkUdfExtension"
          |}
          |EOF
          |""".stripMargin,
        overwrite = true
    )
    
  4. Installieren Sie das Init-Skript als clusterbezogenes Init-Skript. Sie benötigen den vollständigen Pfad zum Speicherort des Skripts ( dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh ).

  5. Starten Sie Ihren Cluster neu.

  6. Sie können jetzt GeoSpark-Code mit DBConnect verwenden.