Использование определяемых пользователем функций Python с Apache Hive и Apache Pig в HDInsight

Узнайте, как использовать определяемые пользователем функции (UDF) Python с Apache Hive и Apache Pig в Apache Hadoop на кластерах Azure HDInsight.

Python в HDInsight

Python2.7 устанавливается по умолчанию в HDInsight 3.0 и более поздних версий. Apache Hive можно использовать с этой версией Python для потоковой обработки. При этом для передачи данных между Hive и определяемой пользователем функцией используется STDOUT и STDIN.

В состав HDInsight также входят Jython, который представляет собой реализацию Python, написанную на Java. Jython выполняется непосредственно на виртуальной машине Java и не использует потоковую передачу. Jython является рекомендуемым интерпретатором Python при использовании Python с Pig.

Предварительные требования

Примечание

Учетная запись хранения, используемая в этой статье, относится к службе хранилища Azure с включенным безопасным перемещением, поэтому здесь везде используется wasbs.

Конфигурация хранилища

Если используемая учетная запись хранения относится к типу Storage (general purpose v1) или StorageV2 (general purpose v2), ничего делать не нужно. Процесс, приведенный в этой статье, создает выходные данные по крайней мере /tezstagingдля . Конфигурация hadoop по умолчанию содержится /tezstaging в переменной fs.azure.page.blob.dir конфигурации в для core-site.xml службы HDFS. Эта конфигурация приводит к выводу в каталог страничных BLOB-объектов, которые не поддерживаются для учетной записи хранения.BlobStorage Чтобы использовать BlobStorage для процесса, описанного в этой статье, удалите /tezstaging из переменной конфигурации fs.azure.page.blob.dir. Для доступа к конфигурации используйте пользовательский интерфейс Ambari. В противном случае вы получите сообщение об ошибке: Page blob is not supported for this account type.

Предупреждение

Шаги в этом документе основаны на следующих предположениях:

  • Вы создаете скрипты Python в локальной среде разработки.
  • Вы отправляете скрипты в HDInsight, используя команду scp или предоставленный скрипт PowerShell.

Если для работы с HDInsight вы хотите использовать Azure Cloud Shell (bash), вам необходимо:

  • Создать скрипты в среде Cloud Shell.
  • Использовать scp для отправки файлов из Cloud Shell в HDInsight.
  • Использовать ssh из Cloud Shell для подключения к HDInsight и выполнения примеров.

Определяемая пользователем функция Apache Hive

Скрипт Python можно использовать в качестве определяемой пользователем функции из Hive через HiveQL с помощью инструкции TRANSFORM. Например, следующий запрос HiveQL вызывает файл hiveudf.py, хранящийся в учетной записи хранения Azure по умолчанию для кластера.

add file wasbs:///hiveudf.py;

SELECT TRANSFORM (clientid, devicemake, devicemodel)
    USING 'python hiveudf.py' AS
    (clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;

Вот что делает данный пример:

  1. Инструкция add file в начале файла добавляет файл hiveudf.py в распределенный кэш, и он становится доступен всем узлам кластера.
  2. Инструкция SELECT TRANSFORM ... USING выбирает данные из hivesampletable. Она также передает параметры clientid, devicemake и devicemodel в скрипт hiveudf.py.
  3. Предложение AS описывает поля, возвращаемые из hiveudf.py.

Создать файл

В среде разработки создайте текстовый файл с именем hiveudf.py. Используйте следующий код в качестве содержимого файла:

#!/usr/bin/env python
import sys
import string
import hashlib

while True:
    line = sys.stdin.readline()
    if not line:
        break

    line = string.strip(line, "\n ")
    clientid, devicemake, devicemodel = string.split(line, "\t")
    phone_label = devicemake + ' ' + devicemodel
    print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])

Сценарий выполняет следующие действия:

  1. Считывается строка данных из STDIN.
  2. Стоящий в конце знак новой строки удаляется с помощью string.strip(line, "\n ").
  3. При обработке потока в одной строке будут содержаться все значения, разделенные символом табуляции. Поэтому можно использовать string.split(line, "\t") для разделения входящих данных при каждой табуляции, возвращая лишь поля.
  4. По завершении обработки результат должен быть записан в поток STDOUT в виде одной строки, с разделенными символами табуляции полями. Например, print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()]).
  5. Цикл while повторяется до тех пор, пока считывается line.

