使用 Azure Data Factory 或 Azure Synapse Analytics 在 Snowflake 中複製和轉換資料

適用於:Azure Data Factory Azure Synapse Analytics

本文概述如何使用 Azure Data Factory 和 Azure Synapse Analytics 管線中的複製活動,在 Snowflake 複製/貼上資料,並使用資料流程來轉換 Snowflake 中的資料。 若要深入了解,請閱讀 Data FactoryAzure Synapse Analytics 簡介文章。

支援的功能

此 Snowflake 連接器支援下列活動:

針對複製活動,此 Snowflake 連接器支援下列功能:

  • 用 Snowflake 的 COPY into [location] (英文) 命令從 Snowflake 複製資料,藉此達到最佳效能。
  • 用 Snowflake 的 COPY into [table] (英文) 命令將資料複製到 Snowflake,藉此達到最佳效能。 這項命令支援 Azure 上的 Snowflake。
  • 如果需要 Proxy 從自我裝載整合執行階段連線到 Snowflake,您必須在整合執行階段主機上設定 HTTP_PROXY 和 HTTPS_PROXY 環境變數。

必要條件

如果您的資料存放區位於內部部署網路、Azure 虛擬網路或 Amazon 虛擬私人雲端中,則必須設定自我裝載整合執行階段以與其連線。 請務必將自我裝載整合執行時間使用的 IP 位址新增至允許的清單。

如果您的資料存放區是受控雲端資料服務,則可使用 Azure Integration Runtime。 如果存取權僅限於防火牆規則中核准的 IP,您可以將Azure Integration Runtime IP新增至允許的清單。

用於來源或接收的 Snowflake 帳戶應該具有 USAGE 資料庫的必要存取權,以及架構上的讀取/寫入存取權,以及其下的資料表/檢視表。 此外,它也應該具有 CREATE STAGE 架構,才能使用 SAS URI 建立外部階段。

必須設定下列 Account 屬性值

