SDK REST Apache Phoenix Query Server
Apache Phoenix est une couche de base de données relationnelle massivement parallèle open source qui vient s’ajouter à Apache HBase. Phoenix vous permet d’utiliser des requêtes de type SQL avec HBase via des outils SSH tels que SQLUltraLite. Phoenix fournit également un serveur HTTP appelé un serveur de requêtes Phoenix (PQS), un client léger qui prend en charge deux mécanismes de transport pour la communication client : JSON et les mémoires tampons de protocole. Les mémoires tampons de protocole constituent le mécanisme par défaut et offrent une communication plus efficace que JSON.
Cet article décrit comment utiliser le Kit de développement logiciel (SDK) REST PQS pour créer des tables, des lignes d’upsert individuellement et en bloc, puis sélectionner des données à l’aide d’instructions SQL. Les exemples utilisent le pilote Microsoft .NET pour le serveur de requêtes Apache Phoenix. Ce SDK repose sur des API Avatica d’Apache Calcite qui utilisent exclusivement des mémoires tampons de protocole pour le format de sérialisation.
Pour plus d’informations, consultez Référence des mémoires tampons de protocole Apache Calcite Avatica.
Installer le SDK
Le pilote Microsoft .NET pour le serveur de requêtes Apache Phoenix est fourni sous la forme d’un package NuGet qui peut être installé à partir de la Console du gestionnaire de package NuGet de Visual Studio avec la commande suivante :
Install-Package Microsoft.Phoenix.Client
Instancier un nouvel objet PhoenixClient
Pour utiliser la bibliothèque, instanciez un nouvel objet PhoenixClient
, en passant ClusterCredentials
qui contient le Uri
à votre cluster, ainsi que le nom d’utilisateur et le mot de passe Apache Hadoop du cluster.
var credentials = new ClusterCredentials(new Uri("https://CLUSTERNAME.azurehdinsight.net/"), "USERNAME", "PASSWORD");
client = new PhoenixClient(credentials);
Remplacez CLUSTERNAME par le nom de votre cluster HDInsight HBase, ainsi que USERNAME et PASSWORD par les informations d’identification Hadoop spécifiées lors de la création du cluster. Le nom d’utilisateur Hadoop par défaut est admin.
Générer un identificateur de connexion unique
Pour envoyer une ou plusieurs requêtes à PQS, vous devez inclure un identificateur de connexion unique afin d’associer la ou les requêtes à la connexion.
string connId = Guid.NewGuid().ToString();
Dans chaque exemple, un appel de la méthode OpenConnectionRequestAsync
est tout d’abord effectué, en passant l’identificateur de connexion unique. Ensuite, définissez ConnectionProperties
et RequestOptions
, en passant ces objets et l’identificateur de connexion généré à la méthode ConnectionSyncRequestAsync
. L’objet ConnectionSyncRequest
du PQS permet de s’assurer que le client et le serveur ont une vue cohérente des propriétés de base de données.
ConnectionSyncRequest et ses ConnectionProperties
Pour appeler ConnectionSyncRequestAsync
, passez dans un objet ConnectionProperties
.
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
Voici certaines propriétés intéressantes :
Propriété | Description |
---|---|
AutoCommit | Booléen indiquant si autoCommit est activé pour les transactions Phoenix. |
Lecture seule | Booléen indiquant si la connexion est en lecture seule. |
TransactionIsolation | Entier indiquant le niveau d’isolation des transactions selon la spécification JDBC ; consultez le tableau suivant. |
Catalogue | Nom du catalogue à utiliser lors de la récupération des propriétés de connexion. |
schéma | Nom du schéma à utiliser lors de la récupération des propriétés de connexion. |
IsDirty | Booléen indiquant si les propriétés ont été modifiées. |
Voici les valeurs TransactionIsolation
:
Valeur d’isolation | Description |
---|---|
0 | Les transactions ne sont pas prises en charge. |
1 | Des lectures erronées, des lectures non reproductibles et des lectures fantômes peuvent se produire. |
2 | Les lectures erronées sont évitées, mais des lectures non reproductibles et des lectures fantômes peuvent se produire. |
4 | Les lectures erronées et les lectures non reproductibles sont évitées, mais des lectures fantômes peuvent se produire. |
8 | Les lectures erronées, les lectures non reproductibles et les lectures fantômes sont toutes évitées. |
Créer une table
HBase, comme n’importe quel autre SGBDR, stocke les données dans des tables. Phoenix utilise des requêtes SQL standard pour créer de nouvelles tables lors de la définition de la clé primaire et des types de colonnes.
Cet exemple et tous les exemples suivants utilisent l’objet PhoenixClient
instancié tel que défini dans Instancier un nouvel objet PhoenixClient.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// You can set certain request options, such as timeout in milliseconds:
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
CreateStatementResponse createStatementResponse = null;
OpenConnectionResponse openConnResponse = null;
try
{
// Opening connection
var info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
// Create the statement
createStatementResponse = client.CreateStatementRequestAsync(connId, options).Result;
// Create the table if it does not exist
string sql = "CREATE TABLE IF NOT EXISTS Customers (Id varchar(20) PRIMARY KEY, FirstName varchar(50), " +
"LastName varchar(100), StateProvince char(2), Email varchar(255), Phone varchar(15))";
await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
Console.WriteLine($"Table \"Customers\" created.");
}
catch (Exception e)
{
Console.WriteLine(e);
throw;
}
finally
{
if (createStatementResponse != null)
{
client.CloseStatementRequestAsync(connId, createStatementResponse.StatementId, options).Wait();
createStatementResponse = null;
}
if (openConnResponse != null)
{
client.CloseConnectionRequestAsync(connId, options).Wait();
openConnResponse = null;
}
}
L’exemple précédent crée une nouvelle table nommée Customers
à l’aide de l’option IF NOT EXISTS
. L’appel CreateStatementRequestAsync
crée une nouvelle instruction sur le serveur Avatica (PQS). Le bloc finally
ferme le CreateStatementResponse
retourné et les objets OpenConnectionResponse
.
Insérer des données individuellement
Cet exemple montre une insertion de données individuelles, en faisant référence à une collection List<string>
d’abréviations d’états et de territoires américains :
var states = new List<string> { "AL", "AK", "AS", "AZ", "AR", "CA", "CO", "CT", "DE", "DC", "FM", "FL", "GA", "GU", "HI", "ID", "IL", "IN", "IA", "KS", "KY", "LA", "ME", "MH", "MD", "MA", "MI", "MN", "MS", "MO", "MT", "NE", "NV", "NH", "NJ", "NM", "NY", "NC", "ND", "MP", "OH", "OK", "OR", "PW", "PA", "PR", "RI", "SC", "SD", "TN", "TX", "UT", "VT", "VI", "VA", "WA", "WV", "WI", "WY" };
La valeur de colonne StateProvince
de la table sera utilisée dans une opération de sélection ultérieure.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
PrepareResponse prepareResponse = await client.PrepareRequestAsync(connId, sql, 100, options);
statementHandle = prepareResponse.Statement;
var r = new Random();
// Insert 300 rows
for (int i = 0; i < 300; i++)
{
var list = new pbc.RepeatedField<TypedValue>();
var id = new TypedValue
{
StringValue = "id" + i,
Type = Rep.String
};
var firstName = new TypedValue
{
StringValue = "first" + i,
Type = Rep.String
};
var lastName = new TypedValue
{
StringValue = "last" + i,
Type = Rep.String
};
var state = new TypedValue
{
StringValue = states.ElementAt(r.Next(0, 49)),
Type = Rep.String
};
var email = new TypedValue
{
StringValue = $"email{1}@junkemail.com",
Type = Rep.String
};
var phone = new TypedValue
{
StringValue = $"555-229-341{i.ToString().Substring(0,1)}",
Type = Rep.String
};
list.Add(id);
list.Add(firstName);
list.Add(lastName);
list.Add(state);
list.Add(email);
list.Add(phone);
Console.WriteLine("Inserting customer " + i);
await client.ExecuteRequestAsync(statementHandle, list, long.MaxValue, true, options);
}
await client.CommitRequestAsync(connId, options);
Console.WriteLine("Upserted customer data");
}
catch (Exception ex)
{
}
finally
{
if (statementHandle != null)
{
await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
statementHandle = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
La structure d’exécution d’une instruction d’insertion est similaire à la création d’une nouvelle table. À la fin du bloc try
, la transaction est validée explicitement. Cet exemple répète une transaction d’insertion 300 fois. L’exemple suivant montre un processus d’insertion par lot plus efficace.
Insertion par lot des données
Le code suivant est presque identique au code d’insertion de données individuellement. Cet exemple utilise l’objet UpdateBatch
dans un appel à ExecuteBatchRequestAsync
, plutôt que d’appeler à plusieurs reprises ExecuteRequestAsync
avec une instruction préparée.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
options.TimeoutMillis = 300000;
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
CreateStatementResponse createStatementResponse = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
// Creating statement
createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
string sql = "UPSERT INTO Customers VALUES (?,?,?,?,?,?)";
PrepareResponse prepareResponse = client.PrepareRequestAsync(connId, sql, long.MaxValue, options).Result;
var statementHandle = prepareResponse.Statement;
var updates = new pbc.RepeatedField<UpdateBatch>();
var r = new Random();
// Insert 300 rows
for (int i = 300; i < 600; i++)
{
var list = new pbc.RepeatedField<TypedValue>();
var id = new TypedValue
{
StringValue = "id" + i,
Type = Rep.String
};
var firstName = new TypedValue
{
StringValue = "first" + i,
Type = Rep.String
};
var lastName = new TypedValue
{
StringValue = "last" + i,
Type = Rep.String
};
var state = new TypedValue
{
StringValue = states.ElementAt(r.Next(0, 49)),
Type = Rep.String
};
var email = new TypedValue
{
StringValue = $"email{1}@junkemail.com",
Type = Rep.String
};
var phone = new TypedValue
{
StringValue = $"555-229-341{i.ToString().Substring(0, 1)}",
Type = Rep.String
};
list.Add(id);
list.Add(firstName);
list.Add(lastName);
list.Add(state);
list.Add(email);
list.Add(phone);
var batch = new UpdateBatch
{
ParameterValues = list
};
updates.Add(batch);
Console.WriteLine($"Added customer {i} to batch");
}
var executeBatchResponse = await client.ExecuteBatchRequestAsync(connId, statementHandle.Id, updates, options);
Console.WriteLine("Batch upserted customer data");
}
catch (Exception ex)
{
}
finally
{
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
Dans un environnement de test, l’insertion individuellement de 300 nouveaux enregistrements a nécessité près de 2 minutes. En revanche, l’insertion des 300 enregistrements par lot n’a pris que 6 secondes.
Sélectionner les données
Cet exemple montre comment réutiliser une connexion pour exécuter plusieurs requêtes :
- Sélectionnez tous les enregistrements, puis extrayez les enregistrements restants lorsque le maximum par défaut de 100 a été retourné.
- Utilisez une instruction de sélection du nombre total de lignes pour récupérer le résultat scalaire unique.
- Exécutez une instruction de sélection qui retourne le nombre total de clients par état ou territoire.
string connId = Guid.NewGuid().ToString();
RequestOptions options = RequestOptions.GetGatewayDefaultOptions();
// In gateway mode, PQS requests will be https://<cluster dns name>.azurehdinsight.net/hbasephoenix<N>/
// Requests sent to hbasephoenix0/ will be forwarded to PQS on workernode0
options.AlternativeEndpoint = "hbasephoenix0/";
OpenConnectionResponse openConnResponse = null;
StatementHandle statementHandle = null;
try
{
// Opening connection
pbc::MapField<string, string> info = new pbc::MapField<string, string>();
openConnResponse = await client.OpenConnectionRequestAsync(connId, info, options);
// Syncing connection
ConnectionProperties connProperties = new ConnectionProperties
{
HasAutoCommit = true,
AutoCommit = true,
HasReadOnly = true,
ReadOnly = false,
TransactionIsolation = 0,
Catalog = "",
Schema = "",
IsDirty = true
};
await client.ConnectionSyncRequestAsync(connId, connProperties, options);
var createStatementResponse = await client.CreateStatementRequestAsync(connId, options);
string sql = "SELECT * FROM Customers";
ExecuteResponse executeResponse = await client.PrepareAndExecuteRequestAsync(connId, sql, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
pbc::RepeatedField<Row> rows = executeResponse.Results[0].FirstFrame.Rows;
// Loop through all of the returned rows and display the first two columns
for (int i = 0; i < rows.Count; i++)
{
Row row = rows[i];
Console.WriteLine(row.Value[0].ScalarValue.StringValue + " " + row.Value[1].ScalarValue.StringValue);
}
// 100 is hard-coded on the server side as the default firstframe size
// FetchRequestAsync is called to get any remaining rows
Console.WriteLine("");
Console.WriteLine($"Number of rows: {rows.Count}");
// Fetch remaining rows, offset is not used, simply set to 0
// When FetchResponse.Frame.Done is true, all rows were fetched
FetchResponse fetchResponse = await client.FetchRequestAsync(connId, createStatementResponse.StatementId, 0, int.MaxValue, options);
Console.WriteLine($"Frame row count: {fetchResponse.Frame.Rows.Count}");
Console.WriteLine($"Fetch response is done: {fetchResponse.Frame.Done}");
Console.WriteLine("");
// Running query 2
string sql2 = "select count(*) from Customers";
ExecuteResponse countResponse = await client.PrepareAndExecuteRequestAsync(connId, sql2, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
long count = countResponse.Results[0].FirstFrame.Rows[0].Value[0].ScalarValue.NumberValue;
Console.WriteLine($"Total customer records: {count}");
Console.WriteLine("");
// Running query 3
string sql3 = "select StateProvince, count(*) as Number from Customers group by StateProvince order by Number desc";
ExecuteResponse groupByResponse = await client.PrepareAndExecuteRequestAsync(connId, sql3, createStatementResponse.StatementId, long.MaxValue, int.MaxValue, options);
pbc::RepeatedField<Row> stateRows = groupByResponse.Results[0].FirstFrame.Rows;
for (int i = 0; i < stateRows.Count; i++)
{
Row row = stateRows[i];
Console.WriteLine(row.Value[0].ScalarValue.StringValue + ": " + row.Value[1].ScalarValue.NumberValue);
}
}
catch (Exception ex)
{
}
finally
{
if (statementHandle != null)
{
await client.CloseStatementRequestAsync(connId, statementHandle.Id, options);
statementHandle = null;
}
if (openConnResponse != null)
{
await client.CloseConnectionRequestAsync(connId, options);
openConnResponse = null;
}
}
La sortie des instructions select
doit être le résultat suivant :
id0 first0
id1 first1
id10 first10
id100 first100
id101 first101
id102 first102
. . .
id185 first185
id186 first186
id187 first187
id188 first188
Number of rows: 100
Frame row count: 500
Fetch response is done: True
Total customer records: 600
NJ: 21
CA: 19
GU: 17
NC: 16
IN: 16
MA: 16
AZ: 16
ME: 16
IL: 15
OR: 15
. . .
MO: 10
HI: 10
GA: 10
DC: 9
NM: 9
MD: 9
MP: 9
SC: 7
AR: 7
MH: 6
FM: 5