Автоматическая приостановка задания с помощью PowerShell и Функций Azure или службы автоматизации Azure

Для некоторых приложений требуется обработка потоков, которую можно с легкостью организовать с помощью Azure Stream Analytics (ASA). Однако непрерывное выполнение не требуется. Причины:

  • Входные данные поступают по расписанию (в начале часа...)
  • Разреженный или небольшой объем входящих данных (несколько записей в минуту)
  • Бизнес-процессы, эффективность которых повышается за счет возможностей временных окон, но которые изначально выполняются в пакетном режиме (процессы финансового отдела или отдела кадров...)
  • Демонстрации, прототипы или тесты с низким уровнем выполнения длительных заданий

Преимуществом отказа от выполнения таких заданий будет снижение затрат, поскольку задания Stream Analytics оплачиваются за единицу потоковой передачи по времени.

В этой статье объясняется, как настроить автоматическую приостановку заданий Azure Stream Analytics. Мы покажем, как настроить задачу, которая автоматически приостанавливает и возобновляет задание по расписанию. Приостановка означает, что фактическое состояние задания будет Остановлено, что позволяет избежать выставления счетов.

Сначала мы обсудим общую структуру, затем рассмотрим необходимые компоненты и некоторые детали реализации.

Примечание

Существуют и недостатки автоматической приостановки заданий. Основные из них связаны с потерей возможностей низкой задержки/реального времени, а также с потенциальными рисками, позволяющими избежать неконтролируемый рост количества необработанных событий во время приостановки задания. Автоматическую приостановку не следует использовать в большинстве рабочих сценариев, выполняющихся в большом масштабе.

Конструирование

В этом примере нам необходимо, чтобы задание выполнялось в течение n минут, прежде чем приостановить его на m минут. При приостановке задания входные данные не потребляются, они накапливаются. После запуска задания оно будет перехватывать эту невыполненную работу и обрабатывать поступающие данные, прежде чем снова завершить работу.

Diagram that illustrates the behavior of the auto-paused job over time

При выполнении задача не должна прекращать задание до тех пор, пока его метрики не будут указывать на работоспособность. Нас интересуют метрики невыполненной работы и предела. Мы будем отслеживать, что оба этих показателя находятся на базовом уровне по крайней мере в течение n минут. Такое поведение приводит к двум действиям:

  • Остановленное задание перезапускается через m минут.
  • Выполняемое задание останавливается в любое время через n минут, как только его показатели невыполненной работы и предела будут указывать на работоспособность.

Diagram that shows the possible states of the job

Например, давайте возьмем n = 5 минут и m = 10 минут. С этими параметрами у задания будет не менее 5 минут для обработки всех данных, полученных за 15 минут. Потенциальная экономия затрат составляет до 66 %.

Чтобы перезапустить задание, мы будем использовать When Last Stoppedпараметр запуска. Этот параметр указывает ASA обрабатывать все события, которые были зарегистрированы как необработанные с момента остановки задания. Но тут есть два момента, которые следует учесть. Во первых, задание не может оставаться остановленным дольше, чем срок хранения входного потока. Если задание выполняется только раз в день, необходимо убедиться, что срок хранения концентратора событий больше одного дня. Во-вторых, задание должно быть запущено по крайней мере один раз, чтобы режим When Last Stopped был принят (в противном случае он считается никогда не остановленным ранее). Поэтому первый запуск задания должен выполняться вручную, или необходимо расширить скрипт для этого случая.

Последний момент заключается в том, чтобы сделать эти действия идемпотентными. Таким образом, они могут повторяться без побочных эффектов, как для простоты использования, так и для обеспечения устойчивости.

Компоненты

Вызовы API

Мы предполагаем, что взаимодействовать с ASA необходимо в следующих аспектах:

  • Получение текущего состояния задания (управление ресурсами ASA)
    • Если выполняется
      • Получение времени с момента запуска (журналы)
      • Получение текущих значений метрик (метрики)
      • Если применимо, остановка задания (управление ресурсами ASA).
    • Если остановлено
      • Получение времени с момента остановки (журналы)
      • Если применимо, запуск задания (управление ресурсами ASA).

Для управления ресурсами ASA можно использовать либо REST API, либо пакет SDK для .NET, либо одну из библиотек CLI (Az CLI, PowerShell).

В случае с метриками и журналами в Azure все хранится централизованно в Azure Monitor с аналогичным выбором уровней API. Необходимо помнить, что при запросе API журналы и метрики всегда предоставляются от 1 до 3 минут. Поэтому задание для n значения 5 обычно означает, что в реальности задание будет выполняться от 6 до 8 минут. Также следует учитывать, что метрики всегда генерируются. Если задание остановлено, API возвращает пустые записи. Нам необходимо очистить выходные данные наших вызовов API, чтобы просмотреть только соответствующие значения.

Язык сценария

В этой статье мы решили реализовать автоматическую приостановку с помощью PowerShell. Первая причина этого выбора заключается в том, что PowerShell теперь является кросс-платформенным средством. Его можно запускать в любой ОС, что упрощает развертывание. Вторая причина заключается в том, что оно принимает и возвращает объекты, а не строки. Объекты упрощают анализ и обработку для задач автоматизации.

В PowerShell мы будем использовать модуль Az PowerShell, который включает Az.Monitor и Az.StreamAnalytics, для всего, что нам нужно:

Служба размещения

Для размещения нашей задачи PowerShell потребуется служба, которая предлагает запланированные запуски. Существует множество вариантов, но мы рассмотрим бессерверные:

  • Функции Azure — это бессерверный модуль вычислений, который может работать практически с любым фрагментом кода. Функции предоставляют триггер таймера, который может выполняться с периодичностью до секунды.
  • Служба автоматизации Azure — это управляемая служба, созданная для рабочих нагрузок и ресурсов в облаке. Это то, что нужно, однако минимальный интервал расписания составляет 1 час (меньше с обходными решениями).

Если не брать в расчет обходные решения, служба автоматизации Azure является более простым способом развертывания задачи. Но для сравнения в этой статье сначала мы создадим локальный скрипт. Когда у нас будет работающий скрипт, мы развернем его как в Функциях, так и в учетной записи службы автоматизации.

Инструменты для разработчиков

Локальное развертывание настоятельно рекомендуется выполнять с помощью VSCode, как для Функций, так и для ASA. Использование локальной интегрированной среды разработки позволяет использовать систему управления версиями, а также упрощает повторные развертывания. Но для краткости мы продемонстрируем процесс на портале Azure.

Локальное создание скрипта PowerShell

Лучший способ создания скрипта — локально. В PowerShell для кросс-платформенного выполнения скрипт можно написать и протестировать в любой ОС. В Windows можно использовать окно терминала Windows с PowerShell 7 и Az PowerShell.

Окончательный скрипт, который будет использоваться, доступен для Функцийслужбы автоматизации Azure). Он отличается от описанного ниже, который привязан к среде размещения (Функции или служба автоматизации). Мы рассмотрим это позже. Для начала рассмотрим версию, которая выполняется только локально.

Этот скрипт намеренно написан в простой форме, чтобы он был понятен всем.

Сначала зададим необходимые параметры и проверим состояние исходного задания:


# Setting variables
$restartThresholdMinute = 10 # This is M
$stopThresholdMinute = 5 # This is N

$maxInputBacklog = 0 # The amount of backlog we tolerate when stopping the job (in event count, 0 is a good starting point)
$maxWatermark = 10 # The amount of watermark we tolerate when stopping the job (in seconds, 10 is a good starting point at low SUs)

$subscriptionId = "<Replace with your Subscription Id - not the name>"
$resourceGroupName = "<Replace with your Resource Group Name>"
$asaJobName = "<Replace with your ASA job name>"

$resourceId = "/subscriptions/$($subscriptionId )/resourceGroups/$($resourceGroupName )/providers/Microsoft.StreamAnalytics/streamingjobs/$($asaJobName)"

# If not already logged, uncomment and run the 2 following commands
# Connect-AzAccount
# Set-AzContext -SubscriptionId $subscriptionId

# Check current ASA job status
$currentJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) is $($currentJobState)."

Затем, если задание выполняется, мы проверяем, выполнялось ли задание по крайней мере n минут, его невыполненную работу и его предел.


# Switch state
if ($currentJobState -eq "Running")
{
    # First we look up the job start time with Get-AzActivityLog
    ## Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
    ## We check in 1000 record of history, to make sure we're not missing what we're looking for. It may need adjustment for a job that has a lot of logging happening.
    ## There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
    $startTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Start Job*"}
    $startTimeStamp = $startTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

    # Then we gather the current metric values
    ## Get-AzMetric issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
    $currentBacklog = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "InputEventsSourcesBacklogged" -DetailedOutput -WarningAction Ignore
    $currentWatermark = Get-AzMetric -ResourceId $resourceId -TimeGrain 00:01:00 -MetricName "OutputWatermarkDelaySeconds" -DetailedOutput -WarningAction Ignore

    # Metric are always lagging 1-3 minutes behind, so grabbing the last N minutes means checking N+3 actually. This may be overly safe and fined tune down per job.
    $Backlog =  $currentBacklog.Data |
                    Where-Object {$_.Maximum -ge 0} | # We remove the empty records (when the job is stopped or starting)
                    Sort-Object -Property Timestamp -Descending |
                    Where-Object {$_.Timestamp -ge $startTimeStamp} | # We only keep the records of the latest run
                    Select-Object -First $stopThresholdMinute | # We take the last N records
                    Measure-Object -Sum Maximum # We sum over those N records
    $BacklogSum = $Backlog.Sum

    $Watermark = $currentWatermark.Data |
                    Where-Object {$_.Maximum -ge 0} |
                    Sort-Object -Property Timestamp -Descending |
                    Where-Object {$_.Timestamp -ge $startTimeStamp} |
                    Select-Object -First $stopThresholdMinute |
                    Measure-Object -Average Maximum # Here we average
    $WatermarkAvg = [int]$Watermark.Average # Rounding the decimal value casting it to integer

    # Since we called Get-AzMetric with a TimeGrain of a minute, counting the number of records gives us the duration in minutes
    Write-Output "asaRobotPause - Job $($asaJobName) is running since $($startTimeStamp) with a sum of $($BacklogSum) backlogged events, and an average watermark of $($WatermarkAvg) sec, for $($Watermark.Count) minutes."

    # -le for lesser or equal, -ge for greater or equal
    if (
        ($BacklogSum -ge 0) -and ($BacklogSum -le $maxInputBacklog) -and ` # is not null and is under the threshold
        ($WatermarkAvg -ge 0) -and ($WatermarkAvg -le $maxWatermark) -and ` # is not null and is under the threshold
        ($Watermark.Count -ge $stopThresholdMinute) # at least N values
        )
    {
        Write-Output "asaRobotPause - Job $($asaJobName) is stopping..."
        Stop-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName
    }
    else {
        Write-Output "asaRobotPause - Job $($asaJobName) is not stopping yet, it needs to have less than $($maxInputBacklog) backlogged events and under $($maxWatermark) sec watermark for at least $($stopThresholdMinute) minutes."
    }
}

Если задание остановлено, в журнале будет указано, когда последний раз выполнялось действие "Остановить задание":


elseif ($currentJobState -eq "Stopped")
{
    # First we look up the job start time with Get-AzActivityLog
    ## Get-AzActivityLog issues warnings about deprecation coming in future releases, here we ignore them via -WarningAction Ignore
    ## We check in 1000 record of history, to make sure we're not missing what we're looking for. It may need adjustment for a job that has a lot of logging happening.
    ## There is a bug in Get-AzActivityLog that triggers an error when Select-Object First is in the same pipeline (on the same line). We move it down.
    $stopTimeStamp = Get-AzActivityLog -ResourceId $resourceId -MaxRecord 1000 -WarningAction Ignore | Where-Object {$_.EventName.Value -like "Stop Job*"}
    $stopTimeStamp = $stopTimeStamp | Select-Object -First 1 | Foreach-Object {$_.EventTimeStamp}

    # Get-Date returns a local time, we project it to the same time zone (universal) as the result of Get-AzActivityLog that we extracted above
    $minutesSinceStopped = ((Get-Date).ToUniversalTime()- $stopTimeStamp).TotalMinutes

    # -ge for greater or equal
    if ($minutesSinceStopped -ge $restartThresholdMinute)
    {
        Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it is now starting..."
        Start-AzStreamAnalyticsJob -ResourceGroupName $resourceGroupName -Name $asaJobName -OutputStartMode LastOutputEventTime
    }
    else{
        Write-Output "asaRobotPause - Job $($jobName) was paused $([int]$minutesSinceStopped) minutes ago, set interval is $($restartThresholdMinute), it will not be restarted yet."
    }
}
else {
    Write-Output "asaRobotPause - Job $($jobName) is not in a state I can manage: $($currentJobState). Let's wait a bit, but consider helping is that doesn't go away!"
}

В конце мы регистрируем в журнале завершение задания:


# Final ASA job status check
$newJobState = Get-AzStreamAnalyticsJob  -ResourceGroupName $resourceGroupName -Name $asaJobName | Foreach-Object {$_.JobState}
Write-Output "asaRobotPause - Job $($asaJobName) was $($currentJobState), is now $($newJobState). Job completed."

Вариант 1. Размещение задачи в Функциях Azure

Для справки можно воспользоваться исчерпывающим руководством разработчика PowerShell, предоставленным командой Функций Azure.

Для начала нам потребуется новое приложение-функция. Приложение-функция аналогична решению, в котором можно разместить несколько Функций.

Здесьприведена полная процедура, однако основные действия выполняются на портале Azure, а для создания нового приложения-функции используйте следующее:

  • Публикация: код
  • Среда выполнения: PowerShell Core
  • Версия:  7 и более поздние

После подготовки приступим к общей конфигурации.

Управляемые удостоверения для Функций Azure

Функциям требуются разрешения для запуска и завершения задания ASA. Мы назначим эти разрешения с помощью управляемого удостоверения.

Первым делом необходимо включить для функции управляемое удостоверение, назначаемое системой, следуя этой процедуре.

Теперь мы можем предоставить этому удостоверению правильные разрешения для задания ASA, которое требуется приостановить. Для этого на портале для задания ASA (не Функции) в разделе Элемент управления доступом (IAM) добавьте назначение роли для роли Участник для члена типа Управляемое удостоверение, выбрав имя Функции, указанное выше.

Screenshot of IAM settings for the ASA job

В скрипт PowerShell можно добавить проверку, которая гарантирует правильную настройку управляемого удостоверения (окончательный скрипт доступен здесь).


# Check if managed identity has been enabled and granted access to a subscription, resource group, or resource
$AzContext = Get-AzContext -ErrorAction SilentlyContinue
if (-not $AzContext.Subscription.Id)
{
    Throw ("Managed identity is not enabled for this app or it has not been granted access to any Azure resources. Please see /azure/app-service/overview-managed-identity for additional details.")
}

Мы также добавим некоторые сведения о ведении журнала, чтобы убедиться в том, что Функция срабатывает:


$currentUTCtime = (Get-Date).ToUniversalTime()

# Write an information log with the current time.
Write-Host "asaRobotPause - PowerShell timer trigger function is starting at time: $currentUTCtime"

Параметры для Функций Azure

Лучшим способом передать наши параметры в скрипт в Функциях является использование параметров приложения-функция в качестве переменных среды.

Для этого сначала перейдем на страницу приложения-функции странице, чтобы определить наши параметры как Параметры приложения после выполнения этой процедуры. Нам потребуется:

Имя Значение
maxInputBacklog Объем невыполненной работы, который допускается при остановке задания (для количества событий хорошей отправной точкой является значение "0").
maxWatermark Значение предела, которое допускается при остановке задания (в секундах, значение 10 является хорошей отправной точкой с низким уровнем SU).
restartThresholdMinute m: время (в минутах) до перезапуска остановленного задания.
stopThresholdMinute n: время (в минутах) до остановки выполняющегося задания. В течение этого времени для невыполненной работы должно оставаться значение 0.
subscriptionId Идентификатор подписки (не имя) задания ASA для автоматической приостановки.
имя_группы_ресурсов Имя группы ресурсов задания ASA для автоматической приостановки.
asaJobName Имя задания ASA, которое будет приостановлено автоматически

Позже потребуется обновить скрипт PowerShell, чтобы соответствующим образом загрузить переменные:

$maxInputBacklog = $env:maxInputBacklog
$maxWatermark = $env:maxWatermark

$restartThresholdMinute = $env:restartThresholdMinute
$stopThresholdMinute = $env:stopThresholdMinute

$subscriptionId = $env:subscriptionId
$resourceGroupName = $env:resourceGroupName
$asaJobName = $env:asaJobName

Требования к модулю PowerShell

Так же, как мы установили Az PowerShell локально для использования команд ASA (например, Start-AzStreamAnalyticsJob), необходимо добавить его в узел приложения-функции.

Для этого можно открыть Functions>App files на странице приложения-функции, выбрать requirements.psd1 и раскомментировать строку 'Az' = '6.*'. Чтобы это изменение вступило в силу, необходимо будет перезапустить все приложение.

Screenshot of the app files settings for the Function App

Создание функции

По завершении настройки можно создать конкретную функцию в приложении-функции, которая будет выполнять наш скрипт.

На портале мы создадим функцию, активируемую по таймеру (каждую минуту с 0 */1 * * * *, которая выполняет чтение "при наступлении 0 секунды каждой минуты"):

Screenshot of creating a new timer trigger function in the function app

При необходимости можно изменить значение таймера в Integration, обновив расписание:

Screenshot of the integration settings of the function

Затем в Code + Test можно скопировать скрипт в run.ps1 и протестировать его. Полный скрипт можно скопировать здесь. Бизнес-логика была перемещена в оператор TRY/CATCH для создания правильных ошибок в случае сбоя во время обработки.

Screenshot of Code+Test for the function

Чтобы проверить, что все работает правильно, можно воспользоваться кнопкой Тест/запуск на панели Code + Test. Также можно обратиться к панели Monitor, однако сведения на ней всегда запаздывают на пару выполнений.

Screenshot of the output of a successful run

Настройка оповещения о выполнении функции

Наконец, мы хотим получать оповещения, если функция выполняется со сбоями. Затраты на оповещения незначительные, однако они могут предотвратить возникновение более серьезных ситуаций, которые могут вылиться в копеечку.

На странице Приложение-функция в разделе Logs выполните следующий запрос, который возвращает все выполнения, которые завершились сбоем, за последние 5 минут:

requests
| where success == false
| where timestamp > ago(5min)
| summarize failedCount=sum(itemCount) by operation_Name
| order by failedCount desc

В редакторе запросов выберите New alert rule. На следующем экране определите измерение следующим образом:

  • Мера: failedCount
  • Тип агрегирования: Итог
  • Степень детализации агрегирования: 5 минут

Далее настройте логику оповещения следующим образом.

  • Оператор: Больше
  • Пороговое значение: 0
  • Частота вычислений: 5 минут

После этого повторно используйте или создайте новую группу действий, а затем завершите настройку.

Чтобы проверить, правильно ли настроено оповещение, можно добавить throw "Testing the alert" в любом месте скрипта PowerShell и подождать 5 минут, чтобы получить сообщение электронной почты.

Вариант 2. Размещение задачи в службе автоматизации Azure

Для начала нам потребуется новая учетная запись службы автоматизации. Учетная запись службы автоматизации аналогична решению, в котором можно разместить несколько модулей Runbook.

Процедура описана здесь. Здесь можно выбрать использование управляемого удостоверения, назначаемого системой, непосредственно на вкладке advanced.

Для справки команда службы автоматизации Azure создала отличный учебник, который поможет приступить к работе с модулями Runbook PowerShell.

Параметры для службы автоматизации Azure

С помощью модуля Runbook передать аргументы можно с помощью классического синтаксиса параметра PowerShell:

Param(
    [string]$subscriptionId,
    [string]$resourceGroupName,
    [string]$asaJobName,

    [int]$restartThresholdMinute,
    [int]$stopThresholdMinute,

    [int]$maxInputBacklog,
    [int]$maxWatermark
)

Управляемое удостоверение для службы автоматизации Azure

Учетная запись службы автоматизации должна получить управляемое удостоверение во время подготовки. Но при необходимости мы можем включить его с помощью этой процедуры.

Как и для функции, необходимо предоставить необходимые разрешения для задания ASA, которое требуется автоматически приостановить.

Для этого на портале для задания ASA (не службы автоматизации) в разделе Элемент управления доступом (IAM) добавьте назначение роли для роли Участник для члена типа Управляемое удостоверение, выбрав имя учетной записи службы автоматизации, указанное выше.

Screenshot of IAM settings for the ASA job

В скрипт PowerShell можно добавить проверку, которая гарантирует правильную настройку управляемого удостоверения (окончательный скрипт доступен здесь).

# Ensures you do not inherit an AzContext in your runbook
Disable-AzContextAutosave -Scope Process | Out-Null

# Connect using a Managed Service Identity
try {
        $AzureContext = (Connect-AzAccount -Identity).context
    }
catch{
        Write-Output "There is no system-assigned user identity. Aborting.";
        exit
    }

Создание Runbook

По завершении настройки можно создать конкретный модуль Runbook в учетной записи службы автоматизации, который будет выполнять наш скрипт. Здесь не нужно добавлять Az PowerShell в качестве требования, поскольку он уже встроен.

На портале в разделе "Автоматизация процессов" выберите Runbooks, затем выберите Create a runbook, укажите PowerShell в качестве типа Runbook и любую версию выше 7 в качестве версии (на текущий момент 7.1 (preview)).

Теперь можно вставить скрипт и протестировать его. Полный скрипт можно скопировать здесь. Бизнес-логика была перемещена в оператор TRY/CATCH для создания правильных ошибок в случае сбоя во время обработки.

Screenshot of the runbook script editor in Azure Automation

Мы можем проверить, что все работает правильно, в разделе Test Pane.

После этого необходимо выполнить команду Publish для задания, что позволит нам связать модуль Runbook с расписанием. Создание и связывание расписания — это простой процесс, который не рассматривается здесь. Теперь самое время запомнить, что существуют обходные решения для создания интервалов расписания менее 1 часа.

Наконец, можно настроить оповещение. Для начала необходимо включить журналы с помощью параметров диагностики учетной записи службы автоматизации. Затем необходимо настроить фиксацию ошибок с помощью запроса, такого как и для Функций.

Результат

Если обратить внимание на задание ASA, можно увидеть, что все работает, как ожидалось, в двух местах.

В журнале действий:

Screenshot of the logs of the ASA job

И на панели метрик:

Screenshot of the metrics of the ASA job

После того как скрипт понятен, его можно с легкостью изменить, чтобы расширить область его действия. Его можно обновить для списка заданий, а не для одного задания. Более крупные области можно определить и обработать с помощью тегов, групп ресурсов или даже целых подписок.

Техническая поддержка

Для получения дополнительной помощи воспользуйтесь страницей вопросов по Azure Stream Analytics на сайте Microsoft Q&A.

Дальнейшие действия

Вы ознакомились с основами использования PowerShell для автоматизации управления заданиями Azure Stream Analytics. Дополнительные сведения см. в следующих статьях: