了解适用于 U-SQL 开发人员的 Apache Spark 代码

重要

Azure Data Lake Analytics于 2024 年 2 月 29 日停用。 通过此公告了解更多信息。

对于数据分析,组织可以使用 Azure Synapse AnalyticsMicrosoft Fabric

本节提供有关如何将 U-SQL 脚本转换为 Apache Spark 的高级指导。

了解 U-SQL 和 Spark 语言和处理模式

在开始将 Azure Data Lake Analytics 的 U-SQL 脚本迁移到 Spark 之前,了解两个系统的一般语言和处理理念非常有用。

U-SQL 是一种类似 SQL 的声明性查询语言,它使用数据流模式,允许轻松地嵌入和横向扩展以 .NET(例如 C#)、Python 和 R 编写的用户代码。用户扩展可以实现简单的表达式或用户定义的函数,但也可以帮助用户实现所谓的用户定义的运算符,这些运算符可以实现自定义运算符来执行行集级别的转换、提取和写入输出。

Spark 是一种横向扩展框架,提供 Scala、Java、Python、.NET 等多种语言绑定。在此框架中,你主要采用其中一种语言编写代码,创建称为弹性分布式数据集 (RDD)、数据帧和数据集的数据抽象,然后使用类似 LINQ 的特定域语言 (DSL) 来转换它们。 它还提供 SparkSQL 作为数据帧和数据集抽象上的声明性子语言。 DSL 提供两种类别的操作:转换和操作。 将转换应用于数据抽象不会执行转换,而是会构建执行计划,该计划将通过操作 (提交以供评估,例如,将结果写入临时表或文件,或打印结果) 。

因此,在将 U-SQL 脚本转换为 Spark 程序时,必须决定要至少使用哪种语言来生成数据帧抽象 (这是目前最常用的数据抽象) ,以及是否要使用 DSL 或 SparkSQL 编写声明性数据流转换。 在一些更复杂的情况下,可能需要将 U-SQL 脚本拆分为 Spark 序列以及使用 Azure Batch 或 Azure Functions 实现的其他步骤。

此外,Azure Data Lake Analytics 在一个为每个作业分配资源的无服务器作业服务环境中提供 U-SQL,而 Azure Synapse Spark、Azure Databricks 和 Azure HDInsight 则通过群集服务的形式或通过所谓的 Spark 池模板提供 Spark。 转换应用程序时,必须考虑到现在创建、调整大小、缩放和解除群集或池授权的影响。

转换 U-SQL 脚本

U-SQL 脚本遵循以下处理模式:

  1. 使用 EXTRACT 语句、位置或文件集规范以及内置或用户定义的提取器和所需的架构从非结构化文件读取数据,或者从 U-SQL 表(托管表或外部表)读取数据。 它表示为行集。
  2. 在多个 U-SQL 语句中转换行集,这些语句将 U-SQL 表达式应用于行集并生成新行集。
  3. 最后,使用指定位置的语句 OUTPUT 和内置或用户定义的输出器将生成的行集输出到文件,或将其输出到 U-SQL 表。

系统延迟计算此脚本,意味着每个提取和转换步骤均组成一个表达式树,并进行全局计算(数据流)。

Spark 程序类似于使用 Spark 连接器读取数据并创建数据帧,然后使用类似 LINQ 的 DSL 或 SparkSQL 在数据帧上应用转换,再将结果写入文件、临时 Spark 表、某些编程语言类型或控制台。

转换 .NET 代码

U-SQL 的表达式语言是 C#,它提供各种方法,通过用户定义的函数、用户定义的运算符和用户定义的聚合器扩展自定义 .NET 代码。

Azure Synapse 和 Azure HDInsight Spark 现在都以原生方式支持使用 .NET for Apache Spark 来执行 .NET 代码。 这意味着,可以重复使用部分或全部与 Spark 配合使用的.NET 用户定义函数。 但请注意,U-SQL 使用 .NET Framework,而 .NET for Apache Spark 基于 .NET Core 3.1 或更高版本。

U-SQL 用户定义运算符 (UDO) 使用 U-SQL UDO 模型来提供对运算符代码的横向扩展执行。 因此,必须将 UDO 重写为用户定义的函数,使之适应 Spark 执行模型。

.NET for Apache Spark 目前不支持用户定义的聚合器。 因此,U-SQL 用户定义聚合器必须转换为用 Scala 编写的 Spark 用户定义聚合器。

如果不想利用 .NET for Apache Spark 功能,则必须将表达式重写为等效的 Spark、Scala、Java 或 Python 表达式、函数、聚合器或连接器。

无论如何,如果 U-SQL 脚本中存在大量 .NET 逻辑,请通过 Microsoft 帐户代表与我们联系以获得进一步指导。

以下详细介绍了 .NET 和 C# 在 U-SQL 脚本中的不同用法案例。

转换标量内联 U-SQL C# 表达式

U-SQL 的表达式语言为 C#。 许多标量内联 U-SQL 表达式是本机实现的,以提高性能,而更复杂的表达式可以通过调用 .NET Framework 来执行。

Spark 有自己的标量表达式语言(属于 DSL 或在 SparkSQL 中),支持调用为 JVM、.NET 或 Python 运行时编写的用户定义的函数。

如果在 U-SQL 中使用标量表达式,则应首先查找最适合的本机理解的 Spark 标量表达式以获得最大性能,然后将其他表达式映射到所选 Spark 运行时语言编写的用户定义函数。

请注意,.NET 和 C# 的语义类型不同于 JVM 和 Python 运行时以及 Spark 的 DSL。 有关类型系统差异的详细信息,请参阅下文

转换用户定义的标量 .NET 函数和用户定义的聚合器

U-SQL 提供调用任意标量 .NET 函数和以 .NET 编写的用户定义聚合器的方法。

Spark 还支持以其承载的大多数语言(可从 Spark DSL 和 SparkSQL 调用)编写的用户定义函数和用户定义聚合器。

如上所述,.NET for Apache Spark 支持以 .NET 编写的用户定义的函数,但不支持用户定义的聚合器。 因此,对于用户定义的函数,可以使用 .NET for Apache Spark,但必须在 Scala for Spark 中创作用户定义的聚合器。

转换用户定义的运算符 (UDO)

U-SQL 提供可采用 .NET 编写(某种程度上也可以采用 Python 和 R)的多种类别的用户定义运算符 (UDO),例如提取器、输出器、减速器、处理器、应用器和合并器。

Spark 不为运算符提供相同的扩展性模型,但对某些运算符具有等效的功能。

Spark 中与提取器和输出器等效的是 Spark 连接器。 对于很多 U-SQL 提取器,可在 Spark 社区中找到等效的连接器。 对于其他连接器,必须编写自定义连接器。 如果 U-SQL 提取器很复杂,且使用多个 .NET 库,则最好采用 Scala 生成连接器,以使用互操作来调用对数据进行实际处理的 .NET 库。 在这种情况下,必须将 .NET Core 运行时部署到 Spark 群集,并确保引用的 .NET 库符合 .NET Standard 2.0。

其他类型的 U-SQL UDO 需要使用用户定义的函数和聚合器以及对应语义的 Spark DLS 或 SparkSQL 表达式进行重写。 例如,处理器可以映射到各种 UDF 调用的 SELECT,打包为一个函数,该函数采用数据帧作为参数并返回数据帧。

转换 U-SQL 的可选库

U-SQL 提供了一组可选和演示库,这些库提供 PythonR、JSON、XML、AVRO 支持和一些 Azure AI 服务功能

Spark 提供自己的 Python 和 R 集成(分别为 pySpark 和 SparkR),并提供连接器来读取和写入 JSON、XML 和 AVRO。

如果需要转换引用 Azure AI 服务库的脚本,建议通过 Microsoft 客户代表与我们联系。

转换类型化值

由于 U-SQL 的类型系统基于 .NET 类型系统,并且 Spark 具有受主机语言绑定影响的自己的类型系统,因此必须确保所操作的类型接近且对于某些类型,类型范围、精度和/或小数位数可能略有不同。 此外,U-SQL 和 Spark 处理 null 值的方式不同。

数据类型

下表提供了 Spark、Scala 和 PySpark 中与给定 U-SQL 类型等效的类型。

U-SQL Spark Scala PySpark
byte
sbyte ByteType Byte ByteType
int IntegerType Int IntegerType
uint
long LongType Long LongType
ulong
float FloatType Float FloatType
double DoubleType Double DoubleType
decimal DecimalType java.math.BigDecimal DecimalType
short ShortType Short ShortType
ushort
char Char
string StringType String StringType
DateTime DateType, TimestampType java.sql.Date, java.sql.Timestamp DateType, TimestampType
bool BooleanType Boolean BooleanType
Guid
byte[] BinaryType Array[Byte] BinaryType
SQL.MAP<K,V> MapType(keyType, valueType, valueContainsNull) scala.collection.Map MapType(keyType, valueType, valueContainsNull=True)
SQL.ARRAY<T> ArrayType(elementType, containsNull) scala.collection.Seq ArrayType(elementType, containsNull=True)

有关详细信息,请参阅:

NULL 处理方式

在 Spark 中,类型默认允许 NULL 值,而在 U-SQL 中,则将标量、非对象显式标记为可为 NULL。 虽然通过 Spark 可以将列定义为不可为 NULL,但不会强制约束,这可能会导致结果错误

在 Spark 中,NULL 表示值未知。 Spark NULL 值不同于任何值,包括自身也各不相同。 比较两个 Spark NULL 值或将 NULL 值与其他任何值进行比较均返回 unknown,这是因为每个 NULL 值都是未知的。

此行为与 U-SQL 不同,后者遵循 C# 语义,其中 null 不同于任何值,但自身是相同的。

因此,即使 column_name 中存在 NULL 值,使用 WHERE column_name = NULL 的 SparkSQL 语句 SELECT 仍返回零行,而在 U-SQL 中,它将返回 column_name 已设置为 null 的行。 同样,即使 column_name 中存在非 NULL 值,使用 WHERE column_name != NULL 的 Spark 语句 SELECT 仍返回零行,而在 U-SQL 中,它将返回具有非 NULL 值的行。 因此,如果想要使用 U-SQL NULL 检查语义,则应分别使用 isnullisnotnull(或其 DSL 等效项)。

转换 U-SQL 目录对象

一个主要区别在于,U-SQL 脚本可以利用其目录对象,而其中许多对象并没有直接的 Spark 等效项。

Spark 支持 Hive 元存储概念(主要为数据库、表和视图),因此你可以将 U-SQL 数据库和架构映射到 Hive 数据库,将 U-SQL 表映射到 Spark 表(请参阅移动 U-SQL 表中存储的数据),但它不支持表值函数 (TVF)、存储过程、U-SQL 程序集、外部数据源等。

可通过 Spark 中的代码函数和库对 U-SQL 代码对象(例如视图、TVF、存储过程和程序集)进行建模,并使用主机语言函数和过程抽象机制(例如,通过导入 Python 模块或引用 Scala 函数)对其进行引用。

如果已使用 U-SQL 目录跨项目和团队共享数据和代码对象,则必须使用等效的共享机制(例如,使用 Maven 共享代码对象)。

转换 U-SQL 行集表达式和基于 SQL 的标量表达式

U-SQL 核心语言正在转换行集,该语言基于 SQL。 下面是 U-SQL 中提供的最常见行集表达式的非表达式列表:

  • SELECT/FROM/WHERE/GROUP BY+聚合+HAVING/ORDER BY+FETCH

  • INNER/OUTER/CROSS/SEMIJOIN 表达式

  • CROSS/OUTERAPPLY 表达式

  • PIVOT/UNPIVOT 表达式

  • VALUES 行集构造函数

  • 设置表达式 UNION/OUTER UNION/INTERSECT/EXCEPT

此外,U-SQL 提供各种基于 SQL 的标量表达式,例如

  • OVER 窗口表达式
  • 各种内置聚合器和排名函数 (SUMFIRST )
  • 一些最熟悉的 SQL 标量表达式:CASELIKE、(NOT) INANDOR

Spark 为大多数这些表达式提供 DSL 和 SparkSQL 形式的等效表达式。 对于某些本机 Spark 不支持的表达式,必须组合本机 Spark 表达式和语义等效模式对其进行重新编写。 例如,必须将 OUTER UNION 转换为等效的投影和联合组合。

由于 NULL 值的处理方式不同,如果比较的两个列都包含 NULL 值,U-SQL 联接将始终匹配行,而 Spark 中的联接将不匹配此类列,除非添加了显式 null 检查。

转换其他 U-SQL 概念

U-SQL 还提供各种其他功能和概念,例如针对SQL Server数据库、参数、标量和 lambda 表达式变量、系统变量、OPTION提示的联合查询。

对 SQL Server 数据库/外部表执行的联合查询

U-SQL 提供数据源、外部表以及对 Azure SQL 数据库的直接查询。 虽然 Spark 不提供相同的对象抽象,但它为可用于查询 SQL 数据库的 Azure SQL Database 提供 Spark 连接器

U-SQL 参数和变量

参数和用户变量在 Spark 及其承载语言中具有等效概念。

例如在 Scala 中,可以使用关键字 var 定义变量:

var x = 2 * 3;
println(x)

U-SQL 的系统变量(以 @@ 开头的变量)可拆分为两个类别:

  • 可设置的系统变量,可将其设置为特定值以影响脚本行为
  • 信息系统变量,可查询系统和作业级别的信息

大多数可设置的系统变量在 Spark 中没有直接等效项。 在作业执行期间,可以通过将信息作为参数传递来对某些信息系统变量进行建模,而其他变量可能在 Spark 承载语言中具有等效的函数。

U-SQL 提示

U-SQL 提供多种语法方法来向查询优化器和执行引擎提供提示:

  • 设置 U-SQL 系统变量
  • 与行集表达式关联的 OPTION 子句,用于提供数据或计划提示
  • 联接表达式语法中的联接提示(例如 BROADCASTLEFT

Spark 基于成本的查询优化器拥有提供提示并优化查询性能的功能。 请参阅相应的文档。

后续步骤