Aracılığıyla paylaş


AKS üzerinde HDInsight üzerinde Apache Flink® kümelerinde Hive diyalekt

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Bu makalede, AKS üzerinde HDInsight üzerinde Apache Flink kümelerinde Hive diyalektini kullanmayı öğrenin.

Giriş

Kullanıcı, AKS kümelerinde HDInsight'ta kullanımı için varsayılan flink diyalektini hive diyalekt olarak değiştiremez. Aşağıdaki hatayla tüm SQL işlemleri bir kez hive diyalekt olarak değiştirildiğinde başarısız olur.


*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*

Bu sorunun nedeni, açık bir Hive Jira nedeniyle ortaya çıkar. Hive şu anda sistem sınıfı yükleyicisinin URLClassLoader örneği olduğunu varsayar. 'de Java 11bu varsayım geçerli değildir.

  • Webssh'de aşağıdaki adımları yürütür:

    1. Lib konumunda mevcut flink-sql-connector-hive*jar dosyasını kaldırma

      rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
      
    2. Pod'da webssh aşağıdaki jar dosyasını indirin ve /opt/flink-webssh/lib wget https://aka.ms/hdiflinkhivejdk11jaraltına ekleyin. (Yukarıdaki hive jar dosyasının düzeltmesi https://issues.apache.org/jira/browse/HIVE-27508vardır )

    3.  mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
      
    4. mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
      
    5. Yapılandırma yönetimine flink aşağıdaki anahtarları core-site.xml bölümüne ekleyin:

      fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
      
  • Hive diyalekt sorgularının genel bakışını burada bulabilirsiniz

    • Bölümleme olmadan Flink'te Hive diyalekt yürütme
      root [ ~ ]# ./bin/sql-client.sh
      Flink SQL>
      Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf');
      [INFO] Execute statement succeed.
    
      Flink SQL> use catalog myhive;
      [INFO] Execute statement succeed.
    
      Flink SQL> load module hive;
      [INFO] Execute statement succeed.
    
      Flink SQL> use modules hive,core;
      [INFO] Execute statement succeed.
    
      Flink SQL> set table.sql-dialect=hive;
      [INFO] Session property has been set.
    
      Flink SQL> set sql-client.execution.result-mode=tableau;
      [INFO] Session property has been set.
    
      Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8
    
    
    > [!WARNING]
    > An illegal reflective access operation has occurred
    
    > [!WARNING]
    > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string
    
    > [!WARNING]
    > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils
    
    > [!WARNING]
    > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations
    
    > [!WARNING]
    >  All illegal access operations will be denied in a future release
    select explode(array(1,2,3));
    
    
    +----+-------------+
    | op |         col |
    +----+-------------+
    | +I |           1 |
    | +I |           2 |
    | +I |           3 |
    +----+-------------+
    
    Received a total of 3 rows
    
    Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837
    
    [INFO] Execute statement succeed.
    
    Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d');
    
    [INFO] Submitting SQL update statement to the cluster...
    [INFO] SQL update statement has been successfully submitted to the cluster:
    Job ID: d0542da4c4252f9494298666ff4e9f8e
    
    Flink SQL> set execution.runtime-mode=batch;
    [INFO] Session property has been set.
    
    Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31
    
    +-----+-------+
    | key | value |
    +-----+-------+
    |   1 |     a |
    |   1 |     a |
    |   2 |     b |
    |   3 |     c |
    |   3 |     c |
    |   3 |     c |
    |   4 |     d |
    |   5 |     e |
    +-----+-------+
    8 rows in set
    
    Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98
    
    [INFO] Exiting Flink SQL CLI Client...
    
    Shutting down the session...
    done.
    root [ ~ ]# exit
    

    Veriler hive/warehouse dizininde yapılandırılan aynı kapsayıcıda yazılır.

    Kapsayıcı tablosu 1'i gösteren ekran görüntüsü.

    • Flink'te Bölümler ile Hive diyalekt yürütme
  create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');

  insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');

Kapsayıcı tablosu 2'nin ekran görüntüsü.

Kapsayıcı tablosu 3'i gösteren ekran görüntüsü.

Başvuru

  • Apache Flink'te Hive Diyalekt
  • Apache, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.