使用 Azure Data Factory 或 Azure Synapse Analytics 在 Snowflake 中複製和轉換資料
適用於:Azure Data Factory
Azure Synapse Analytics
本文概述如何使用 Azure Data Factory 和 Azure Synapse Analytics 管線中的複製活動,在 Snowflake 複製/貼上資料,並使用資料流程來轉換 Snowflake 中的資料。 若要深入了解,請閱讀 Data Factory 或 Azure Synapse Analytics 簡介文章。
支援的功能
此 Snowflake 連接器支援下列活動:
- 在支援來源/接收器矩陣資料表的複製活動
- 對應資料流程
- 查閱活動
- 指令碼活動
針對複製活動,此 Snowflake 連接器支援下列功能:
- 用 Snowflake 的 COPY into [location] (英文) 命令從 Snowflake 複製資料,藉此達到最佳效能。
- 用 Snowflake 的 COPY into [table] (英文) 命令將資料複製到 Snowflake,藉此達到最佳效能。 這項命令支援 Azure 上的 Snowflake。
- 如果需要 Proxy 從自我裝載整合執行階段連線到 Snowflake,您必須在整合執行階段主機上設定 HTTP_PROXY 和 HTTPS_PROXY 環境變數。
必要條件
如果您的資料存放區位於內部部署網路、Azure 虛擬網路或 Amazon 虛擬私人雲端中,則必須設定自我裝載整合執行階段以與其連線。 請務必將自我裝載整合執行時間使用的 IP 位址新增至允許的清單。
如果您的資料存放區是受控雲端資料服務,則可使用 Azure Integration Runtime。 如果存取權僅限於防火牆規則中核准的 IP,您可以將Azure Integration Runtime IP新增至允許的清單。
用於來源或接收的 Snowflake 帳戶應該具有 USAGE
資料庫的必要存取權,以及架構上的讀取/寫入存取權,以及其下的資料表/檢視表。 此外,它也應該具有 CREATE STAGE
架構,才能使用 SAS URI 建立外部階段。
必須設定下列 Account 屬性值
屬性 | 描述 | 必要 | 預設 |
---|---|---|---|
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION | 指定在使用 CREATE STAGE) 存取私人雲端儲存體位置時,建立具名外部階段 (時,是否需要儲存體整合物件作為雲端認證。 | FALSE | FALSE |
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION | 指定是否需要使用具名的外部階段,在將資料從私人雲端儲存位置載入或卸載資料時,將儲存體整合物件參考為雲端認證。 | FALSE | FALSE |
如需 Data Factory 支援的網路安全性機制和選項的詳細資訊,請參閱資料存取策略。
開始使用
若要透過管線執行複製活動,您可以使用下列其中一個工具或 SDK:
使用 UI 建立 Snowflake 的連結服務
使用下列步驟,在 Azure 入口網站 UI 中建立 Snowflake 的連結服務。
瀏覽至 Azure Data Factory 或 Synapse 工作區中的 [管理] 索引標籤,並選取 [連結服務],然後按一下 [新增]:
搜尋 Snowflake 並選取 Snowflake 連接器。
設定服務詳細資料、測試連線,然後建立新的連結服務。
連接器設定詳細資料
下列各節提供屬性的相關詳細資料,這些屬性會定義 Snowflake 連接器的特定實體。
連結服務屬性
以下是針對 Snowflake 連結服務支援的屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | type 屬性必須設定為 Snowflake。 | 是 |
connectionString | 指定連線到 Snowflake 執行個體所需的資訊。 您可以選擇將密碼或整個連接字串放在 Azure Key Vault。 如需詳細資料,請參閱資料表下方範例,以及在 Azure Key Vault 中儲存認證一文。 部分典型設定: - 帳戶名稱:Snowflake 帳戶的完整帳戶名稱 (包括識別區域和雲端平台的其他區段),例如 xy12345.east-us-2.azure。 - 使用者名稱:連線使用者的登入識別碼。 - 密碼:使用者的密碼。 - 資料庫:連接後要使用的預設資料庫。 通常是指定角色擁有權限的現有資料庫。 - 倉儲:連線後要使用的虛擬倉儲。 通常是指定角色擁有權限的現有倉儲。 - 角色:Snowflake 工作階段使用的預設存取控制角色。 所指定的角色應是已指派給特定使用者的現有角色。 預設角色為 PUBLIC。 |
是 |
connectVia | 用來連線到資料存放區的整合執行階段。 如果您的資料存放區位於私人網路,則可使用 Azure 整合執行階段或自我裝載整合執行階段。 如果未指定,則會使用預設的 Azure 整合執行階段。 | 否 |
範例︰
{
"name": "SnowflakeLinkedService",
"properties": {
"type": "Snowflake",
"typeProperties": {
"connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&password=<password>&db=<database>&warehouse=<warehouse>&role=<myRole>"
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
Azure Key Vault 中的密碼:
{
"name": "SnowflakeLinkedService",
"properties": {
"type": "Snowflake",
"typeProperties": {
"connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&db=<database>&warehouse=<warehouse>&role=<myRole>",
"password": {
"type": "AzureKeyVaultSecret",
"store": {
"referenceName": "<Azure Key Vault linked service name>",
"type": "LinkedServiceReference"
},
"secretName": "<secretName>"
}
},
"connectVia": {
"referenceName": "<name of Integration Runtime>",
"type": "IntegrationRuntimeReference"
}
}
}
資料集屬性
如需可用來定義資料集的區段和屬性完整清單,請參閱資料集一文。
以下是針對 Snowflake 資料集支援的屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 資料集的 type 屬性必須設定為 SnowflakeTable。 | 是 |
結構描述 | 結構描述的名稱。 請注意,結構描述名稱會區分大小寫。 | 否 (來源);是 (接收器) |
資料表 | 資料表/檢視的名稱。 請注意,表格名稱會區分大小寫。 | 否 (來源);是 (接收器) |
範例︰
{
"name": "SnowflakeDataset",
"properties": {
"type": "SnowflakeTable",
"typeProperties": {
"schema": "<Schema name for your Snowflake database>",
"table": "<Table name for your Snowflake database>"
},
"schema": [ < physical schema, optional, retrievable during authoring > ],
"linkedServiceName": {
"referenceName": "<name of linked service>",
"type": "LinkedServiceReference"
}
}
}
複製活動屬性
如需可用來定義活動的區段和屬性完整清單,請參閱管線一文。 本節提供 Snowflake 來源和接收器支援的屬性清單。
以 Snowflake 做為來源
Snowflake 連接器會用 Snowflake 的 COPY into [location] (英文) 命令達到最佳效能。
如果 Snowflake COPY 命令原生支援接收器資料存放區和格式,則可使用複製活動,直接從 Snowflake 複製到接收器。 如需詳細資訊,請參閱從 Snowflake 直接複製。 若否,請使用內建的從 Snowflake 暫存複本。
若要從 Snowflake 複製資料,複製活動的 [source] 區段支援下列屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 複製活動來源的 type 屬性必須設定為 SnowflakeSource。 | 是 |
查詢 | 指定 SQL 查詢從 Snowflake 讀取資料。 如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable" 。不支援執行預存程序。 |
No |
exportSettings | 用來從 Snowflake 擷取資料的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 | No |
在 exportSettings 底下: |
||
類型 | 匯出命令的型別,設定為 SnowflakeExportCopyCommand。 | Yes |
additionalCopyOptions | 其他複製選項,以機碼值組字典的形式提供。 範例:MAX_FILE_SIZE、OVERWRITE。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文)。 | No |
additionalFormatOptions | 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文)。 | No |
從 Snowflake 直接複製
如果您的接收器資料存放區和格式符合本節所述準則,則可使用複製活動,直接從 Snowflake 複製到接收器。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:
接收器連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接將資料複製到 Azure Data Lake Storage Gen2,請針對 ADLS Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 連結服務,以避免使用從 Snowflake 暫存複本。
接收器資料格式包括 Parquet、分隔符號文字 或 JSON,並具有下列設定:
- 針對 Parquet 格式,壓縮轉碼器為 None、Snappy 或 Lzo。
- 針對分隔符號文字格式:
rowDelimiter
為 \r\n 或任何單一字元。compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。quoteChar
為雙引號、單引號或空字串 (沒有引號字元)。
- 針對 JSON 格式,直接複製僅支援來源 Snowflake 資料表或查詢結果只有單一資料行的案例,且此資料行的資料類型為 VARIANT、OBJECT 或 ARRAY。
compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。filePattern
在複製活動接收器中保留為預設值,或設定為 setOfObjects。
在複製活動來源中,未指定
additionalColumns
。未指定資料行對應。
範例︰
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeSource",
"sqlReaderQuery": "SELECT * FROM MYTABLE",
"exportSettings": {
"type": "SnowflakeExportCopyCommand",
"additionalCopyOptions": {
"MAX_FILE_SIZE": "64000000",
"OVERWRITE": true
},
"additionalFormatOptions": {
"DATE_FORMAT": "'MM/DD/YYYY'"
}
}
},
"sink": {
"type": "<sink type>"
}
}
}
]
從 Snowflake 暫存複本
您的接收器資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡版 Azure Blob 儲存體執行個體啟用內建暫存複本。 分段複製功能也能提供更好的輸送量。 服務會將資料從 Snowflake 匯出至暫存儲存體,然後將資料複製到接收器,最後從暫存儲存體中清除暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製。
如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStaging
和 stagingSettings
屬性。
注意
暫存 Azure Blob 儲存體連結服務必須使用「Snowflake COPY」命令所需的共用存取簽章驗證。 請務必在預備 Azure Blob 儲存體中授與 Snowflake 的適當存取權限。 若要深入瞭解,請參閱 這篇文章。
範例︰
"activities":[
{
"name": "CopyFromSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<Snowflake input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "SnowflakeSource",
"sqlReaderQuery": "SELECT * FROM MyTable"
},
"sink": {
"type": "<sink type>"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
以 Snowflake 做為接收器
Snowflake 連接器會用 Snowflake 的 COPY into [table] (英文) 命令達到最佳效能。 此功能支援將資料寫入 Azure 上的 Snowflake。
如果 Snowflake COPY 命令原生支援來源資料存放區和格式,則可使用複製活動,直接從來源複製到 Snowflake。 如需詳細資訊,請參閱直接複製到 Snowflake。 若否,請使用內建的暫存複本到 Snowflake。
若要將資料複製到 Snowflake,複製活動的 [sink] 區段支援下列屬性。
屬性 | 描述 | 必要 |
---|---|---|
type | 複製活動接收器的 type 屬性必須設定為 SnowflakeSink。 | Yes |
preCopyScript | 指定一個供複製活動在每次執行時將資料寫入到 Snowflake 前執行的 SQL 查詢。 使用此屬性來清除預先載入的資料。 | 否 |
importSettings | 用來將資料寫入 Snowflake 的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 | No |
在 importSettings 底下: |
||
類型 | 匯入命令的型別,設定為 SnowflakeImportCopyCommand。 | Yes |
additionalCopyOptions | 其他複製選項,以機碼值組字典的形式提供。 範例:ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文)。 | No |
additionalFormatOptions | 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文)。 | No |
直接複製到 Snowflake
如果您的來源資料存放區和格式符合本節所述的準則,則可使用複製活動,直接從來源複製到 Snowflake。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:
來源連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接從 Azure Data Lake Storage Gen2 複製資料,請針對 ADLS Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 連結服務,以避免使用暫存複本到 Snowflake。
來源資料格式包括 Parquet、分隔符號文字 或 JSON,並具有下列設定:
針對 Parquet 格式,壓縮轉碼器為 None 或 Snappy。
針對分隔符號文字格式:
rowDelimiter
為 \r\n 或任何單一字元。 如果資料列分隔符號不是「\r\n」,則firstRowAsHeader
必須為 false,而且未指定skipLineCount
。compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
保留為預設值,或設定為「UTF-8」、「UTF-16」、「UTF-16BE」、「UTF-32」、「UTF-32BE」、「BIG5」、「EUC-JP」、「EUC-KR」、「GB18030」、「ISO-2022-JP」、「ISO-2022-KR」、「ISO-8859-1」、「ISO-8859-2」、「ISO-8859-2」、「ISO-8859-5」、「ISO-8859-6」、「ISO-8859-7」、「ISO-8859-8」、「ISO-8859-9」、「WINDOWS-1250」、「WINDOWS-1251」、「WINDOWS-1252」、「WINDOWS-1253」、「WINDOWS-1254」、「WINDOWS-1255」。quoteChar
為雙引號、單引號或空字串 (沒有引號字元)。
針對 JSON 格式,直接複製僅支援接收器 Snowflake 資料表只有單一資料行的案例,且此資料行的資料類型為 VARIANT、OBJECT 或 ARRAY。
compression
可以是無壓縮、gzip、bzip2 或 deflate。encodingName
會保留為預設值,或設定為 utf-8。- 未指定資料行對應。
在複製活動來源中:
additionalColumns
未指定。- 如果來源是資料夾,
recursive
會設定為 true。 - 未指定
prefix
、modifiedDateTimeStart
、modifiedDateTimeEnd
和enablePartitionDiscovery
。
範例︰
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeSink",
"importSettings": {
"type": "SnowflakeImportCopyCommand",
"copyOptions": {
"FORCE": "TRUE",
"ON_ERROR": "SKIP_FILE",
},
"fileFormatOptions": {
"DATE_FORMAT": "YYYY-MM-DD",
}
}
}
}
}
]
暫存複本到 Snowflake
您的來源資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡版 Azure Blob 儲存體執行個體啟用內建暫存複本。 分段複製功能也能提供更好的輸送量。 該服務會自動轉換資料,以符合 Snowflake 的資料格式需求。 之後系統會叫用 COPY 命令,將資料載入 Snowflake。 最後,它會清除 Blob 儲存體中的暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製。
如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStaging
和 stagingSettings
屬性。
注意
暫存 Azure Blob 儲存體連結服務需要使用「Snowflake COPY」命令所需的共用存取簽章驗證。
範例︰
"activities":[
{
"name": "CopyToSnowflake",
"type": "Copy",
"inputs": [
{
"referenceName": "<input dataset name>",
"type": "DatasetReference"
}
],
"outputs": [
{
"referenceName": "<Snowflake output dataset name>",
"type": "DatasetReference"
}
],
"typeProperties": {
"source": {
"type": "<source type>"
},
"sink": {
"type": "SnowflakeSink"
},
"enableStaging": true,
"stagingSettings": {
"linkedServiceName": {
"referenceName": "MyStagingBlob",
"type": "LinkedServiceReference"
},
"path": "mystagingpath"
}
}
}
]
對應資料流程屬性
在對應資料流中轉換資料時,您可以從 Snowflake 資料表中讀取並寫入資料。 如需詳細資訊,請參閱對應資料流程中的來源轉換和接收轉換。 您可以選擇使用 Snowflake 資料集或內嵌資料集做為來源和接收器型別。
來源轉換
下表列出 Snowflake 來源支援的屬性。 您可以在 [來源選項] 索引標籤中編輯這些屬性。連接器會使用 Snowflake 內部資料傳輸 (英文)。
名稱 | 描述 | 必要 | 允許的值 | 資料流程指令碼屬性 |
---|---|---|---|---|
資料表 | 如果您選取 [資料表] 做為輸入,資料流程會在使用內嵌資料集時,從 Snowflake 資料集或來源選項中指定的資料表擷取全部資料。 | 否 | String | (僅限內嵌資料集) tableName schemaName |
查詢 | 如果您選取 [查詢] 做為輸入,請輸入查詢以從 Snowflake 擷取資料。 此設定會覆寫您在資料集中選擇的任何資料表。 如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable" 。 |
否 | String | 查詢 |
Snowflake 來源指令碼範例
使用 Snowflake 資料集做為來源型別時,相關聯的資料流程指令碼為:
source(allowSchemaDrift: true,
validateSchema: false,
query: 'select * from MYTABLE',
format: 'query') ~> SnowflakeSource
如果您使用內嵌資料集,相關聯的資料流程指令碼為:
source(allowSchemaDrift: true,
validateSchema: false,
format: 'query',
query: 'select * from MYTABLE',
store: 'snowflake') ~> SnowflakeSource
接收轉換
下表列出 Snowflake 接收器支援的屬性。 您可以在 [設定] 索引標籤中編輯這些屬性。使用內嵌資料集時,您會看到其他設定,與 [資料集屬性] 一節中所述的屬性相同。 連接器會使用 Snowflake 的內部資料傳輸 (英文)。
名稱 | 描述 | 必要 | 允許的值 | 資料流程指令碼屬性 |
---|---|---|---|---|
Update 方法 | 指定您的 Snowflake 目的地允許的作業。 若要更新、更新插入或刪除資料列,必須使用變更資料列轉換來標記這些動作的資料列。 |
Yes | true 或 false |
deletable insertable updateable upsertable |
索引鍵資料行 | 對於更新、更新插入和刪除,必須設定索引鍵資料行,以決定要改變哪一個資料列。 | No | Array | 金鑰 |
資料表動作 | 決定在寫入之前,是否要重新建立或移除目的地資料表中的所有資料列。 - 無:資料表不會執行任何動作。 - 重新建立:資料表會遭到捨棄並重新建立。 如果要動態建立新的資料表,則為必要。 - 截斷:系統將會移除目標資料表中的所有資料列。 |
No | true 或 false |
recreate truncate |
Snowflake 接收器指令碼範例
使用 Snowflake 資料集做為接收器型別時,相關聯的資料流程指令碼為:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
deletable:true,
insertable:true,
updateable:true,
upsertable:false,
keys:['movieId'],
format: 'table',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
如果您使用內嵌資料集,相關聯的資料流程指令碼為:
IncomingStream sink(allowSchemaDrift: true,
validateSchema: false,
format: 'table',
tableName: 'table',
schemaName: 'schema',
deletable: true,
insertable: true,
updateable: true,
upsertable: false,
store: 'snowflake',
skipDuplicateMapInputs: true,
skipDuplicateMapOutputs: true) ~> SnowflakeSink
查閱活動屬性
如需屬性的詳細資訊,請參閱查閱活動。
下一步
如需複製活動支援作為來源和接收器的資料存放區清單,請參閱支援的資料存放區和格式。