2018 年 6 月

第 33 卷,第 6 期

此文章由机器翻译

Azure Databricks - 使用 Application Insights 监视 Databricks 作业

通过Joseph Fultz|年 6 月 2018

我们在想要实现或将其解决方案迁移到云的大型公司侧重于数据和分析的团队中工作。这些操作都会附带优化和重新设计不同程度的应用程序以确保它们充分利用云提供的明显工作。因为这些工作可能是应用程序自身一样大,有其他难题对于组织刚开始其云旅程,因为它们还必须执行的所有工作,以及其操作将功能扩展到云中。并且,作为新的技术中显现出来和发展,这些必须折叠到现有操作的基础结构,以及。这是通过 Spark 和基于 Apache Hadoop 的解决方案中存在的挑战之一。是,Apache Ambari 存在旨在提供良好的仪表板,并具有一个 API,用于公开度量值,但许多组织已有在投资和其他监视和处理的解决方案,如 Azure Application Insights 更好地理解。

假设 WebJob 来拉取消息从 Azure 事件中心进行一些初始验证,然后将它们迁移到 Azure 存储,如中所示,通过多个 Spark 作业,处理数据的位置放置图 1。目标是可以提供信息表明不仅发生了什么情况,但还过程和业务特定的信息时处于飞行状态的一个运行时仪表板。此外,那就太棒要能够跟踪该信息作为一个整体进程的流和查看其构成进程的详细信息。

图 1 单个解决方案,单独的进程,单独的步骤

当然,你可以查看默认度量值,Application Insights 和 Spark 的一些信息中的 web 作业的作业中 Ambari-和汇总 all up with Azure 日志分析的下午 hoc 见解。但是,不想要查看具有 4 个步骤的两个独立流程。我们想要查看作为一个整体的过程,我们希望运行时的见解和运行时的警报。

在本文中,我们将演练一些注意事项和规划使一起使用 Application Insights 的完整项目。此外,我们将使用 Spark 的 Azure Databricks 风格因为它具有很好的一套功能,帮助我们更轻松地开发和实施我们的工作流。

规划应用程序 Insights 遥测

我们不会涵盖核心概念,但是对于这些概念入门需要花费阅读联机文档,网址bit.ly/2FYOCyp。此外,Victor Mushkatin 和 Sergey Kanzhelev 编写有关优化遥测数据收集,"优化遥测与 Application Insights"的很好的文章 (msdn.com/magazine/mt808502)。在这里,我们将重点组织我们笔记本和作业,以便正确地执行操作、 事件和我们从我们 Databricks 作业发送的数据的形式跟踪。

在 Databricks,你可以将作业定义为某些参数的笔记本的执行。图 2阐释了几个组织 Databricks 笔记本中的工作的基本方法。

图 2 用于 Databricks 笔记本作业的基本组织方式方法

图 2演示两种简单的可能在其中一个作业定义为大量的代码块或其他作业显示安排的子笔记本执行的控件笔记本时调用的函数的单个笔记本请按顺序或并行。这不是,通过任何方式,可以使用,只有组织,但足以帮助阐释如何考虑相关。如何着手组织笔记本和代码是肯定值得主题,具体取决于的大小和作业的性质很大的变化。有关 Databricks 笔记本工作流的更详细的信息,请在博客文章中,查看"Notebook 工作流:若要实现 Apache Spark 管道的最简单方法"(bit.ly/2HOqvTj)。

请注意笔记本组织具有独立的操作组事件 Application Insights 中的报表可用于对齐。在 Application Insights 相关是通过两个属性来完成:操作 Id 和父操作 id。中所示图 2,我们想要捕获的所有离散事件并在代码中的度量值阻止或分隔可通过对每一节中使用不同的操作 Id 的单个操作的上下文中的笔记本。此外,我们想要作为一个整体,我们可以通过设置为每个操作中的报表的所有度量值的相同值的上下文的父操作 Id 来执行此操作的一部分中查看这些单独的大操作块。父操作 Id 还可以传递中从外部触发器对于作业,然后将提供一种机制来链接所有以前的进程和 Azure Databricks 作业作为由单个 gestalt 操作的一部分从独立的操作父操作 Application Insights 中的 Id。

我们已描述以下几个方案。关键的一点是,应考虑要如何作为一个整体作业组织中的一部分组织操作、 事件和度量值。

将 Application Insights 添加到环境

若要准备环境,你需要以在群集上安装 Python Application Insights 库、 抓住某些配置设置,然后添加的帮助器代码。你可以找到 pypi 上的 Application Insights (pypi.python.org/pypi/applicationinsights/0.1.0)。将其添加到 Databricks,只需在 (我们创建一个名为的 Lib) 工作区中选择的位置,右键单击并选择创建,则库。一次,你可以输入 pypi 应用程序名称然后 Databricks 将下载并安装包。你将需要决定的最后一步是想要自动将库附加到所有群集。

尝试减少的代码将添加到每个笔记本,我们添加了有几种帮助器函数的包含文件:

def NewTelemetryClient (applicationId, operationId="",  
  parentOperationId=""):
  tc = TelemetryClient(instrumentationKey)
  tc.context.application.id = applicationId
  tc.context.application.ver = '0.0.1'
  tc.context.device.id = 'Databricks notebook'
  tc.context.operation.id = operationId
  tc.context.operation.parentId = parentOperationId
  return tc

此代码包含一个名为 NewTelemetryClient 创建遥测客户端对象、 设置的某些属性和令对象返回到调用方的工厂函数。如你所见,它采用一个父级,操作 Id 以及操作 id。此值将初始化对象,但请注意,是否你需要更改操作 Id,你将需要执行此作业笔记本中直接操作。此外值得注意的,TelemetryClient 构造函数采用检测密钥,可以在你想要使用 Application Insights 实例的属性边栏选项卡中找到。我们已以静态方式分配了几个值所需的示例中,但 TelemetryClient 上下文对象有许多子对象和可用的属性。如果你需要初始化其他值,它将是执行此操作的位置。掉工厂函数分隔选项卡中的混乱,并还简化了开发人员将粘贴到笔记本从沙盒原型种代码转换为一个企业的作业类型的实现的实现。 

使用添加到群集和定义安装程序笔记本库,我们只需在作业笔记本来运行安装程序,然后创建初学者遥测对象顶部添加一行。我们将发出在粘贴到笔记本的顶部运行命令的 %:

%run ./AppInsightsSetup

在后续的单元格中我们将只需实例化 TelemetryClient 对象的新实例。

图 3显示从我们创建的预测示例的代码。有几件事记下此处。首先,我们传递了大量变量到作为作业初始化,而这是通过用作 Databricks 环境的一部分提供的 dbutils.widgets 对象的一部分发送的笔记本。因为我们父操作和独立的操作需要几个 Id,我们将继续操作并检查那些并,如果他们是空的创建并分配新的 Uuid。分配任意 Id 在此情况下主要是为更加轻松地以交互方式运行。但是,其他方法无法执行,如作业笔记本的代码封装为一系列的函数并运行测试,通过调用带有特定 id。 父函数都可以充分适用于此处我们目的正常工作。我们将分配的最后一步是操作名称,这最终显示在 Application Insights 可以用于查看和 group by、 使用的东西中所示图 4

图 3 笔记本初始化代码

baseRatingsFile = dbutils.widgets.get("baseRatingsFile")
newRatingsFile = dbutils.widgets.get("newRatingsFile")
trainOperationId = dbutils.widgets.get("trainOperationId")
parentOperationId = dbutils.widgets.get("parentOperationId")
maxIterations =  int(dbutils.widgets.get("maxIterations"))
numFolds = int(dbutils.widgets.get("numFolds"))
numUserRecommendations = int(
  dbutils.widgets.get("numUserRecommendations"))
predictionFilePath = dbutils.widgets.get("predictionFilePath")
if trainOperationId == "":
  trainOperationId = NewCorrelationId()
if parentOperationId == "":
  parentOperationId = NewCorrelationId()
#setup other needed variables
telemetryClient = NewTelemetryClient("PredictionExample",
  trainOperationId, parentOperationId)
telemetryClient.context.operation.name = "Train Model"

查看图 3,你可以查看操作名称已分配的训练模型的值。图 4描绘数据的网格中后它已被选作数据的分组机制。当我们运行更多作业,并分配不同的操作名称,我们将能够查看显示在视图中。与就地这些内容,我们在形状良好,用于检测我们作业代码,以捕获事件和度量值。

Application Insights 中的操作名称

图 4 中 Application Insights 的操作名称

检测 Databricks 作业代码

让我们演练一下的示例,使用 Application Insights 监视 Databricks 中的典型数据工程作业。在此方案中,我们将使用从 Fannie Mae 公开可用数据 (bit.ly/2AhL5sS) 并将对独门独户贷款性能采用原始源数据和准备用于报告和分析。若要正确准备数据,需要执行几个步骤。使用每个步骤中,我们将捕获信息,如记录计数和运行时间,并记录这些 Application Insights 中。图 5阐释了作业中的高级步骤。我们已使用在顶部的标题结算图 5来标识我们单独的操作。

图 5 数据流工程作业

此外,我们建立了一组具有类似名称 (例如,写入的持续时间、 读取持续时间、 记录计数) 将报告的测量值以不同方式命名的事件中。这将是在分析中重要,因为我们查看特定的指标,然后由操作或事件中查看它们。中所示图 5,首先我们引入多个数据文件,然后合并和转换它们,并最后写入到两个目标位置。完全已准备的数据集保存到长期 Blob 存储和聚合的子集发送到我们 RDBMS,Azure SQL 数据库。当然,在每个高级步骤有几个子步骤。具体而言,我们将四个不同的文件导入、 将它们合并为单个的 Spark 数据帧,和原始、 合并数据集写入 Blob 存储。合并的数据然后读入出 Blob 存储的新的数据帧用于清理和转换。若要完成转换,我们子集数据帧 (即,缩小它到仅限相关列),将列重命名为有意义的名称,并将服务名称列中的 null 值。数据的最终形式保存在 Parquet 文件格式。在此示例中的最后一步保存到 Azure SQL 数据库的数据。

对于此 Azure Databricks 作业示例中,我们已使用在单独的代码的单元格编程的步骤执行的单个笔记本方法。一个父操作 Id 设置为每个运行作业。(子) 操作 Id 适用于该作业,每项操作,我们已定义购置、 转换和持久性为这些操作。我们将跟踪事件发生的每个操作,在作业运行时在 Application Insights 记录时间戳、 记录计数、 持续时间和其他参数。

如在前面的预测示例中,我们将 Python 包"applicationinsights"添加到群集,运行安装程序 notebook,而实例化 TelemetryClient 对象的新实例。此时我们命名实例 DataEngineeringExample,然后将初始操作名称设置为获取,以便准备步骤我们第一个序列,以获取源的数据:

telemetryClient = NewTelemetryClient(
  "DataEngineeringExample", operationId, parentOperationId)
telemetryClient.context.operation.name = "Acquisition"

接下来,我们捕获当前时间和跟踪中 Application Insights,记录已启动作业我们第一个事件:

import datetime
jobStartTime = datetime.datetime.now()
jobStartTimeStr = str(jobStartTime)
telemetryClient.track_event('Start Job', { 'Start Time': jobStartTimeStr,
  'perfDataFilePath':perfDataFilePath, 'perfDataFileNamePrefix' :  
  perfDataFileNamePrefix, 'consolidatedDataPath':consolidatedDataPath, 
  'transformedFilePath' : transformedFilePath, 'perfDBConnectionString':
  perfDBConnectionString, 'perfDBTableName': perfDBTableName})
telemetryClient.flush()

这是用于将当前时间戳设置为该作业的开始时间并将其记录在我们的第一个 Application Insights 事件的代码。首先,我们为适当的日期和时间函数,导入的 Python 库日期时间,并将变量 jobStartTime 设置为当前时间戳。值得注意的 track_event 的签名 ([eventName],[{属性}],[{度量}]) 方法采用参数的事件名称、 属性的字典和的测量值的字典。为此,时间戳变量必须是 JSON 序列化以将其包括在遥测事件的属性。因此,我们将以字符串形式的 jobStartTime 对象强制转换,并将值放入新的变量 jobStartTimeStr。在下一步的步骤中,我们发送使用 track_event 方法的我们初始遥测事件将其传递我们自定义事件名称开始时间以及我们选择了要捕获与此事件的多个参数。作业中介绍了各种文件路径和连接字符串引用的参数。例如,perfDataFilePath 包含源数据文件的位置和 perfDBConnectionString 对于 Azure SQL 数据库,我们将保留的某些数据包含的连接字符串。这是在其中我们查看 0 记录连接或具有警报设置; 这种情况下有用的信息我们可以快速了解相关的操作的遥测数据,并快速检查的文件和/或正在访问的数据库。

现在我们可以继续执行到笔记本中的命令单元格之间将类似的事件跟踪代码添加到每个步骤,相关的内部作业的步骤的少量更改。因为很有帮助,要在整个数据工程作业记录计数用于监视性能和资源使用率时,请考虑数据量,我们已添加到每个跟踪事件的记录计数度量值。

图 6显示几个基本的数据转换,为 Application Insights 跟事件跟踪。在异常处理 Try 块中,我们执行三种类型的转换在一次 perfTransformDF 数据帧上。我们子集通过仅允许组选择的相关列和丢弃剩余数据帧。我们将服务名称列中的 null 值替换为"未知。 并且,因为原始的列名称为无意义 (例如,"_C0,""_C1"),我们将重命名列的相关子集为有意义的名称,如"loan_id"和"loan_age。"

图 6 数据转换事件跟踪代码

if notebookError == "":
  try:
    perfTransformedDF = perfTransformedDF['_c0','_c1','_C2','_C3','_C4', \
                                          '_C5','_C6','_C7','_C8','_C9', \
                                          '_C10','_C11','_C12','_C13'] \
      .fillna({'_C2':'UNKNOWN'}) \
      .withColumnRenamed("_C0", "loan_id") \
      .withColumnRenamed("_C1", "period") \
      .withColumnRenamed("_C2", "servicer_name") \
      .withColumnRenamed("_C3", "new_int_rt") \
      .withColumnRenamed("_C4", "act_endg_upb") \
      .withColumnRenamed("_C5", "loan_age") \
      .withColumnRenamed("_C6", "mths_remng") \
      .withColumnRenamed("_C7", "aj_mths_remng") \
      .withColumnRenamed("_C8", "dt_matr") \
      .withColumnRenamed("_C9", "cd_msa") \
      .withColumnRenamed("_C10", "delq_sts") \
      .withColumnRenamed("_C11", "flag_mod") \
      .withColumnRenamed("_C12", "cd_zero_bal") \
      .withColumnRenamed("_C13", "dt_zero_bal")
    print("nulls replaced")
    end = datetime.datetime.now()
    rowCount = perfTransformedDF.count()
    duration = round((end - start).total_seconds(), 1)
    telemetryClient.track_event('Transformation Complete', {}, \
                                { 'Records Transformed': rowCount, \
                                 'Transformation Duration':duration })
    telemetryClient.flush()
  except Exception as e:
    notebookError = str(e)
    telemetryClient.track_exception(e,{"action":"column transform"},{})
else:
  print("command skipped due to previous error")

转换完成后,我们捕获变量"end"中的当前时间戳与时间完成; 此步骤数据帧; 中的行计数并计算基于开始和结束时间单步持续时间。我们将该遥测发送到 Application Insights 中,其 telemetryClient.track_event 方法,使用"转换完成,"事件名称,我们包括用于记录转换和转换持续时间的度量。

我们将添加到我们笔记本纯粹是为了说明跟踪异常使用 Application Insights 和处理某些非常基本异常。注意在中除中块图 6 ,如果我们捕获异常,我们将调用 track_exception 方法。我们将异常作为第一个参数传递和后续参数是相同的类型如下所示 track_event,从而可以记录可能围绕着的事件一样多的信息。若要在此处进行的一个重要注意事项是,目前内联 sql 任何异常处理语义。因此,它可能是最好添加异常处理,如生产作业的 %sql magics 跳过之前支持。

在我们数据工程作业中,包括用于获取和持久性操作的其他步骤遵循将自定义度量遥测事件发送到 Application Insights 的转换代码所示的模式。

配置分析和警报

替换的位置,以将遥测发送代码,我们将配置应用程序的见解,从而创建实时仪表板、 查看通过事件和关联的事件的详细信息,然后设置警报通知,并且可能采取基于事件触发器的操作。

图 7描述了几种我们已经通过资源管理器中度量值边栏选项卡和 Application Insights 中的度量值 (预览版) 边栏选项卡配置,然后固定到我们的 Azure 门户仪表板的图表。

在 Azure 的仪表板上的应用程序 Insights 图表

