Combine Scaler e SparkR em HDInsightCombine ScaleR and SparkR in HDInsight

Este documento mostra como prever atrasos na chegada de voo usando um modelo de regressão logística scaleR.This document shows how to predict flight arrival delays using a ScaleR logistic regression model. O exemplo utiliza dados de atraso de voo e meteorologia, unidos através do SparkR .The example uses flight delay and weather data, joined using SparkR .

Apesar de ambos os pacotes funcionarem no motor de execução spark da Apache Hadoop, estão bloqueados da partilha de dados na memória, uma vez que cada um deles requer as suas respetivas sessões de Spark.Although both packages run on Apache Hadoop's Spark execution engine, they're blocked from in-memory data sharing as they each require their own respective Spark sessions. Até que este problema seja abordado numa próxima versão do ML Server, a solução alternativa é manter sessões de Spark não sobrepostas e trocar dados através de ficheiros intermédios.Until this issue is addressed in an upcoming version of ML Server, the workaround is to maintain non-overlapping Spark sessions, and to exchange data through intermediate files. As instruções aqui demonstram que estes requisitos são simples de alcançar.The instructions here show that these requirements are straightforward to achieve.

Este exemplo foi inicialmente partilhado numa palestra em Strata 2016 por Mario Inchiosa e Roni Burd.This example was initially shared in a talk at Strata 2016 by Mario Inchiosa and Roni Burd. Você pode encontrar esta conversa na Building a Scalable Data Science Platform with R.You can find this talk at Building a Scalable Data Science Platform with R.

O código foi originalmente escrito para o ML Server em execução no Spark num cluster HDInsight em Azure.The code was originally written for ML Server running on Spark in an HDInsight cluster on Azure. Mas o conceito de misturar o uso de SparkR e ScaleR num só script também é válido no contexto de ambientes no local.But the concept of mixing the use of SparkR and ScaleR in one script is also valid in the context of on-premises environments.

Os passos deste documento assumem que você tem um nível intermédio de conhecimento de R e são a biblioteca ScaleR do ML Server.The steps in this document assume that you have an intermediate level of knowledge of R and R the ScaleR library of ML Server. És apresentado ao SparkR enquanto caminhas por este cenário.You're introduced to SparkR while walking through this scenario.

Os conjuntos de dados da companhia aérea e do tempoThe airline and weather datasets

Os dados de voo estão disponíveis nos arquivos governamentais dos EUA.The flight data is available from the U.S. government archives. Também está disponível como um zip de AirOnTimeCSV.zip. It's also available as a zip from AirOnTimeCSV.zip.

Os dados meteorológicos podem ser descarregados como ficheiros zip em forma bruta, por mês, do repositório da Administração Oceânica e Atmosférica Nacional.The weather data can be downloaded as zip files in raw form, by month, from the National Oceanic and Atmospheric Administration repository. Para este exemplo, faça o download dos dados para maio de 2007 a dezembro de 2012.For this example, download the data for May 2007 – December 2012. Utilize os ficheiros de dados de hora em hora e YYYYMMMstation.txt o ficheiro dentro de cada um dos fechos.Use the hourly data files and YYYYMMMstation.txt file within each of the zips.

Criação do ambiente SparkSetting up the Spark environment

Utilize o seguinte código para configurar o ambiente Spark:Use the following code to set up the Spark environment:

workDir        <- '~'  
myNameNode     <- 'default' 
myPort         <- 0
inputDataDir   <- 'wasb://hdfs@myAzureAccount.blob.core.windows.net'
hdfsFS         <- RxHdfsFileSystem(hostName=myNameNode, port=myPort)

# create a persistent Spark session to reduce startup times 
#   (remember to stop it later!)
 
sparkCC        <- RxSpark(consoleOutput=TRUE, nameNode=myNameNode, port=myPort, persistentRun=TRUE)

# create working directories 

rxHadoopMakeDir('/user')
rxHadoopMakeDir('user/RevoShare')
rxHadoopMakeDir('user/RevoShare/remoteuser')

(dataDir <- '/share')
rxHadoopMakeDir(dataDir)
rxHadoopListFiles(dataDir) 

setwd(workDir)
getwd()

# version of rxRoc that runs in a local CC 
rxRoc <- function(...){
  rxSetComputeContext(RxLocalSeq())
  roc <- RevoScaleR::rxRoc(...)
  rxSetComputeContext(sparkCC)
  return(roc)
}

logmsg <- function(msg) { cat(format(Sys.time(), "%Y-%m-%d %H:%M:%S"),':',msg,'\n') } 
t0 <- proc.time() 

#..start 

logmsg('Start') 
(trackers <- system("mapred job -list-active-trackers", intern = TRUE))
logmsg(paste('Number of task nodes=',length(trackers)))

Em seguida, adicione Spark_Home ao caminho de pesquisa para pacotes R.Next, add Spark_Home to the search path for R packages. Adicioná-lo ao caminho de pesquisa permite-lhe usar o SparkR e rubricar uma sessão de SparkR:Adding it to the search path allows you to use SparkR, and initialize a SparkR session:

#..setup for use of SparkR  

logmsg('Initialize SparkR') 

.libPaths(c(file.path(Sys.getenv("SPARK_HOME"), "R", "lib"), .libPaths()))
library(SparkR)

sparkEnvir <- list(spark.executor.instances = '10',
                   spark.yarn.executor.memoryOverhead = '8000')

sc <- sparkR.init(
  sparkEnvir = sparkEnvir,
  sparkPackages = "com.databricks:spark-csv_2.10:1.3.0"
)

sqlContext <- sparkRSQL.init(sc)

Preparação dos dados meteorológicosPreparing the weather data

Para preparar os dados meteorológicos, subconte-os nas colunas necessárias para modelar:To prepare the weather data, subset it to the columns needed for modeling:

  • "Visibilidade""Visibility"
  • "DryBulbCelsius""DryBulbCelsius"
  • "DewPointCelsius""DewPointCelsius"
  • "RelativaHumidity""RelativeHumidity"
  • "WindSpeed""WindSpeed"
  • "Altímetro""Altimeter"

Em seguida, adicione um código de aeroporto associado à estação meteorológica e converta as medições da hora local para a UTC.Then add an airport code associated with the weather station and convert the measurements from local time to UTC.

Comece por criar um ficheiro para mapear a informação da estação meteorológica (WBAN) para um código de aeroporto.Begin by creating a file to map the weather station (WBAN) info to an airport code. O código seguinte lê cada um dos ficheiros de dados meteorológicos brutos de hora em hora, subconjuntos às colunas de que precisamos, funde o ficheiro de mapeamento da estação meteorológica, ajusta os horários das medições para a UTC e, em seguida, escreve uma nova versão do ficheiro:The following code reads each of the hourly raw weather data files, subsets to the columns we need, merges the weather station-mapping file, adjusts the date times of measurements to UTC, and then writes out a new version of the file:

# Look up AirportID and Timezone for WBAN (weather station ID) and adjust time

adjustTime <- function(dataList)
{
  dataset0 <- as.data.frame(dataList)
  
  dataset1 <- base::merge(dataset0, wbanToAirIDAndTZDF1, by = "WBAN")

  if(nrow(dataset1) == 0) {
    dataset1 <- data.frame(
      Visibility = numeric(0),
      DryBulbCelsius = numeric(0),
      DewPointCelsius = numeric(0),
      RelativeHumidity = numeric(0),
      WindSpeed = numeric(0),
      Altimeter = numeric(0),
      AdjustedYear = numeric(0),
      AdjustedMonth = numeric(0),
      AdjustedDay = integer(0),
      AdjustedHour = integer(0),
      AirportID = integer(0)
    )
    
    return(dataset1)
  }
  
  Year <- as.integer(substr(dataset1$Date, 1, 4))
  Month <- as.integer(substr(dataset1$Date, 5, 6))
  Day <- as.integer(substr(dataset1$Date, 7, 8))
  
  Time <- dataset1$Time
  Hour <- ceiling(Time/100)
  
  Timezone <- as.integer(dataset1$TimeZone)
  
  adjustdate = as.POSIXlt(sprintf("%4d-%02d-%02d %02d:00:00", Year, Month, Day, Hour), tz = "UTC") + Timezone * 3600

  AdjustedYear = as.POSIXlt(adjustdate)$year + 1900
  AdjustedMonth = as.POSIXlt(adjustdate)$mon + 1
  AdjustedDay   = as.POSIXlt(adjustdate)$mday
  AdjustedHour  = as.POSIXlt(adjustdate)$hour
  
  AirportID = dataset1$AirportID
  Weather = dataset1[,c("Visibility", "DryBulbCelsius", "DewPointCelsius", "RelativeHumidity", "WindSpeed", "Altimeter")]
  
  data.set = data.frame(cbind(AdjustedYear, AdjustedMonth, AdjustedDay, AdjustedHour, AirportID, Weather))
  
  return(data.set)
}

