Copy data to and from Azure Synapse Analytics using Azure Data Factory

Note

This article applies to version 1 of Data Factory. If you are using the current version of the Data Factory service, see Azure Synapse Analytics connector in V2.

This article explains how to use the Copy Activity in Azure Data Factory to move data to and from Azure Synapse Analytics. It builds on the Data Movement Activities article, which presents a general overview of data movement with the copy activity.

Tip

To achieve the best performance, use PolyBase to load data into Azure Synapse Analytics. The Use PolyBase to load data into Azure Synapse Analytics section has details. For a walkthrough with a use case, see Load 1 TB into Azure Synapse Analytics under 15 minutes with Azure Data Factory.

Supported scenarios

You can copy data from Azure Synapse Analytics to the following data stores:

Category     Data store
Azure        Azure Blob storage
             Azure Data Lake Storage Gen1
             Azure Cosmos DB (SQL API)
             Azure SQL Database
             Azure Synapse Analytics
             Azure Cognitive Search Index
             Azure Table storage
Databases    SQL Server
             Oracle
File         File system

You can copy data from the following data stores to Azure Synapse Analytics:

Category     Data store
Azure        Azure Blob storage
             Azure Cosmos DB (SQL API)
             Azure Data Lake Storage Gen1
             Azure SQL Database
             Azure Synapse Analytics
             Azure Table storage
Databases    Amazon Redshift
             DB2
             MySQL
             Oracle
             PostgreSQL
             SAP Business Warehouse
             SAP HANA
             SQL Server
             Sybase
             Teradata
NoSQL        Cassandra
             MongoDB
File         Amazon S3
             File system
             FTP
             HDFS
             SFTP
Others       Generic HTTP
             Generic OData
             Generic ODBC
             Salesforce
             Web table (table from HTML)

Tip

When copying data from SQL Server or Azure SQL Database to Azure Synapse Analytics, if the table does not exist in the destination store, Data Factory can automatically create the table in Azure Synapse Analytics by using the schema of the table in the source data store. See Auto table creation for details.

Supported authentication type

The Azure Synapse Analytics connector supports basic authentication.

Getting started

You can create a pipeline with a copy activity that moves data to or from Azure Synapse Analytics by using different tools/APIs.

The easiest way to create a pipeline that copies data to/from Azure Synapse Analytics is to use the Copy data wizard. See Tutorial: Load data into Azure Synapse Analytics with Data Factory for a quick walkthrough on creating a pipeline by using the Copy data wizard.

You can also use the following tools to create a pipeline: Visual Studio, Azure PowerShell, Azure Resource Manager template, .NET API, and REST API. See the Copy activity tutorial for step-by-step instructions to create a pipeline with a copy activity.

Whether you use the tools or the APIs, you perform the following steps to create a pipeline that moves data from a source data store to a sink data store:

  1. Create a data factory. A data factory may contain one or more pipelines.
  2. Create linked services to link input and output data stores to your data factory. For example, if you are copying data from Azure Blob storage to Azure Synapse Analytics, you create two linked services to link your Azure storage account and Azure Synapse Analytics to your data factory. For linked service properties that are specific to Azure Synapse Analytics, see the linked service properties section.
  3. Create datasets to represent input and output data for the copy operation. In the example mentioned in the last step, you create a dataset to specify the blob container and folder that contains the input data. And, you create another dataset to specify the table in Azure Synapse Analytics that holds the data copied from the blob storage. For dataset properties that are specific to Azure Synapse Analytics, see the dataset properties section.
  4. Create a pipeline with a copy activity that takes a dataset as an input and a dataset as an output. In the example mentioned earlier, you use BlobSource as a source and SqlDWSink as a sink for the copy activity (see the minimal sketch after this list). Similarly, if you are copying from Azure Synapse Analytics to Azure Blob storage, you use SqlDWSource and BlobSink in the copy activity. For copy activity properties that are specific to Azure Synapse Analytics, see the copy activity properties section. For details on how to use a data store as a source or a sink, click the link in the previous section for your data store.
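
The typeProperties skeleton of such a copy activity might look like the following minimal sketch; it only names the source and sink types from the steps above, and full pipeline definitions appear in the JSON examples section later in this article:

"typeProperties": {
    "source": { "type": "BlobSource" },
    "sink": { "type": "SqlDWSink", "allowPolyBase": true }
}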

When you use the wizard, JSON definitions for these Data Factory entities (linked services, datasets, and the pipeline) are automatically created for you. When you use tools/APIs (except the .NET API), you define these Data Factory entities by using the JSON format. For samples with JSON definitions for Data Factory entities that are used to copy data to/from Azure Synapse Analytics, see the JSON examples section of this article.

The following sections provide details about JSON properties that are used to define Data Factory entities specific to Azure Synapse Analytics:

Linked service properties

The following list describes the JSON elements specific to the Azure Synapse Analytics linked service.

  • type - Required. The type property must be set to: AzureSqlDW.
  • connectionString - Required. Specify the information needed to connect to the Azure Synapse Analytics instance for the connectionString property. Only basic authentication is supported.
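
As a minimal sketch, a linked service that uses these properties looks like the following; the placeholder values (<servername>, <databasename>, <username>, <password>) must be replaced with your own, and a complete example appears in the JSON examples section later in this article:

{
  "name": "AzureSqlDWLinkedService",
  "properties": {
    "type": "AzureSqlDW",
    "typeProperties": {
      "connectionString": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
    }
  }
}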

Important

Configure the Azure SQL Database firewall and the database server to allow Azure services to access the server. Additionally, if you are copying data to Azure Synapse Analytics from outside Azure, including from on-premises data sources with the Data Factory gateway, configure an appropriate IP address range for the machine that is sending data to Azure Synapse Analytics.

Dataset properties

For a full list of sections and properties available for defining datasets, see the Creating datasets article. Sections such as structure, availability, and policy of a dataset JSON are similar for all dataset types (Azure SQL, Azure blob, Azure table, and so on).

The typeProperties section is different for each type of dataset and provides information about the location of the data in the data store. The typeProperties section for the dataset of type AzureSqlDWTable has the following property:

  • tableName - Required. Name of the table or view in the Azure Synapse Analytics database that the linked service refers to.
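
A minimal dataset sketch that uses this property might look like the following; the dataset name and the hourly availability shown here are illustrative, and a complete example appears in the JSON examples section later in this article:

{
  "name": "AzureSqlDWDataset",
  "properties": {
    "type": "AzureSqlDWTable",
    "linkedServiceName": "AzureSqlDWLinkedService",
    "typeProperties": {
      "tableName": "MyTable"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}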

Copy activity properties

For a full list of sections and properties available for defining activities, see the Creating Pipelines article. Properties such as name, description, input and output tables, and policy are available for all types of activities.

Note

The Copy Activity takes only one input and produces only one output.

Properties available in the typeProperties section of the activity, on the other hand, vary with each activity type. For the Copy activity, they vary depending on the types of sources and sinks.

SqlDWSource

When the source is of type SqlDWSource, the following properties are available in the typeProperties section:

  • sqlReaderQuery - Optional. Use the custom query to read data. Allowed value: a SQL query string, for example: select * from MyTable.
  • sqlReaderStoredProcedureName - Optional. Name of the stored procedure that reads data from the source table. The last SQL statement must be a SELECT statement in the stored procedure.
  • storedProcedureParameters - Optional. Parameters for the stored procedure, specified as name/value pairs. The names and casing of parameters must match the names and casing of the stored procedure parameters.

If the sqlReaderQuery is specified for the SqlDWSource, the Copy Activity runs this query against the Azure Synapse Analytics source to get the data.

Alternatively, you can specify a stored procedure by specifying the sqlReaderStoredProcedureName and storedProcedureParameters (if the stored procedure takes parameters).

If you do not specify either sqlReaderQuery or sqlReaderStoredProcedureName, the columns defined in the structure section of the dataset JSON are used to build a query to run against Azure Synapse Analytics. Example: select column1, column2 from mytable. If the dataset definition does not have the structure, all columns are selected from the table.
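
For illustration, a hypothetical structure section like the following (the column names and types are placeholders) would cause the copy activity to generate select column1, column2 from mytable:

"structure": [
    { "name": "column1", "type": "String" },
    { "name": "column2", "type": "Int32" }
]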

SqlDWSource example

"source": {
    "type": "SqlDWSource",
    "sqlReaderStoredProcedureName": "CopyTestSrcStoredProcedureWithParameters",
    "storedProcedureParameters": {
        "stringData": { "value": "str3" },
        "identifier": { "value": "$$Text.Format('{0:yyyy}', SliceStart)", "type": "Int"}
    }
}

The stored procedure definition:

CREATE PROCEDURE CopyTestSrcStoredProcedureWithParameters
(
    @stringData varchar(20),
    @identifier int
)
AS
SET NOCOUNT ON;
BEGIN
    select *
    from dbo.UnitTestSrcTable
    where dbo.UnitTestSrcTable.stringData != @stringData
    and dbo.UnitTestSrcTable.identifier != @identifier
END
GO

SqlDWSink

SqlDWSink supports the following properties:

  • sqlWriterCleanupScript - Optional. Specify a query for the Copy Activity to execute so that data of a specific slice is cleaned up. For details, see the repeatability section. Allowed value: a query statement.
  • allowPolyBase - Optional. Indicates whether to use PolyBase (when applicable) instead of the BULKINSERT mechanism. Using PolyBase is the recommended way to load data into Azure Synapse Analytics. See the Use PolyBase to load data into Azure Synapse Analytics section for constraints and details. Allowed values: True, False (default).
  • polyBaseSettings - Optional. A group of properties that can be specified when the allowPolyBase property is set to true.
  • rejectValue - Optional. Specifies the number or percentage of rows that can be rejected before the query fails. Learn more about PolyBase's reject options in the Arguments section of the CREATE EXTERNAL TABLE (Transact-SQL) topic. Allowed values: 0 (default), 1, 2, ...
  • rejectType - Optional. Specifies whether the rejectValue option is specified as a literal value or a percentage. Allowed values: Value (default), Percentage.
  • rejectSampleValue - Required if rejectType is Percentage. Determines the number of rows to retrieve before PolyBase recalculates the percentage of rejected rows. Allowed values: 1, 2, ...
  • useTypeDefault - Optional. Specifies how to handle missing values in delimited text files when PolyBase retrieves data from the text file. Learn more about this property from the Arguments section in CREATE EXTERNAL FILE FORMAT (Transact-SQL). Allowed values: True, False (default).
  • writeBatchSize - Optional (default: 10000). Inserts data into the SQL table when the buffer size reaches writeBatchSize. Allowed value: an integer (number of rows).
  • writeBatchTimeout - Optional. Wait time for the batch insert operation to complete before it times out. Allowed value: a timespan, for example "00:30:00" (30 minutes).

SqlDWSink example

"sink": {
    "type": "SqlDWSink",
    "allowPolyBase": true
}
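
If you do not enable PolyBase, the copy uses the BULKINSERT path, which can be tuned with the batch properties described above. The following sketch is illustrative only; the values shown are examples, not recommendations:

"sink": {
    "type": "SqlDWSink",
    "writeBatchSize": 50000,
    "writeBatchTimeout": "00:30:00"
}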

Use PolyBase to load data into Azure Synapse Analytics

Using PolyBase is an efficient way of loading a large amount of data into Azure Synapse Analytics with high throughput. You can see a large gain in throughput by using PolyBase instead of the default BULKINSERT mechanism. See the copy performance reference number for a detailed comparison. For a walkthrough with a use case, see Load 1 TB into Azure Synapse Analytics under 15 minutes with Azure Data Factory.

  • If your source data is in Azure Blob or Azure Data Lake Store, and the format is compatible with PolyBase, you can copy directly to Azure Synapse Analytics by using PolyBase. See Direct copy using PolyBase for details.
  • If your source data store and format are not originally supported by PolyBase, you can use the Staged Copy using PolyBase feature instead. It also provides better throughput by automatically converting the data into a PolyBase-compatible format and storing the data in Azure Blob storage. It then loads the data into Azure Synapse Analytics.

Set the allowPolyBase property to true, as shown in the following example, for Azure Data Factory to use PolyBase to copy data into Azure Synapse Analytics. When you set allowPolyBase to true, you can specify PolyBase-specific properties by using the polyBaseSettings property group. See the SqlDWSink section for details about the properties that you can use with polyBaseSettings.

"sink": {
    "type": "SqlDWSink",
    "allowPolyBase": true,
    "polyBaseSettings":
    {
        "rejectType": "percentage",
        "rejectValue": 10.0,
        "rejectSampleValue": 100,
        "useTypeDefault": true
    }
}

Direct copy using PolyBase

Azure Synapse Analytics PolyBase directly supports Azure Blob and Azure Data Lake Store (using service principal authentication) as the source, with specific file format requirements. If your source data meets the criteria described in this section, you can copy directly from the source data store to Azure Synapse Analytics by using PolyBase. Otherwise, you can use Staged Copy using PolyBase.

Tip

To copy data from Data Lake Store to Azure Synapse Analytics efficiently, learn more from the post Azure Data Factory makes it even easier and convenient to uncover insights from data when using Data Lake Store with Azure Synapse Analytics.

If the requirements are not met, Azure Data Factory checks the settings and automatically falls back to the BULKINSERT mechanism for the data movement.

  1. The source linked service is of type AzureStorage or AzureDataLakeStore with service principal authentication.

  2. The input dataset is of type AzureBlob or AzureDataLakeStore, and the format type under type properties is OrcFormat, ParquetFormat, or TextFormat with the following configurations:

    1. rowDelimiter must be \n.

    2. nullValue is set to empty string (""), or treatEmptyAsNull is set to true.

    3. encodingName is set to utf-8, which is the default value.

    4. escapeChar, quoteChar, firstRowAsHeader, and skipLineCount are not specified.

    5. compression can be no compression, GZip, or Deflate.

      "typeProperties": {
       "folderPath": "<blobpath>",
       "format": {
           "type": "TextFormat",
           "columnDelimiter": "<any delimiter>",
           "rowDelimiter": "\n",
           "nullValue": "",
           "encodingName": "utf-8"
       },
       "compression": {
           "type": "GZip",
           "level": "Optimal"
       }
      },
      
  3. There is no skipHeaderLineCount setting under BlobSource or AzureDataLakeStore for the Copy activity in the pipeline.

  4. There is no sliceIdentifierColumnName setting under SqlDWSink for the Copy activity in the pipeline. (PolyBase guarantees that all data is updated or nothing is updated in a single run. To achieve repeatability, you could use sqlWriterCleanupScript.)

  5. There is no columnMapping being used in the associated Copy activity.

Staged Copy using PolyBase

When your source data doesn't meet the criteria introduced in the previous section, you can enable copying data via an interim staging Azure Blob storage (it cannot be Premium Storage). In this case, Azure Data Factory automatically performs transformations on the data to meet the data format requirements of PolyBase, then uses PolyBase to load the data into Azure Synapse Analytics, and finally cleans up your temporary data from the Blob storage. See Staged Copy for details on how copying data via a staging Azure Blob works in general.

Note

When copying data from an on-premises data store into Azure Synapse Analytics by using PolyBase and staging, if your Data Management Gateway version is below 2.4, JRE (Java Runtime Environment) is required on your gateway machine; it is used to transform your source data into the proper format. We suggest that you upgrade your gateway to the latest version to avoid this dependency.

To use this feature, create an Azure Storage linked service that refers to the Azure storage account that has the interim blob storage, then specify the enableStaging and stagingSettings properties for the Copy Activity as shown in the following code:

"activities":[
{
    "name": "Sample copy activity from SQL Server to Azure Synapse Analytics via PolyBase",
    "type": "Copy",
    "inputs": [{ "name": "OnpremisesSQLServerInput" }],
    "outputs": [{ "name": "AzureSQLDWOutput" }],
    "typeProperties": {
        "source": {
            "type": "SqlSource",
        },
        "sink": {
            "type": "SqlDwSink",
            "allowPolyBase": true
        },
        "enableStaging": true,
        "stagingSettings": {
            "linkedServiceName": "MyStagingBlob"
        }
    }
}
]

Best practices when using PolyBase

The following sections provide additional best practices to the ones that are mentioned in Best practices for Azure Synapse Analytics.

Required database permission

To use PolyBase, the user that is used to load data into Azure Synapse Analytics must have the "CONTROL" permission on the target database. One way to achieve that is to add the user as a member of the db_owner role. Learn how to do that by following this section.

Row size and data type limitation

PolyBase loads are limited to rows smaller than 1 MB and cannot load into VARCHAR(MAX), NVARCHAR(MAX), or VARBINARY(MAX) columns. Refer to here.

If you have source data with rows larger than 1 MB, you may want to split the source tables vertically into several smaller ones, where the largest row size of each does not exceed the limit. The smaller tables can then be loaded by using PolyBase and merged together in Azure Synapse Analytics.

Azure Synapse Analytics resource class

To achieve the best possible throughput, consider assigning a larger resource class to the user that is used to load data into Azure Synapse Analytics via PolyBase. Learn how to do that by following the Change a user resource class example.

tableName in Azure Synapse Analytics

The following table provides examples of how to specify the tableName property in the dataset JSON for various combinations of schema and table name.

DB schema    Table name    tableName JSON property
dbo          MyTable       MyTable or dbo.MyTable or [dbo].[MyTable]
dbo1         MyTable       dbo1.MyTable or [dbo1].[MyTable]
dbo          My.Table      [My.Table] or [dbo].[My.Table]
dbo1         My.Table      [dbo1].[My.Table]
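
For example, a minimal dataset typeProperties sketch that points to the table My.Table in schema dbo1 would set tableName as follows:

"typeProperties": {
    "tableName": "[dbo1].[My.Table]"
}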

If you see the following error, it could be an issue with the value you specified for the tableName property. See the table above for the correct way to specify values for the tableName JSON property.

Type=System.Data.SqlClient.SqlException,Message=Invalid object name 'stg.Account_test'.,Source=.Net SqlClient Data Provider

Columns with default values

Currently, the PolyBase feature in Data Factory only accepts the same number of columns as in the target table. Say you have a table with four columns, and one of them is defined with a default value. The input data should still contain four columns. Providing a three-column input dataset yields an error similar to the following message:

All columns of the table must be specified in the INSERT BULK statement.

A NULL value is a special form of default value. If the column is nullable, the input data (in the blob) for that column can be empty (but it cannot be missing from the input dataset). PolyBase inserts NULL for such values in Azure Synapse Analytics.

Auto table creation

If you are using the Copy Wizard to copy data from SQL Server or Azure SQL Database to Azure Synapse Analytics and the table that corresponds to the source table does not exist in the destination store, Data Factory can automatically create the table in the data warehouse by using the source table schema.

Data Factory creates the table in the destination store with the same table name as in the source data store. The data types for columns are chosen based on the following type mapping. If needed, it performs type conversions to fix any incompatibilities between the source and destination stores. It also uses Round Robin table distribution.

Source SQL Database column type    Destination Azure Synapse Analytics column type (size limitation)
Int                                Int
BigInt                             BigInt
SmallInt                           SmallInt
TinyInt                            TinyInt
Bit                                Bit
Decimal                            Decimal
Numeric                            Decimal
Float                              Float
Money                              Money
Real                               Real
SmallMoney                         SmallMoney
Binary                             Binary
Varbinary                          Varbinary (up to 8000)
Date                               Date
DateTime                           DateTime
DateTime2                          DateTime2
Time                               Time
DateTimeOffset                     DateTimeOffset
SmallDateTime                      SmallDateTime
Text                               Varchar (up to 8000)
NText                              NVarChar (up to 4000)
Image                              VarBinary (up to 8000)
UniqueIdentifier                   UniqueIdentifier
Char                               Char
NChar                              NChar
VarChar                            VarChar (up to 8000)
NVarChar                           NVarChar (up to 4000)
Xml                                Varchar (up to 8000)

Repeatability during Copy

When copying data to Azure SQL/SQL Server from other data stores, you need to keep repeatability in mind to avoid unintended outcomes.

When copying data to an Azure SQL/SQL Server database, the copy activity appends the data set to the sink table by default. For example, when copying data from a CSV (comma-separated values) file source containing two records to an Azure SQL/SQL Server database, the table looks like this:

ID    Product        Quantity    ModifiedDate
...    ...            ...            ...
6    Flat Washer    3            2015-05-01 00:00:00
7     Down Tube    2            2015-05-01 00:00:00

Suppose you found errors in the source file and updated the quantity of Down Tube from 2 to 4 in the source file. If you re-run the data slice for that period, you'll find two new records appended to the Azure SQL/SQL Server database. The following assumes that none of the columns in the table have the primary key constraint.

ID    Product        Quantity    ModifiedDate
...    ...            ...            ...
6    Flat Washer    3            2015-05-01 00:00:00
7     Down Tube    2            2015-05-01 00:00:00
6    Flat Washer    3            2015-05-01 00:00:00
7     Down Tube    4            2015-05-01 00:00:00

To avoid this, you need to specify UPSERT semantics by leveraging one of the following two mechanisms.

Note

A slice can be re-run automatically in Azure Data Factory as per the retry policy specified.

Mechanism 1

You can leverage the sqlWriterCleanupScript property to first perform a cleanup action when a slice is run.

"sink":  
{ 
  "type": "SqlSink", 
  "sqlWriterCleanupScript": "$$Text.Format('DELETE FROM table WHERE ModifiedDate >= \\'{0:yyyy-MM-dd HH:mm}\\' AND ModifiedDate < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)"
}

The cleanup script is executed first during copy for a given slice, which deletes the data from the SQL table corresponding to that slice. The activity subsequently inserts the data into the SQL table.

If the slice is now re-run, you will find the quantity is updated as desired.

ID    Product        Quantity    ModifiedDate
...    ...            ...            ...
6    Flat Washer    3            2015-05-01 00:00:00
7     Down Tube    4            2015-05-01 00:00:00

Suppose the Flat Washer record is removed from the original CSV. Re-running the slice produces the following result:

ID    Product        Quantity    ModifiedDate
...    ...            ...            ...
7     Down Tube    4            2015-05-01 00:00:00

Nothing new had to be done. The copy activity ran the cleanup script to delete the corresponding data for that slice. It then read the input from the CSV (which now contained only one record) and inserted it into the table.

Mechanism 2

Important

sliceIdentifierColumnName is not supported for Azure Synapse Analytics at this time.

Another mechanism to achieve repeatability is to have a dedicated column (sliceIdentifierColumnName) in the target table. This column is used by Azure Data Factory to ensure the source and destination stay synchronized. This approach works when there is flexibility in changing or defining the destination SQL table schema.

This column is used by Azure Data Factory for repeatability purposes, and in the process Azure Data Factory does not make any schema changes to the table. To use this approach:

  1. Define a column of type binary(32) in the destination SQL table. There should be no constraints on this column. Let's name this column 'ColumnForADFuseOnly' for this example.

  2. Use it in the copy activity as follows:

    "sink":  
    { 
    
        "type": "SqlSink", 
        "sliceIdentifierColumnName": "ColumnForADFuseOnly"
    }
    

Azure Data Factory populates this column as needed to ensure the source and destination stay synchronized. The values of this column should not be used outside of this context by the user.

Similar to mechanism 1, the Copy Activity first automatically cleans up the data for the given slice from the destination SQL table, and then runs the copy normally to insert the data from the source to the destination for that slice.

Type mapping for Azure Synapse Analytics

As mentioned in the data movement activities article, the Copy activity performs automatic type conversions from source types to sink types with the following two-step approach:

  1. Convert from native source types to .NET type
  2. Convert from .NET type to native sink type

When moving data to and from Azure Synapse Analytics, the following mappings are used from SQL type to .NET type and vice versa.

The mapping is the same as the SQL Server Data Type Mapping for ADO.NET.

SQL Server Database Engine type          .NET Framework type
bigint                                   Int64
binary                                   Byte[]
bit                                      Boolean
char                                     String, Char[]
date                                     DateTime
datetime                                 DateTime
datetime2                                DateTime
datetimeoffset                           DateTimeOffset
decimal                                  Decimal
FILESTREAM attribute (varbinary(max))    Byte[]
float                                    Double
image                                    Byte[]
int                                      Int32
money                                    Decimal
nchar                                    String, Char[]
ntext                                    String, Char[]
numeric                                  Decimal
nvarchar                                 String, Char[]
real                                     Single
rowversion                               Byte[]
smalldatetime                            DateTime
smallint                                 Int16
smallmoney                               Decimal
sql_variant                              Object *
text                                     String, Char[]
time                                     TimeSpan
timestamp                                Byte[]
tinyint                                  Byte
uniqueidentifier                         Guid
varbinary                                Byte[]
varchar                                  String, Char[]
xml                                      Xml

You can also map columns from the source dataset to columns in the sink dataset in the copy activity definition. For details, see Mapping dataset columns in Azure Data Factory.
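
As a minimal sketch of such a mapping (the column names UserId, MyUserId, Group, and MyGroup are hypothetical), you add a translator of type TabularTranslator to the copy activity's typeProperties:

"translator": {
    "type": "TabularTranslator",
    "columnMappings": "UserId: MyUserId, Group: MyGroup"
}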

JSON examples for copying data to and from Azure Synapse Analytics

The following examples provide sample JSON definitions that you can use to create a pipeline by using Visual Studio or Azure PowerShell. They show how to copy data to and from Azure Synapse Analytics and Azure Blob storage. However, data can be copied directly from any of the sources to any of the sinks stated here by using the Copy Activity in Azure Data Factory.

Example: Copy data from Azure Synapse Analytics to Azure Blob

The sample defines the following Data Factory entities:

  1. A linked service of type AzureSqlDW.
  2. A linked service of type AzureStorage.
  3. An input dataset of type AzureSqlDWTable.
  4. An output dataset of type AzureBlob.
  5. A pipeline with a Copy Activity that uses SqlDWSource and BlobSink.

The sample copies time-series data (hourly, daily, and so on) from a table in an Azure Synapse Analytics database to a blob every hour. The JSON properties used in these samples are described in the sections following the samples.

Azure Synapse Analytics linked service:

{
  "name": "AzureSqlDWLinkedService",
  "properties": {
    "type": "AzureSqlDW",
    "typeProperties": {
      "connectionString": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
    }
  }
}

Azure Blob storage linked service:

{
  "name": "StorageLinkedService",
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountname>;AccountKey=<accountkey>"
    }
  }
}

Azure Synapse Analytics input dataset:

The sample assumes you have created a table "MyTable" in Azure Synapse Analytics, and that it contains a column called "timestampcolumn" for time-series data.

设置 "external": "true" 将告知数据工厂服务:数据集位于数据工厂外且不由数据工厂中的活动生成。Setting "external": "true" informs the Data Factory service that the dataset is external to the data factory and is not produced by an activity in the data factory.

{
  "name": "AzureSqlDWInput",
  "properties": {
    "type": "AzureSqlDWTable",
    "linkedServiceName": "AzureSqlDWLinkedService",
    "typeProperties": {
      "tableName": "MyTable"
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    },
    "policy": {
      "externalData": {
        "retryInterval": "00:01:00",
        "retryTimeout": "00:10:00",
        "maximumRetry": 3
      }
    }
  }
}

Azure Blob output dataset:

Data is written to a new blob every hour (frequency: hour, interval: 1). The folder path for the blob is dynamically evaluated based on the start time of the slice that is being processed. The folder path uses the year, month, day, and hour parts of the start time.

{
  "name": "AzureBlobOutput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/yearno={Year}/monthno={Month}/dayno={Day}/hourno={Hour}",
      "partitionedBy": [
        {
          "name": "Year",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "yyyy"
          }
        },
        {
          "name": "Month",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "MM"
          }
        },
        {
          "name": "Day",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "dd"
          }
        },
        {
          "name": "Hour",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "HH"
          }
        }
      ],
      "format": {
        "type": "TextFormat",
        "columnDelimiter": "\t",
        "rowDelimiter": "\n"
      }
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

Copy activity in a pipeline with SqlDWSource and BlobSink:

The pipeline contains a Copy Activity that is configured to use the input and output datasets and is scheduled to run every hour. In the pipeline JSON definition, the source type is set to SqlDWSource and the sink type is set to BlobSink. The SQL query specified for the sqlReaderQuery property selects the data in the past hour to copy.

{
  "name":"SamplePipeline",
  "properties":{
    "start":"2014-06-01T18:00:00",
    "end":"2014-06-01T19:00:00",
    "description":"pipeline for copy activity",
    "activities":[
      {
        "name": "AzureSQLDWtoBlob",
        "description": "copy activity",
        "type": "Copy",
        "inputs": [
          {
            "name": "AzureSqlDWInput"
          }
        ],
        "outputs": [
          {
            "name": "AzureBlobOutput"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "SqlDWSource",
            "sqlReaderQuery": "$$Text.Format('select * from MyTable where timestampcolumn >= \\'{0:yyyy-MM-dd HH:mm}\\' AND timestampcolumn < \\'{1:yyyy-MM-dd HH:mm}\\'', WindowStart, WindowEnd)"
          },
          "sink": {
            "type": "BlobSink"
          }
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 0,
          "timeout": "01:00:00"
        }
      }
    ]
  }
}

Note

In the example, sqlReaderQuery is specified for the SqlDWSource. The Copy Activity runs this query against the Azure Synapse Analytics source to get the data.

Alternatively, you can specify a stored procedure by specifying the sqlReaderStoredProcedureName and storedProcedureParameters (if the stored procedure takes parameters).

If you do not specify either sqlReaderQuery or sqlReaderStoredProcedureName, the columns defined in the structure section of the dataset JSON are used to build a query (select column1, column2 from mytable) to run against Azure Synapse Analytics. If the dataset definition does not have the structure, all columns are selected from the table.

Example: Copy data from Azure Blob to Azure Synapse Analytics

The sample defines the following Data Factory entities:

  1. A linked service of type AzureSqlDW.
  2. A linked service of type AzureStorage.
  3. An input dataset of type AzureBlob.
  4. An output dataset of type AzureSqlDWTable.
  5. A pipeline with a Copy activity that uses BlobSource and SqlDWSink.

The sample copies time-series data (hourly, daily, and so on) from Azure blob storage to a table in an Azure Synapse Analytics database every hour. The JSON properties used in these samples are described in the sections following the samples.

Azure Synapse Analytics linked service:

{
  "name": "AzureSqlDWLinkedService",
  "properties": {
    "type": "AzureSqlDW",
    "typeProperties": {
      "connectionString": "Server=tcp:<servername>.database.windows.net,1433;Database=<databasename>;User ID=<username>@<servername>;Password=<password>;Trusted_Connection=False;Encrypt=True;Connection Timeout=30"
    }
  }
}

Azure Blob storage linked service:

{
  "name": "StorageLinkedService",
  "properties": {
    "type": "AzureStorage",
    "typeProperties": {
      "connectionString": "DefaultEndpointsProtocol=https;AccountName=<accountname>;AccountKey=<accountkey>"
    }
  }
}

Azure Blob input dataset:

Data is picked up from a new blob every hour (frequency: hour, interval: 1). The folder path and file name for the blob are dynamically evaluated based on the start time of the slice that is being processed. The folder path uses the year, month, and day parts of the start time, and the file name uses the hour part of the start time. The "external": "true" setting informs the Data Factory service that this table is external to the data factory and is not produced by an activity in the data factory.

{
  "name": "AzureBlobInput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "StorageLinkedService",
    "typeProperties": {
      "folderPath": "mycontainer/myfolder/yearno={Year}/monthno={Month}/dayno={Day}",
      "fileName": "{Hour}.csv",
      "partitionedBy": [
        {
          "name": "Year",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "yyyy"
          }
        },
        {
          "name": "Month",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "MM"
          }
        },
        {
          "name": "Day",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "dd"
          }
        },
        {
          "name": "Hour",
          "value": {
            "type": "DateTime",
            "date": "SliceStart",
            "format": "HH"
          }
        }
      ],
      "format": {
        "type": "TextFormat",
        "columnDelimiter": ",",
        "rowDelimiter": "\n"
      }
    },
    "external": true,
    "availability": {
      "frequency": "Hour",
      "interval": 1
    },
    "policy": {
      "externalData": {
        "retryInterval": "00:01:00",
        "retryTimeout": "00:10:00",
        "maximumRetry": 3
      }
    }
  }
}

Azure Synapse Analytics output dataset:

The sample copies data to a table named "MyOutputTable" in Azure Synapse Analytics. Create the table in Azure Synapse Analytics with the same number of columns as you expect the blob CSV file to contain. New rows are added to the table every hour.

{
  "name": "AzureSqlDWOutput",
  "properties": {
    "type": "AzureSqlDWTable",
    "linkedServiceName": "AzureSqlDWLinkedService",
    "typeProperties": {
      "tableName": "MyOutputTable"
    },
    "availability": {
      "frequency": "Hour",
      "interval": 1
    }
  }
}

Copy activity in a pipeline with BlobSource and SqlDWSink:

The pipeline contains a Copy Activity that is configured to use the input and output datasets and is scheduled to run every hour. In the pipeline JSON definition, the source type is set to BlobSource and the sink type is set to SqlDWSink.

{
  "name":"SamplePipeline",
  "properties":{
    "start":"2014-06-01T18:00:00",
    "end":"2014-06-01T19:00:00",
    "description":"pipeline with copy activity",
    "activities":[
      {
        "name": "AzureBlobtoSQLDW",
        "description": "Copy Activity",
        "type": "Copy",
        "inputs": [
          {
            "name": "AzureBlobInput"
          }
        ],
        "outputs": [
          {
            "name": "AzureSqlDWOutput"
          }
        ],
        "typeProperties": {
          "source": {
            "type": "BlobSource",
            "blobColumnSeparators": ","
          },
          "sink": {
            "type": "SqlDWSink",
            "allowPolyBase": true
          }
        },
        "scheduler": {
          "frequency": "Hour",
          "interval": 1
        },
        "policy": {
          "concurrency": 1,
          "executionPriorityOrder": "OldestFirst",
          "retry": 0,
          "timeout": "01:00:00"
        }
      }
    ]
  }
}

For a walkthrough, see Load 1 TB into Azure Synapse Analytics under 15 minutes with Azure Data Factory and the Load data with Azure Data Factory article in the Azure Synapse Analytics documentation.

Performance and Tuning

See the Copy Activity Performance & Tuning Guide to learn about key factors that impact the performance of data movement (Copy Activity) in Azure Data Factory and various ways to optimize it.