
Tutorial: Build a machine learning app with Apache Spark MLlib and Azure Synapse Analytics

In this article, you learn how to use Apache Spark MLlib to create a machine learning application that does simple predictive analysis on an Azure Open Dataset. Spark provides built-in machine learning libraries. This example uses classification through logistic regression.

SparkML and MLlib are core Spark libraries that provide many utilities useful for machine learning tasks, including utilities that are suitable for:

  • Classification
  • Regression
  • Clustering
  • Topic modeling
  • Singular value decomposition (SVD) and principal component analysis (PCA)
  • Hypothesis testing and calculating sample statistics

Understand classification and logistic regression

Classification, a popular machine learning task, is the process of sorting input data into categories. It's the job of a classification algorithm to figure out how to assign labels to the input data that you provide. For example, think of a machine learning algorithm that accepts stock information as input and divides the stock into two categories: stocks that you should sell and stocks that you should keep.

Logistic regression is an algorithm that you can use for classification. Spark's logistic regression API is useful for binary classification, or classifying input data into one of two groups. For more information about logistic regression, see Wikipedia.

In summary, the process of logistic regression produces a logistic function that you can use to predict the probability that an input vector belongs in one group or the other.
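As a minimal illustration of that idea in plain Python (the weights, bias, and feature values here are made up for the example and are not part of the tutorial's pipeline), the logistic function squashes a linear combination of features into a probability between 0 and 1:

```python
import math

def logistic(z):
    # Logistic (sigmoid) function: maps any real value into the interval (0, 1).
    return 1.0 / (1.0 + math.exp(-z))

# A trained model supplies the weights and bias; these values are invented.
weights = [0.8, -0.4]
bias = 0.1
features = [2.0, 1.0]

# Linear combination of the inputs, then squashed into a probability.
z = sum(w * x for w, x in zip(weights, features)) + bias   # 0.8*2.0 - 0.4*1.0 + 0.1 = 1.3
probability = logistic(z)

# Classify by thresholding the probability at 0.5.
label = 1 if probability >= 0.5 else 0
```

Training a logistic regression model amounts to finding the weights and bias that make these predicted probabilities match the observed labels as closely as possible.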

Predictive analysis example on NYC Taxi data

In this example, you use Spark to perform some predictive analysis on taxi trip tip data from New York. The data is available through Azure Open Datasets. This subset of the dataset contains information about yellow taxi trips, including the start and end time and locations, the cost, and other interesting attributes of each trip.


There may be additional charges for pulling this data from its storage location.

In the following steps, you develop a model to predict whether a particular trip includes a tip.

Create an Apache Spark machine learning model

  1. Create a notebook by using the PySpark kernel. For instructions, see Create a notebook.

  2. Import the types required for this application. Copy and paste the following code into an empty cell, and then press Shift+Enter, or run the cell by using the blue play icon to the left of the code.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator

    Because of the PySpark kernel, you don't need to create any contexts explicitly. The Spark context is automatically created for you when you run the first code cell.

Construct the input dataframe

Because the raw data is in Parquet format, you can use the Spark context to pull the file into memory as a dataframe directly. While the following code uses the default options, it's possible to force the mapping of data types and other schema attributes if needed.

  1. Run the following lines to create a Spark dataframe by pasting the code into a new cell. This step retrieves the data via the Open Datasets API. Pulling all of this data generates about 1.5 billion rows. Depending on the size of your serverless Apache Spark pool, the raw data might be too large or take too much time to operate on. You can filter this data down to something smaller. The following code example uses start_date and end_date to apply a filter that returns a single month of data.

    from azureml.opendatasets import NycTlcYellow
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = nyc_tlc.to_spark_dataframe()
  2. The downside to simple filtering is that, from a statistical perspective, it might introduce bias into the data. Another approach is to use the sampling built into Spark. The following code reduces the dataset down to about 2,000 rows, if it's applied after the preceding code. This sampling step can be used instead of the simple filter or in conjunction with it.

    # To make development easier, faster and less expensive down sample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
  3. It's now possible to look at the data to see what was read. It's normally better to review data with a subset rather than the full set, depending on the size of the dataset. There are two common ways to view the data: a basic text preview, and a much richer grid experience that also includes the capability to visualize the data graphically.

  4. Depending on the size of the dataset generated, and your need to experiment with or run the notebook many times, it might be advisable to cache the dataset locally in the workspace. There are three ways to perform explicit caching:

    • Save the dataframe locally as a file
    • Save the dataframe as a temporary table or view
    • Save the dataframe as a permanent table

The first two of these approaches are the easiest to apply during development.

Creating a temporary table or view provides different access paths to the data, but it lasts only for the duration of the Spark instance session.


Prepare the data

The data in its raw form is frequently not suitable for passing directly to a model. A series of actions must be performed on the data to get it into a state where the model can consume it.

In the following code, four classes of operations are performed:

  • The removal of outliers or incorrect values through filtering.
  • The removal of columns that aren't needed.
  • The creation of new columns derived from the raw data to make the model work more effectively. This operation is sometimes called featurization.
  • Labeling. Because you're undertaking binary classification (will there be a tip or not on a given trip), you need to convert the tip amount into a 0 or 1 value.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')\
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)\
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))\
                                )

A second pass is then made over the data to add the final features.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .alias('trafficTimeBins')\
                                                )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Create a logistic regression model

The final task is to convert the labeled data into a format that can be analyzed through logistic regression. The input to a logistic regression algorithm needs to be a set of label/feature vector pairs, where the feature vector is a vector of numbers that represent the input point. So, you need to convert the categorical columns into numbers. The trafficTimeBins and weekdayString columns need to be converted into integer representations. There are multiple approaches to performing the conversion; the approach taken in this example is OneHotEncoding.

# Since the sample uses an algorithm that only works with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new dataframe that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

This action results in a new dataframe with all columns in the right format to train a model.

Train a logistic regression model

The first task is to split the dataset into a training set and a testing or validation set. The split here is arbitrary. Experiment with different split settings to see whether they affect the model.

#Decide on the split between training and testing data from the dataframe
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the dataframe into test and training dataframes
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Now that there are two DataFrames, the next task is to create the model formula and run it against the training DataFrame. Then you can validate against the testing DataFrame. Experiment with different versions of the model formula to see the impact of different combinations.


To save the model, you need the Storage Blob Data Contributor Azure role. Under your storage account, go to Access Control (IAM), and select Add role assignment. Assign the Storage Blob Data Contributor Azure role to your SQL Database server. Only members with the Owner privilege can perform this step. For various Azure built-in roles, see this guide.

## Create a new LR object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create an LR model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset, evaluation using AUROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

The output from this cell is:

Area under ROC = 0.9779470729751403

Create a visual representation of the prediction

You can now construct a final visualization to help you reason about the results of this test. An ROC curve is one way to review the result.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()
ROC curve for the logistic regression tip model

Shut down the Spark instance

After you finish running the application, shut down the notebook to release the resources. Either close the tab, or select End Session from the status panel at the bottom of the notebook.

See also

Next steps


Some of the official Apache Spark documentation relies on using the Spark console, which isn't available on Azure Synapse Spark. Use the notebook or IntelliJ experiences instead.