Oefening: Gedeelde en huidige gegevensset samenvoegen en transformeren met toewijzingsstroom in Azure Data Factory

Voltooid

Gegevens transformeren met toewijzingsgegevensstroom

Nu u gegevens naar Azure Data Lake Storage hebt gekopieerd, is het tijd om die gegevens samen te voegen en te aggregeren in een datawarehouse.

We gebruiken de toewijzingsgegevensstroom, de visueel ontworpen transformatieservice van Azure Data Factory. Met toewijzingsgegevensstromen kunnen gebruikers transformatielogica codevrij ontwikkelen en uitvoeren op Spark-clusters die worden beheerd door de ADF-service.

De gegevensstroom die in deze stap is gemaakt, voegt de gegevensset TripDataCSV die in de vorige sectie is gemaakt samen met het bestand TripFares.csv dat wordt gedeeld via Azure Data Share. gebaseerd op vier sleutelkolommen. Vervolgens worden de gegevens samengevoegd op basis van kolom payment_type om het gemiddelde van bepaalde velden te berekenen en geschreven in een Azure Synapse Analytics-tabel.

Een gegevensstroomactiviteit toevoegen aan uw pijplijn

Open in het deelvenster Activiteiten van het pijplijncanvas de accordion Verplaatsen en transformeren en sleep de activiteit Gegevensstroom naar het canvas.

Move and Transform

Selecteer in het zijdeelvenster dat wordt geopend Nieuwe gegevensstroom maken en kies Toewijzingsgegevensstroom.

Klik op OK.

Create new DataFlow

U wordt omgeleid naar het gegevensstroomcanvas waar u uw transformatielogica gaat bouwen.

Geef op het tabblad Algemeen uw gegevensstroom de naam 'JoinAndAggregateData'.

Properties new DataFlow

De CSV-bron van uw reisgegevens configureren

Het eerste wat u moet doen is uw twee brontransformaties configureren. De eerste bron wijst naar de DelimitedText-gegevensset 'TripDataCSV'. Als u een brontransformatie wilt toevoegen, klikt u op het vak Bron toevoegen op het canvas.

Add Source to Dataflow

Geef de bron de naam 'TripDataCSV' en selecteer de gegevensset 'TripDataCSV' in de vervolgkeuzelijst Bron.

Omdat trip-data.csv nu bestaat, klikt u op Openen om naar het tabblad Instellingen van de gegevensset te gaan.

Source Settings of Dataflow

Ga naar het tabblad Schema en klik op Schema importeren.

Selecteer Uit verbinding/archief om rechtstreeks vanuit het bestandsarchief te importeren.

Schema definition source Dataflow

Als u From connection/store selecteert, worden de kolommen als volgt weergegeven:

Schema definition from Connection

Ga terug naar de gegevensstroom 'JoinAndAggregateData'.

Als uw cluster voor foutopsporing is gestart (aangegeven door een groene cirkel naast de schuifregelaar voor foutopsporing), kunt u een momentopname van de gegevens ophalen op het tabblad Gegevenspreview.

Klik op Vernieuwen om een voorbeeld van de gegevens op te halen.

Source Settings after Schema Mapping Dataflow

Wanneer u het tabblad Gegevensvoorbeeld selecteert en op Vernieuwen klikt, worden de gegevens opgehaald zolang uw foutopsporingscluster groen is, maar gegevensvoorbeeld schrijft geen gegevens.

Data Preview Fetching Data

Configureer uw csv-bron voor reistarieven:

De tweede bron die u toevoegt, verwijst naar het CSV-bestand TripFares.csv dat is gedeeld tijdens de Azure Data Share-oefening.

Onder de bron 'TripDataCSV' bevindt zich nog een vak Bron toevoegen.

Klik hierop om een nieuwe brontransformatie toe te voegen.

Add Source Transformation

Noem deze bron 'TripFaresCSV'.

Klik op Nieuw naast het veld brongegevensset om een nieuwe ADLS Gen2-gegevensset te maken.

New Source creation for Transformation

Selecteer de tegel Azure Data Lake Storage Gen2 en klik op Doorgaan.

Het kan zijn dat veel van de connectors in data factory niet worden ondersteund in de toewijzingsgegevensstroom. Als u gegevens uit een van deze bronnen wilt transformeren, neemt u deze op in een ondersteunde bron met behulp van de kopieeractiviteit.

Select Azure Data Lake Storage Gen 2 for new Source Transformation

Selecteer in het deelvenster Indeling selecteren DelimitedText terwijl u leest vanuit een CSV-bestand. Klik op Doorgaan.

