Använda Python för att ansluta och köra SQL-kommandon på Azure Cosmos DB for PostgreSQL

GÄLLER FÖR: Azure Cosmos DB for PostgreSQL (drivs av Citus-databastillägget till PostgreSQL)

Den här snabbstarten visar hur du använder Python-kod för att ansluta till ett kluster och använder SQL-instruktioner för att skapa en tabell. Sedan infogar, frågar, uppdaterar och tar du bort data i databasen. Stegen i den här artikeln förutsätter att du är bekant med Python-utveckling och att du inte har arbetat med Azure Cosmos DB for PostgreSQL tidigare.

Installera PostgreSQL-biblioteket

Kodexemplen i den här artikeln kräver biblioteket psycopg2 . Du måste installera psycopg2 med språkpakethanteraren (till exempel pip).

Ansluta, skapa en tabell och infoga data

I följande kodexempel skapas en anslutningspool till Postgres-databasen. Den använder sedan cursor.execute-funktioner med SQL CREATE TABLE- och INSERT INTO-instruktioner för att skapa en tabell och infoga data.

Tips

Exempelkoden nedan använder en anslutningspool för att skapa och hantera anslutningar till PostgreSQL. Anslutningspooler på programsidan rekommenderas starkt eftersom:

  • Det säkerställer att programmet inte genererar för många anslutningar till databasen och undviker därför att överskrida anslutningsgränserna.
  • Det kan hjälpa dig att drastiskt förbättra prestanda – både svarstid och dataflöde. PostgreSQL-serverprocessen måste förgrenas för att hantera varje ny anslutning, och om du återanvänder en anslutning undviker du det arbetet.

I följande kod ersätter du <klustret> med klustrets namn och <lösenord> med administratörslösenordet.

Anteckning

Det här exemplet stänger anslutningen i slutet, så om du vill köra de andra exemplen i artikeln i samma session ska du inte ta med # Clean up avsnittet när du kör det här exemplet.

import psycopg2
from psycopg2 import pool

# NOTE: fill in these variables for your own cluster
host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

# Build a connection string from the variables
conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(host, user, dbname, password, sslmode)

postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20,conn_string)
if (postgreSQL_pool):
    print("Connection pool created successfully")

# Use getconn() to get a connection from the connection pool
conn = postgreSQL_pool.getconn()

cursor = conn.cursor()

# Drop previous table of same name if one exists
cursor.execute("DROP TABLE IF EXISTS pharmacy;")
print("Finished dropping table (if existed)")

# Create a table
cursor.execute("CREATE TABLE pharmacy (pharmacy_id integer, pharmacy_name text, city text, state text, zip_code integer);")
print("Finished creating table")

# Create a index
cursor.execute("CREATE INDEX idx_pharmacy_id ON pharmacy(pharmacy_id);")
print("Finished creating index")

# Insert some data into the table
cursor.execute("INSERT INTO pharmacy  (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (1,"Target","Sunnyvale","California",94001))
cursor.execute("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code) VALUES (%s, %s, %s, %s,%s);", (2,"CVS","San Francisco","California",94002))
print("Inserted 2 rows of data")

# Clean up
conn.commit()
cursor.close()
conn.close()

När koden körs skapar den följande utdata:

Connection established
Finished dropping table
Finished creating table
Finished creating index
Inserted 2 rows of data

Distribuera tabeller

Azure Cosmos DB for PostgreSQL ger dig superkraften att distribuera tabeller över flera noder för skalbarhet. Med kommandot nedan kan du distribuera en tabell. Du kan läsa mer om create_distributed_table och distributionskolumnen här.

Anteckning

Genom att distribuera tabeller kan de växa över alla arbetsnoder som läggs till i klustret.

# Create distributed table
cursor.execute("select create_distributed_table('pharmacy','pharmacy_id');")
print("Finished distributing the table")

Läsa data

I följande kodexempel används följande API:er för att läsa data från databasen:

  • cursor.execute med SQL SELECT-instruktionen för att läsa data.
  • cursor.fetchall() för att acceptera en fråga och returnera en resultatuppsättning som itererar.
# Fetch all rows from table
cursor.execute("SELECT * FROM pharmacy;")
rows = cursor.fetchall()

# Print all rows
for row in rows:
    print("Data row = (%s, %s)" %(str(row[0]), str(row[1])))

Uppdatera data

I följande kodexempel används cursor.execute med SQL UPDATE-instruktionen för att uppdatera data.

# Update a data row in the table
cursor.execute("UPDATE pharmacy SET city = %s WHERE pharmacy_id = %s;", ("guntur",1))
print("Updated 1 row of data")

Ta bort data

Följande kodexempel körs cursor.execute med SQL DELETE-instruktionen för att ta bort data.

# Delete data row from table
cursor.execute("DELETE FROM pharmacy WHERE pharmacy_name = %s;", ("Target",))
print("Deleted 1 row of data")

COPY-kommando för snabb inmatning

COPY-kommandot kan ge ett enormt dataflöde när data matas in i Azure Cosmos DB for PostgreSQL. Kommandot COPY kan mata in data i filer eller från mikrobatchar med data i minnet för inmatning i realtid.

COPY-kommandot för att läsa in data från en fil

Följande kod kopierar data från en CSV-fil till en databastabell. Koden kräver att filen pharmacies.csv.

with open('pharmacies.csv', 'r') as f:
    # Notice that we don't need the `csv` module.
    next(f) # Skip the header row.
    cursor.copy_from(f, 'pharmacy', sep=',')
    print("copying data completed")

COPY-kommandot för att läsa in minnesintern data

Följande kod kopierar minnesintern data till en tabell.

data = [[3,"Walgreens","Sunnyvale","California",94006], [4,"Target","Sunnyvale","California",94016]]
buf = io.StringIO()
writer = csv.writer(buf)
writer.writerows(data)

buf.seek(0)
with conn.cursor() as cur:
    cur.copy_from(buf, "pharmacy", sep=",")

conn.commit()
conn.close()

Appåterförsök för databasbegärandefel

Det är ibland möjligt att databasbegäranden från ditt program misslyckas. Sådana problem kan inträffa i olika scenarier, till exempel nätverksfel mellan app och databas, felaktigt lösenord osv. Vissa problem kan vara tillfälliga och lösa sig själva inom några sekunder till minuter. Du kan konfigurera omprövningslogik i din app för att lösa de tillfälliga felen.

Genom att konfigurera omförsökslogik i din app kan du förbättra slutanvändarupplevelsen. Under felscenarier väntar användarna bara lite längre på att programmet ska hantera begäranden i stället för att uppleva fel.

Exemplet nedan visar hur du implementerar omprövningslogik i din app. Exempelkodfragmentet provar en databasbegäran var 60:e sekund (upp till fem gånger) tills den lyckas. Antalet och frekvensen för återförsök kan konfigureras baserat på programmets behov.

I den här koden ersätter du <klustret> med klustrets namn och <lösenord> med administratörslösenordet.

import psycopg2
import time
from psycopg2 import pool

host = "c-<cluster>.<uniqueID>.postgres.cosmos.azure.com"
dbname = "citus"
user = "citus"
password = "<password>"
sslmode = "require"

conn_string = "host={0} user={1} dbname={2} password={3} sslmode={4}".format(
        host, user, dbname, password, sslmode)
postgreSQL_pool = psycopg2.pool.SimpleConnectionPool(1, 20, conn_string)

def executeRetry(query, retryCount):
    for x in range(retryCount):
        try:
            if (postgreSQL_pool):
                # Use getconn() to Get Connection from connection pool
                conn = postgreSQL_pool.getconn()
                cursor = conn.cursor()
                cursor.execute(query)
                return cursor.fetchall()
            break
        except Exception as err:
            print(err)
            postgreSQL_pool.putconn(conn)
            time.sleep(60)
    return None

print(executeRetry("select 1", 5))

Nästa steg