Data wrangling with Apache Spark pools (preview)
In this article, you learn how to perform data wrangling tasks interactively within a dedicated Synapse session, powered by Azure Synapse Analytics, in a Jupyter notebook using the Azure Machine Learning Python SDK.
If you prefer to use Azure Machine Learning pipelines, see How to use Apache Spark (powered by Azure Synapse Analytics) in your machine learning pipeline (preview).
For guidance on how to use Azure Synapse Analytics with a Synapse workspace, see the Azure Synapse Analytics get started series.
The Azure Machine Learning and Azure Synapse Analytics integration is in preview. The capabilities presented in this article employ the azureml-synapse package, which contains experimental preview features that may change at any time.
Azure Machine Learning and Azure Synapse Analytics integration
The Azure Synapse Analytics integration with Azure Machine Learning (preview) allows you to attach an Apache Spark pool backed by Azure Synapse for interactive data exploration and preparation. With this integration, you can have a dedicated compute for data wrangling at scale, all within the same Python notebook you use for training your machine learning models.
Install the azureml-synapse package (preview) with the following code:
pip install azureml-synapse
Attach a Synapse Spark pool as a compute target.
Launch Synapse Spark pool for data wrangling tasks
To begin data preparation with the Apache Spark pool, specify the attached Spark Synapse compute name. This name can be found via the Azure Machine Learning studio under the Attached computes tab.
To continue using the Apache Spark pool, you must indicate which compute resource to use throughout your data wrangling tasks with %synapse for single lines of code and %%synapse for multiple lines.
%synapse start -c SynapseSparkPoolAlias
After the session starts, you can check the session's metadata.
You can specify an Azure Machine Learning environment to use during your Apache Spark session. Only the Conda dependencies specified in the environment take effect. Docker images are not supported.
Python dependencies specified in the environment's Conda dependencies are not supported in Apache Spark pools. Currently, only fixed Python versions are supported.
Check your Python version by including sys.version_info in your script.
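As a minimal sketch of that check, the snippet below reads sys.version_info and formats it as a string. The helper name python_version_string is hypothetical and exists only for this illustration. Presumably you would run it in a %%synapse cell so that it reports the interpreter of the Spark session rather than that of the local notebook kernel.

```python
import sys


def python_version_string(info=None):
    """Return 'major.minor.micro' for the given version info (default: this interpreter)."""
    info = sys.version_info if info is None else info
    return f"{info[0]}.{info[1]}.{info[2]}"


# Print the version of the interpreter that executes this code.
print(python_version_string())
```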
The following code creates the environment, myenv, which installs azureml-core version 1.20.0 and numpy version 1.17.0 before the session begins. You can then include this environment when you start your Apache Spark session.
from azureml.core import Workspace, Environment

# creates environment with numpy and azureml-core dependencies
ws = Workspace.from_config()
env = Environment(name="myenv")
env.python.conda_dependencies.add_pip_package("azureml-core==1.20.0")
env.python.conda_dependencies.add_conda_package("numpy==1.17.0")
env.register(workspace=ws)
To begin data preparation with the Apache Spark pool and your custom environment, specify the Apache Spark pool name and which environment to use during the Apache Spark session. Furthermore, you can provide your subscription ID, the machine learning workspace resource group, and the name of the machine learning workspace.
%synapse start -c SynapseSparkPoolAlias -e myenv -s AzureMLworkspaceSubscriptionID -r AzureMLworkspaceResourceGroupName -w AzureMLworkspaceName
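The flags in the start command map one-to-one onto the values described above. The following sketch assembles such a line from its parts so the mapping is explicit. The function build_synapse_start is a hypothetical helper and not part of the azureml-synapse package; it only builds a string that you could paste into a notebook cell.

```python
def build_synapse_start(compute, environment=None, subscription_id=None,
                        resource_group=None, workspace=None):
    """Assemble a '%synapse start' line from the options used in this article."""
    parts = ["%synapse", "start", "-c", compute]
    # Optional flags are appended only when a value is supplied.
    for flag, value in (("-e", environment), ("-s", subscription_id),
                        ("-r", resource_group), ("-w", workspace)):
        if value:
            parts.extend([flag, value])
    return " ".join(parts)


print(build_synapse_start("SynapseSparkPoolAlias", environment="myenv"))
```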
Load data from storage
Once your Apache Spark session starts, read in the data that you wish to prepare. Data loading is supported for Azure Blob storage and Azure Data Lake Storage Generations 1 and 2.
There are two ways to load data from these storage services:
Directly load data from storage using its Hadoop Distributed File System (HDFS) path.
Read in data from an existing Azure Machine Learning dataset.
To access these storage services, you need Storage Blob Data Reader permissions. If you plan to write data back to these storage services, you need Storage Blob Data Contributor permissions. Learn more about storage permissions and roles.
Load data with Hadoop Distributed File System (HDFS) path
To load and read data in from storage with the corresponding HDFS path, you need to have your data access authentication credentials readily available. These credentials differ depending on your storage type.
The following code demonstrates how to read data from an Azure Blob storage into a Spark dataframe with either your shared access signature (SAS) token or access key.
%%synapse

# set up the access key or the SAS token
sc._jsc.hadoopConfiguration().set("fs.azure.account.key.<storage account name>.blob.core.windows.net", "<access key>")
sc._jsc.hadoopConfiguration().set("fs.azure.sas.<container name>.<storage account name>.blob.core.windows.net", "<sas token>")

# read from blob
df = spark.read.option("header", "true").csv("wasbs://<container name>@<storage account name>.blob.core.windows.net/Titanic.csv")
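The two configuration keys above differ only in their prefix and in whether the container name is included. As a rough sketch, the hypothetical helper blob_auth_setting below returns the key and value pair for whichever credential you have. It is plain string formatting, not an Azure or Spark API, and the example arguments are placeholders.

```python
def blob_auth_setting(account, access_key=None, container=None, sas_token=None):
    """Return the (key, value) Hadoop setting for access-key or SAS authentication."""
    host = f"{account}.blob.core.windows.net"
    if access_key is not None:
        # An access key applies to the whole storage account.
        return (f"fs.azure.account.key.{host}", access_key)
    if container is not None and sas_token is not None:
        # A SAS token is configured per container.
        return (f"fs.azure.sas.{container}.{host}", sas_token)
    raise ValueError("Provide access_key, or both container and sas_token.")


key, value = blob_auth_setting("<storage account name>", access_key="<access key>")
print(key)
```

Inside a %%synapse cell, the returned pair could then be passed to sc._jsc.hadoopConfiguration().set(key, value).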
The following code demonstrates how to read data in from Azure Data Lake Storage Generation 1 (ADLS Gen 1) with your service principal credentials.
%%synapse

# set up the service principal that has access to the data
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.access.token.provider.type", "ClientCredential")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.client.id", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.credential", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.adl.account.<storage account name>.oauth2.refresh.url", "https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("adl://<storage account name>.azuredatalakestore.net/<path>")
The following code demonstrates how to read data in from Azure Data Lake Storage Generation 2 (ADLS Gen 2) with your service principal credentials.
%%synapse

# set up the service principal that has access to the data
sc._jsc.hadoopConfiguration().set("fs.azure.account.auth.type.<storage account name>.dfs.core.windows.net", "OAuth")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth.provider.type.<storage account name>.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.id.<storage account name>.dfs.core.windows.net", "<client id>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.secret.<storage account name>.dfs.core.windows.net", "<client secret>")
sc._jsc.hadoopConfiguration().set("fs.azure.account.oauth2.client.endpoint.<storage account name>.dfs.core.windows.net", "https://login.microsoftonline.com/<tenant id>/oauth2/token")

df = spark.read.csv("abfss://<container name>@<storage account name>.dfs.core.windows.net/<path>")
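Each storage service uses its own URI scheme and host suffix in the path passed to spark.read. The sketch below collects the three formats in one place. The function names wasbs_uri, adl_uri, and abfss_uri are hypothetical helpers written for this illustration, and the arguments in the example call are placeholders.

```python
def wasbs_uri(container, account, path):
    """Azure Blob storage path."""
    return f"wasbs://{container}@{account}.blob.core.windows.net/{path.lstrip('/')}"


def adl_uri(account, path):
    """Azure Data Lake Storage Generation 1 path."""
    return f"adl://{account}.azuredatalakestore.net/{path.lstrip('/')}"


def abfss_uri(container, account, path):
    """Azure Data Lake Storage Generation 2 path."""
    return f"abfss://{container}@{account}.dfs.core.windows.net/{path.lstrip('/')}"


print(wasbs_uri("<container name>", "<storage account name>", "Titanic.csv"))
```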
Read in data from registered datasets
You can also get an existing registered dataset in your workspace and perform data preparation on it by converting it into a Spark dataframe.
The following example authenticates to the workspace, gets a registered TabularDataset, blob_dset, that references files in blob storage, and converts it into a Spark dataframe. When you convert your datasets into a Spark dataframe, you can use the pyspark data exploration and preparation libraries.
%%synapse

from azureml.core import Workspace, Dataset

subscription_id = "<enter your subscription ID>"
resource_group = "<enter your resource group>"
workspace_name = "<enter your workspace name>"

ws = Workspace(workspace_name=workspace_name,
               subscription_id=subscription_id,
               resource_group=resource_group)

dset = Dataset.get_by_name(ws, "blob_dset")
spark_df = dset.to_spark_dataframe()
Perform data wrangling tasks
After you've retrieved and explored your data, you can perform data wrangling tasks.
The following code expands upon the HDFS example in the previous section. It filters the data in the Spark dataframe, df, on the Survived column, groups the result by Age, and shows the ten most frequent ages.
%%synapse

from pyspark.sql.functions import col, desc

df.filter(col('Survived') == 1).groupBy('Age').count().orderBy(desc('count')).show(10)

df.show()
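To make the chained calls above concrete, here is a plain-Python sketch of the same filter, group, count, and sort steps on a handful of rows. It does not use Spark, and the sample values are invented for illustration. Only the column names Survived and Age come from the example.

```python
from collections import Counter

# Made-up sample rows for illustration only; not taken from Titanic.csv.
rows = [
    {"Survived": 1, "Age": 22},
    {"Survived": 0, "Age": 22},
    {"Survived": 1, "Age": 30},
    {"Survived": 1, "Age": 22},
    {"Survived": 1, "Age": 41},
    {"Survived": 1, "Age": 30},
    {"Survived": 1, "Age": 22},
]

# filter(col('Survived') == 1): keep survivors only
survivors = [r for r in rows if r["Survived"] == 1]
# groupBy('Age').count(): count survivors per age
counts = Counter(r["Age"] for r in survivors)
# orderBy(desc('count')) followed by show(10): the ten most frequent ages
top_ages = counts.most_common(10)
print(top_ages)
```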
Save data to storage and stop Spark session
Once your data exploration and preparation is complete, store your prepared data for later use in your storage account on Azure.
In the following example, the prepared data is written back to Azure Blob storage and overwrites the original Titanic.csv file in the training_data directory. To write back to storage, you need Storage Blob Data Contributor permissions. Learn more about storage permissions and roles.
%%synapse

df.write.format("csv").mode("overwrite").save("wasbs://<container name>@<storage account name>.blob.core.windows.net/training_data/Titanic.csv")
When you've completed data preparation and saved your prepared data to storage, stop using your Apache Spark pool with the following command.
%synapse stop
Create dataset to represent prepared data
When you're ready to consume your prepared data for model training, connect to your storage with an Azure Machine Learning datastore, and specify which file(s) you want to use with an Azure Machine Learning dataset.
The following code example:
- Assumes you already created a datastore that connects to the storage service where you saved your prepared data.
- Gets that existing datastore, mydatastore, from the workspace, ws, with the get() method.
- Creates a FileDataset, train_ds, that references the prepared data files located in the training_data directory in mydatastore.
- Creates the variable input1, which can be used at a later time to make the data files of the train_ds dataset available to a compute target for your training tasks.
from azureml.core import Datastore, Dataset

datastore = Datastore.get(ws, datastore_name='mydatastore')
datastore_paths = [(datastore, '/training_data/')]
train_ds = Dataset.File.from_files(path=datastore_paths, validate=True)
input1 = train_ds.as_mount()
Use ScriptRunConfig to submit an experiment run to a Synapse Spark pool
Similarly, if you have an Azure Machine Learning pipeline, you can use the SynapseSparkStep to specify your Synapse Spark pool as the compute target for the data preparation step in your pipeline.
Making your data available to the Synapse Spark pool depends on your dataset type.
- For a FileDataset, you can use the as_hdfs() method. When the run is submitted, the dataset is made available to the Synapse Spark pool as a Hadoop distributed file system (HDFS).
- For a TabularDataset, you can use the
The following code:
- Creates the variable input2 from the FileDataset train_ds that was created in the previous code example.
- Creates the variable output with the HDFSOutputDatasetConfig class. After the run is complete, this class allows us to save the output of the run as the dataset, test, in the datastore, mydatastore. In the Azure Machine Learning workspace, the test dataset is registered under the name registered_dataset.
- Configures the settings the run should use to execute on the Synapse Spark pool.
- Defines the ScriptRunConfig parameters to:
  - Use the dataprep.py script for the run.
  - Specify which data to use as input and how to make it available to the Synapse Spark pool.
  - Specify where to store output data.
- Use the
from azureml.core import Dataset
from azureml.core import RunConfiguration
from azureml.core import ScriptRunConfig
from azureml.core import Experiment
from azureml.data import HDFSOutputDatasetConfig

# make the FileDataset from the previous example available as HDFS input
input2 = train_ds.as_hdfs()
# save the run output to "test" in the datastore and register it when the run completes
output = HDFSOutputDatasetConfig(destination=(datastore, "test")).register_on_complete(name="registered_dataset")

# synapse_compute_name and conda_dep are assumed to be defined earlier:
# the attached Synapse Spark compute name and a CondaDependencies object
run_config = RunConfiguration(framework="pyspark")
run_config.target = synapse_compute_name

run_config.spark.configuration["spark.driver.memory"] = "1g"
run_config.spark.configuration["spark.driver.cores"] = 2
run_config.spark.configuration["spark.executor.memory"] = "1g"
run_config.spark.configuration["spark.executor.cores"] = 1
run_config.spark.configuration["spark.executor.instances"] = 1

run_config.environment.python.conda_dependencies = conda_dep

script_run_config = ScriptRunConfig(source_directory='./code',
                                    script='dataprep.py',
                                    arguments=["--file_input", input2,
                                               "--output_dir", output],
                                    run_config=run_config)
Once your ScriptRunConfig object is set up, you can submit the run.
from azureml.core import Experiment

exp = Experiment(workspace=ws, name="synapse-spark")
run = exp.submit(config=script_run_config)
run
For additional details, like the dataprep.py script used in this example, see the example notebook.
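The ScriptRunConfig in this example passes two arguments, --file_input and --output_dir, to the script. The following is a hypothetical sketch of how a script might read them with argparse, assuming both arrive as path strings at run time. It is not the dataprep.py from the example notebook, and the values in the example call are placeholders.

```python
import argparse


def parse_args(argv=None):
    """Parse the two arguments named in the ScriptRunConfig example."""
    parser = argparse.ArgumentParser(description="Hypothetical data preparation entry point")
    parser.add_argument("--file_input", required=True, help="Path to the input files")
    parser.add_argument("--output_dir", required=True, help="Path to write prepared data to")
    return parser.parse_args(argv)


# Example call with placeholder values; a real run supplies them on the command line.
args = parse_args(["--file_input", "<input path>", "--output_dir", "<output path>"])
print(args.file_input, args.output_dir)
```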
After your data is prepared, you can use it as input for your training jobs. In the preceding code example, registered_dataset is what you would specify as the input data for training jobs.
See the example notebooks for more concepts and demonstrations of the Azure Synapse Analytics and Azure Machine Learning integration capabilities.
- Run an interactive Spark session from a notebook in your Azure Machine Learning workspace.
- Submit an Azure Machine Learning experiment run with a Synapse Spark pool as your compute target.