Nawiązywanie połączenia i uruchamianie poleceń SQL w usłudze Azure Cosmos DB for PostgreSQL za pomocą Node.js

DOTYCZY: Usługa Azure Cosmos DB for PostgreSQL (obsługiwana przez rozszerzenie bazy danych Citus do bazy danych PostgreSQL)

W tym przewodniku Szybki start pokazano, jak używać kodu Node.js do nawiązywania połączenia z klastrem, a następnie utworzyć tabelę przy użyciu instrukcji SQL. Następnie wstawisz, wykonasz zapytanie, zaktualizujesz i usuniesz dane w bazie danych. W krokach opisanych w tym artykule założono, że wiesz już, Node.js programowanie i zaczynasz pracę z usługą Azure Cosmos DB for PostgreSQL.

Instalowanie biblioteki PostgreSQL

Przykłady kodu w tym artykule wymagają biblioteki pg do interfejsu z serwerem PostgreSQL. Musisz zainstalować narzędzie pg za pomocą menedżera pakietów językowych (npm).

Łączenie, tworzenie tabeli i wstawianie danych

Tworzenie wspólnego modułu połączenia

Porada

Poniższy przykładowy kod używa puli połączeń do tworzenia połączeń z bazą danych PostgreSQL i zarządzania nimi. Buforowanie połączeń po stronie aplikacji jest zdecydowanie zalecane, ponieważ:

  • Gwarantuje to, że aplikacja nie generuje zbyt wielu połączeń z bazą danych, dlatego nie przekracza limitów połączeń.
  • Może to pomóc znacząco zwiększyć wydajność — zarówno opóźnienie, jak i przepływność. Proces serwera PostgreSQL musi rozwidlić w celu obsługi każdego nowego połączenia i ponowne użycie połączenia pozwala uniknąć tego obciążenia.

Utwórz folder o nazwie db, a wewnątrz tego folderu utwórz plik citus.js zawierający następujący wspólny kod połączenia. W tym kodzie zastąp <klaster> nazwą klastra i <hasłem> administratora.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Tworzenie tabeli

Użyj poniższego kodu, aby nawiązać połączenie i załadować dane przy użyciu instrukcji CREATE TABLE i INSERT INTO języka SQL. Kod tworzy nową pharmacy tabelę i wstawia przykładowe dane.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Dystrybuowanie tabel

Usługa Azure Cosmos DB for PostgreSQL zapewnia super możliwości dystrybucji tabel między wieloma węzłami w celu zapewnienia skalowalności. Poniższe polecenie umożliwia dystrybucję tabeli. Więcej informacji na temat create_distributed_table i kolumny dystrybucji można znaleźć tutaj.

Uwaga

Dystrybucja tabel umożliwia ich rozwijanie między wszystkimi węzłami roboczymi dodanymi do klastra.

Użyj następującego kodu, aby nawiązać połączenie z bazą danych i rozpowszechnić tabelę.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Odczyt danych

Użyj poniższego kodu, aby nawiązać połączenie i odczytać dane za pomocą instrukcji SELECT języka SQL.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Aktualizowanie danych

Użyj poniższego kodu, aby nawiązać połączenie i zaktualizować dane za pomocą instrukcji UPDATE języka SQL.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Usuwanie danych

Użyj poniższego kodu, aby nawiązać połączenie i usunąć dane za pomocą instrukcji DELETE języka SQL.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

POLECENIE COPY na potrzeby szybkiego pozyskiwania

Polecenie COPY może przynieść ogromną przepływność podczas pozyskiwania danych do usługi Azure Cosmos DB for PostgreSQL. Polecenie COPY może pozyskiwać dane w plikach lub z mikrosadów danych w pamięci na potrzeby pozyskiwania danych w czasie rzeczywistym.

POLECENIE COPY w celu załadowania danych z pliku

Poniższy kod kopiuje dane z pliku CSV do tabeli bazy danych. Kod wymaga pakietu pg-copy-streams i pliku pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

POLECENIE COPY w celu załadowania danych w pamięci

Poniższy kod kopiuje dane w pamięci do tabeli. Kod wymaga pakietu do 2 , który umożliwia tworzenie łańcuchów potoków.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Ponawianie próby aplikacji dla niepowodzeń żądań bazy danych

Czasami istnieje możliwość, że żądania bazy danych z aplikacji kończą się niepowodzeniem. Takie problemy mogą wystąpić w różnych scenariuszach, takich jak awaria sieci między aplikacją i bazą danych, nieprawidłowe hasło itp. Niektóre problemy mogą być przejściowe i rozwiązać je w ciągu kilku sekund do kilku minut. Możesz skonfigurować logikę ponawiania prób w aplikacji, aby przezwyciężyć błędy przejściowe.

Konfigurowanie logiki ponawiania prób w aplikacji pomaga ulepszyć środowisko użytkownika końcowego. W scenariuszach awarii użytkownicy będą czekać nieco dłużej, aż aplikacja będzie obsługiwać żądania, a nie napotkać błędów.

W poniższym przykładzie pokazano, jak zaimplementować logikę ponawiania prób w aplikacji. Przykładowy fragment kodu próbuje żądać bazy danych co 60 sekund (maksymalnie pięć razy), dopóki nie powiedzie się. Liczbę i częstotliwość ponownych prób można skonfigurować na podstawie potrzeb aplikacji.

W tym kodzie zastąp <klaster> nazwą klastra i <hasłem> administratora.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Następne kroki