Pengantar Utilitas Microsoft Spark

Microsoft Spark Utilities (MSSparkUtils) adalah paket bawaan untuk membantu Anda melakukan tugas umum dengan mudah. Anda dapat menggunakan MSSparkUtils untuk bekerja dengan sistem file, untuk mendapatkan variabel lingkungan, untuk mengikat notebook bersama-sama, dan bekerja dengan rahasia. MSSparkUtils tersedia dalam PySpark (Python)alur , , Scala, .NET Spark (C#)dan R (Preview) notebook dan Synapse.

Prasyarat

Konfigurasikan akses ke Azure Data Lake Storage Gen2

Notebook Synapse menggunakan pass-through Microsoft Entra untuk mengakses akun ADLS Gen2. Anda harus menjadi Kontributor Data Gumpalan Penyimpanan untuk mengakses akun ADLS Gen2 (atau folder).

Alur Synapse menggunakan Managed Service Identity (MSI) ruang kerja untuk mengakses akun penyimpanan. Untuk menggunakan MSSparkUtils dalam aktivitas alur Anda, identitas ruang kerja Anda harus menjadi Storage Blob Data Contributor untuk mengakses akun (atau folder) ADLS Gen2.

Ikuti langkah-langkah ini untuk memastikan ID Microsoft Entra dan MSI ruang kerja Anda memiliki akses ke akun ADLS Gen2:

  1. Buka portal Azure dan akun penyimpanan yang ingin Anda akses. Anda dapat menavigasi ke kontainer tertentu yang ingin Anda akses.

  2. Pilih Kontrol akses (IAM) dari panel kiri.

  3. Pilih Tambahkan>Tambahkan penetapan peran untuk membuka halaman Tambahkan penetapan peran.

  4. Tetapkan peran berikut. Untuk langkah-langkah mendetail, lihat Menetapkan peran Azure menggunakan portal Azure.

    Pengaturan Nilai
    Peran Kontributor Data Blob Penyimpanan
    Tetapkan akses ke USER dan MANAGEDIDENTITY
    Anggota akun Microsoft Entra dan identitas ruang kerja Anda

    Catatan

    Nama identitas terkelola juga merupakan nama ruang kerja.

    Menambahkan halaman penetapan peran di portal Microsoft Azure.

  5. Pilih Simpan.

Anda dapat mengakses data di ADLS Gen2 dengan Synapse Spark melalui URL berikut:

abfss://<container_name>@<storage_account_name>.dfs.core.windows.net/<path>

Konfigurasikan akses ke Azure Blob Storage

Synapse menggunakan tanda tangan akses bersama (SAS) untuk mengakses Azure Blob Storage. Untuk menghindari mengekspos kunci SAS dalam kode, sebaiknya buat layanan tertaut baru di ruang kerja Synapse ke akun Azure Blob Storage yang ingin Anda akses.

Ikuti langkah-langkah ini untuk menambahkan layanan tertaut baru untuk akun Azure Blob Storage:

  1. Buka Azure Synapse Studio.
  2. Pilih Kelola dari panel kiri dan pilih Layanan tertaut di bawah koneksi Eksternal.
  3. Cari Azure Blob Storage di panel Layanan tertaut baru di sebelah kanan.
  4. Pilih Lanjutkan.
  5. Pilih Akun Azure Blob Storage untuk mengakses dan mengonfigurasi nama layanan tertaut. Sarankan untuk menggunakan Kunci akun untuk Metode autentikasi.
  6. Pilih Uji koneksi untuk memvalidasi apakah pengaturan benar.
  7. Pilih Buat terlebih dahulu dan klik Terbitkan semua untuk menyimpan perubahan Anda.

Anda dapat mengakses data di Azure Blob Storage dengan Synapse Spark melalui URL berikut:

wasb[s]://<container_name>@<storage_account_name>.blob.core.windows.net/<path>

Berikut adalah contoh kode:

from pyspark.sql import SparkSession

# Azure storage access info
blob_account_name = 'Your account name' # replace with your blob name
blob_container_name = 'Your container name' # replace with your container name
blob_relative_path = 'Your path' # replace with your relative folder path
linked_service_name = 'Your linked service name' # replace with your linked service name

blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely

wasb_path = 'wasbs://%s@%s.blob.core.windows.net/%s' % (blob_container_name, blob_account_name, blob_relative_path)

spark.conf.set('fs.azure.sas.%s.%s.blob.core.windows.net' % (blob_container_name, blob_account_name), blob_sas_token)
print('Remote blob path: ' + wasb_path)
val blob_account_name = "" // replace with your blob name
val blob_container_name = "" //replace with your container name
val blob_relative_path = "/" //replace with your relative folder path
val linked_service_name = "" //replace with your linked service name


val blob_sas_token = mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

val wasbs_path = f"wasbs://$blob_container_name@$blob_account_name.blob.core.windows.net/$blob_relative_path"
spark.conf.set(f"fs.azure.sas.$blob_container_name.$blob_account_name.blob.core.windows.net",blob_sas_token)

var blob_account_name = "";  // replace with your blob name
var blob_container_name = "";     // replace with your container name
var blob_relative_path = "";  // replace with your relative folder path
var linked_service_name = "";    // replace with your linked service name
var blob_sas_token = Credentials.GetConnectionStringOrCreds(linked_service_name);

spark.Conf().Set($"fs.azure.sas.{blob_container_name}.{blob_account_name}.blob.core.windows.net", blob_sas_token);

var wasbs_path = $"wasbs://{blob_container_name}@{blob_account_name}.blob.core.windows.net/{blob_relative_path}";

Console.WriteLine(wasbs_path);

# Azure storage access info
blob_account_name <- 'Your account name' # replace with your blob name
blob_container_name <- 'Your container name' # replace with your container name
blob_relative_path <- 'Your path' # replace with your relative folder path
linked_service_name <- 'Your linked service name' # replace with your linked service name

blob_sas_token <- mssparkutils.credentials.getConnectionStringOrCreds(linked_service_name)

# Allow SPARK to access from Blob remotely
sparkR.session()
wasb_path <- sprintf('wasbs://%s@%s.blob.core.windows.net/%s',blob_container_name, blob_account_name, blob_relative_path)
sparkR.session(sprintf('fs.azure.sas.%s.%s.blob.core.windows.net',blob_container_name, blob_account_name), blob_sas_token)

print( paste('Remote blob path: ',wasb_path))

Konfigurasikan akses ke Azure Key Vault

Anda dapat menambahkan Azure Key Vault sebagai layanan tertaut untuk mengelola kredensial Anda di Synapse. Ikuti langkah-langkah ini untuk menambahkan Azure Key Vault sebagai layanan tertaut Synapse:

  1. Buka Azure Synapse Studio.

  2. Pilih Kelola dari panel kiri dan pilih Layanan tertaut di bawah koneksi Eksternal.

  3. Cari Azure Key Vault di panel Layanan tertaut baru di sebelah kanan.

  4. Pilih Akun Azure Key Vault untuk mengakses dan mengonfigurasi nama layanan tertaut.

  5. Pilih Uji koneksi untuk memvalidasi apakah pengaturan benar.

  6. Pilih Buat terlebih dahulu dan klik Terbitkan semua untuk menyimpan perubahan Anda.

Notebook Synapse menggunakan pass-through Microsoft Entra untuk mengakses Azure Key Vault. Alur Synapse menggunakan identitas ruang kerja (MSI) untuk mengakses Azure Key Vault. Untuk memastikan kode Anda berfungsi baik di notebook maupun di alur Synapse, sebaiknya berikan izin akses rahasia untuk akun Microsoft Entra dan identitas ruang kerja Anda.

Ikuti langkah-langkah ini untuk memberikan akses rahasia ke identitas ruang kerja Anda:

  1. Buka portal Azure dan Azure Key Vault yang ingin Anda akses.
  2. Pilih Kebijakan Akses dari panel kiri.
  3. Pilih Tambahkan Kebijakan Akses:
    • Pilih Manajemen Kunci, Rahasia, & Sertifikat sebagai templat konfigurasi.
    • Pilih akun Microsoft Entra dan identitas ruang kerja Anda (sama dengan nama ruang kerja Anda) di prinsipal pilihan atau pastikan akun tersebut sudah ditetapkan.
  4. Pilih Pilih dan Tambahkan.
  5. Pilih tombol Simpan untuk menerapkan perubahan.

Utilitas sistem file

mssparkutils.fs menyediakan utilitas untuk bekerja dengan berbagai sistem file, termasuk Azure Data Lake Storage Gen2 (ADLS Gen2) dan Azure Blob Storage. Pastikan Anda mengonfigurasi akses ke Azure Data Lake Storage Gen2 dan Azure Blob Storage dengan tepat.

Jalankan perintah berikut untuk mendapatkan gambaran umum metode yang tersedia:

from notebookutils import mssparkutils
mssparkutils.fs.help()
mssparkutils.fs.help()
using Microsoft.Spark.Extensions.Azure.Synapse.Analytics.Notebook.MSSparkUtils;
FS.Help()
library(notebookutils)
mssparkutils.fs.help()

Menghasilkan:


mssparkutils.fs provides utilities for working with various FileSystems.

Below is overview about the available methods:

cp(from: String, to: String, recurse: Boolean = false): Boolean -> Copies a file or directory, possibly across FileSystems
mv(src: String, dest: String, create_path: Boolean = False, overwrite: Boolean = False): Boolean -> Moves a file or directory, possibly across FileSystems
ls(dir: String): Array -> Lists the contents of a directory
mkdirs(dir: String): Boolean -> Creates the given directory if it does not exist, also creating any necessary parent directories
put(file: String, contents: String, overwrite: Boolean = false): Boolean -> Writes the given String out to a file, encoded in UTF-8
head(file: String, maxBytes: int = 1024 * 100): String -> Returns up to the first 'maxBytes' bytes of the given file as a String encoded in UTF-8
append(file: String, content: String, createFileIfNotExists: Boolean): Boolean -> Append the content to a file
rm(dir: String, recurse: Boolean = false): Boolean -> Removes a file or directory

Use mssparkutils.fs.help("methodName") for more info about a method.

Daftar file

Cantumkan isi direktori.

mssparkutils.fs.ls('Your directory path')
mssparkutils.fs.ls("Your directory path")
FS.Ls("Your directory path")
mssparkutils.fs.ls("Your directory path")

Perlihatkan properti file

Mengembalikan properti file termasuk nama file, jalur file, ukuran file, waktu modifikasi file, dan apakah itu direktori dan file.

files = mssparkutils.fs.ls('Your directory path')
for file in files:
    print(file.name, file.isDir, file.isFile, file.path, file.size, file.modifyTime)
val files = mssparkutils.fs.ls("/")
files.foreach{
    file => println(file.name,file.isDir,file.isFile,file.size,file.modifyTime)
}
var Files = FS.Ls("/");
foreach(var File in Files) {
    Console.WriteLine(File.Name+" "+File.IsDir+" "+File.IsFile+" "+File.Size);
}
files <- mssparkutils.fs.ls("/")
for (file in files) {
    writeLines(paste(file$name, file$isDir, file$isFile, file$size, file$modifyTime))
}

Buat direktori baru

Membuat direktori yang ditentukan jika tidak ada dan direktori induk yang diperlukan.

mssparkutils.fs.mkdirs('new directory name')
mssparkutils.fs.mkdirs("new directory name")
FS.Mkdirs("new directory name")
mssparkutils.fs.mkdirs("new directory name")

Salin file

Menyalin file atau direktori. Mendukung penyalinan di seluruh sistem file.

mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)# Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
FS.Cp("source file or directory", "destination file or directory", true) // Set the third parameter as True to copy all files and directories recursively
mssparkutils.fs.cp('source file or directory', 'destination file or directory', True)

File salin berkinerja

Metode ini menyediakan cara penyalinan atau pemindahan file yang lebih cepat, terutama data dalam volume besar.

mssparkutils.fs.fastcp('source file or directory', 'destination file or directory', True) # Set the third parameter as True to copy all files and directories recursively

Pratinjau konten file

Mengembalikan hingga bita 'maxBytes' pertama dari file yang ditentukan sebagai String yang dikodekan dalam UTF-8.

mssparkutils.fs.head('file path', maxBytes to read)
mssparkutils.fs.head("file path", maxBytes to read)
FS.Head("file path", maxBytes to read)
mssparkutils.fs.head('file path', maxBytes to read)

Pindahkan file

Memindahkan file atau direktori. Mendukung pemindahan di seluruh sistem file.

mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist
mssparkutils.fs.mv("source file or directory", "destination directory", true) // Set the last parameter as True to firstly create the parent directory if it does not exist
FS.Mv("source file or directory", "destination directory", true)
mssparkutils.fs.mv('source file or directory', 'destination directory', True) # Set the last parameter as True to firstly create the parent directory if it does not exist

Tulis file

Menulis string yang ditentukan ke file, yang dikodekan dalam UTF-8.

mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
FS.Put("file path", "content to write", true) // Set the last parameter as True to overwrite the file if it existed already
mssparkutils.fs.put("file path", "content to write", True) # Set the last parameter as True to overwrite the file if it existed already

Tambahkan konten ke file

Menambahkan string yang ditentukan ke file, yang dikodekan dalam UTF-8.

mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path","content to append",true) // Set the last parameter as True to create the file if it does not exist
FS.Append("file path", "content to append", true) // Set the last parameter as True to create the file if it does not exist
mssparkutils.fs.append("file path", "content to append", True) # Set the last parameter as True to create the file if it does not exist

Hapus file atau direktori

Menghapus file atau direktori.

mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
FS.Rm("file path", true) // Set the last parameter as True to remove all files and directories recursively
mssparkutils.fs.rm('file path', True) # Set the last parameter as True to remove all files and directories recursively

Utilitas notebook

Tidak didukung.

Anda bisa menggunakan MSSparkUtils Notebook Utilities untuk menjalankan notebook atau keluar dari notebook dengan nilai. Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

mssparkutils.notebook.help()

Dapatkan hasil:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Catatan

Utilitas notebook tidak berlaku untuk definisi kerja Apache Spark (SJD).

Referensikan notebook

Referensikan notebook dan tampilkan nilai keluarnya. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur. Notebook yang direferensikan akan berjalan pada pool Spark di mana notebook memanggil fungsi ini.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Contohnya:

mssparkutils.notebook.run("folder/Sample1", 90, {"input": 20 })

Setelah eksekusi selesai, Anda akan melihat tautan snapshot bernama 'Tampilkan eksekusi buku catatan: Nama Buku Catatan' yang diperlihatkan dalam output sel, Anda dapat mengklik tautan untuk melihat snapshot untuk eksekusi khusus ini.

Cuplikan layar python tautan snap

Referensi menjalankan beberapa buku catatan secara paralel

Metode mssparkutils.notebook.runMultiple() ini memungkinkan Anda menjalankan beberapa notebook secara paralel atau dengan struktur topologi yang telah ditentukan sebelumnya. API menggunakan mekanisme implementasi multi-utas dalam sesi spark, yang berarti sumber daya komputasi dibagikan oleh notebook referensi yang dijalankan.

Dengan mssparkutils.notebook.runMultiple(), Anda dapat:

  • Jalankan beberapa notebook secara bersamaan, tanpa menunggu masing-masing selesai.

  • Tentukan dependensi dan urutan eksekusi untuk notebook Anda, menggunakan format JSON sederhana.

  • Optimalkan penggunaan sumber daya komputasi Spark dan kurangi biaya proyek Synapse Anda.

  • Tampilkan Rekam jepret dari setiap catatan eksekusi buku catatan dalam output, dan debug/pantau tugas buku catatan Anda dengan mudah.

  • Dapatkan nilai keluar dari setiap aktivitas eksekutif dan gunakan dalam tugas hilir.

Anda juga dapat mencoba menjalankan mssparkutils.notebook.help("runMultiple") untuk menemukan contoh dan penggunaan terperinci.

Berikut adalah contoh sederhana menjalankan daftar buku catatan secara paralel menggunakan metode ini:


mssparkutils.notebook.runMultiple(["NotebookSimple", "NotebookSimple2"])

Hasil eksekusi dari buku catatan akar adalah sebagai berikut:

Cuplikan layar referensi daftar buku catatan.

Berikut ini adalah contoh menjalankan notebook dengan struktur topologis menggunakan mssparkutils.notebook.runMultiple(). Gunakan metode ini untuk mengatur notebook dengan mudah melalui pengalaman kode.

# run multiple notebooks with parameters
DAG = {
    "activities": [
        {
            "name": "NotebookSimple", # activity name, must be unique
            "path": "NotebookSimple", # notebook path
            "timeoutPerCellInSeconds": 90, # max timeout for each cell, default to 90 seconds
            "args": {"p1": "changed value", "p2": 100}, # notebook parameters
        },
        {
            "name": "NotebookSimple2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 2", "p2": 200}
        },
        {
            "name": "NotebookSimple2.2",
            "path": "NotebookSimple2",
            "timeoutPerCellInSeconds": 120,
            "args": {"p1": "changed value 3", "p2": 300},
            "retry": 1,
            "retryIntervalInSeconds": 10,
            "dependencies": ["NotebookSimple"] # list of activity names that this activity depends on
        }
    ]
}
mssparkutils.notebook.runMultiple(DAG)

Catatan

Keluar dari notebook

Keluar dari notebook dengan nilai. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur.

  • Saat Anda memanggil fungsi exit() dari notebook secara interaktif, Azure Synapse akan melemparkan pengecualian, melewati sel berikutnya, dan menjaga sesi Spark tetap hidup.

  • Saat Anda mengatur notebook yang memanggil exit() fungsi dalam saluran Synapse, Azure Synapse akan mengembalikan nilai keluar, menyelesaikan proses alur, dan menghentikan sesi Spark.

  • Saat Anda memanggil exit() fungsi dalam notebook yang sedang direferensikan, Azure Synapse akan menghentikan eksekusi lebih lanjut dalam notebook yang direferensikan tersebut, dan terus menjalankan sel berikutnya di notebook yang memanggil run() fungsi tersebut. Misalnya: Notebook1 memiliki tiga sel dan memanggilexit() fungsi di sel kedua. Notebook2 memiliki lima sel dan panggilan run(notebook1) di sel ketiga. Saat Anda menjalankan Notebook2, Notebook1 akan dihentikan di sel kedua saat menekan exit() fungsi. Notebook2 akan terus menjalankan sel keempat dan sel kelimanya.

mssparkutils.notebook.exit("value string")

Contohnya:

Notebook Sample1 ditemukan di bawah folder/ dengan dua sel berikut:

  • sel 1 menentukan parameter input dengan nilai default yang diatur ke 10.
  • sel 2 keluar dari notebook dengan input sebagai nilai keluar.

Cuplikan layar notebook sampel

Anda bisa menjalankan Sample1 di notebook lain dengan nilai default:


exitVal = mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Menghasilkan:

Sample1 run success with input is 10

Anda bisa menjalankan Sample1 di notebook lainnya dan mengatur nilai input sebagai 20:

exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print (exitVal)

Menghasilkan:

Sample1 run success with input is 20

Anda bisa menggunakan MSSparkUtils Notebook Utilities untuk menjalankan notebook atau keluar dari notebook dengan nilai. Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

mssparkutils.notebook.help()

Dapatkan hasil:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Referensikan notebook

Referensikan notebook dan tampilkan nilai keluarnya. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur. Notebook yang direferensikan akan berjalan pada pool Spark di mana notebook memanggil fungsi ini.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Contohnya:

mssparkutils.notebook.run("folder/Sample1", 90, Map("input" -> 20))

Setelah eksekusi selesai, Anda akan melihat tautan snapshot bernama 'Tampilkan eksekusi buku catatan: Nama Buku Catatan' yang diperlihatkan dalam output sel, Anda dapat mengklik tautan untuk melihat snapshot untuk eksekusi khusus ini.

Cuplikan layar skala tautan snap

Keluar dari notebook

Keluar dari notebook dengan nilai. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur.

  • Saat Anda memanggil satu exit() fungsikan notebook secara interaktif, Azure Synapse akan memberikan pengecualian, melewati sel selanjutnya yang berjalan, dan menjaga sesi Spark tetap jalan.

  • Saat Anda mengatur notebook yang memanggil exit() fungsi dalam saluran Synapse, Azure Synapse akan mengembalikan nilai keluar, menyelesaikan proses alur, dan menghentikan sesi Spark.

  • Saat Anda memanggil exit() fungsi dalam notebook yang sedang direferensikan, Azure Synapse akan menghentikan eksekusi lebih lanjut dalam notebook yang direferensikan tersebut, dan terus menjalankan sel berikutnya di notebook yang memanggil run() fungsi tersebut. Misalnya: Notebook1 memiliki tiga sel dan memanggilexit() fungsi di sel kedua. Notebook2 memiliki lima sel dan panggilan run(notebook1) di sel ketiga. Saat Anda menjalankan Notebook2, Notebook1 akan dihentikan di sel kedua saat menekan exit() fungsi. Notebook2 akan terus menjalankan sel keempat dan sel kelimanya.

mssparkutils.notebook.exit("value string")

Contohnya:

Notebook Sample1 ditemukan di bawah mssparkutils/folder/ dengan dua sel berikut:

  • sel 1 menentukan parameter input dengan nilai default yang diatur ke 10.
  • sel 2 keluar dari notebook dengan input sebagai nilai keluar.

Cuplikan layar notebook sampel

Anda bisa menjalankan Sample1 di notebook lain dengan nilai default:


val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1")
print(exitVal)

Menghasilkan:

exitVal: String = Sample1 run success with input is 10
Sample1 run success with input is 10

Anda bisa menjalankan Sample1 di notebook lainnya dan mengatur nilai input sebagai 20:

val exitVal = mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, {"input": 20 })
print(exitVal)

Menghasilkan:

exitVal: String = Sample1 run success with input is 20
Sample1 run success with input is 20

Anda bisa menggunakan MSSparkUtils Notebook Utilities untuk menjalankan notebook atau keluar dari notebook dengan nilai. Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

mssparkutils.notebook.help()

Dapatkan hasil:

The notebook module.

exit(value: String): void -> This method lets you exit a notebook with a value.
run(path: String, timeoutSeconds: int, arguments: Map): String -> This method runs a notebook and returns its exit value.

Referensikan notebook

Referensikan notebook dan tampilkan nilai keluarnya. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur. Notebook yang direferensikan akan berjalan pada pool Spark di mana notebook memanggil fungsi ini.


mssparkutils.notebook.run("notebook path", <timeoutSeconds>, <parameterMap>)

Contohnya:

mssparkutils.notebook.run("folder/Sample1", 90, list("input": 20))

Setelah eksekusi selesai, Anda akan melihat tautan snapshot bernama 'Tampilkan eksekusi buku catatan: Nama Buku Catatan' yang diperlihatkan dalam output sel, Anda dapat mengklik tautan untuk melihat snapshot untuk eksekusi khusus ini.

Keluar dari notebook

Keluar dari notebook dengan nilai. Anda bisa menjalankan panggilan fungsi nesting di notebook secara interaktif atau dalam alur.

  • Saat Anda memanggil satu exit() fungsikan notebook secara interaktif, Azure Synapse akan memberikan pengecualian, melewati sel selanjutnya yang berjalan, dan menjaga sesi Spark tetap jalan.

  • Saat Anda mengatur notebook yang memanggil exit() fungsi dalam saluran Synapse, Azure Synapse akan mengembalikan nilai keluar, menyelesaikan proses alur, dan menghentikan sesi Spark.

  • Saat Anda memanggil exit() fungsi dalam notebook yang sedang direferensikan, Azure Synapse akan menghentikan eksekusi lebih lanjut dalam notebook yang direferensikan tersebut, dan terus menjalankan sel berikutnya di notebook yang memanggil run() fungsi tersebut. Misalnya: Notebook1 memiliki tiga sel dan memanggilexit() fungsi di sel kedua. Notebook2 memiliki lima sel dan panggilan run(notebook1) di sel ketiga. Saat Anda menjalankan Notebook2, Notebook1 akan dihentikan di sel kedua saat menekan exit() fungsi. Notebook2 akan terus menjalankan sel keempat dan sel kelimanya.

mssparkutils.notebook.exit("value string")

Contohnya:

Notebook Sample1 ditemukan di bawah folder/ dengan dua sel berikut:

  • sel 1 menentukan parameter input dengan nilai default yang diatur ke 10.
  • sel 2 keluar dari notebook dengan input sebagai nilai keluar.

Cuplikan layar notebook sampel

Anda bisa menjalankan Sample1 di notebook lain dengan nilai default:


exitVal <- mssparkutils.notebook.run("folder/Sample1")
print (exitVal)

Menghasilkan:

Sample1 run success with input is 10

Anda bisa menjalankan Sample1 di notebook lainnya dan mengatur nilai input sebagai 20:

exitVal <- mssparkutils.notebook.run("mssparkutils/folder/Sample1", 90, list("input": 20))
print (exitVal)

Menghasilkan:

Sample1 run success with input is 20

Utilitas kredensial

Anda dapat menggunakan MSSparkUtils Credentials Utilities untuk mendapatkan token akses layanan tertaut dan mengelola rahasia di Azure Key Vault.

Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

mssparkutils.credentials.help()
mssparkutils.credentials.help()
Not supported.
mssparkutils.credentials.help()

Dapatkan hasil:

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName
getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Catatan

Saat ini getSecretWithLS(linkedService, secret) tidak didukung di C#.

getToken(audience, name): returns AAD token for a given audience, name (optional)
isValidToken(token): returns true if token hasn't expired
getConnectionStringOrCreds(linkedService): returns connection string or credentials for linked service
getFullConnectionString(linkedService): returns full connection string with credentials
getPropertiesAll(linkedService): returns all the properties of a linked servicegetSecret(akvName, secret, linkedService): returns AKV secret for a given AKV linked service, akvName, secret key
getSecret(akvName, secret): returns AKV secret for a given akvName, secret key
getSecretWithLS(linkedService, secret): returns AKV secret for a given linked service, secret key
putSecret(akvName, secretName, secretValue, linkedService): puts AKV secret for a given akvName, secretName
putSecret(akvName, secretName, secretValue): puts AKV secret for a given akvName, secretName
putSecretWithLS(linkedService, secretName, secretValue): puts AKV secret for a given linked service, secretName

Dapatkan token

Mengembalikan token Microsoft Entra untuk audiens tertentu, nama (opsional). Tabel di bawah ini mencantumkan semua tipe audiens yang tersedia:

Tipe Audiens String literal yang akan digunakan dalam panggilan API
Azure Storage Storage
Azure Key Vault Vault
Manajemen Azure AzureManagement
Gudang Data Azure SQL (Khusus dan Tanpa Server) DW
Azure Synapse Synapse
Azure Data Lake Store DataLakeStore
Azure Data Factory ADF
Azure Data Explorer AzureDataExplorer
Azure Database untuk MySQL AzureOSSDB
Azure Database untuk MariaDB AzureOSSDB
Azure Database untuk PostgreSQL AzureOSSDB
mssparkutils.credentials.getToken('audience Key')
mssparkutils.credentials.getToken("audience Key")
Credentials.GetToken("audience Key")
mssparkutils.credentials.getToken('audience Key')

Validasikan token

Mengembalikan true jika token belum kedaluwarsa.

mssparkutils.credentials.isValidToken('your token')
mssparkutils.credentials.isValidToken("your token")
Credentials.IsValidToken("your token")
mssparkutils.credentials.isValidToken('your token')

Dapatkan string koneksi atau kredensial untuk layanan tertaut

Mengembalikan string koneksi atau kredensial untuk layanan tertaut.

mssparkutils.credentials.getConnectionStringOrCreds('linked service name')
mssparkutils.credentials.getConnectionStringOrCreds("linked service name")
Credentials.GetConnectionStringOrCreds("linked service name")
mssparkutils.credentials.getConnectionStringOrCreds('linked service name')

Dapatkan rahasia dengan menggunakan identitas ruang kerja

Mengembalikan rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas ruang kerja. Pastikan Anda mengonfigurasi akses ke Azure Key Vault dengan tepat.

mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')
mssparkutils.credentials.getSecret("azure key vault name","secret name","linked service name")
Credentials.GetSecret("azure key vault name","secret name","linked service name")
mssparkutils.credentials.getSecret('azure key vault name','secret name','linked service name')

Dapatkan rahasia dengan menggunakan kredensial pengguna

Mengembalikan rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas pengguna.

mssparkutils.credentials.getSecret('azure key vault name','secret name')
mssparkutils.credentials.getSecret("azure key vault name","secret name")
Credentials.GetSecret("azure key vault name","secret name")
mssparkutils.credentials.getSecret('azure key vault name','secret name')

Pasang rahasia dengan menggunakan identitas ruang kerja

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas ruang kerja. Pastikan Anda mengonfigurasi akses ke Azure Key Vault dengan tepat.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Pasang rahasia dengan menggunakan identitas ruang kerja

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas ruang kerja. Pastikan Anda mengonfigurasi akses ke Azure Key Vault dengan tepat.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value","linked service name")

Pasang rahasia dengan menggunakan identitas ruang kerja

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas ruang kerja. Pastikan Anda mengonfigurasi akses ke Azure Key Vault dengan tepat.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value','linked service name')

Pasang rahasia dengan menggunakan kredensial pengguna

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas pengguna.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Pasang rahasia dengan menggunakan kredensial pengguna

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas pengguna.

mssparkutils.credentials.putSecret('azure key vault name','secret name','secret value')

Pasang rahasia dengan menggunakan kredensial pengguna

Memasang rahasia Azure Key Vault untuk nama Azure Key Vault tertentu, nama rahasia, dan nama layanan tertaut dengan menggunakan identitas pengguna.

mssparkutils.credentials.putSecret("azure key vault name","secret name","secret value")

Utilitas lingkungan

Jalankan perintah berikut untuk mendapatkan gambaran umum tentang metode yang tersedia:

mssparkutils.env.help()
mssparkutils.env.help()
mssparkutils.env.help()
Env.Help()

Dapatkan hasil:

getUserName(): returns user name
getUserId(): returns unique user id
getJobId(): returns job id
getWorkspaceName(): returns workspace name
getPoolName(): returns Spark pool name
getClusterId(): returns cluster id

Dapatkan nama pengguna

Mengembalikan nama pengguna saat ini.

mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
mssparkutils.env.getUserName()
Env.GetUserName()

Dapatkan ID pengguna

Mengembalikan ID pengguna saat ini.

mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
mssparkutils.env.getUserId()
Env.GetUserId()

Dapatkan ID tugas

Mengembalikan ID tugas.

mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
mssparkutils.env.getJobId()
Env.GetJobId()

Dapatkan nama ruang kerja

Mengembalikan nama ruang kerja.

mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
mssparkutils.env.getWorkspaceName()
Env.GetWorkspaceName()

Dapatkan nama pool

Mengembalikan nama pool Spark.

mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
mssparkutils.env.getPoolName()
Env.GetPoolName()

Dapatkan ID kluster

Mengembalikan ID kluster saat ini.

mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
mssparkutils.env.getClusterId()
Env.GetClusterId()

Konteks Runtime

Mssparkutils runtime utils mengekspos 3 properti runtime, Anda dapat menggunakan konteks runtime mssparkutils untuk mendapatkan properti yang terdaftar seperti di bawah ini:

  • Notebookname - Nama notebook saat ini, akan selalu menampilkan nilai untuk mode interaktif dan mode alur.
  • Pipelinejobid - ID eksekusi alur, akan menampilkan nilai dalam mode alur dan menampilkan string kosong dalam mode interaktif.
  • Activityrunid - ID eksekusi aktivitas notebook, akan menampilkan nilai dalam mode alur dan menampilkan string kosong dalam mode interaktif.

Saat ini konteks runtime mendukung Python dan Scala.

mssparkutils.runtime.context
ctx <- mssparkutils.runtime.context()
for (key in ls(ctx)) {
    writeLines(paste(key, ctx[[key]], sep = "\t"))
}
%%spark
mssparkutils.runtime.context

Manajemen sesi

Menghentikan sesi interaktif

Alih-alih mengklik tombol hentikan secara manual, terkadang lebih nyaman untuk menghentikan sesi interaktif dengan memanggil API dalam kode. Untuk kasus seperti itu, kami menyediakan API mssparkutils.session.stop() untuk mendukung penghentian sesi interaktif melalui kode, tersedia untuk Scala dan Python.

mssparkutils.session.stop()
mssparkutils.session.stop()
mssparkutils.session.stop()

mssparkutils.session.stop() API akan menghentikan sesi interaktif saat ini secara asinkron di latar belakang, API tersebut menghentikan sesi Spark dan merilis sumber daya yang ditempati oleh sesi sehingga tersedia untuk sesi lain di kumpulan yang sama.

Catatan

Kami tidak merekomendasikan API bawaan bahasa panggilan seperti sys.exit di Scala atau sys.exit() di Python dalam kode Anda, karena API tersebut hanya mematikan proses interpreter, sehingga sesi Spark tetap hidup dan sumber daya tidak dirilis.

Dependensi Paket

Jika Anda ingin mengembangkan notebook atau pekerjaan secara lokal dan perlu mereferensikan paket yang relevan untuk petunjuk kompilasi/IDE, Anda dapat menggunakan paket berikut.

Langkah berikutnya