Azure Data Factory を使用して PostgreSQL からデータを移動する

Note

この記事は、Data Factory のバージョン 1 に適用されます。 現在のバージョンの Data Factory サービスを使用している場合は、V2 の PostgreSQL コネクタに関するページを参照してください。

この記事では、Azure Data Factory のコピー アクティビティを使って、オンプレミスの PostgreSQL データベースからデータを移動させる方法について説明します。 この記事は、コピー アクティビティによるデータ移動の一般的な概要について説明している、データ移動アクティビティに関する記事に基づいています。

オンプレミスの PostgreSQL データ ストアから、サポートされている任意のシンク データ ストアにデータをコピーできます。 コピー アクティビティによってシンクとしてサポートされているデータ ストアの一覧については、サポートされているデータ ストアに関するセクションをご覧ください。 Data Factory は、現時点では PostgreSQL データベースから他のデータ ストアへのデータ移動をサポートしていますが、他のデータ ストアから PostgreSQL データベースへのデータ移動に関してはサポートしていません。

前提条件

Data Factory のサービスでは、Data Management Gateway を使用したオンプレミスの PostgreSQL ソースへの接続をサポートします。 Data Management Gateway の詳細およびゲートウェイの設定手順については、「 オンプレミスの場所とクラウド間のデータ移動 」を参照してください。

PostgreSQL データベースが Azure IaaS VM でホストされている場合でも、ゲートウェイは必須です。 ゲートウェイはデータ ストアと同じ IaaS VM にインストールできるほか、ゲートウェイがデータベースに接続できれば別の VM にインストールしてもかまいません。

Note

接続/ゲートウェイに関する問題のトラブルシューティングのヒントについては、 ゲートウェイの問題のトラブルシューティング に関するセクションをご覧ください。

サポートされているバージョンとインストール

Data Management Gateway で PostgreSQL Databases に接続するには、Data Management Gateway と同じシステムに 2.0.12 ~ 3.1.9 バージョンの PostgreSQL の Ngpsql データ プロバイダーをインストールしてください。 PostgreSQL バージョン 7.4 以降がサポートされています。

作業の開始

さまざまなツールまたは API を使用して、オンプレミスの PostgreSQL データ ストアからデータを移動するコピー アクティビティを含むパイプラインを作成できます。

ツールと API のいずれを使用する場合も、次の手順を実行して、ソース データ ストアからシンク データ ストアにデータを移動するパイプラインを作成します。

  1. リンクされたサービスを作成し、入力データ ストアと出力データ ストアをデータ ファクトリにリンクします。
  2. コピー操作用の入力データと出力データを表すデータセットを作成します。
  3. 入力としてのデータセットと出力としてのデータセットを受け取るコピー アクティビティを含むパイプラインを作成します。

ウィザードを使用すると、Data Factory エンティティ (リンクされたサービス、データセット、パイプライン) に関する JSON の定義が自動的に作成されます。 (.NET API を除く) ツールまたは API を使う場合は、JSON 形式でこれらの Data Factory エンティティを定義します。 オンプレミスの PostgreSQL データ ストアからデータをコピーするために使用する Data Factory エンティティに関する JSON 定義のサンプルについては、この記事のセクション、「JSON の使用例: PostgreSQL から Azure BLOB へのデータのコピー」をご覧ください。

次のセクションでは、PostgreSQL データ ストアに固有の Data Factory エンティティの定義に使用される JSON プロパティについて詳しく説明します。

リンクされたサービスのプロパティ

次の表は、PostgreSQL のリンクされたサービスに固有の JSON 要素の説明をまとめたものです。

プロパティ 説明 必須
type type プロパティを OnPremisesPostgreSql はい
server PostgreSQL サーバーの名前です。 はい
database PostgreSQL データベースの名前です。 はい
schema データベース内のスキーマの名前です。 スキーマ名は、大文字と小文字が区別されます。 いいえ
authenticationType PostgreSQL データベースへの接続に使用される認証の種類です。 次のいずれかの値になります。Anonymous、Basic、および Windows です。 はい
username Basic または Windows 認証を使用している場合は、ユーザー名を指定します。 いいえ
password ユーザー名に指定したユーザー アカウントのパスワードを指定します。 いいえ
gatewayName Data Factory サービスが、オンプレミスの PostgreSQL データベースへの接続に使用するゲートウェイの名前です。 はい

データセットのプロパティ

データセットの定義に利用できるセクションと&プロパティの完全な一覧については、データセットの作成に関する記事をご覧ください。 データセット JSON の構造、可用性、ポリシーなどのセクションは、データセットのすべての型でほぼ同じです。

typeProperties セクションはデータセット型ごとに異なり、データ ストアのデータの場所などに関する情報を提供します。 RelationalTable 型のデータセット (PostgreSQL データセットを含む) の typeProperties セクションには次のプロパティがあります。

プロパティ 説明 必須
tableName リンクされたサービスが参照する PostgreSQL Databases インスタンスのテーブルの名前です。 tableName は、大文字と小文字が区別されます。 いいえ (RelationalSourceクエリ が指定されている場合)

コピー アクティビティのプロパティ

アクティビティの定義に利用できるセクションと&プロパティの完全な一覧については、パイプラインの作成に関する記事を参照してください。 名前、説明、入力テーブル、出力テーブル、ポリシーなどのプロパティは、あらゆる種類のアクティビティで使用できます。

一方、アクティビティの typeProperties セクションで使用できるプロパティは、各アクティビティの種類によって異なります。 コピー アクティビティの場合、ソースとシンクの種類によって異なります。

source の種類が RelationalSource (PostgreSQL を含む) である場合は、typeProperties セクションで次のプロパティを使用できます。

プロパティ 説明 使用できる値 必須
query カスタム クエリを使用してデータを読み取ります。 SQL クエリ文字列。 (例: "query": "select * from \"MySchema\".\"MyTable\"")。 いいえ (データセットtableName が指定されている場合)

Note

スキーマ名とテーブル名は、大文字と小文字が区別されます。 クエリ内では、これらを "" (二重引用符) で囲んでください。

例:

"query": "select * from \"MySchema\".\"MyTable\""

JSON の使用例: PostgreSQL から Azure BLOB へのデータのコピー

次の例は、Visual Studio または Azure PowerShell を使用してパイプラインを作成する際に使用できるサンプルの JSON 定義です。 これらの例は、PostgreSQL データベースから Azure BLOB ストレージにデータをコピーする方法を示しています。 ただし、Azure Data Factory のコピー アクティビティを使用して、 こちら に記載されているシンクのいずれかにデータをコピーすることができます。

重要

このサンプルでは、JSON のスニペットを使用します。 データ ファクトリを作成する手順は含まれてません。 手順については、記事「 Data Management Gateway を使用してオンプレミスのソースとクラウドの間でデータを移動する 」を参照してください。

このサンプルでは、次の Data Factory のエンティティがあります。

  1. OnPremisesPostgreSql型のリンクされたサービス。
  2. AzureStorage型のリンクされたサービス。
  3. RelationalTable 型の入力データセット
  4. AzureBlob 型の出力データセット
  5. RelationalSourceBlobSink を使用するコピー アクティビティを含むパイプライン

このサンプルは PostgreSQL データベースのクエリ結果のデータを BLOB に 1 時間ごとにコピーします。 これらのサンプルで使用される JSON プロパティの説明はサンプルに続くセクションにあります。

最初の手順として、データ管理ゲートウェイを設定します。 設定手順は、 オンプレミスの場所とクラウドの間でのデータ移動 に関する記事に記載されています。

PostgreSQL のリンクされたサービス:

{
    "name": "OnPremPostgreSqlLinkedService",
    "properties": {
        "type": "OnPremisesPostgreSql",
        "typeProperties": {
            "server": "<server>",
            "database": "<database>",
            "schema": "<schema>",
            "authenticationType": "<authentication type>",
            "username": "<username>",
            "password": "<password>",
            "gatewayName": "<gatewayName>"
        }
    }
}

Azure BLOB ストレージのリンクされたサービス:

{
    "name": "AzureStorageLinkedService",
    "properties": {
        "type": "AzureStorage",
        "typeProperties": {
            "connectionString": "DefaultEndpointsProtocol=https;AccountName=<AccountName>;AccountKey=<AccountKey>"
        }
    }
}

PostgreSQL の入力データセット:

このサンプルでは、PostgreSQL で「MyTable」という名前のテーブルを作成し、時系列データ用に「timestamp」という名前の列が含まれているものと想定しています。

"external": true の設定により、このデータセットがデータ ファクトリの外部にあり、データ ファクトリのアクティビティによって生成されたものではないことが Data Factory サービスに通知されます。

{
    "name": "PostgreSqlDataSet",
    "properties": {
        "type": "RelationalTable",
        "linkedServiceName": "OnPremPostgreSqlLinkedService",
        "typeProperties": {},
        "availability": {
            "frequency": "Hour",
            "interval": 1
        },
        "external": true,
        "policy": {
            "externalData": {
                "retryInterval": "00:01:00",
                "retryTimeout": "00:10:00",
                "maximumRetry": 3
            }
        }
    }
}

Azure BLOB の出力データセット:

データは新しい BLOB に 1 時間おきに書き込まれます (頻度: 時間、間隔: 1)。 BLOB のフォルダー パスとファイル名は、処理中のスライスの開始時間に基づき、動的に評価されます。 フォルダー パスは開始時間の年、月、日、時刻の部分を使用します。

{
    "name": "AzureBlobPostgreSqlDataSet",
    "properties": {
        "type": "AzureBlob",
        "linkedServiceName": "AzureStorageLinkedService",
        "typeProperties": {
            "folderPath": "mycontainer/postgresql/yearno={Year}/monthno={Month}/dayno={Day}/hourno={Hour}",
            "format": {
                "type": "TextFormat",
                "rowDelimiter": "\n",
                "columnDelimiter": "\t"
            },
            "partitionedBy": [
                {
                    "name": "Year",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "yyyy"
                    }
                },
                {
                    "name": "Month",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "MM"
                    }
                },
                {
                    "name": "Day",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "dd"
                    }
                },
                {
                    "name": "Hour",
                    "value": {
                        "type": "DateTime",
                        "date": "SliceStart",
                        "format": "HH"
                    }
                }
            ]
        },
        "availability": {
            "frequency": "Hour",
            "interval": 1
        }
    }
}

コピー アクティビティのあるパイプライン:

パイプラインには、入力データセットと出力データセットを使用するように構成され、1 時間おきに実行するようにスケジュールされているコピー アクティビティが含まれています。 パイプライン JSON 定義で、source 型が RelationalSource に設定され、sink 型が BlobSink に設定されています。 query プロパティに指定された SQL クエリは、PostgreSQL データベースの public.usstates テーブルからデータを選択します。

{
    "name": "CopyPostgreSqlToBlob",
    "properties": {
        "description": "pipeline for copy activity",
        "activities": [
            {
                "type": "Copy",
                "typeProperties": {
                    "source": {
                        "type": "RelationalSource",
                        "query": "select * from \"public\".\"usstates\""
                    },
                    "sink": {
                        "type": "BlobSink"
                    }
                },
                "inputs": [
                    {
                        "name": "PostgreSqlDataSet"
                    }
                ],
                "outputs": [
                    {
                        "name": "AzureBlobPostgreSqlDataSet"
                    }
                ],
                "policy": {
                    "timeout": "01:00:00",
                    "concurrency": 1
                },
                "scheduler": {
                    "frequency": "Hour",
                    "interval": 1
                },
                "name": "PostgreSqlToBlob"
            }
        ],
        "start": "2014-06-01T18:00:00Z",
        "end": "2014-06-01T19:00:00Z"
    }
}

PostgreSQL の型マッピング

データ移動アクティビティ に関する記事のとおり、コピー アクティビティは次の 2 段階のアプローチで型を source から sink に自動的に変換します。

  1. ネイティブの source 型から .NET 型に変換する
  2. .NET 型からネイティブの sink 型に変換する

PostgreSQL にデータを移動する場合、PostgreSQL 型から .NET 型に対する次のマッピングが使用されます。

PostgreSQL Databases 型 PostgreSQL エイリアス .NET Framework 型
abstime Datetime
bigint int8 Int64
bigserial serial8 Int64
bit [(n)] Byte[], String
bit varying [ (n) ] varbit Byte[], String
boolean [bool] Boolean
box Byte[], String
bytea Byte[], String
character [(n)] char [(n)] String
character varying [(n)] varchar [(n)] String
cid String
cidr String
circle Byte[], String
date Datetime
daterange String
double precision float8 Double
inet Byte[], String
intarry String
int4range String
int8range String
整数 (integer) int, int4 Int32
interval [fields] [(p)] Timespan
json String
jsonb Byte[]
line Byte[], String
lseg Byte[], String
macaddr Byte[], String
money Decimal
numeric [(p, s)] decimal [(p, s)] Decimal
numrange String
oid Int32
path Byte[], String
pg_lsn Int64
point Byte[], String
polygon Byte[], String
real float4 Single
smallint int2 Int16
smallserial serial2 Int16
serial serial4 Int32
text String

ソース列からシンク列へのマップ

ソース データセット列のシンク データセット列へのマッピングの詳細については、Azure Data Factory のデータセット列のマッピングに関するページをご覧ください。

リレーショナル ソースからの反復可能読み取り

リレーショナル データ ストアからデータをコピーする場合は、意図しない結果を避けるため、再現性に注意する必要があります。 Azure Data Factory では、スライスを手動で再実行できます。 障害が発生したときにスライスを再実行できるように、データセットの再試行ポリシーを構成することもできます。 いずれかの方法でスライスが再実行された際は、何度スライスが実行されても同じデータが読み込まれることを確認する必要があります。 リレーショナル ソースからの反復可能読み取りに関するページをご覧ください。

パフォーマンスとチューニング

Azure Data Factory でのデータ移動 (コピー アクティビティ) のパフォーマンスに影響する主な要因と、パフォーマンスを最適化するための各種方法については、「コピー アクティビティのパフォーマンスと&チューニングに関するガイド」を参照してください。