Hive dialect in Apache Flink-clusters® in HDInsight op AKS
Belangrijk
Deze functie is momenteel beschikbaar in preview. De aanvullende gebruiksvoorwaarden voor Microsoft Azure Previews bevatten meer juridische voorwaarden die van toepassing zijn op Azure-functies die bèta, in preview of anderszins nog niet beschikbaar zijn in algemene beschikbaarheid. Zie Azure HDInsight op AKS Preview-informatie voor meer informatie over deze specifieke preview. Voor vragen of suggesties voor functies dient u een aanvraag in op AskHDInsight met de details en volgt u ons voor meer updates in de Azure HDInsight-community.
In dit artikel leert u hoe u Hive-dialect gebruikt in Apache Flink-clusters in HDInsight op AKS.
Inleiding
De gebruiker kan het standaarddialect flink
niet wijzigen in hive dialect voor hun gebruik in HDInsight op AKS-clusters. Alle SQL-bewerkingen mislukken zodra ze zijn gewijzigd in hive dialect met de volgende fout.
*java.lang.ClassCastException: class jdk.internal.loader.ClassLoaders$AppClassLoader can't be cast to class java.net.URLClassLoader*
De reden voor dit probleem ontstaat door een open Hive Jira. Momenteel gaat Hive ervan uit dat het systeemklasselaadprogramma een exemplaar van URLClassLoader is. In Java 11
, deze aanname is niet het geval.
Hive dialect gebruiken in Flink
Voer de volgende stappen uit in webssh:
Verwijder de bestaande flink-sql-connector-hive*jar in lib-locatie
rm /opt/flink-webssh/lib/flink-sql-connector-hive*jar
Download het volgende jar in
webssh
pod en voeg het toe onder de wget https://aka.ms/hdiflinkhivejdk11jar/opt/flink-webssh/lib . (De bovenstaande hive jar heeft de fix https://issues.apache.org/jira/browse/HIVE-27508)-
mv /opt/flink-webssh/lib/flink-table-planner-loader-1.17.0-*.*.*.*.jar /opt/flink-webssh/opt/
-
mv /opt/flink-webssh/opt/flink-table-planner_2.12-1.17.0-*.*.*.*.jar /opt/flink-webssh/lib/
Voeg de volgende sleutels toe in het
flink
configuratiebeheer onder core-site.xml sectie:fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY> flink.hadoop.fs.azure.account.key.<STORAGE>.dfs.core.windows.net: <KEY>
Hier volgt een overzicht van hive-dialectquery's
- Hive dialect uitvoeren in Flink zonder partitionering
root [ ~ ]# ./bin/sql-client.sh Flink SQL> Flink SQL> create catalog myhive with ('type' = 'hive', 'hive-conf-dir' = '/opt/hive-conf'); [INFO] Execute statement succeed. Flink SQL> use catalog myhive; [INFO] Execute statement succeed. Flink SQL> load module hive; [INFO] Execute statement succeed. Flink SQL> use modules hive,core; [INFO] Execute statement succeed. Flink SQL> set table.sql-dialect=hive; [INFO] Session property has been set. Flink SQL> set sql-client.execution.result-mode=tableau; [INFO] Session property has been set. Flink SQL> select explode(array(1,2,3));Hive Session ID = 6ba45be2-360e-4bee-8842-2765c91581c8 > [!WARNING] > An illegal reflective access operation has occurred > [!WARNING] > Illegal reflective access by org.apache.hadoop.hive.common.StringInternUtils (file:/opt/flink-webssh/lib/flink-sql-connector-hive-3.1.2_2.12-1.16-SNAPSHOT.jar) to field java.net.URI.string > [!WARNING] > Please consider reporting this to the maintainers of org.apache.hadoop.hive.common.StringInternUtils > [!WARNING] > `Use --illegal-access=warn` to enable warnings of further illegal reflective access operations > [!WARNING] > All illegal access operations will be denied in a future release select explode(array(1,2,3)); +----+-------------+ | op | col | +----+-------------+ | +I | 1 | | +I | 2 | | +I | 3 | +----+-------------+ Received a total of 3 rows Flink SQL> create table tttestHive Session ID = fb8b652a-8dad-4781-8384-0694dc16e837 [INFO] Execute statement succeed. Flink SQL> insert into table tttestHive Session ID = f239dc6f-4b58-49f9-ad02-4c73673737d8),(3,'c'),(4,'d'); [INFO] Submitting SQL update statement to the cluster... [INFO] SQL update statement has been successfully submitted to the cluster: Job ID: d0542da4c4252f9494298666ff4e9f8e Flink SQL> set execution.runtime-mode=batch; [INFO] Session property has been set. Flink SQL> select * from tttestHive Session ID = 61b6eb3b-90a6-499c-aced-0598366c5b31 +-----+-------+ | key | value | +-----+-------+ | 1 | a | | 1 | a | | 2 | b | | 3 | c | | 3 | c | | 3 | c | | 4 | d | | 5 | e | +-----+-------+ 8 rows in set Flink SQL> QUIT;Hive Session ID = 2dadad92-436e-426e-a88c-66eafd740d98 [INFO] Exiting Flink SQL CLI Client... Shutting down the session... done. root [ ~ ]# exit
De gegevens worden geschreven in dezelfde container die is geconfigureerd in de hive/warehouse-map.
- Hive dialect uitvoeren in Flink met partities
create table tblpart2 (key int, value string) PARTITIONED by ( part string ) tblproperties ('sink.partition-commit.delay'='1 s', 'sink.partition-commit.policy.kind'='metastore,success-file');
insert into table tblpart2 Hive Session ID = 78fae85f-a451-4110-bea6-4aa1c172e282),(2,'b','d'),(3,'c','d'),(3,'c','a'),(4,'d','e');
Verwijzing
- Hive Dialect in Apache Flink
- Apache, Apache Flink, Flink en bijbehorende opensource-projectnamen zijn handelsmerken van de Apache Software Foundation (ASF).
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor