教程:使用自动加载程序将数据连续引入 Delta Lake
连续的增量数据引入是一种常见需求。 例如,从手机游戏到电子商务网站再到 IoT 传感器的应用程序都将生成连续的数据流。 分析师希望能够访问最新数据,但出于以下几个原因,实现可能会很难:
- 你可能需要在数据到达时转换和引入数据,而只处理一次文件。
- 你可能希望在写入表之前强制实施架构。 此逻辑可能很难编写和维护。
- 处理其架构随时间变化的数据极具挑战性。 例如,你必须确定如何处理具有数据质量问题的传入行,以及如何在解决原始数据问题后重新处理这些行。
- 每分钟处理数千或数百万个文件的可扩展解决方案需要集成事件通知、消息队列和触发器等云服务,这会增加开发复杂性和长期维护工作。
构建连续、经济高效、可维护且可缩放的数据转换和引入系统并不简单。 Azure Databricks 提供自动加载程序作为解决上述问题的内置优化解决方案,并为数据团队提供一种以更低的成本和延迟从云对象存储加载原始数据的方法。 自动加载程序会自动为新文件配置通知服务并侦听该服务,每秒最多可以纵向扩展到数百万个文件。 它还负责处理常见的问题,例如架构推理和架构演变。 要了解详细信息,请参阅自动加载程序。
在本教程中,你将使用自动加载程序将数据增量引入(加载)到 Delta 表中。
要求
- 一个 Azure 订阅、该订阅中的一个 Azure Databricks 工作区以及该工作区中的一个群集。 要创建这些内容,请参阅快速入门:使用 Azure 门户在 Azure Databricks 工作区上运行 Spark 作业。 (如果按照本快速入门中的说明操作,则无需按照“运行 Spark SQL 作业”部分中的说明操作。)
- 熟悉 Azure Databricks 工作区用户界面。 请参阅浏览工作区。
步骤 1. 创建示例数据
在此步骤中,你将在工作区中创建笔记本。 在此笔记本中,你将运行每隔 30 秒在工作区中生成一个逗号分隔的随机文件的代码。 其中每个文件都包含一组随机数据。
注意
自动加载器还处理以下格式的数据:Avro、二进制、CSV、JSON、ORC、Parquet 和文本。
在工作区的边栏中单击“创建”>“笔记本”。
在“创建笔记本”对话框中,输入笔记本名称,例如
Fake Data Generator。对于“默认语言”,请选择“Python”。
对于“群集”,请选择在“要求”部分中创建的群集,或选择要使用的另一个可用群集。
单击“创建”。
在笔记本的菜单栏中,如果群集名称旁边的圆圈未包含绿色复选标记,请单击群集名称旁边的下拉箭头,然后单击“启动群集”。 单击“确认”,然后等待圆圈包含绿色复选标记。
在笔记本的第一个单元格中,粘贴以下代码:
import csv import uuid import random import time from pathlib import Path count = 0 path = "/tmp/generated_raw_csv_data" Path(path).mkdir(parents=True, exist_ok=True) while True: row_list = [ ["id", "x_axis", "y_axis"], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)], [uuid.uuid4(), random.randint(-100, 100), random.randint(-100, 100)] ] file_location = f'{path}/file_{count}.csv' with open(file_location, 'w', newline='') as file: writer = csv.writer(file) writer.writerows(row_list) file.close() count += 1 dbutils.fs.mv(f'file:{file_location}', f'dbfs:{file_location}') time.sleep(30) print(f'New CSV file created at dbfs:{file_location}. Contents:') with open(f'/dbfs{file_location}', 'r') as file: reader = csv.reader(file, delimiter=' ') for row in reader: print(', '.join(row)) file.close()上述代码执行以下操作:
如果目录尚不存在,请在工作区中的
/tmp/generated_raw_csv_data处创建一个目录。提示
如果因其他人运行了本教程而导致你的工作区中已存在此路径,建议先清除此路径中的所有现有文件。
创建一组随机数据,例如:
id,x_axis,y_axis d033faf3-b6bd-4bbc-83a4-43a37ce7e994,88,-13 fde2bdb6-b0a1-41c2-9650-35af717549ca,-96,19 297a2dfe-99de-4c52-8310-b24bc2f83874,-23,4330 秒后,创建一个名为
file_<number>.csv的文件,将这组随机数据写入该文件,将该文件存储在dbfs:/tmp/generated_raw_csv_data中,并报告该文件的路径及其内容。<number>从0开始,每创建一次文件增加 1(例如file_0.csv、file_1.csv,依此类推)。
在笔记本的菜单栏中,单击“全部运行”。 使此笔记本保持运行状态。
注意
若要查看生成的文件列表,请在边栏中单击“数据”。 单击“DBFS”,在出现提示时选择群集,然后单击“tmp”>“generated_raw_csv_data”。
步骤 2:运行自动加载程序
此步骤将使用自动加载程序在工作区中的某个位置连续读取原始数据,然后将该数据流式传输到同一工作区中的另一个位置的 Delta 表中。
在边栏中单击“创建”>“笔记本”。
在“创建笔记本”对话框中,输入笔记本名称,例如
Auto Loader Demo。对于“默认语言”,请选择“Python”。
对于“群集”,请选择在“要求”部分中创建的群集,或选择要使用的另一个可用群集。
单击“创建”。
在笔记本的菜单栏中,如果群集名称旁边的圆圈未包含绿色复选标记,请单击群集名称旁边的下拉箭头,然后单击“启动群集”。 单击“确认”,然后等待圆圈包含绿色复选标记。
在笔记本的第一个单元格中,粘贴以下代码:
raw_data_location = "dbfs:/tmp/generated_raw_csv_data" target_delta_table_location = "dbfs:/tmp/table/coordinates" schema_location = "dbfs:/tmp/auto_loader/schema" checkpoint_location = "dbfs:/tmp/auto_loader/checkpoint"此代码在工作区中定义原始数据和目标 Delta 表的路径、表架构的路径以及自动加载程序在 Delta Lake 事务日志中写入检查点文件信息的位置的路径。 检查点使自动加载程序能够仅处理新传入的数据,并跳过任何已处理过的现有数据。
提示
如果因其他人运行了本教程而导致你的工作区中已存在这些路径中的任何一个,建议先清除这些路径中的所有现有文件。
如果游标仍位于第一个单元格中,则运行该单元格。 (若要运行该单元,请按 Shift+Enter。)Azure Databricks 会将指定的路径读入内存中。
在第一个单元格下面添加一个单元格(如果尚未存在)。 (若要添加一个单元,请将鼠标指针置于该单元的下边缘,然后单击 + 图标。)在这第二个单元中粘贴以下代码(请注意
cloudFiles表示自动加载程序):stream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .load(raw_data_location)运行此单元格。
在笔记本的第三个单元格中,粘贴以下代码:
display(stream)运行此单元格。 自动加载程序将开始处理
raw_data_location中的现有 CSV 文件以及到达该位置的任何传入 CSV 文件。 自动加载程序使用文件中的第一行作为文件名称,其余行作为字段数据来处理每个 CSV 文件。 Azure Databricks 在自动加载程序处理数据时显示数据。在笔记本的第四个单元格中,粘贴以下代码:
stream.writeStream \ .option("checkpointLocation", checkpoint_location) \ .start(target_delta_table_location)运行此单元格。 自动加载程序将数据写入
target_data_table_location中的 Delta 表。 自动加载程序还将在checkpoint_location中写入检查点文件信息。
步骤 3:发展并强制实施数据架构
如果数据的架构随时间推移而发生变化,会发生什么情况? 例如,如果你希望改进字段数据类型,以便在未来更好地解决数据质量问题,并更轻松地对数据进行计算,该怎么做? 在此步骤中,改进数据的允许数据类型,然后对传入数据强制实施此架构。
请记得使步骤 1 中的笔记本保持运行,以使用新生成的示例文件来维护数据流。
停止步骤 2 中的笔记本。 (要停止该笔记本,请单击笔记本菜单栏中的“停止执行”。)
在步骤 2 中的笔记本中,将第四个单元格(以
stream.writeStream开头的单元格)的内容替换为以下代码:stream.printSchema()运行笔记本的所有单元格。 (若要运行所有单元,请在笔记本菜单栏中单击“全部运行”。)Azure Databricks 将输出数据的架构,它会将所有字段显示为字符串。 让我们将
x_axis和y_axis字段改进为整数。停止笔记本。
将第二个单元格(以
stream = spark.readStream开头的单元格)的内容替换为以下以下代码:stream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .option("cloudFiles.schemaHints", """x_axis integer, y_axis integer""") \ .load(raw_data_location)运行笔记本的所有单元格。 Azure Databricks 打印数据的新架构,该架构以整数形式显示
x_axis和y_axis列。 现在使用这个新架构来保证数据质量。停止笔记本。
将第二个单元格的内容替换为以下代码:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType schema = StructType([ StructField('id', StringType(), True), StructField('x_axis', IntegerType(), True), StructField('y_axis', IntegerType(), True) ]) stream = spark.readStream \ .format("cloudFiles") \ .option("cloudFiles.format", "csv") \ .option("header", "true") \ .option("cloudFiles.schemaLocation", schema_location) \ .schema(schema) \ .load(raw_data_location)运行笔记本的所有单元格。 自动加载程序现在使用其架构推理和演变逻辑来确定如何处理与新架构不匹配的传入数据。
步骤 4:清理
完成本教程后,如果不再需要保留相关 Azure Databricks 资源,可以在工作区中清理这些资源。
删除数据
停止这两个笔记本。 (要打开笔记本,请在边栏中单击“工作区”>“用户”>“你的用户名”,然后单击笔记本。)
在步骤 1 中的笔记本中,在第一个单元格后面添加一个单元格,然后将以下代码粘贴到第二个单元格中。
dbutils.fs.rm("dbfs:/tmp/generated_raw_csv_data", True) dbutils.fs.rm("dbfs:/tmp/table", True) dbutils.fs.rm("dbfs:/tmp/auto_loader", True)警告
如果这些位置中有任何其他信息,此信息也将被删除!
运行单元。 Azure Databricks 删除包含原始数据的目录、Delta 表、表的架构和自动加载程序检查点信息。
删除笔记本
- 在边栏中单击“工作区”>“用户”>“你的用户名”。
- 单击第一个笔记本旁边的下拉箭头,然后单击“放入回收站”。
- 单击“确认并放入回收站”。
- 对第二个笔记本重复步骤 1 - 3。
停止群集
如果未将群集用于任何其他任务,则应停止该群集以免产生额外费用。
- 在边栏中,单击“计算”。
- 单击群集名称。
- 单击“终止”。
- 单击“确认” 。
其他资源
- 自动加载程序技术文档
- 对大量数据使用文件通知
- 可简化 Databricks Lakehouse 中半结构化数据管理的 10 项强大功能博客
- 轻而易举的数据引入点播网络研讨会系列