Aktivitas Aliran Data di Azure Data Factory dan Azure Synapse Analytics

BERLAKU UNTUK:Azure Data Factory Azure Synapse Analytics

Gunakan aktivitas Aliran Data untuk mentransformasi dan memindahkan data melalui aliran data pemetaan. Jika Anda baru menggunakan aliran data, lihat Gambaran umum pemetaan Aliran Data

Buat aktivitas Aliran Data dengan antarmuka pengguna

Untuk menggunakan aktivitas Aliran Data dalam alur, selesaikan langkah-langkah berikut:

  1. Cari Aliran Data di panel Aktivitas alur, dan seret aktivitas Aliran Data ke kanvas alur.

  2. Pilih aktivitas Aliran Data baru di kanvas jika belum dipilih, dan tab Pengaturan, untuk mengedit detailnya.

    Shows the UI for a Data Flow activity.

  3. Titik pemeriksaan digunakan untuk mengatur titik pemeriksaan saat aliran data digunakan untuk pengambilan data yang diubah. Anda bisa menimpanya. Aktivitas aliran data menggunakan nilai panduan sebagai kunci titik pemeriksaan alih-alih "nama alur + nama aktivitas" sehingga selalu dapat melacak status pengambilan data perubahan pelanggan meskipun ada tindakan penggantian nama. Semua aktivitas aliran data yang ada akan menggunakan kunci pola lama untuk kompatibilitas mundur. Opsi kunci titik pemeriksaan setelah menerbitkan aktivitas aliran data baru dengan mengubah sumber daya aliran data yang diaktifkan pengambilan data ditampilkan seperti di bawah ini.

    Shows the UI for a Data Flow activity with checkpoint key.

  4. Pilih aliran data yang sudah ada atau buat yang baru menggunakan tombol Baru. Pilih opsi lain yang diperlukan untuk menyelesaikan konfigurasi Anda.

Sintaks

{
    "name": "MyDataFlowActivity",
    "type": "ExecuteDataFlow",
    "typeProperties": {
      "dataflow": {
         "referenceName": "MyDataFlow",
         "type": "DataFlowReference"
      },
      "compute": {
         "coreCount": 8,
         "computeType": "General"
      },
      "traceLevel": "Fine",
      "runConcurrently": true,
      "continueOnError": true,      
      "staging": {
          "linkedService": {
              "referenceName": "MyStagingLinkedService",
              "type": "LinkedServiceReference"
          },
          "folderPath": "my-container/my-folder"
      },
      "integrationRuntime": {
          "referenceName": "MyDataFlowIntegrationRuntime",
          "type": "IntegrationRuntimeReference"
      }
}

Properti jenis

Properti Deskripsi Nilai yang diizinkan Diperlukan
dataflow Referensi ke Aliran Data yang dijalankan DataFlowReference Ya
integrationRuntime Lingkungan komputasi yang dijalankan aliran data. Jika tidak ditentukan, runtime integrasi Azure yang diatasi otomatis akan digunakan. IntegrationRuntimeReference Tidak
compute.coreCount Jumlah inti yang digunakan dalam kluster spark. Hanya dapat ditentukan jika runtime Integrasi penyelesaian otomatis Azure digunakan 8, 16, 32, 48, 80, 144, 272 Tidak
compute.computeType Jenis komputasi yang digunakan dalam kluster spark. Hanya dapat ditentukan jika runtime Integrasi penyelesaian otomatis Azure digunakan "General", "MemoryOptimized" Tidak
staging.linkedService Jika Anda menggunakan sumber atau sink Azure Synapse Analytics, tentukan akun penyimpanan yang digunakan untuk penahapan PolyBase.

Jika Azure Storage Anda dikonfigurasi dengan titik akhir layanan VNet, Anda harus menggunakan autentikasi identitas terkelola dengan "izinkan layanan Microsoft tepercaya" diaktifkan pada akun penyimpanan, lihat Dampak penggunaan Titik Akhir Layanan VNet dengan penyimpanan Azure. Pelajari juga konfigurasi yang diperlukan untuk masing-masing Blob Azure dan Azure Data Lake Storage Gen2.
LinkedServiceReference Hanya jika aliran data membaca atau menulis ke Azure Synapse Analytics
staging.folderPath Jika Anda menggunakan sumber atau sink Azure Synapse Analytics, jalur folder di akun penyimpanan blob yang digunakan untuk penahapan PolyBase Untai (karakter) Hanya jika aliran data membaca atau menulis ke Azure Synapse Analytics
traceLevel Mengatur tingkat pengelogan eksekusi aktivitas aliran data Anda Halus, Kasar, Tidak ada Tidak

Execute Data Flow

Komputasi aliran data ukuran dinamis pada runtime

Properti Core Count dan Compute Type dapat diatur secara dinamis untuk menyesuaikan dengan ukuran data sumber masuk Anda pada runtime. Gunakan aktivitas alur seperti Pencarian atau Dapatkan Metadata untuk menemukan ukuran data himpunan data sumber. Lalu, gunakan Tambahkan Konten Dinamis di properti aktivitas Aliran Data.

Catatan

Saat memilih inti driver dan simpul pekerja di Aliran Data Azure Synapse, minimal 3 simpul akan selalu digunakan.

Dynamic Data Flow

Berikut adalah tutorial video singkat yang menjelaskan teknik ini

Runtime integrasi Aliran Data

Pilih Runtime Integrasi mana yang akan digunakan untuk eksekusi aktivitas Aliran Data Anda. Secara default, layanan akan menggunakan penyelesaian otomatis Runtime integrasi Azure dengan empat inti pekerja. Runtime integrasi ini memiliki jenis komputasi tujuan umum dan berjalan di wilayah yang sama dengan instans layanan Anda. Untuk alur yang dioperasionalkan, sangat disarankan agar Anda membuat Integration Runtimes Azure Anda sendiri yang menentukan wilayah tertentu, jenis komputasi, jumlah inti, dan TTL untuk eksekusi aktivitas aliran data Anda.

Jenis komputasi minimum untuk Tujuan Umum dengan konfigurasi 8+8 (16 total v-core) dan Time to live (TTL) 10 menit adalah rekomendasi minimum untuk sebagian besar beban kerja produksi. Dengan mengatur TTL kecil, IR Azure dapat mempertahankan kluster hangat yang tidak akan dikenakan beberapa menit waktu mulai untuk kluster dingin. Untuk mengetahui informasi selengkapnya, lihat Runtime integrasi Azure.

Azure Integration Runtime

Penting

Pilihan Runtime Integrasi dalam aktivitas Aliran Data hanya berlaku untuk eksekusi yang dipicu oleh alur Anda. Men-debug alur Anda dengan aliran data berjalan pada kluster yang ditentukan dalam sesi debug.

PolyBase

Jika menggunakan Azure Synapse Analytics sebagai sink atau sumber, Anda harus memilih lokasi penahapan untuk beban batch PolyBase Anda. PolyBase memungkinkan pemuatan batch secara massal alih-alih memuat baris demi baris data. PolyBase secara drastis mengurangi waktu muat ke Azure Synapse Analytics.

Kunci titik pemeriksaan

Saat menggunakan opsi ubah pengambilan untuk sumber aliran data, Azure Data Factory akan memelihara dan mengelola titik pemeriksaan untuk Anda secara otomatis. Kunci titik pemeriksaan default adalah hash dari nama aliran data dan nama alur. Jika Anda menggunakan pola dinamis untuk tabel atau folder sumber Anda, Anda mungkin ingin mengambil alih hash ini dan menyetel nilai kunci titik pemeriksaan Anda sendiri di sini.

Tingkat pengelogan

Jika Anda tidak memerlukan setiap eksekusi alur aktivitas aliran data Anda untuk mencatat semua log telemetri verbose sepenuhnya, Anda dapat secara opsional mengatur tingkat pengelogan Anda ke "Dasar" atau "Tidak Ada". Saat menjalankan aliran data Anda dalam mode "Verbose" (default), Anda meminta layanan untuk mencatat aktivitas sepenuhnya di setiap tingkat partisi individu selama transformasi data Anda. Ini bisa menjadi operasi yang mahal, jadi aktifkan verbose hanya saat pemecahan masalah dapat meningkatkan aliran data dan performa alur Anda secara keseluruhan. Mode "Dasar" hanya akan mencatat durasi transformasi, sementara "Tidak Ada" hanya akan memberikan ringkasan durasi.

Logging level

Properti sink

Fitur pengelompokan dalam aliran data memungkinkan Anda untuk mengatur urutan eksekusi sink Anda serta untuk mengelompokkan sink bersama-sama menggunakan nomor grup yang sama. Untuk membantu mengelola grup, Anda dapat meminta layanan untuk menjalankan sink, dalam grup yang sama, secara paralel. Anda juga dapat mengatur grup sink untuk melanjutkan bahkan setelah salah satu sink mengalami kesalahan.

Perilaku default sink aliran data adalah mengeksekusi setiap sink secara berurutan, dalam urutan serial, dan menggagalkan aliran data saat terjadi kesalahan di sink. Selain itu, semua sink ditetapkan secara default ke grup yang sama kecuali Anda masuk ke properti aliran data dan menetapkan prioritas berbeda untuk sink.

Sink properties

Baris pertama saja

Opsi ini hanya tersedia untuk aliran data yang memiliki sink cache yang diaktifkan untuk "Output ke aktivitas". Output dari aliran data yang dimasukkan langsung ke pipeline Anda dibatasi hingga 2MB. Pengaturan "baris pertama saja" membantu Anda membatasi output data dari aliran data saat memasukkan output aktivitas aliran data langsung ke pipeline anda.

Parameterisasi Aliran Data

Himpunan data berparameter

Jika aliran data Anda menggunakan himpunan data berparameter, atur nilai parameter di tab Pengaturan.

Execute Data Flow Parameters

Aliran data berparameter

Jika aliran data Anda berparameter, atur nilai dinamis parameter aliran data di tab Parameter. Anda dapat menggunakan bahasa ekspresi alur atau bahasa ekspresi aliran data untuk menetapkan nilai parameter dinamis atau literal. Untuk mengetahui informasi selengkapnya, lihat Parameter Aliran Data.

Properti komputasi berparameter.

Anda dapat memberi parameter pada jumlah inti atau jenis komputasi jika Anda menggunakan runtime Integrasi penyelesaian otomatis Azure dan menentukan nilai untuk compute.coreCount dan compute.computeType.

Execute Data Flow Parameter Example

Debug alur aktivitas Aliran Data

Untuk menjalankan saluran debug yang dijalankan dengan aktivitas Aliran Data, Anda harus mengaktifkan mode debug aliran data melalui penggeser Debug Aliran Data di bilah atas. Mode debug memungkinkan Anda menjalankan aliran data terhadap kluster Spark aktif. Untuk mengetahui informasi selengkapnya, lihat Mode Debug.

Screenshot that shows where is the Debug button

Alur debug berjalan terhadap kluster debug aktif, bukan lingkungan runtime integrasi yang ditentukan dalam pengaturan aktivitas Aliran Data. Anda dapat memilih lingkungan komputasi debug saat memulai mode debug.

Memantau aktivitas Aliran Data

Aktivitas Aliran Data memiliki pengalaman pemantauan khusus di mana Anda dapat melihat informasi partisi, waktu tahapan, dan silsilah data. Buka panel pemantauan melalui ikon kacamata di bawah Tindakan. Untuk mengetahui informasi selengkapnya, lihat Memantau Aliran Data.

Menggunakan aktivitas Aliran Data menghasilkan aktivitas berikutnya

Metrik output aktivitas aliran data mengenai jumlah baris yang ditulis ke setiap sink dan baris yang dibaca dari setiap sumber. Hasil dikembalikan di bagian output hasil eksekusi aktivitas. Metrik yang dikembalikan dalam format json di bawah ini.

{
    "runStatus": {
        "metrics": {
            "<your sink name1>": {
                "rowsWritten": <number of rows written>,
                "sinkProcessingTime": <sink processing time in ms>,
                "sources": {
                    "<your source name1>": {
                        "rowsRead": <number of rows read>
                    },
                    "<your source name2>": {
                        "rowsRead": <number of rows read>
                    },
                    ...
                }
            },
            "<your sink name2>": {
                ...
            },
            ...
        }
    }
}

Misalnya, untuk mendapatkan jumlah baris yang ditulis ke sink bernama 'sink1' dalam aktivitas bernama 'dataflowActivity', gunakan @activity('dataflowActivity').output.runStatus.metrics.sink1.rowsWritten.

Untuk mendapatkan jumlah baris yang dibaca dari sumber bernama 'source1' yang digunakan dalam sink tersebut, gunakan @activity('dataflowActivity').output.runStatus.metrics.sink1.sources.source1.rowsRead.

Catatan

Jika sink memiliki baris nol yang ditulis, sink tidak akan muncul dalam metrik. Keberadaan dapat diverifikasi menggunakan fungsi contains. Misalnya, contains(activity('dataflowActivity').output.runStatus.metrics, 'sink1') akan memeriksa apakah ada baris yang ditulis ke sink1.

Langkah berikutnya

Lihat aktivitas alur kontrol yang didukung: