Tutorial: Use R in a Spark compute context in Azure HDInsight

This tutorial provides a step-by-step introduction to using the R functions in Apache Spark running on an Azure HDInsight Machine Learning services cluster.

In this tutorial, you learn how to:

  • Download the sample data to local storage
  • Copy the data to default storage
  • Set up a dataset
  • Create data sources
  • Create a compute context for Spark
  • Fit a linear model
  • Use composite XDF files
  • Convert XDF to CSV

Prerequisites

To complete this tutorial, you need an Azure HDInsight Machine Learning services cluster.

Connect to RStudio Server

RStudio Server runs on the cluster's edge node. Go to the following site (where CLUSTERNAME in the URL is the name of the HDInsight Machine Learning services cluster you created):

https://CLUSTERNAME.azurehdinsight.net/rstudio/

The first time you sign in, you authenticate twice. At the first authentication prompt, provide the cluster admin username and password (the default username is admin). At the second authentication prompt, provide the SSH username and password (the default username is sshuser). Subsequent sign-ins require only the SSH credentials.

Download the sample data to local storage

The Airline 2012 On-Time Data Set consists of 12 comma-separated files that contain flight arrival and departure details for all commercial flights within the US for the year 2012. This is a large dataset, with more than 6 million observations.

  1. Initialize a few environment variables. In the RStudio Server console, enter the following code:

    bigDataDirRoot <- "/tutorial/data" # root directory on cluster default storage
    localDir <- "/tmp/AirOnTimeCSV2012" # directory on edge node
    remoteDir <- "https://packages.revolutionanalytics.com/datasets/AirOnTimeCSV2012" # location of data
    
  2. In the right pane, select the Environment tab. The variables are displayed under Values.

    (Screenshot: the HDInsight RStudio web console.)

  3. Create a local directory, and download the sample data. In RStudio, enter the following code:

    # Create local directory
    dir.create(localDir)
    
    # Download the data to the local /tmp folder
    download.file(file.path(remoteDir, "airOT201201.csv"), file.path(localDir, "airOT201201.csv"))
    download.file(file.path(remoteDir, "airOT201202.csv"), file.path(localDir, "airOT201202.csv"))
    download.file(file.path(remoteDir, "airOT201203.csv"), file.path(localDir, "airOT201203.csv"))
    download.file(file.path(remoteDir, "airOT201204.csv"), file.path(localDir, "airOT201204.csv"))
    download.file(file.path(remoteDir, "airOT201205.csv"), file.path(localDir, "airOT201205.csv"))
    download.file(file.path(remoteDir, "airOT201206.csv"), file.path(localDir, "airOT201206.csv"))
    download.file(file.path(remoteDir, "airOT201207.csv"), file.path(localDir, "airOT201207.csv"))
    download.file(file.path(remoteDir, "airOT201208.csv"), file.path(localDir, "airOT201208.csv"))
    download.file(file.path(remoteDir, "airOT201209.csv"), file.path(localDir, "airOT201209.csv"))
    download.file(file.path(remoteDir, "airOT201210.csv"), file.path(localDir, "airOT201210.csv"))
    download.file(file.path(remoteDir, "airOT201211.csv"), file.path(localDir, "airOT201211.csv"))
    download.file(file.path(remoteDir, "airOT201212.csv"), file.path(localDir, "airOT201212.csv"))
    

    The download should finish in about 9.5 minutes.
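
    The 12 download calls above can also be written as a loop. A minimal sketch, assuming the same remoteDir and localDir variables defined in step 1:

    # Hypothetical loop equivalent of the 12 download.file calls above
    months <- sprintf("airOT2012%02d.csv", 1:12)
    for (f in months) {
        download.file(file.path(remoteDir, f), file.path(localDir, f))
    }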

Copy the data to default storage

The airDataDir variable specifies the Hadoop Distributed File System (HDFS) location into which to load the data. In RStudio, enter the following code:

# Set directory in bigDataDirRoot to load the data into
airDataDir <- file.path(bigDataDirRoot,"AirOnTimeCSV2012")

# Create directory (default storage)
rxHadoopMakeDir(airDataDir)

# Copy data from local storage to default storage
rxHadoopCopyFromLocal(localDir, bigDataDirRoot)
    
# Optional. Verify files
rxHadoopListFiles(airDataDir)

The step should be complete in about 10 seconds.

Set up a dataset

  1. Create a file system object that uses the default values. In RStudio, enter the following code:

    # Define the HDFS (WASB) file system
    hdfsFS <- RxHdfsFileSystem()
    
  2. Because the original CSV files have rather unwieldy variable names, supply a colInfo list to make them more manageable. In RStudio, enter the following code:

    airlineColInfo <- list(
        MONTH = list(newName = "Month", type = "integer"),
        DAY_OF_WEEK = list(newName = "DayOfWeek", type = "factor",
            levels = as.character(1:7),
            newLevels = c("Mon", "Tues", "Wed", "Thur", "Fri", "Sat",
                          "Sun")),
        UNIQUE_CARRIER = list(newName = "UniqueCarrier", type =
                                "factor"),
        ORIGIN = list(newName = "Origin", type = "factor"),
        DEST = list(newName = "Dest", type = "factor"),
        CRS_DEP_TIME = list(newName = "CRSDepTime", type = "integer"),
        DEP_TIME = list(newName = "DepTime", type = "integer"),
        DEP_DELAY = list(newName = "DepDelay", type = "integer"),
        DEP_DELAY_NEW = list(newName = "DepDelayMinutes", type =
                             "integer"),
        DEP_DEL15 = list(newName = "DepDel15", type = "logical"),
        DEP_DELAY_GROUP = list(newName = "DepDelayGroups", type =
                               "factor",
           levels = as.character(-2:12),
           newLevels = c("< -15", "-15 to -1","0 to 14", "15 to 29",
                         "30 to 44", "45 to 59", "60 to 74",
                         "75 to 89", "90 to 104", "105 to 119",
                         "120 to 134", "135 to 149", "150 to 164",
                         "165 to 179", ">= 180")),
        ARR_DELAY = list(newName = "ArrDelay", type = "integer"),
        ARR_DELAY_NEW = list(newName = "ArrDelayMinutes", type =
                             "integer"),  
        ARR_DEL15 = list(newName = "ArrDel15", type = "logical"),
        AIR_TIME = list(newName = "AirTime", type =  "integer"),
        DISTANCE = list(newName = "Distance", type = "integer"),
        DISTANCE_GROUP = list(newName = "DistanceGroup", type =
                             "factor",
         levels = as.character(1:11),
         newLevels = c("< 250", "250-499", "500-749", "750-999",
             "1000-1249", "1250-1499", "1500-1749", "1750-1999",
             "2000-2249", "2250-2499", ">= 2500")))
    
    varNames <- names(airlineColInfo)
    

Create data sources

In a Spark compute context, you can create data sources by using the following functions:

Function        Description
RxTextData      A comma-delimited text data source.
RxXdfData       Data in the XDF data file format. In RevoScaleR, the XDF file format is modified for Hadoop to store data in a composite set of files rather than a single file.
RxHiveData      Generates a Hive data source object.
RxParquetData   Generates a Parquet data source object.
RxOrcData       Generates an ORC data source object.
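
For illustration, the Hive and Parquet constructors follow the same pattern. A minimal sketch with hypothetical table and path names (they aren't part of this tutorial's data):

# Hypothetical examples; the table name and path are for illustration only
hiveDS <- RxHiveData(table = "flightdata")
parquetDS <- RxParquetData(file = "/share/flightDataParquet")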

Create an RxTextData object by using the files you copied to HDFS. In RStudio, enter the following code:

airDS <- RxTextData(airDataDir,
                    colInfo = airlineColInfo,
                    varsToKeep = varNames,
                    fileSystem = hdfsFS)
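
Optionally, you can sanity-check the new data source before modeling. A quick sketch using the standard RevoScaleR helper rxGetInfo:

# Optional: inspect variable metadata and the first few rows of the data source
rxGetInfo(airDS, getVarInfo = TRUE, numRows = 3)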

Create a compute context for Spark

To load data and run analyses on worker nodes, set the compute context in your script to RxSpark. In this context, R functions automatically distribute the workload across all the worker nodes, with no built-in requirement for managing jobs or a queue. A Spark compute context is established through RxSpark or rxSparkConnect(), and rxSparkDisconnect() returns you to a local compute context. In RStudio, enter the following code:

# Define the Spark compute context
mySparkCluster <- RxSpark()

# Set the compute context
rxSetComputeContext(mySparkCluster)
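
Alternatively, as noted above, rxSparkConnect() creates and sets a persistent Spark compute context in a single call. A minimal sketch of that variant (use it instead of the two lines above, not in addition to them):

# Alternative: create and set a persistent Spark compute context in one step
sparkCC <- rxSparkConnect()

# ... run analyses ...

# Return to a local compute context when finished
rxSparkDisconnect(sparkCC)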

Fit a linear model

  1. Use the rxLinMod function to fit a linear model with your airDS data source. In RStudio, enter the following code:

    system.time(
         delayArr <- rxLinMod(ArrDelay ~ DayOfWeek, data = airDS,
              cube = TRUE)
    )
    

    This step should be complete in 2 to 3 minutes.

  2. View the results. In RStudio, enter the following code:

    summary(delayArr)
    

    You should see the following results:

    Call:
    rxLinMod(formula = ArrDelay ~ DayOfWeek, data = airDS, cube = TRUE)
    
    Cube Linear Regression Results for: ArrDelay ~ DayOfWeek
    Data: airDS (RxTextData Data Source)
    File name: /tutorial/data/AirOnTimeCSV2012
    Dependent variable(s): ArrDelay
    Total independent variables: 7 
    Number of valid observations: 6005381
    Number of missing observations: 91381 
    
    Coefficients:
                   Estimate Std. Error t value Pr(>|t|)     | Counts
    DayOfWeek=Mon   3.54210    0.03736   94.80 2.22e-16 *** | 901592
    DayOfWeek=Tues  1.80696    0.03835   47.12 2.22e-16 *** | 855805
    DayOfWeek=Wed   2.19424    0.03807   57.64 2.22e-16 *** | 868505
    DayOfWeek=Thur  4.65502    0.03757  123.90 2.22e-16 *** | 891674
    DayOfWeek=Fri   5.64402    0.03747  150.62 2.22e-16 *** | 896495
    DayOfWeek=Sat   0.91008    0.04144   21.96 2.22e-16 *** | 732944
    DayOfWeek=Sun   2.82780    0.03829   73.84 2.22e-16 *** | 858366
    ---
    Signif. codes:  0 ‘***’ 0.001 ‘**’ 0.01 ‘*’ 0.05 ‘.’ 0.1 ‘ ’ 1
    
    Residual standard error: 35.48 on 6005374 degrees of freedom
    Multiple R-squared: 0.001827 (as if intercept included)
    Adjusted R-squared: 0.001826 
    F-statistic:  1832 on 6 and 6005374 DF,  p-value: < 2.2e-16 
    Condition number: 1 
    

    The results indicate that you've processed all the data (6 million observations), using all the CSV files in the specified directory. Because you specified cube = TRUE, you have an estimated coefficient for each day of the week (and not an intercept).
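
    Because cube = TRUE produced one estimate per factor level, you can pull the coefficients and cell counts into a data frame for plotting or inspection. A short sketch using RevoScaleR's rxResultsDF helper:

    # Extract the per-day estimates and counts from the cube model
    delayDF <- rxResultsDF(delayArr)
    delayDF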

Use composite XDF files

As you've seen, you can analyze CSV files directly with R on Hadoop. But the analysis is faster if you store the data in a more efficient format. The R XDF file format is efficient, but it's modified somewhat for HDFS so that individual files remain within a single HDFS block. (The HDFS block size varies from installation to installation, but it's typically either 64 MB or 128 MB.)

When you use rxImport on Hadoop to create a set of composite XDF files, specify an RxTextData data source such as airDS as the inData argument and an RxXdfData data source, with fileSystem set to an HDFS file system, as the outFile argument. You can then use the RxXdfData object as the data argument in subsequent R analyses.

  1. Define an RxXdfData object. In RStudio, enter the following code:

    airDataXdfDir <- file.path(bigDataDirRoot,"AirOnTimeXDF2012")
    
    airDataXdf <- RxXdfData(airDataXdfDir,
                            fileSystem = hdfsFS)
    
  2. Set a block size of 250000 rows, and specify that all the data should be read. In RStudio, enter the following code:

    blockSize <- 250000
    numRowsToRead <- -1
    
  3. Import the data by using rxImport. In RStudio, enter the following code:

    rxImport(inData = airDS,
             outFile = airDataXdf,
             rowsPerRead = blockSize,
             overwrite = TRUE,
             numRows = numRowsToRead )
    

    This step should be complete in a few minutes.

  4. Re-estimate the same linear model, using the new, faster data source. In RStudio, enter the following code:

    system.time(
         delayArr <- rxLinMod(ArrDelay ~ DayOfWeek, data = airDataXdf,
              cube = TRUE)
    )
    

    The step should be complete in less than a minute.

  5. View the results. They should be the same as the results from the CSV files. In RStudio, enter the following code:

    summary(delayArr)
    

Convert XDF to CSV

In a Spark context

If you converted your CSV files to the XDF file format for greater efficiency while running the analyses, but now want to convert your data back to CSV, you can do so by using rxDataStep.

To create a folder of CSV files, first create an RxTextData object by using a directory name as the file argument. This object represents the folder in which to create the CSV files. The directory is created when you run rxDataStep. Then, point to this RxTextData object in the outFile argument of rxDataStep. Each CSV file that's created is named based on the directory name, followed by a number.

Suppose that you want to write out a folder of CSV files in HDFS from your airDataXdf composite XDF (for example, after an analysis has added predicted values and residuals to it). In RStudio, enter the following code:

airDataCsvDir <- file.path(bigDataDirRoot, "AirDataCSV2012")
airDataCsvDS <- RxTextData(airDataCsvDir, fileSystem = hdfsFS)
rxDataStep(inData = airDataXdf, outFile = airDataCsvDS)

This step should be complete in about 2.5 minutes.

rxDataStep wrote out one CSV file for every .xdfd data file in the input composite XDF set. This is the default behavior for writing CSV files from composite XDF files to HDFS when the compute context is set to RxSpark.

In a local context

Alternatively, when you're done performing your analyses, you can switch your compute context back to local to take advantage of two arguments to RxTextData that give you slightly more control when writing CSV files to HDFS: createFileSet and rowsPerOutFile. When you set createFileSet to TRUE, a folder of CSV files is written to the directory you specify. When you set createFileSet to FALSE, a single CSV file is written. You can set the second argument, rowsPerOutFile, to an integer to indicate how many rows to write to each CSV file when createFileSet is TRUE.

In RStudio, enter the following code:

rxSetComputeContext("local")
airDataCsvRowsDir <- file.path(bigDataDirRoot,"AirDataCSVRows2012")
airDataCsvRowsDS <- RxTextData(airDataCsvRowsDir, fileSystem=hdfsFS, createFileSet=TRUE, rowsPerOutFile=1000000)
rxDataStep(inData=airDataXdf, outFile=airDataCsvRowsDS)

This step should be complete in about 10 minutes.

When you use an RxSpark compute context, createFileSet defaults to TRUE and rowsPerOutFile has no effect. Therefore, if you want to create a single CSV file or customize the number of rows per file, perform the rxDataStep in a local compute context (the data can still be in HDFS).
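
For example, a single-file variant of the previous step might look like the following sketch (the output path is hypothetical):

# Sketch: write a single CSV file by setting createFileSet = FALSE
# (run in the local compute context set above; the data can remain in HDFS)
airDataSingleCsvDS <- RxTextData(file.path(bigDataDirRoot, "AirData2012.csv"),
                                 fileSystem = hdfsFS, createFileSet = FALSE)
rxDataStep(inData = airDataXdf, outFile = airDataSingleCsvDS)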

Final steps

  1. Clean up the data. In RStudio, enter the following code:

    rxHadoopRemoveDir(airDataDir)
    rxHadoopRemoveDir(airDataXdfDir)
    rxHadoopRemoveDir(airDataCsvDir)
    rxHadoopRemoveDir(airDataCsvRowsDir)
    rxHadoopRemoveDir(bigDataDirRoot)
    
  2. Stop the remote Spark application. In RStudio, enter the following code:

    rxStopEngine(mySparkCluster)
    
  3. Quit the R session. In RStudio, enter the following code:

    quit()
    

Clean up resources

After you complete the tutorial, you might want to delete the cluster. With HDInsight, your data is stored in Azure Storage, so you can safely delete a cluster when it's not in use. You're also charged for an HDInsight cluster, even when it's not in use. Because the charges for the cluster are many times more than the charges for storage, it makes economic sense to delete clusters when they're not in use.

To delete a cluster, see Delete an HDInsight cluster by using your browser, PowerShell, or the Azure CLI.

Next steps

In this tutorial, you learned how to use R functions in Apache Spark running on an HDInsight Machine Learning services cluster. For more information, see the following articles: