Tutorial: Azure Data Lake Storage Gen2, Azure Databricks y Spark

Este tutorial muestra cómo conectarse con un clúster de Azure Databricks para consultar los datos almacenados en una cuenta de almacenamiento de Azure que tiene Azure Data Lake Storage Gen2 habilitado. Esta conexión le permite ejecutar forma nativa las consultas y los análisis del clúster en los datos.

En este tutorial, aprenderá lo siguiente:

  • Ingesta de datos no estructurados en una cuenta de almacenamiento
  • Ejecución de análisis en los datos de almacenamiento de blobs

Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.

Requisitos previos

Cree un área de trabajo de Azure Databricks, un clúster y un cuaderno

  1. Crear un área de trabajo de Azure Databricks. Consulte Creación de un área de trabajo de Azure Databricks.

  2. Crear un clúster. Consulte Creación de un clúster.

  3. Cree un cuaderno. Consulte Creación de un cuaderno. Elija Python como lenguaje predeterminado del cuaderno.

Mantenga el cuaderno abierto. Lo usará en las secciones siguientes.

Descarga de los datos de vuelo

En este tutorial se utilizan datos sobre puntualidad de vuelos correspondientes a enero de 2016 de Bureau of Transportation Statistics para demostrar cómo realizar una operación de ETL. Debe descargar estos datos para completar el tutorial.

  1. Descargue el archivoOn_Time_Reporting_Carrier_On_Time_Performance_1987_present_2016_1.zip. Este archivo contiene los datos de vuelo.

  2. Descomprima el contenido del archivo comprimido y anote el nombre y la ruta de acceso del archivo. Necesitará esta información en pasos posteriores.

Si quiere conocer en detalle la información capturada en los datos de rendimiento puntual de informes, puede ver las descripciones de campo en el sitio web de Bureau of Transportation Statistics.

Ingerir datos

En esta sección, cargará los datos de vuelo .csv en la cuenta de Azure Data Lake Storage Gen2 y, a continuación, montará la cuenta de almacenamiento en el clúster de Databricks. Por último, usará Databricks para leer los datos de vuelo .csv y volverá a escribirlos en el almacenamiento en formato Apache Parquet.

Carga de los datos de vuelo en la cuenta de almacenamiento

Use AzCopy para copiar el archivo .csv en su cuenta de Azure Data Lake Storage Gen2. Use el comando azcopy make para crear un contenedor en la cuenta de almacenamiento. A continuación, use el comando azcopy copy para copiar los datos csv que acaba de descargar en un directorio de ese contenedor.

En los pasos siguientes, debe escribir los nombres del contenedor que desea crear y el directorio y blob en los que desea cargar los datos de vuelo en el contenedor. Puede usar los nombres sugeridos en cada paso o especificar los suyos propios. Para ello, observe las convenciones de nomenclatura para contenedores, directorios y blobs.

  1. Abra una ventana del símbolo del sistema y escriba el siguiente comando para iniciar sesión en Azure Active Directory a fin de acceder la cuenta de almacenamiento.

    azcopy login
    

    Siga las instrucciones que aparecen en la ventana del símbolo del sistema para autenticar la cuenta de usuario.

  2. Para crear un contenedor en la cuenta de almacenamiento con el fin de almacenar los datos de vuelo, escriba el siguiente comando:

    azcopy make  "https://<storage-account-name>.dfs.core.windows.net/<container-name>" 
    
    • Reemplace el valor de marcador de posición <storage-account-name> por el nombre de la cuenta de almacenamiento.

    • Reemplace el marcador de posición <container-name> por un nombre del contenedor que desea crear para almacenar los datos csv; por ejemplo, flight-data-container.

  3. Para cargar (copiar) los datos csv en la cuenta de almacenamiento, escriba el siguiente comando.

    azcopy copy "<csv-folder-path>" https://<storage-account-name>.dfs.core.windows.net/<container-name>/<directory-name>/On_Time.csv
    
    • Reemplace el valor de marcador de posición <csv-folder-path> por la ruta de acceso al archivo csv.

    • Reemplace el valor de marcador de posición <storage-account-name> por el nombre de la cuenta de almacenamiento.

    • Reemplace el marcador de posición <container-name> por el nombre del contenedor en la cuenta de almacenamiento.

    • Reemplace el marcador de posición <directory-name> por el nombre de un directorio para almacenar los datos en el contenedor; por ejemplo, jan2016.

Montaje de la cuenta de almacenamiento en el clúster de Databricks

En esta sección, montará el almacenamiento de objetos en la nube de Azure Data Lake Storage Gen2 en el sistema de archivos de Databricks (DBFS). Use la entidad de servicio de Azure AD que creó anteriormente para la autenticación con la cuenta de almacenamiento. Para más información, consulte Montaje del almacenamiento de objetos en la nube en Azure Databricks.

  1. Adjunte el cuaderno al clúster.

    1. En el cuaderno que creó anteriormente, seleccione el botón Conectar de la esquina superior derecha de la barra de herramientas del cuaderno. Este botón abre el selector de proceso (si ya ha conectado el cuaderno a un clúster, el nombre de ese clúster se muestra en el texto del botón en lugar de en Conectar).

    2. En el menú desplegable del clúster, seleccione el clúster que creó anteriormente.

    3. Observe que el texto del selector de clústeres cambia a iniciando. Espere a que el clúster termine de iniciarse y a que el nombre del clúster aparezca en el botón antes de continuar.

  2. Copie y pegue el siguiente bloque de código en la primera celda, pero no ejecute el código aún.

    configs = {"fs.azure.account.auth.type": "OAuth",
           "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
           "fs.azure.account.oauth2.client.id": "<appId>",
           "fs.azure.account.oauth2.client.secret": "<clientSecret>",
           "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/<tenantId>/oauth2/token",
           "fs.azure.createRemoteFileSystemDuringInitialization": "true"}
    
    dbutils.fs.mount(
    source = "abfss://<container-name>@<storage-account-name>.dfs.core.windows.net/<directory-name>",
    mount_point = "/mnt/flightdata",
    extra_configs = configs)
    
  3. En este bloque de código:

    • En configs, reemplace los valores de marcador de posición <appId>, <clientSecret> y <tenantId> por el identificador de aplicación, el secreto de cliente y el identificador de inquilino que copió al crear la entidad de servicio en los requisitos previos.

    • En el URI source, reemplace los valores de marcador de posición <storage-account-name>, <container-name>y <directory-name> por el nombre de la cuenta de almacenamiento de Azure Data Lake Storage Gen2 y el nombre del contenedor y directorio que especificó al cargar los datos de vuelo en la cuenta de almacenamiento.

      Nota:

      El identificador de esquema del URI, abfss, indica a Databricks que use el controlador de Azure Blob File System con Seguridad de la capa de transporte (TLS). Para obtener más información sobre el URI, consulte Uso del URI de Azure Data Lake Storage Gen2.

  4. Asegúrese de que el clúster ha terminado de iniciarse antes de continuar.

  5. Presione las teclas MAYÚS + ENTRAR para ejecutar el código de este bloque.

El contenedor y directorio donde cargó los datos de vuelo de la cuenta de almacenamiento ahora es accesible en el cuaderno a través del punto de montaje, /mnt/flightdata.

Uso de Databricks Notebook para convertir CSV en Parquet

Ahora que se puede acceder a los datos de vuelo csv a través de un punto de montaje de DBFS, puede usar un objeto DataFrame de Apache Spark para cargarlos en el área de trabajo y volver a escribirlos en formato Apache Parquet en el almacenamiento de objetos de Azure Data Lake Storage Gen2.

  • Un objeto DataFrame de Spark es una estructura de datos etiquetada bidimensional con columnas de tipos potencialmente diferentes. Puede usar un objeto DataFrame para leer y escribir datos fácilmente en varios formatos admitidos. Con un objeto DataFrame, puede cargar datos desde el almacenamiento de objetos en la nube y realizar análisis y transformaciones en ellos dentro del clúster de proceso sin que esto afecte a los datos subyacentes en el almacenamiento de objetos en la nube. Para obtener más información, consulte Uso de DataFrames de PySpark en Azure Databricks.

  • Apache Parquet es un formato de archivo en columnas con optimizaciones para acelerar las consultas. Es un formato de archivo mucho más eficaz que el archivo CSV o JSON. Para obtener más información, consulte Archivos de Parquet.

En el cuaderno, agregue una celda nueva y pegue el código siguiente en ella.

# Use the previously established DBFS mount point to read the data.
# Create a DataFrame to read the csv data.
# The header option specifies that the first row of data should be used as the DataFrame column names
# The inferschema option specifies that the column data types should be inferred from the data in the file
flight_df = spark.read.format('csv').options(
    header='true', inferschema='true').load("/mnt/flightdata/*.csv")

# Read the airline csv file and write the output to parquet format for easy query.
flight_df.write.mode("append").parquet("/mnt/flightdata/parquet/flights")
print("Done")

Presione las teclas MAYÚS + ENTRAR para ejecutar el código de este bloque.

Antes de continuar con la sección siguiente, asegúrese de que se han escrito todos los datos de Parquet y de que aparece "Listo" en la salida.

Exploración de datos

En esta sección, usará la utilidad del sistema de archivos de Databricks para explorar el almacenamiento de objetos de Azure Data Lake Storage Gen2 mediante el punto de montaje de DBFS que creó en la sección anterior.

En una celda nueva, pegue el código siguiente para obtener una lista de los archivos en el punto de montaje. El primer comando genera una lista de archivos y directorios. El segundo comando muestra la salida en formato tabular para facilitar la lectura.

dbutils.fs.ls("/mnt/flightdata")
display(dbutils.fs.ls("/mnt/flightdata"))

Presione las teclas MAYÚS + ENTRAR para ejecutar el código de este bloque.

Observe que el directorio parquet aparece en la lista. Ha guardado los datos de vuelo .csv en formato Parquet en el directorio parquet/flights en la sección anterior. Para enumerar archivos en el directorio parquet/flights, pegue el código siguiente en una nueva celda y ejecútelo:

display(dbutils.fs.ls("/mnt/flightdata/parquet/flights"))

Para crear un nuevo archivo y enumerarlo, pegue el código siguiente en una nueva celda y ejecútelo:

dbutils.fs.put("/mnt/flightdata/mydirectory/mysubdirectory/1.txt", "Hello, World!", True)
display(dbutils.fs.ls("/mnt/flightdata/mydirectory/mysubdirectory"))

Dado que no necesita el archivo 1.txt en este tutorial, puede pegar el código siguiente en una celda y ejecutarlo para eliminar mydirectory de forma recursiva. El parámetro True indica una eliminación recursiva.

dbutils.fs.rm("/mnt/flightdata/mydirectory", True)

Para mayor comodidad, puede usar el comando de ayuda para obtener información detallada sobre otros comandos.

dbutils.fs.help("rm")

Con estos ejemplos de código, ha explorado la naturaleza jerárquica de HDFS usando datos almacenados en una cuenta de almacenamiento con Azure Data Lake Storage Gen2 habilitado.

Consultar los datos

A continuación, puede empezar a consultar los datos cargados en la cuenta de almacenamiento. Escriba cada uno de los siguientes bloques de código en una celda nueva y presione MAYÚS + ENTRAR para ejecutar el script de Python.

DataFrames proporciona un amplio conjunto de funciones (selección de columnas, filtro, unión, incorporación) que permiten resolver problemas comunes de análisis de datos de forma eficaz.

Para cargar un objeto DataFrame desde los datos de vuelo de Parquet guardados anteriormente y explorar algunas de las funcionalidades admitidas, escriba este script en una nueva celda y ejecútelo.

# Read the existing parquet file for the flights database that was created earlier
flight_df = spark.read.parquet("/mnt/flightdata/parquet/flights")

# Print the schema of the dataframe
flight_df.printSchema()

# Print the flight database size
print("Number of flights in the database: ", flight_df.count())

# Show the first 25 rows (20 is the default)
# To show the first n rows, run: df.show(n)
# The second parameter indicates that column lengths shouldn't be truncated (default is 20 characters)
flight_df.show(25, False)

# You can also use the DataFrame to run simple queries. Results are returned in a DataFrame.
# Show the first 25 rows of the results of a query that returns selected colums for all flights originating from airports in Texas
flight_df.select("FlightDate", "Reporting_Airline", "Flight_Number_Reporting_Airline", "OriginCityName", "DepTime", "DestCityName", "ArrTime", "ArrDelay").filter("OriginState = 'TX'").show(258, False)

# Use display to run visualizations
# Preferably run this in a separate cmd cell
display(flight_df)

Introduzca este script en una celda nueva para ejecutar algunas consultas de análisis básico en los datos. Puede optar por ejecutar todo el script (MAYÚS + ENTRAR), resaltar cada consulta y ejecutarla de forma independiente con CTRL + MAYÚS + ENTRAR, o bien escribir cada consulta en una celda independiente y ejecutarla allí.

# create a temporary sql view for querying flight information
flight_data = spark.read.parquet('/mnt/flightdata/parquet/flights')
flight_data.createOrReplaceTempView('FlightTable')

# Print the total number of flights in Jan 2016 (the number of rows in the flight data).
print("Number of flights in Jan 2016: ", flight_data.count())

# Using spark sql, query the parquet file to return the total flights of each airline
num_flights_by_airline=spark.sql("SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline ORDER BY NumFlights DESC")
num_flights_by_airline.show()

# List out all the airports in Texas
airports_in_texas = spark.sql(
    "SELECT DISTINCT(OriginCityName) FROM FlightTable WHERE OriginStateName = 'Texas'")
print('Airports in Texas: ', airports_in_texas.count())
airports_in_texas.show(100, False)

# Find all airlines that fly from Texas
airlines_flying_from_texas = spark.sql(
    "SELECT DISTINCT(Reporting_Airline) FROM FlightTable WHERE OriginStateName='Texas'")
print('Airlines that fly to/from Texas: ', airlines_flying_from_texas.count())
airlines_flying_from_texas.show(100, False)

# List airlines by average arrival delay (negative values indicate early flights)
avg_arrival_delay=spark.sql(
    "SELECT Reporting_Airline, count(*) AS NumFlights, avg(DepDelay) AS AverageDepDelay, avg(ArrDelay) AS AverageArrDelay FROM FlightTable GROUP BY Reporting_Airline ORDER BY AverageArrDelay DESC")
print("Airlines by average arrival delay")
avg_arrival_delay.show()

# List airlines by the highest percentage of delayed flights. A delayed flight is one with a  departure or arrival delay that is greater than 15 minutes
spark.sql("DROP VIEW IF EXISTS totalFlights")
spark.sql("DROP VIEW IF EXISTS delayedFlights")
spark.sql(
    "CREATE TEMPORARY VIEW totalFlights AS SELECT Reporting_Airline, count(*) AS NumFlights FROM FlightTable GROUP BY Reporting_Airline")
spark.sql(
    "CREATE TEMPORARY VIEW delayedFlights AS SELECT Reporting_Airline, count(*) AS NumDelayedFlights FROM FlightTable WHERE DepDelay>15 or ArrDelay>15 GROUP BY Reporting_Airline")
percent_delayed_flights=spark.sql(
    "SELECT totalFlights.Reporting_Airline, totalFlights.NumFlights, delayedFlights.NumDelayedFlights, delayedFlights.NumDelayedFlights/totalFlights.NumFlights*100 AS PercentFlightsDelayed FROM totalFlights INNER JOIN delayedFlights ON totalFlights.Reporting_Airline = delayedFlights.Reporting_Airline ORDER BY PercentFlightsDelayed DESC")
print("Airlines by percentage of flights delayed")
percent_delayed_flights.show()

Resumen

En este tutorial ha:

  • Se han creado recursos de Azure, incluida una cuenta de almacenamiento de Azure Data Lake Storage Gen2 y una entidad de servicio de Azure AD, y permisos asignados para acceder a la cuenta de almacenamiento.

  • Se ha creado un área de trabajo de Azure Databricks, un cuaderno y un clúster de proceso.

  • Se ha usado AzCopy para cargar datos de vuelo .csv no estructurados en la cuenta de almacenamiento de Azure Data Lake Storage Gen2.

  • Se han usado funciones de utilidad del sistema de archivos de Databricks para montar la cuenta de almacenamiento de Azure Data Lake Storage Gen2 y explorar su sistema de archivos jerárquico.

  • Se han usado objetos DataFrame de Apache Spark para transformar los datos de vuelo .csv en formato de Apache Parquet y almacenarlos de nuevo en la cuenta de almacenamiento de Azure Data Lake Storage Gen2.

  • Se han usado objetos DataFrame para explorar los datos de vuelo y realizar una consulta sencilla.

  • Se ha usado SQL de Apache Spark para consultar los datos de vuelo del número total de vuelos de cada aerolínea en enero de 2016, los aeropuertos de Texas, las aerolíneas que vuelan desde Texas, el retraso medio de llegada en minutos de cada aerolínea a nivel nacional y el porcentaje de vuelos de cada aerolínea con retrasos en sus salidas o llegadas.

Limpieza de recursos

Si desea conservar el cuaderno y volver a él más adelante, es buena idea cerrar (finalizar) el clúster para evitar cargos. Para finalizar el clúster, selecciónelo en el selector de proceso situado en la esquina superior derecha de la barra de herramientas del cuaderno, seleccione Finalizar en el menú y confirme la selección (De manera predeterminada, el clúster se finalizará automáticamente tras 120 minutos de inactividad).

Si desea eliminar recursos individuales del área de trabajo, como cuadernos y clústeres, puede hacerlo desde la barra lateral izquierda del área de trabajo. Para obtener instrucciones detalladas, consulte Eliminar un clúster o Eliminar un cuaderno.

Cuando ya no los necesite, elimine el grupo de recursos y todos los recursos relacionados. Para hacerlo en Azure Portal, seleccione el grupo de recursos de la cuenta de almacenamiento y el área de trabajo y seleccione Eliminar.

Pasos siguientes