Použití uživatelem definovaných funkcí Pythonu s Apache Hivem a Apache Pigem ve službě HDInsight

Naučte se používat uživatelem definované funkce Pythonu (UDF) s Apache Hivem a Apache Pigem v Apache Hadoopu ve službě Azure HDInsight.

Python ve službě HDInsight

Python2.7 je ve výchozím nastavení nainstalovaná v HDInsightu 3.0 a novějších. Apache Hive je možné použít s touto verzí Pythonu ke zpracování datových proudů. Zpracování datových proudů používá k předávání dat mezi Hivem a uživatelem definovanou funkcí STDOUT a STDIN.

HDInsight také zahrnuje Jython, což je implementace Pythonu napsaná v Javě. Jython běží přímo na virtuálním počítači v Javě a nepoužívá streamování. Jython je doporučeným interpretem Pythonu při použití Pythonu s Pig.

Požadavky

Poznámka

Účet úložiště použitý v tomto článku byl Azure Storage s povoleným zabezpečeným přenosem , a proto wasbs se používá v celém článku.

Konfigurace úložiště

Pokud je použitý účet úložiště druhem Storage (general purpose v1) nebo StorageV2 (general purpose v2), nevyžaduje se žádná akce. Proces v tomto článku vytvoří výstup alespoň /tezstagingdo . Výchozí konfigurace hadoopu obsahuje /tezstaging v fs.azure.page.blob.dir proměnné konfigurace v pro core-site.xml službu HDFS. Tato konfigurace způsobí, že výstupem do adresáře budou objekty blob stránky, které nejsou podporované pro typ BlobStorageúčtu úložiště. Pokud chcete pro účely tohoto článku použít BlobStorage , odeberte /tezstaging z fs.azure.page.blob.dir konfigurační proměnné . Ke konfiguraci je možné přistupovat z uživatelského rozhraní Ambari. Jinak se zobrazí chybová zpráva: Page blob is not supported for this account type.

Upozornění

Kroky v tomto dokumentu předpokládají následující předpoklady:

  • Skripty Pythonu vytvoříte v místním vývojovém prostředí.
  • Skripty nahrajete do služby HDInsight pomocí scp příkazu nebo poskytnutého skriptu PowerShellu.

Pokud chcete k práci se službou HDInsight použít Azure Cloud Shell (Bash), musíte:

  • Vytvořte skripty v prostředí Cloud Shell.
  • Použijte scp k nahrání souborů z Cloud Shellu do SLUŽBY HDInsight.
  • Použijte ssh z Cloud Shellu pro připojení ke službě HDInsight a spusťte příklady.

Apache Hive UDF

Python je možné použít jako uživatelsky definovanou funkci z Hive prostřednictvím příkazu HiveQL TRANSFORM . Například následující HiveQL vyvolá hiveudf.py soubor uložený ve výchozím účtu Azure Storage pro cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Tento příklad dělá toto:

  1. Příkaz add file na začátku souboru přidá hiveudf.py soubor do distribuované mezipaměti, aby k němu byly přístupné všechny uzly v clusteru.
  2. Příkaz SELECT TRANSFORM ... USING vybere data z .hivesampletable Skriptu také předá hodnoty hiveudf.py clientid, devicemake a devicemodel.
  3. Klauzule AS popisuje pole vrácená z hiveudf.py.

Vytvořit soubor

Ve vývojovém prostředí vytvořte textový soubor s názvem hiveudf.py. Jako obsah souboru použijte následující kód:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Tento skript provede následující akce:

  1. Načte řádek dat ze stdin.
  2. Koncový znak nového řádku se odebere pomocí string.strip(line, "\n ").
  3. Při zpracování datových proudů obsahuje jeden řádek všechny hodnoty se znakem tabulátoru mezi jednotlivými hodnotami. Můžete ho tedy string.split(line, "\t") použít k rozdělení vstupu na každé kartě a vrátit jenom pole.
  4. Po dokončení zpracování musí být výstup zapsán do výstupu STDOUT jako jeden řádek s tabulátorem mezi jednotlivými poli. Například, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Smyčka se while opakuje, dokud není přečteno ne line .

Výstupem skriptu je zřetězení vstupních hodnot pro devicemake a a devicemodelhodnota hash zřetězené hodnoty.

Nahrání souboru (prostředí)

Následující příkaz nahradí sshuser skutečným uživatelským jménem, pokud se liší. Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že se soubor nachází ve vašem pracovním adresáři.

  1. Pomocí scp příkazu zkopírujte soubory do clusteru HDInsight. Upravte a zadejte příkaz :

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Pro připojení ke clusteru použijte SSH. Upravte a zadejte příkaz :

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Z relace SSH přidejte soubory Pythonu nahrané dříve do úložiště pro cluster.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Použití UDF Hivu (prostředí)

  1. Pokud se chcete připojit k Hivu, použijte následující příkaz z otevřené relace SSH:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Tento příkaz spustí klienta Beeline.

  2. Do příkazového 0: jdbc:hive2://headnodehost:10001/> řádku zadejte následující dotaz:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Po zadání posledního řádku by se měla úloha spustit. Jakmile se úloha dokončí, vrátí výstup podobný následujícímu příkladu:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Chcete-li ukončit Beeline, zadejte následující příkaz:

    !q
    

Nahrání souboru (PowerShell)

PowerShell je také možné použít ke vzdálenému spouštění dotazů Hive. Ujistěte se, že hiveudf.py se nachází váš pracovní adresář. Pomocí následujícího skriptu PowerShellu spusťte dotaz Hive, který tento skript používá hiveudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Poznámka

Další informace o nahrávání souborů najdete v dokumentu Nahrávání dat pro úlohy Apache Hadoop ve službě HDInsight .

Použití UDF Hivu

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup úlohy Hive by měl vypadat podobně jako v následujícím příkladu:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

Skript Pythonu je možné použít jako uživatelsky definovanou funkci z Pig prostřednictvím příkazu .GENERATE Skript můžete spustit pomocí jazyka Jython nebo C Python.

  • Jython běží na prostředí JVM a dá se nativně volat z Pigu.
  • Python jazyka C je externí proces, takže data z Pig v prostředí JVM se odesílají do skriptu spuštěného v procesu Pythonu. Výstup skriptu Pythonu se odešle zpět do Pigu.

Pokud chcete určit interpret Pythonu, použijte register při odkazování na skript Pythonu příkaz . Následující příklady registrují skripty s Pig jako myfuncs:

  • Použití Jythonu: register '/path/to/pigudf.py' using jython as myfuncs;
  • Použití jazyka C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Důležité

Při použití Jythonu může být cesta k souboru pig_jython buď místní, nebo WASBS:// cesta. Při použití jazyka C Python ale musíte odkazovat na soubor v místním systému souborů uzlu, který používáte k odeslání úlohy Pig.

Po dokončení registrace je pig latinka v tomto příkladu stejná pro oba:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Tento příklad funguje takto:

  1. První řádek načte ukázkový datový soubor sample.log do LOGS. Také definuje každý záznam jako chararray.
  2. Další řádek vyfiltruje všechny hodnoty null a uloží výsledek operace do LOG.
  3. Dále iteruje záznamy v LOG a použije GENERATE k vyvolání create_structure metody obsažené ve skriptu Python/Jython načtený jako myfuncs. LINE slouží k předání aktuálního záznamu do funkce.
  4. Nakonec se výstupy pomocí příkazu stdout vypíše do výpisu DUMP . Tento příkaz zobrazí výsledky po dokončení operace.

Vytvoření souboru

Ve vývojovém prostředí vytvořte textový soubor s názvem pigudf.py. Jako obsah souboru použijte následující kód:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

V příkladu Pig Latin je vstup definován jako chararray, LINE protože pro vstup neexistuje konzistentní schéma. Skript Pythonu transformuje data na konzistentní schéma výstupu.

  1. Příkaz @outputSchema definuje formát dat vrácených do Pig. V tomto případě se jedná o datový vak, což je datový typ Pig. Taška obsahuje následující pole, z nichž všechna jsou chararray (řetězce):

    • date – datum vytvoření položky protokolu.
    • time – čas vytvoření položky protokolu
    • classname – název třídy, pro kterou byla položka vytvořena
    • level – úroveň protokolu
    • detail – podrobné podrobnosti pro položku protokolu
  2. Dále definuje def create_structure(input) funkci, do které Pig předává řádkové položky.

  3. Ukázková data sample.logvětšinou odpovídají schématu data, času, názvu třídy, úrovně a podrobností. Obsahuje však několik řádků, které začínají na *java.lang.Exception*. Tyto řádky musí být upraveny tak, aby odpovídaly schématu. Příkaz if je zkontroluje a potom masíruje vstupní data, aby se řetězec přesunul *java.lang.Exception* na konec, čímž se data přenesou do souladu s očekávaným výstupním schématem.

  4. Dále se split příkaz použije k rozdělení dat na prvních čtyřech mezerách. Výstup je přiřazený do date, time, classname, levela detail.

  5. Nakonec se hodnoty vrátí do Pig.

Když se data vrátí do Pig, mají konzistentní schéma definované v @outputSchema příkazu.

Nahrání souboru (prostředí)

V následujících příkazech nahraďte sshuser skutečným uživatelským jménem, pokud se liší. Nahraďte mycluster skutečným názvem clusteru. Ujistěte se, že se soubor nachází v pracovním adresáři.

  1. Slouží scp ke zkopírování souborů do clusteru HDInsight. Upravte a zadejte příkaz:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Pro připojení ke clusteru použijte SSH. Upravte a zadejte příkaz:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Z relace SSH přidejte soubory Pythonu nahrané dříve do úložiště pro cluster.

    hdfs dfs -put pigudf.py /pigudf.py
    

Použití pig UDF (shell)

  1. Pokud se chcete připojit k pig, použijte následující příkaz z otevřené relace SSH:

    pig
    
  2. Na příkazovém řádku zadejte následující příkazy grunt> :

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Po zadání následujícího řádku by se měla úloha spustit. Po dokončení úlohy se vrátí výstup podobný následujícím datům:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Pomocí quit příkazu ukončete prostředí Grunt a pak pomocí následujícího postupu upravte soubor pigudf.py v místním systému souborů:

    nano pigudf.py
    
  5. Jakmile budete v editoru, odkomentujte následující řádek odebráním # znaku z začátku řádku:

    #from pig_util import outputSchema
    

    Tento řádek upraví skript Pythonu tak, aby místo Jythonu fungoval s jazykem C Python. Po provedení změny ukončete editor stisknutím kombinace kláves Ctrl+X . Vyberte Y a pak enter a uložte změny.

  6. Pomocí příkazu pig znovu spusťte prostředí. Jakmile budete na příkazovém grunt> řádku, pomocí následujícího příkazu spusťte skript Pythonu pomocí interpretu Pythonu jazyka C.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Po dokončení této úlohy by se měl zobrazit stejný výstup jako při spuštění skriptu pomocí Jythonu.

Nahrání souboru (PowerShell)

PowerShell je také možné použít ke vzdálenému spouštění dotazů Hive. Ujistěte se, že pigudf.py se nachází váš pracovní adresář. Pomocí následujícího skriptu PowerShellu spusťte dotaz Hive, který tento skript používá pigudf.py :

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Použití pig UDF (PowerShell)

Poznámka

Při vzdáleném odeslání úlohy pomocí PowerShellu není možné jako interpret použít jazyk C Python.

PowerShell je také možné použít ke spouštění úloh Pig Latin. Pokud chcete spustit úlohu Pig Latin, která používá pigudf.py tento skript, použijte následující skript PowerShellu:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Výstup pro úlohu Pig by měl vypadat podobně jako následující data:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Poradce při potížích

Chyby při spouštění úloh

Při spuštění úlohy Hive může dojít k chybě podobné následujícímu textu:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Příčinou tohoto problému mohou být konce řádků v souboru Pythonu. Mnoho editorů windows ve výchozím nastavení jako konec řádku používá CRLF, ale linuxové aplikace obvykle očekávají LF.

Před nahráním souboru do HDInsight můžete pomocí následujících příkazů PowerShellu odebrat znaky CR:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Skripty prostředí PowerShell

Oba ukázkové skripty PowerShellu použité ke spuštění příkladů obsahují řádek s komentářem, který zobrazuje výstup chyby pro úlohu. Pokud se nezobrazuje očekávaný výstup úlohy, zrušte komentář na následujícím řádku a zjistěte, jestli informace o chybě značí problém.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

Informace o chybě (STDERR) a výsledek úlohy (STDOUT) se také protokolují do úložiště HDInsight.

Pro tuto práci... Prohlédněte si tyto soubory v kontejneru objektů blob.
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Další kroky

Pokud potřebujete načíst moduly Pythonu, které nejsou ve výchozím nastavení k dispozici, přečtěte si téma Nasazení modulu do Azure HDInsight.

Další způsoby použití Pigu, Hive a další informace o používání MapReduce najdete v následujících dokumentech: