Använd Node.js för att ansluta och köra SQL-kommandon på Azure Cosmos DB for PostgreSQL

GÄLLER FÖR: Azure Cosmos DB for PostgreSQL (drivs av Citus-databastillägget till PostgreSQL)

Den här snabbstarten visar hur du använder Node.js kod för att ansluta till ett kluster och använder SQL-uttryck för att skapa en tabell. Sedan infogar, frågar, uppdaterar och tar du bort data i databasen. Stegen i den här artikeln förutsätter att du är bekant med Node.js utveckling och är nybörjare på att arbeta med Azure Cosmos DB for PostgreSQL.

Installera PostgreSQL-biblioteket

Kodexemplen i den här artikeln kräver att pg-biblioteket samverkar med PostgreSQL-servern. Du måste installera pg med språkpakethanteraren (till exempel npm).

Ansluta, skapa en tabell och infoga data

Skapa den gemensamma anslutningsmodulen

Tips

Exempelkoden nedan använder en anslutningspool för att skapa och hantera anslutningar till PostgreSQL. Anslutningspooler på programsidan rekommenderas starkt eftersom:

  • Det säkerställer att programmet inte genererar för många anslutningar till databasen och undviker därför att överskrida anslutningsgränserna.
  • Det kan hjälpa dig att drastiskt förbättra prestanda – både svarstid och dataflöde. PostgreSQL-serverprocessen måste förgrenas för att hantera varje ny anslutning, och om du återanvänder en anslutning undviker du det arbetet.

Skapa en mapp med namnet db och skapa en citus.js fil i den här mappen som innehåller följande vanliga anslutningskod. I den här koden ersätter du <klustret> med klustrets namn och <lösenord> med administratörslösenordet.

/**
* file: db/citus.js
*/

const { Pool } = require('pg');

const pool = new Pool({
  max: 300,
  connectionTimeoutMillis: 5000,

  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
});

module.exports = {
  pool,
};

Skapa en tabell

Använd följande kod för att ansluta och läsa in data med SQL-uttrycken CREATE TABLE och INSERT INTO. Koden skapar en ny pharmacy tabell och infogar exempeldata.

/**
* file: create.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DROP TABLE IF EXISTS pharmacy;
    CREATE TABLE pharmacy (pharmacy_id integer,pharmacy_name text,city text,state text,zip_code integer);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (0,'Target','Sunnyvale','California',94001);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (1,'CVS','San Francisco','California',94002);
    INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (2,'Walgreens','San Diego','California',94003);
    CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);
  `;

  try {
    /* Real application code would probably request a dedicated client with
       pool.connect() and run multiple queries with the client. In this
       example, you're running only one query, so you use the pool.query()
       helper method to run it on the first available idle client.
    */

    await pool.query(queryString);
    console.log('Created the Pharmacy table and inserted rows.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Distribuera tabeller

Azure Cosmos DB for PostgreSQL ger dig superkraften att distribuera tabeller över flera noder för skalbarhet. Med kommandot nedan kan du distribuera en tabell. Du kan läsa mer om create_distributed_table och distributionskolumnen här.

Anteckning

Genom att distribuera tabeller kan de växa över alla arbetsnoder som läggs till i klustret.

Använd följande kod för att ansluta till databasen och distribuera tabellen.

/**
* file: distribute-table.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT create_distributed_table('pharmacy', 'pharmacy_id');
  `;

  try {
    await pool.query(queryString);
    console.log('Distributed pharmacy table.');
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Läsa data

Använd följande kod för att ansluta och läsa data med en SELECT-SQL-instruktion.

/**
* file: read.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    SELECT * FROM pharmacy;
  `;

  try {
    const res = await pool.query(queryString);
    console.log(res.rows);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Uppdatera data

Använd följande kod för att ansluta och uppdatera data med en UPDATE-SQL-instruktion.

/**
* file: update.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    UPDATE pharmacy SET city = 'Long Beach'
    WHERE pharmacy_id = 1;
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Update completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

Ta bort data

Använd följande kod för att ansluta och läsa data med SQL-instruktionen DELETE.

/**
* file: delete.js
*/

const { pool } = require('./db/citus');

async function queryDatabase() {
  const queryString = `
    DELETE FROM pharmacy
    WHERE pharmacy_name = 'Target';
  `;

  try {
    const result = await pool.query(queryString);
    console.log('Delete completed.');
    console.log(`Rows affected: ${result.rowCount}`);
  } catch (err) {
    console.log(err.stack);
  } finally {
    pool.end();
  }
}

queryDatabase();

COPY-kommando för snabb inmatning

COPY-kommandot kan ge ett enormt dataflöde när data matas in i Azure Cosmos DB for PostgreSQL. Kommandot COPY kan mata in data i filer eller från mikrobatchar med data i minnet för inmatning i realtid.

COPY-kommandot för att läsa in data från en fil

Följande kod kopierar data från en CSV-fil till en databastabell. Koden kräver paketet pg-copy-streams och filen pharmacies.csv.

/**
* file: copycsv.js
*/

const inputFile = require('path').join(__dirname, '/pharmacies.csv');
const fileStream = require('fs').createReadStream(inputFile);
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importCsvDatabase() {
  return new Promise((resolve, reject) => {
    const queryString = `
      COPY pharmacy FROM STDIN WITH (FORMAT CSV, HEADER true, NULL '');
    `;

    fileStream.on('error', reject);

    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom(queryString))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        fileStream.pipe(stream);
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}

(async () => {
  console.log('Copying from CSV...');
  await importCsvDatabase();
  await pool.end();
  console.log('Inserted csv successfully');
})();

COPY-kommandot för att läsa in minnesintern data

Följande kod kopierar minnesintern data till en tabell. Koden kräver through2-paketet , vilket möjliggör rörlänkning.

/**
 * file: copyinmemory.js
 */

const through2 = require('through2');
const copyFrom = require('pg-copy-streams').from;
const { pool } = require('./db/citus');

async function importInMemoryDatabase() {
  return new Promise((resolve, reject) => {
    pool
      .connect()
      .then(client => {
        const stream = client
          .query(copyFrom('COPY pharmacy FROM STDIN'))
          .on('error', reject)
          .on('end', () => {
            reject(new Error('Connection closed!'));
          })
          .on('finish', () => {
            client.release();
            resolve();
          });

        const internDataset = [
          ['100', 'Target', 'Sunnyvale', 'California', '94001'],
          ['101', 'CVS', 'San Francisco', 'California', '94002'],
        ];

        let started = false;
        const internStream = through2.obj((arr, _enc, cb) => {
          const rowText = (started ? '\n' : '') + arr.join('\t');
          started = true;
          cb(null, rowText);
        });

        internStream.on('error', reject).pipe(stream);

        internDataset.forEach((record) => {
          internStream.write(record);
        });

        internStream.end();
      })
      .catch(err => {
        reject(new Error(err));
      });
  });
}
(async () => {
  await importInMemoryDatabase();
  await pool.end();
  console.log('Inserted inmemory data successfully.');
})();

Appåterförsök för databasbegärandefel

Det är ibland möjligt att databasbegäranden från ditt program misslyckas. Sådana problem kan inträffa i olika scenarier, till exempel nätverksfel mellan app och databas, felaktigt lösenord osv. Vissa problem kan vara tillfälliga och lösa sig själva inom några sekunder till minuter. Du kan konfigurera omprövningslogik i din app för att lösa de tillfälliga felen.

Genom att konfigurera omförsökslogik i din app kan du förbättra slutanvändarupplevelsen. Under felscenarier väntar användarna bara lite längre på att programmet ska hantera begäranden i stället för att uppleva fel.

Exemplet nedan visar hur du implementerar omprövningslogik i din app. Exempelkodfragmentet provar en databasbegäran var 60:e sekund (upp till fem gånger) tills den lyckas. Antalet och frekvensen för återförsök kan konfigureras baserat på programmets behov.

I den här koden ersätter du <klustret> med klustrets namn och <lösenord> med administratörslösenordet.

const { Pool } = require('pg');
const { sleep } = require('sleep');

const pool = new Pool({
  host: 'c-<cluster>.<uniqueID>.postgres.cosmos.azure.com',
  port: 5432,
  user: 'citus',
  password: '<password>',
  database: 'citus',
  ssl: true,
  connectionTimeoutMillis: 0,
  idleTimeoutMillis: 0,
  min: 10,
  max: 20,
});

(async function() {
  res = await executeRetry('select nonexistent_thing;',5);
  console.log(res);
  process.exit(res ? 0 : 1);
})();

async function executeRetry(sql,retryCount)
{
  for (let i = 0; i < retryCount; i++) {
    try {
      result = await pool.query(sql)
      return result;
    } catch (err) {
      console.log(err.message);
      sleep(60);
    }
  }

  // didn't succeed after all the tries
  return null;
}

Nästa steg