This article shows you how to use query acceleration to retrieve a subset of data from your storage account.
Query acceleration enables applications and analytics frameworks to optimize data processing by retrieving only the data that they need to perform a given operation. To learn more, see Azure Data Lake Storage Query Acceleration.
Prerequisites
To access Azure Storage, you'll need an Azure subscription. If you don't already have a subscription, create a free account before you begin.
This article assumes that you've created a Java project by using Apache Maven. For an example of how to create a project by using Apache Maven, see Setting up.
There are no additional prerequisites required to use the Node.js SDK.
Enable query acceleration
To use query acceleration, you must register the query acceleration feature with your subscription. Once you've verified that the feature is registered, you must register the Azure Storage resource provider.
Step 1: Register the query acceleration feature
Before you can use query acceleration, register the query acceleration feature with your subscription.
Add statements
For .NET, add these using statements to the top of your code file.
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Azure.Storage.Blobs.Specialized;
Query acceleration retrieves CSV- and JSON-formatted data, so add using statements for any CSV or JSON parsing library that you choose to use. The examples in this article parse a CSV file by using the CsvHelper library, which is available on NuGet, so add these using statements to the top of the code file.
using CsvHelper;
using CsvHelper.Configuration;
To compile the examples in this article, also add these using statements.
using System;
using System.Threading.Tasks;
using System.IO;
using System.Globalization;
For Node.js, query acceleration retrieves CSV- and JSON-formatted data, so add statements for any CSV or JSON parsing modules that you choose to use. The examples in this article parse a CSV file by using the fast-csv module, so add this statement to the top of the code file.
const csv = require('@fast-csv/parse');
Retrieve data by using a filter
You can use SQL to specify the row filter predicates and column projections in a query acceleration request. The following code queries a CSV file in storage and returns all rows of data where the third column matches the value Hemingway, Ernest, 1899-1961.
In the SQL query, the keyword BlobStorage denotes the file that is being queried.
Column references are specified as _N, where the first column is _1. If the source file contains a header row, you can instead refer to columns by the names that appear in that header row.
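To make the column-reference rules concrete, here is a minimal client-side sketch of the predicate in SELECT * FROM BlobStorage WHERE _3 = '...'. It is only an emulation for illustration, not how the service evaluates queries: the helper names resolve_column and filter_rows are hypothetical, and the sample rows are made up. It shows that _N is 1-based and that a header name resolves to the same column as its positional reference.

```python
import csv
import io


def resolve_column(ref, header=None):
    """Map a column reference to a 0-based Python index.

    '_N' references are 1-based positions, so '_1' is the first column.
    Any other reference is looked up by name in the header row.
    """
    if ref.startswith("_") and ref[1:].isdigit():
        return int(ref[1:]) - 1
    if header is None:
        raise KeyError(f"no header row to resolve column name {ref!r}")
    return header.index(ref)  # raises ValueError for an unknown name


def filter_rows(csv_text, column_ref, value, has_header=False):
    """Locally emulate: SELECT * FROM BlobStorage WHERE <column_ref> = '<value>'."""
    rows = list(csv.reader(io.StringIO(csv_text)))
    header = rows[0] if has_header else None
    data = rows[1:] if has_header else rows
    idx = resolve_column(column_ref, header)
    return [row for row in data if len(row) > idx and row[idx] == value]


# Hypothetical sample data with a header row.
sample = (
    "BibNum,Title,Author\n"
    '1,The Sun Also Rises,"Hemingway, Ernest, 1899-1961"\n'
    '2,Moby Dick,"Melville, Herman, 1819-1891"\n'
)

# Positional and header-based references select the same column.
print(filter_rows(sample, "_3", "Hemingway, Ernest, 1899-1961", has_header=True))
print(filter_rows(sample, "Author", "Hemingway, Ernest, 1899-1961", has_header=True))
# Both print: [['1', 'The Sun Also Rises', 'Hemingway, Ernest, 1899-1961']]
```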
The async method BlockBlobClient.QueryAsync sends the query to the query acceleration API and then streams the results back to the application as a Stream object.
static async Task QueryHemingway(BlockBlobClient blob)
{
    string query = @"SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
    await DumpQueryCsv(blob, query, false);
}

private static async Task DumpQueryCsv(BlockBlobClient blob, string query, bool headers)
{
    try
    {
        var options = new BlobQueryOptions()
        {
            // Describe the CSV layout of the source blob and of the returned results.
            InputTextConfiguration = new BlobQueryCsvTextOptions() { HasHeaders = headers },
            OutputTextConfiguration = new BlobQueryCsvTextOptions() { HasHeaders = true },
            ProgressHandler = new Progress<long>((finishedBytes) =>
                Console.Error.WriteLine($"Data read: {finishedBytes}"))
        };

        // Report errors that the service returns while it processes the query.
        options.ErrorHandler += (BlobQueryError err) =>
        {
            Console.ForegroundColor = ConsoleColor.Red;
            Console.Error.WriteLine($"Error: {err.Position}:{err.Name}:{err.Description}");
            Console.ResetColor();
        };

        // BlobDownloadInfo exposes a Stream that makes results available as they are received,
        // rather than blocking until the entire response has arrived.
        using (var reader = new StreamReader((await blob.QueryAsync(query, options)).Value.Content))
        {
            using (var parser = new CsvReader(reader, new CsvConfiguration(CultureInfo.CurrentCulture) { HasHeaderRecord = true }))
            {
                while (await parser.ReadAsync())
                {
                    Console.Out.WriteLine(String.Join(" ", parser.Parser.Record));
                }
            }
        }
    }
    catch (Exception ex)
    {
        Console.Error.WriteLine("Exception: " + ex.ToString());
    }
}
For Java, the BlobClient.openQueryInputStreamWithResponse method sends the query to the query acceleration API and then streams the results back to the application as an InputStream object, which you can read like any other InputStream.
import com.azure.storage.blob.BlobClient;
import com.azure.storage.blob.models.BlobQueryDelimitedSerialization;
import com.azure.storage.blob.models.BlobQueryError;
import com.azure.storage.blob.models.BlobQueryProgress;
import com.azure.storage.blob.models.BlobQuerySerialization;
import com.azure.storage.blob.options.BlobQueryOptions;
import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.util.function.Consumer;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVParser;
import org.apache.commons.csv.CSVRecord;

static void QueryHemingway(BlobClient blobClient) {
    String expression = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'";
    DumpQueryCsv(blobClient, expression, true);
}

static void DumpQueryCsv(BlobClient blobClient, String query, Boolean headers) {
    try {
        /* Describe the CSV layout of the source blob. */
        BlobQuerySerialization input = new BlobQueryDelimitedSerialization()
            .setRecordSeparator('\n')
            .setColumnSeparator(',')
            .setHeadersPresent(headers)
            .setFieldQuote('\0')
            .setEscapeChar('\\');
        /* Describe the CSV layout of the returned results. */
        BlobQuerySerialization output = new BlobQueryDelimitedSerialization()
            .setRecordSeparator('\n')
            .setColumnSeparator(',')
            .setHeadersPresent(true)
            .setFieldQuote('\0')
            .setEscapeChar('\\');
        Consumer<BlobQueryError> errorConsumer = System.out::println;
        Consumer<BlobQueryProgress> progressConsumer = progress -> System.out.println("total bytes read: " + progress.getBytesScanned());
        BlobQueryOptions queryOptions = new BlobQueryOptions(query)
            .setInputSerialization(input)
            .setOutputSerialization(output)
            .setErrorConsumer(errorConsumer)
            .setProgressConsumer(progressConsumer);

        /* Open the query input stream. */
        InputStream stream = blobClient.openQueryInputStreamWithResponse(queryOptions).getValue();

        try (BufferedReader reader = new BufferedReader(new InputStreamReader(stream))) {
            /* Read from the stream like you normally would. */
            for (CSVRecord record : CSVParser.parse(reader, CSVFormat.EXCEL.withHeader())) {
                System.out.println(record.toString());
            }
        }
    } catch (Exception e) {
        System.err.println("Exception: " + e.toString());
        e.printStackTrace(System.err);
    }
}
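For Python, the BlobClient.query_blob method sends the query to the query acceleration API and returns a BlobQueryReader object.
import csv

from azure.storage.blob import BlobClient, BlobQueryError, DelimitedTextDialect


def report_error(error: BlobQueryError):
    # Print the position and details of any error reported while the query runs.
    print("Error: {}:{}:{}".format(error.position, error.error, error.description))


def query_hemingway(blob: BlobClient):
    query = "SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'"
    dump_query_csv(blob, query, False)


def dump_query_csv(blob: BlobClient, query: str, headers: bool):
    qa_reader = blob.query_blob(
        query,
        blob_format=DelimitedTextDialect(has_header=headers),
        on_error=report_error,
        encoding="utf-8",
    )
    # records() returns a generator that streams results as they are received.
    # It does not block until all results have arrived.
    csv_reader = csv.reader(qa_reader.records())
    for row in csv_reader:
        print("*".join(row))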
Retrieve specific columns
You can scope your results to a subset of columns so that you retrieve only the columns needed to perform a given calculation. Because less data is transferred over the network, this improves application performance and reduces cost.
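As a rough illustration of why a projection shrinks the response, the following sketch applies a projection such as SELECT BibNum FROM BlobStorage locally and compares payload sizes. It is a client-side stand-in only (with query acceleration, the projection runs in the service before any data crosses the network); the helper name project_columns and the sample rows are hypothetical.

```python
import csv
import io


def project_columns(csv_text, columns):
    """Return CSV text that keeps only the named columns, header row included.

    Illustrative local stand-in for a projection such as
    SELECT BibNum FROM BlobStorage.
    """
    reader = csv.DictReader(io.StringIO(csv_text))
    out = io.StringIO()
    # extrasaction="ignore" drops every column that is not in `columns`.
    writer = csv.DictWriter(out, fieldnames=columns, extrasaction="ignore", lineterminator="\n")
    writer.writeheader()
    for row in reader:
        writer.writerow(row)
    return out.getvalue()


# Hypothetical sample data with a header row.
full = (
    "BibNum,Title,Author\n"
    '1,The Sun Also Rises,"Hemingway, Ernest, 1899-1961"\n'
    '2,Moby Dick,"Melville, Herman, 1819-1891"\n'
)

projected = project_columns(full, ["BibNum"])
print(projected, end="")  # prints BibNum, 1, 2 on separate lines
print(f"{len(full.encode())} bytes before projection, {len(projected.encode())} after")
```

The byte counts here are tiny, but the ratio is what matters: on a wide file, returning one column instead of all of them cuts the transferred data roughly in proportion to the columns you drop.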
Note
You can scope your results to at most 49 columns. If your results must contain more than 49 columns, use the wildcard character (*) in the SELECT expression (for example, SELECT *).
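One way to respect the 49-column limit when queries are generated programmatically is to fall back to SELECT * automatically. The sketch below assumes a hypothetical helper, build_select, that produces query text you would then pass to a method such as query_blob; it only builds strings and does not call Azure.

```python
MAX_PROJECTED_COLUMNS = 49  # limit on projected columns stated in the note


def build_select(columns, predicate=None):
    """Build a query acceleration SELECT expression (hypothetical helper).

    Uses SELECT * when no columns are given or when more than 49 columns
    are requested, because a projection can name at most 49 columns.
    """
    if not columns or len(columns) > MAX_PROJECTED_COLUMNS:
        projection = "*"
    else:
        projection = ", ".join(columns)
    query = f"SELECT {projection} FROM BlobStorage"
    if predicate:
        query += f" WHERE {predicate}"
    return query


print(build_select(["BibNum"]))
# SELECT BibNum FROM BlobStorage
print(build_select([f"_{i}" for i in range(1, 51)]))
# SELECT * FROM BlobStorage (50 columns exceeds the limit)
print(build_select([], "_3 = 'Hemingway, Ernest, 1899-1961'"))
# SELECT * FROM BlobStorage WHERE _3 = 'Hemingway, Ernest, 1899-1961'
```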
This code retrieves only the BibNum column for all books in the data set. It also uses the information from the header row in the source file to reference columns in the query.