Azure HDInsight'ta Hive Warehouse Bağlan veya tarafından desteklenen Apache Spark işlemleri
Bu makalede Hive Warehouse Bağlan or (HWC) tarafından desteklenen spark tabanlı işlemler gösterilmektedir. Aşağıda gösterilen tüm örnekler Apache Spark kabuğu aracılığıyla yürütülür.
Önkoşul
Hive Ambarı Bağlan veya kurulum adımlarını tamamlayın.
Başlarken
Spark-Shell oturumu başlatmak için aşağıdaki adımları uygulayın:
Apache Spark kümenize bağlanmak için ssh komutunu kullanın. CLUSTERNAME değerini kümenizin adıyla değiştirerek aşağıdaki komutu düzenleyin ve komutunu girin:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Ssh oturumunuzda aşağıdaki komutu yürüterek sürümü not alın
hive-warehouse-connector-assembly
:ls /usr/hdp/current/hive_warehouse_connector
Aşağıdaki kodu yukarıda tanımlanan sürümle
hive-warehouse-connector-assembly
düzenleyin. Ardından spark kabuğunu başlatmak için komutunu yürütür:spark-shell --master yarn \ --jars /usr/hdp/current/hive_warehouse_connector/hive-warehouse-connector-assembly-<STACK_VERSION>.jar \ --conf spark.security.credentials.hiveserver2.enabled=false
Spark-shell başlatıldıktan sonra, aşağıdaki komutlar kullanılarak bir Hive Ambarı Bağlan veya örneği başlatılabilir:
import com.hortonworks.hwc.HiveWarehouseSession val hive = HiveWarehouseSession.session(spark).build()
Hive sorgularını kullanarak Spark DataFrame'ler oluşturma
HWC kitaplığını kullanan tüm sorguların sonuçları DataFrame olarak döndürülür. Aşağıdaki örneklerde temel bir hive sorgusunun nasıl oluşturulacağı gösterilmektedir.
hive.setDatabase("default")
val df = hive.executeQuery("select * from hivesampletable")
df.filter("state = 'Colorado'").show()
Sorgunun sonuçları, MLIB ve SparkSQL gibi Spark kitaplıklarıyla kullanılabilen Spark DataFrame'lerdir.
Spark DataFrames'i Hive tablolarına yazma
Spark, Hive'ın yönetilen ACID tablolarına yazmayı yerel olarak desteklemez. Ancak, HWC kullanarak herhangi bir DataFrame'i hive tablosuna yazabilirsiniz. Bu işlevi iş yerinde aşağıdaki örnekte görebilirsiniz:
adlı
sampletable_colorado
bir tablo oluşturun ve aşağıdaki komutu kullanarak sütunlarını belirtin:hive.createTable("sampletable_colorado").column("clientid","string").column("querytime","string").column("market","string").column("deviceplatform","string").column("devicemake","string").column("devicemodel","string").column("state","string").column("country","string").column("querydwelltime","double").column("sessionid","bigint").column("sessionpagevieworder","bigint").create()
Sütunun
state
eşit olduğu tabloyuhivesampletable
filtreleyinColorado
. Bu hive sorgusu bir Spark DataFrame döndürür ve sonuç hive tablosunasampletable_colorado
write
işlevi kullanılarak kaydedilir.hive.table("hivesampletable").filter("state = 'Colorado'").write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector").mode("append").option("table","sampletable_colorado").save()
Sonuçları aşağıdaki komutla görüntüleyin:
hive.table("sampletable_colorado").show()
Yapılandırılmış akış yazma işlemleri
Hive Warehouse Bağlan veya kullanarak, Hive tablolarına veri yazmak için Spark akışını kullanabilirsiniz.
Önemli
Yapılandırılmış akış yazma işlemleri ESP özellikli Spark 4.0 kümelerinde desteklenmez.
9999 numaralı localhost bağlantı noktası üzerinden bir Spark akışından Hive tablosuna veri almak için aşağıdaki adımları izleyin. Hive Ambarı Bağlan veya.
Açık Spark kabuğunuzdan aşağıdaki komutla bir spark akışı başlatın:
val lines = spark.readStream.format("socket").option("host", "localhost").option("port",9999).load()
Aşağıdaki adımları uygulayarak oluşturduğunuz Spark akışı için veri oluşturun:
- Aynı Spark kümesinde ikinci bir SSH oturumu açın.
- Komut istemine yazın
nc -lk 9999
. Bu komut, komut satırındannetcat
belirtilen bağlantı noktasına veri göndermek için yardımcı programını kullanır.
İlk SSH oturumuna dönün ve akış verilerini tutmak için yeni bir Hive tablosu oluşturun. spark-shell'de aşağıdaki komutu girin:
hive.createTable("stream_table").column("value","string").create()
Ardından aşağıdaki komutu kullanarak akış verilerini yeni oluşturulan tabloya yazın:
lines.filter("value = 'HiveSpark'").writeStream.format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource").option("database", "default").option("table","stream_table").option("metastoreUri",spark.conf.get("spark.datasource.hive.warehouse.metastoreUri")).option("checkpointLocation","/tmp/checkpoint1").start()
Önemli
Apache Spark'taki
metastoreUri
bilinen bir sorun nedeniyle vedatabase
seçenekleri şu anda el ile ayarlanmalıdır. Bu sorun hakkında daha fazla bilgi için bkz . SPARK-25460.İkinci SSH oturumuna dönün ve aşağıdaki değerleri girin:
foo HiveSpark bar
İlk SSH oturumuna dönün ve kısa etkinliği not edin. Verileri görüntülemek için aşağıdaki komutu kullanın:
hive.table("stream_table").show()
İkinci SSH oturumunda durdurmak netcat
için Ctrl + C tuşlarını kullanın. İlk SSH oturumunda spark-shell'dan çıkmak için kullanın :q
.