Recuperar-se de falhas de consulta de fluxo de trabalho com fluxos de trabalho

O Fluxo Estruturado fornece tolerância a falhas e consistência de dados para consultas de streaming. Com os fluxos de trabalho do Azure Databricks, é possível configurar facilmente suas consultas de Fluxo Estruturado a fim de realizar a reinicialização automática em caso de falha. Ao habilitar o ponto de verificação para uma consulta de streaming, é possível reiniciar a consulta após uma falha. A consulta reiniciada continua de onde a consulta com falha parou.

Habilitar pontos de verificação para consultas do Fluxo Estruturado

O Databricks recomenda sempre especificar a opção checkpointLocation de um caminho de armazenamento em nuvem antes de iniciar a consulta. Por exemplo:

streamingDataFrame.writeStream
  .format("parquet")
  .option("path", "/path/to/table")
  .option("checkpointLocation", "/path/to/table/_checkpoint")
  .start()

Esse local de ponto de verificação preserva todas as informações essenciais que identificam exclusivamente uma consulta. Cada consulta deve ter um local de ponto de verificação diferente. Nunca use o mesmo local para várias consultas. Confira o Guia de programação de Fluxo Estruturado para obter mais detalhes.

Observação

Embora checkpointLocation seja uma opção necessária para a maioria dos tipos de coletores de saída, alguns deles, como o coletor de memória, podem gerar um local de ponto de verificação temporário automaticamente quando você não fornece checkpointLocation. Os locais de ponto de verificação temporários não garantem nenhuma tolerância a falhas ou consistência de dados e podem não ser limpos corretamente. Evite possíveis armadilhas sempre especificando um checkpointLocation.

Configurar trabalhos do Fluxo Estruturado para reiniciar as consultas do fluxo em caso de falha

É possível criar um trabalho de Azure Databricks com o notebook ou o JAR que tem suas consultas de streaming e configurá-lo para:

  • Sempre usar um novo cluster.
  • Sempre tentar novamente em caso de falha.

A reinicialização automática quando ocorre falha do trabalho é especialmente importante ao configurar cargas de trabalho de streaming com evolução do esquema. A evolução do esquema funciona no Azure Databricks gerando um erro esperado quando uma alteração de esquema é detectada e processando dados corretamente usando o novo esquema quando o trabalho é reiniciado. O Databricks recomenda sempre configurar as tarefas de streaming que contêm consultas com evolução do esquema para que elas reiniciem automaticamente nos fluxos de trabalho do Databricks.

Os trabalhos têm integração total com as APIs do Fluxo Estruturado e podem monitorar todas as consultas de streaming ativas em uma execução. Essa configuração garante que, se qualquer parte da consulta falhar, os trabalhos encerrarão automaticamente a execução (juntamente com todas as outras consultas) e iniciarão uma nova execução em um novo cluster. Isso realiza a nova execução do notebook ou código JAR e reinicia todas as consultas novamente. Esta é a maneira mais segura de voltar a um bom estado.

Observação

  • A falha em qualquer uma das consultas de streaming ativas faz com que a execução ativa falhe e encerre todas as outras consultas de streaming.
  • Você não precisa usar streamingQuery.awaitTermination() o ou spark.streams.awaitAnyTermination() o final do seu notebook. Os trabalhos impedem que uma execução seja concluída automaticamente quando uma consulta de streaming estiver ativa.
  • O Databricks recomenda usar trabalhos em vez de %run e dbutils.notebook.run() ao orquestrar notebooks de fluxo estruturado. Confira Executar um notebook do Databricks por meio de outro notebook.

Veja a seguir um exemplo de uma configuração de trabalho recomendada.

  • Cluster: defina isso sempre para usar um novo cluster e use a versão mais recente do Spark (ou pelo menos a versão 2.1). As consultas iniciadas no Spark a partir da versão 2.1 são recuperáveis após atualizações de consulta e versão do Spark.
  • Notificações: defina essa configuração se desejar receber notificações por email sobre falhas.
  • Agenda: não defina uma agenda.
  • Tempo limite: não defina um tempo limite. As consultas de streaming são executadas por um tempo indefinidamente longo.
  • Máximo de execuções simultâneas: defina como 1. Deve haver apenas uma instância de cada consulta ativa simultaneamente.
  • Tentativas: definido como Ilimitado.

Confira Criar e executar trabalhos do Azure Databricks para entender essas configurações.

Recuperar-se após alterações em uma consulta do Fluxo Estruturado

Há limitações no que as alterações em uma consulta de streaming são permitidas entre as reinicializações do mesmo local de ponto de verificação. Aqui estão algumas alterações que não são permitidas ou o efeito da alteração não está bem definido. Para todos eles:

  • O termo permitido significa que você pode fazer a alteração especificada, mas se a semântica de seu efeito é bem definida depende da consulta e da alteração.
  • O termo não permitido significa que você não deve fazer a alteração especificada, pois a consulta reiniciada provavelmente falhará com erros imprevisíveis.
  • sdf representa um DataFrame de streaming/Conjunto de dados gerado com sparkSession.readStream.

Tipos de alterações em consultas do Fluxo Estruturado

  • Alterações no número ou tipo (ou seja, fonte diferente) de fontes de entrada: isso não é permitido.
  • Alterações nos parâmetros de fontes de entrada: se isso é permitido e se a semântica da alteração é bem definida depende da origem e da consulta. Veja alguns exemplos.
    • Adição, exclusão e modificação de limites de taxa são permitidos:

      spark.readStream.format("kafka").option("subscribe", "article")
      

      para

      spark.readStream.format("kafka").option("subscribe", "article").option("maxOffsetsPerTrigger", ...)
      
    • As alterações nos artigos e arquivos assinados geralmente não são permitidas, pois os resultados são imprevisíveis: spark.readStream.format("kafka").option("subscribe", "article") para spark.readStream.format("kafka").option("subscribe", "newarticle")

  • Alterações no intervalo de gatilho: você pode alterar gatilhos entre lotes incrementais e intervalos de tempo. Consulte Alterando intervalos de gatilho entre execuções.
  • Alterações no tipo de coletor de saída: são permitidas alterações entre algumas combinações específicas de coletores. Isso precisa ser verificado em uma base caso a caso. Veja alguns exemplos.
    • O coletor de arquivo para o coletor Kafka é permitido. O Kafka verá apenas os novos dados.
    • O coletor de Kafka para o coletor de arquivos não é permitido.
    • O coletor Kafka mudou para foreach, ou vice-versa é permitido.
  • Alterações nos parâmetros do coletor de saída: se isso é permitido e se a semântica da alteração é bem definida conta com o coletor e a consulta. Veja alguns exemplos.
    • As alterações no diretório de saída de um coletor de arquivos não são permitidas: sdf.writeStream.format("parquet").option("path", "/somePath") para sdf.writeStream.format("parquet").option("path", "/anotherPath")
    • As alterações no tópico de saída têm permissão para: sdf.writeStream.format("kafka").option("topic", "topic1") para sdf.writeStream.format("kafka").option("topic", "topic2")
    • As alterações no coletor foreach definido pelo usuário (ou seja, o código ForeachWriter) são permitidas, mas a semântica da alteração conta com o código.
  • Alterações em operações semelhantes de projeção/filtro/mapa: alguns casos são permitidos. Por exemplo:
    • Adição/exclusão de filtros é permitido: sdf.selectExpr("a") para sdf.where(...).selectExpr("a").filter(...).
    • As alterações nas projeções com o mesmo esquema de saída são permitidas: sdf.selectExpr("stringColumn AS json").writeStream para sdf.select(to_json(...).as("json")).writeStream.
    • As alterações nas projeções com um esquema de saída diferente são condicionalmente permitidas: sdf.selectExpr("a").writeStream a sdf.selectExpr("b").writeStream é permitida somente se o coletor de saída permitir que o esquema mude de "a" para "b".
  • Alterações em operações com estado: algumas operações em consultas de streaming precisam manter dados de estado para atualizar continuamente o resultado. O Fluxo estruturado automaticamente verifica os dados de estado no armazenamento tolerante a falhas (por exemplo, DBFS, Armazenamento de BLOBs do Azure) e restaura-os após a reinicialização. No entanto, isso pressupõe que o esquema dos dados de estado permaneça o mesmo entre as reinicializações. Isso significa que quaisquer alterações (isto é, adições, exclusões ou modificações de esquema) para as operações com estado de uma consulta de streaming não são permitidas entre as reinicializações. Aqui está a lista de operações com estado cujo esquema não deve ser alterado entre as reinicializações para garantir a recuperação de estado:
    • Agregação de streaming: por exemplo, sdf.groupBy("a").agg(...). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Eliminação de duplicação de streaming: por exemplo, sdf.dropDuplicates("a"). Qualquer alteração no número ou tipo de chaves ou agregações de agrupamento não é permitida.
    • Junção de fluxo-fluxo: por exemplo, sdf1.join(sdf2, ...) (ou seja, ambas as entradas são geradas com sparkSession.readStream ). As alterações no esquema ou nas colunas de junção de equivalência não são permitidas. Não são permitidas alterações no tipo de junção (externa ou interna). Outras alterações na condição de junção são mal definidas.
    • Operação com estado arbitrário: por exemplo, sdf.groupByKey(...).mapGroupsWithState(...) ou sdf.groupByKey(...).flatMapGroupsWithState(...). Qualquer alteração no esquema do estado definido pelo usuário e no tipo de tempo limite não é permitida. Qualquer alteração na função de mapeamento de estado definida pelo usuário é permitida, mas o efeito semântico da alteração depende da lógica definida pelo usuário. Se você realmente quiser dar suporte a alterações de esquema de estado, poderá codificar/decodificar explicitamente suas estruturas de dados de estado complexo em bytes usando um esquema de codificação/decodificação que dá suporte à migração de esquema. Por exemplo, se você salvar seu estado como bytes codificados em Avro, será possível alterar o esquema de estado Avro entre as reinicializações de consulta, pois isso restaura o estado binário.