Azure Data Factory または Azure Synapse Analytics を使用して、Snowflake のデータをコピーして変換します。

適用対象: Azure Data Factory Azure Synapse Analytics

ヒント

企業向けのオールインワン分析ソリューション、Microsoft Fabric の Data Factory をお試しください。 Microsoft Fabric は、データ移動からデータ サイエンス、リアルタイム分析、ビジネス インテリジェンス、レポートまで、あらゆるものをカバーしています。 無料で新しい試用版を開始する方法について説明します。

この記事では、Azure Data Factory および Azure Synapse パイプラインのコピー アクティビティを使用して、Snowflake との間でデータをコピーし合い、Data Flow を使用して Snowflake のデータを変換する方法について説明します。 詳細については、Data Factory または Azure Synapse Analytics の概要記事を参照してください。

重要

新しい Snowflake コネクタでは、ネイティブ Snowflake のサポートが強化されています。 下位互換性のためにのみ現状のままサポートされているレガシ Snowflake コネクタをソリューションの中で使用している場合は、Snowflake コネクタ (レガシ) に関する記事を参照してください。

サポートされる機能

この Snowflake コネクタは、次の機能でサポートされます。

サポートされる機能 IR
Copy アクティビティ (ソース/シンク) ① ②
マッピング データ フロー (ソース/シンク)
Lookup アクティビティ ① ②
スクリプト活動 ① ②

① Azure 統合ランタイム ② セルフホステッド統合ランタイム

コピー アクティビティの場合、この Snowflake コネクタは次の機能をサポートします。

  • Snowflake からのデータのコピー。Snowflake の COPY into [location] コマンドを利用して、最適なパフォーマンスを実現します。
  • Snowflake へのデータのコピー。Snowflake の COPY into [table] コマンドを利用して、最適なパフォーマンスを実現します。 Azure 上の Snowflake がサポートされています。
  • セルフホステッド Integration Runtime から Snowflake に接続するためにプロキシが必要な場合は、Integration Runtime ホストで HTTP_PROXY と HTTPS_PROXY の環境変数を設定する必要があります。

前提条件

データ ストアがオンプレ ミスネットワーク、Azure 仮想ネットワーク、または Amazon Virtual Private Cloud 内にある場合は、それに接続するようセルフホステッド統合ランタイムを構成する必要があります。 セルフホステッド統合ランタイムが使用する IP アドレスを許可リストに追加する必要があります。

データ ストアがマネージド クラウド データ サービスである場合は、Azure Integration Runtime を使用できます。 ファイアウォール規則で承認されている IP にアクセスが制限されている場合は、Azure Integration Runtime の IP を許可リストに追加できます。

ソースまたはシンクに使用される Snowflake アカウントには、データベースに対する必要な USAGE アクセス権と、スキーマおよびその下のテーブル/ビューに対する読み取り/書き込みアクセス権が必要です。 さらに、SAS URI を使用して外部ステージを作成できるように、スキーマ上に CREATE STAGE も存在する必要があります。

次のアカウント プロパティの値を設定する必要があります

プロパティ 内容 必要 Default
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_CREATION プライベート クラウド ストレージの場所にアクセスするために (CREATE STAGE を使用して) 名前付き外部ステージを作成するときに、ストレージ統合オブジェクトをクラウド資格情報として要求するかどうかを指定します。 FALSE FALSE
REQUIRE_STORAGE_INTEGRATION_FOR_STAGE_OPERATION プライベート クラウド ストレージの場所へのデータの読み込みまたはアンロードを行うときに、ストレージ統合オブジェクトをクラウド資格情報として参照する名前付き外部ステージの使用を要求するかどうかを指定します。 FALSE FALSE

Data Factory によってサポートされるネットワーク セキュリティ メカニズムやオプションの詳細については、「データ アクセス戦略」を参照してください。

はじめに

パイプラインでコピー アクティビティを実行するには、次のいずれかのツールまたは SDK を使用します。

UI を使用して Snowflake のリンク サービスを作成する

次の手順を使用して、Azure portal UI で Snowflake のリンク サービスを作成します。

  1. Azure Data Factory または Synapse ワークスペースの [管理] タブに移動し、[リンク サービス] を選択して、[新規] をクリックします。

  2. Snowflake を検索し、Snowflake コネクタを選択します。

    Screenshot of the Snowflake connector.

  3. サービスの詳細を構成し、接続をテストして、新しいリンク サービスを作成します。

    Screenshot of linked service configuration for Snowflake.

コネクタの構成の詳細

次のセクションでは、Snowflake コネクタに固有のエンティティを定義するプロパティについて詳しく説明します。

リンクされたサービス プロパティ

Snowflake のリンク サービスでは、次の汎用プロパティがサポートされます。

プロパティ 内容 必須
type type プロパティは SnowflakeV2 に設定する必要があります。 はい
accountIdentifier その組織とアカウントの名前。 たとえば、myorg-account123。 はい
database 接続後にセッションで使用される既定のデータベース。 はい
倉庫 接続後にセッションで使用される既定の仮想ウェアハウス。 はい
authenticationType Snowflake サービスへの接続に使用される認証の種類。 使用することができる値は、Basic (既定値) と KeyPair です。 それぞれのプロパティとサンプルについては、以下の対応するセクションを参照してください。 いいえ
role 接続後にセッションで使用される既定のセキュリティ ロール。 いいえ
connectVia データ ストアに接続するために使用される統合ランタイム。 Azure 統合ランタイムまたはセルフホステッド統合ランタイムを使用できます (データ ストアがプライベート ネットワークにある場合)。 指定されていない場合は、既定の Azure 統合ランタイムが使用されます。 いいえ

この Snowflake コネクタでは、次の認証の種類がサポートされています。 詳細については、対応するセクションをご覧ください。

基本認証

前のセクションで説明した汎用的なプロパティに加えて、基本認証を使用するには、次のプロパティを指定します。

プロパティ 内容 必須
ユーザー Snowflake ユーザーのログイン名。 はい
password Snowflake ユーザーのパスワード。 安全に保存するには、このフィールドを SecureString 型としてマークします。 Azure Key Vault に格納されているシークレットを参照することもできます。 はい

例:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "SecureString",
                "value": "<password>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

Azure Key Vault 内のパスワード:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "Basic",
            "user": "<username>",
            "password": {
                "type": "AzureKeyVaultSecret",
                "store": { 
                    "referenceName": "<Azure Key Vault linked service name>",
                    "type": "LinkedServiceReference"
                }, 
                "secretName": "<secretName>"
            }
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

キー ペア認証

キー ペア認証を使用するには、「キー ペア認証とキー ペア ローテーション」を参照して、Snowflake 内にキー ペア認証ユーザーを構成および作成する必要があります。 その後、秘密キーとパスフレーズ (省略可能) を書き留めます (リンク サービスを定義するために使用します)。

前のセクションで説明した汎用的なプロパティに加えて、次のプロパティを指定します。

プロパティ 内容 必須
ユーザー Snowflake ユーザーのログイン名。 はい
privateKey キー ペア認証に使用される秘密キー。

Azure Data Factory に送信される際に秘密キーが有効であることを確認し、privateKey ファイルに改行文字 (\n) が含まれることを考慮するには、privateKey の内容を文字列リテラル形式で正しく書式設定することが不可欠です。 このプロセスでは、各改行に「\n」を明示的に追加します。
はい
privateKeyPassphrase 秘密キーの暗号化解除に使用されるパスフレーズ (暗号化されている場合)。 いいえ

例:

{
    "name": "SnowflakeV2LinkedService",
    "properties": {
        "type": "SnowflakeV2",
        "typeProperties": {
            "accountIdentifier": "<accountIdentifier>",
            "database": "<database>",
            "warehouse": "<warehouse>",
            "authenticationType": "KeyPair",
            "user": "<username>",
            "privateKey": {
                "type": "SecureString",
                "value": "<privateKey>"
            },
            "privateKeyPassphrase": { 
                "type": "SecureString",
                "value": "<privateKeyPassphrase>"
            },
            "role": "<role>"
        },
        "connectVia": {
            "referenceName": "<name of Integration Runtime>",
            "type": "IntegrationRuntimeReference"
        }
    }
}

データセットのプロパティ

データセットを定義するために使用できるセクションとプロパティの完全な一覧については、データセットに関する記事をご覧ください。

Snowflake データセットでは、次のプロパティがサポートされます。

プロパティ 内容 必須
type データセットの type プロパティは SnowflakeV2Table に設定する必要があります。 はい
schema スキーマの名前。 スキーマ名は、大文字と小文字が区別されることに注意してください。 ソースの場合はいいえ、シンクの場合ははい
table テーブル/ビューの名前。 テーブル名は、大文字と小文字が区別されることに注意してください。 ソースの場合はいいえ、シンクの場合ははい

例:

{
    "name": "SnowflakeV2Dataset",
    "properties": {
        "type": "SnowflakeV2Table",
        "typeProperties": {
            "schema": "<Schema name for your Snowflake database>",
            "table": "<Table name for your Snowflake database>"
        },
        "schema": [ < physical schema, optional, retrievable during authoring > ],
        "linkedServiceName": {
            "referenceName": "<name of linked service>",
            "type": "LinkedServiceReference"
        }
    }
}

コピー アクティビティのプロパティ

アクティビティの定義に利用できるセクションとプロパティの完全な一覧については、パイプラインに関する記事を参照してください。 このセクションでは、Snowflake のソースとシンクでサポートされるプロパティの一覧を示します。

ソースとしての Snowflake

Snowflake コネクタは、Snowflake の COPY into [location] コマンドを利用して、最適なパフォーマンスを実現します。

シンクのデータ ストアと形式が Snowflake COPY コマンドでネイティブにサポートされている場合は、コピー アクティビティを使用して Snowflake からシンクに直接コピーできます。 詳しくは、「Snowflake から直接コピーする」をご覧ください。 それ以外の場合は、組み込みの Snowflake からのステージング コピーを使用します。

Snowflake からデータをコピーするために、コピー アクティビティの source セクションでは次のプロパティがサポートされています。

プロパティ 内容 必須
type Copy アクティビティのソースの type プロパティは SnowflakeV2Source に設定する必要があります。 はい
query Snowflake からデータを読み取る SQL クエリを指定します。 スキーマ、テーブル、および列の名前に小文字が含まれている場合は、クエリでオブジェクト識別子を引用符で囲みます (例: select * from "schema"."myTable")。
ストアド プロシージャの実行はサポートされていません。
いいえ
exportSettings Snowflake からデータを取得するために使用される詳細設定。 COPY into コマンドでサポートされるものを構成できます。これは、ステートメントを呼び出すときにこのサービスによって渡されます。 はい
exportSettings の下:
type エクスポート コマンドの type を SnowflakeExportCopyCommand に設定します。 はい
additionalCopyOptions 追加のコピー オプション。キーと値のペアのディクショナリとして指定されます。 例 :MAX_FILE_SIZE、OVERWRITE。 詳細については、「Snowflake コピー オプション」を参照してください。 いいえ
additionalFormatOptions キーと値のペアのディクショナリとして COPY コマンドに指定される、追加のファイル形式オプション。 例 :DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 詳細については、「Snowflake 形式の種類のオプション」を参照してください。 いいえ

注意

次のコマンドを実行し、INFORMATION_SCHEMA スキーマと COLUMNS テーブルにアクセスするアクセス許可があることを確認します。

  • COPY INTO <location>

Snowflake から直接コピーする

シンクのデータ ストアと形式がこのセクションで説明する基準を満たす場合は、コピー アクティビティを使用して、Snowflake からシンクに直接コピーできます。 次の条件が満たされていない場合は、サービスが設定をチェックして、Copy アクティビティの実行は失敗します。

  • シンクのリンクされたサービスが、Shared Access Signature 認証を使用する Azure Blob Storage です。 サポートされている次の形式で Azure Data Lake Storage Gen2 にデータを直接コピーする場合は、ADLS Gen2 アカウントに対する SAS 認証を使用して Azure BLOB のリンクされたサービスを作成し、Snowflake からのステージング コピーを使用しないようにすることができます。

  • シンク データ形式が、次のように構成された Parquet区切りテキスト、または JSON です。

    • Parquet 形式の場合は、圧縮コーデックが NoneSnappy、または Lzo です。
    • 区切りテキスト形式の場合:
      • rowDelimiter\r\n または任意の 1 文字です。
      • compression が、no compressiongzipbzip2、または deflate です。
      • encodingName が既定値のままか、utf-8 に設定されている。
      • quoteChar が、double quotesingle quote、または empty string (引用符なし) です。
    • JSON 形式の場合、直接コピーでは、ソースの Snowflake テーブルまたはクエリ結果に 1 つの列しかなく、この列のデータ型が VARIANTOBJECT、または ARRAY であるケースのみがサポートされます。
      • compression が、no compressiongzipbzip2、または deflate です。
      • encodingName が既定値のままか、utf-8 に設定されている。
      • コピー アクティビティのシンクでは、filePattern は既定値のままにするか、setOfObjects に設定します。
  • Copy アクティビティのソース内で、additionalColumns が指定されていません。

  • 列マッピングが指定されていません。

例:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",
                "sqlReaderQuery": "SELECT * FROM MYTABLE",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand",
                    "additionalCopyOptions": {
                        "MAX_FILE_SIZE": "64000000",
                        "OVERWRITE": true
                    },
                    "additionalFormatOptions": {
                        "DATE_FORMAT": "'MM/DD/YYYY'"
                    }
                }
            },
            "sink": {
                "type": "<sink type>"
            }
        }
    }
]

Snowflake からのステージング コピー

前のセクションで説明したように、シンクのデータ ストアまたは形式が Snowflake COPY コマンドとネイティブに互換性がない場合は、中間の Azure Blob Storage インスタンスを使用して組み込みのステージング コピーを有効にします。 ステージング コピー機能はスループットも優れています。 Snowflake のデータをステージング ストレージにエクスポートしてから、データをシンクにコピーし、最後にステージング ストレージの一時データをクリーンアップします。 ステージングを使用したデータのコピーの詳細は、「ステージング コピー」を参照してください。

この機能を使うには、中間ステージとして、Azure ストレージ アカウントを参照する Azure Blob Storage のリンクされたサービスを作成します。 次に、コピー アクティビティに enableStaging プロパティと stagingSettings プロパティを指定します。

注意

ステージングのための Azure Blob Storage のリンクされたサービスは、Snowflake COPY コマンドで必要な、Shared Access Signature 認証を使用する必要があります。 ステージング Azure Blob Storage で Snowflake に適切なアクセス許可を付与していることを確認してください。 この詳細については、こちらの記事を参照してください。

例:

"activities":[
    {
        "name": "CopyFromSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<Snowflake input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "SnowflakeV2Source",               
                "sqlReaderQuery": "SELECT * FROM MyTable",
                "exportSettings": {
                    "type": "SnowflakeExportCopyCommand"
                }
            },
            "sink": {
                "type": "<sink type>"
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

シンクとしての Snowflake

Snowflake コネクタは、Snowflake の COPY into [table] コマンドを利用して、最適なパフォーマンスを実現します。 Azure での Snowflake へのデータの書き込みがサポートされています。

ソースのデータ ストアと形式が Snowflake COPY コマンドでネイティブにサポートされている場合は、コピー アクティビティを使用してソースから Snowflake に直接コピーできます。 詳しくは、「Snowflake に直接コピーする」をご覧ください。 それ以外の場合は、組み込みの Snowflake へのステージング コピーを使用します。

Snowflake にデータをコピーするために、コピー アクティビティの sink セクションでは次のプロパティがサポートされています。

プロパティ 内容 必須
type Copy アクティビティの sink の type プロパティは、SnowflakeV2Sink に設定します。 はい
preCopyScript コピー アクティビティの毎回の実行で、データを Snowflake に書き込む前に実行する SQL クエリを指定します。 前に読み込まれたデータをクリーンアップするには、このプロパティを使います。 いいえ
importSettings Snowflake にデータを書き込むために使用される詳細設定。 COPY into コマンドでサポートされるものを構成できます。これは、ステートメントを呼び出すときにこのサービスによって渡されます。 はい
importSettings の下:
type インポート コマンドの type を SnowflakeImportCopyCommand に設定します。 はい
additionalCopyOptions 追加のコピー オプション。キーと値のペアのディクショナリとして指定されます。 例 :ON_ERROR、FORCE、LOAD_UNCERTAIN_FILES。 詳細については、「Snowflake コピー オプション」を参照してください。 いいえ
additionalFormatOptions キーと値のペアのディクショナリとして COPY コマンドに指定される、追加のファイル形式オプション。 例 :DATE_FORMAT、TIME_FORMAT、TIMESTAMP_FORMAT。 詳細については、「Snowflake 形式の種類のオプション」を参照してください。 いいえ

注意

次のコマンドを実行し、INFORMATION_SCHEMA スキーマと COLUMNS テーブルにアクセスするアクセス許可があることを確認します。

  • SELECT CURRENT_REGION()
  • COPY INTO <table>
  • SHOW REGIONS
  • CREATE OR REPLACE STAGE
  • DROP STAGE

Snowflake に直接コピーする

ソースのデータ ストアと形式がこのセクションで説明する基準を満たす場合は、コピー アクティビティを使用して、ソースから Snowflake に直接コピーできます。 次の条件が満たされていない場合は、サービスが設定をチェックして、Copy アクティビティの実行は失敗します。

  • ソースのリンクされたサービスが、Shared Access Signature 認証を使用する Azure Blob Storage です。 サポートされている次の形式で Azure Data Lake Storage Gen2 からデータを直接コピーする場合は、ADLS Gen2 アカウントに対する SAS 認証を使用して Azure BLOB のリンク サービスを作成し、Snowflake へのステージング コピーを使用しないようにできます。

  • ソース データ形式が、次のように構成された Parquet区切りテキスト、または JSON です。

    • Parquet 形式の場合は、圧縮コーデックが None またはSnappyです。

    • 区切りテキスト形式の場合:

      • rowDelimiter\r\n または任意の 1 文字です。 行区切り記号が “\r\n” ではない場合は、firstRowAsHeaderfalse にする必要があり、skipLineCount が指定されていません。
      • compression が、no compressiongzipbzip2、または deflate です。
      • encodingName が既定値のままになっているか、"UTF-8"、"UTF-16"、"UTF-16BE"、"UTF-32"、"UTF-32BE"、"BIG5"、"EUC-JP"、"EUC-KR"、"GB18030"、"ISO-2022-JP"、"ISO-2022-KR"、"ISO-8859-1"、"ISO-8859-2"、"ISO-8859-5"、"ISO-8859-6"、"ISO-8859-7"、"ISO-8859-8"、"ISO-8859-9"、"WINDOWS-1250"、"WINDOWS-1251"、"WINDOWS-1252"、"WINDOWS-1253"、"WINDOWS-1254"、"WINDOWS-1255" に設定されています。
      • quoteChar が、double quotesingle quote、または empty string (引用符なし) です。
    • JSON 形式の場合、直接コピーでは、シンクの Snowflake テーブルに 1 つの列しかなく、この列のデータ型が VARIANTOBJECT、または ARRAY であるケースのみがサポートされます。

      • compression が、no compressiongzipbzip2、または deflate です。
      • encodingName が既定値のままか、utf-8 に設定されている。
      • 列マッピングが指定されていません。
  • コピー アクティビティ ソース内:

    • additionalColumns が指定されていません。
    • ソースがフォルダーの場合、recursive が true に設定されています。
    • prefixmodifiedDateTimeStartmodifiedDateTimeEnd、および enablePartitionDiscovery が指定されていない。

例:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand",
                    "copyOptions": {
                        "FORCE": "TRUE",
                        "ON_ERROR": "SKIP_FILE"
                    },
                    "fileFormatOptions": {
                        "DATE_FORMAT": "YYYY-MM-DD"
                    }
                }
            }
        }
    }
]

Snowflake へのステージング コピー

前のセクションで説明したように、ソースのデータ ストアまたは形式が Snowflake COPY コマンドとネイティブに互換性がない場合は、中間の Azure Blob Storage インスタンスを使用して組み込みのステージング コピーを有効にします。 ステージング コピー機能はスループットも優れています。 Snowflake のデータ形式要件を満たすようにデータを自動的に変換します。 次に、COPY コマンドを呼び出して、Snowflake にデータを読み込みます。 最後に、BLOB ストレージから一時データをクリーンアップします。 ステージングを使用したデータのコピーの詳細は、「ステージング コピー」を参照してください。

この機能を使うには、中間ステージとして、Azure ストレージ アカウントを参照する Azure Blob Storage のリンクされたサービスを作成します。 次に、コピー アクティビティに enableStaging プロパティと stagingSettings プロパティを指定します。

Note

ステージングのための Azure Blob Storage のリンクされたサービスは、Snowflake COPY コマンドで必要な、Shared Access Signature 認証を使用する必要があります。

例:

"activities":[
    {
        "name": "CopyToSnowflake",
        "type": "Copy",
        "inputs": [
            {
                "referenceName": "<input dataset name>",
                "type": "DatasetReference"
            }
        ],
        "outputs": [
            {
                "referenceName": "<Snowflake output dataset name>",
                "type": "DatasetReference"
            }
        ],
        "typeProperties": {
            "source": {
                "type": "<source type>"
            },
            "sink": {
                "type": "SnowflakeV2Sink",
                "importSettings": {
                    "type": "SnowflakeImportCopyCommand"
                }
            },
            "enableStaging": true,
            "stagingSettings": {
                "linkedServiceName": {
                    "referenceName": "MyStagingBlob",
                    "type": "LinkedServiceReference"
                },
                "path": "mystagingpath"
            }
        }
    }
]

Mapping Data Flow のプロパティ

マッピング データ フローでデータを変換する場合、Snowflake のテーブルから読み書きすることができます。 詳細については、マッピング データ フローのソース変換シンク変換に関する記事をご覧ください。 ソースとシンクの種類として、Snowflake データセットまたはインライン データセットを使用することができます。

ソース変換

次の表に、Snowflake ソースでサポートされるプロパティの一覧を示します。 これらのプロパティは、 [ソース オプション] タブで編集できます。コネクタは、Snowflake 内部データ転送を利用します。

名前 説明 必須 使用できる値 データ フロー スクリプトのプロパティ
テーブル 入力として [テーブル] を選択した場合、データ フローは、Snowflake データセットで指定された、またはインライン データセットを使用するときにソース オプションで指定されたテーブルからすべてのデータをフェッチします。 いいえ String (インライン データセットのみ)
tableName
schemaName
クエリ 入力として [クエリ] を選択した場合は、Snowflake からデータをフェッチするクエリを入力します。 この設定により、データセットで選択したすべてのテーブルがオーバーライドされます。
スキーマ、テーブル、および列の名前に小文字が含まれている場合は、クエリでオブジェクト識別子を引用符で囲みます (例: select * from "schema"."myTable")。
いいえ String query
増分抽出を有効にする (プレビュー) このオプションを使用して、パイプラインが前回実行されてから変更された行のみを処理するように ADF に指示します。 いいえ ブール型 enableCdc
増分列 増分抽出機能を使用する場合は、ソース テーブルのウォーターマークとして使用する日時または数値列を選択する必要があります。 いいえ String waterMarkColumn
Snowflake の変更追跡を有効にする (プレビュー) このオプションを使用すると、ADF が Snowflake 変更データ キャプチャ テクノロジを利用して、前回のパイプライン実行以降の差分データのみを処理できるようになります。 このオプションは、増分列を必要とせずに、行の挿入、更新、削除の操作による差分データを自動的に読み込みます。 いいえ ブール型 enableNativeCdc
正味変更 Snowflake の変更追跡を使用するときに、このオプションを使用して、重複除去された変更行または完全な変更を取得できます。 重複除去された変更行には、特定の時点以降に変更された行の最新バージョンのみが表示されますが、完全な変更では、削除または更新された行を含め、変更された各行のすべてのバージョンが表示されます。 たとえば、行を更新すると、完全な変更では削除バージョンと挿入バージョンが表示されますが、重複除去された変更行では挿入バージョンのみが表示されます。 ユース ケースに応じて、自分のニーズに合ったオプションを選択できます。 既定のオプションは false です。これは、完全な変更を意味します。 いいえ ブール型 netChanges
システム列を含める Snowflake の変更追跡を使用するときに、systemColumns オプションを使用して、Snowflake によって提供されるメタデータ ストリーム列を変更追跡の出力に含めるか除外するかを制御できます。 既定では、systemColumns は true に設定されています。つまり、メタデータ ストリーム列が含まれます。 除外する場合は、systemColumns を false に設定できます。 いいえ ブール型 systemColumns
最初から読み取りを開始する 増分抽出と変更追跡でこのオプションを設定すると、増分抽出が有効になっているパイプラインの最初の実行時にすべての行を読み取るよう ADF に指示します。 いいえ ブール型 skipInitialLoad

Snowflake のソース スクリプトの例

ソースの種類として Snowflake データセットを使用すると、関連付けられているデータ フロー スクリプトは次のようになります。

source(allowSchemaDrift: true,
	validateSchema: false,
	query: 'select * from MYTABLE',
	format: 'query') ~> SnowflakeSource

インライン データセットを使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。

source(allowSchemaDrift: true,
	validateSchema: false,
	format: 'query',
	query: 'select * from MYTABLE',
	store: 'snowflake') ~> SnowflakeSource

ネイティブ変更追跡

Azure Data Factory で、変更追跡と呼ばれる Snowflake のネイティブ機能がサポートされるようになりました。これには、ログの形式での変更追跡が含まれます。 Snowflake のこの機能を使用すると、時間の経過に伴うデータの変更を追跡できるため、増分データの読み込みと監査に役立ちます。 この機能を利用するために、変更データ キャプチャを有効にして Snowflake の変更追跡を選択すると、ソース テーブルの Stream オブジェクトが作成され、ソースの Snowflake テーブルでの変更追跡が可能になります。 その後、クエリで CHANGES 句を使用して、ソース テーブルから新規の、または更新されたデータのみをフェッチします。 また、Snowflake のソース テーブルに設定されたデータ保持時間の間隔内に変更が消費されるようにパイプラインをスケジュールすることもお勧めします。そうしなければ、キャプチャされた変更の動作に一貫性がなくなる可能性があります。

シンク変換

次の表に、Snowflake シンクでサポートされるプロパティの一覧を示します。 これらのプロパティは、[設定] タブで編集できます。インライン データセットを使用する場合、「データセットのプロパティ」セクションで説明されているプロパティと同じ追加の設定が表示されます。 コネクタは、Snowflake 内部データ転送を利用します。

名前 説明 必須 使用できる値 データ フロー スクリプトのプロパティ
更新方法 対象となる Snowflake に対して許可される操作を指定します。
行を更新、アップサート、または削除するには、それらのアクションに対して行をタグ付けするために行の変更変換が必要になります。
はい true または false deletable
insertable
updateable
upsertable
[キー列] 更新、アップサート、削除の場合、1 つまたは複数のキー列を設定して、変更する行を決定する必要があります。 いいえ Array キー
テーブル アクション 書き込み前に変換先テーブルのすべての行を再作成するか削除するかを指定します。
- なし: テーブルに対してアクションは実行されません。
- Recreate:テーブルが削除され、再作成されます。 新しいテーブルを動的に作成する場合に必要です。
- Truncate:ターゲット テーブルのすべての行が削除されます。
いいえ true または false recreate
truncate

Snowflake シンク スクリプトの例

シンクの種類として Snowflake データセットを使用する際、関連付けられているデータ フロー スクリプトは次のようになります。

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	deletable:true,
	insertable:true,
	updateable:true,
	upsertable:false,
	keys:['movieId'],
	format: 'table',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

インライン データセットを使用する場合、関連付けられているデータ フロー スクリプトは次のようになります。

IncomingStream sink(allowSchemaDrift: true,
	validateSchema: false,
	format: 'table',
	tableName: 'table',
	schemaName: 'schema',
	deletable: true,
	insertable: true,
	updateable: true,
	upsertable: false,
	store: 'snowflake',
	skipDuplicateMapInputs: true,
	skipDuplicateMapOutputs: true) ~> SnowflakeSink

クエリ プッシュダウンの最適化

パイプライン ログ レベルを None に設定することで、中間変換メトリックの送信を除外して Spark 最適化に対する潜在的な障害を防ぎ、Snowflake によって提供されるクエリ プッシュダウンの最適化を有効にします。 このプッシュダウンの最適化により、大量のデータセットを含む大規模な Snowflake テーブルのパフォーマンスが大幅に向上します。

Note

一時テーブルはセッションに対してローカルであり、作成したユーザーが他のセッションにアクセスすることができなくなり、Snowflake によって通常のテーブルとして上書きされる傾向があるため、Snowflake ではサポートされていません。 Snowflake はグローバルにアクセス可能な、一時的なテーブルを代替として提供しますが、これには手動削除が必要になるため、ソース スキーマでの削除操作を回避するという一時テーブルの使用の主目的とは矛盾しています。

Lookup アクティビティのプロパティ

プロパティの詳細については、ルックアップ アクティビティに関する記事を参照してください。

Snowflake のリンク サービスをアップグレードする

Snowflake のリンク サービスをアップグレードするには、新しい Snowflake のリンク サービスを作成し、「リンク サービスのプロパティ」を参照して構成します。

コピー アクティビティによってソース、シンクとしてサポートされるデータ ストアの一覧については、サポートされるデータ ストアと形式の表を参照してください。