Sinktransformatie in toewijzingsgegevensstroom

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

Gegevensstromen zijn beschikbaar in Zowel Azure Data Factory als Azure Synapse Pipelines. Dit artikel is van toepassing op toewijzingsgegevensstromen. Als u geen ervaring hebt met transformaties, raadpleegt u het inleidende artikel Gegevens transformeren met behulp van een toewijzingsgegevensstroom.

Nadat u klaar bent met het transformeren van uw gegevens, schrijft u deze naar een doelarchief met behulp van de sinktransformatie. Elke gegevensstroom vereist ten minste één sinktransformatie, maar u kunt naar zoveel sinks schrijven als nodig is om uw transformatiestroom te voltooien. Als u naar extra sinks wilt schrijven, maakt u nieuwe streams via nieuwe vertakkingen en voorwaardelijke splitsingen.

Elke sinktransformatie is gekoppeld aan precies één gegevenssetobject of gekoppelde service. De sinktransformatie bepaalt de shape en locatie van de gegevens waarnaar u wilt schrijven.

Inlinegegevenssets

Wanneer u een sinktransformatie maakt, kiest u of uw sinkgegevens zijn gedefinieerd in een gegevenssetobject of in de sinktransformatie. De meeste indelingen zijn beschikbaar in slechts één of de andere indelingen. Zie het juiste connectordocument voor meer informatie over het gebruik van een specifieke connector.

Wanneer een indeling wordt ondersteund voor zowel inline als in een gegevenssetobject, zijn er voordelen voor beide. Gegevenssetobjecten zijn herbruikbare entiteiten die kunnen worden gebruikt in andere gegevensstromen en activiteiten zoals Kopiëren. Deze herbruikbare entiteiten zijn vooral handig wanneer u een beperkt schema gebruikt. Gegevenssets zijn niet gebaseerd op Spark. Soms moet u bepaalde instellingen of schemaprojectie in de sinktransformatie overschrijven.

Inlinegegevenssets worden aanbevolen wanneer u flexibele schema's, eenmalige sink-exemplaren of geparameteriseerde sinks gebruikt. Als uw sink zwaar is geparameteriseerd, kunt u met inlinegegevenssets geen dummy-object maken. Inlinegegevenssets zijn gebaseerd in Spark en hun eigenschappen zijn systeemeigen voor de gegevensstroom.

Als u een inlinegegevensset wilt gebruiken, selecteert u de gewenste indeling in de Sink-typekiezer . In plaats van een sinkgegevensset te selecteren, selecteert u de gekoppelde service waarmee u verbinding wilt maken.

Screenshot that shows Inline selected.

Werkruimtedatabase (alleen Synapse-werkruimten)

Wanneer u gegevensstromen gebruikt in Azure Synapse-werkruimten, hebt u een extra optie om uw gegevens rechtstreeks naar een databasetype te sinken dat zich in uw Synapse-werkruimte bevindt. Hierdoor hoeft u geen gekoppelde services of gegevenssets toe te voegen voor deze databases. De databases die zijn gemaakt via de Azure Synapse-databasesjablonen , zijn ook toegankelijk wanneer u Werkruimtedatabase selecteert.

Notitie

De Azure Synapse Workspace DB-connector is momenteel beschikbaar als openbare preview en kan momenteel alleen werken met Spark Lake-databases

Screenshot that shows workspace db selected.

Ondersteunde sinktypen

Toewijzingsgegevensstroom volgt een ELT-benadering (EXTRAHEREN, laden en transformeren) en werkt met faseringsgegevenssets die zich allemaal in Azure bevinden. Op dit moment kunnen de volgende gegevenssets worden gebruikt in een sinktransformatie.

Connector Notatie Gegevensset/inline
Azure Blob-opslag Avro
Tekst met scheidingstekens
Delta
JSON
ORC
Parquet
✓/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Azure Cosmos DB voor NoSQL ✓/-
Azure Data Lake Storage Gen1 Avro
Tekst met scheidingstekens
JSON
ORC
Parquet
✓/-
✓/-
✓/-
✓/✓
✓/-
Azure Data Lake Storage Gen2 Avro
Common Data Model
Tekst met scheidingstekens
Delta
JSON
ORC
Parquet
✓/✓
-/✓
✓/✓
-/✓
✓/✓
✓/✓
✓/✓
Azure Database for MySQL ✓/✓
Azure Database for PostgreSQL ✓/✓
Azure Data Explorer ✓/✓
Azure SQL-database ✓/✓
Azure SQL Managed Instance ✓/-
Azure Synapse Analytics ✓/-
Dataverse ✓/✓
Dynamics 365 ✓/✓
Dynamics CRM ✓/✓
Fabric Lakehouse ✓/✓
SFTP Avro
Tekst met scheidingstekens
JSON
ORC
Parquet
✓/✓
✓/✓
✓/✓
✓/✓
✓/✓
Snowflake ✓/✓
SQL Server ✓/✓

Instellingen specifiek voor deze connectors bevindt zich op het tabblad Instellingen. Voorbeelden van informatie- en gegevensstroomscripts op deze instellingen bevinden zich in de connectordocumentatie.

De service heeft toegang tot meer dan 90 systeemeigen connectors. Als u gegevens wilt schrijven naar die andere bronnen vanuit uw gegevensstroom, gebruikt u de kopieeractiviteit om die gegevens uit een ondersteunde sink te laden.

Sink-instellingen

Nadat u een sink hebt toegevoegd, configureert u deze via het tabblad Sink . Hier kunt u de gegevensset kiezen of maken waarnaar uw sink schrijft. Ontwikkelingswaarden voor gegevenssetparameters kunnen worden geconfigureerd in instellingen voor foutopsporing. (De foutopsporingsmodus moet zijn ingeschakeld.)

In de volgende video wordt een aantal verschillende sinkopties voor door tekst gescheiden bestandstypen uitgelegd.

Screenshot that shows Sink settings.

Schemadrift: Schemadrift is de mogelijkheid van de service om flexibele schema's in uw gegevensstromen systeemeigen af te handelen zonder dat u expliciet kolomwijzigingen hoeft te definiëren. Schakel schemadrift in om extra kolommen te schrijven boven op wat is gedefinieerd in het sinkgegevensschema.

Schema valideren: als het schema is geselecteerd, mislukt de gegevensstroom als een kolom in sinkprojectie niet wordt gevonden in het sinkarchief of als de gegevenstypen niet overeenkomen. Gebruik deze instelling om af te dwingen dat het sinkschema voldoet aan het contract van uw gedefinieerde projectie. Het is handig in database-sinkscenario's om aan te geven dat kolomnamen of -typen zijn gewijzigd.

Cache-sink

Een cache-sink is wanneer een gegevensstroom gegevens naar de Spark-cache schrijft in plaats van een gegevensarchief. In toewijzingsgegevensstromen kunt u vaak verwijzen naar deze gegevens binnen dezelfde stroom met behulp van een cachezoekactie. Dit is handig als u wilt verwijzen naar gegevens als onderdeel van een expressie, maar niet expliciet de kolommen eraan wilt koppelen. Veelvoorkomende voorbeelden waarbij een cache-sink kan helpen bij het opzoeken van een maximale waarde in een gegevensarchief en overeenkomende foutcodes voor een database met foutberichten.

Als u naar een cachesink wilt schrijven, voegt u een sinktransformatie toe en selecteert u Cache als sinktype. In tegenstelling tot andere sinktypen hoeft u geen gegevensset of gekoppelde service te selecteren omdat u niet naar een extern archief schrijft.

Select cache sink

In de sink-instellingen kunt u desgewenst de sleutelkolommen van de cache-sink opgeven. Deze worden gebruikt als overeenkomende voorwaarden bij het gebruik van de lookup() functie in een cachezoekactie. Als u sleutelkolommen opgeeft, kunt u de outputs() functie niet gebruiken in een cachezoekactie. Zie opzoeksyntaxis in de cache voor meer informatie over de opzoeksyntaxis in de cache.

Cache sink key columns

Als ik bijvoorbeeld een kolom met één sleutel opgeeft van column1 in een cachesink met de naam cacheExample, zou aanroepen cacheExample#lookup() één parameter hebben die aangeeft op welke rij in de cachesink moet worden vergeleken. De functie voert één complexe kolom met subkolommen uit voor elke kolom die is toegewezen.

Notitie

Een cache-sink moet zich in een volledig onafhankelijke gegevensstroom bevinden van elke transformatie die ernaar verwijst via een cachezoekactie. Een cache-sink moet ook de eerste sink zijn die is geschreven.

Uitvoer van schrijven naar activiteit De sink in de cache kan eventueel uw uitvoergegevens schrijven naar de invoer van de volgende pijplijnactiviteit. Hierdoor kunt u gegevens snel en eenvoudig doorgeven aan uw gegevensstroomactiviteit zonder dat u de gegevens in een gegevensarchief hoeft op te slaan.

Bijwerkingsmethode

Voor databasesinktypen bevat het tabblad Instellingen een eigenschap Updatemethode. De standaardinstelling is invoegen, maar bevat ook selectievakjeopties voor bijwerken, upsert en verwijderen. Als u deze extra opties wilt gebruiken, moet u vóór de sink een transformatie voor alter row toevoegen. Met de rij Wijzigen kunt u de voorwaarden voor elk van de databaseacties definiëren. Als uw bron een systeemeigen CDC-bron is, kunt u de updatemethoden instellen zonder een wijzigingsrij omdat ADF al op de hoogte is van de rijmarkeringen voor invoegen, bijwerken, upsert en verwijderen.

Veldtoewijzing

Net als bij een selectietransformatie kunt u op het tabblad Toewijzing van de sink bepalen welke binnenkomende kolommen worden geschreven. Standaard worden alle invoerkolommen, inclusief gedrifte kolommen, toegewezen. Dit gedrag wordt automatisch toewijzen genoemd.

Wanneer u automatisch toewijzen uitschakelt, kunt u vaste toewijzingen op basis van kolommen of toewijzingen op basis van regels toevoegen. Met toewijzingen op basis van regels kunt u expressies schrijven met patroonkoppeling. Er is een probleem opgelost met toewijzingstoewijzingen voor logische en fysieke kolomnamen. Zie Kolompatronen in de toewijzingsgegevensstroom voor meer informatie over toewijzing op basis van regels.

Aangepaste sink-rangschikking

Standaard worden gegevens naar meerdere sinks geschreven in een niet-deterministische volgorde. De uitvoeringsengine schrijft gegevens parallel wanneer de transformatielogica is voltooid en de sinkvolgorde kan per uitvoering variëren. Als u een exacte sinkvolgorde wilt opgeven, schakelt u Aangepaste sinkvolgorde in op het tabblad Algemeen van de gegevensstroom. Wanneer deze optie is ingeschakeld, worden sinks opeenvolgend geschreven in toenemende volgorde.

Screenshot that shows Custom sink ordering.

Notitie

Wanneer u opzoekacties in de cache gebruikt, moet u ervoor zorgen dat de sinkvolgorde in de cache is ingesteld op 1, de laagste (of eerste) volgorde.

Custom sink ordering

Sinkgroepen

U kunt sinks groeperen door hetzelfde ordernummer toe te passen voor een reeks sinks. De service behandelt deze sinks als groepen die parallel kunnen worden uitgevoerd. Opties voor parallelle uitvoering worden weergegeven in de activiteit van de pijplijngegevensstroom.

Fouten

Op het tabblad Sink-fouten kunt u foutrijafhandeling configureren om uitvoer vast te leggen en om te leiden voor fouten in databasestuurprogramma's en mislukte asserties.

Bij het schrijven naar databases kunnen bepaalde rijen met gegevens mislukken vanwege beperkingen die zijn ingesteld door de bestemming. Standaard mislukt een uitvoering van een gegevensstroom bij de eerste fout die deze krijgt. In bepaalde connectors kunt u ervoor kiezen om door te gaan op een fout waarmee uw gegevensstroom kan worden voltooid, zelfs als afzonderlijke rijen fouten hebben. Deze mogelijkheid is momenteel alleen beschikbaar in Azure SQL Database en Azure Synapse. Zie de verwerking van foutrijen in Azure SQL DB voor meer informatie.

Hieronder vindt u een videozelfstudie over het automatisch verwerken van databasefoutenrijen in uw sinktransformatie.

Voor assertiefoutrijen kunt u de transformatie Assert upstream in uw gegevensstroom gebruiken en mislukte asserties vervolgens omleiden naar een uitvoerbestand hier op het tabblad Sink-fouten. U hebt hier ook een optie om rijen met assertiefouten te negeren en deze rijen helemaal niet naar het sink-doelgegevensarchief uit te voeren.

Assert failure rows

Voorbeeld van gegevens in sink

Wanneer u een voorbeeld van gegevens ophaalt in de foutopsporingsmodus, worden er geen gegevens naar uw sink geschreven. Een momentopname van hoe de gegevens eruit zien, worden geretourneerd, maar er wordt niets naar uw bestemming geschreven. Als u het schrijven van gegevens in uw sink wilt testen, voert u een pijplijnopsporing uit vanaf het pijplijncanvas.

Script voor gegevensstroom

Opmerking

Hieronder ziet u een voorbeeld van een sinktransformatie en het bijbehorende gegevensstroomscript:

sink(input(
		movie as integer,
		title as string,
		genres as string,
		year as integer,
		Rating as integer
	),
	allowSchemaDrift: true,
	validateSchema: false,
	deletable:false,
	insertable:false,
	updateable:true,
	upsertable:false,
	keys:['movie'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true,
	saveOrder: 1,
	errorHandlingOption: 'stopOnFirstError') ~> sink1

Nu u de gegevensstroom hebt gemaakt, voegt u een gegevensstroomactiviteit toe aan uw pijplijn.