
Operationalize a data analytics pipeline

Data pipelines underlie many data analytics solutions. As the name suggests, a data pipeline takes in raw data, cleans and reshapes it as needed, and then typically performs calculations or aggregations before storing the processed data. The processed data is consumed by clients, reports, or APIs. A data pipeline must provide repeatable results, whether it runs on a schedule or is triggered by new data.

This article describes how to use Apache Oozie running on HDInsight Hadoop clusters to operationalize your data pipelines for repeatability. The example scenario walks you through a data pipeline that prepares and processes airline flight time-series data.

In the following scenario, the input data is a flat file containing a batch of flight data for one month. This flight data includes information such as the origin and destination airports, the miles flown, the departure and arrival times, and so forth. The goal of this pipeline is to summarize daily airline performance, where each airline has one row for each day with the average departure and arrival delays in minutes and the total miles flown that day.

YEAR  MONTH  DAY_OF_MONTH  CARRIER  AVG_DEP_DELAY  AVG_ARR_DELAY  TOTAL_DISTANCE
2017  1      3             AA       10.142229      7.862926       2644539
2017  1      3             AS       9.435449       5.482143       572289
2017  1      3             DL       6.935409       -2.1893024     1909696

The example pipeline waits until a new time period's flight data arrives, then stores that detailed flight information in your Apache Hive data warehouse for long-term analyses. The pipeline also creates a much smaller dataset that summarizes just the daily flight data. This daily flight summary data is sent to a SQL database to provide reports, such as for a website.

The following diagram illustrates the example pipeline.

Overview of the HDI flight example data pipeline

Apache Oozie solution overview

This pipeline uses Apache Oozie running on an HDInsight Hadoop cluster.

Oozie describes its pipelines in terms of actions, workflows, and coordinators. Actions determine the actual work to perform, such as running a Hive query. Workflows define the sequence of actions. Coordinators define the schedule for when the workflow runs. Coordinators can also wait for the availability of new data before launching an instance of the workflow.

The following diagram shows the high-level design of this example Oozie pipeline.

Oozie flight example data pipeline

Provision Azure resources

This pipeline requires an Azure SQL Database and an HDInsight Hadoop cluster in the same location. The Azure SQL Database stores both the summary data produced by the pipeline and the Oozie metadata store.

Provision Azure SQL Database

  1. Create an Azure SQL Database. See Create an Azure SQL Database in the Azure portal.

  2. To make sure that your HDInsight cluster can access the connected Azure SQL Database, configure the Azure SQL Database firewall rules to allow Azure services and resources to access the server. You can enable this option in the Azure portal by selecting Set server firewall, and then selecting ON under Allow Azure services and resources to access this server for the Azure SQL Database server or database. For more information, see Create and manage IP firewall rules. If you prefer scripting, a hedged Azure CLI alternative is sketched at the end of this section.

  3. Use Query editor to execute the following SQL statements to create the dailyflights table, which will store the summarized data from each run of the pipeline.

    CREATE TABLE dailyflights
    (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        CARRIER CHAR(2),
        AVG_DEP_DELAY FLOAT,
        AVG_ARR_DELAY FLOAT,
        TOTAL_DISTANCE FLOAT
    )
    GO
    
    CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER)
    GO
    

Your Azure SQL Database is now ready.
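
If you prefer to script these steps rather than use the portal, the Azure CLI can create the server, the database, and the firewall rule from step 2. The following is only a minimal sketch: the resource group, server name, database name, location, credentials, and service objective are placeholders or example values that you must adjust for your environment.

    # Sketch only: replace the placeholder names, location, and credentials.
    az sql server create --name <servername> --resource-group <resourcegroup> \
        --location <location> --admin-user <adminuser> --admin-password '<adminpassword>'

    az sql db create --name <databasename> --server <servername> \
        --resource-group <resourcegroup> --service-objective S1

    # The special 0.0.0.0 range corresponds to "Allow Azure services and resources to access this server".
    az sql server firewall-rule create --name AllowAzureServices --server <servername> \
        --resource-group <resourcegroup> --start-ip-address 0.0.0.0 --end-ip-address 0.0.0.0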

Provision an Apache Hadoop Cluster

Create an Apache Hadoop cluster with a custom metastore. During cluster creation from the portal, on the Storage tab, ensure you select your SQL Database under Metastore settings. For more information on selecting a metastore, see Select a custom metastore during cluster creation. For more information on cluster creation, see Get Started with HDInsight on Linux.

Verify SSH tunnel setup

To use the Oozie Web Console to view the status of your coordinator and workflow instances, set up an SSH tunnel to your HDInsight cluster. For more information, see SSH Tunnel.

Note

You can also use Chrome with the Foxy Proxy extension to browse your cluster's web resources across the SSH tunnel. Configure it to proxy all requests through the host localhost on the tunnel's port 9876. This approach is compatible with the Windows Subsystem for Linux, also known as Bash on Windows 10.

  1. Run the following command to open an SSH tunnel to your cluster, where CLUSTERNAME is the name of your cluster:

    ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  2. Verify the tunnel is operational by browsing to Ambari on your head node:

    http://headnodehost:8080

  3. To access the Oozie Web Console from within Ambari, navigate to Oozie > Quick Links > [Active server] > Oozie Web UI.

Configure Hive

Upload data

  1. Download an example CSV file that contains flight data for one month. Download the ZIP file 2017-01-FlightData.zip from the HDInsight GitHub repository and unzip it to the CSV file 2017-01-FlightData.csv.

  2. Copy this CSV file up to the Azure Storage account attached to your HDInsight cluster and place it in the /example/data/flights folder.

    1. Use SCP to copy the file from your local machine to the local storage of your HDInsight cluster head node.

      scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
      
    2. Use the ssh command to connect to your cluster. Edit the command below by replacing CLUSTERNAME with the name of your cluster, and then enter the command:

      ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
      
    3. From your ssh session, use the HDFS command to copy the file from the head node's local storage to Azure Storage.

      hadoop fs -mkdir /example/data/flights
      hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
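
    Optionally, confirm the upload; the CSV file should be listed in the flights folder:

      hdfs dfs -ls /example/data/flights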
      

Create tables

The sample data is now available. However, the pipeline requires two Hive tables for processing: one for the incoming data (rawFlights) and one for the summarized data (flights). Create these tables in Ambari as follows.

  1. Log in to Ambari by navigating to http://headnodehost:8080.

  2. From the list of services, select Hive.

    Apache Ambari services list with Hive selected

  3. Select Go To View next to the Hive View 2.0 label.

    Ambari Apache Hive summary list

  4. In the query text area, paste the following statements to create the rawFlights table. The rawFlights table provides a schema-on-read for the CSV files within the /example/data/flights folder in Azure Storage.

    CREATE EXTERNAL TABLE IF NOT EXISTS rawflights (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    )
    LOCATION '/example/data/flights'
    
  5. Select Execute to create the table.

    HDInsight Ambari services Hive query

  6. To create the flights table, replace the text in the query text area with the following statements. The flights table is a Hive-managed table that partitions data loaded into it by year, month, and day of month. This table will contain all historical flight data, with the lowest granularity present in the source data of one row per flight.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    CREATE TABLE flights
    (
        FL_DATE STRING,
        CARRIER STRING,
        FL_NUM STRING,
        ORIGIN STRING,
        DEST STRING,
        DEP_DELAY FLOAT,
        ARR_DELAY FLOAT,
        ACTUAL_ELAPSED_TIME FLOAT,
        DISTANCE FLOAT
    )
    PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT)
    ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde'
    WITH SERDEPROPERTIES 
    (
        "separatorChar" = ",",
        "quoteChar"     = "\""
    );
    
  7. Select Execute to create the table.
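
If you prefer the command line to the Ambari Hive view, you can run the same CREATE TABLE statements with Beeline from your ssh session. This is a sketch that assumes the usual HDInsight HTTP endpoint for HiveServer2; depending on your cluster's security configuration, you may also need to pass the cluster login credentials with -n and -p.

    # Open an interactive Beeline session, then paste the statements from steps 4 and 6.
    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'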

Create the Oozie workflow

Pipelines typically process data in batches at a given time interval. In this case, the pipeline processes the flight data daily. This approach allows the input CSV files to arrive daily, weekly, monthly, or annually.

The sample workflow processes the flight data day by day, in three major steps:

  1. Run a Hive query to extract the data for that day's date range from the source CSV file represented by the rawFlights table and insert the data into the flights table.
  2. Run a Hive query to dynamically create a staging table in Hive for the day, which contains a copy of the flight data summarized by day and carrier.
  3. Use Apache Sqoop to copy all the data from the daily staging table in Hive to the destination dailyflights table in Azure SQL Database. Sqoop reads the source rows from the data behind the Hive table residing in Azure Storage and loads them into SQL Database over a JDBC connection. A standalone Sqoop command equivalent to this step is sketched just after this list.

These three steps are coordinated by an Oozie workflow.
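
To make the Sqoop step concrete before wrapping it in Oozie, it is roughly equivalent to running a command like the following from the head node. This is an illustrative sketch only: the connection string placeholders and the export directory for January 3, 2017 mirror the values you configure later in job.properties, and it assumes the SQL Server JDBC driver is available to Sqoop.

    # Illustrative only: placeholders mirror the job.properties values defined later in this article.
    sqoop export \
        --connect "jdbc:sqlserver://<servername>.database.windows.net;user=<username>;password=<password>;database=<databasename>" \
        --table dailyflights \
        --export-dir /example/data/flights/day/2017/01/03 \
        --input-fields-terminated-by '\t' \
        -m 1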

  1. From your local workstation, create a file called job.properties. Use the text below as the starting contents for the file. Then update the values for your specific environment. The table below the text summarizes each of the properties and indicates where you can find the values for your own environment.

    nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
    jobTracker=[ACTIVERESOURCEMANAGER]:8050
    queueName=default
    oozie.use.system.libpath=true
    appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
    oozie.wf.application.path=${appBase}/load_flights_by_day
    hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
    hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
    hiveDailyTableName=dailyflights${year}${month}${day}
    hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day}
    sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
    sqlDatabaseTableName=dailyflights
    year=2017
    month=01
    day=03
    
    Property                     Value source
    nameNode                     The full path to the Azure Storage container attached to your HDInsight cluster.
    jobTracker                   The internal hostname of your active cluster's YARN head node. On the Ambari home page, select YARN from the list of services, then choose Active Resource Manager. The hostname URI is displayed at the top of the page. Append the port 8050.
    queueName                    The name of the YARN queue used when scheduling the Hive actions. Leave as default.
    oozie.use.system.libpath     Leave as true.
    appBase                      The path to the subfolder in Azure Storage where you deploy the Oozie workflow and supporting files.
    oozie.wf.application.path    The location of the Oozie workflow workflow.xml to run.
    hiveScriptLoadPartition      The path in Azure Storage to the Hive query file hive-load-flights-partition.hql.
    hiveScriptCreateDailyTable   The path in Azure Storage to the Hive query file hive-create-daily-summary-table.hql.
    hiveDailyTableName           The dynamically generated name to use for the staging table.
    hiveDataFolder               The path in Azure Storage to the data contained by the staging table.
    sqlDatabaseConnectionString  The JDBC syntax connection string to your Azure SQL Database.
    sqlDatabaseTableName         The name of the table in Azure SQL Database into which summary rows are inserted. Leave as dailyflights.
    year                         The year component of the day for which flight summaries are computed. Leave as is.
    month                        The month component of the day for which flight summaries are computed. Leave as is.
    day                          The day-of-month component of the day for which flight summaries are computed. Leave as is.
  2. From your local workstation, create a file called hive-load-flights-partition.hql. Use the code below as the contents for the file.

    SET hive.exec.dynamic.partition.mode=nonstrict;
    
    INSERT OVERWRITE TABLE flights
    PARTITION (YEAR, MONTH, DAY_OF_MONTH)
    SELECT 
        FL_DATE,
        CARRIER,
        FL_NUM,
        ORIGIN,
        DEST,
        DEP_DELAY,
        ARR_DELAY,
        ACTUAL_ELAPSED_TIME,
        DISTANCE,
        YEAR,
        MONTH,
        DAY_OF_MONTH
    FROM rawflights
    WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    Oozie variables use the syntax ${variableName}. These variables are set in the job.properties file. Oozie substitutes the actual values at runtime.

  3. From your local workstation, create a file called hive-create-daily-summary-table.hql. Use the code below as the contents for the file.

    DROP TABLE ${hiveTableName};
    CREATE EXTERNAL TABLE ${hiveTableName}
    (
        YEAR INT,
        MONTH INT,
        DAY_OF_MONTH INT,
        CARRIER STRING,
        AVG_DEP_DELAY FLOAT,
        AVG_ARR_DELAY FLOAT,
        TOTAL_DISTANCE FLOAT
    )
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}';
    INSERT OVERWRITE TABLE ${hiveTableName}
    SELECT  year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, 
            avg(arr_delay) avg_arr_delay, sum(distance) total_distance 
    FROM flights
    GROUP BY year, month, day_of_month, carrier 
    HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
    

    This query creates a staging table that stores only the summarized data for one day. Take note of the SELECT statement, which computes the average delays and the total distance flown by carrier by day. The data inserted into this table is stored at a known location (the path indicated by the hiveDataFolder variable) so that it can be used as the source for Sqoop in the next step.

  4. From your local workstation, create a file called workflow.xml. Use the code below as the contents for the file. The steps described above are expressed as separate actions in the Oozie workflow file.

    <workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5">
        <start to = "RunHiveLoadFlightsScript"/>
        <action name="RunHiveLoadFlightsScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptLoadPartition}</script>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
            </hive>
            <ok to="RunHiveCreateDailyFlightTableScript"/>
            <error to="fail"/>
        </action>
    
        <action name="RunHiveCreateDailyFlightTableScript">
            <hive xmlns="uri:oozie:hive-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.job.queue.name</name>
                    <value>${queueName}</value>
                </property>
                </configuration>
                <script>${hiveScriptCreateDailyTable}</script>
                <param>hiveTableName=${hiveDailyTableName}</param>
                <param>year=${year}</param>
                <param>month=${month}</param>
                <param>day=${day}</param>
                <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param>
            </hive>
            <ok to="RunSqoopExport"/>
            <error to="fail"/>
        </action>
    
        <action name="RunSqoopExport">
            <sqoop xmlns="uri:oozie:sqoop-action:0.2">
                <job-tracker>${jobTracker}</job-tracker>
                <name-node>${nameNode}</name-node>
                <configuration>
                <property>
                    <name>mapred.compress.map.output</name>
                    <value>true</value>
                </property>
                </configuration>
                <arg>export</arg>
                <arg>--connect</arg>
                <arg>${sqlDatabaseConnectionString}</arg>
                <arg>--table</arg>
                <arg>${sqlDatabaseTableName}</arg>
                <arg>--export-dir</arg>
                <arg>${hiveDataFolder}/${year}/${month}/${day}</arg>
                <arg>-m</arg>
                <arg>1</arg>
                <arg>--input-fields-terminated-by</arg>
                <arg>"\t"</arg>
                <archive>mssql-jdbc-7.0.0.jre8.jar</archive>
                </sqoop>
            <ok to="end"/>
            <error to="fail"/>
        </action>
        <kill name="fail">
            <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message>
        </kill>
        <end name="end"/>
    </workflow-app>
    

The two Hive queries are accessed by their paths in Azure Storage, and the remaining variable values are provided by the job.properties file. This file configures the workflow to run for January 3, 2017.
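
Before deploying, you can also test a Hive script on its own by passing the same values from the command line that the workflow passes as <param> elements. A minimal sketch, assuming the usual HDInsight HiveServer2 endpoint and that the script is in your current directory on the head node:

    # Run the partition-load script directly, substituting the same variables Oozie would supply.
    beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http' \
        --hivevar year=2017 --hivevar month=01 --hivevar day=03 \
        -f hive-load-flights-partition.hql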

Deploy and run the Oozie workflow

Use SCP from your bash session to deploy your Oozie workflow (workflow.xml), the Hive queries (hive-load-flights-partition.hql and hive-create-daily-summary-table.hql), and the job configuration (job.properties). In Oozie, only the job.properties file can exist on the local storage of the head node. All other files must be stored in HDFS, which in this case is Azure Storage. The Sqoop action used by the workflow depends on a JDBC driver for communicating with your SQL Database, which must be copied from the head node to HDFS.

  1. Create the load_flights_by_day subfolder underneath the user's path in the local storage of the head node. From your open ssh session, execute the following command:

    mkdir load_flights_by_day
    
  2. Copy all the files in the current directory (the workflow.xml, job.properties, and two .hql files) up to the load_flights_by_day subfolder. From your local workstation, execute the following command:

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
    
  3. Copy the workflow files to HDFS. From your open ssh session, execute the following commands:

    cd load_flights_by_day
    hadoop fs -mkdir -p /oozie/load_flights_by_day
    hdfs dfs -put ./* /oozie/load_flights_by_day
    
  4. Copy mssql-jdbc-7.0.0.jre8.jar from the local head node to the workflow folder in HDFS. Revise the command as needed if your cluster contains a different jar file. Revise workflow.xml as needed to reflect a different jar file. From your open ssh session, execute the following command:

    hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
    
  5. Run the workflow. From your open ssh session, execute the following command:

    oozie job -config job.properties -run
    
  6. Observe the status using the Oozie Web Console. From within Ambari, select Oozie, Quick Links, and then Oozie Web Console. Under the Workflow Jobs tab, select All Jobs.

    HDInsight Oozie Web Console workflow jobs

  7. When the status is SUCCEEDED, query the SQL Database table to view the inserted rows. Using the Azure portal, navigate to the pane for your SQL Database, select Tools, and open the Query editor.

     SELECT * FROM dailyflights
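
    You can also check the workflow from the command line instead of the web console. The oozie job ... -run command in step 5 prints a workflow job ID; the following sketch uses a placeholder for that ID:

    # Replace <workflow-job-id> with the ID printed when you submitted the job.
    oozie job -info <workflow-job-id>
    oozie job -log <workflow-job-id>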
    

Now that the workflow has run for a single test day, you can wrap it in a coordinator that schedules the workflow to run daily.

Run the workflow with a coordinator

To schedule this workflow so that it runs daily (or for all days in a date range), you can use a coordinator. A coordinator is defined by an XML file, for example coordinator.xml:

<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
    <datasets>
        <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
            <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
            <done-flag></done-flag>
        </dataset>
    </datasets>
    <input-events>
        <data-in name="event_input1" dataset="ds_input1">
            <instance>${coord:current(0)}</instance>
        </data-in>
    </input-events>
    <action>
        <workflow>
            <app-path>${appBase}/load_flights_by_day</app-path>
            <configuration>
                <property>
                    <name>year</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
                </property>
                <property>
                    <name>month</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
                </property>
                <property>
                    <name>day</name>
                    <value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveScriptLoadPartition</name>
                    <value>${hiveScriptLoadPartition}</value>
                </property>
                <property>
                    <name>hiveScriptCreateDailyTable</name>
                    <value>${hiveScriptCreateDailyTable}</value>
                </property>
                <property>
                    <name>hiveDailyTableNamePrefix</name>
                    <value>${hiveDailyTableNamePrefix}</value>
                </property>
                <property>
                    <name>hiveDailyTableName</name>
                    <value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>hiveDataFolderPrefix</name>
                    <value>${hiveDataFolderPrefix}</value>
                </property>
                <property>
                    <name>hiveDataFolder</name>
                    <value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
                </property>
                <property>
                    <name>sqlDatabaseConnectionString</name>
                    <value>${sqlDatabaseConnectionString}</value>
                </property>
                <property>
                    <name>sqlDatabaseTableName</name>
                    <value>${sqlDatabaseTableName}</value>
                </property>
            </configuration>
        </workflow>
    </action>
</coordinator-app>

As you can see, the majority of the coordinator just passes configuration information to the workflow instance. However, there are a few important items to call out.

  • Point 1: The start and end attributes on the coordinator-app element control the time interval over which the coordinator runs.

    <coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
    

    A coordinator is responsible for scheduling actions within the start and end date range, according to the interval specified by the frequency attribute. Each scheduled action in turn runs the workflow as configured. In the coordinator definition above, the coordinator is configured to run actions from January 1, 2017 to January 5, 2017. The frequency is set to one day by the Oozie Expression Language frequency expression ${coord:days(1)}. This results in the coordinator scheduling an action (and hence the workflow) once per day. For date ranges that are in the past, as in this example, the action is scheduled to run without delay. The start of the day from which an action is scheduled to run is called the nominal time. For example, to process the data for January 1, 2017, the coordinator schedules an action with a nominal time of 2017-01-01T00:00:00 GMT.

  • Point 2: Within the date range of the workflow, the dataset element specifies where to look in HDFS for the data for a particular date range, and configures how Oozie determines whether the data is available yet for processing.

    <dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
        <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
        <done-flag></done-flag>
    </dataset>
    

    The path to the data in HDFS is built dynamically according to the expression provided in the uri-template element. In this coordinator, a frequency of one day is also used with the dataset. While the start and end dates on the coordinator element control when the actions are scheduled (and define their nominal times), the initial-instance and frequency on the dataset control the calculation of the date used in constructing the uri-template. In this case, the initial instance is set to one day before the start of the coordinator, to ensure that it picks up the first day's (1/1/2017) worth of data. The dataset's date calculation rolls forward from the value of initial-instance (12/31/2016), advancing in increments of the dataset frequency (one day), until it finds the most recent date that doesn't pass the nominal time set by the coordinator (2017-01-01T00:00:00 GMT for the first action).

    The empty done-flag element indicates that when Oozie checks for the presence of input data at the appointed time, it determines whether the data is available by the presence of a directory or file. In this case, it's the presence of a CSV file. If a CSV file is present, Oozie assumes the data is ready and launches a workflow instance to process the file. If there's no CSV file present, Oozie assumes the data isn't yet ready and that run of the workflow goes into a waiting state.

  • Point 3: The data-in element specifies the particular timestamp to use as the nominal time when replacing the values in uri-template for the associated dataset.

    <data-in name="event_input1" dataset="ds_input1">
        <instance>${coord:current(0)}</instance>
    </data-in>
    

    In this case, the instance is set to the expression ${coord:current(0)}, which translates to using the nominal time of the action as originally scheduled by the coordinator. In other words, when the coordinator schedules the action to run with a nominal time of 01/01/2017, then 01/01/2017 is used to replace the YEAR (2017) and MONTH (01) variables in the URI template. Once the URI template is computed for this instance, Oozie checks whether the expected directory or file is available and schedules the next run of the workflow accordingly.

The three preceding points combine to yield a situation where the coordinator schedules processing of the source data in a day-by-day fashion.

  • Point 1: The coordinator starts with a nominal date of 2017-01-01.

  • Point 2: Oozie looks for data available in sourceDataFolder/2017-01-FlightData.csv.

  • Point 3: When Oozie finds that file, it schedules an instance of the workflow that will process the data for 2017-01-01. Oozie then continues processing for 2017-01-02. This evaluation repeats up to but not including 2017-01-05.

As with workflows, the configuration of a coordinator is defined in a job.properties file, which has a superset of the settings used by the workflow.

nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights

The only new properties introduced in this job.properties file are:

Property                      Value source
oozie.coord.application.path  Indicates the location of the coordinator.xml file containing the Oozie coordinator to run.
hiveDailyTableNamePrefix      The prefix used when dynamically creating the table name of the staging table.
hiveDataFolderPrefix          The prefix of the path where all the staging tables will be stored.

Deploy and run the Oozie coordinator

To run the pipeline with a coordinator, proceed in a similar fashion as for the workflow, except you work from a folder one level above the folder that contains your workflow. This folder convention separates the coordinators from the workflows on disk, so you can associate one coordinator with different child workflows.

  1. Use SCP from your local machine to copy the coordinator files up to the local storage of the head node of your cluster.

    scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
    
  2. SSH into your head node.

    ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
    
  3. Copy the coordinator files to HDFS.

    hdfs dfs -put ./* /oozie/
    
  4. Run the coordinator.

    oozie job -config job.properties -run
    
  5. Verify the status using the Oozie Web Console, this time selecting the Coordinator Jobs tab, and then All jobs.

    Oozie Web Console coordinator jobs

  6. Select a coordinator instance to display the list of scheduled actions. In this case, you should see four actions with nominal times in the range from 1/1/2017 to 1/4/2017.

    Oozie Web Console coordinator job actions

    Each action in this list corresponds to an instance of the workflow that processes one day's worth of data, where the start of that day is indicated by the nominal time.
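
As with the workflow, you can also inspect the coordinator and its materialized actions from the command line. A brief sketch, using a placeholder for the coordinator job ID printed when you submitted it:

    # List coordinator jobs, then show the scheduled actions and their status for one of them.
    oozie jobs -jobtype coordinator
    oozie job -info <coordinator-job-id>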

Next steps

Apache Oozie Documentation