wbanToAirIDAndTZDF <- read.csv("wban-to-airport-id-tz.csv")

colInfo <- list(
  WBAN = list(type="integer"),
  Date = list(type="character"),
  Time = list(type="integer"),
  Visibility = list(type="numeric"),
  DryBulbCelsius = list(type="numeric"),
  DewPointCelsius = list(type="numeric"),
  RelativeHumidity = list(type="numeric"),
  WindSpeed = list(type="numeric"),
  Altimeter = list(type="numeric")
)

weatherDF <- RxTextData(file.path(inputDataDir, "WeatherRaw"), colInfo = colInfo)

weatherDF1 <- RxTextData(file.path(inputDataDir, "Weather"), colInfo = colInfo,
                filesystem=hdfsFS)

rxSetComputeContext("localpar")
rxDataStep(weatherDF, outFile = weatherDF1, rowsPerRead = 50000, overwrite = T,
           transformFunc = adjustTime,
           transformObjects = list(wbanToAirIDAndTZDF1 = wbanToAirIDAndTZDF))

Importar a companhia aérea e os dados meteorológicos para a Spark DataFramesImporting the airline and weather data to Spark DataFrames

Agora usamos a função SparkR read.df() para importar os dados meteorológicos e da companhia aérea para Spark DataFrames.Now we use the SparkR read.df() function to import the weather and airline data to Spark DataFrames. Esta função, como muitos outros métodos de faísca, é executada preguiçosamente, o que significa que são executados para execução, mas não executados até ser necessário.This function, like many other Spark methods, is executed lazily, meaning that they're queued for execution but not executed until required.

airPath     <- file.path(inputDataDir, "AirOnTime08to12CSV")
weatherPath <- file.path(inputDataDir, "Weather") # pre-processed weather data
rxHadoopListFiles(airPath) 
rxHadoopListFiles(weatherPath) 

# create a SparkR DataFrame for the airline data

logmsg('create a SparkR DataFrame for the airline data') 
# use inferSchema = "false" for more robust parsing
airDF <- read.df(sqlContext, airPath, source = "com.databricks.spark.csv", 
                 header = "true", inferSchema = "false")

# Create a SparkR DataFrame for the weather data

logmsg('create a SparkR DataFrame for the weather data') 
weatherDF <- read.df(sqlContext, weatherPath, source = "com.databricks.spark.csv", 
                     header = "true", inferSchema = "true")

Limpeza e transformação de dadosData cleansing and transformation

A seguir fazemos uma limpeza dos dados da companhia aérea que importámos para mudar o nome das colunas.Next we do some cleanup on the airline data we've imported to rename columns. Apenas mantemos as variáveis necessárias, e os horários de partida programados redondos até à hora mais próxima para permitir a fusão com os dados meteorológicos mais recentes à partida:We only keep the variables needed, and round scheduled departure times down to the nearest hour to enable merging with the latest weather data at departure:

logmsg('clean the airline data') 
airDF <- rename(airDF,
                ArrDel15 = airDF$ARR_DEL15,
                Year = airDF$YEAR,
                Month = airDF$MONTH,
                DayofMonth = airDF$DAY_OF_MONTH,
                DayOfWeek = airDF$DAY_OF_WEEK,
                Carrier = airDF$UNIQUE_CARRIER,
                OriginAirportID = airDF$ORIGIN_AIRPORT_ID,
                DestAirportID = airDF$DEST_AIRPORT_ID,
                CRSDepTime = airDF$CRS_DEP_TIME,
                CRSArrTime =  airDF$CRS_ARR_TIME
)

