Usare le funzioni definite dall'utente di Python (UDF) con Hive e Pig in HDInsight

Informazioni su come usare le funzioni definite dall'utente di Python con Apache Hive e Pig in Hadoop in Azure HDInsight.

Python in HDInsight

Python 2.7 viene installato per impostazione predefinita in HDInsight 3.0 e versioni successive. Apache Hive può essere usato con questa versione di Python per l'elaborazione di flussi. L'elaborazione di flussi usa STDOUT e STDIN per passare i dati tra Hive e la funzione definita dall'utente.

HDInsight include anche Jython, un'implementazione di Python scritta in Java. Jython viene eseguito direttamente in Java Virtual Machine e non usa lo streaming. Jython è l'interprete Python consigliato quando si usa Python con Pig.

Avviso

La procedura in questo documento parte dai presupposti seguenti:

  • L'utente deve creare gli script Python in un ambiente di sviluppo locale.
  • Caricare gli script in HDInsight usando il comando scp da una sessione Bash locale o lo script di PowerShell presente.

Se si desidera usare l'anteprima Azure Cloud Shell (bash) con HDInsight, è necessario:

  • Creare gli script all'interno dell'ambiente Cloud Shell.
  • Usare scp per caricare i file da Cloud Shell in HDInsight.
  • Usare ssh da Cloud Shell per connettersi a HDInsight ed eseguire gli esempi.

UDF di Hive

Python può essere usato come funzione definita dall'utente da Hive tramite l'istruzione TRANSFORM di HiveQL. Ad esempio, HiveQL seguente richiama il file hiveudf.py archiviato nell'account di archiviazione di Azure predefinito per il cluster.

HDInsight basato su Linux

add file wasb:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

HDInsight basato su Windows

add file wasb:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'D:\Python27\python.exe hiveudf.py' AS
    (clientid string, phoneLable string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Nota

Nei cluster HDInsight basati su Windows la clausola USING deve specificare il percorso completo di python.exe.

Ecco cosa fa l'esempio:

  1. L'istruzione add file all'inizio del file aggiunge il file hiveudf.py alla cache distribuita ed è quindi accessibile a tutti i nodi del cluster.
  2. L'istruzione SELECT TRANSFORM ... USING seleziona i dati da hivesampletable. Passa anche i valori clientid, devicemake e devicemodel nello script hiveudf.py.
  3. La clausola AS descrive i campi restituiti da hiveudf.py.

Creare il file hiveudf.py

Nell'ambiente di sviluppo creare un file di testo denominato hiveudf.py. Usare il codice seguente come contenuto del file:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Lo script esegue le azioni seguenti:

  1. Leggere una riga di dati da STDIN.
  2. Il carattere di nuova riga finale viene rimosso con string.strip(line, "\n ").
  3. Durante l'elaborazione di flussi tutti i valori sono contenuti in un'unica riga sono separati da un carattere di tabulazione. Si può quindi usare string.split(line, "\t") per dividere l'input in corrispondenza di ogni tabulazione, in modo da restituire solo i campi.
  4. Al termine dell'elaborazione, l'output deve essere scritto in STDOUT in un'unica riga, con i campi separati da tabulazioni. Ad esempio, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Il ciclo while si ripete finché non viene letta alcuna line.

Lo script di output è una concatenazione di valori di input per devicemake e devicemodel e un hash del valore concatenato.

Per informazioni su come eseguire questo esempio nel cluster HDInsight, vedere Esecuzione degli esempi .

UDF di Pig

Si può usare uno script di Python come funzione definita dall'utente da Pig tramite l'istruzione GENERATE. È possibile eseguire lo script usando Jython o C Python.

  • Jython viene eseguito su JVM e può essere chiamato in modo nativo da Pig.
  • C Python è un processo esterno, pertanto i dati di Pig su JVM vengono inviati allo script in esecuzione in un processo di Python. Quindi l'output dello script di Python viene inviato in Pig.

Per specificare l'interprete Python, usare register quando si fa riferimento allo script di Python. Negli esempi seguenti gli script vengono registrati con Pig come myfuncs:

  • Per usare Jython: register '/path/to/pigudf.py' using jython as myfuncs;
  • Per usare C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Importante

Quando si usa Jython, il percorso al file pig_jython può essere un percorso locale o un percorso WASB://. Tuttavia, quando si usa C Python, è necessario fare riferimento un file nel file system locale del nodo che si usa per inviare il processo Pig.

Una volta trascorsa la registrazione, Pig Latin per questo esempio è lo stesso per entrambi:

LOGS = LOAD 'wasb:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Ecco cosa fa l'esempio:

  1. La prima riga carica il file di dati di esempio sample.log in LOGS. Definisce anche ogni record come chararray.
  2. La riga successiva esclude tutti i valori Null, archiviando il risultato dell'operazione in LOG.
  3. Esegue quindi l'iterazione sui record in LOG e usa GENERATE per richiamare il metodo create_structure contenuto nello script di Python/Jython caricato come myfuncs. LINE viene usato per passare il record corrente alla funzione.
  4. Viene infine eseguito il dump degli output in STDOUT con il comando DUMP. Questo comando visualizza i risultati al termine dell'operazione.

Creare il file pigudf.py

Nell'ambiente di sviluppo creare un file di testo denominato pigudf.py. Usare il codice seguente come contenuto del file:

# Uncomment the following if using C Python
#from pig_util import outputSchema

@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

Nell'esempio Pig Latin l'input di LINE è stato definito come chararray perché non contiene uno schema coerente. Con lo script Python i dati vengono trasformati in uno schema coerente per l'output.

  1. L'istruzione @outputSchema definisce il formato dei dati che verranno restituiti a Pig. In questo caso si tratta di un contenitore di dati, ovvero un tipo di dati Pig. Il contenitore include i seguenti campi, che sono tutti chararray (stringhe):

    • date - data di creazione della voce del log
    • time - ora di creazione della voce del log
    • classname - nome della classe per cui è stata creata la voce
    • level - livello del log
    • detail - dettagli relativi alla voce di log
  2. Successivamente def create_structure(input) definisce la funzione a cui Pig passerà le voci.

  3. I dati di esempio in sample.log sono per lo più conformi allo schema date, time, classname, level e detail che si intende restituire. Contengono tuttavia alcune righe che iniziano con *java.lang.Exception*. Queste righe devono essere modificate in modo che corrispondano allo schema. L'istruzione if verifica la presenza di queste righe, quindi forza i dati di input a spostare la stringa *java.lang.Exception* alla fine, portando i dati in linea con lo schema di output previsto.

  4. Viene quindi usato il comando split per dividere i dati in corrispondenza dei primi quattro spazi. L'output viene assegnato in date, time, classname, level e detail.

  5. I valori vengono infine restituiti a Pig.

Quando i dati vengono restituiti a Pig, lo schema sarà coerente, come definito nell'istruzione @outputSchema.

Caricare ed eseguire gli esempi

Importante

La procedura per SSH può essere eseguita solo con un cluster HDInsight basato su Linux. La procedura per PowerShell può essere eseguita con un cluster HDInsight basato su Linux o su Windows, ma è necessario un client Windows.

SSH

Per altre informazioni sull'uso di SSH, vedere Usare SSH con HDInsight.

  1. Usare scp per copiare i file nel cluster HDInsight. Ad esempio, il comando seguente copia i file in un cluster denominato mycluster.

    scp hiveudf.py pigudf.py myuser@mycluster-ssh.azurehdinsight.net:
    
  2. Usare SSH per connettersi al cluster.

    ssh myuser@mycluster-ssh.azurehdinsight.net
    
  3. Dalla sessione SSH, aggiungere i file python caricati in precedenza nell'archivio WASB per il cluster.

    hdfs dfs -put hiveudf.py /hiveudf.py
    hdfs dfs -put pigudf.py /pigudf.py
    

Dopo il caricamento dei file, usare la procedura seguente per eseguire i processi Hive e Pig.

Usare l'UDF di Hive

  1. Usare il comando hive per avviare la shell di Hive. Una volta caricata la shell, verrà visualizzato un prompt hive> .

  2. Immettere la query seguente al prompt hive>:

    add file wasb:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Dopo l'immissione dell'ultima riga il processo dovrebbe essere avviato. Al termine del processo, restituisce un output simile al seguente esempio:

     100041    RIM 9650    d476f3687700442549a83fac4560c51c
     100041    RIM 9650    d476f3687700442549a83fac4560c51c
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
     100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    

Usare l'UDF di Pig

  1. Usare il comando pig per avviare la shell. Dopo il caricamento della shell, viene visualizzato un prompt grunt>.

  2. Al prompt grunt> immettere le istruzioni seguenti:

    Register wasb:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasb:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Dopo l'immissione della riga seguente, il processo dovrebbe essere avviato. Al termine del processo, restituisce un output simile ai dati seguenti:

     ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
     ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
     ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
     ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
     ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Usare quit per chiudere la shell Grunt e quindi il prompt seguente per modificare il file pigudf.py nel file system locale:

    nano pigudf.py
    
  5. Una volta nell'editor, rimuovere i simboli di commenti dalla riga seguente rimuovendo il carattere # dall'inizio della riga:

    #from pig_util import outputSchema
    

    Una volta apportata la modifica, usare Ctrl + X per uscire dall'editor. Selezionare Y e quindi INVIO per salvare le modifiche.

  6. Usare il comando pig per avviare di nuovo la shell. Una volta al prompt grunt> usare la riga seguente per eseguire il script di Python con l'interprete C Python.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasb:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Una volta completato questo processo, si dovrebbe vedere lo stesso risultato di quando è stato eseguito lo script in precedenza con Jython.

PowerShell: Caricare i file

È possibile usare PowerShell per caricare i file nel server di HDInsight. Usare lo script seguente per caricare i file di Python:

Importante

I passaggi descritti in questa sezione usano Azure PowerShell. Per altre informazioni sull'uso di Azure PowerShell, vedere Come installare e configurare Azure PowerShell.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzureRmSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Add-AzureRmAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
# Change the path to match the file location on your system
$pathToStreamingFile = "C:\path\to\hiveudf.py"
$pathToJythonFile = "C:\path\to\pigudf.py"

$clusterInfo = Get-AzureRmHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzureRmStorageAccountKey `
    -Name $storageAccountName `
-ResourceGroupName $resourceGroup)[0].Value

#Create a storage content and upload the file
$context = New-AzureStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

Set-AzureStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Set-AzureStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Importante

Modifica il valore C:\path\to per il percorso dei file in un ambiente di sviluppo.

Questo script recupera le informazioni relative al cluster HDInsight, quindi estrae l'account e la chiave dell'account di archiviazione predefinito e carica i file nella radice del contenitore.

Nota

Per altre informazioni sul caricamento dei file, vedere il documento Caricare dati per processi Hadoop in HDInsight.

PowerShell: usare l'UDF di Hive

PowerShell può essere usato anche per eseguire in remoto le query Hive. Usare lo script di PowerShell seguente per eseguire una query di Hive che usa lo script hiveudf.py:

Importante

Prima dell'esecuzione, lo script richiede le informazioni sull'account HTTPS/Amministratore per il cluster HDInsight.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzureRmSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Add-AzureRmAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

# If using a Windows-based HDInsight cluster, change the USING statement to:
# "USING 'D:\Python27\python.exe hiveudf.py' AS " +
$HiveQuery = "add file wasb:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

$jobDefinition = New-AzureRmHDInsightHiveJobDefinition `
    -Query $HiveQuery

$job = Start-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds
Write-Host "Wait for the Hive job to complete ..." -ForegroundColor Green
Wait-AzureRmHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds
# Uncomment the following to see stderr output
# Get-AzureRmHDInsightJobOutput `
#   -Clustername $clusterName `
#   -JobId $job.JobId `
#   -HttpCredential $creds `
#   -DisplayOutputType StandardError
Write-Host "Display the standard output ..." -ForegroundColor Green
Get-AzureRmHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

L'output del processo Hive sarà simile al seguente esempio:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Pig (Jython)

PowerShell può essere usato anche per eseguire processi di Pig Latin. Per eseguire un processo di Pig Latin che usa lo script pigudf.py, usare lo script di PowerShell seguente:

Nota

Durante l'invio remoto di un processo con PowerShell, non è possibile usare C Python come interprete.

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzureRmSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Add-AzureRmAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -Message "Enter the login for the cluster"

$PigQuery = "Register wasb:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasb:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

$jobDefinition = New-AzureRmHDInsightPigJobDefinition -Query $PigQuery

$job = Start-AzureRmHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

Write-Host "Wait for the Pig job to complete ..." -ForegroundColor Green
Wait-AzureRmHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds
# Uncomment the following to see stderr output
# Get-AzureRmHDInsightJobOutput `
#    -Clustername $clusterName `
#    -JobId $job.JobId `
#    -HttpCredential $creds `
#    -DisplayOutputType StandardError
Write-Host "Display the standard output ..." -ForegroundColor Green
Get-AzureRmHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

L'output del processo Pig sarà simile ai dati seguenti:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Risoluzione dei problemi

Errori durante l'esecuzione di processi

Quando si esegue il processo hive, è possibile riscontrare un errore simile al testo seguente:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Questo problema potrebbe essere causato dalle terminazioni di riga nel file di Python. Molti editor di Windows usano per impostazione predefinita CRLF come terminazione di riga, mentre le applicazioni Linux prevedono in genere LF.

È possibile usare le istruzioni di PowerShell seguenti per rimuovere i caratteri CR prima di caricare il file in HDInsight:

$original_file ='c:\path\to\hiveudf.py'
$text = [IO.File]::ReadAllText($original_file) -replace "`r`n", "`n"
[IO.File]::WriteAllText($original_file, $text)

Script di PowerShell

Entrambi gli script di esempio di PowerShell usati per eseguire gli esempi contengono una riga impostata come commento che mostra l'output degli errori relativi al processo. Se l'output previsto per il processo non viene visualizzato, rimuovere i simboli di commento dalla riga seguente e verificare se le informazioni di errore indicano un problema.

# Get-AzureRmHDInsightJobOutput `
        -Clustername $clusterName `
        -JobId $job.JobId `
        -HttpCredential $creds `
        -DisplayOutputType StandardError

Le informazioni sull'errore (STDERR) e il risultato del processo (STDOUT) vengono anche registrati nell'archivio di HDInsight.

Processo File nel contenitore BLOB da verificare
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Passaggi successivi

Se è necessario caricare moduli Python non forniti per impostazione predefinita, vedere Come distribuire un modulo in Azure HDInsight.

Per altre modalità d'uso di Pig e Hive e per informazioni su come usare MapReduce, vedere i documenti seguenti: