Share via


Table-API en SQL in Apache Flink-clusters® in HDInsight op AKS

Belangrijk

Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.

Apache Flink bevat twee relationele API's, de Table-API en SQL, voor geïntegreerde stream- en batchverwerking. De Table-API is een met taal geïntegreerde query-API waarmee u query's kunt samenstellen van relationele operators, zoals selectie, filter en join intuïtief. De SQL-ondersteuning van Flink is gebaseerd op Apache Calcite, waarmee de SQL-standaard wordt geïmplementeerd.

De Table-API en SQL-interfaces kunnen naadloos worden geïntegreerd met elkaar en de DataStream-API van Flink. U kunt eenvoudig schakelen tussen alle API's en bibliotheken, die hierop zijn gebaseerd.

Net als andere SQL-engines werken Flink-query's op tabellen. Het verschilt van een traditionele database omdat Flink geen data-at-rest lokaal beheert; In plaats daarvan worden de query's continu uitgevoerd via externe tabellen.

Flink pijplijnen voor gegevensverwerking beginnen met brontabellen en eindigen met sinktabellen. Brontabellen produceren rijen die worden uitgevoerd tijdens de uitvoering van de query; dit zijn de tabellen waarnaar wordt verwezen in de FROM-component van een query. Verbinding maken ors kunnen van het type HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databases, bestandssysteem of een ander systeem zijn waarvan de connector zich in het klassepad bevindt.

Raadpleeg dit artikel over het gebruik van CLI vanuit Secure Shell in Azure Portal. Hier volgen enkele snelle voorbeelden van hoe u aan de slag kunt gaan.

  • De SQL-client starten

    ./bin/sql-client.sh
    
  • Een initialisatie-SQL-bestand doorgeven om samen met sql-client uit te voeren

    ./sql-client.sh -i /path/to/init_file.sql
    
  • Een configuratie instellen in sql-client

    SET execution.runtime-mode = streaming;
    SET sql-client.execution.result-mode = table;
    SET sql-client.execution.max-table-result.rows = 10000;
    

SQL DDL

Flink SQL ondersteunt de volgende CREATE-instructies

  • CREATE TABLE
  • CREATE DATABASE
  • CATALOGUS MAKEN

Hieronder volgt een voorbeeldsyntaxis voor het definiëren van een brontabel met behulp van jdbc-connector om verbinding te maken met MSSQL, met id, naam als kolommen in een CREATE TABLE-instructie

CREATE TABLE student_information (
    id BIGINT,
    name STRING,  
    address STRING,
    grade STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:sqlserver://server-name.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
     'table-name' = 'students',
     'username' = 'username',
     'password' = 'password'
 );

CREATE DATABASE :

CREATE DATABASE students;

CATALOGUS MAKEN:

CREATE CATALOG myhive WITH ('type'='hive');

U kunt doorlopende query's uitvoeren boven op deze tabellen

  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Schrijven naar sinktabel uit de brontabel:

  INSERT INTO grade_counts
  SELECT id,
  COUNT(*) as student_count 
  FROM student_information 
  GROUP BY grade;

Afhankelijkheden toevoegen

JAR-instructies worden gebruikt om gebruikers-JAR-bestanden toe te voegen aan het klassepad of gebruikers-JAR's te verwijderen uit het klassepad of om toegevoegde JAR's weer te geven in het klassepad in de runtime.

Flink SQL ondersteunt de volgende JAR-instructies:

  • ADD JAR
  • JARS WEERGEVEN
  • JAR VERWIJDEREN
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.

Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.

Flink SQL> SHOW JARS;
+----------------------------+
|                       jars |
+----------------------------+
|            /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+

Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.

Catalogi bieden metagegevens, zoals databases, tabellen, partities, weergaven en functies en informatie die nodig is voor toegang tot gegevens die zijn opgeslagen in een database of andere externe systemen.

In HDInsight op AKS ondersteunen we twee catalogusopties:

GenericInMemoryCatalog

GenericInMemoryCatalog is een in-memory implementatie van een catalogus. Alle objecten zijn alleen beschikbaar voor de levensduur van de SQL-sessie.

HiveCatalog

Het HiveCatalog dient twee doeleinden: als permanente opslag voor pure Flink-metagegevens en als interface voor het lezen en schrijven van bestaande Hive-metagegevens.

Notitie

HDInsight op AKS-clusters wordt geleverd met een geïntegreerde optie van Hive Metastore voor Apache Flink. U kunt kiezen voor Hive Metastore tijdens het maken van het cluster

U kunt dit artikel raadplegen over het gebruik van CLI en aan de slag met Flink SQL Client vanuit Secure Shell in Azure Portal.

  • Sessie starten sql-client.sh

    Schermopname van de standaard hive-catalogus.

    Default_catalog is de standaardcatalogus in het geheugen

  • Laten we nu de standaarddatabase van in-memory catalogus controleren Schermopname van de standaardcatalogus in het geheugen.

  • Laten we Hive Catalog van versie 3.1.2 maken en gebruiken

      CREATE CATALOG myhive WITH ('type'='hive');
      USE CATALOG myhive;
    

    Notitie

    HDInsight op AKS ondersteunt Hive 3.1.2 en Hadoop 3.3.2. De hive-conf-dir is ingesteld op locatie /opt/hive-conf

  • We maken database in hive-catalogus en maken deze standaard voor de sessie (tenzij gewijzigd). Schermopname van het maken van een database in hive-catalogus en het maken van de standaardcatalogus voor de sessie.

Hive-tabellen maken en registreren bij Hive-catalogus

  • Volg de instructies voor het maken en registreren van Flink Databases to Catalog

  • Laten we Flink Table van connectortype Hive maken zonder partitie

    CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    
  • Gegevens invoegen in hive_table

    INSERT INTO hive_table SELECT 2, '10';
    INSERT INTO hive_table SELECT 3, '20';
    
  • Gegevens lezen uit hive_table

      Flink SQL> SELECT * FROM hive_table;
      2023-07-24 09:46:22,225 INFO  org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3
      +----+-------------+--------------------------------+
      | op |           x |                           days |
      +----+-------------+--------------------------------+
      | +I |           3 |                             20 |
      | +I |           2 |                             10 |
      | +I |           1 |                              5 |
      +----+-------------+--------------------------------+
      Received a total of 3 rows
    

    Notitie

    Hive Warehouse Directory bevindt zich in de aangewezen container van het opslagaccount dat is gekozen tijdens het maken van het Apache Flink-cluster, vindt u in directory hive/warehouse/

  • Hiermee kunt u Flink Table van verbindingslijntype Hive maken met Partition

    CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
    

Belangrijk

Er is een bekende beperking in Apache Flink. De laatste 'n' kolommen worden gekozen voor partities, ongeacht de door de gebruiker gedefinieerde partitiekolom. FLINK-32596 De partitiesleutel is onjuist wanneer u Flink dialect gebruikt om een Hive-tabel te maken.

Verwijzing