Table-API en SQL in Apache Flink-clusters® in HDInsight op AKS
Belangrijk
Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.
Apache Flink bevat twee relationele API's, de Table-API en SQL, voor geïntegreerde stream- en batchverwerking. De Table-API is een met taal geïntegreerde query-API waarmee u query's kunt samenstellen van relationele operators, zoals selectie, filter en join intuïtief. De SQL-ondersteuning van Flink is gebaseerd op Apache Calcite, waarmee de SQL-standaard wordt geïmplementeerd.
De Table-API en SQL-interfaces kunnen naadloos worden geïntegreerd met elkaar en de DataStream-API van Flink. U kunt eenvoudig schakelen tussen alle API's en bibliotheken, die hierop zijn gebaseerd.
Apache Flink SQL
Net als andere SQL-engines werken Flink-query's op tabellen. Het verschilt van een traditionele database omdat Flink geen data-at-rest lokaal beheert; In plaats daarvan worden de query's continu uitgevoerd via externe tabellen.
Flink pijplijnen voor gegevensverwerking beginnen met brontabellen en eindigen met sinktabellen. Brontabellen produceren rijen die worden uitgevoerd tijdens de uitvoering van de query; dit zijn de tabellen waarnaar wordt verwezen in de FROM-component van een query. Verbinding maken ors kunnen van het type HDInsight Kafka, HDInsight HBase, Azure Event Hubs, databases, bestandssysteem of een ander systeem zijn waarvan de connector zich in het klassepad bevindt.
Flink SQL Client gebruiken in HDInsight op AKS-clusters
Raadpleeg dit artikel over het gebruik van CLI vanuit Secure Shell in Azure Portal. Hier volgen enkele snelle voorbeelden van hoe u aan de slag kunt gaan.
De SQL-client starten
./bin/sql-client.sh
Een initialisatie-SQL-bestand doorgeven om samen met sql-client uit te voeren
./sql-client.sh -i /path/to/init_file.sql
Een configuratie instellen in sql-client
SET execution.runtime-mode = streaming; SET sql-client.execution.result-mode = table; SET sql-client.execution.max-table-result.rows = 10000;
SQL DDL
Flink SQL ondersteunt de volgende CREATE-instructies
- CREATE TABLE
- CREATE DATABASE
- CATALOGUS MAKEN
Hieronder volgt een voorbeeldsyntaxis voor het definiëren van een brontabel met behulp van jdbc-connector om verbinding te maken met MSSQL, met id, naam als kolommen in een CREATE TABLE-instructie
CREATE TABLE student_information (
id BIGINT,
name STRING,
address STRING,
grade STRING,
PRIMARY KEY (id) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:sqlserver://server-name.database.windows.net;database=dbname;encrypt=true;trustServerCertificate=true;create=false;loginTimeout=30',
'table-name' = 'students',
'username' = 'username',
'password' = 'password'
);
CREATE DATABASE :
CREATE DATABASE students;
CATALOGUS MAKEN:
CREATE CATALOG myhive WITH ('type'='hive');
U kunt doorlopende query's uitvoeren boven op deze tabellen
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Schrijven naar sinktabel uit de brontabel:
INSERT INTO grade_counts
SELECT id,
COUNT(*) as student_count
FROM student_information
GROUP BY grade;
Afhankelijkheden toevoegen
JAR-instructies worden gebruikt om gebruikers-JAR-bestanden toe te voegen aan het klassepad of gebruikers-JAR's te verwijderen uit het klassepad of om toegevoegde JAR's weer te geven in het klassepad in de runtime.
Flink SQL ondersteunt de volgende JAR-instructies:
- ADD JAR
- JARS WEERGEVEN
- JAR VERWIJDEREN
Flink SQL> ADD JAR '/path/hello.jar';
[INFO] Execute statement succeed.
Flink SQL> ADD JAR 'hdfs:///udf/common-udf.jar';
[INFO] Execute statement succeed.
Flink SQL> SHOW JARS;
+----------------------------+
| jars |
+----------------------------+
| /path/hello.jar |
| hdfs:///udf/common-udf.jar |
+----------------------------+
Flink SQL> REMOVE JAR '/path/hello.jar';
[INFO] The specified jar is removed from session classloader.
Hive Metastore in Apache Flink-clusters® in HDInsight op AKS
Catalogi bieden metagegevens, zoals databases, tabellen, partities, weergaven en functies en informatie die nodig is voor toegang tot gegevens die zijn opgeslagen in een database of andere externe systemen.
In HDInsight op AKS ondersteunen we twee catalogusopties:
GenericInMemoryCatalog
GenericInMemoryCatalog is een in-memory implementatie van een catalogus. Alle objecten zijn alleen beschikbaar voor de levensduur van de SQL-sessie.
HiveCatalog
Het HiveCatalog dient twee doeleinden: als permanente opslag voor pure Flink-metagegevens en als interface voor het lezen en schrijven van bestaande Hive-metagegevens.
Notitie
HDInsight op AKS-clusters wordt geleverd met een geïntegreerde optie van Hive Metastore voor Apache Flink. U kunt kiezen voor Hive Metastore tijdens het maken van het cluster
Flink Databases maken en registreren bij catalogi
U kunt dit artikel raadplegen over het gebruik van CLI en aan de slag met Flink SQL Client vanuit Secure Shell in Azure Portal.
Sessie starten
sql-client.sh
Default_catalog is de standaardcatalogus in het geheugen
Laten we nu de standaarddatabase van in-memory catalogus controleren
Laten we Hive Catalog van versie 3.1.2 maken en gebruiken
CREATE CATALOG myhive WITH ('type'='hive'); USE CATALOG myhive;
Notitie
HDInsight op AKS ondersteunt Hive 3.1.2 en Hadoop 3.3.2. De
hive-conf-dir
is ingesteld op locatie/opt/hive-conf
We maken database in hive-catalogus en maken deze standaard voor de sessie (tenzij gewijzigd).
Hive-tabellen maken en registreren bij Hive-catalogus
Volg de instructies voor het maken en registreren van Flink Databases to Catalog
Laten we Flink Table van connectortype Hive maken zonder partitie
CREATE TABLE hive_table(x int, days STRING) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Gegevens invoegen in hive_table
INSERT INTO hive_table SELECT 2, '10'; INSERT INTO hive_table SELECT 3, '20';
Gegevens lezen uit hive_table
Flink SQL> SELECT * FROM hive_table; 2023-07-24 09:46:22,225 INFO org.apache.hadoop.mapred.FileInputFormat[] - Total input files to process : 3 +----+-------------+--------------------------------+ | op | x | days | +----+-------------+--------------------------------+ | +I | 3 | 20 | | +I | 2 | 10 | | +I | 1 | 5 | +----+-------------+--------------------------------+ Received a total of 3 rows
Notitie
Hive Warehouse Directory bevindt zich in de aangewezen container van het opslagaccount dat is gekozen tijdens het maken van het Apache Flink-cluster, vindt u in directory hive/warehouse/
Hiermee kunt u Flink Table van verbindingslijntype Hive maken met Partition
CREATE TABLE partitioned_hive_table(x int, days STRING) PARTITIONED BY (days) WITH ( 'connector' = 'hive', 'sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
Belangrijk
Er is een bekende beperking in Apache Flink. De laatste 'n' kolommen worden gekozen voor partities, ongeacht de door de gebruiker gedefinieerde partitiekolom. FLINK-32596 De partitiesleutel is onjuist wanneer u Flink dialect gebruikt om een Hive-tabel te maken.
Verwijzing
- Apache Flink Table-API & SQL
- Apache, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor