Utilizar as Funções Definidas pelo Utilizador (UDF) do Python com o Apache Hive e o Apache Pig no HDInsight

Saiba como utilizar funções definidas pelo utilizador (UDF) do Python com o Apache Hive e o Apache Pig no Apache Hadoop no Azure HDInsight.

Python no HDInsight

Python2.7 está instalado por predefinição no HDInsight 3.0 e posterior. O Apache Hive pode ser utilizado com esta versão do Python para processamento de fluxos. O processamento de fluxos utiliza STDOUT e STDIN para transmitir dados entre o Hive e o UDF.

O HDInsight também inclui jython, que é uma implementação python escrita em Java. O Jython é executado diretamente na Máquina Virtual Java e não utiliza a transmissão em fluxo. O Jython é o interpretador Python recomendado ao utilizar o Python com o Pig.

Pré-requisitos

Nota

A conta de armazenamento utilizada neste artigo era o Armazenamento do Azure com a transferência segura ativada e, portanto, wasbs é utilizada ao longo do artigo.

Configuração do armazenamento

Não é necessária qualquer ação se a conta de armazenamento utilizada for do tipo Storage (general purpose v1) ou StorageV2 (general purpose v2). O processo neste artigo produz um resultado para, pelo menos /tezstaging, . Uma configuração de hadoop predefinida contém /tezstaging a fs.azure.page.blob.dir variável de configuração no core-site.xml para o serviço HDFS. Esta configuração faz com que a saída para o diretório seja blobs de páginas, que não são suportadas para o tipo BlobStoragede conta de armazenamento . Para utilizar BlobStorage para este artigo, remova /tezstaging da fs.azure.page.blob.dir variável de configuração. A configuração pode ser acedida a partir da IU do Ambari. Caso contrário, receberá a mensagem de erro: Page blob is not supported for this account type.

Aviso

Os passos neste documento fazem as seguintes suposições:

  • Pode criar os scripts do Python no seu ambiente de desenvolvimento local.
  • Carregue os scripts para o HDInsight com o scp comando ou o script do PowerShell fornecido.

Se quiser utilizar o Azure Cloud Shell (bash) para trabalhar com o HDInsight, tem de:

  • Crie os scripts dentro do ambiente do Cloud Shell.
  • Utilize scp para carregar os ficheiros do cloud shell para o HDInsight.
  • Utilize ssh a partir do cloud shell para ligar ao HDInsight e executar os exemplos.

Apache Hive UDF

O Python pode ser utilizado como um UDF do Hive através da instrução HiveQL TRANSFORM . Por exemplo, o seguinte HiveQL invoca o hiveudf.py ficheiro armazenado na conta de Armazenamento do Azure predefinida do cluster.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Eis o que este exemplo faz:

  1. A add file instrução no início do ficheiro adiciona o hiveudf.py ficheiro à cache distribuída, para que seja acessível por todos os nós no cluster.
  2. A SELECT TRANSFORM ... USING instrução seleciona os dados de hivesampletable. Também transmite os valores clientid, devicemake e devicemodel para o hiveudf.py script.
  3. A AS cláusula descreve os campos devolvidos de hiveudf.py.

Criar ficheiro

No seu ambiente de desenvolvimento, crie um ficheiro de texto com o nome hiveudf.py. Utilize o seguinte código como o conteúdo do ficheiro:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Este script executa as seguintes ações:

  1. Lê uma linha de dados da STDIN.
  2. O caráter de nova linha à direita é removido com string.strip(line, "\n ").
  3. Ao efetuar o processamento de fluxos, uma única linha contém todos os valores com um caráter de tabulação entre cada valor. Assim, string.split(line, "\t") pode ser utilizado para dividir a entrada em cada separador, devolvendo apenas os campos.
  4. Quando o processamento estiver concluído, o resultado tem de ser escrito em STDOUT como uma única linha, com um separador entre cada campo. Por exemplo, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. O while ciclo repete-se até que não line seja lido.

A saída do script é uma concatenação dos valores de entrada de devicemake e devicemodel, bem como um hash do valor concatenado.

Carregar ficheiro (shell)

O comando seguinte é substituído sshuser pelo nome de utilizador real, se diferente. Substitua pelo mycluster nome do cluster real. Certifique-se de que o diretório de trabalho é onde o ficheiro está localizado.

  1. Utilize scp para copiar os ficheiros para o cluster do HDInsight. Edite e introduza o comando:

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Utilize o SSH para ligar ao cluster. Edite e introduza o comando:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Na sessão SSH, adicione os ficheiros Python carregados anteriormente ao armazenamento do cluster.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Utilizar a UDF do Hive (shell)

  1. Para ligar ao Hive, utilize o seguinte comando a partir da sessão SSH aberta:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Este comando inicia o cliente Beeline.

  2. Introduza a seguinte consulta na linha 0: jdbc:hive2://headnodehost:10001/> de comandos:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. Assim que a última linha for introduzida, a tarefa deverá ser iniciada. Depois de concluída a tarefa, devolve um resultado semelhante ao seguinte exemplo:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Para sair do Beeline, introduza o seguinte comando:

    !q
    

Carregar ficheiro (PowerShell)

O PowerShell também pode ser utilizado para executar remotamente consultas do Hive. Certifique-se de que o diretório de trabalho está hiveudf.py localizado. Utilize o seguinte script do PowerShell para executar uma consulta do Hive que utiliza o hiveudf.py script:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Nota

Para obter mais informações sobre como carregar ficheiros, veja o documento Carregar dados para tarefas do Apache Hadoop no HDInsight .

Utilizar a UDF do Hive

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

O resultado da tarefa do Hive deve ser semelhante ao seguinte exemplo:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Apache Pig UDF

Um script python pode ser utilizado como um UDF do Pig através da GENERATE instrução . Pode executar o script com Jython ou C Python.

  • O Jython é executado no JVM e pode ser chamado nativamente a partir do Pig.
  • O C Python é um processo externo, pelo que os dados do Pig no JVM são enviados para o script em execução num processo python. O resultado do script do Python é enviado de volta para o Pig.

Para especificar o interpretador Python, utilize register quando referenciar o script de Python. Os exemplos seguintes registam scripts com o Pig como myfuncs:

  • Para utilizar o Jython: register '/path/to/pigudf.py' using jython as myfuncs;
  • Para utilizar o C Python: register '/path/to/pigudf.py' using streaming_python as myfuncs;

Importante

Ao utilizar o Jython, o caminho para o ficheiro pig_jython pode ser um caminho local ou um caminho WASBS://. No entanto, ao utilizar o C Python, tem de referenciar um ficheiro no sistema de ficheiros local do nó que está a utilizar para submeter a tarefa do Pig.

Após o registo anterior, o Pig Latin para este exemplo é o mesmo para ambos:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Eis o que este exemplo faz:

  1. A primeira linha carrega o ficheiro de dados de exemplo para sample.logLOGS. Também define cada registo como um chararray.
  2. A linha seguinte filtra quaisquer valores nulos, armazenando o resultado da operação em LOG.
  3. Em seguida, itera sobre os registos em LOG e utiliza GENERATE para invocar o create_structure método contido no script Python/Jython carregado como myfuncs. LINE é utilizado para transmitir o registo atual para a função.
  4. Por fim, as saídas são transferidas para STDOUT com o DUMP comando . Este comando apresenta os resultados após a conclusão da operação.

Criar ficheiro

No seu ambiente de desenvolvimento, crie um ficheiro de texto com o nome pigudf.py. Utilize o seguinte código como o conteúdo do ficheiro:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

No exemplo Pig Latin, a LINE entrada é definida como uma chararray porque não existe um esquema consistente para a entrada. O script python transforma os dados num esquema consistente para a saída.

  1. A @outputSchema instrução define o formato dos dados que são devolvidos ao Pig. Neste caso, é um saco de dados, que é um tipo de dados Pig. O saco contém os seguintes campos, todos os quais são chararray (cadeias):

    • date - a data em que a entrada de registo foi criada
    • time - a hora em que a entrada de registo foi criada
    • classname - o nome da classe para o que a entrada foi criada
    • nível - o nível de registo
    • detail - detalhes verbosos da entrada de registo
  2. Em seguida, o def create_structure(input) define a função para a qual o Pig transmite itens de linha.

  3. Os dados de exemplo, sample.log, estão em conformidade com a data, hora, nome da classe, nível e esquema de detalhes. No entanto, contém algumas linhas que começam com *java.lang.Exception*. Estas linhas têm de ser modificadas para corresponderem ao esquema. A if instrução verifica esses dados e, em seguida, massaja os dados de entrada para mover a *java.lang.Exception* cadeia para o fim, colocando os dados em linha com o esquema de saída esperado.

  4. Em seguida, o split comando é utilizado para dividir os dados nos primeiros quatro carateres de espaço. O resultado é atribuído a date, , time, classname, levele detail.

  5. Por fim, os valores são devolvidos ao Pig.

Quando os dados são devolvidos ao Pig, este tem um esquema consistente, conforme definido na @outputSchema instrução.

Carregar ficheiro (shell)

Nos comandos abaixo, substitua sshuser pelo nome de utilizador real, se diferente. Substitua mycluster pelo nome real do cluster. Certifique-se de que o diretório de trabalho é onde o ficheiro está localizado.

  1. Utilize scp para copiar os ficheiros para o cluster do HDInsight. Edite e introduza o comando:

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Utilize o SSH para ligar ao cluster. Edite e introduza o comando:

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. Na sessão SSH, adicione os ficheiros Python carregados anteriormente ao armazenamento do cluster.

    hdfs dfs -put pigudf.py /pigudf.py
    

Utilizar o Pig UDF (shell)

  1. Para ligar ao pig, utilize o seguinte comando da sessão SSH aberta:

    pig
    
  2. Introduza as seguintes instruções na linha grunt> de comandos:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. Depois de introduzir a seguinte linha, a tarefa deve ser iniciada. Assim que a tarefa for concluída, devolve um resultado semelhante aos seguintes dados:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Utilize quit para sair da shell Grunt e, em seguida, utilize o seguinte para editar o ficheiro pigudf.py no sistema de ficheiros local:

    nano pigudf.py
    
  5. Uma vez no editor, anule o comentário da seguinte linha ao remover o # caráter do início da linha:

    #from pig_util import outputSchema
    

    Esta linha modifica o script python para funcionar com O Python C em vez de Jython. Depois de efetuar a alteração, utilize Ctrl+X para sair do editor. Selecione Y e, em seguida, Introduza para guardar as alterações.

  6. Utilize o pig comando para iniciar a shell novamente. Assim que estiver na linha grunt> de comandos, utilize o seguinte para executar o script python com o interpretador C Python.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Assim que esta tarefa estiver concluída, deverá ver o mesmo resultado que quando executou anteriormente o script com jython.

Carregar ficheiro (PowerShell)

O PowerShell também pode ser utilizado para executar remotamente consultas do Hive. Certifique-se de que o diretório de trabalho está localizado pigudf.py . Utilize o seguinte script do PowerShell para executar uma consulta do Hive que utiliza o pigudf.py script:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Utilizar o Pig UDF (PowerShell)

Nota

Ao submeter remotamente uma tarefa com o PowerShell, não é possível utilizar o C Python como interpretador.

O PowerShell também pode ser utilizado para executar tarefas do Pig Latin. Para executar uma tarefa pig latin que utiliza o pigudf.py script, utilize o seguinte script do PowerShell:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

O resultado da tarefa Pig deve ser semelhante aos seguintes dados:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Resolução de problemas

Erros ao executar tarefas

Ao executar a tarefa do hive, poderá encontrar um erro semelhante ao seguinte texto:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Este problema pode ser causado pelas terminações de linha no ficheiro Python. Muitos editores do Windows têm a predefinição de utilizar CRLF como a linha que termina, mas as aplicações linux normalmente esperam LF.

Pode utilizar as seguintes instruções do PowerShell para remover os carateres CR antes de carregar o ficheiro para o HDInsight:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Scripts PowerShell

Ambos os scripts do PowerShell de exemplo utilizados para executar os exemplos contêm uma linha comentada que apresenta o resultado do erro para a tarefa. Se não estiver a ver a saída esperada para a tarefa, anule o comentário da linha seguinte e veja se as informações de erro indicam um problema.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

As informações de erro (STDERR) e o resultado da tarefa (STDOUT) também são registadas no armazenamento do HDInsight.

Para este trabalho... Veja estes ficheiros no contentor de blobs
Hive /HivePython/stderr

/HivePython/stdout

Pig /PigPython/stderr

/PigPython/stdout

Passos seguintes

Se precisar de carregar módulos Python que não são fornecidos por predefinição, veja Como implementar um módulo no Azure HDInsight.

Para obter outras formas de utilizar o Pig, o Hive e saber mais sobre como utilizar o MapReduce, veja os seguintes documentos: