Delen via


Gegevens migreren van Amazon S3 naar Azure Data Lake Storage Gen2

VAN TOEPASSING OP: Azure Data Factory Azure Synapse Analytics

Tip

Probeer Data Factory uit in Microsoft Fabric, een alles-in-één analyseoplossing voor ondernemingen. Microsoft Fabric omvat alles, van gegevensverplaatsing tot gegevenswetenschap, realtime analyses, business intelligence en rapportage. Meer informatie over het gratis starten van een nieuwe proefversie .

Gebruik de sjablonen om petabytes aan gegevens te migreren die bestaan uit honderden miljoenen bestanden van Amazon S3 naar Azure Data Lake Storage Gen2.

Notitie

Als u een klein gegevensvolume van AWS S3 naar Azure wilt kopiëren (bijvoorbeeld minder dan 10 TB), is het efficiënter en eenvoudiger om het hulpprogramma Azure Data Factory Copy Data te gebruiken. De sjabloon die in dit artikel wordt beschreven, is meer dan wat u nodig hebt.

Over de oplossingssjablonen

Gegevenspartitie wordt aanbevolen, met name bij het migreren van meer dan 10 TB aan gegevens. Als u de gegevens wilt partitioneren, gebruikt u de instelling voorvoegsel om de mappen en bestanden op Amazon S3 op naam te filteren. Vervolgens kan elke ADF-kopieertaak één partitie tegelijk kopiëren. U kunt meerdere ADF-kopieertaken gelijktijdig uitvoeren voor betere doorvoer.

Voor gegevensmigratie is normaal gesproken eenmalige historische gegevensmigratie vereist, plus het periodiek synchroniseren van de wijzigingen van AWS S3 naar Azure. Hieronder ziet u twee sjablonen, waarbij één sjabloon betrekking heeft op eenmalige historische gegevensmigratie en een andere sjabloon het synchroniseren van de wijzigingen van AWS S3 naar Azure.

Voor het migreren van historische gegevens van Amazon S3 naar Azure Data Lake Storage Gen2 voor de sjabloon

Deze sjabloon (sjabloonnaam: historische gegevens migreren van AWS S3 naar Azure Data Lake Storage Gen2) gaat ervan uit dat u een partitielijst hebt geschreven in een externe besturingstabel in Azure SQL Database. Er wordt dus een opzoekactiviteit gebruikt om de partitielijst op te halen uit de tabel met externe besturingselementen, elke partitie te herhalen en elke ADF-kopieertaak één partitie tegelijk te maken. Zodra een kopieertaak is voltooid, wordt de activiteit Opgeslagen procedure gebruikt om de status van het kopiëren van elke partitie in de besturingstabel bij te werken.

De sjabloon bevat vijf activiteiten:

  • Met opzoeken worden de partities opgehaald die niet zijn gekopieerd naar Azure Data Lake Storage Gen2 uit een externe besturingstabel. De tabelnaam is s3_partition_control_table en de query voor het laden van gegevens uit de tabel is SELECT PartitionPrefix FROM s3_partition_control_table WHERE SuccessOrFailure = 0.
  • ForEach haalt de partitielijst op uit de opzoekactiviteit en doorloopt elke partitie naar de TriggerCopy-activiteit . U kunt de batchCount zo instellen dat meerdere ADF-kopieertaken gelijktijdig worden uitgevoerd. We hebben 2 in deze sjabloon ingesteld.
  • ExecutePipeline voert de pijplijn CopyFolderPartitionFromS3 uit . De reden waarom we een andere pijplijn maken om elke kopieertaak een partitie te kopiëren, is omdat u de mislukte kopieertaak eenvoudig opnieuw kunt uitvoeren om die specifieke partitie opnieuw te laden vanuit AWS S3. Alle andere kopieertaken die andere partities laden, worden niet beïnvloed.
  • Kopieert elke partitie van AWS S3 naar Azure Data Lake Storage Gen2.
  • SqlServerStoredProcedure werkt de status van het kopiëren van elke partitie in de besturingstabel bij.

De sjabloon bevat twee parameters:

  • AWS_S3_bucketName is de naam van uw bucket in AWS S3 waar u gegevens wilt migreren. Als u gegevens uit meerdere buckets op AWS S3 wilt migreren, kunt u nog één kolom toevoegen in de tabel met externe besturingselementen om de naam van de bucket voor elke partitie op te slaan en uw pijplijn ook bijwerken om gegevens uit die kolom op te halen.
  • Azure_Storage_fileSystem is de naam van uw bestandssysteem in Azure Data Lake Storage Gen2 waarnaar u gegevens wilt migreren.

De sjabloon kan alleen gewijzigde bestanden van Amazon S3 kopiëren naar Azure Data Lake Storage Gen2

Deze sjabloon (sjabloonnaam: deltagegevens kopiëren van AWS S3 naar Azure Data Lake Storage Gen2) maakt gebruik van LastModifiedTime van elk bestand om de nieuwe of bijgewerkte bestanden alleen van AWS S3 naar Azure te kopiëren. Houd er rekening mee dat uw bestanden of mappen al tijd zijn gepartitioneerd met tijdslicentiegegevens als onderdeel van de naam van het bestand of de map op AWS S3 (bijvoorbeeld /jjjj/mm/dd/file.csv), kunt u naar deze zelfstudie gaan om de beter presterende benadering te krijgen voor het incrementeel laden van nieuwe bestanden. In deze sjabloon wordt ervan uitgegaan dat u een partitielijst hebt geschreven in een tabel met externe besturingselementen in Azure SQL Database. Er wordt dus een opzoekactiviteit gebruikt om de partitielijst op te halen uit de tabel met externe besturingselementen, elke partitie te herhalen en elke ADF-kopieertaak één partitie tegelijk te maken. Wanneer elke kopieertaak begint met het kopiëren van de bestanden van AWS S3, is deze afhankelijk van de eigenschap LastModifiedTime om de nieuwe of bijgewerkte bestanden alleen te identificeren en te kopiëren. Zodra een kopieertaak is voltooid, wordt de activiteit Opgeslagen procedure gebruikt om de status van het kopiëren van elke partitie in de besturingstabel bij te werken.

De sjabloon bevat zeven activiteiten:

  • Met opzoeken worden de partities opgehaald uit een externe besturingstabel. De tabelnaam is s3_partition_delta_control_table en de query voor het laden van gegevens uit de tabel is 'select distinct PartitionPrefix from s3_partition_delta_control_table'.
  • ForEach haalt de partitielijst op uit de opzoekactiviteit en doorloopt elke partitie naar de TriggerDeltaCopy-activiteit . U kunt de batchCount zo instellen dat meerdere ADF-kopieertaken gelijktijdig worden uitgevoerd. We hebben 2 in deze sjabloon ingesteld.
  • ExecutePipeline voert de DeltaCopyFolderPartitionFromS3-pijplijn uit. De reden waarom we een andere pijplijn maken om elke kopieertaak een partitie te kopiëren, is omdat u de mislukte kopieertaak eenvoudig opnieuw kunt uitvoeren om die specifieke partitie opnieuw te laden vanuit AWS S3. Alle andere kopieertaken die andere partities laden, worden niet beïnvloed.
  • Met Opzoeken wordt de laatste uitvoeringstijd van de kopieertaak opgehaald uit de tabel met externe besturingselementen, zodat de nieuwe of bijgewerkte bestanden kunnen worden geïdentificeerd via LastModifiedTime. De tabelnaam is s3_partition_delta_control_table en de query voor het laden van gegevens uit de tabel is 'select max(JobRunTime) as LastModifiedTime uit s3_partition_delta_control_table where PartitionPrefix = '@{pipeline().parameters.prefixStr}' en SuccessOrFailure = 1'.
  • Kopieer nieuwe of gewijzigde bestanden alleen voor elke partitie van AWS S3 naar Azure Data Lake Storage Gen2. De eigenschap van modifiedDatetimeStart is ingesteld op de laatste uitvoeringstijd van de kopieertaak. De eigenschap van modifiedDatetimeEnd is ingesteld op de huidige uitvoeringstijd van de kopieertaak. Houd er rekening mee dat de tijd wordt toegepast op de UTC-tijdzone.
  • SqlServerStoredProcedure werkt de status bij van het kopiëren van elke partitie en het kopiëren van runtime in de besturingstabel wanneer deze slaagt. De kolom SuccessOrFailure is ingesteld op 1.
  • SqlServerStoredProcedure werkt de status bij van het kopiëren van elke partitie en het kopiëren van runtime in de besturingstabel wanneer deze mislukt. De kolom SuccessOrFailure is ingesteld op 0.

De sjabloon bevat twee parameters:

  • AWS_S3_bucketName is de naam van uw bucket in AWS S3 waar u gegevens wilt migreren. Als u gegevens uit meerdere buckets op AWS S3 wilt migreren, kunt u nog één kolom toevoegen in de tabel met externe besturingselementen om de naam van de bucket voor elke partitie op te slaan en uw pijplijn ook bijwerken om gegevens uit die kolom op te halen.
  • Azure_Storage_fileSystem is de naam van uw bestandssysteem in Azure Data Lake Storage Gen2 waarnaar u gegevens wilt migreren.

Deze twee oplossingssjablonen gebruiken

Voor het migreren van historische gegevens van Amazon S3 naar Azure Data Lake Storage Gen2 voor de sjabloon

  1. Maak een besturingstabel in Azure SQL Database om de partitielijst van AWS S3 op te slaan.

    Notitie

    De tabelnaam is s3_partition_control_table. Het schema van de besturingstabel is PartitionPrefix en SuccessOrFailure, waarbij PartitionPrefix de instelling voor het voorvoegsel is in S3 om de mappen en bestanden in Amazon S3 op naam te filteren, en SuccessOrFailure is de status van het kopiëren van elke partitie: 0 betekent dat deze partitie niet naar Azure is gekopieerd en 1 betekent dat deze partitie is gekopieerd naar Azure. Er zijn vijf partities gedefinieerd in de besturingstabel en de standaardstatus van het kopiëren van elke partitie is 0.

    CREATE TABLE [dbo].[s3_partition_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [SuccessOrFailure] [bit] NULL
    )
    
    INSERT INTO s3_partition_control_table (PartitionPrefix, SuccessOrFailure)
    VALUES
    ('a', 0),
    ('b', 0),
    ('c', 0),
    ('d', 0),
    ('e', 0);
    
  2. Maak een opgeslagen procedure in dezelfde Azure SQL Database voor besturingstabel.

    Notitie

    De naam van de opgeslagen procedure is sp_update_partition_success. Deze wordt aangeroepen door de sqlServerStoredProcedure-activiteit in uw ADF-pijplijn.

    CREATE PROCEDURE [dbo].[sp_update_partition_success] @PartPrefix varchar(255)
    AS
    BEGIN
    
        UPDATE s3_partition_control_table
        SET [SuccessOrFailure] = 1 WHERE [PartitionPrefix] = @PartPrefix
    END
    GO
    
  3. Ga naar de sjabloon Historische gegevens migreren van AWS S3 naar Azure Data Lake Storage Gen2 . Voer de verbindingen met uw externe beheertabel, AWS S3 in als het gegevensbronarchief en Azure Data Lake Storage Gen2 als doelarchief. Houd er rekening mee dat de tabel met externe besturingselementen en de opgeslagen procedure verwijzen naar dezelfde verbinding.

    Screenshot that shows the Migrate historical data from AWS S3 to Azure Data Lake Storage Gen2 template.

  4. Selecteer Deze sjabloon gebruiken.

    Screenshot that highlights the Use this template button.

  5. U ziet dat de 2 pijplijnen en 3 gegevenssets zijn gemaakt, zoals wordt weergegeven in het volgende voorbeeld:

    Screenshot that shows the two pipelines and three datasets that were created by using the template.

  6. Ga naar de pijplijn BulkCopyFromS3 en selecteer Debug, voer de parameters in. Selecteer Vervolgens Voltooien.

    Screenshot that shows where to select Debug and enter the parameters before you select Finish.

  7. U ziet resultaten die vergelijkbaar zijn met het volgende voorbeeld:

    Screenshot that shows the returned results.

De sjabloon kan alleen gewijzigde bestanden van Amazon S3 kopiëren naar Azure Data Lake Storage Gen2

  1. Maak een besturingstabel in Azure SQL Database om de partitielijst van AWS S3 op te slaan.

    Notitie

    De tabelnaam is s3_partition_delta_control_table. Het schema van de besturingstabel is PartitionPrefix, JobRunTime en SuccessOrFailure, waarbij PartitionPrefix de instelling voor het voorvoegsel is in S3 om de mappen en bestanden in Amazon S3 op naam te filteren, JobRunTime is de datum/tijd-waarde wanneer kopieertaken worden uitgevoerd en SuccessOrFailure de status is van het kopiëren van elke partitie: 0 betekent dat deze partitie niet naar Azure is gekopieerd en 1 betekent dat deze partitie is gekopieerd naar Azure. Er zijn vijf partities gedefinieerd in de besturingstabel. De standaardwaarde voor JobRunTime kan het tijdstip zijn waarop eenmalige historische gegevensmigratie wordt gestart. De ADF-kopieeractiviteit kopieert de bestanden op AWS S3 die na die tijd voor het laatst zijn gewijzigd. De standaardstatus van het kopiëren van elke partitie is 1.

    CREATE TABLE [dbo].[s3_partition_delta_control_table](
        [PartitionPrefix] [varchar](255) NULL,
        [JobRunTime] [datetime] NULL,
        [SuccessOrFailure] [bit] NULL
        )
    
    INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
    VALUES
    ('a','1/1/2019 12:00:00 AM',1),
    ('b','1/1/2019 12:00:00 AM',1),
    ('c','1/1/2019 12:00:00 AM',1),
    ('d','1/1/2019 12:00:00 AM',1),
    ('e','1/1/2019 12:00:00 AM',1);
    
  2. Maak een opgeslagen procedure in dezelfde Azure SQL Database voor besturingstabel.

    Notitie

    De naam van de opgeslagen procedure is sp_insert_partition_JobRunTime_success. Deze wordt aangeroepen door de sqlServerStoredProcedure-activiteit in uw ADF-pijplijn.

    CREATE PROCEDURE [dbo].[sp_insert_partition_JobRunTime_success] @PartPrefix varchar(255), @JobRunTime datetime, @SuccessOrFailure bit
    AS
    BEGIN
        INSERT INTO s3_partition_delta_control_table (PartitionPrefix, JobRunTime, SuccessOrFailure)
        VALUES
            (@PartPrefix,@JobRunTime,@SuccessOrFailure)
    END
    GO
    
  3. Ga naar de sjabloon Delta-gegevens kopiëren van AWS S3 naar Azure Data Lake Storage Gen2 . Voer de verbindingen met uw externe beheertabel, AWS S3 in als het gegevensbronarchief en Azure Data Lake Storage Gen2 als doelarchief. Houd er rekening mee dat de tabel met externe besturingselementen en de opgeslagen procedure verwijzen naar dezelfde verbinding.

    Create a new connection

  4. Selecteer Deze sjabloon gebruiken.

    Use this template

  5. U ziet dat de 2 pijplijnen en 3 gegevenssets zijn gemaakt, zoals wordt weergegeven in het volgende voorbeeld:

    Review the pipeline

  6. Ga naar de pijplijn 'DeltaCopyFromS3' en selecteer Debug en voer de parameters in. Selecteer Vervolgens Voltooien.

    Click **Debug**

  7. U ziet resultaten die vergelijkbaar zijn met het volgende voorbeeld:

    Review the result

  8. U kunt ook de resultaten uit de besturingstabel controleren door een query 'select * from s3_partition_delta_control_table' te selecteren. De uitvoer ziet er ongeveer als in het volgende voorbeeld uit:

    Screenshot that shows the results from the control table after you run the query.