Select Delimited Text

Geef uw gegevensset de naam TripFaresCSV.

Selecteer 'ADLSGen2' als uw gekoppelde service.

Stel het bestandspad in op taxidata/TripFares.csv. In de Azure Data Share-oefening hebt u deze TripFares.csv genoemd in container 'taxidata'.

Stel de eerste rij in op true als de invoergegevens kopteksten bevatten.

Importeer het schema Uit verbinding/archief.

Als u klaar bent, klikt u op OK.

Set properties of File

Als u wilt controleren of uw bron correct is geconfigureerd, haalt u een voorbeeld van gegevens op op het tabblad Gegevensvoorbeeld.

Data Preview of created dataset

Inner join TripDataCSV en TripFaresSQL

Als u een nieuwe transformatie wilt toevoegen, klikt u op het pluspictogram in de rechterbenedenhoek van 'TripDataCSV'.

Onder Meerdere invoeren/uitvoeren selecteert u Samenvoegen.

Add Join Transformation

Geef uw join-transformatie de naam 'InnerJoinWithTripFares'.

Selecteer 'TripFaresCSV' in de juiste stroomvervolgkeuzelijst.

Selecteer Inner als join-type.

Selecteer de kolommen die u wilt vergelijken vanuit elke stroom via de vervolgkeuzelijst Join-voorwaarden.

Als u een extra join-voorwaarde wilt toevoegen, klikt u op het pluspictogram naast een bestaande voorwaarde.

Standaard worden alle join-voorwaarden gecombineerd met een AND-operator, wat betekent dat aan alle voorwaarden moet worden voldaan voor een overeenkomst.

We willen matchen op kolommen medailles, hack_license, vendor_id en pickup_datetime

Match Columns for Join Transformation

Controleer of u samen met Data Preview 25 kolommen hebt gekoppeld.

Data Preview for Join Transformation

Aggregatie per payment_type

Nadat u een jointransformatie hebt voltooid, voegt u een geaggregeerde transformatie toe door op het pluspictogram naast 'InnerJoinWithTripFares' te klikken.

Kies Aggregaat onder Schema-modifier en roep de aggregate aan: 'AggregateByPaymentType'

Add Aggregation for Transformation

Geef uw geaggregeerde transformatie de naam 'AggregateByPaymentType'. Selecteer payment_type als de kolom groeperen op.

Group by setting for Aggregation

Ga naar het tabblad Aggregaties. Hier geeft u twee aggregaties op:

* The average fare grouped by payment type

* The total trip distance grouped by payment type

Maak de gemiddelde tariefexpressie.

Voer 'average_fare' in het tekstvak Een kolom toevoegen of selecteren in.

Aggregates settings

Als u een aggregatie-expressie wilt invoeren, klikt u op de markering naast het blauwe vak met het label Enter-expressie.

Expression builder for Aggregations

Hiermee opent u de opbouwfunctie voor de gegevensstroomexpressie, een hulpprogramma dat wordt gebruikt om gegevensstroomexpressies visueel te maken met behulp van invoerschema's, ingebouwde functies en bewerkingen, en door de gebruiker gedefinieerde parameters. Zie de Documentatie voor opbouwfunctie van expressies voor meer informatie over de mogelijkheden van de opbouwfunctie voor expressies.

Als u het gemiddelde tarief wilt ophalen, gebruikt u de aggregatiefunctie avg() om de total_amount kolom te aggregeren naar een geheel getal met toInteger(). In de expressietaal voor gegevensstromen wordt dit gedefinieerd als avg(toInteger(total_amount)).

Klik op Opslaan en voltooien wanneer u klaar bent.

Visual Expression builder for Aggregations

Als u een extra aggregatie-expressie wilt toevoegen, klikt u op het pluspictogram naast average_fare.

Selecteer Kolom toevoegen.

Add a column for another expression

Voer 'total_trip_distance' in het tekstvak Een kolom toevoegen of selecteren in. Open zoals in de laatste stap de opbouwfunctie voor expressies om de expressie in te voeren.

Als u de totale reisafstand wilt ophalen, gebruikt u de aggregatiefunctie sum() om de trip_distance kolom te aggregeren naar een geheel getal met toInteger().

In de expressietaal voor gegevensstromen wordt dit gedefinieerd als sum(toInteger(trip_distance)).

Add expression to new column

Klik op Opslaan en voltooien.

Test uw transformatielogica op het tabblad Gegevenspreview.

Data Preview of Aggregation