图 7 Azure 仪表板上的应用程序 Insights 图表

记下右两个四分位数。右上角显示的网格按我们报告遥测下我们添加跟踪调用时使用的操作名称分组的持续时间。右下角显示记录计数度量值按我们使用的事件名称分组。确保足够"保留到 SQL DB"是比其他,要低得多,因为这是一个小的、 经过筛选子集的我们的数据写入 Azure SQL 数据库的事件。选择你的操作分组、 操作名称和事件名称是关闭在此时作为你的付款获取来直观显示并报告有关你考虑你的操作有意义的方式数据规划的一个重要部分。

在左侧的两个分位图 7显示使用具有良好的配置 UI 中,将拆分基于另一个属性的度量值以及一些其他功能的度量值 (预览版) 创建的图表。在左上方中,你可以看到的记录计数,但我们已将其拆分,以便此报告的事件名称,为不同的事件的记录计数的关系图和数据让我们。此处我们要比较的源数据已读取到合并的数据已加载到数据帧时更高版本执行的记录计数时采用的记录计数。由于记录计数可能非常普遍度量跨我们父操作,但我们想要看到它在每个操作或事件,这是一项重要功能。

如果你看到的内容在一个操作调用的一些研究的图,则可以搜索通过所有的遥测数据。图 8描述了搜索和关系图显示一段时间的左窗格中的匹配项计数的结果。在右窗格中你可以查看所有事件中记录的信息。回想一下 track_event ([名称] [属性] [度量]) 签名。我们已在其中你可以看到在顶部的自定义属性的 SQL DB 事件上的保留的详细信息移至超类。在中间,标记为自定义数据是可在其中找到了与遥测数据一起发送的自定义度量值。在右下方是所有其中你可以轻松地导航到全部属于操作或父操作的事件的相关项目。此外,在底部才能看到所有可用的遥测数据在事件的时间是行。如果已针对你的运行时监视对 Application Insights 进行标准化,这是一个强大的工具,以了解总体的系统状态和事件的操作上下文。具有深入了解发生了什么情况广泛可能帮助说明当记录计数处于禁用状态或持续时间是 askew。

事件搜索和详细信息

图 8 事件搜索和详细信息

我们想要涵盖 Application Insights 的最后一步是设置警报的能力。在图 9可以查看警报的配置的一部分。我们讨论过的其他元素,如我们发送的事件中的自定义信息显示在此处我们选择作为警报的条件。

图 9 上度量值的警报设置

如你所料,警报可以发送一封电子邮件。但是,它还可以调用 WebHook,因而提供一种很好便捷的方式来执行您可能需要的任何其他操作。Azure 函数是完美适合于此安装程序,您便可创建您喜欢的任何自定义操作。更有趣的是,Application Insights 与 Logic Apps 直接集成。这样,集成并安排各种集成和 Microsoft Azure 中的操作的本机功能。因此,Application Insights 警报可以开始通过 Logic Apps 业务流程,包括操作以及与下游和上游系统的集成的补偿和/或纠正操作时通知用户。

结束语

我们想要确保我们突出显示关键信息的位。Application Insights 不是一个日志分析解决方案。它与 Azure 日志分析,它提供下午 hoc 分析和长期的日志保留集成。Application Insights 适用于监视和分析你的运行时操作,向您提供信息、 insights 和有关发生了什么情况现在的警报。与其他 Azure 服务和广泛应用的平台 Sdk 的直接集成可以很好的合适大小,以帮助实施 Azure Databricks 作业。因此,对于这些作业监视并未完成接收器中但而是在完整的解决方案体系结构上下文中。


Joseph Fultz  是 Microsoft 的云解决方案架构师。他适用于 Microsoft 客户为解决业务问题列弗 eraging 体系结构的开发 Microsoft Azure。此前,Fultz 负责开发和生成 GM 的汽车共享程序 (mavendrive.com)。Con 原封不动他在 Twitter 上: @JosephRFultz或通过电子邮件jofultz@microsoft.com

Ryan 墨是存留于 Saint 圣路易斯,密苏里州的解决方案架构师他已生成和创新与数据几乎 20 年,游戏和农行业中包括大量工作。目前,墨帮助世界上最大企业的某些实现其业务现代化与由 Microsoft Azure 云提供支持的数据解决方案。在 Twitter 上关注他: @murphrp