屬性 描述 必要 預設
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION 指定在使用 CREATE STAGE) 存取私人雲端儲存體位置時,建立具名外部階段 (時,是否需要儲存體整合物件作為雲端認證。 FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION 指定是否需要使用具名的外部階段,在將資料從私人雲端儲存位置載入或卸載資料時,將儲存體整合物件參考為雲端認證。 FALSE FALSE

如需 Data Factory 支援的網路安全性機制和選項的詳細資訊,請參閱資料存取策略

開始使用

若要透過管線執行複製活動,您可以使用下列其中一個工具或 SDK:

使用 UI 建立 Snowflake 的連結服務

使用下列步驟,在 Azure 入口網站 UI 中建立 Snowflake 的連結服務。

  1. 瀏覽至 Azure Data Factory 或 Synapse 工作區中的 [管理] 索引標籤,並選取 [連結服務],然後按一下 [新增]:

  2. 搜尋 Snowflake 並選取 Snowflake 連接器。

    Snowflake 連接器的螢幕擷取畫面。

  3. 設定服務詳細資料、測試連線,然後建立新的連結服務。

    Snowflake 連結服務組態的螢幕擷取畫面。

連接器設定詳細資料

下列各節提供屬性的相關詳細資料,這些屬性會定義 Snowflake 連接器的特定實體。

連結服務屬性

以下是針對 Snowflake 連結服務支援的屬性。

屬性 描述 必要
type type 屬性必須設定為 Snowflake
connectionString 指定連線到 Snowflake 執行個體所需的資訊。 您可以選擇將密碼或整個連接字串放在 Azure Key Vault。 如需詳細資料,請參閱資料表下方範例,以及在 Azure Key Vault 中儲存認證一文。

部分典型設定:
- 帳戶名稱:Snowflake 帳戶的完整帳戶名稱 (包括識別區域和雲端平台的其他區段),例如 xy12345.east-us-2.azure。
- 使用者名稱:連線使用者的登入識別碼。
- 密碼:使用者的密碼。
- 資料庫:連接後要使用的預設資料庫。 通常是指定角色擁有權限的現有資料庫。
- 倉儲:連線後要使用的虛擬倉儲。 通常是指定角色擁有權限的現有倉儲。
- 角色:Snowflake 工作階段使用的預設存取控制角色。 所指定的角色應是已指派給特定使用者的現有角色。 預設角色為 PUBLIC。
connectVia 用來連線到資料存放區的整合執行階段。 如果您的資料存放區位於私人網路,則可使用 Azure 整合執行階段或自我裝載整合執行階段。 如果未指定,則會使用預設的 Azure 整合執行階段。

範例︰

{
    "name": "SnowflakeLinkedService",
    "properties": {
        "type": "Snowflake",
        "typeProperties": {
            "connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&password=<password>&db=<database>&warehouse=<warehouse>&role=<myRole>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Azure Key Vault 中的密碼:

{
    "name": "SnowflakeLinkedService",
    "properties": {
        "type": "Snowflake",
        "typeProperties": {
            "connectionString": "jdbc:snowflake://<accountname>.snowflakecomputing.com/?user=<username>&db=<database>&warehouse=<warehouse>&role=<myRole>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

資料集屬性

如需可用來定義資料集的區段和屬性完整清單,請參閱資料集一文。

以下是針對 Snowflake 資料集支援的屬性。

屬性 描述 必要
type 資料集的 type 屬性必須設定為 SnowflakeTable
結構描述 結構描述的名稱。 請注意,結構描述名稱會區分大小寫。 否 (來源);是 (接收器)
資料表 資料表/檢視的名稱。 請注意,表格名稱會區分大小寫。 否 (來源);是 (接收器)

範例︰

{
    "name": "SnowflakeDataset",
    "properties": {
        "type": "SnowflakeTable",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

複製活動屬性

如需可用來定義活動的區段和屬性完整清單,請參閱管線一文。 本節提供 Snowflake 來源和接收器支援的屬性清單。

以 Snowflake 做為來源

Snowflake 連接器會用 Snowflake 的 COPY into [location] (英文) 命令達到最佳效能。

如果 Snowflake COPY 命令原生支援接收器資料存放區和格式,則可使用複製活動,直接從 Snowflake 複製到接收器。 如需詳細資訊,請參閱從 Snowflake 直接複製。 若否,請使用內建的從 Snowflake 暫存複本

若要從 Snowflake 複製資料,複製活動的 [source] 區段支援下列屬性。

屬性 描述 必要
type 複製活動來源的 type 屬性必須設定為 SnowflakeSource
查詢 指定 SQL 查詢從 Snowflake 讀取資料。 如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable"
不支援執行預存程序。
No
exportSettings 用來從 Snowflake 擷取資料的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 No
exportSettings 底下:
類型 匯出命令的型別,設定為 SnowflakeExportCopyCommand Yes
additionalCopyOptions 其他複製選項,以機碼值組字典的形式提供。 範例:MAX_FILE_SIZE、OVERWRITE。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文) No
additionalFormatOptions 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文) No

從 Snowflake 直接複製

如果您的接收器資料存放區和格式符合本節所述準則,則可使用複製活動,直接從 Snowflake 複製到接收器。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:

  • 接收器連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接將資料複製到 Azure Data Lake Storage Gen2,請針對 ADLS Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 連結服務,以避免使用從 Snowflake 暫存複本

  • 接收器資料格式包括 Parquet分隔符號文字JSON,並具有下列設定:

    • 針對 Parquet 格式,壓縮轉碼器為 NoneSnappyLzo
    • 針對分隔符號文字格式:
      • rowDelimiter\r\n 或任何單一字元。
      • compression 可以是無壓縮gzipbzip2deflate
      • encodingName 會保留為預設值,或設定為 utf-8
      • quoteChar雙引號單引號空字串 (沒有引號字元)。
    • 針對 JSON 格式,直接複製僅支援來源 Snowflake 資料表或查詢結果只有單一資料行的案例,且此資料行的資料類型為 VARIANTOBJECTARRAY
      • compression 可以是無壓縮gzipbzip2deflate
      • encodingName 會保留為預設值,或設定為 utf-8
      • filePattern 在複製活動接收器中保留為預設值,或設定為 setOfObjects
  • 在複製活動來源中,未指定 additionalColumns

  • 未指定資料行對應。

範例︰

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeSource",
                "sqlReaderQuery": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    }
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

從 Snowflake 暫存複本

您的接收器資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡版 Azure Blob 儲存體執行個體啟用內建暫存複本。 分段複製功能也能提供更好的輸送量。 服務會將資料從 Snowflake 匯出至暫存儲存體,然後將資料複製到接收器,最後從暫存儲存體中清除暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製

如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStagingstagingSettings 屬性。

注意

暫存 Azure Blob 儲存體連結服務必須使用「Snowflake COPY」命令所需的共用存取簽章驗證。 請務必在預備 Azure Blob 儲存體中授與 Snowflake 的適當存取權限。 若要深入瞭解,請參閱 這篇文章

範例︰

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeSource",
                "sqlReaderQuery": "SELECT * FROM MyTable"
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

以 Snowflake 做為接收器

Snowflake 連接器會用 Snowflake 的 COPY into [table] (英文) 命令達到最佳效能。 此功能支援將資料寫入 Azure 上的 Snowflake。

如果 Snowflake COPY 命令原生支援來源資料存放區和格式,則可使用複製活動,直接從來源複製到 Snowflake。 如需詳細資訊,請參閱直接複製到 Snowflake。 若否,請使用內建的暫存複本到 Snowflake

若要將資料複製到 Snowflake,複製活動的 [sink] 區段支援下列屬性。

屬性 描述 必要
type 複製活動接收器的 type 屬性必須設定為 SnowflakeSink Yes
preCopyScript 指定一個供複製活動在每次執行時將資料寫入到 Snowflake 前執行的 SQL 查詢。 使用此屬性來清除預先載入的資料。
importSettings 用來將資料寫入 Snowflake 的進階設定。 您可以在叫用陳述式時,將 COPY 支援的複本設定為服務會通過的命令。 No
importSettings 底下:
類型 匯入命令的型別,設定為 SnowflakeImportCopyCommand Yes
additionalCopyOptions 其他複製選項,以機碼值組字典的形式提供。 範例:ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 如需詳細資訊,請參閱 Snowflake 複製選項 (英文) No
additionalFormatOptions 提供給 COPY 命令充當機碼值組字典的其他檔案格式選項。 範例:DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 如需詳細資訊,請參閱 Snowflake 格式型別選項 (英文) No

直接複製到 Snowflake

如果您的來源資料存放區和格式符合本節所述的準則,則可使用複製活動,直接從來源複製到 Snowflake。 該服務會檢查設定,並在不符合下列準則時讓複製活動執行失敗:

  • 來源連結服務是具有共用存取簽章驗證的 Azure Blob 儲存體。 如果您想要以下列支援的格式直接從 Azure Data Lake Storage Gen2 複製資料,請針對 ADLS Gen2 帳戶建立具有 SAS 驗證的 Azure Blob 連結服務,以避免使用暫存複本到 Snowflake

  • 來源資料格式包括 Parquet分隔符號文字JSON,並具有下列設定:

    • 針對 Parquet 格式,壓縮轉碼器為 NoneSnappy

    • 針對分隔符號文字格式:

      • rowDelimiter\r\n 或任何單一字元。 如果資料列分隔符號不是「\r\n」,則 firstRowAsHeader 必須為 false,而且未指定 skipLineCount
      • compression 可以是無壓縮gzipbzip2deflate
      • encodingName 保留為預設值,或設定為「UTF-8」、「UTF-16」、「UTF-16BE」、「UTF-32」、「UTF-32BE」、「BIG5」、「EUC-JP」、「EUC-KR」、「GB18030」、「ISO-2022-JP」、「ISO-2022-KR」、「ISO-8859-1」、「ISO-8859-2」、「ISO-8859-2」、「ISO-8859-5」、「ISO-8859-6」、「ISO-8859-7」、「ISO-8859-8」、「ISO-8859-9」、「WINDOWS-1250」、「WINDOWS-1251」、「WINDOWS-1252」、「WINDOWS-1253」、「WINDOWS-1254」、「WINDOWS-1255」。
      • quoteChar雙引號單引號空字串 (沒有引號字元)。
    • 針對 JSON 格式,直接複製僅支援接收器 Snowflake 資料表只有單一資料行的案例,且此資料行的資料類型為 VARIANTOBJECTARRAY

      • compression 可以是無壓縮gzipbzip2deflate
      • encodingName 會保留為預設值,或設定為 utf-8
      • 未指定資料行對應。
  • 在複製活動來源中:

    • additionalColumns 未指定。
    • 如果來源是資料夾,recursive 會設定為 true。
    • 未指定 prefixmodifiedDateTimeStartmodifiedDateTimeEndenablePartitionDiscovery

範例︰

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeSink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE",
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD",
                    }
                }
            }
        }
    }
]

暫存複本到 Snowflake

您的來源資料存放區或格式與 Snowflake COPY 命令原生不相容時,如上一節所述,請使用過渡版 Azure Blob 儲存體執行個體啟用內建暫存複本。 分段複製功能也能提供更好的輸送量。 該服務會自動轉換資料,以符合 Snowflake 的資料格式需求。 之後系統會叫用 COPY 命令,將資料載入 Snowflake。 最後,它會清除 Blob 儲存體中的暫存資料。 如需使用暫存複製資料的詳細資料,請參閱暫存複製

如要使用此功能,請建立 Azure Blob 儲存體連結服務,參考具有過渡暫存的 Azure 儲存體帳戶。 然後,在複製活動中指定 enableStagingstagingSettings 屬性。

注意

暫存 Azure Blob 儲存體連結服務需要使用「Snowflake COPY」命令所需的共用存取簽章驗證。

範例︰

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeSink"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

對應資料流程屬性

在對應資料流中轉換資料時,您可以從 Snowflake 資料表中讀取並寫入資料。 如需詳細資訊,請參閱對應資料流程中的來源轉換接收轉換。 您可以選擇使用 Snowflake 資料集或內嵌資料集做為來源和接收器型別。

來源轉換

下表列出 Snowflake 來源支援的屬性。 您可以在 [來源選項] 索引標籤中編輯這些屬性。連接器會使用 Snowflake 內部資料傳輸 (英文)

名稱 描述 必要 允許的值 資料流程指令碼屬性
資料表 如果您選取 [資料表] 做為輸入,資料流程會在使用內嵌資料集時,從 Snowflake 資料集或來源選項中指定的資料表擷取全部資料。 String (僅限內嵌資料集)
tableName
schemaName
查詢 如果您選取 [查詢] 做為輸入,請輸入查詢以從 Snowflake 擷取資料。 此設定會覆寫您在資料集中選擇的任何資料表。
如果結構描述、資料表和資料行的名稱包含小寫,請在查詢中為物件識別碼加上引號,例如 select * from "schema"."myTable"
String 查詢

Snowflake 來源指令碼範例

使用 Snowflake 資料集做為來源型別時,相關聯的資料流程指令碼為:

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

如果您使用內嵌資料集,相關聯的資料流程指令碼為:

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

接收轉換

下表列出 Snowflake 接收器支援的屬性。 您可以在 [設定] 索引標籤中編輯這些屬性。使用內嵌資料集時,您會看到其他設定,與 [資料集屬性] 一節中所述的屬性相同。 連接器會使用 Snowflake 的內部資料傳輸 (英文)

名稱 描述 必要 允許的值 資料流程指令碼屬性
Update 方法 指定您的 Snowflake 目的地允許的作業。
若要更新、更新插入或刪除資料列,必須使用變更資料列轉換來標記這些動作的資料列。
Yes truefalse deletable
insertable
updateable
upsertable
索引鍵資料行 對於更新、更新插入和刪除,必須設定索引鍵資料行,以決定要改變哪一個資料列。 No Array 金鑰
資料表動作 決定在寫入之前,是否要重新建立或移除目的地資料表中的所有資料列。
- :資料表不會執行任何動作。
- 重新建立:資料表會遭到捨棄並重新建立。 如果要動態建立新的資料表,則為必要。
- 截斷:系統將會移除目標資料表中的所有資料列。
No truefalse recreate
truncate

Snowflake 接收器指令碼範例

使用 Snowflake 資料集做為接收器型別時,相關聯的資料流程指令碼為:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

如果您使用內嵌資料集,相關聯的資料流程指令碼為:

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

查閱活動屬性

如需屬性的詳細資訊,請參閱查閱活動

下一步

如需複製活動支援作為來源和接收器的資料存放區清單,請參閱支援的資料存放區和格式