Écrire dans des récepteurs de données arbitraires

Les API de diffusion structurée offrent deux moyens d’écrire la sortie d’une requête de streaming dans des sources de données qui n’ont pas de récepteur de streaming existant : foreachBatch() et foreach() .

Réutiliser les sources de données batch existantes avec foreachBatch()

streamingDF.writeStream.foreachBatch(...) vous permet de spécifier une fonction qui est exécutée sur les données de sortie de chaque micro-lot de la requête de streaming. Il accepte deux paramètres : un tableau ou un DataSet qui contient les données de sortie d’un micro-lot et l’ID unique du micro-lot. Avec foreachBatch, vous pouvez :

Réutiliser les sources de données de lot existantes

Pour de nombreux systèmes de stockage, il est possible qu’il n’y ait pas encore de récepteur de diffusion en continu, mais il existe peut-être déjà un enregistreur de données pour les requêtes par lot. À l’aide de foreachBatch() , vous pouvez utiliser les enregistreurs de données batch sur la sortie de chaque micro-lot. Voici quelques exemples :

De nombreuses autres sources de données batch peuvent être utilisées à partir de foreachBatch() .

Écrire dans plusieurs emplacements

Si vous souhaitez écrire la sortie d’une requête de diffusion en continu dans plusieurs emplacements, vous pouvez simplement écrire plusieurs fois le tableau/jeu de données de sortie. Toutefois, chaque tentative d’écriture peut entraîner le recalcul des données de sortie (y compris la relecture possible des données d’entrée). Pour éviter les recalculs, vous devez mettre en cache le tableau/jeu de données de sortie, l’écrire dans plusieurs emplacements, puis le décacher. Voici un plan.

streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(...).save(...)  // location 1
  batchDF.write.format(...).save(...)  // location 2
  batchDF.unpersist()
}

Notes

Si vous exécutez plusieurs travaux Spark sur le batchDF , le débit de données d’entrée de la requête de streaming (signalé StreamingQueryProgress et visible dans le graphique de taux de bloc-notes) peut être signalé comme un multiple de la vitesse réelle à laquelle les données sont générées à la source. Cela est dû au fait que les données d’entrée peuvent être lues plusieurs fois dans les travaux Spark multiples par lot.

Appliquer des opérations tableau supplémentaires

De nombreuses opérations tableau et DataSet ne sont pas prises en charge dans la diffusion en continu trames, car Spark ne prend pas en charge la génération de plans incrémentiels dans ces cas-là. À l’aide foreachBatch() de, vous pouvez appliquer certaines de ces opérations sur chaque sortie micro-lot. par exemple, vous pouvez utiliser foreachBath() et l' MERGE INTO opération SQL pour écrire la sortie des agrégations de streaming dans une table Delta en mode de mise à jour. Pour plus d’informations, consultez fusionnerdans.

Important

  • foreachBatch() fournit uniquement des garanties d’écriture au moins une fois. Toutefois, vous pouvez utiliser le batchId fourni à la fonction en tant que méthode pour dédupliquer la sortie et recevoir une garantie exactement une fois. Dans les deux cas, vous devez vous faire une explication de la sémantique de bout en bout.
  • foreachBatch() ne fonctionne pas avec le mode de traitement continu , car il repose fondamentalement sur l’exécution du micro-lot d’une requête de streaming. Si vous écrivez des données en mode continu, utilisez à la foreach() place.

Écrire à n’importe quel emplacement à l’aide de foreach()

Si foreachBatch() n’est pas une option (par exemple, si vous utilisez Databricks Runtime inférieur à 4,2, ou si le writer de données batch correspondant n’existe pas), vous pouvez exprimer votre logique de writer personnalisé à l’aide de foreach() . Plus précisément, vous pouvez exprimer la logique d’écriture des données en la divisant en trois méthodes : open() , process() et close() .

Utilisation de Scala ou de Java

Dans Scala ou Java, vous étendez la classe ForeachWriter:

datasetOfString.writeStream.foreach(
  new ForeachWriter[String] {

    def open(partitionId: Long, version: Long): Boolean = {
      // Open connection
    }

    def process(record: String) = {
      // Write string to connection
    }

    def close(errorOrNull: Throwable): Unit = {
      // Close the connection
    }
  }
).start()

Utilisation de Python

Dans Python, vous pouvez appeler foreach de deux manières : dans une fonction ou dans un objet. La fonction offre un moyen simple d’exprimer votre logique de traitement, mais elle ne vous permet pas de dédupliquer des données générées lorsque des échecs entraînent un retraitement de certaines données d’entrée. Pour cette situation, vous devez spécifier la logique de traitement dans un objet.

  • La fonction prend une ligne comme entrée.

    def processRow(row):
      // Write row to storage
    
    query = streamingDF.writeStream.foreach(processRow).start()
    
  • L' objet possède une process méthode et des open méthodes et facultatives close :

    class ForeachWriter:
      def open(self, partition_id, epoch_id):
          // Open connection. This method is optional in Python.
    
      def process(self, row):
          // Write row to connection. This method is not optional in Python.
    
      def close(self, error):
          // Close the connection. This method is optional in Python.
    
    query = streamingDF.writeStream.foreach(ForeachWriter()).start()
    

Sémantique d’exécution

Lors du démarrage de la requête de streaming, Spark appelle la fonction ou les méthodes de l’objet de la façon suivante :

  • Une seule copie de cet objet est responsable de toutes les données générées par une tâche unique dans une requête. En d’autres termes, une instance est responsable du traitement d’une partition des données générées de façon distribuée.

  • Cet objet doit être sérialisable, car chaque tâche obtient une copie sérialisée et désérialisée actualisée de l’objet fourni. Par conséquent, il est fortement recommandé d’effectuer toute initialisation pour l’écriture de données (par exemple, l’ouverture d’une connexion ou le démarrage d’une transaction) après avoir appelé la open() méthode, ce qui signifie que la tâche est prête à générer des données.

  • Le cycle de vie des méthodes est le suivant :

    Pour chaque partition avec partition_id :

    Pour chaque lot/époque de diffusion de données en continu avec epoch_id :

    La méthode open(partitionId, epochId) est appelée.

    Si open(...) retourne la valeur true, pour chaque ligne de la partition et le lot/l’époque, la méthode process(row) est appelée.

    La méthode close(error) est appelée avec une erreur (le cas échéant) rencontrée lors du traitement des lignes.

  • La close() méthode (si elle existe) est appelée si une open() méthode existe et qu’elle est correctement retournée (quelle que soit la valeur de retour), sauf si le processus JVM ou python se bloque au milieu.

Notes

partitionIdEt epochId dans la open() méthode peuvent être utilisés pour dédupliquer des données générées lorsque des échecs entraînent un retraitement de certaines données d’entrée. Cela dépend du mode d’exécution de la requête. Si la requête de streaming est en cours d’exécution en mode micro-lot, chaque partition représentée par un tuple unique (partition_id, epoch_id) est garantie d’avoir les mêmes données. Par conséquent, (partition_id, epoch_id) peut être utilisé pour dédupliquer et/ou valider de manière transactionnelle des données et obtenir des garanties exactement une fois. Toutefois, si la requête de streaming est en cours d’exécution en mode continu, cette garantie ne peut pas être conservée et ne doit donc pas être utilisée pour la déduplication.