Aplicación de Java para conectarse y ejecutar comandos SQL en Azure Cosmos DB for PostgreSQL

SE APLICA A: Azure Cosmos DB for PostgreSQL (con tecnología de la extensión de base de datos de Citus en PostgreSQL)

En este inicio rápido se muestra cómo usar el código de Java para conectarse a un clúster y usar instrucciones SQL para crear una tabla. A continuación, insertará, consultará, actualizará y eliminará datos de la base de datos. En los pasos de este artículo se asume que está familiarizado con el desarrollo de Java y JDBC, pero que nunca ha trabajado con Azure Cosmos DB for PostgreSQL.

Configuración del proyecto y la conexión de Java

Cree un nuevo proyecto de Java y un archivo de configuración para conectarse a Azure Cosmos DB for PostgreSQL.

Creación de un nuevo proyecto de Java

Con el entorno de desarrollo integrado (IDE) que prefiera, cree un nuevo proyecto de Java con groupId test y artifactId crud. En el directorio raíz del proyecto, agregue un archivo pom.xml con el siguiente contenido. En este archivo se configura Apache Maven para usar Java 8 y un controlador de PostgreSQL reciente para Java.

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>test</groupId>
  <artifactId>crud</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>crud</name>
  <url>http://www.example.com</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-engine</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.postgresql</groupId>
      <artifactId>postgresql</artifactId>
      <version>42.2.12</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/com.zaxxer/HikariCP -->
    <dependency>
      <groupId>com.zaxxer</groupId>
      <artifactId>HikariCP</artifactId>
      <version>5.0.0</version>
    </dependency>
    <dependency>
      <groupId>org.junit.jupiter</groupId>
      <artifactId>junit-jupiter-params</artifactId>
      <version>5.7.1</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <plugins>
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-surefire-plugin</artifactId>
        <version>3.0.0-M5</version>
      </plugin>
    </plugins>
  </build>
</project>

Configuración de la conexión de base de datos

En src/main/resources/, cree un archivo application.properties con el siguiente contenido. Reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

driver.class.name=org.postgresql.Driver
db.url=jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?ssl=true&sslmode=require
db.username=citus
db.password=<password>

La cadena ?ssl=true&sslmode=require de la propiedad db.url indica al controlador JDBC que use la Seguridad de la capa de transporte (TLS) al conectarse a la base de datos. Con Azure Cosmos DB for PostgreSQL es obligatorio usar TLS, pero además es aconsejable hacerlo por seguridad.

Crear tablas.

Configure un esquema dela base de datos que tenga tablas distribuidas. Conéctese a la base de datos para crear el esquema y las tablas.

Generación del esquema de la base de datos

En src/main/resources/, cree un archivo schema.sql con el siguiente contenido:

DROP TABLE IF EXISTS public.pharmacy;
CREATE TABLE  public.pharmacy(pharmacy_id integer,pharmacy_name text ,city text ,state text ,zip_code integer);
CREATE INDEX idx_pharmacy_id ON public.pharmacy(pharmacy_id);

Distribución de tablas

Azure Cosmos DB for PostgreSQL le proporciona la habilidad de distribuir tablas entre varios nodos para mejorar la escalabilidad. El uso del siguiente comando le permitirá distribuir una tabla. Puede obtener más información sobre create_distributed_table y la columna de distribución aquí.

Nota

La distribución de tablas les permite crecer en todos los nodos de trabajo que se han agregado al clúster.

Para distribuir tablas, anexe la línea siguiente al archivo schema.sql que ha creado en la sección anterior.

select create_distributed_table('public.pharmacy','pharmacy_id');

Conéctese a la base de datos y cree el esquema

A continuación, agregue el código de Java que usa JDBC para almacenar y recuperar datos del clúster. El código usa los archivos application.properties y schema.sql para conectarse al clúster y crear el esquema.

  1. Cree un archivo DButil.java con el código siguiente, que contiene la clase DButil. La clase DBUtil configura un grupo de conexiones en PostgreSQL mediante HikariCP. Use esta clase para conectarse a PostgreSQL e iniciar consultas.

    Sugerencia

    El siguiente código de ejemplo usa un grupo de conexiones para crear y administrar las conexiones a PostgreSQL. Se recomienda encarecidamente la agrupación de conexiones en el lado de la aplicación porque:

    • Garantiza que la aplicación no genere demasiadas conexiones a la base de datos, lo que evita que se superen los límites de conexiones.
    • Puede ayudar a mejorar drásticamente el rendimiento, tanto la latencia como el procesamiento. El proceso del servidor PostgreSQL debe bifurcarse para controlar cada nueva conexión y reutilizar una conexión evita esa sobrecarga.
    //DButil.java
    package test.crud;
    
    import java.io.FileInputStream;
    import java.io.IOException;
    import java.sql.SQLException;
    import java.util.Properties;
    
    import javax.sql.DataSource;
    
    import com.zaxxer.hikari.HikariDataSource;
    
    public class DButil {
        private static final String DB_USERNAME = "db.username";
        private static final String DB_PASSWORD = "db.password";
        private static final String DB_URL = "db.url";
        private static final String DB_DRIVER_CLASS = "driver.class.name";
        private static Properties properties =  null;
        private static HikariDataSource datasource;
    
        static {
            try {
                properties = new Properties();
                properties.load(new FileInputStream("src/main/java/application.properties"));
    
                datasource = new HikariDataSource();
                datasource.setDriverClassName(properties.getProperty(DB_DRIVER_CLASS ));
                datasource.setJdbcUrl(properties.getProperty(DB_URL));
                datasource.setUsername(properties.getProperty(DB_USERNAME));
                datasource.setPassword(properties.getProperty(DB_PASSWORD));
                datasource.setMinimumIdle(100);
                datasource.setMaximumPoolSize(1000000000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
            } catch (IOException | SQLException  e) {
                e.printStackTrace();
            }
        }
        public static DataSource getDataSource() {
            return datasource;
        }
    }
    
  2. En src/main/java/, cree un archivo DemoApplication.java que contenga el código siguiente:

    package test.crud;
    import java.io.IOException;
    import java.sql.*;
    import java.util.*;
    import java.util.logging.Logger;
    import java.io.FileInputStream;
    import java.io.FileOutputStream;
    import org.postgresql.copy.CopyManager;
    import org.postgresql.core.BaseConnection;
    import java.io.IOException;
    import java.io.Reader;
    import java.io.StringReader;
    
    public class DemoApplication {
    
        private static final Logger log;
    
        static {
            System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
            log =Logger.getLogger(DemoApplication.class.getName());
        }
        public static void main(String[] args)throws Exception
        {
            log.info("Connecting to the database");
            Connection connection = DButil.getDataSource().getConnection();
            System.out.println("The Connection Object is of Class: " + connection.getClass());
            log.info("Database connection test: " + connection.getCatalog());
            log.info("Creating table");
            log.info("Creating index");
            log.info("distributing table");
            Scanner scanner = new Scanner(DemoApplication.class.getClassLoader().getResourceAsStream("schema.sql"));
            Statement statement = connection.createStatement();
            while (scanner.hasNextLine()) {
                statement.execute(scanner.nextLine());
            }
            log.info("Closing database connection");
            connection.close();
        }
    
    }
    

    Nota

    La base de datos user y las credenciales password se usan al ejecutar DriverManager.getConnection(properties.getProperty("url"), properties);. Las credenciales se almacenan en el archivo application.properties, que se pasa como argumento.

  3. Ahora puede ejecutar esta clase principal con su herramienta favorita:

    • Con el IDE, debería poder hacer clic con el botón derecho en la clase DemoApplication y ejecutarla.
    • Con Maven, puede ejecutar la aplicación mediante la ejecución de:
      mvn exec:java -Dexec.mainClass="com.example.demo.DemoApplication".

La aplicación debe conectarse a Azure Cosmos DB for PostgreSQL, crear un esquema de base de datos y, a continuación, cerrar la conexión, tal y como debería ver en los registros de la consola:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Create database schema
[INFO   ] Closing database connection

Creación de una clase de dominio

Cree la nueva clase de Java Pharmacy, junto a la clase DemoApplication y agregue el siguiente código:

public class Pharmacy {
    private Integer pharmacy_id;
    private String pharmacy_name;
    private String city;
    private String state;
    private Integer zip_code;
    public Pharmacy() { }
    public Pharmacy(Integer pharmacy_id, String pharmacy_name, String city,String state,Integer zip_code)
    {
        this.pharmacy_id = pharmacy_id;
        this.pharmacy_name = pharmacy_name;
        this.city = city;
        this.state = state;
        this.zip_code = zip_code;
    }

    public Integer getpharmacy_id() {
        return pharmacy_id;
    }

    public void setpharmacy_id(Integer pharmacy_id) {
        this.pharmacy_id = pharmacy_id;
    }

    public String getpharmacy_name() {
        return pharmacy_name;
    }

    public void setpharmacy_name(String pharmacy_name) {
        this.pharmacy_name = pharmacy_name;
    }

    public String getcity() {
        return city;
    }

    public void setcity(String city) {
        this.city = city;
    }

    public String getstate() {
        return state;
    }

    public void setstate(String state) {
        this.state = state;
    }

    public Integer getzip_code() {
        return zip_code;
    }

    public void setzip_code(Integer zip_code) {
        this.zip_code = zip_code;
    }
    @Override
    public String toString() {
        return "TPharmacy{" +
               "pharmacy_id=" + pharmacy_id +
               ", pharmacy_name='" + pharmacy_name + '\'' +
               ", city='" + city + '\'' +
               ", state='" + state + '\'' +
               ", zip_code='" + zip_code + '\'' +
               '}';
    }
}

Esta clase es un modelo de dominio asignado a la tabla Pharmacy que creó al ejecutar el script schema.sql.

Insertar datos

En el archivo DemoApplication.java, después del método main, agregue el siguiente método que usa la instrucción INSERT INTO SQL para insertar datos en la base de datos:

private static void insertData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Insert data");
    PreparedStatement insertStatement = connection
        .prepareStatement("INSERT INTO pharmacy (pharmacy_id,pharmacy_name,city,state,zip_code)  VALUES (?, ?, ?, ?, ?);");

    insertStatement.setInt(1, todo.getpharmacy_id());
    insertStatement.setString(2, todo.getpharmacy_name());
    insertStatement.setString(3, todo.getcity());
    insertStatement.setString(4, todo.getstate());
    insertStatement.setInt(5, todo.getzip_code());

    insertStatement.executeUpdate();
}

Agregue las dos líneas siguientes al método principal:

Pharmacy todo = new Pharmacy(0,"Target","Sunnyvale","California",94001);
insertData(todo, connection);

La ejecución de la clase main debería generar ahora la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Closing database connection

Lectura de datos

Lea los datos que se insertaron anteriormente para validar que el código funciona correctamente.

En el archivo DemoApplication.java, después del método insertData, agregue el siguiente método que usa la instrucción SQL SELECT para leer datos de la base de datos:

private static Pharmacy readData(Connection connection) throws SQLException {
    log.info("Read data");
    PreparedStatement readStatement = connection.prepareStatement("SELECT * FROM Pharmacy;");
    ResultSet resultSet = readStatement.executeQuery();
    if (!resultSet.next()) {
        log.info("There is no data in the database!");
        return null;
    }
    Pharmacy todo = new Pharmacy();
    todo.setpharmacy_id(resultSet.getInt("pharmacy_id"));
    todo.setpharmacy_name(resultSet.getString("pharmacy_name"));
    todo.setcity(resultSet.getString("city"));
    todo.setstate(resultSet.getString("state"));
    todo.setzip_code(resultSet.getInt("zip_code"));
    log.info("Data read from the database: " + todo.toString());
    return todo;
}

Agregue la siguientes línea al método principal:

todo = readData(connection);

La ejecución de la clase main debería generar ahora la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Closing database connection

Actualización de datos

Actualice los datos que se insertaron anteriormente.

En el archivo DemoApplication.java, después del método readData, agregue el siguiente método para actualizar los datos de la base de datos mediante la instrucción SQL UPDATE:

private static void updateData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Update data");
    PreparedStatement updateStatement = connection
        .prepareStatement("UPDATE pharmacy SET city = ? WHERE pharmacy_id = ?;");

    updateStatement.setString(1, todo.getcity());

    updateStatement.setInt(2, todo.getpharmacy_id());
    updateStatement.executeUpdate();
    readData(connection);
}

Agregue las dos líneas siguientes al método principal:

todo.setcity("Guntur");
updateData(todo, connection);

La ejecución de la clase main debería generar ahora la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Closing database connection

Eliminación de datos

Por último, elimine los datos que se insertaron anteriormente. En el archivo DemoApplication.java, después del método updateData, agregue el siguiente método para eliminar datos de la base de datos mediante la instrucción SQL DELETE:

private static void deleteData(Pharmacy todo, Connection connection) throws SQLException {
    log.info("Delete data");
    PreparedStatement deleteStatement = connection.prepareStatement("DELETE FROM pharmacy WHERE pharmacy_id = ?;");
    deleteStatement.setLong(1, todo.getpharmacy_id());
    deleteStatement.executeUpdate();
    readData(connection);
}

Ahora, puede agregar la siguiente línea al método principal:

deleteData(todo, connection);

La ejecución de la clase main debería generar ahora la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO   ] Closing database connection

Comando COPY para llevar a cabo una ingesta rápida

El comando COPY es capaz de conseguir un rendimiento enorme cuando ingiere datos en Azure Cosmos DB for PostgreSQL. El comando COPY puede ingerir datos ubicados en archivos o microprocesos de datos ubicados en memoria durante un proceso de ingesta en tiempo real.

Uso del comando COPY para cargar datos desde un archivo

El código siguiente copia datos de un archivo .csv a una tabla de base de datos. El ejemplo de código necesita el archivo pharmacies.csv.

public static long
copyFromFile(Connection connection, String filePath, String tableName)
throws SQLException, IOException {
    long count = 0;
    FileInputStream fileInputStream = null;

    try {
        Connection unwrap = connection.unwrap(Connection.class);
        BaseConnection  connSec = (BaseConnection) unwrap;

        CopyManager copyManager = new CopyManager((BaseConnection) connSec);
        fileInputStream = new FileInputStream(filePath);
        count = copyManager.copyIn("COPY " + tableName + " FROM STDIN delimiter ',' csv", fileInputStream);
    } finally {
        if (fileInputStream != null) {
            try {
                fileInputStream.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
    return count;
}

Ahora, puede agregar la siguiente línea al método principal:

int c = (int) copyFromFile(connection,"C:\\Users\\pharmacies.csv", "pharmacy");
log.info("Copied "+ c +" rows using COPY command");

Tras esta adición, la ejecución de la clase main generará la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
[INFO ] Copied 5000 rows using COPY command
[INFO   ] Closing database connection

Comando COPY para cargar datos en memoria

El código siguiente copia en una tabla los datos en memoria.

private static void inMemory(Connection connection) throws SQLException,IOException
    {
    log.info("Copying inmemory data into table");
            
    final List<String> rows = new ArrayList<>();
    rows.add("0,Target,Sunnyvale,California,94001");
    rows.add("1,Apollo,Guntur,Andhra,94003");
        
    final BaseConnection baseConnection = (BaseConnection) connection.unwrap(Connection.class);
    final CopyManager copyManager = new CopyManager(baseConnection);

    // COPY command can change based on the format of rows. This COPY command is for above rows.
    final String copyCommand = "COPY pharmacy FROM STDIN with csv";        
       
    try (final Reader reader = new StringReader(String.join("\n", rows))) {
        copyManager.copyIn(copyCommand, reader);
    }
}

Ahora, puede agregar la siguiente línea al método principal:

inMemory(connection);

La ejecución de la clase main debería generar ahora la siguiente salida:

[INFO   ] Loading application properties
[INFO   ] Connecting to the database
[INFO   ] Database connection test: citus
[INFO   ] Creating table
[INFO   ] Creating index
[INFO   ] distributing table
[INFO   ] Insert data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Sunnyvale', state='California', zip_code='94001'}
[INFO   ] Update data
[INFO   ] Read data
[INFO   ] Data read from the database: Pharmacy{pharmacy_id=0, pharmacy_name='Target', city='Guntur', state='California', zip_code='94001'}
[INFO   ] Delete data
[INFO   ] Read data
[INFO   ] There is no data in the database!
5000
[INFO   ] Copying in-memory data into table
[INFO   ] Closing database connection

Reintento de la aplicación para errores de solicitud de la base de datos

A veces es posible que las solicitudes de base de datos de la aplicación produzcan un error. Estos problemas pueden producirse en diferentes escenarios, como errores de red entre la aplicación y la base de datos, contraseñas incorrectas, etc. Algunos problemas pueden ser transitorios y resolverse por sí mismos en unos segundos o minutos. Puede configurar la lógica de reintento en la aplicación para resolver los errores transitorios.

Configurar la lógica de reintento en la aplicación ayuda a mejorar la experiencia del usuario final. En escenarios de error, los usuarios simplemente esperarán un poco más para que la aplicación atienda las solicitudes, en lugar de experimentar errores.

En el ejemplo siguiente se muestra cómo implementar la lógica de reintento en la aplicación. El fragmento de código de ejemplo intenta una solicitud de base de datos cada 60 segundos (hasta cinco veces) hasta que se realiza correctamente. El número y la frecuencia de los reintentos se pueden configurar en función de las necesidades de la aplicación.

En este código, reemplace el <clúster> por el nombre del clúster y la <contraseña> por la contraseña de administrador.

package test.crud;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.util.logging.Logger;
import com.zaxxer.hikari.HikariDataSource;

public class DemoApplication
{
    private static final Logger log;

    static
    {
        System.setProperty("java.util.logging.SimpleFormatter.format", "[%4$-7s] %5$s %n");
        log = Logger.getLogger(DemoApplication.class.getName());
    }
    private static final String DB_USERNAME = "citus";
    private static final String DB_PASSWORD = "<password>";
    private static final String DB_URL = "jdbc:postgresql://c-<cluster>.<uniqueID>.postgres.cosmos.azure.com:5432/citus?sslmode=require";
    private static final String DB_DRIVER_CLASS = "org.postgresql.Driver";
    private static HikariDataSource datasource;

    private static String executeRetry(String sql, int retryCount) throws InterruptedException
    {
        Connection con = null;
        PreparedStatement pst = null;
        ResultSet rs = null;
        for (int i = 1; i <= retryCount; i++)
        {
            try
            {
                datasource = new HikariDataSource();
                datasource.setDriverClassName(DB_DRIVER_CLASS);
                datasource.setJdbcUrl(DB_URL);
                datasource.setUsername(DB_USERNAME);
                datasource.setPassword(DB_PASSWORD);
                datasource.setMinimumIdle(10);
                datasource.setMaximumPoolSize(1000);
                datasource.setAutoCommit(true);
                datasource.setLoginTimeout(3);
                log.info("Connecting to the database");
                con = datasource.getConnection();
                log.info("Connection established");
                log.info("Read data");
                pst = con.prepareStatement(sql);
                rs = pst.executeQuery();
                StringBuilder builder = new StringBuilder();
                int columnCount = rs.getMetaData().getColumnCount();
                while (rs.next())
                {
                    for (int j = 0; j < columnCount;)
                    {
                        builder.append(rs.getString(j + 1));
                        if (++j < columnCount)
                            builder.append(",");
                    }
                    builder.append("\r\n");
                }
                return builder.toString();
            }
            catch (Exception e)
            {
                Thread.sleep(60000);
                System.out.println(e.getMessage());
            }
        }
        return null;
    }

    public static void main(String[] args) throws Exception
    {
        String result = executeRetry("select 1", 5);
        System.out.print(result);
    }
}

Pasos siguientes