Erreur de fonction non définie GeoSpark avec DBConnect

Problème

Vous essayez d’utiliser la fonction géospark st_geofromwkt avec DBConnect et vous recevez un message d’erreur Apache Spark.

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'st_geomfromwkt'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

Cet exemple de code échoue avec l’erreur lorsqu’il est utilisé avec DBConnect.

val sc = spark.sparkContext
sc.setLogLevel("DEBUG")

val sqlContext = spark.sqlContext
spark.sparkContext.addJar("~/jars/geospark-sql_2.3-1.2.0.jar")
spark.sparkContext.addJar("~/jars/geospark-1.2.0.jar")

GeoSparkSQLRegistrator.registerAll(sqlContext)
println(spark.sessionState.functionRegistry.listFunction)

spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

Cause

DBConnect ne prend pas en charge la synchronisation automatique des fonctions définies par l’utilisateur côté client sur le serveur.

Solution

Vous pouvez utiliser un fichier jar d’utilitaire personnalisé avec du code qui inscrit l’UDF sur le cluster à l’aide de la classe SparkSessionExtensions .

  1. Créez un fichier jar d’utilitaire qui enregistre les fonctions géospark à l’aide de SparkSessionExtensions . Cette définition de classe utilitaire peut être intégrée à un fichier jar d’utilitaire.

    package com.databricks.spark.utils
    
    import org.apache.spark.sql.SparkSessionExtensions
    import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
    
    class GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {
      def apply(e: SparkSessionExtensions): Unit = {
        e.injectCheckRule(spark => {
          println("INJECTING UDF")
          GeoSparkSQLRegistrator.registerAll(spark)
          _ => Unit
        })
      }
    }
    
  2. Copiez les fichiers jar géospark et votre fichier jar dans le fichier DBFS à l’adresse dbfs:/databricks/geospark-extension-jars/ .

  3. Créez un script init ( set_geospark_extension_jar.sh ) qui copie les fichiers jar de l’emplacement dBFS vers le chemin de classe Spark et affecte spark.sql.extensions à la classe utilitaire.

    dbutils.fs.put(
        "dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh",
        """#!/bin/sh
          |sleep 10s
          |# Copy the extension and GeoSpark dependency jars to /databricks/jars.
          |cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar,geospark_1_2_0.jar} /databricks/jars/
          |# Set the extension.
          |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
          |[driver] {
          |    "spark.sql.extensions" = "com.databricks.spark.utils.GeoSparkUdfExtension"
          |}
          |EOF
          |""".stripMargin,
        overwrite = true
    )
    
  4. Installez le script init en tant que script init de l’étendue du cluster. Vous aurez besoin du chemin d’accès complet à l’emplacement du script ( dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh ).

  5. Redémarrez votre cluster.

  6. Vous pouvez maintenant utiliser le code géospark avec DBConnect.