Выходные данные скрипта представляют собой объединенные входные значения для devicemake и devicemodel, а также хэш для объединенного значения.

Отправка файла (оболочка)

Следующая команда заменяет sshuser фактическим именем пользователя, если оно отличается. Вместо mycluster укажите реальное имя кластера. Рабочей должна быть папка, в которой находится файл.

  1. Используйте scp для копирования файлов в кластер HDInsight. Измените и введите команду :

    scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Используйте SSH, чтобы подключиться к кластеру. Измените и введите команду :

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. В сеансе SSH добавьте отправленные ранее файлы Python в хранилище для кластера.

    hdfs dfs -put hiveudf.py /hiveudf.py
    

Использование Hive UDF (оболочка)

  1. Чтобы подключиться к Hive, в открытом сеансе SSH введите следующую команду:

    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
    

    Эта команда запускает клиент Beeline.

  2. Введите следующий запрос 0: jdbc:hive2://headnodehost:10001/> в командной строке:

    add file wasbs:///hiveudf.py;
    SELECT TRANSFORM (clientid, devicemake, devicemodel)
        USING 'python hiveudf.py' AS
        (clientid string, phoneLabel string, phoneHash string)
    FROM hivesampletable
    ORDER BY clientid LIMIT 50;
    
  3. После ввода последней строки задание должно начаться. По завершении задания эта команда возвращает выходные данные следующего вида:

    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100041    RIM 9650    d476f3687700442549a83fac4560c51c
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
    
  4. Чтобы выйти из Beeline, введите следующую команду:

    !q
    

Отправка файла (PowerShell)

PowerShell также можно использовать для удаленного запуска запросов на использование Hive. Рабочей должна быть папка, в которой находится hiveudf.py. Чтобы выполнить запрос Hive, использующий скрипт hiveudf.py, примените следующий скрипт PowerShell:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToStreamingFile `
    -Blob "hiveudf.py" `
    -Container $container `
    -Context $context

Примечание

Дополнительные сведения об отправке файлов см. в статье Отправка данных для заданий Apache Hadoop в HDInsight.

Использование Hive UDF

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"

$HiveQuery = "add file wasbs:///hiveudf.py;" +
                "SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
                "USING 'python hiveudf.py' AS " +
                "(clientid string, phoneLabel string, phoneHash string) " +
                "FROM hivesampletable " +
                "ORDER BY clientid LIMIT 50;"

# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
    -Query $HiveQuery

# For status bar updates
$activity="Hive query"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -JobId $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
   -Clustername $clusterName `
   -JobId $job.JobId `
   -HttpCredential $creds `
   -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Результат выполнения задания Hive должен выглядеть аналогично следующему примеру:

100041    RIM 9650    d476f3687700442549a83fac4560c51c
100041    RIM 9650    d476f3687700442549a83fac4560c51c
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9
100042    Apple iPhone 4.2.x    375ad9a0ddc4351536804f1d5d0ea9b9

Определяемая пользователем функция Apache Pig

Скрипт Python можно использовать в виде определяемой пользователем функции из Pig с использованием инструкции GENERATE. Вы можете запустить скрипт с помощью Jython или CPython.

  • Jython работает на виртуальной машине Java и изначально может вызываться из Pig.
  • CPython является внешним процессом, поэтому данные из Pig на JVM отправляются в скрипт, выполняющийся в процессе Python. Выходные данные скрипта Python отправляются обратно в Pig.

Чтобы указать интерпретатор Python, используйте register при указании ссылки на скрипт Python. Следующие примеры регистрируют скрипты с Pig в качестве myfuncs:

  • Для использования Jython:register '/path/to/pigudf.py' using jython as myfuncs;
  • Для использования CPython:register '/path/to/pigudf.py' using streaming_python as myfuncs;

Важно!

При использовании Jython путь к файлу pig_jython может быть локальным или указан как WASBS://. Но при использовании CPython необходимо указать ссылку на файл в локальной файловой системе узла, который используется для отправки задания Pig.

После регистрации язык Pig Latin будет одинаковым для обоих примеров:

LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;

Вот что делает данный пример:

  1. Первая строка загружает образец файла данных sample.log в LOGS. Она также определяет каждую запись как массив символов chararray.
  2. Следующая строка отфильтровывает все пустые значения, сохраняя результат работы в LOG.
  3. Затем выполняется итерация по записям в LOG и используется инструкция GENERATE для вызова метода create_structure, содержащегося в скрипте Python или Jython, загруженном как myfuncs. LINE используется для передачи текущей записи в функцию.
  4. Наконец, выходные данные сбрасываются в поток STDOUT командой DUMP. После завершения операции эта команда выведет результат.

Создать файл

В среде разработки создайте текстовый файл с именем pigudf.py. Используйте следующий код в качестве содержимого файла:

# Uncomment the following if using C Python
#from pig_util import outputSchema


@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
    if (input.startswith('java.lang.Exception')):
        input = input[21:len(input)] + ' - java.lang.Exception'
    date, time, classname, level, detail = input.split(' ', 4)
    return date, time, classname, level, detail

В примере с Pig Latin входные данные LINE определены как массив знаков, т. к. согласованной схемы входных данных нет. Скрипт Python выполняет преобразование данных в согласованную схему на выходе.

  1. Инструкция @outputSchema задает формат данных, в котором они возвращаются в Pig. В данном случае это data bag, являющийся типом данных Pig. Корзина содержит следующие поля, все они имеют тип "Массив строк" (строки):

    • date — дата создания записи журнала;
    • date — время создания записи журнала;
    • classname — имя класса, для которого создана запись;
    • level — уровень журналирования;
    • detail — подробная информация о записи журнала.
  2. Затем def create_structure(input) определяет функцию, в которую Pig отправляет строковые элементы.

  3. Данные для примера, sample.log, в основном соответствуют схеме даты, времени, имени класса, уровня и подробной информации. Однако он содержит несколько строк, начинающихся с *java.lang.Exception*. Эти строки должны быть изменены в соответствии со схемой. Инструкция if проверяет на наличие таких строк, затем обрабатывает входные данные, переставляя строку *java.lang.Exception* в конец, формируя данные в соответствии с ожидаемой схемой.

  4. Затем команда split используется для разделения данных по первым четырем символам пробела. Выходным данным присваиваются значения date, time, classname, level и detail.

  5. И результаты возвращаются в Pig.

Когда данные возвращаются в Pig, они имеют согласованную схему, определенную инструкцией @outputSchema.

Отправка файла (оболочка)

В командах ниже вместо sshuser укажите реальное имя пользователя, если оно отличается. Вместо mycluster укажите реальное имя кластера. Рабочей должна быть папка, в которой находится файл.

  1. Используйте scp для копирования файлов в кластер HDInsight. Измените и введите команду :

    scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
    
  2. Используйте SSH, чтобы подключиться к кластеру. Измените и введите команду :

    ssh sshuser@mycluster-ssh.azurehdinsight.net
    
  3. В сеансе SSH добавьте отправленные ранее файлы Python в хранилище для кластера.

    hdfs dfs -put pigudf.py /pigudf.py
    

Использование Pig UDF (оболочка)

  1. Чтобы подключиться к Pig, в открытом сеансе SSH введите следующую команду:

    pig
    
  2. В окне запроса grunt> введите следующие операторы:

    Register wasbs:///pigudf.py using jython as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    
  3. После ввода указанной строки должно запуститься задание. По завершении задания эта команда возвращает выходные данные следующего вида:

    ((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
    ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
    ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
    ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
    ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
    
  4. Используйте quit для выхода из оболочки Grunt, а затем следующую команду для изменения файла pigudf.py в локальной файловой системе:

    nano pigudf.py
    
  5. Войдите в редактор и раскомментируйте следующую строку, удалив символ # в начале строки.

    #from pig_util import outputSchema
    

    Эта строка изменяет сценарий Python для работы с C Python вместо Jython. Закончив вносить изменения, нажмите клавиши CTRL+X, чтобы выйти из редактора. Выберите Y (Да) и нажмите клавишу ВВОД, чтобы сохранить изменения.

  6. Используйте команду pig , чтобы снова запустить оболочку. При появлении запроса grunt> введите следующие инструкции, чтобы запустить сценарий Python с помощью интерпретатора CPython.

    Register 'pigudf.py' using streaming_python as myfuncs;
    LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
    LOG = FILTER LOGS by LINE is not null;
    DETAILS = foreach LOG generate myfuncs.create_structure(LINE);
    DUMP DETAILS;
    

    Когда это задание будет выполнено, вы увидите такой же результат, как при запуске сценария с помощью Jython.

Отправка файла (PowerShell)

PowerShell также можно использовать для удаленного запуска запросов на использование Hive. Рабочей должна быть папка, в которой находится pigudf.py. Чтобы выполнить запрос Hive, использующий скрипт pigudf.py, примените следующий скрипт PowerShell:

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"

# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"


# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
   -ResourceGroupName $resourceGroup `
   -Name $storageAccountName)[0].Value

# Create an Azure Storage context
$context = New-AzStorageContext `
    -StorageAccountName $storageAccountName `
    -StorageAccountKey $storageAccountKey

# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
    -File $pathToJythonFile `
    -Blob "pigudf.py" `
    -Container $container `
    -Context $context

Использование Pig UDF (PowerShell)

Примечание

При удаленной отправке задания с помощью PowerShell нельзя использовать CPython в качестве интерпретатора.

PowerShell также можно использовать для запуска заданий Pig Latin. Чтобы выполнить задание Pig Latin, использующее скрипт pigudf.py, примените следующий скрипт PowerShell:

# Script should stop on failures
$ErrorActionPreference = "Stop"

# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
    Connect-AzAccount
}

# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"


$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
            "LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
            "LOG = FILTER LOGS by LINE is not null;" +
            "DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
            "DUMP DETAILS;"

# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery

# For status bar updates
$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
    -ClusterName $clusterName `
    -JobDefinition $jobDefinition `
    -HttpCredential $creds

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
    -Job $job.JobId `
    -ClusterName $clusterName `
    -HttpCredential $creds

# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds `
    -DisplayOutputType StandardError
#>

# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."

# Gets the log output
Get-AzHDInsightJobOutput `
    -Clustername $clusterName `
    -JobId $job.JobId `
    -HttpCredential $creds

Результат выполнения задания Pig должен выглядеть аналогично следующим данным:

((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))

Устранение неполадок

Ошибки при выполнении заданий

При выполнении задания hive может возникнуть ошибка, аналогичная приведенной ниже:

Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.

Эта проблема может быть вызвана символами окончания строк в файле Python. Многие редакторы Windows по умолчанию используют символы CRLF, но в приложениях Linux обычно ожидается использование символа LF.

Вы можете использовать следующие команды PowerShell для удаления символов CR перед передачей файла в HDInsight:

Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."

# Wait for completion or failure of specified job

Сценарии PowerShell

Оба примера скриптов PowerShell, используемых для запуска примеров, содержат закомментированную строку, которая отображает вывод ошибок для задания. Если вы не видите ожидаемых результатов задания, раскомментируйте следующую строку и посмотрите, нет ли в описании ошибки информации о проблеме.

$activity="Pig job"

# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."

Сведения об ошибках (STDERR) и результат выполнения задания (STDOUT) также записываются в хранилище HDInsight.

Для данного задания... Смотрите эти файлы в контейнере
Hive /HivePython/stderr

/HivePython/stdout

Pig, /PigPython/stderr

/PigPython/stdout

Дальнейшие действия

Если вам нужно загрузить модули Python, которые не поставляются по умолчанию, см. статью How to deploy a Python module to Windows Azure HDInsight (Как развернуть модуль Python в Windows Azure HDInsight).

Сведения о других способах использования Pig и Hive и дополнительную информацию об использовании MapReduce см. в следующих документах: