A GeoSpark nem definiált függvényének hibája a DBConnect esetében

Probléma

A GeoSpark függvényt a st_geofromwktst_geofromwkt próbálja használni, és megjelenik egy Apache Spark hibaüzenet.

Error: org.apache.spark.sql.AnalysisException: Undefined function: 'st_geomfromwkt'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.;

Ez a példakód a DBConnect esetében hibát jelez.

val sc = spark.sparkContext
sc.setLogLevel("DEBUG")

val sqlContext = spark.sqlContext
spark.sparkContext.addJar("~/jars/geospark-sql_2.3-1.2.0.jar")
spark.sparkContext.addJar("~/jars/geospark-1.2.0.jar")

GeoSparkSQLRegistrator.registerAll(sqlContext)
println(spark.sessionState.functionRegistry.listFunction)

spark.sql("select ST_GeomFromWKT(area) AS geometry from polygon").show()

Ok

A DBConnect nem támogatja az ügyféloldaliudF-ek automatikus szinkronizálását a kiszolgálóra.

Megoldás

A SparkSessionExtensions osztály használatával a fürtön az UDF-et regisztráló kóddal használhat egyéni segédprogram jar-t.

 1. Hozzon létre egy segédprogram jar-t, amely regisztrálja a GeoSpark-függvényeket a SparkSessionExtensions használatával. Ez a segédprogramosztály-definíció beépíthető egy segédprogram jar-fájlba.

  package com.databricks.spark.utils
  
  import org.apache.spark.sql.SparkSessionExtensions
  import org.datasyslab.geosparksql.utils.GeoSparkSQLRegistrator
  
  class GeoSparkUdfExtension extends (SparkSessionExtensions => Unit) {
   def apply(e: SparkSessionExtensions): Unit = {
    e.injectCheckRule(spark => {
     println("INJECTING UDF")
     GeoSparkSQLRegistrator.registerAll(spark)
     _ => Unit
    })
   }
  }
  
 2. Másolja a GeoSpark jar-fájlokat és a segédprogram jar-fájlját a DBFS-be a következő helyről: dbfs:/databricks/geospark-extension-jars/ .

 3. Hozzon létre egy init szkriptet ( ), amely átmásolja a JAR-fájlokat a DBFS-helyről a Spark-osztály elérési útjára, és beállítja a set_geospark_extension_jar.shspark.sql.extensions segédprogramosztályt.

  dbutils.fs.put(
    "dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh",
    """#!/bin/sh
     |sleep 10s
     |# Copy the extension and GeoSpark dependency jars to /databricks/jars.
     |cp -v /dbfs/databricks/geospark-extension-jars/{spark_geospark_extension_2_11_0_1.jar,geospark_sql_2_3_1_2_0.jar,geospark_1_2_0.jar} /databricks/jars/
     |# Set the extension.
     |cat << 'EOF' > /databricks/driver/conf/00-custom-spark.conf
     |[driver] {
     |  "spark.sql.extensions" = "com.databricks.spark.utils.GeoSparkUdfExtension"
     |}
     |EOF
     |""".stripMargin,
    overwrite = true
  )
  
 4. Telepítse az init szkriptet fürthatókörű init szkriptként. Szüksége lesz a szkript helyének teljes elérési útjára ( dbfs:/databricks/<init-script-folder>/set_geospark_extension_jar.sh ).

 5. Indítsa újra a fürtöt.

 6. Most már használhatja a GeoSpark-kódot a DBConnect használatával.