Oefening: Gedeelde en huidige gegevensset samenvoegen en transformeren met toewijzingsstroom in Azure Data Factory
Gegevens transformeren met toewijzingsgegevensstroom
Nu u gegevens naar Azure Data Lake Storage hebt gekopieerd, is het tijd om die gegevens samen te voegen en te aggregeren in een datawarehouse.
We gebruiken de toewijzingsgegevensstroom, de visueel ontworpen transformatieservice van Azure Data Factory. Met toewijzingsgegevensstromen kunnen gebruikers transformatielogica codevrij ontwikkelen en uitvoeren op Spark-clusters die worden beheerd door de ADF-service.
De gegevensstroom die in deze stap is gemaakt, voegt de gegevensset TripDataCSV die in de vorige sectie is gemaakt samen met het bestand TripFares.csv dat wordt gedeeld via Azure Data Share. gebaseerd op vier sleutelkolommen. Vervolgens worden de gegevens samengevoegd op basis van kolom payment_type om het gemiddelde van bepaalde velden te berekenen en geschreven in een Azure Synapse Analytics-tabel.
Een gegevensstroomactiviteit toevoegen aan uw pijplijn
Open in het deelvenster Activiteiten van het pijplijncanvas de accordion Verplaatsen en transformeren en sleep de activiteit Gegevensstroom naar het canvas.
Selecteer in het zijdeelvenster dat wordt geopend Nieuwe gegevensstroom maken en kies Toewijzingsgegevensstroom.
Klik op OK.
U wordt omgeleid naar het gegevensstroomcanvas waar u uw transformatielogica gaat bouwen.
Geef op het tabblad Algemeen uw gegevensstroom de naam 'JoinAndAggregateData'.
De CSV-bron van uw reisgegevens configureren
Het eerste wat u moet doen is uw twee brontransformaties configureren. De eerste bron wijst naar de DelimitedText-gegevensset 'TripDataCSV'. Als u een brontransformatie wilt toevoegen, klikt u op het vak Bron toevoegen op het canvas.
Geef de bron de naam 'TripDataCSV' en selecteer de gegevensset 'TripDataCSV' in de vervolgkeuzelijst Bron.
Omdat trip-data.csv nu bestaat, klikt u op Openen om naar het tabblad Instellingen van de gegevensset te gaan.
Ga naar het tabblad Schema en klik op Schema importeren.
Selecteer Uit verbinding/archief om rechtstreeks vanuit het bestandsarchief te importeren.
Als u From connection/store selecteert, worden de kolommen als volgt weergegeven:
Ga terug naar de gegevensstroom 'JoinAndAggregateData'.
Als uw cluster voor foutopsporing is gestart (aangegeven door een groene cirkel naast de schuifregelaar voor foutopsporing), kunt u een momentopname van de gegevens ophalen op het tabblad Gegevenspreview.
Klik op Vernieuwen om een voorbeeld van de gegevens op te halen.
Wanneer u het tabblad Gegevensvoorbeeld selecteert en op Vernieuwen klikt, worden de gegevens opgehaald zolang uw foutopsporingscluster groen is, maar gegevensvoorbeeld schrijft geen gegevens.
Configureer uw csv-bron voor reistarieven:
De tweede bron die u toevoegt, verwijst naar het CSV-bestand TripFares.csv dat is gedeeld tijdens de Azure Data Share-oefening.
Onder de bron 'TripDataCSV' bevindt zich nog een vak Bron toevoegen.
Klik hierop om een nieuwe brontransformatie toe te voegen.
Noem deze bron 'TripFaresCSV'.
Klik op Nieuw naast het veld brongegevensset om een nieuwe ADLS Gen2-gegevensset te maken.
Selecteer de tegel Azure Data Lake Storage Gen2 en klik op Doorgaan.
Het kan zijn dat veel van de connectors in data factory niet worden ondersteund in de toewijzingsgegevensstroom. Als u gegevens uit een van deze bronnen wilt transformeren, neemt u deze op in een ondersteunde bron met behulp van de kopieeractiviteit.
Selecteer in het deelvenster Indeling selecteren DelimitedText terwijl u leest vanuit een CSV-bestand. Klik op Doorgaan.
Geef uw gegevensset de naam TripFaresCSV.
Selecteer 'ADLSGen2' als uw gekoppelde service.
Stel het bestandspad in op taxidata/TripFares.csv. In de Azure Data Share-oefening hebt u deze TripFares.csv genoemd in container 'taxidata'.
Stel de eerste rij in op true als de invoergegevens kopteksten bevatten.
Importeer het schema Uit verbinding/archief.
Als u klaar bent, klikt u op OK.
Als u wilt controleren of uw bron correct is geconfigureerd, haalt u een voorbeeld van gegevens op op het tabblad Gegevensvoorbeeld.
Inner join TripDataCSV en TripFaresSQL
Als u een nieuwe transformatie wilt toevoegen, klikt u op het pluspictogram in de rechterbenedenhoek van 'TripDataCSV'.
Onder Meerdere invoeren/uitvoeren selecteert u Samenvoegen.
Geef uw join-transformatie de naam 'InnerJoinWithTripFares'.
Selecteer 'TripFaresCSV' in de juiste stroomvervolgkeuzelijst.
Selecteer Inner als join-type.
Selecteer de kolommen die u wilt vergelijken vanuit elke stroom via de vervolgkeuzelijst Join-voorwaarden.
Als u een extra join-voorwaarde wilt toevoegen, klikt u op het pluspictogram naast een bestaande voorwaarde.
Standaard worden alle join-voorwaarden gecombineerd met een AND-operator, wat betekent dat aan alle voorwaarden moet worden voldaan voor een overeenkomst.
We willen matchen op kolommen medailles, hack_license, vendor_id en pickup_datetime
Controleer of u samen met Data Preview 25 kolommen hebt gekoppeld.
Aggregatie per payment_type
Nadat u een jointransformatie hebt voltooid, voegt u een geaggregeerde transformatie toe door op het pluspictogram naast 'InnerJoinWithTripFares' te klikken.
Kies Aggregaat onder Schema-modifier en roep de aggregate aan: 'AggregateByPaymentType'
Geef uw geaggregeerde transformatie de naam 'AggregateByPaymentType'. Selecteer payment_type als de kolom groeperen op.
Ga naar het tabblad Aggregaties. Hier geeft u twee aggregaties op:
* The average fare grouped by payment type
* The total trip distance grouped by payment type
Maak de gemiddelde tariefexpressie.
Voer 'average_fare' in het tekstvak Een kolom toevoegen of selecteren in.
Als u een aggregatie-expressie wilt invoeren, klikt u op de markering naast het blauwe vak met het label Enter-expressie.
Hiermee opent u de opbouwfunctie voor de gegevensstroomexpressie, een hulpprogramma dat wordt gebruikt om gegevensstroomexpressies visueel te maken met behulp van invoerschema's, ingebouwde functies en bewerkingen, en door de gebruiker gedefinieerde parameters. Zie de Documentatie voor opbouwfunctie van expressies voor meer informatie over de mogelijkheden van de opbouwfunctie voor expressies.
Als u het gemiddelde tarief wilt ophalen, gebruikt u de aggregatiefunctie avg() om de total_amount kolom te aggregeren naar een geheel getal met toInteger(). In de expressietaal voor gegevensstromen wordt dit gedefinieerd als avg(toInteger(total_amount)).
Klik op Opslaan en voltooien wanneer u klaar bent.
Als u een extra aggregatie-expressie wilt toevoegen, klikt u op het pluspictogram naast average_fare.
Selecteer Kolom toevoegen.
Voer 'total_trip_distance' in het tekstvak Een kolom toevoegen of selecteren in. Open zoals in de laatste stap de opbouwfunctie voor expressies om de expressie in te voeren.
Als u de totale reisafstand wilt ophalen, gebruikt u de aggregatiefunctie sum() om de trip_distance kolom te aggregeren naar een geheel getal met toInteger().
In de expressietaal voor gegevensstromen wordt dit gedefinieerd als sum(toInteger(trip_distance)).
Klik op Opslaan en voltooien.
Test uw transformatielogica op het tabblad Gegevenspreview.