Creating a DSC File Set from a Database Table

The following example creates a DSC file set from records in a database table. It uses the ProductRecord class described in Creating a DSC File Set by Using Default Binary Serialization.

var config = new HpcLinqConfiguration("MyHpcClusterHeadNode");
var context = new HpcLinqContext(config);

string productFileSetName = ...

int stageSize = context.GetNodeCount();

// Compute the range of product IDs that each vertex will read from the DB table.

int maxProductId = GetMaxProductId();
int span = maxProductId / stageSize;
int[] ranges = new int[stageSize + 1];

for (int i = 0; i < stageSize; i++)
  ranges[i] = i * span;
ranges[stageSize] = maxProductId + 1;

Console.WriteLine("Data ingress partition ranges:");
for (int i = 0; i < stageSize; i++)
  Console.WriteLine("  Stage {0}: {1} - {2}", i, ranges[i], ranges[i + 1]);

context.FromEnumerable(Enumerable.Range(0, stageSize))
       .HashPartition(r => r, stageSize)
       .SelectMany(r => ReadProductRecords(ranges[r],
                                           ranges[r + 1]))
       .ToDsc(productFileSetName)
       .SubmitAndWait();

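This code divides the range of product IDs into spans of equal width, one per compute node, with the last span also absorbing any remainder left by the integer division. It then writes the records in each span to the cluster. Because the split is by ID value, the groups contain equal numbers of records only if the IDs are evenly distributed.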
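The range computation can be tried on its own, without a cluster or a database. The following is a minimal sketch; the class name RangePartitioner and the method name ComputeRanges are hypothetical and are not part of LINQ to HPC.

```csharp
using System;

// Hypothetical helper that isolates the boundary computation from the example.
public static class RangePartitioner
{
    // Returns partitionCount + 1 boundaries. Partition i covers the
    // half-open ID interval [ranges[i], ranges[i + 1]).
    public static int[] ComputeRanges(int maxProductId, int partitionCount)
    {
        if (partitionCount <= 0)
            throw new ArgumentOutOfRangeException("partitionCount");

        int span = maxProductId / partitionCount;   // integer division
        int[] ranges = new int[partitionCount + 1];
        for (int i = 0; i < partitionCount; i++)
            ranges[i] = i * span;

        // The upper bound is exclusive, so add 1 to include maxProductId.
        // Any remainder from the division ends up in the last partition.
        ranges[partitionCount] = maxProductId + 1;
        return ranges;
    }
}
```

For example, ComputeRanges(10, 4) returns the boundaries 0, 2, 4, 6, 11. The first three partitions each cover two IDs, and the last one covers five (6 through 10). If maxProductId is smaller than the partition count, span is 0 and every partition except the last is empty.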

The code first uses FromEnumerable to create a single DSC file that contains the integers 0, 1, 2, …, stageSize - 1, where stageSize is the number of compute nodes. Next, the code runs a LINQ to HPC query on this temporary file set and applies the hash partitioning operation. The result is a DSC file set with one file on each compute node, and each file holds a single record. That record is the index of the range of database records to be written to that compute node. (For more information on hash partitioning, see Creating a DSC file set from text files by using hash partitioning.)
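To see why each node ends up with exactly one integer, it can help to imitate hash partitioning in memory with LINQ to Objects. The sketch below assumes that an item is placed in the partition given by its hash code modulo the partition count; that placement rule is an assumption made for illustration, not a description of how LINQ to HPC is implemented. The class and method names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical in-memory stand-in for hash partitioning.
public static class HashPartitionSketch
{
    // Groups items by (non-negative hash code) mod partitionCount.
    // The key of each dictionary entry is the partition index.
    public static Dictionary<int, List<int>> Partition(
        IEnumerable<int> items, int partitionCount)
    {
        return items
            .GroupBy(r => (r.GetHashCode() & int.MaxValue) % partitionCount)
            .ToDictionary(g => g.Key, g => g.ToList());
    }
}
```

The hash code of an Int32 is the value itself, so partitioning the integers 0 through n - 1 into n partitions under this rule puts exactly one integer in each partition.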

Next, the code invokes a SelectMany operation. The argument r is the range index. The lambda expression passed to SelectMany calls a helper method named ReadProductRecords, which retrieves the database records whose IDs fall within the specified range. The product record data is stored in the same number of DSC files as the range data. In this example, the number of DSC files is equal to the number of DSC nodes. Finally, the query writes the database records to a new DSC file set.
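The way SelectMany expands each range index into that range's records and flattens the results into one sequence can be sketched with LINQ to Objects. In the sketch below, FakeReadProductIds is a hypothetical stand-in for ReadProductRecords that yields plain IDs instead of querying a database, and the class name is likewise invented for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SelectManySketch
{
    // Hypothetical stand-in for ReadProductRecords: yields every ID in
    // the half-open interval [from, to) without touching a database.
    public static IEnumerable<int> FakeReadProductIds(int from, int to)
    {
        for (int id = from; id < to; id++)
            yield return id;
    }

    // Maps each range index r to the IDs in [ranges[r], ranges[r + 1])
    // and flattens the per-range sequences into a single list.
    public static List<int> Expand(int[] ranges)
    {
        return Enumerable.Range(0, ranges.Length - 1)
            .SelectMany(r => FakeReadProductIds(ranges[r], ranges[r + 1]))
            .ToList();
    }
}
```

With the boundaries 0, 2, 4, 6, 11, Expand returns the eleven IDs 0 through 10, each exactly once. In the distributed query, each range index is processed where hash partitioning placed it, so the expansion for each range runs on a different node.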

Here is the code for the ReadProductRecords method.

// Requires: using System.Collections.Generic; using System.Data.SqlClient;
public static IEnumerable<ProductRecord>
ReadProductRecords(int from, int to)
{
  // Select the products whose IDs fall in the half-open range [from, to).
  const string queryString =
    "SELECT ProductID, UnitPrice, ProductName FROM dbo.products " +
    "WHERE ProductID >= @from AND ProductID < @to ORDER BY ProductID ASC;";

  using (SqlConnection connection =
           new SqlConnection(SampleConfiguration.DbConnectionString))
  using (SqlCommand command = new SqlCommand(queryString, connection))
  {
    command.Parameters.AddWithValue("@from", from);
    command.Parameters.AddWithValue("@to", to);

    connection.Open();
    // Dispose the reader even if the caller stops enumerating early.
    using (SqlDataReader reader = command.ExecuteReader())
    {
      while (reader.Read())
      {
        yield return new ProductRecord((int)reader[0],
                                       (decimal)reader[1],
                                       (string)reader[2]);
      }
    }
  }
}

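This code queries the database for a range of product records and returns them as an enumerable collection of ProductRecord instances. Because the method is an iterator (it uses yield return), the query does not run until the caller starts enumerating, and records are then produced one at a time.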
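The casts in ReadProductRecords imply a constructor that takes an int, a decimal, and a string. The actual ProductRecord class is defined in the topic referenced at the start of this section. The following is only a minimal sketch of a compatible shape: the property names match those used elsewhere in this topic, but the [Serializable] attribute and the private setters are assumptions.

```csharp
using System;

// Assumed shape of ProductRecord, for illustration only.
[Serializable]
public class ProductRecord
{
    public int ProductId { get; private set; }
    public decimal UnitPrice { get; private set; }
    public string ProductName { get; private set; }

    public ProductRecord(int productId, decimal unitPrice, string productName)
    {
        ProductId = productId;
        UnitPrice = unitPrice;
        ProductName = productName;
    }
}
```

Note that a direct cast such as (decimal)reader[1] throws an InvalidCastException if the column value is NULL, because the reader returns DBNull.Value in that case. The direct casts are safe only if none of the three columns can contain NULL.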

When the query completes, the database records have been distributed across the cluster. You can then run a distributed LINQ to HPC query against the new file set. Here is an example.

var config = new HpcLinqConfiguration("MyHpcClusterHeadNode");
var context = new HpcLinqContext(config);
string productFileSetName = ...

var result = context.FromDsc<ProductRecord>(productFileSetName);
foreach (ProductRecord p in result)
  Console.WriteLine("  {0}\t{1:c}\t{2}", p.ProductId, p.UnitPrice, p.ProductName);

This query prints the product ID, the unit price, and the product name for each of the product records in the file set.