Aracılığıyla paylaş


AKS üzerinde HDInsight üzerinde Apache Flink® kümelerinde Tablo API'si ve SQL

Önemli

Bu özellik şu anda önizlemededir. Microsoft Azure Önizlemeleri için Ek Kullanım Koşulları, beta, önizleme aşamasında olan veya henüz genel kullanıma sunulmamış Azure özellikleri için geçerli olan daha fazla yasal hüküm içerir. Bu belirli önizleme hakkında bilgi için bkz . AKS üzerinde Azure HDInsight önizleme bilgileri. Sorular veya özellik önerileri için lütfen AskHDInsight'ta ayrıntıları içeren bir istek gönderin ve Azure HDInsight Topluluğu hakkında daha fazla güncelleştirme için bizi takip edin.

Apache Flink, birleşik akış ve toplu işlem için tablo API'si ve SQL olan iki ilişkisel API'ye sahiptir. Tablo API'si, seçim, filtreleme ve birleştirme gibi ilişkisel işleçlerden gelen sorguların sezgisel olarak bileşimini sağlayan, dille tümleşik bir sorgu API'dir. Flink'in SQL desteği, SQL standardını uygulayan Apache Calcite'i temel alır.

Tablo API'si ve SQL arabirimleri birbirleriyle ve Flink'in DataStream API'siyle sorunsuz bir şekilde tümleşir. Tüm API'ler ve kitaplıklar arasında kolayca geçiş yapabilirsiniz. Bu api'ler bunlar üzerinde derlenir.

Diğer SQL altyapılarında olduğu gibi Flink sorguları da tabloların üzerinde çalışır. Flink bekleyen verileri yerel olarak yönetmediğinden geleneksel bir veritabanından farklıdır; bunun yerine sorguları dış tablolar üzerinde sürekli olarak çalışır.

Flink veri işleme işlem hatları kaynak tablolarla başlar ve havuz tablolarıyla biter. Kaynak tablolar, sorgu yürütme sırasında üzerinde çalıştırılan satırlar oluşturur; bunlar bir sorgunun FROM yan tümcesinde başvuruda bulunan tablolardır. Bağlan orlar HDInsight Kafka, HDInsight HBase, Azure Event Hubs, veritabanları, dosya sistemleri veya bağlayıcısı sınıf yolu içinde yer alan başka bir sistem türünde olabilir.

Azure portalında Secure Shell'den CLI'yi kullanma hakkında bu makaleye başvurabilirsiniz. Başlamaya yönelik bazı hızlı örnekler aşağıda verilmiştir.

  • SQL istemcisini başlatmak için

    ./bin/sql-client.sh
    
  • Başlatma sql dosyasını sql-client ile birlikte çalışacak şekilde geçirmek için

    ./sql-client.sh -i /path/to/init_file.sql
    
  • sql-client'da yapılandırma ayarlamak için

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL aşağıdaki CREATE deyimlerini destekler

  • CREATE TABLE
  • CREATE DATABASE
  • KATALOG OLUŞTURMA

Aşağıda, MSSQL'e bağlanmak için jdbc bağlayıcısını kullanarak kaynak tablo tanımlamaya yönelik örnek bir söz dizimi verilmiştir; create TABLE Deyiminde sütun olarak id, name

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://server-name.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE :

CREATE DATABASE students;

KATALOG OLUŞTUR:

CREATE CATALOG myhive WITH ('type'='hive');

Bu tabloların üstünde Sürekli Sorgular çalıştırabilirsiniz

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Kaynak Tablodan Havuz Tablosuna yazın:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Bağımlılık ekleme

JAR deyimleri, sınıf yolu içine kullanıcı jar'ları eklemek veya sınıf yolu kullanıcı jar'larını kaldırmak veya çalışma zamanında sınıf yolu içinde eklenen jar'ları göstermek için kullanılır.

Flink SQL aşağıdaki JAR deyimlerini destekler:

  • ADD JAR
  • JAR'LARı GÖSTER
  • JAR KALDıRMA
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Kataloglar veritabanları, tablolar, bölümler, görünümler ve işlevler gibi meta verileri ve veritabanında veya diğer dış sistemlerde depolanan verilere erişmek için gereken bilgileri sağlar.

AKS üzerinde HDInsight'ta Flink iki katalog seçeneğini destekliyoruz:

GenericInMemoryCatalog

GenericInMemoryCatalog, bir kataloğun bellek içi uygulamasıdır. Tüm nesneler yalnızca SQL oturumunun ömrü boyunca kullanılabilir.

HiveCatalog

HiveCatalog iki amaca hizmet eder: saf Flink meta verileri için kalıcı depolama alanı ve mevcut Hive meta verilerini okumak ve yazmak için bir arabirim olarak.

Not

AKS kümelerinde HDInsight, Apache Flink için Hive Meta Veri Deposu'nun tümleşik bir seçeneğiyle birlikte gelir. Küme oluşturma sırasında Hive Meta Veri Deposu'u seçebilirsiniz

Cli'yi kullanma ve Azure portalında Secure Shell'den Flink SQL İstemcisi'ni kullanmaya başlama hakkında bu makaleye başvurabilirsiniz.

  • Oturumu başlat sql-client.sh

    Varsayılan hive kataloğunu gösteren ekran görüntüsü.

    Default_catalog varsayılan bellek içi katalogdur

  • Şimdi bellek içi kataloğun varsayılan veritabanını denetleyelim Varsayılan bellek içi katalogları gösteren ekran görüntüsü.

  • Sürüm 3.1.2'nin Hive Kataloğu'nu oluşturup kullanalım

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Not

    AKS üzerinde HDInsight, Hive 3.1.2 ve Hadoop 3.3.2'yi destekler. hive-conf-dir konumuna ayarlanır/opt/hive-conf

  • Hive kataloğunda Veritabanı oluşturalım ve bunu oturum için varsayılan yapalım (değiştirilmediği sürece). Hive kataloğunda veritabanı oluşturmayı ve oturum için varsayılan katalog yapmayı gösteren ekran görüntüsü.

Hive Tabloları Oluşturma ve Hive Kataloğuna Kaydetme

  • Flink Veritabanlarını Kataloga Oluşturma ve Kaydetme yönergelerini izleyin

  • Bölümsüz Hive bağlayıcı türünde Flink Tablosu oluşturalım

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • hive_table'a Veri Ekleme

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • hive_table'dan veri okuma

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Not

    Hive Warehouse Dizini, Apache Flink kümesi oluşturma sırasında seçilen belirlenmiş depolama hesabı kapsayıcısında bulunur, hive/warehouse/ dizininde bulunabilir

  • Bölüm ile bağlayıcı türü hive Flink Tablosu oluşturalım

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Önemli

Apache Flink'te bilinen bir sınırlama vardır. Son 'n' sütunları, kullanıcı tanımlı bölüm sütununa bakılmadan bölümler için seçilir. FLINK-32596 Hive tablosu oluşturmak için Flink diyalekt kullanıldığında bölüm anahtarı yanlış olacaktır.

Başvuru

  • Apache Flink Tablo API'si ve SQL
  • Apache, Apache Flink, Flink ve ilişkili açık kaynak proje adları Apache Software Foundation'ın (ASF) ticari markalarıdır.