Share via


Azure HDInsight'ta Hive Warehouse Bağlan veya tarafından desteklenen Apache Spark işlemleri

Bu makalede Hive Warehouse Bağlan or (HWC) tarafından desteklenen spark tabanlı işlemler gösterilmektedir. Aşağıda gösterilen tüm örnekler Apache Spark kabuğu aracılığıyla yürütülür.

Önkoşul

Hive Ambarı Bağlan veya kurulum adımlarını tamamlayın.

Başlarken

Spark-Shell oturumu başlatmak için aşağıdaki adımları uygulayın:

  1. Apache Spark kümenize bağlanmak için ssh komutunu kullanın. CLUSTERNAME değerini kümenizin adıyla değiştirerek aşağıdaki komutu düzenleyin ve komutunu girin:

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Ssh oturumunuzda aşağıdaki komutu yürüterek sürümü not alın hive-warehouse-connector-assembly :

    ls /usr/hdp/current/hive_warehouse_connector
    
  3. Aşağıdaki kodu yukarıda tanımlanan sürümle hive-warehouse-connector-assembly düzenleyin. Ardından spark kabuğunu başlatmak için komutunu yürütür:

    spark-shell --master yarn \
    --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \
    --conf spark.security.credentials.hiveserver2.enabled=false
    
  4. Spark-shell başlatıldıktan sonra, aşağıdaki komutlar kullanılarak bir Hive Ambarı Bağlan veya örneği başlatılabilir:

    import com.hortonworks.hwc.HiveWarehouseSession
    val hive = HiveWarehouseSession.session(spark).build()
    

Hive sorgularını kullanarak Spark DataFrame'ler oluşturma

HWC kitaplığını kullanan tüm sorguların sonuçları DataFrame olarak döndürülür. Aşağıdaki örneklerde temel bir hive sorgusunun nasıl oluşturulacağı gösterilmektedir.

hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()

Sorgunun sonuçları, MLIB ve SparkSQL gibi Spark kitaplıklarıyla kullanılabilen Spark DataFrame'lerdir.

Spark DataFrames'i Hive tablolarına yazma

Spark, Hive'ın yönetilen ACID tablolarına yazmayı yerel olarak desteklemez. Ancak, HWC kullanarak herhangi bir DataFrame'i hive tablosuna yazabilirsiniz. Bu işlevi iş yerinde aşağıdaki örnekte görebilirsiniz:

  1. adlı sampletable_colorado bir tablo oluşturun ve aşağıdaki komutu kullanarak sütunlarını belirtin:

    hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
    
  2. Sütunun state eşit olduğu tabloyu hivesampletable filtreleyinColorado. Bu hive sorgusu bir Spark DataFrame döndürür ve sonuç hive tablosuna sampletable_coloradowrite işlevi kullanılarak kaydedilir.

    hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
    
  3. Sonuçları aşağıdaki komutla görüntüleyin:

    hive.table("sampletable_colorado").show()
    

    hive warehouse connector show hive table.

Yapılandırılmış akış yazma işlemleri

Hive Warehouse Bağlan veya kullanarak, Hive tablolarına veri yazmak için Spark akışını kullanabilirsiniz.

Önemli

Yapılandırılmış akış yazma işlemleri ESP özellikli Spark 4.0 kümelerinde desteklenmez.

9999 numaralı localhost bağlantı noktası üzerinden bir Spark akışından Hive tablosuna veri almak için aşağıdaki adımları izleyin. Hive Ambarı Bağlan veya.

  1. Açık Spark kabuğunuzdan aşağıdaki komutla bir spark akışı başlatın:

    val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
    
  2. Aşağıdaki adımları uygulayarak oluşturduğunuz Spark akışı için veri oluşturun:

    1. Aynı Spark kümesinde ikinci bir SSH oturumu açın.
    2. Komut istemine yazın nc -lk 9999. Bu komut, komut satırından netcat belirtilen bağlantı noktasına veri göndermek için yardımcı programını kullanır.
  3. İlk SSH oturumuna dönün ve akış verilerini tutmak için yeni bir Hive tablosu oluşturun. spark-shell'de aşağıdaki komutu girin:

    hive.createTable("stream_table").column("value","string").create()
    
  4. Ardından aşağıdaki komutu kullanarak akış verilerini yeni oluşturulan tabloya yazın:

    lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
    

    Önemli

    Apache Spark'taki metastoreUri bilinen bir sorun nedeniyle ve database seçenekleri şu anda el ile ayarlanmalıdır. Bu sorun hakkında daha fazla bilgi için bkz . SPARK-25460.

  5. İkinci SSH oturumuna dönün ve aşağıdaki değerleri girin:

    foo
    HiveSpark
    bar
    
  6. İlk SSH oturumuna dönün ve kısa etkinliği not edin. Verileri görüntülemek için aşağıdaki komutu kullanın:

    hive.table("stream_table").show()
    

İkinci SSH oturumunda durdurmak netcat için Ctrl + C tuşlarını kullanın. İlk SSH oturumunda spark-shell'dan çıkmak için kullanın :q .

Sonraki adımlar