Uso combinato di ScaleR e SparkR in HDInsightCombine ScaleR and SparkR in HDInsight

Questo documento illustra come stimare i ritardi di arrivo dei voli usando un modello di regressione logistica ScaleR.This document shows how to predict flight arrival delays using a ScaleR logistic regression model. Nell'esempio vengono usati dati sul ritardo dei voli e dati sulle condizioni atmosferiche, uniti in join tramite SparkR.The example uses flight delay and weather data, joined using SparkR.

Sebbene vengano eseguiti nel motore di esecuzione di Hadoop Spark, entrambi i pacchetti sono bloccati da una condivisione dei dati in memoria in quanto ognuno di essi richiede le rispettive sessioni Spark.Although both packages run on Hadoop’s Spark execution engine, they are blocked from in-memory data sharing as they each require their own respective Spark sessions. Finché questo problema non verrà risolto in una versione futura di ML Server, la soluzione alternativa consiste nel mantenere sessioni di Spark non sovrapposte e scambiare i dati tramite file intermedi.Until this issue is addressed in an upcoming version of ML Server, the workaround is to maintain non-overlapping Spark sessions, and to exchange data through intermediate files. Le istruzioni riportate di seguito mostrano che questi requisiti sono semplici da rispettare.The instructions here show that these requirements are straightforward to achieve.

Questo esempio è stato condiviso inizialmente in un discorso tenuto a Strata 2016 da Mario Inchiosa e Roni Burd.This example was initially shared in a talk at Strata 2016 by Mario Inchiosa and Roni Burd. Il discorso è disponibile nel webinar Building a Scalable Data Science Platform with R (Compilazione di una piattaforma Data Science scalabile con R).You can find this talk at Building a Scalable Data Science Platform with R.

Il codice è stato scritto originariamente per ML Server in esecuzione in Spark in un cluster HDInsight di Azure.The code was originally written for ML Server running on Spark in an HDInsight cluster on Azure. Tuttavia, il concetto di combinazione dell'uso di SparkR e ScaleR in un unico script è valido anche nel contesto di ambienti locali.But the concept of mixing the use of SparkR and ScaleR in one script is also valid in the context of on-premises environments.

I passaggi in questo documento presuppongono un livello intermedio di conoscenza di R e della libreria ScaleR di ML Server.The steps in this document assume that you have an intermediate level of knowledge of R and R the ScaleR library of ML Server. Durante l'analisi di questo scenario si introdurrà anche SparkR.You are introduced to SparkR while walking through this scenario.

Set di dati relativi alle compagnie aeree e alle previsioni meteoThe airline and weather datasets

I dati di volo sono disponibili negli archivi del governo statunitense.The flight data is available from the U.S. government archives. Sono inoltre disponibili come file ZIP AirOnTimeCSV.zip.It is also available as a zip from AirOnTimeCSV.zip.

I dati meteo suddivisi per mese possono essere scaricati come file ZIP in formato non elaborato dal repository della National Oceanic and Atmospheric Administration.The weather data can be downloaded as zip files in raw form, by month, from the National Oceanic and Atmospheric Administration repository. Per questo esempio, scaricare i dati per maggio 2007 - dicembre 2012.For this example, download the data for May 2007 – December 2012. Usare i file dei dati orari e il file YYYYMMMstation.txt all'interno di ognuno degli ZIP.Use the hourly data files and YYYYMMMstation.txt file within each of the zips.

Configurazione dell'ambiente SparkSetting up the Spark environment

Usare il codice seguente per configurare l'ambiente Spark:Use the following code to set up the Spark environment:

workDir        <- '~'  
myNameNode     <- 'default' 
myPort         <- 0
inputDataDir   <- 'wasb://hdfs@myAzureAcccount.blob.core.windows.net'
hdfsFS         <- RxHdfsFileSystem(hostName=myNameNode, port=myPort)

# create a persistent Spark session to reduce startup times 
#   (remember to stop it later!)

sparkCC        <- RxSpark(consoleOutput=TRUE, nameNode=myNameNode, port=myPort, persistentRun=TRUE)

# create working directories 

rxHadoopMakeDir('/user')
rxHadoopMakeDir('user/RevoShare')
rxHadoopMakeDir('user/RevoShare/remoteuser')

(dataDir <- '/share')
rxHadoopMakeDir(dataDir)
rxHadoopListFiles(dataDir) 

setwd(workDir)
getwd()

# version of rxRoc that runs in a local CC 
rxRoc <- function(...){
  rxSetComputeContext(RxLocalSeq())
  roc <- RevoScaleR::rxRoc(...)
  rxSetComputeContext(sparkCC)
  return(roc)
}

logmsg <- function(msg) { cat(format(Sys.time(), "%Y-%m-%d %H:%M:%S"),':',msg,'\n') } 
t0 <- proc.time() 

#..start 

logmsg('Start') 
(trackers <- system("mapred job -list-active-trackers", intern = TRUE))
logmsg(paste('Number of task nodes=',length(trackers)))

Successivamente, aggiungere Spark_Home al percorso di ricerca per i pacchetti R.Next, add Spark_Home to the search path for R packages. Questo consente di usare SparkR e inizializzare una sessione SparkR:Adding it to the search path allows you to use SparkR, and initialize a SparkR session:

#..setup for use of SparkR  

logmsg('Initialize SparkR') 

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)

sparkEnvir <- list(spark.executor.instances = '10',
                   spark.yarn.executor.memoryOverhead = '8000')

sc <- sparkR.init(
  sparkEnvir = sparkEnvir,
  sparkPackages = "com.databricks:spark-csv_2.10:1.3.0"
)

sqlContext <- sparkRSQL.init(sc)

Preparazione dei dati meteoPreparing the weather data

Per preparare i dati meteo, creare un subset per le colonne necessarie per la modellazione:To prepare the weather data, subset it to the columns needed for modeling:

  • "Visibility""Visibility"
  • "DryBulbCelsius""DryBulbCelsius"
  • "DewPointCelsius""DewPointCelsius"
  • "RelativeHumidity""RelativeHumidity"
  • "WindSpeed""WindSpeed"
  • "Altimeter""Altimeter"

Aggiungere quindi un codice aeroporto associato a una stazione meteo e convertire le misurazioni dall'ora locale in ora UTC.Then add an airport code associated with the weather station and convert the measurements from local time to UTC.

Iniziare creando un file per eseguire il mapping delle informazioni sulla stazione meteo (WBAN) al codice dell'aeroporto.Begin by creating a file to map the weather station (WBAN) info to an airport code. Il codice seguente legge ognuno dei file di dati relativi al meteo non elaborati per ora, crea i subset per le colonne necessari, unisce il file di mapping della stazione meteo, modifica orari e date dei valori in UTC e quindi scrive una nuova versione del file:The following code reads each of the hourly raw weather data files, subsets to the columns we need, merges the weather station mapping file, adjusts the date times of measurements to UTC, and then writes out a new version of the file:

# Look up AirportID and Timezone for WBAN (weather station ID) and adjust time

adjustTime <- function(dataList)
{
  dataset0 <- as.data.frame(dataList)

  dataset1 <- base::merge(dataset0, wbanToAirIDAndTZDF1, by = "WBAN")

  if(nrow(dataset1) == 0) {
    dataset1 <- data.frame(
      Visibility = numeric(0),
      DryBulbCelsius = numeric(0),
      DewPointCelsius = numeric(0),
      RelativeHumidity = numeric(0),
      WindSpeed = numeric(0),
      Altimeter = numeric(0),
      AdjustedYear = numeric(0),
      AdjustedMonth = numeric(0),
      AdjustedDay = integer(0),
      AdjustedHour = integer(0),
      AirportID = integer(0)
    )

    return(dataset1)
  }

  Year <- as.integer(substr(dataset1$Date, 1, 4))
  Month <- as.integer(substr(dataset1$Date, 5, 6))
  Day <- as.integer(substr(dataset1$Date, 7, 8))

  Time <- dataset1$Time
  Hour <- ceiling(Time/100)

  Timezone <- as.integer(dataset1$TimeZone)

  adjustdate = as.POSIXlt(sprintf("%4d-%02d-%02d %02d:00:00", Year, Month, Day, Hour), tz = "UTC") + Timezone * 3600

  AdjustedYear = as.POSIXlt(adjustdate)$year + 1900
  AdjustedMonth = as.POSIXlt(adjustdate)$mon + 1
  AdjustedDay   = as.POSIXlt(adjustdate)$mday
  AdjustedHour  = as.POSIXlt(adjustdate)$hour

  AirportID = dataset1$AirportID
  Weather = dataset1[,c("Visibility", "DryBulbCelsius", "DewPointCelsius", "RelativeHumidity", "WindSpeed", "Altimeter")]

  data.set = data.frame(cbind(AdjustedYear, AdjustedMonth, AdjustedDay, AdjustedHour, AirportID, Weather))

  return(data.set)
}

wbanToAirIDAndTZDF <- read.csv("wban-to-airport-id-tz.csv")

colInfo <- list(
  WBAN = list(type="integer"),
  Date = list(type="character"),
  Time = list(type="integer"),
  Visibility = list(type="numeric"),
  DryBulbCelsius = list(type="numeric"),
  DewPointCelsius = list(type="numeric"),
  RelativeHumidity = list(type="numeric"),
  WindSpeed = list(type="numeric"),
  Altimeter = list(type="numeric")
)

weatherDF <- RxTextData(file.path(inputDataDir, "WeatherRaw"), colInfo = colInfo)

weatherDF1 <- RxTextData(file.path(inputDataDir, "Weather"), colInfo = colInfo,
                filesystem=hdfsFS)

rxSetComputeContext("localpar")
rxDataStep(weatherDF, outFile = weatherDF1, rowsPerRead = 50000, overwrite = T,
           transformFunc = adjustTime,
           transformObjects = list(wbanToAirIDAndTZDF1 = wbanToAirIDAndTZDF))

Importazione dei dati relativi al meteo e alle compagnie aeree per i DataFrame di SparkImporting the airline and weather data to Spark DataFrames

A questo punto verrà usata la funzione SparkR read.df() per importare i dati relativi al meteo e alle compagnie aeree nei DataFrame di Spark.Now we use the SparkR read.df() function to import the weather and airline data to Spark DataFrames. Questa funzione, come molti altri metodi Spark, viene eseguita in modo differito, vale a dire che viene accodata per l'esecuzione, ma eseguita solo quando è necessario.This function, like many other Spark methods, are executed lazily, meaning that they are queued for execution but not executed until required.

airPath     <- file.path(inputDataDir, "AirOnTime08to12CSV")
weatherPath <- file.path(inputDataDir, "Weather") # pre-processed weather data
rxHadoopListFiles(airPath) 
rxHadoopListFiles(weatherPath) 

# create a SparkR DataFrame for the airline data

logmsg('create a SparkR DataFrame for the airline data') 
# use inferSchema = "false" for more robust parsing
airDF <- read.df(sqlContext, airPath, source = "com.databricks.spark.csv", 
                 header = "true", inferSchema = "false")

# Create a SparkR DataFrame for the weather data

logmsg('create a SparkR DataFrame for the weather data') 
weatherDF <- read.df(sqlContext, weatherPath, source = "com.databricks.spark.csv", 
                     header = "true", inferSchema = "true")

Pulizia e trasformazione dei datiData cleansing and transformation

Successivamente viene eseguita una pulizia dei dati relativi alle compagnie aeree importati per rinominare le colonne.Next we do some cleanup on the airline data we’ve imported to rename columns. Verranno tenute le sole variabili necessarie e gli orari di partenza pianificati saranno arrotondati all'ora più vicina per consentire l'unione con i dati meteo più recenti alla partenza:We only keep the variables needed, and round scheduled departure times down to the nearest hour to enable merging with the latest weather data at departure:

logmsg('clean the airline data') 
airDF <- rename(airDF,
                ArrDel15 = airDF$ARR_DEL15,
                Year = airDF$YEAR,
                Month = airDF$MONTH,
                DayofMonth = airDF$DAY_OF_MONTH,
                DayOfWeek = airDF$DAY_OF_WEEK,
                Carrier = airDF$UNIQUE_CARRIER,
                OriginAirportID = airDF$ORIGIN_AIRPORT_ID,
                DestAirportID = airDF$DEST_AIRPORT_ID,
                CRSDepTime = airDF$CRS_DEP_TIME,
                CRSArrTime =  airDF$CRS_ARR_TIME
)

# Select desired columns from the flight data. 
varsToKeep <- c("ArrDel15", "Year", "Month", "DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "CRSDepTime", "CRSArrTime")
airDF <- select(airDF, varsToKeep)

# Apply schema
coltypes(airDF) <- c("character", "integer", "integer", "integer", "integer", "character", "integer", "integer", "integer", "integer")

# Round down scheduled departure time to full hour.
airDF$CRSDepTime <- floor(airDF$CRSDepTime / 100)

A questo punto si eseguiranno operazioni simili sui dati meteo:Now we perform similar operations on the weather data:

# Average weather readings by hour
logmsg('clean the weather data') 
weatherDF <- agg(groupBy(weatherDF, "AdjustedYear", "AdjustedMonth", "AdjustedDay", "AdjustedHour", "AirportID"), Visibility="avg",
                  DryBulbCelsius="avg", DewPointCelsius="avg", RelativeHumidity="avg", WindSpeed="avg", Altimeter="avg"
                  )

weatherDF <- rename(weatherDF,
                    Visibility = weatherDF$'avg(Visibility)',
                    DryBulbCelsius = weatherDF$'avg(DryBulbCelsius)',
                    DewPointCelsius = weatherDF$'avg(DewPointCelsius)',
                    RelativeHumidity = weatherDF$'avg(RelativeHumidity)',
                    WindSpeed = weatherDF$'avg(WindSpeed)',
                    Altimeter = weatherDF$'avg(Altimeter)'
)

Unione in join dei dati meteo e delle compagnie aereeJoining the weather and airline data

La funzione join() di SparkR verrà ora usata per eseguire un left outer join dei dati sulle compagnie aeree e dei dati meteo in base ai valori di datetime e AirportID di partenza.We now use the SparkR join() function to do a left outer join of the airline and weather data by departure AirportID and datetime. L'outer join consente di conservare tutti i record dei dati relativi alle compagnie aeree, anche in assenza di dati meteo corrispondenti.The outer join allows us to retain all the airline data records even if there is no matching weather data. Dopo il join verranno rimosse alcune colonne ridondanti e verranno rinominate le colonne conservate per rimuovere il prefisso DataFrame introdotto dal join.Following the join, we remove some redundant columns, and rename the kept columns to remove the incoming DataFrame prefix introduced by the join.

logmsg('Join airline data with weather at Origin Airport')
joinedDF <- SparkR::join(
  airDF,
  weatherDF,
  airDF$OriginAirportID == weatherDF$AirportID &
    airDF$Year == weatherDF$AdjustedYear &
    airDF$Month == weatherDF$AdjustedMonth &
    airDF$DayofMonth == weatherDF$AdjustedDay &
    airDF$CRSDepTime == weatherDF$AdjustedHour,
  joinType = "left_outer"
)

# Remove redundant columns
vars <- names(joinedDF)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF1 <- select(joinedDF, varsToKeep)

joinedDF2 <- rename(joinedDF1,
                    VisibilityOrigin = joinedDF1$Visibility,
                    DryBulbCelsiusOrigin = joinedDF1$DryBulbCelsius,
                    DewPointCelsiusOrigin = joinedDF1$DewPointCelsius,
                    RelativeHumidityOrigin = joinedDF1$RelativeHumidity,
                    WindSpeedOrigin = joinedDF1$WindSpeed,
                    AltimeterOrigin = joinedDF1$Altimeter
)

In modo analogo verrà eseguito il join dei dati meteo e dei dati delle compagnie aeree in base ai valori datetime e AirportID di arrivo:In a similar fashion, we join the weather and airline data based on arrival AirportID and datetime:

logmsg('Join airline data with weather at Destination Airport')
joinedDF3 <- SparkR::join(
  joinedDF2,
  weatherDF,
  airDF$DestAirportID == weatherDF$AirportID &
    airDF$Year == weatherDF$AdjustedYear &
    airDF$Month == weatherDF$AdjustedMonth &
    airDF$DayofMonth == weatherDF$AdjustedDay &
    airDF$CRSDepTime == weatherDF$AdjustedHour,
  joinType = "left_outer"
)

# Remove redundant columns
vars <- names(joinedDF3)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF4 <- select(joinedDF3, varsToKeep)

joinedDF5 <- rename(joinedDF4,
                    VisibilityDest = joinedDF4$Visibility,
                    DryBulbCelsiusDest = joinedDF4$DryBulbCelsius,
                    DewPointCelsiusDest = joinedDF4$DewPointCelsius,
                    RelativeHumidityDest = joinedDF4$RelativeHumidity,
                    WindSpeedDest = joinedDF4$WindSpeed,
                    AltimeterDest = joinedDF4$Altimeter
                    )

Salvare i risultati in un file con formato CSV per lo scambio con ScaleRSave results to CSV for exchange with ScaleR

Questa azione completa i join che è necessario eseguire con SparkR.That completes the joins we need to do with SparkR. I dati verranno salvati da "joinedDF5" del DataFrame di Spark finale in un file CSV per ScaleR e quindi verrà chiusa la sessione di SparkR.We save the data from the final Spark DataFrame “joinedDF5” to a CSV for input to ScaleR and then close out the SparkR session. È possibile indicare in modo esplicito a SparkR di salvare il file CSV risultante in 80 partizioni separate per abilitare un parallelismo sufficiente durante l'elaborazione in ScaleR:We explicitly tell SparkR to save the resultant CSV in 80 separate partitions to enable sufficient parallelism in ScaleR processing:

logmsg('output the joined data from Spark to CSV') 
joinedDF5 <- repartition(joinedDF5, 80) # write.df below will produce this many CSVs

# write result to directory of CSVs
write.df(joinedDF5, file.path(dataDir, "joined5Csv"), "com.databricks.spark.csv", "overwrite", header = "true")

# We can shut down the SparkR Spark context now
sparkR.stop()

# remove non-data files
rxHadoopRemove(file.path(dataDir, "joined5Csv/_SUCCESS"))

Importare in XDF per l'uso da parte di ScaleRImport to XDF for use by ScaleR

È possibile usare direttamente il file CSV dei dati relativi al meteo e alle compagnie aeree aggiunti per la modellazione tramite un'origine dati di testo ScaleR.We could use the CSV file of joined airline and weather data as-is for modeling via a ScaleR text data source. Tuttavia, il file CSV dovrà prima essere importato con estensione XDF, in quanto è più efficiente quando si eseguono più operazioni sul set di dati:But we import it to XDF first, since it is more efficient when running multiple operations on the dataset:

logmsg('Import the CSV to compressed, binary XDF format') 

# set the Spark compute context for ML Services 
rxSetComputeContext(sparkCC)
rxGetComputeContext()

colInfo <- list(
  ArrDel15 = list(type="numeric"),
  Year = list(type="factor"),
  Month = list(type="factor"),
  DayofMonth = list(type="factor"),
  DayOfWeek = list(type="factor"),
  Carrier = list(type="factor"),
  OriginAirportID = list(type="factor"),
  DestAirportID = list(type="factor"),
  RelativeHumidityOrigin = list(type="numeric"),
  AltimeterOrigin = list(type="numeric"),
  DryBulbCelsiusOrigin = list(type="numeric"),
  WindSpeedOrigin = list(type="numeric"),
  VisibilityOrigin = list(type="numeric"),
  DewPointCelsiusOrigin = list(type="numeric"),
  RelativeHumidityDest = list(type="numeric"),
  AltimeterDest = list(type="numeric"),
  DryBulbCelsiusDest = list(type="numeric"),
  WindSpeedDest = list(type="numeric"),
  VisibilityDest = list(type="numeric"),
  DewPointCelsiusDest = list(type="numeric")
)

joinedDF5Txt <- RxTextData(file.path(dataDir, "joined5Csv"),
                           colInfo = colInfo, fileSystem = hdfsFS)
rxGetInfo(joinedDF5Txt) 

destData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)

rxImport(inData = joinedDF5Txt, destData, overwrite = TRUE)

rxGetInfo(destData, getVarInfo = T)

# File name: /user/RevoShare/dev/delayDataLarge/joined5XDF 
# Number of composite data files: 80 
# Number of observations: 148619655 
# Number of variables: 22 
# Number of blocks: 320 
# Compression type: zlib 
# Variable information: 
#   Var 1: ArrDel15, Type: numeric, Low/High: (0.0000, 1.0000)
# Var 2: Year
# 26 factor levels: 1987 1988 1989 1990 1991 ... 2008 2009 2010 2011 2012
# Var 3: Month
# 12 factor levels: 10 11 12 1 2 ... 5 6 7 8 9
# Var 4: DayofMonth
# 31 factor levels: 1 3 4 5 7 ... 29 30 2 18 31
# Var 5: DayOfWeek
# 7 factor levels: 4 6 7 1 3 2 5
# Var 6: Carrier
# 30 factor levels: PI UA US AA DL ... HA F9 YV 9E VX
# Var 7: OriginAirportID
# 374 factor levels: 15249 12264 11042 15412 13930 ... 13341 10559 14314 11711 10558
# Var 8: DestAirportID
# 378 factor levels: 13303 14492 10721 11057 13198 ... 14802 11711 11931 12899 10559
# Var 9: CRSDepTime, Type: integer, Low/High: (0, 24)
# Var 10: CRSArrTime, Type: integer, Low/High: (0, 2400)
# Var 11: RelativeHumidityOrigin, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 12: AltimeterOrigin, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 13: DryBulbCelsiusOrigin, Type: numeric, Low/High: (-46.1000, 47.8000)
# Var 14: WindSpeedOrigin, Type: numeric, Low/High: (0.0000, 81.0000)
# Var 15: VisibilityOrigin, Type: numeric, Low/High: (0.0000, 90.0000)
# Var 16: DewPointCelsiusOrigin, Type: numeric, Low/High: (-41.7000, 29.0000)
# Var 17: RelativeHumidityDest, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 18: AltimeterDest, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 19: DryBulbCelsiusDest, Type: numeric, Low/High: (-46.1000, 53.9000)
# Var 20: WindSpeedDest, Type: numeric, Low/High: (0.0000, 136.0000)
# Var 21: VisibilityDest, Type: numeric, Low/High: (0.0000, 88.0000)
# Var 22: DewPointCelsiusDest, Type: numeric, Low/High: (-43.0000, 29.0000)

finalData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)

Suddivisione dei dati per il training e il testSplitting data for training and test

rxDataStep viene usato per suddividere i dati del 2012 per il test e mantenere il resto per il training:We use rxDataStep to split out the 2012 data for testing and keep the rest for training:

# split out the training data

logmsg('split out training data as all data except year 2012')
trainDS <- RxXdfData( file.path(dataDir, "finalDataTrain" ),fileSystem = hdfsFS)

rxDataStep( inData = finalData, outFile = trainDS,
            rowSelection = ( Year != 2012 ), overwrite = T )

# split out the testing data

logmsg('split out the test data for year 2012') 
testDS <- RxXdfData( file.path(dataDir, "finalDataTest" ), fileSystem = hdfsFS)

rxDataStep( inData = finalData, outFile = testDS,
            rowSelection = ( Year == 2012 ), overwrite = T )

rxGetInfo(trainDS)
rxGetInfo(testDS)

Eseguire il training e il test di un modello di regressione logisticaTrain and test a logistic regression model

A questo punto è possibile compilare un modello.Now we are ready to build a model. Per visualizzare l'influenza dei dati meteo sui ritardi dell'ora di arrivo verrà usata la routine di regressione logistica di ScaleR.To see the influence of weather data on delay in the arrival time, we use ScaleR’s logistic regression routine. Questa viene usata per la modellazione se un ritardo di arrivo maggiore di 15 minuti è influenzato dal meteo negli aeroporti di partenza e arrivo:We use it to model whether an arrival delay of greater than 15 minutes is influenced by the weather at the departure and arrival airports:

logmsg('train a logistic regression model for Arrival Delay > 15 minutes') 
formula <- as.formula(ArrDel15 ~ Year + Month + DayofMonth + DayOfWeek + Carrier +
                     OriginAirportID + DestAirportID + CRSDepTime + CRSArrTime + 
                     RelativeHumidityOrigin + AltimeterOrigin + DryBulbCelsiusOrigin +
                     WindSpeedOrigin + VisibilityOrigin + DewPointCelsiusOrigin + 
                     RelativeHumidityDest + AltimeterDest + DryBulbCelsiusDest +
                     WindSpeedDest + VisibilityDest + DewPointCelsiusDest
                   )

# Use the scalable rxLogit() function but set max iterations to 3 for the purposes of 
# this exercise 

logitModel <- rxLogit(formula, data = trainDS, maxIterations = 3)

base::summary(logitModel)

Esaminare come procede sui dati di test eseguendo alcune previsioni e analizzando le curve ROC e AUC.Now let’s see how it does on the test data by making some predictions and looking at ROC and AUC.

# Predict over test data (Logistic Regression).

logmsg('predict over the test data') 
logitPredict <- RxXdfData(file.path(dataDir, "logitPredict"), fileSystem = hdfsFS)

# Use the scalable rxPredict() function

rxPredict(logitModel, data = testDS, outData = logitPredict,
          extraVarsToWrite = c("ArrDel15"), 
          type = 'response', overwrite = TRUE)

# Calculate ROC and Area Under the Curve (AUC).

logmsg('calculate the roc and auc') 
logitRoc <- rxRoc("ArrDel15", "ArrDel15_Pred", logitPredict)
logitAuc <- rxAuc(logitRoc)
head(logitAuc)
logitAuc

plot(logitRoc)

Assegnazione di punteggi in un'altra posizioneScoring elsewhere

È possibile anche usare il modello per l'assegnazione di punteggi ai dati in un'altra piattaformaWe can also use the model for scoring data on another platform. salvandolo come file di Servizi desktop remoto e trasferendo e importando questo file di Servizi desktop remoto nell'ambiente per l'assegnazione di punteggi di destinazione, ad esempio Servizi R per SQL Server.By saving it to an RDS file and then transferring and importing that RDS into a destination scoring environment such as SQL Server R Services. È importante assicurarsi che i factor level dei dati a cui assegnare un punteggio corrispondano a quelli in cui è stato creato il modello.It is important to ensure that the factor levels of the data to be scored match those on which the model was built. Questa corrispondenza può essere ottenuta tramite l'estrazione e il salvataggio delle informazioni sulla colonna associate ai dati di modellazione tramite la funzione rxCreateColInfo() di ScaleR, quindi applicando queste informazioni sulla colonna all'origine dati di input per la previsione.That match can be achieved by extracting and saving the column information associated with the modeling data via ScaleR’s rxCreateColInfo() function and then applying that column information to the input data source for prediction. Di seguito sono state salvate alcune righe del set di dati di test e le informazioni sulla colonna verranno estratte e usate da questo esempio nello script di previsione:In the following we save a few rows of the test dataset and extract and use the column information from this sample in the prediction script:

# save the model and a sample of the test dataset 

logmsg('save serialized version of the model and a sample of the test data')
rxSetComputeContext('localpar') 
saveRDS(logitModel, file = "logitModel.rds")
testDF <- head(testDS, 1000)  
saveRDS(testDF    , file = "testDF.rds"    )
list.files()

rxHadoopListFiles(file.path(inputDataDir,''))
rxHadoopListFiles(dataDir)

# stop the spark engine 
rxStopEngine(sparkCC) 

logmsg('Done.')
elapsed <- (proc.time() - t0)[3]
logmsg(paste('Elapsed time=',sprintf('%6.2f',elapsed),'(sec)\n\n'))

SummarySummary

In questo articolo è stato illustrato come è possibile combinare l'uso di SparkR per manipolare i dati con ScaleR per lo sviluppo di modelli di Hadoop Spark.In this article, we’ve shown how it’s possible to combine use of SparkR for data manipulation with ScaleR for model development in Hadoop Spark. In questo scenario è necessario gestire sessioni di Spark separate eseguendo una sola sessione alla volta e scambiando i dati tramite file CSV.This scenario requires that you maintain separate Spark sessions, only running one session at a time, and exchange data via CSV files. Anche se semplice, questo processo sarà ancora più semplice nella prossima versione di ML Services in cui SparkR e ScaleR potranno condividere una sessione di Spark e pertanto i relativi DataFrame.Although straightforward, this process should be even easier in an upcoming ML Services release, when SparkR and ScaleR can share a Spark session and so share Spark DataFrames.

Passaggi successivi e altre informazioniNext steps and more information

Per altre informazioni sull'uso di SparkR, vedere:For more information on use of SparkR, see: