チュートリアル: Azure Cosmos DB for PostgreSQL を使用したリアルタイム分析ダッシュボードの設計

適用対象: Azure Cosmos DB for PostgreSQL (PostgreSQL の Citus データベース拡張機能を利用)

このチュートリアルでは、Azure Cosmos DB for PostgreSQL を使用して、次の方法を学習します。

  • クラスターの作成
  • psql ユーティリティを使用してスキーマを作成する
  • ノード全体でテーブルをシャード化する
  • サンプル データを作成する
  • ロールアップを実行する
  • 生データと集計データのクエリを実行する
  • データを期限切れにする

前提条件

Azure サブスクリプションをお持ちでない場合は、開始する前に無料アカウントを作成してください。

クラスターの作成

Azure portal にサインインし、次の手順を実行して、Azure Cosmos DB for PostgreSQL クラスターを作成します。

Azure portal の [Azure Cosmos DB for PostgreSQL クラスターの作成] に移動します。

[Azure Cosmos DB for PostgreSQL クラスターの作成] フォームで、次を実行します。

  1. [基本] タブに情報を入力します。

    [作成] 画面の [基本] タブを示すスクリーンショット。

    ほとんどのオプションは文字どおりのわかりやすいものですが、次の点に注意してください。

    • <node-qualifier>-<clustername>.<uniqueID>.postgres.cosmos.azure.com という形式で、クラスター名によって、アプリケーションで接続に使用する DNS 名が決まります。
    • 15 などの主要な PostgreSQL バージョンを選択できます。 Azure Cosmos DB for PostgreSQL では、選択したメジャー Postgres バージョンの最新の Citus バージョンが常にサポートされます。
    • 管理者ユーザー名は、citus という値にする必要があります。
    • データベース名は既定値 'citus' のままにするか、データベース名のみを定義できます。 クラスターのプロビジョニング後にデータベースの名前を変更することはできません。
  2. 画面の下部にある [次へ: ネットワーク] を選択します。

  3. [ネットワーク] 画面で、[Azure 内の Azure サービスおよびリソースにこのクラスターへのパブリック アクセスを許可する] を選択します。

    [作成] 画面の [ネットワーク] タブを示すスクリーンショット。

  4. [確認および作成] を選択し、検証で問題がなければ、[作成] を選択してクラスターを作成します。

  5. プロビジョニングには数分かかります。 デプロイを監視するために、ページがリダイレクトされます。 状態が [デプロイが進行中です] から [デプロイが完了しました] に変わったら、[リソースに移動] を選択します。

psql ユーティリティを使用してスキーマを作成する

psql を使用して Azure Cosmos DB for PostgreSQL に接続すると、いくつかの基本的なタスクを完了することができます。 このチュートリアルでは、Web 分析からトラフィック データを取り込んでから、データをロールアップし、そのデータに基づいてリアルタイムのダッシュボードを提供する方法について説明します。

生の Web トラフィック データをすべて使用するテーブルを作成しましょう。 psql ターミナルで次のコマンドを実行します。

CREATE TABLE http_request (
  site_id INT,
  ingest_time TIMESTAMPTZ DEFAULT now(),

  url TEXT,
  request_country TEXT,
  ip_address TEXT,

  status_code INT,
  response_time_msec INT
);

また、1 分あたりの集計を保持するテーブルと、前回のロールアップの位置を維持するテーブルを作成します。 psql で次のコマンドも実行します。

CREATE TABLE http_request_1min (
  site_id INT,
  ingest_time TIMESTAMPTZ, -- which minute this row represents

  error_count INT,
  success_count INT,
  request_count INT,
  average_response_time_msec INT,
  CHECK (request_count = error_count + success_count),
  CHECK (ingest_time = date_trunc('minute', ingest_time))
);

CREATE INDEX http_request_1min_idx ON http_request_1min (site_id, ingest_time);

CREATE TABLE latest_rollup (
  minute timestamptz PRIMARY KEY,

  CHECK (minute = date_trunc('minute', minute))
);

この psql コマンドを使用すると、テーブルの一覧に新しく作成されたテーブルが表示されます。

\dt

ノード全体でテーブルをシャード化する

Azure Cosmos DB for PostgreSQL のデプロイでは、テーブルの行はユーザー指定の列の値に基づいて異なるノードに保存されます。 この "ディストリビューション列" には、ノード全体でデータがどのようにシャード化されているかが示されます。

ディストリビューション列がシャード キーである site_id になるように設定しましょう。 psql で、次の関数を実行します。

SELECT create_distributed_table('http_request',      'site_id');
SELECT create_distributed_table('http_request_1min', 'site_id');

重要

Azure Cosmos DB for PostgreSQL パフォーマンス機能を利用するには、テーブルの分散が必要です。 テーブルを分散しない場合は、ワーカー ノードでこれらのテーブルに関連するクエリを実行することができません。

サンプル データを作成する

これで、クラスターでデータを取り込む準備が整いました。 psql 接続からローカルで以下を実行して、継続的にデータを挿入することができます。

DO $$
  BEGIN LOOP
    INSERT INTO http_request (
      site_id, ingest_time, url, request_country,
      ip_address, status_code, response_time_msec
    ) VALUES (
      trunc(random()*32), clock_timestamp(),
      concat('http://example.com/', md5(random()::text)),
      ('{China,India,USA,Indonesia}'::text[])[ceil(random()*4)],
      concat(
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2), '.',
        trunc(random()*250 + 2)
      )::inet,
      ('{200,404}'::int[])[ceil(random()*2)],
      5+trunc(random()*150)
    );
    COMMIT;
    PERFORM pg_sleep(random() * 0.25);
  END LOOP;
END $$;

クエリは、毎秒約 8 行を挿入します。 行は、ディストリビューション列 site_id の指示に従って、さまざまなワーカー ノードに格納されます。

注意

データ生成クエリを実行したままにして、このチュートリアルのその他のコマンド用に 2 つ目の psql 接続を開きます。

クエリ

Azure Cosmos DB for PostgreSQL を使用すると、複数のノードでクエリを並列処理して速度を上げることができます。 たとえば、データベースではワーカー ノード上で SUM や COUNT などの集計が計算され、その結果が最終的な回答にまとめられます。

1 分あたりの Web 要求といくつかの統計情報をカウントするクエリを次に示します。 psql で実行して結果を確認してみてください。

SELECT
  site_id,
  date_trunc('minute', ingest_time) as minute,
  COUNT(1) AS request_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
  SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
  SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
FROM http_request
WHERE date_trunc('minute', ingest_time) > now() - '5 minutes'::interval
GROUP BY site_id, minute
ORDER BY minute ASC;

データのロールアップ

上記のクエリは早い段階では問題なく動作しますが、データの規模が大きくなるにつれてパフォーマンスは低下します。 分散処理でも、繰り返し再計算するよりも、このデータを事前に計算する方が高速です。

生データを集計テーブルに定期的にロールアップすることで、ダッシュボードを確実に高速に維持させることができます。 集計期間で実験できます。 分単位の集計テーブルを使用しましたが、代わりにデータを 5、15、または 60 分に分割できます。

このロールアップをより簡単に実行するために、これを plpgsql 関数で処理します。 psql でこれらのコマンドを実行して rollup_http_request 関数を作成します。

-- initialize to a time long ago
INSERT INTO latest_rollup VALUES ('10-10-1901');

-- function to do the rollup
CREATE OR REPLACE FUNCTION rollup_http_request() RETURNS void AS $$
DECLARE
  curr_rollup_time timestamptz := date_trunc('minute', now());
  last_rollup_time timestamptz := minute from latest_rollup;
BEGIN
  INSERT INTO http_request_1min (
    site_id, ingest_time, request_count,
    success_count, error_count, average_response_time_msec
  ) SELECT
    site_id,
    date_trunc('minute', ingest_time),
    COUNT(1) as request_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 1 ELSE 0 END) as success_count,
    SUM(CASE WHEN (status_code between 200 and 299) THEN 0 ELSE 1 END) as error_count,
    SUM(response_time_msec) / COUNT(1) AS average_response_time_msec
  FROM http_request
  -- roll up only data new since last_rollup_time
  WHERE date_trunc('minute', ingest_time) <@
          tstzrange(last_rollup_time, curr_rollup_time, '(]')
  GROUP BY 1, 2;

  -- update the value in latest_rollup so that next time we run the
  -- rollup it will operate on data newer than curr_rollup_time
  UPDATE latest_rollup SET minute = curr_rollup_time;
END;
$$ LANGUAGE plpgsql;

関数を用意できたら、それを実行してデータをロールアップします。

SELECT rollup_http_request();

また、事前に集計されたフォームのデータを使用して、ロールアップ テーブルのクエリを実行し、前と同じレポートを取得することもできます。 次のクエリを実行します。

SELECT site_id, ingest_time as minute, request_count,
       success_count, error_count, average_response_time_msec
  FROM http_request_1min
 WHERE ingest_time > date_trunc('minute', now()) - '5 minutes'::interval;

古いデータを期限切れにする

ロールアップによってクエリは高速になりますが、無制限のストレージ コストを回避するために古いデータを期限切れにする必要があります。 細分性ごとにデータを保持する期間を決定し、標準のクエリを使用して期限切れのデータを削除します。 次の例では、生データは 1 日、分ごとの集計は 1 か月間保持することにしました。

DELETE FROM http_request WHERE ingest_time < now() - interval '1 day';
DELETE FROM http_request_1min WHERE ingest_time < now() - interval '1 month';

運用環境では、これらのクエリを関数にラップして cron ジョブで毎分呼び出すことができます。

リソースをクリーンアップする

前の手順では、クラスター内に Azure リソースを作成しました。 これらのリソースが将来不要であると思われる場合は、クラスターを削除します。 クラスターの [概要] ページで、[削除] ボタンを押します。 ポップアップ ページでメッセージが表示されたら、クラスターの名前を確認し、最後の [削除] ボタンをクリックします。

次の手順

このチュートリアルでは、クラスターのプロビジョニング方法を学習しました。 そのサーバー グループに psql で接続し、スキーマを作成して、データを分散しました。 生のフォームでデータのクエリを実行すること、そのデータを定期的に集計すること、集計されたテーブルのクエリを実行すること、古いデータを期限切れにすることを学習しました。