# Select desired columns from the flight data. 
varsToKeep <- c("ArrDel15", "Year", "Month", "DayofMonth", "DayOfWeek", "Carrier", "OriginAirportID", "DestAirportID", "CRSDepTime", "CRSArrTime")
airDF <- select(airDF, varsToKeep)

# Apply schema
coltypes(airDF) <- c("character", "integer", "integer", "integer", "integer", "character", "integer", "integer", "integer", "integer")

# Round down scheduled departure time to full hour.
airDF$CRSDepTime <- floor(airDF$CRSDepTime / 100)

Agora realizamos operações semelhantes nos dados meteorológicos:Now we perform similar operations on the weather data:

# Average weather readings by hour
logmsg('clean the weather data') 
weatherDF <- agg(groupBy(weatherDF, "AdjustedYear", "AdjustedMonth", "AdjustedDay", "AdjustedHour", "AirportID"), Visibility="avg",
                  DryBulbCelsius="avg", DewPointCelsius="avg", RelativeHumidity="avg", WindSpeed="avg", Altimeter="avg"
                  )

weatherDF <- rename(weatherDF,
                    Visibility = weatherDF$'avg(Visibility)',
                    DryBulbCelsius = weatherDF$'avg(DryBulbCelsius)',
                    DewPointCelsius = weatherDF$'avg(DewPointCelsius)',
                    RelativeHumidity = weatherDF$'avg(RelativeHumidity)',
                    WindSpeed = weatherDF$'avg(WindSpeed)',
                    Altimeter = weatherDF$'avg(Altimeter)'
)

Juntando-se aos dados meteorológicos e aéreosJoining the weather and airline data

Utilizamos agora a função de adere sparkr para fazer uma junção externa à esquerda da companhia aérea e dados meteorológicos por partida do AirportID e data.We now use the SparkR join() function to do a left outer join of the airline and weather data by departure AirportID and datetime. A união externa permite-nos reter todos os registos de dados da companhia aérea, mesmo que não haja dados meteorológicos correspondentes.The outer join allows us to retain all the airline data records even if there's no matching weather data. Após a junção, removemos algumas colunas redundantes e rebatizamos as colunas mantidas para remover o prefixo DataFrame introduzido pela junta.Following the join, we remove some redundant columns, and rename the kept columns to remove the incoming DataFrame prefix introduced by the join.

logmsg('Join airline data with weather at Origin Airport')
joinedDF <- SparkR::join(
  airDF,
  weatherDF,
  airDF$OriginAirportID == weatherDF$AirportID &
    airDF$Year == weatherDF$AdjustedYear &
    airDF$Month == weatherDF$AdjustedMonth &
    airDF$DayofMonth == weatherDF$AdjustedDay &
    airDF$CRSDepTime == weatherDF$AdjustedHour,
  joinType = "left_outer"
)

# Remove redundant columns
vars <- names(joinedDF)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF1 <- select(joinedDF, varsToKeep)

joinedDF2 <- rename(joinedDF1,
                    VisibilityOrigin = joinedDF1$Visibility,
                    DryBulbCelsiusOrigin = joinedDF1$DryBulbCelsius,
                    DewPointCelsiusOrigin = joinedDF1$DewPointCelsius,
                    RelativeHumidityOrigin = joinedDF1$RelativeHumidity,
                    WindSpeedOrigin = joinedDF1$WindSpeed,
                    AltimeterOrigin = joinedDF1$Altimeter
)

De forma semelhante, juntamo-nos aos dados meteorológicos e aéreos com base na chegada do AirportID e na hora da data:In a similar fashion, we join the weather and airline data based on arrival AirportID and datetime:

logmsg('Join airline data with weather at Destination Airport')
joinedDF3 <- SparkR::join(
  joinedDF2,
  weatherDF,
  airDF$DestAirportID == weatherDF$AirportID &
    airDF$Year == weatherDF$AdjustedYear &
    airDF$Month == weatherDF$AdjustedMonth &
    airDF$DayofMonth == weatherDF$AdjustedDay &
    airDF$CRSDepTime == weatherDF$AdjustedHour,
  joinType = "left_outer"
)

# Remove redundant columns
vars <- names(joinedDF3)
varsToDrop <- c('AdjustedYear', 'AdjustedMonth', 'AdjustedDay', 'AdjustedHour', 'AirportID')
varsToKeep <- vars[!(vars %in% varsToDrop)]
joinedDF4 <- select(joinedDF3, varsToKeep)

joinedDF5 <- rename(joinedDF4,
                    VisibilityDest = joinedDF4$Visibility,
                    DryBulbCelsiusDest = joinedDF4$DryBulbCelsius,
                    DewPointCelsiusDest = joinedDF4$DewPointCelsius,
                    RelativeHumidityDest = joinedDF4$RelativeHumidity,
                    WindSpeedDest = joinedDF4$WindSpeed,
                    AltimeterDest = joinedDF4$Altimeter
                    )

Guardar resultados para CSV para troca com ScaleRSave results to CSV for exchange with ScaleR

Isso completa as juntas que precisamos fazer com o SparkR.That completes the joins we need to do with SparkR. Guardamos os dados do Último Spark DataFrame "joinedDF5" para um CSV para entrada no ScaleR e, em seguida, fechamos a sessão sparkr.We save the data from the final Spark DataFrame "joinedDF5" to a CSV for input to ScaleR and then close out the SparkR session. Dizemos explicitamente ao SparkR para guardar o CSV resultante em 80 divisórias separadas para permitir o paralelismo suficiente no processamento da ScaleR:We explicitly tell SparkR to save the resultant CSV in 80 separate partitions to enable sufficient parallelism in ScaleR processing:

logmsg('output the joined data from Spark to CSV') 
joinedDF5 <- repartition(joinedDF5, 80) # write.df below will produce this many CSVs

# write result to directory of CSVs
write.df(joinedDF5, file.path(dataDir, "joined5Csv"), "com.databricks.spark.csv", "overwrite", header = "true")

# We can shut down the SparkR Spark context now
sparkR.stop()

# remove non-data files
rxHadoopRemove(file.path(dataDir, "joined5Csv/_SUCCESS"))

Importação para XDF para utilização por ScaleRImport to XDF for use by ScaleR

Poderíamos usar o ficheiro CSV de companhias aéreas e dados meteorológicos aderidos como é para modelação através de uma fonte de dados de texto ScaleR.We could use the CSV file of joined airline and weather data as-is for modeling via a ScaleR text data source. Mas importamos primeiro para a XDF, já que é mais eficiente quando executa várias operações no conjunto de dados:But we import it to XDF first, since it's more efficient when running multiple operations on the dataset:

logmsg('Import the CSV to compressed, binary XDF format') 

# set the Spark compute context for ML Services 
rxSetComputeContext(sparkCC)
rxGetComputeContext()

colInfo <- list(
  ArrDel15 = list(type="numeric"),
  Year = list(type="factor"),
  Month = list(type="factor"),
  DayofMonth = list(type="factor"),
  DayOfWeek = list(type="factor"),
  Carrier = list(type="factor"),
  OriginAirportID = list(type="factor"),
  DestAirportID = list(type="factor"),
  RelativeHumidityOrigin = list(type="numeric"),
  AltimeterOrigin = list(type="numeric"),
  DryBulbCelsiusOrigin = list(type="numeric"),
  WindSpeedOrigin = list(type="numeric"),
  VisibilityOrigin = list(type="numeric"),
  DewPointCelsiusOrigin = list(type="numeric"),
  RelativeHumidityDest = list(type="numeric"),
  AltimeterDest = list(type="numeric"),
  DryBulbCelsiusDest = list(type="numeric"),
  WindSpeedDest = list(type="numeric"),
  VisibilityDest = list(type="numeric"),
  DewPointCelsiusDest = list(type="numeric")
)

joinedDF5Txt <- RxTextData(file.path(dataDir, "joined5Csv"),
                           colInfo = colInfo, fileSystem = hdfsFS)
rxGetInfo(joinedDF5Txt) 

destData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)

rxImport(inData = joinedDF5Txt, destData, overwrite = TRUE)

rxGetInfo(destData, getVarInfo = T)

# File name: /user/RevoShare/dev/delayDataLarge/joined5XDF 
# Number of composite data files: 80 
# Number of observations: 148619655 
# Number of variables: 22 
# Number of blocks: 320 
# Compression type: zlib 
# Variable information: 
#   Var 1: ArrDel15, Type: numeric, Low/High: (0.0000, 1.0000)
# Var 2: Year
# 26 factor levels: 1987 1988 1989 1990 1991 ... 2008 2009 2010 2011 2012
# Var 3: Month
# 12 factor levels: 10 11 12 1 2 ... 5 6 7 8 9
# Var 4: DayofMonth
# 31 factor levels: 1 3 4 5 7 ... 29 30 2 18 31
# Var 5: DayOfWeek
# 7 factor levels: 4 6 7 1 3 2 5
# Var 6: Carrier
# 30 factor levels: PI UA US AA DL ... HA F9 YV 9E VX
# Var 7: OriginAirportID
# 374 factor levels: 15249 12264 11042 15412 13930 ... 13341 10559 14314 11711 10558
# Var 8: DestAirportID
# 378 factor levels: 13303 14492 10721 11057 13198 ... 14802 11711 11931 12899 10559
# Var 9: CRSDepTime, Type: integer, Low/High: (0, 24)
# Var 10: CRSArrTime, Type: integer, Low/High: (0, 2400)
# Var 11: RelativeHumidityOrigin, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 12: AltimeterOrigin, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 13: DryBulbCelsiusOrigin, Type: numeric, Low/High: (-46.1000, 47.8000)
# Var 14: WindSpeedOrigin, Type: numeric, Low/High: (0.0000, 81.0000)
# Var 15: VisibilityOrigin, Type: numeric, Low/High: (0.0000, 90.0000)
# Var 16: DewPointCelsiusOrigin, Type: numeric, Low/High: (-41.7000, 29.0000)
# Var 17: RelativeHumidityDest, Type: numeric, Low/High: (0.0000, 100.0000)
# Var 18: AltimeterDest, Type: numeric, Low/High: (28.1700, 31.1600)
# Var 19: DryBulbCelsiusDest, Type: numeric, Low/High: (-46.1000, 53.9000)
# Var 20: WindSpeedDest, Type: numeric, Low/High: (0.0000, 136.0000)
# Var 21: VisibilityDest, Type: numeric, Low/High: (0.0000, 88.0000)
# Var 22: DewPointCelsiusDest, Type: numeric, Low/High: (-43.0000, 29.0000)

finalData <- RxXdfData(file.path(dataDir, "joined5XDF"), fileSystem = hdfsFS)

Divisão de dados para treino e testeSplitting data for training and test

Utilizamos o rxDataStep para dividir os dados de 2012 para testes e manter o resto para treino:We use rxDataStep to split out the 2012 data for testing and keep the rest for training:

# split out the training data

logmsg('split out training data as all data except year 2012')
trainDS <- RxXdfData( file.path(dataDir, "finalDataTrain" ),fileSystem = hdfsFS)

rxDataStep( inData = finalData, outFile = trainDS,
            rowSelection = ( Year != 2012 ), overwrite = T )

# split out the testing data

logmsg('split out the test data for year 2012') 
testDS <- RxXdfData( file.path(dataDir, "finalDataTest" ), fileSystem = hdfsFS)

rxDataStep( inData = finalData, outFile = testDS,
            rowSelection = ( Year == 2012 ), overwrite = T )

rxGetInfo(trainDS)
rxGetInfo(testDS)

Treine e teste um modelo de regressão logísticaTrain and test a logistic regression model

Agora estamos prontos para construir um modelo.Now we're ready to build a model. Para ver a influência dos dados meteorológicos sobre o atraso na hora de chegada, usamos a rotina de regressão logística da ScaleR.To see the influence of weather data on delay in the arrival time, we use ScaleR's logistic regression routine. Utilizamo-lo para modelar se um atraso de chegada superior a 15 minutos é influenciado pelo tempo nos aeroportos de partida e chegada:We use it to model whether an arrival delay of greater than 15 minutes is influenced by the weather at the departure and arrival airports:

logmsg('train a logistic regression model for Arrival Delay > 15 minutes') 
formula <- as.formula(ArrDel15 ~ Year + Month + DayofMonth + DayOfWeek + Carrier +
                     OriginAirportID + DestAirportID + CRSDepTime + CRSArrTime + 
                     RelativeHumidityOrigin + AltimeterOrigin + DryBulbCelsiusOrigin +
                     WindSpeedOrigin + VisibilityOrigin + DewPointCelsiusOrigin + 
                     RelativeHumidityDest + AltimeterDest + DryBulbCelsiusDest +
                     WindSpeedDest + VisibilityDest + DewPointCelsiusDest
                   )

# Use the scalable rxLogit() function but set max iterations to 3 for the purposes of 
# this exercise 

logitModel <- rxLogit(formula, data = trainDS, maxIterations = 3)

base::summary(logitModel)

Agora vamos ver como se sai nos dados do teste fazendo algumas previsões e olhando para ROC e AUC.Now let's see how it does on the test data by making some predictions and looking at ROC and AUC.

# Predict over test data (Logistic Regression).

logmsg('predict over the test data') 
logitPredict <- RxXdfData(file.path(dataDir, "logitPredict"), fileSystem = hdfsFS)

# Use the scalable rxPredict() function

rxPredict(logitModel, data = testDS, outData = logitPredict,
          extraVarsToWrite = c("ArrDel15"), 
          type = 'response', overwrite = TRUE)

# Calculate ROC and Area Under the Curve (AUC).

logmsg('calculate the roc and auc') 
logitRoc <- rxRoc("ArrDel15", "ArrDel15_Pred", logitPredict)
logitAuc <- rxAuc(logitRoc)
head(logitAuc)
logitAuc

plot(logitRoc)

Pontuação em outro lugarScoring elsewhere

Também podemos usar o modelo para marcar dados noutra plataforma.We can also use the model for scoring data on another platform. Ao guardá-lo para um ficheiro RDS e, em seguida, transferindo e importando esse RDS para um ambiente de pontuação de destino, como o Microsoft SQL Server R Services.By saving it to an RDS file and then transferring and importing that RDS into a destination scoring environment such as Microsoft SQL Server R Services. É importante garantir que os níveis de fator dos dados a serem pontuados correspondem aos dos quais o modelo foi construído.It's important to ensure that the factor levels of the data to be scored match those on which the model was built. Esta correspondência pode ser conseguida extraindo e guardando as informações da coluna associadas aos dados de modelação através da função da ScaleR rxCreateColInfo() e, em seguida, aplicando essa informação de coluna à fonte de dados de entrada para previsão.That match can be achieved by extracting and saving the column information associated with the modeling data via ScaleR's rxCreateColInfo() function and then applying that column information to the input data source for prediction. No seguinte exemplo de código, guardamos algumas linhas do conjunto de dados de teste e extraímos e utilizamos as informações da coluna desta amostra no script de previsão:In the following code example, we save a few rows of the test dataset and extract and use the column information from this sample in the prediction script:

# save the model and a sample of the test dataset 

logmsg('save serialized version of the model and a sample of the test data')
rxSetComputeContext('localpar') 
saveRDS(logitModel, file = "logitModel.rds")
testDF <- head(testDS, 1000)  
saveRDS(testDF    , file = "testDF.rds"    )
list.files()

rxHadoopListFiles(file.path(inputDataDir,''))
rxHadoopListFiles(dataDir)

# stop the spark engine 
rxStopEngine(sparkCC) 

logmsg('Done.')
elapsed <- (proc.time() - t0)[3]
logmsg(paste('Elapsed time=',sprintf('%6.2f',elapsed),'(sec)\n\n'))

ResumoSummary

Neste artigo, mostrámos como é possível combinar o uso do SparkR para manipulação de dados com a ScaleR para desenvolvimento de modelos em Hadoop Spark.In this article, we've shown how it's possible to combine use of SparkR for data manipulation with ScaleR for model development in Hadoop Spark. Este cenário requer que mantenha sessões separadas de Spark, executando apenas uma sessão de cada vez, e troque dados através de ficheiros CSV.This scenario requires that you maintain separate Spark sessions, only running one session at a time, and exchange data via CSV files. Embora simples, este processo deve ser ainda mais fácil numa próxima versão dos ML Services, quando o SparkR e o ScaleR podem partilhar uma sessão de Spark e assim partilhar o Spark DataFrames.Although straightforward, this process should be even easier in an upcoming ML Services release, when SparkR and ScaleR can share a Spark session and so share Spark DataFrames.

Próximos passos e mais informaçõesNext steps and more information

Para obter mais informações sobre a utilização do SparkR, consulte:For more information on use of SparkR, see: