Tutorial: PySpark and revoscalepy interoperability in Machine Learning Server

Important

This content is being retired and may not be updated in the future. The support for Machine Learning Server will end on July 1, 2022. For more information, see What's happening to Machine Learning Server?

Applies to: Microsoft Machine Learning Server 9.x

PySpark is the Python API for Apache Spark. The revoscalepy module is Machine Learning Server's Python library for predictive analytics at scale. In this tutorial, you learn how to create a logistic regression model using functions from both libraries. The tutorial covers the following steps:

  • Import packages
  • Connect to Spark using revoscalepy.rx_spark_connect(), specifying PySpark interop
  • Use PySpark for basic data manipulation
  • Use revoscalepy to build a logistic regression model

Note

The revoscalepy module provides functions for data sources and data manipulation. We are using PySpark in this tutorial to illustrate a basic technique for passing data objects between the two programming contexts.

Prerequisites

Note

Jupyter Notebook users, update your notebook to include the MMLSPy kernel. Select this kernel in your Jupyter Notebook to use the interoperability feature.

Import the relevant packages

The following commands import the required libraries into the current session.

    from pyspark import SparkContext
    from pyspark.sql import SparkSession
    from revoscalepy import *

Connect to Spark

Setting interop='pyspark' indicates that you want interoperability with PySpark for this Spark session.

    # Connect to Spark with PySpark interoperability enabled for this session
    cc = rx_spark_connect(interop='pyspark', reset=True)
    
    # Get the PySpark context
    sc = rx_get_pyspark_connection(cc)
    spark = SparkSession(sc)

Data acquisition and manipulation

The sample data used in this tutorial is airline arrival and departure data, which you can store in a local file path.
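On a cluster whose default file system is HDFS, a bare path is typically resolved against HDFS, so the file:// scheme is what marks a path as local. As a minimal sketch, assuming the CSV sits at a hypothetical POSIX-style absolute path (/data/airline.csv is an illustration, not a path from this tutorial), Python's standard pathlib module can produce the URI form:

```python
from pathlib import Path

# Hypothetical location of the CSV file; substitute your own absolute path.
local_path = Path('/data/airline.csv')

# as_uri() requires an absolute path and returns its file:// URI form.
airline_uri = local_path.as_uri()
print(airline_uri)
```

The resulting string could then be passed to spark.read.csv() in place of the placeholder used in the code that follows.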

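    # Read the airline data into a data frame. header=True takes the column
    # names (ARR_DEL15, YEAR, and so on) from the first row of the file
    airlineDF = spark.read.csv('<data source location like "file:///some-file-path/airline.csv">', header=True)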
    
    # Get a count on rows
    airlineDF.count()

    # Return the first 10 rows to get familiar with the data
    airlineDF.take(10)

    # Rename columns for readability
    airlineTransformed = airlineDF.selectExpr(
        'ARR_DEL15 as ArrDel15',
        'YEAR as Year',
        'MONTH as Month',
        'DAY_OF_MONTH as DayOfMonth',
        'DAY_OF_WEEK as DayOfWeek',
        'UNIQUE_CARRIER as Carrier',
        'ORIGIN_AIRPORT_ID as OriginAirportID',
        'DEST_AIRPORT_ID as DestAirportID',
        'FLOOR(CRS_DEP_TIME / 100) as CRSDepTime',
        'CRS_ARR_TIME as CRSArrTime')
    
    # Split the data into training and test sets. Train on all years
    # before 2012, then predict flight delays for January 2012
    airlineTrainDF = airlineTransformed.filter('Year < 2012')
    airlineTestDF = airlineTransformed.filter('(Year == 2012) AND (Month == 1)')
    
    # Define column info for factors
    column_info = {
        'ArrDel15': { 'type': 'numeric' },
        #'CRSDepTime': { 'type': 'integer' },
        'CRSDepTime': {
            'type': 'factor',
            'levels': ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10',
                      '11', '12', '13', '14', '15', '16', '17', '18', '19', '20',
                      '21', '22', '23']
        },
        'CRSArrTime': { 'type': 'integer' },
        'Month': {
            'type': 'factor',
            'levels': ['1', '2']
        },
        'DayOfMonth': {
            'type': 'factor',
            'levels': ['1', '2', '3', '4', '5', '6', '7', '8', '9', '10',
                      '11', '12', '13', '14', '15', '16', '17', '18', '19', '20',
                      '21', '22', '23', '24', '25', '26', '27', '28', '29', '30',
                      '31']
        },
        'DayOfWeek': {
            'type': 'factor',
            'levels': ['1', '2', '3', '4', '5', '6', '7']
            #, 'newLevels': ['Monday', 'Tuesday', 'Wednesday', 'Thursday', 'Friday', 'Saturday', 'Sunday'] # ignored
        }#,
        #'Carrier': { 'type': 'factor' }
    }
    
    # Define a Spark data frame data source, required for passing to revoscalepy
    trainDS = RxSparkDataFrame(airlineTrainDF, column_info=column_info)
    testDS = RxSparkDataFrame(airlineTestDF, column_info=column_info)
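Each 'levels' list in column_info holds the string form of the values a factor column can take. Because the CRSDepTime expression divides by 100 and takes the floor, a scheduled departure time in HHMM form (an assumption about how the source data encodes times) reduces to an hour from 0 to 23. The following minimal sketch in plain Python, using hypothetical helper names that are not part of revoscalepy or PySpark, shows that reduction and generates level lists programmatically instead of typing them by hand:

```python
def hour_bucket(hhmm):
    """Mirror FLOOR(CRS_DEP_TIME / 100): keep the hour part of an HHMM time."""
    return hhmm // 100


def factor_levels(first, last):
    """Return the inclusive integer range first..last as a list of strings."""
    return [str(value) for value in range(first, last + 1)]


hour_levels = factor_levels(0, 23)          # '0' through '23'
day_of_month_levels = factor_levels(1, 31)  # '1' through '31'
day_of_week_levels = factor_levels(1, 7)    # '1' through '7'

print(hour_bucket(1430))  # 14
print(len(hour_levels))   # 24
```

Lists built this way could be supplied as the 'levels' values in column_info, which avoids accidentally omitting a level.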

Create the model

A logistic regression model requires a symbolic formula, specifying the dependent and independent variables, and a data set. You can output the results using the print function.

    # Create the formula
    formula = "ArrDel15 ~ DayOfMonth + DayOfWeek + CRSDepTime + CRSArrTime"

    # Run a logistic regression to predict arrival delay
    logitModel = rx_logit(formula, data=trainDS)

    # Print the model summary to look at the coefficients
    print(logitModel.summary())
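The formula is an ordinary string: the dependent variable sits to the left of ~, and the independent variables, joined by +, sit to the right. As a sketch, a hypothetical helper (build_formula is an illustration, not a revoscalepy function) can assemble the string from column names, which may be convenient when trying different sets of predictors:

```python
def build_formula(response, predictors):
    """Join column names into a 'response ~ x1 + x2' formula string."""
    if not predictors:
        raise ValueError('at least one predictor is required')
    return response + ' ~ ' + ' + '.join(predictors)


formula = build_formula(
    'ArrDel15',
    ['DayOfMonth', 'DayOfWeek', 'CRSDepTime', 'CRSArrTime'])
print(formula)
```

This produces the same formula string used in the tutorial code.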

Next steps

This tutorial provides an introduction to a basic workflow using PySpark for data preparation and revoscalepy functions for logistic regression. For further exploration, review our Python samples.
