Azure Data Factory を使用して PostgreSQL からデータを移動する
Note
この記事は、Data Factory のバージョン 1 に適用されます。 現在のバージョンの Data Factory サービスを使用している場合は、V2 の PostgreSQL コネクタに関するページを参照してください。
この記事では、Azure Data Factory のコピー アクティビティを使って、オンプレミスの PostgreSQL データベースからデータを移動させる方法について説明します。 この記事は、コピー アクティビティによるデータ移動の一般的な概要について説明している、データ移動アクティビティに関する記事に基づいています。
オンプレミスの PostgreSQL データ ストアから、サポートされている任意のシンク データ ストアにデータをコピーできます。 コピー アクティビティによってシンクとしてサポートされているデータ ストアの一覧については、サポートされているデータ ストアに関するセクションをご覧ください。 Data Factory は、現時点では PostgreSQL データベースから他のデータ ストアへのデータ移動をサポートしていますが、他のデータ ストアから PostgreSQL データベースへのデータ移動に関してはサポートしていません。
前提条件
Data Factory のサービスでは、Data Management Gateway を使用したオンプレミスの PostgreSQL ソースへの接続をサポートします。 Data Management Gateway の詳細およびゲートウェイの設定手順については、「 オンプレミスの場所とクラウド間のデータ移動 」を参照してください。
PostgreSQL データベースが Azure IaaS VM でホストされている場合でも、ゲートウェイは必須です。 ゲートウェイはデータ ストアと同じ IaaS VM にインストールできるほか、ゲートウェイがデータベースに接続できれば別の VM にインストールしてもかまいません。
Note
接続/ゲートウェイに関する問題のトラブルシューティングのヒントについては、 ゲートウェイの問題のトラブルシューティング に関するセクションをご覧ください。
サポートされているバージョンとインストール
Data Management Gateway で PostgreSQL Databases に接続するには、Data Management Gateway と同じシステムに 2.0.12 ~ 3.1.9 バージョンの PostgreSQL の Ngpsql データ プロバイダーをインストールしてください。 PostgreSQL バージョン 7.4 以降がサポートされています。
作業の開始
さまざまなツールまたは API を使用して、オンプレミスの PostgreSQL データ ストアからデータを移動するコピー アクティビティを含むパイプラインを作成できます。
- パイプラインを作成する最も簡単な方法は、コピー ウィザードを使うことです。 データのコピー ウィザードを使用してパイプラインを作成する簡単な手順については、「 チュートリアル: コピー ウィザードを使用してパイプラインを作成する 」をご覧ください。
- また、次のツールを使用してパイプラインを作成することもできます。
Visual Studio
Azure PowerShell
Azure Resource Manager テンプレート
.NET API
REST API
コピー アクティビティを含むパイプラインを作成するための詳細な手順については、コピー アクティビティのチュートリアルをご覧ください。
ツールと API のいずれを使用する場合も、次の手順を実行して、ソース データ ストアからシンク データ ストアにデータを移動するパイプラインを作成します。
- リンクされたサービスを作成し、入力データ ストアと出力データ ストアをデータ ファクトリにリンクします。
- コピー操作用の入力データと出力データを表すデータセットを作成します。
- 入力としてのデータセットと出力としてのデータセットを受け取るコピー アクティビティを含むパイプラインを作成します。
ウィザードを使用すると、Data Factory エンティティ (リンクされたサービス、データセット、パイプライン) に関する JSON の定義が自動的に作成されます。 (.NET API を除く) ツールまたは API を使う場合は、JSON 形式でこれらの Data Factory エンティティを定義します。 オンプレミスの PostgreSQL データ ストアからデータをコピーするために使用する Data Factory エンティティに関する JSON 定義のサンプルについては、この記事のセクション、「JSON の使用例: PostgreSQL から Azure BLOB へのデータのコピー」をご覧ください。
次のセクションでは、PostgreSQL データ ストアに固有の Data Factory エンティティの定義に使用される JSON プロパティについて詳しく説明します。
リンクされたサービスのプロパティ
次の表は、PostgreSQL のリンクされたサービスに固有の JSON 要素の説明をまとめたものです。
プロパティ | 説明 | 必須 |
---|---|---|
type | type プロパティを OnPremisesPostgreSql | はい |
server | PostgreSQL サーバーの名前です。 | はい |
database | PostgreSQL データベースの名前です。 | はい |
schema | データベース内のスキーマの名前です。 スキーマ名は、大文字と小文字が区別されます。 | いいえ |
authenticationType | PostgreSQL データベースへの接続に使用される認証の種類です。 次のいずれかの値になります。Anonymous、Basic、および Windows です。 | はい |
username | Basic または Windows 認証を使用している場合は、ユーザー名を指定します。 | いいえ |
password | ユーザー名に指定したユーザー アカウントのパスワードを指定します。 | いいえ |
gatewayName | Data Factory サービスが、オンプレミスの PostgreSQL データベースへの接続に使用するゲートウェイの名前です。 | はい |
データセットのプロパティ
データセットの定義に利用できるセクションと&プロパティの完全な一覧については、データセットの作成に関する記事をご覧ください。 データセット JSON の構造、可用性、ポリシーなどのセクションは、データセットのすべての型でほぼ同じです。
typeProperties セクションはデータセット型ごとに異なり、データ ストアのデータの場所などに関する情報を提供します。 RelationalTable 型のデータセット (PostgreSQL データセットを含む) の typeProperties セクションには次のプロパティがあります。
プロパティ | 説明 | 必須 |
---|---|---|
tableName | リンクされたサービスが参照する PostgreSQL Databases インスタンスのテーブルの名前です。 tableName は、大文字と小文字が区別されます。 | いいえ (RelationalSource の クエリ が指定されている場合) |
コピー アクティビティのプロパティ
アクティビティの定義に利用できるセクションと&プロパティの完全な一覧については、パイプラインの作成に関する記事を参照してください。 名前、説明、入力テーブル、出力テーブル、ポリシーなどのプロパティは、あらゆる種類のアクティビティで使用できます。
一方、アクティビティの typeProperties セクションで使用できるプロパティは、各アクティビティの種類によって異なります。 コピー アクティビティの場合、ソースとシンクの種類によって異なります。
source の種類が RelationalSource (PostgreSQL を含む) である場合は、typeProperties セクションで次のプロパティを使用できます。
プロパティ | 説明 | 使用できる値 | 必須 |
---|---|---|---|
query | カスタム クエリを使用してデータを読み取ります。 | SQL クエリ文字列。 (例: "query": "select * from \"MySchema\".\"MyTable\"" )。 |
いいえ (データセットの tableName が指定されている場合) |
Note
スキーマ名とテーブル名は、大文字と小文字が区別されます。 クエリ内では、これらを ""
(二重引用符) で囲んでください。
例:
"query": "select * from \"MySchema\".\"MyTable\""
JSON の使用例: PostgreSQL から Azure BLOB へのデータのコピー
次の例は、Visual Studio または Azure PowerShell を使用してパイプラインを作成する際に使用できるサンプルの JSON 定義です。 これらの例は、PostgreSQL データベースから Azure BLOB ストレージにデータをコピーする方法を示しています。 ただし、Azure Data Factory のコピー アクティビティを使用して、 こちら に記載されているシンクのいずれかにデータをコピーすることができます。
重要
このサンプルでは、JSON のスニペットを使用します。 データ ファクトリを作成する手順は含まれてません。 手順については、記事「 Data Management Gateway を使用してオンプレミスのソースとクラウドの間でデータを移動する 」を参照してください。
このサンプルでは、次の Data Factory のエンティティがあります。
- OnPremisesPostgreSql型のリンクされたサービス。
- AzureStorage型のリンクされたサービス。
- RelationalTable 型の入力データセット。
- AzureBlob 型の出力データセット。
- RelationalSource と BlobSink を使用するコピー アクティビティを含むパイプライン。
このサンプルは PostgreSQL データベースのクエリ結果のデータを BLOB に 1 時間ごとにコピーします。 これらのサンプルで使用される JSON プロパティの説明はサンプルに続くセクションにあります。
最初の手順として、データ管理ゲートウェイを設定します。 設定手順は、 オンプレミスの場所とクラウドの間でのデータ移動 に関する記事に記載されています。
PostgreSQL のリンクされたサービス:
{
"name": "OnPremPostgreSqlLinkedService",
"properties": {
"type": "OnPremisesPostgreSql",
"typeProperties": {
"server": "<server>",
"database": "<database>",
"schema": "<schema>",
"authenticationType": "<authentication type>",
"username": "<username>",
"password": "<password>",
"gatewayName": "<gatewayName>"
}
}
}
Azure BLOB ストレージのリンクされたサービス:
{
"name": "AzureStorageLinkedService",
"properties": {
"type": "AzureStorage",
"typeProperties": {
"connectionString": "DefaultEndpointsProtocol=https;AccountName=<AccountName>;AccountKey=<AccountKey>"
}
}
}
PostgreSQL の入力データセット:
このサンプルでは、PostgreSQL で「MyTable」という名前のテーブルを作成し、時系列データ用に「timestamp」という名前の列が含まれているものと想定しています。
"external": true
の設定により、このデータセットがデータ ファクトリの外部にあり、データ ファクトリのアクティビティによって生成されたものではないことが Data Factory サービスに通知されます。
{
"name": "PostgreSqlDataSet",
"properties": {
"type": "RelationalTable",
"linkedServiceName": "OnPremPostgreSqlLinkedService",
"typeProperties": {},
"availability": {
"frequency": "Hour",
"interval": 1
},
"external": true,
"policy": {
"externalData": {
"retryInterval": "00:01:00",
"retryTimeout": "00:10:00",
"maximumRetry": 3
}
}
}
}
Azure BLOB の出力データセット:
データは新しい BLOB に 1 時間おきに書き込まれます (頻度: 時間、間隔: 1)。 BLOB のフォルダー パスとファイル名は、処理中のスライスの開始時間に基づき、動的に評価されます。 フォルダー パスは開始時間の年、月、日、時刻の部分を使用します。
{
"name": "AzureBlobPostgreSqlDataSet",
"properties": {
"type": "AzureBlob",
"linkedServiceName": "AzureStorageLinkedService",
"typeProperties": {
"folderPath": "mycontainer/postgresql/yearno={Year}/monthno={Month}/dayno={Day}/hourno={Hour}",
"format": {
"type": "TextFormat",
"rowDelimiter": "\n",
"columnDelimiter": "\t"
},
"partitionedBy": [
{
"name": "Year",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "yyyy"
}
},
{
"name": "Month",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "MM"
}
},
{
"name": "Day",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "dd"
}
},
{
"name": "Hour",
"value": {
"type": "DateTime",
"date": "SliceStart",
"format": "HH"
}
}
]
},
"availability": {
"frequency": "Hour",
"interval": 1
}
}
}
コピー アクティビティのあるパイプライン:
パイプラインには、入力データセットと出力データセットを使用するように構成され、1 時間おきに実行するようにスケジュールされているコピー アクティビティが含まれています。 パイプライン JSON 定義で、source 型が RelationalSource に設定され、sink 型が BlobSink に設定されています。 query プロパティに指定された SQL クエリは、PostgreSQL データベースの public.usstates テーブルからデータを選択します。
{
"name": "CopyPostgreSqlToBlob",
"properties": {
"description": "pipeline for copy activity",
"activities": [
{
"type": "Copy",
"typeProperties": {
"source": {
"type": "RelationalSource",
"query": "select * from \"public\".\"usstates\""
},
"sink": {
"type": "BlobSink"
}
},
"inputs": [
{
"name": "PostgreSqlDataSet"
}
],
"outputs": [
{
"name": "AzureBlobPostgreSqlDataSet"
}
],
"policy": {
"timeout": "01:00:00",
"concurrency": 1
},
"scheduler": {
"frequency": "Hour",
"interval": 1
},
"name": "PostgreSqlToBlob"
}
],
"start": "2014-06-01T18:00:00Z",
"end": "2014-06-01T19:00:00Z"
}
}
PostgreSQL の型マッピング
データ移動アクティビティ に関する記事のとおり、コピー アクティビティは次の 2 段階のアプローチで型を source から sink に自動的に変換します。
- ネイティブの source 型から .NET 型に変換する
- .NET 型からネイティブの sink 型に変換する
PostgreSQL にデータを移動する場合、PostgreSQL 型から .NET 型に対する次のマッピングが使用されます。
PostgreSQL Databases 型 | PostgreSQL エイリアス | .NET Framework 型 |
---|---|---|
abstime | Datetime | |
bigint | int8 | Int64 |
bigserial | serial8 | Int64 |
bit [(n)] | Byte[], String | |
bit varying [ (n) ] | varbit | Byte[], String |
boolean | [bool] | Boolean |
box | Byte[], String | |
bytea | Byte[], String | |
character [(n)] | char [(n)] | String |
character varying [(n)] | varchar [(n)] | String |
cid | String | |
cidr | String | |
circle | Byte[], String | |
date | Datetime | |
daterange | String | |
double precision | float8 | Double |
inet | Byte[], String | |
intarry | String | |
int4range | String | |
int8range | String | |
整数 (integer) | int, int4 | Int32 |
interval [fields] [(p)] | Timespan | |
json | String | |
jsonb | Byte[] | |
line | Byte[], String | |
lseg | Byte[], String | |
macaddr | Byte[], String | |
money | Decimal | |
numeric [(p, s)] | decimal [(p, s)] | Decimal |
numrange | String | |
oid | Int32 | |
path | Byte[], String | |
pg_lsn | Int64 | |
point | Byte[], String | |
polygon | Byte[], String | |
real | float4 | Single |
smallint | int2 | Int16 |
smallserial | serial2 | Int16 |
serial | serial4 | Int32 |
text | String |
ソース列からシンク列へのマップ
ソース データセット列のシンク データセット列へのマッピングの詳細については、Azure Data Factory のデータセット列のマッピングに関するページをご覧ください。
リレーショナル ソースからの反復可能読み取り
リレーショナル データ ストアからデータをコピーする場合は、意図しない結果を避けるため、再現性に注意する必要があります。 Azure Data Factory では、スライスを手動で再実行できます。 障害が発生したときにスライスを再実行できるように、データセットの再試行ポリシーを構成することもできます。 いずれかの方法でスライスが再実行された際は、何度スライスが実行されても同じデータが読み込まれることを確認する必要があります。 リレーショナル ソースからの反復可能読み取りに関するページをご覧ください。
パフォーマンスとチューニング
Azure Data Factory でのデータ移動 (コピー アクティビティ) のパフォーマンスに影響する主な要因と、パフォーマンスを最適化するための各種方法については、「コピー アクティビティのパフォーマンスと&チューニングに関するガイド」を参照してください。