Connect(); 2016

Volume 31 - Número 12

Aplicativos Inteligentes do Connect(); - Extensibilidade nos aplicativos de Big Data do U-SQL

Por Michael Rys; 2016

O enfoque tradicional de abordar os três grandes Vs do Big Data (volume, velocidade e variedade) durante o processamento de Big Data concentrava-se principalmente em fornecer uma plataforma escalonável para processar o volume de dados, adicionando recursos de processamento quase em tempo real e oferecendo a capacidade de processar diversos formatos de dados de entrada, desde CSV até JSON e formatos binários personalizados. Uma variedade que, frequentemente, é uma consideração posterior é a variedade associada ao processamento de dados personalizado, não apenas em termos de formato, mas também a capacidade de facilitar a ampliação da análise com algoritmos personalizados, preservando a natureza declarativa da experiência de linguagem de consulta.

Algumas linguagens modernas de consulta e processamento de Big Data estão começando a lidar com isso. Em particular, o U-SQL foi projetado desde o início para combinar a capacidade declarativa de uma linguagem baseada em SQL à flexibilidade de usar suas bibliotecas de código existentes e desenvolver novos algoritmos personalizados.

Em um artigo anterior (bit.ly/1OtXM2K), eu apresentei o U-SQL e mostrei como usar o sistema de tipo Microsoft .NET Framework em conjunto com a linguagem de expressão baseada em C# no U-SQL o torna perfeito para ampliar a análise com expressões de código personalizado. Expliquei como usar assemblies C# para definir UDFs (funções definidas pelo usuário) e usá-las em scripts de consulta U-SQL.

O U-SQL não só permite que você adicione suas próprias funções C# personalizadas, mas também fornece uma estrutura na qual você pode adicionar seus próprios UDOs (operadores definidos pelo usuário), como seus próprios extratores, geradores de saída e operadores de conjuntos de linhas, como processadores, aplicadores, redutores e combinadores personalizados. A estrutura consiste em duas partes:

  1. Interfaces .NET que fornecem o contrato para que você crie esses operadores de forma que possa se concentrar no código, deixando a execução expandida para o U-SQL. Observe que o código real de lógica de negócios não precisa ser implementado no .NET, como mostraremos mais adiante.
  2. Expressões do U-SQL como EXTRACT e REDUCE que invocam os operadores personalizados e os executam em escala em seus dados.

Neste artigo, vou expandir o artigo anterior e mostrar como você pode usar os mecanismos de extensibilidade do U-SQL para processar vários dados diferentes, que vão desde JSON até dados de imagem. Também mostrarei como adicionar seus próprios operadores.

Gerenciar seu código personalizado no U-SQL

Antes de começar com alguns exemplos, vamos entender melhor como o U-SQL pode usar seu código personalizado.

Como mencionado, o U-SQL segue o C# com sua linguagem de expressão escalar, que está sendo usada em lugares como predicados do U-SQL e nas expressões em uma cláusula select. Para que o código personalizado se torne visível para o compilador do U-SQL, o código deve ser empacotado em um assembly .NET que deve ser referenciado pelo script do U-SQL. Para poder fazer referência ao assembly, ela deve ter sido registrado anteriormente no serviço de metadados do U-SQL usando uma instrução CREATE ASSEMBLY.

Registrar e referenciar assemblies do U-SQL Sugiro usar o Azure Data Lake Tools para Visual Studio (aka.ms/adltoolsvs), que facilita a criação e o registro de assemblies que funcionam com o U-SQL. Se escrever o código personalizado em um projeto "Biblioteca de Classes (para o aplicativo do U-SQL)" (veja a Figura 1), você poderá escrever o código, criar o projeto e registrar diretamente o arquivo DLL de assembly gerado clicando com o botão direito do mouse (veja a Figura 2).

Projeto de Biblioteca de Classes (para aplicativo U-SQL)
Figura 1: Projeto de Biblioteca de Classes (para aplicativo U-SQL)

Registrar um assembly U-SQL
Figura 2: Registrar um assembly U-SQL

Em seguida, o script do U-SQL só precisa da instrução REFERENCE ASSEMBLY para tornar as classes e métodos públicos utilizáveis no script do U-SQL, como mostrado na Figura 3.

Figura 3: Referir-se a uma função definida pelo usuário de um assembly personalizado

REFERENCE ASSEMBLY master.TweetAnalysis;
USING tweet_fns = TweetAnalysis.Udfs;
@t =
  EXTRACT date string,
          time string,
          author string,
          tweet string
  FROM "/Samples/Data/Tweets/Tweets.csv"
  USING Extractors.Csv();
// Get the mentions from the tweet string
@m =
  SELECT origin
       , tweet_fns.get_mentions(tweet) AS mentions
       , author AS mentioned_by
FROM @t;
...

Usar código existente com assemblies do U-SQL Muitas vezes, convém usar bibliotecas de código existentes ou até mesmo código não .NET. Se quiser usar código não .NET (por exemplo, uma biblioteca nativa ou até mesmo um tempo de execução de uma linguagem completamente diferente, como Python ou JavaScript), você deverá envolver o código não NET com uma camada de interoperabilidade de C# que será chamada do U-SQL e que, em seguida, chamará o código não .NET, empacotando os dados entre os componentes e implementando um contrato de interface UDO. Nesse caso, os artefatos de código não .NET, como .dlls nativas ou arquivos do tempo de execução diferentes, precisam ser adicionados como arquivos adicionais. Isso pode ser feito na opção Arquivo Adicional do registro de assembly. Esses arquivos são implantados automaticamente em cada nó quando o assembly do .NET é referenciado em um script e são disponibilizados para o diretório de trabalho do assembly .NET localmente para esse nó.

Para usar bibliotecas .NET existentes, é necessário registrar as bibliotecas de código existentes como Dependências Gerenciadas em seu próprio assembly ou, se você reutilizar uma biblioteca que pode ser usada diretamente no U-SQL, registrá-las diretamente em seu banco de dados U-SQL. Em ambos os casos, o script deve fazer referência a todos os assemblies .NET necessários ao script.

Mostrarei alguns exemplos dessas opções de registro no restante do artigo, ao discutir alguns cenários de código personalizado em que faz sentido usar o modelo de extensibilidade. Esses cenários incluem: mesclar intervalos sobrepostos a um redutor personalizado, processar documentos JSON, processar dados de imagem e processar dados espaciais. Discutiremos cada um deles separadamente.

Mesclar intervalos sobrepostos a um redutor personalizado

Digamos que você tenha um arquivo de log que rastreia quando um usuário interage com seu serviço. Além disso, vamos supor que um usuário possa interagir com seu serviço de várias maneiras (por exemplo, realizando pesquisas do Bing em vários dispositivos ou janelas de navegador). Como parte do trabalho do U-SQL que prepara o arquivo de log para análise posterior, você deseja mesclar intervalos sobrepostos.

Por exemplo, se o arquivo de log de entrada for semelhante à Figura 4, você desejará mesclar os intervalos sobrepostos para cada usuário na Figura 5.

Figura 4: arquivo de log com intervalos de tempo sobrepostos

Hora de início  Hora de término  Nome do Usuário
5h00  6h00  ABC
5h00  6h00  XYZ
8h00  9h00  ABC
8h00  10h00  ABC
10h00  14h00  ABC
7h00  11h00  ABC
9h00  11h00  ABC
11h00  11h30  ABC
23h40  23h59  FOO
23h50  0h40  FOO

Figura 5: arquivo de log após mesclar intervalos de tempo sobrepostos

Hora de início  Hora de término  Nome do Usuário
5h00  6h00  ABC
5h00  6h00  XYZ
7h00  14h00  ABC
23h40  0h40  FOO

Se examinar o problema, primeiro você notará que deseja definir algo como uma agregação definida pelo usuário para combinar os intervalos de tempo sobrepostos. No entanto, se observar os dados de entrada, você notará que, como os dados não são ordenados, terá que manter o estado para todos os intervalos possíveis e mesclar intervalos disjuntos à medida que intervalos de ponte aparecerem ou precisará pré-ordenar os intervalos para cada nome de usuário para facilitar a mesclagem dos intervalos.

A agregação ordenada é mais simples de escalonar, mas o U-SQL não fornece UDAGGs (agregadores definidos pelo usuário ordenados). Além disso, UDAGGs normalmente produzem uma linha por grupo, enquanto, neste caso, posso ter várias linhas por grupo se os intervalos são intervalos disjuntos.

Felizmente, o U-SQL fornece um UDO escalável chamado redutor (bit.ly/2evGsDA), que pode agregar um conjunto de linhas com base em um conjunto de chaves de agrupamento usando código personalizado.

Primeiro, vamos escrever a lógica do U-SQL em que ReducedSample.Range Reducer é o redutor definido pelo usuário (Reducer UDO) do assembly RangeReducer, e os dados de log estão localizados no arquivo /Samples/Blogs/MRys/Ranges/ranges.txt (bit.ly/2eseZyw) e usam "-" como o delimitador de coluna. Eis o código:

REFERENCE ASSEMBLY RangeReducer;
@in = EXTRACT start DateTime, end DateTime, user string
FROM "/Samples/Blogs/MRys/Ranges/ranges.txt"
USING Extractors.Text(delimiter:'-');
@r =  REDUCE @in PRESORT start ON user
      PRODUCE start DateTime, end DateTime, user string
      READONLY user
      USING new ReduceSample.RangeReducer();
OUTPUT @r
TO "/temp/result.csv"
USING Outputters.Csv();

A expressão REDUCE utiliza o conjunto de linhas @in como entrada, particiona-o com base na coluna de usuário, pré-ordena as partições com base nos valores da coluna de início e aplica o RangeReducer, produzindo o mesmo esquema de conjunto de linhas na saída. Como o redutor apenas ajusta o intervalo do início ao fim, na verdade ele não toca na coluna do usuário. Portanto, você o marca como READONLY. Isso dá à estrutura do redutor a permissão para passar os dados automaticamente para essa coluna e, em troca, permite que o processador de consultas do U-SQL aplique agressivamente otimizações em colunas somente leitura, como enviar predicados em uma coluna somente leitura antes do redutor.

A maneira de escrever um redutor é implementar uma instância de Microsoft.Analytics.Interfaces.IReducer. Nesse caso, como você não precisa fornecer parâmetros, basta substituir o método abstrato Reduce. Você pode copiar o código em uma biblioteca C# para o U-SQL e registrá-lo como o assembly RangeReducer, como explicado anteriormente. A Figura 6 mostra a implementação do RangeReducer. (Observe que práticas de recuo de código normais foram alteradas em alguns exemplos de código devido a restrições de espaço.)

Figura 6: implementação em C# de RangeReducer

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
namespace ReduceSample
{
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Init aggregation values
      bool first_row_processed = false;
      var begin = DateTime.MaxValue;
      var end = DateTime.MinValue;
      // Requires that the reducer is PRESORTED on begin and
      // READONLY on the reduce key.
      foreach (var row in input.Rows)
      {
        // Initialize the first interval with the first row if i is 0
       if (!first_row_processed)
        {
         first_row_processed = true; // Mark that the first row was handled
          begin = row.Get<DateTime>("start");
          end = row.Get<DateTime>("end");
          // If the end is just a time and not a date, it can be earlier
          // than the begin, indicating it is on the next day;
          // this let's you fix up the end to the next day in that case
          if (end < begin) { end = end.AddDays(1); }
        }
        else // Handle the remaining rows
        {
          var b = row.Get<DateTime>("start");
          var e = row.Get<DateTime>("end");
          // Fix up the date if end is earlier than begin
          if (e < b) { e = e.AddDays(1); }
          // If begin time is still inside the interval,
          // increase the interval if it is longer
          if (b <= end)
          {
            // If the new end time is later than the current,
            // extend the interval
            if (e > end) { end = e; }
          }
          else // Output the previous interval and start a new one
          {
            output.Set<DateTime>("start", begin);
            output.Set<DateTime>("end", end);
            yield return output.AsReadOnly();
            begin = b; end = e;
          } // if
        } // if
      } // foreach
      // Now output the last interval
      output.Set<DateTime>("start", begin);
      output.Set<DateTime>("end", end);
      yield return output.AsReadOnly();
    } // Reduce
  } // RangeReducer
} // ReduceSample

A expressão U-SQL REDUCE aplicará o método Reduce uma vez para cada chave de partição distinta em paralelo. Portanto, o parâmetro de entrada conterá apenas as linhas para determinado grupo, e a implementação pode retornar zero para muitas linhas como saída.

Como a cláusula PRESORT garante que as linhas sejam ordenadas, a lógica interna pode presumir que os dados são ordenados e, como a coluna do usuário está marcada como READONLY, a coluna será passada automaticamente, e você poderá escrever o código UDO de forma mais genérica, concentrando-se apenas nas colunas que deseja transformar.

Se agora você aplicar o redutor em um grande conjunto de dados e se alguns dos usuários usarem o sistema muito mais frequentemente do que outros, você encontrará algo chamado distorção de dados, em que alguns usuários têm grandes partições e outros têm apenas pequenas partições. Como o contrato do redutor tem a garantia de ver todos os dados dessa partição, todos os dados devem ser misturados para esse nó e lidos em uma chamada. Devido a esse requisito, no melhor dos casos, a distorção de dados pode fazer com que algumas partições demorem muito mais do que outras para serem processadas e, no pior dos casos, pode fazer com que alguns redutores fiquem sem os recursos de memória e tempo disponíveis (um vértice U-SQL atingirá o tempo limite após ser executado por cerca de cinco horas).

Se a semântica do redutor for associativa e comutativa e seu esquema de saída for igual ao esquema de entrada, um redutor poderá ser marcado como recursivo, o que permite ao mecanismo de consulta dividir grandes grupos em subgrupos menores e aplicar recursivamente o redutor a esses subgrupos para calcular o resultado final. Essa aplicação recursiva permite ao redutor um melhor equilíbrio e paralelismo na presença de distorção de dados. Um redutor é marcado como recursivo usando a anotação de propriedade SqlUserDefinedReducer(IsRecursive = true):

namespace ReduceSample
{
  [SqlUserDefinedReducer(IsRecursive = true)]
  public class RangeReducer : IReducer
  {
    public override IEnumerable<IRow> Reduce(
      IRowset input, IUpdatableRow output)
    {
      // Insert the code from Figure 6 here
    } // Reduce
  } // RangeReducer
} // ReduceSample

Em nosso caso, o redutor pode ser marcado como recursivo para melhorar a escalabilidade e o desempenho, presumindo que o processamento preserve a ordenação entre as linhas em cada invocação recursiva.

Você pode encontrar um projeto do Visual Studio para o exemplo em nosso repositório do GitHub, em bit.ly/2ecLe5B.

Processar documentos JSON

Um dos formatos de dados mais frequentes, após arquivos de texto separados por vírgulas, é JSON. Diferentemente de formatos de arquivo CSV, o U-SQL não fornece um extrator JSON interno. No entanto, a comunidade U-SQL forneceu um assembly de exemplo em bit.ly/2d9O4va que dá suporte para extração e processamento de documentos JSON e XML.

Essa solução usa a biblioteca Json.NET da Newtonsoft (bit.ly/2evWJbz) para o trabalho mais intenso com JSON e System.XML para o processamento XML. O assembly pode extrair dados de um documento JSON usando o JsonExtractor (bit.ly/2dPARsM), dividir um documento JSON em um SqlMap para permitir a navegação e a decomposição de documentos JSON com a função JsonTuple (bit.ly/2e8tSuX) e, finalmente, transformar um conjunto de linhas em um arquivo formatado JSON com JSONOutputter (bit.ly/2e4uv3W).

Observe que o assembly foi projetado para ser um processador JSON genérico. Isso significa que ele não faz suposições sobre a estrutura do documento JSON e precisa ser resiliente à natureza semiestruturada do JSON, incluindo elementos tipados heterogêneos (escalares versus estruturados, diferentes tipos de dados para o mesmo elemento, elementos ausentes e assim por diante). Se souber que seus documentos JSON aderem a um esquema específico, você poderá criar um extrator JSON mais eficiente.

Diferentemente do exemplo do redutor anterior, em que você escreve seu próprio assembly e depois o implanta, nesse caso, a solução está pronta para ser usada. Você pode carregar a solução de nosso repositório GitHub no Visual Studio e compilá-la e implantá-la por conta própria ou localizar as DLLs no diretório bin\Debug da solução.

Como mencionado anteriormente, a dependência que não é do sistema requer que os assembliesSamples.Format e Json.NET sejam registrados no repositório de metadados do U-SQL (você pode selecionar o assembly Newtonsoft como Dependência Gerenciada ao registrar o assembly Format usando o Visual Studio), e ambos precisarão ser referenciados se você quiser processar documentos JSON. Supondo que você tenha instalado os conjuntos de módulos JSON no catálogo do U-SQL com os nomes [Microsoft.Analytics.Samples.Formats] e [NewtonSoft.Json] no JSONBlog do banco de dados do U-SQL (veja a Figura 7), poderá usar os assemblies referenciando-os no início dos scripts com:

REFERENCE ASSEMBLY JSONBlog.[NewtonSoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];

Registrar o assembly de formatos no Visual Studio
Figura 7: registrar o assembly Formats no Visual Studio

O extrator JSON implementa a interface do U-SQL IExtractor. Como os documentos JSON precisam ser totalmente analisados para garantir que estão formados corretamente, um arquivo que contiver um único documento JSON precisará ser processado em um único vértice do Extrator. Assim, você indica que o extrator precisa ver o conteúdo do arquivo completo definindo a propriedade AtomicFileProcessing como true (veja a Figura 8). O extrator pode ser chamado com um parâmetro opcional denominado rowpath, que permite identificar os objetos JSON que serão mapeados para uma linha usando uma expressão JSONPath (bit.ly/1EmvgKO).

Figura 8: o Extrator JSON

[SqlUserDefinedExtractor(AtomicFileProcessing = true)]
public class JsonExtractor : IExtractor
{
  private string rowpath;            
  public JsonExtractor(string rowpath = null)
  {
    this.rowpath = rowpath;
  }
  public override IEnumerable<IRow> Extract(
    IUnstructuredReader input, IUpdatableRow output)
  {
    // Json.NET
    using (var reader = new JsonTextReader(
      new StreamReader(input.BaseStream)))
    {
      // Parse Json
      var root = JToken.ReadFrom(reader);
      // Rows
      // All objects are represented as rows
      foreach (JObject o in SelectChildren(root, this.rowpath))
      {
        // All fields are represented as columns
        this.JObjectToRow(o, output);
        yield return output.AsReadOnly();
      }
    }
  }
}

A implementação do extrator passará o fluxo de entrada que a estrutura do U-SQL Extractor inclui no extrator para o Json.NET JsonTextReader. Em seguida, usará o caminho de linha para obter as sub-árvores que estão sendo mapeadas para uma linha usando SelectChildren. Como objetos JSON podem ser heterogêneos, o código retorna o JObject genérico em vez de JArray posicional ou valores escalares.

Observe que esse extrator carrega o documento JSON na memória. Se o documento for muito grande, poderá causar uma condição de falta de memória. Nesse caso, você teria que escrever seu próprio extrator para transmitir o documento sem ter que carregar o documento inteiro na memória.

Agora vamos usar o JSON Extractor e a função de tupla JSON para analisar o documento JSON complexo de /Samples/Blogs/MRys/JSON/complex.json (bit.ly/2ekwOEQ) fornecido na Figura 9.

Figura 9: um documento JSON de exemplo

[{
  "person": {
    "personid": 123456,
    "name": "Person 1",
    "addresses": {
      "address": [{
        "addressid": "2",
        "street": "Street 2",
        "postcode": "1234 AB",
        "city": "City 1"
      }, {
        "addressid": "2",
        "street": "Street 2",
        "postcode": "5678 CD",
        "city": "City 2"
      }]
    }
  }
}, {
     "person": {
     "personid": 798,
     "name": "Person 2",
     "addresses": {
       "address": [{
         "addressid": "1",
         "street": "Street 1",
         "postcode": "1234 AB",
         "city": "City 1"
     }, {
         "addressid": "4",
         "street": "Street 7",
         "postcode": "98799",
         "city": "City 3"
     }]
   }
  }
}]

O formato é uma matriz de "objetos" de pessoa (tecnicamente, objetos com uma única chave de pessoa cada um) que, por sua vez, contêm algumas propriedades de pessoas e objetos de endereço. O script do U-SQL na Figura 10 extrai uma linha por combinação de pessoa/endereço.

Figura 10: script do U-SQL processando o documento JSON de exemplo da Figura 9

DECLARE @input string = "/Samples/Blogs/MRys/JSON/complex.json";
REFERENCE ASSEMBLY JSONBlog.[Newtonsoft.Json];
REFERENCE ASSEMBLY JSONBlog.[Microsoft.Analytics.Samples.Formats];
USING Microsoft.Analytics.Samples.Formats.Json;
@json =
  EXTRACT personid int,
          name string,
          addresses string
  FROM @input
  USING new JsonExtractor("[*].person");
@person =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(
           addresses, "address")["address"] AS address_array
  FROM @json;
@addresses =
  SELECT personid,
         name,
         JsonFunctions.JsonTuple(address) AS address
  FROM @person
       CROSS APPLY
         EXPLODE (JsonFunctions.JsonTuple(address_array).Values)
           AS A(address);
@result =
  SELECT personid,
         name,
         address["addressid"]AS addressid,
         address["street"]AS street,
         address["postcode"]AS postcode,
         address["city"]AS city
  FROM @addresses;
OUTPUT @result
TO "/output/json/persons.csv"
USING Outputters.Csv();

Observe que o script passa a expressão JSONPath [*].person para o extrator, gerando assim uma linha para cada campo de pessoa na matriz de nível superior. O esquema EXTRACT está sendo usado pelo extrator para obter as propriedades do objeto resultante em colunas. Como o campo de endereços em si é um documento JSON aninhado, a primeira invocação da função JsonTuple cria um mapa que contém os objetos de endereço, que são mapeados para uma linha por endereço com a expressão CROSS APPLY EXPLODE. Finalmente, todas as propriedades de endereço são projetadas do tipo de dados de mapa para fornecer o conjunto de linhas, como mostrado na Figura 11.

Figura 11: o conjunto de linhas gerado pelo processamento do documento JSON da Figura 9

123456 Pessoa 1 2 Rua 2 1234 AB Cidade 1
123456 Pessoa 1 2 Rua 2 5678 CD Cidade 2
798 Pessoa 2 1 Rua 1 1234 AB Cidade 1
798 Pessoa 2 4 Rua 7 98799 Cidade 3

Em nosso repositório do GitHub em bit.ly/2dzceLv, você pode encontrar um projeto do Visual Studio do exemplo e outros cenários de processamento JSON, incluindo vários documentos JSON em um arquivo.

Processar dados de imagem

Neste exemplo, estou processando alguns dados não estruturados maiores: imagens. Em particular, quero processar imagens JPEG e extrair algumas das propriedades JPEG EXIF, bem como criar uma miniatura da imagem. Felizmente, o .NET fornece vários recursos de processamento de imagens na classe System.Drawing. Portanto, basta criar as funções de extensão do U-SQL e os operadores, delegando o processamento JPEG a essas classes.

Existem várias formas de se fazer isso. Uma tentativa inicial pode carregar todas as imagens como matrizes de bytes em um conjunto de linhas e, em seguida, aplicar funções individuais definidas pelo usuário para extrair cada uma das propriedades e criar a miniatura, como mostrado na Figura 12.

Figura 12: processamento de imagens no U-SQL carregando imagens em linhas

REFERENCE ASSEMBLY Images;
USING Images;
@image_data =
  EXTRACT image_data byte[]  // Max size of row is 4MB!
        , name string
        , format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new ImageExtractor();
// Use UDFs
@image_properties =
  SELECT ImageOps.getImageProperty(image_data, ImageProperties.copyright)
         AS image_copyright,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_make)
         AS image_equipment_make,
         ImageOps.getImageProperty(image_data, ImageProperties.equipment_model)
         AS image_equipment_model,
         ImageOps.getImageProperty(image_data, ImageProperties.description)
         AS image_description
  FROM @image_data
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");

No entanto, essa abordagem tem algumas desvantagens:

  • As linhas do U-SQL podem ter no máximo 4 MB de tamanho, limitando a solução a imagens com 4 MB de tamanho (menos o tamanho das outras colunas).
  • Cada uma das invocações de função pode aumentar a pressão da memória e requer o fluxo da matriz de bytes pelo processamento do U-SQL.

Portanto, uma abordagem melhor é fazer a extração de propriedades e a criação de miniaturas diretamente no extrator personalizado. A Figura 13 mostra um script U-SQL revisado.

Figura 13: processamento de imagens no U-SQL extraindo os recursos com um extrator

REFERENCE ASSEMBLY Images;
@image_features =
  EXTRACT copyright string,
          equipment_make string,
          equipment_model string,
          description string,
          thumbnail byte[],
          name string,
          format string
  FROM @"/Samples/Data/Images/{name}.{format}"
  USING new Images.ImageFeatureExtractor(scaleWidth:500, scaleHeight:300);
@image_features =
  SELECT *
  FROM @image_features
  WHERE format IN ("JPEG", "jpeg", "jpg", "JPG");
OUTPUT @image_features
TO @"/output/images/image_features.csv"
USING Outputters.Csv();
@scaled_image =
  SELECT thumbnail
  FROM @image_features
  WHERE name == "GT4";
OUTPUT @scaled_image
TO "/output/images/GT4_thumbnail_2.jpg"
USING new Images.ImageOutputter();

Esse script extrai as propriedades e a miniatura das imagens especificadas pelo padrão de conjunto de arquivos (bit.ly/2ektTY6): /Samples/Data/Images/{name}.{format}. A instrução SELECT restringe então a extração para arquivos JPEG usando um predicado somente na coluna de formato que eliminará todos os arquivos não JPEG da extração (o otimizador aplicará somente o extractor aos arquivos que satisfizerem ao predicado na coluna de formato) . O extrator fornece a opção para especificar as dimensões da miniatura. O script coloca os recursos em um arquivo CSV e usa um gerador de saída de nível de byte simples para criar um arquivo de miniatura para uma das imagens reduzidas.

A Figura 14 mostra a implementação do extrator.

Figura 14: o extrator de recursos de imagem

using Microsoft.Analytics.Interfaces;
using Microsoft.Analytics.Types.Sql;
using System;
using System.Collections.Generic;
using System.Linq;
using System.IO;
using System.Drawing;
using System.Drawing.Imaging;
using System.Drawing.Drawing2D;
namespace Images
{
  public static class UpdatableRowExtensions
  {
    public static void SetColumnIfExists<T>(this IUpdatableRow source
                                           , string colName, T value)
    {
      var colIdx = source.Schema.IndexOf(colName);
      if (colIdx != -1)
      { source.Set<T>(colIdx, value); }
    }
  }
  [SqlUserDefinedExtractor(AtomicFileProcessing = true)]
  public class ImageFeatureExtractor : IExtractor
  {
    private int _scaleWidth, _scaleHeight;
    public ImageFeatureExtractor(int scaleWidth = 150, int scaleHeight = 150)
    { _scaleWidth = scaleWidth; _scaleHeight = scaleHeight; }
    public override IEnumerable<IRow> Extract(IUnstructuredReader input
                                             , IUpdatableRow output)
    {
      byte[] img = ImageOps.GetByteArrayforImage(input.BaseStream);
      using (StreamImage inImage = new StreamImage(img))
      {
        output.SetColumnIfExists("image", img);
        output.SetColumnIfExists("equipment_make",
          inImage.getStreamImageProperty(ImageProperties.equipment_make));
        output.SetColumnIfExists("equipment_model",
          inImage.getStreamImageProperty(ImageProperties.equipment_model));
        output.SetColumnIfExists("description",
          inImage.getStreamImageProperty(ImageProperties.description));
        output.SetColumnIfExists("copyright",
          inImage.getStreamImageProperty(ImageProperties.copyright));
        output.SetColumnIfExists("thumbnail",
          inImage.scaleStreamImageTo(this._scaleWidth, this._scaleHeight));
      }
      yield return output.AsReadOnly();
    }
  }
}

Novamente, o extrator precisa ver o arquivo inteiro e opera no entrada.BaseStream, mas, agora, cria apenas uma imagem na memória, diferentemente do script na Figura 12. O extrator também verifica cada uma das colunas solicitadas e processa apenas os dados para os nomes de coluna solicitados usando o método de extensão SetColumnIfExists.

Para obter mais detalhes, confira o projeto do Visual Studio em nosso site do GitHub em bit.ly/2dngXCE.

Processar dados espaciais

Neste exemplo, mostrarei como usar o assembly do tipo Espacial do SQL Server Microsoft.SqlServer.Types.dll no U-SQL. Em particular, quero usar as funções de biblioteca espacial nos scripts do U-SQL como funções definidas pelo usuário. Como no caso do extractor JSON discutido anteriormente, isso significa que você deseja registrar um assembly já existente no U-SQL, sem ter que escrever seu próprio assembly.

Primeiro, você precisa baixar e instalar o assembly do pacote de recursos do SQL Server 2016 (bit.ly/2dZTw1k). Selecione a versão de 64 bits do instalador (ENU\x64\SQLSysClrTypes.msi) para garantir que você tenha a versão de 64 bits das bibliotecas.

O instalador instala o assembly gerenciado Microsoft.Sql­­Server.Types.dll em C:\Program Files (x86)\Microsoft SQL Server\130\SDK\Assemblies e o assembly nativo SqlServerSpatial130.dll em \Windows\System32\. Em seguida, envie os assemblies ao Azure Data Lake Store (por exemplo, para uma pasta chamada /upload/asm/spatial). Como o instalador instalou a biblioteca nativa na pasta do sistema c:\Windows\System32, você tem que copiar SqlServerSpatial130.dll dessa pasta antes de carregá-lo ou verificar se a ferramenta que você usa não executa o Redirecionamento de Sistema de Arquivos (bit.ly/1TYm9YZ) em pastas do sistema. Por exemplo, se quiser carregá-lo com o Visual Studio ADL File Explorer atual, você terá que copiar o arquivo para outro diretório primeiro. Caso contrário (no momento da redação deste artigo), a versão de 32 bits será carregada (porque o Visual Studio é um aplicativo de 32 bits que faz o Redirecionamento do Sistema de Arquivos em sua janela de seleção de arquivos de upload do ADL) e, quando executar um script U-SQL que chama o assembly nativo, você obterá o seguinte erro (interno) em tempo de execução: "Exceção interna da expressão do usuário: Foi feita uma tentativa de carregar um programa com um formato incorreto. (Exceção de HRESULT: 0x8007000B)."

Depois de carregar os dois arquivos de assembly, registre-os em um banco de dados chamado SQLSpatial com este script:

DECLARE @ASSEMBLY_PATH string = "/upload/asm/spatial/";
DECLARE @SPATIAL_ASM string = @ASSEMBLY_PATH+"Microsoft.SqlServer.Types.dll";
DECLARE @SPATIAL_NATIVEDLL string = @ASSEMBLY_PATH+"SqlServerSpatial130.dll";
CREATE DATABASE IF NOT EXISTS SQLSpatial;
USE DATABASE SQLSpatial;
DROP ASSEMBLY IF EXISTS SqlSpatial;
CREATE ASSEMBLY SqlSpatial
FROM @SPATIAL_ASM
WITH ADDITIONAL_FILES =
  (
    @SPATIAL_NATIVEDLL
  );

Observe que, nesse caso, você só registra um assembly do U-SQL e inclui o assembly nativo como uma dependência forte para o assembly do U-SQL. Para usar os assemblies espaciais, basta fazer referência ao assembly do U-SQL, e o arquivo adicional será disponibilizado automaticamente para o assembly. A Figura 15 mostra um script de exemplo simples usando o assembly espacial.

Figura 15: uso dos recursos espaciais no U-SQL

REFERENCE SYSTEM ASSEMBLY [System.Xml];
REFERENCE ASSEMBLY SQLSpatial.SqlSpatial;
USING Geometry = Microsoft.SqlServer.Types.SqlGeometry;
USING Geography = Microsoft.SqlServer.Types.SqlGeography;
USING SqlChars = System.Data.SqlTypes.SqlChars;
@spatial =
    SELECT * FROM (VALUES
                   // The following expression is not using the native DDL
                   ( Geometry.Point(1.0,1.0,0).ToString()),   
                   // The following expression is using the native DDL
                   ( Geometry.STGeomFromText(
                     new SqlChars("LINESTRING (100 100, 20 180, 180 180)"),
                     0).ToString())
                  ) AS T(geom);
OUTPUT @spatial
TO "/output/spatial.csv"
USING Outputters.Csv();

A biblioteca Tipos SQL tem uma dependência no assembly System.Xml; portanto, você precisa fazer referência a ela. Além disso, alguns dos métodos estão usando os tipos System.Data.SqlTypes em vez dos tipos C# internos. Como System.Data já está incluído por padrão, você pode simplesmente fazer referência ao tipo de SQL necessário. O código na Figura 15 está disponível em nosso site do GitHub em bit.ly/2dMSBm9.

Conclusão: Algumas dicas e práticas recomendadas para UDOs

Embora tenha apenas abordado superficialmente os poderosos recursos de extensibilidade do U-SQL, este artigo mostrou como o mecanismo de extensibilidade do U-SQL permite reutilizar o código de domínio específico existente ao usar a estrutura de extensão do U-SQL para dimensionar o processamento no volume típico de Big Data.

Porém, uma ferramenta tão poderosa também pode ser mal utilizada com facilidade. Por isso, aqui estão algumas dicas e conselhos de práticas recomendadas.

Enquanto os formatos de dados personalizados costumem precisar de um extrator personalizado e, potencialmente, um gerador de saída, deve-se considerar muito cuidadosamente se o formato de dados pode ser extraído em paralelo (como os formatos do tipo CSV) ou se o processamento precisa ver todos os dados em uma única instância do operador. Além disso, tornar os operadores suficientemente genéricos para que o processamento só aconteça se uma coluna específica for solicitada também pode melhorar potencialmente o desempenho.

Ao considerar UDOs como processadores, redutores, combinadores e aplicadores, é altamente recomendável considerar primeiro uma solução pura do U-SQL que aproveite os operadores internos. Por exemplo, o script de redutor de intervalo discutido anteriormente poderia realmente ser escrito com uso inteligente de janelas e funções de classificação. Aqui estão algumas razões pelas quais ainda convém considerar UDOs:

  • A lógica precisa acessar dinamicamente o esquema de entrada ou saída do conjunto de linhas que está sendo processado. Por exemplo, crie um documento JSON para os dados na linha em que as colunas não sejam conhecidas antecipadamente.
  • Uma solução que usa várias funções definidas pelo usuário na expressão SELECT cria demasiada pressão de memória, e você pode escrever o código para usar a memória de forma mais eficiente em um UDO processador.
  • Você precisa de um agregador ordenado ou um agregador que produza mais de uma linha por grupo, e não é possível escrevê-los com funções de janela.

Ao usar UDOs, tenha sempre em mente as seguintes dicas:

  • Use a cláusula READ ONLY para permitir o envio de predicados por meio de UDOs.
  • Use a cláusula REQUIRED para permitir que a redução de coluna seja enviada por meio de UDOs.
  • Indique a Cardinalidade em sua expressão de consulta que usa um UDO, caso o otimizador de consulta escolha o plano errado.

Michael Rys é gerente de programas chefe na Microsoft. Ele trabalha com processamento de dados e linguagens de consulta desde os anos 80. Ele representou a Microsoft nos comitês de design XQuery e SQL e expandiu o SQL Server para além do uso relacional, com XML, Pesquisa Semântica e Geoespacial. Atualmente, ele trabalha em linguagens de consulta de Big Data como SCOPE e U-SQL, quando não está desfrutando da companhia de sua família em atividades subaquáticas ou de autocross. Siga-o no Twitter: @MikeDoesBigData.

Agradecemos aos seguintes especialistas técnicos da Microsoft pela revisão deste artigo: Clemens Szyperski, Ed Triou, Saveen Reddy e Michael Kadaner
Clemens Szyperski é gerente de engenharia de grupo chefe na Microsoft. Há décadas, sua paixão são as linguagens especializadas, as ferramentas e as abordagens que facilitam a criação de sistemas de software complexos. Atualmente, ele lidera as equipes do Azure Data Lake U-SQL e Scope, quando não está velejando com sua família. Siga-o no Twitter: @ClemensSzy.

Ed Triou é líder de desenvolvimento chefe na Microsoft. Nos últimos 20 anos, ele tem se concentrado em programabilidade de dados (ODBC, OLEDB, ADO.NET, JDBC, PHP e EDM), com especialização em compiladores e linguagens de consulta (IDL, TSQl, LINQ para SQL/Entidades, eSQL, SCOPE e U-SQL).  Atualmente, ele lidera as equipes de compilador e linguagens do U-SQL, tentando ficar um passo à frente de nossas empresas externas e internas que dependem diariamente de ADL e Cosmos na escala de exabytes.

Saveen Reddy é gerente de programas chefe na Microsoft, concentrando-se em projetar e criar a Plataforma Azure Data Lake: os componentes e experiências que dão suporte a todos os serviços de nuvem de Big Data da Microsoft. Saveen detém uma classificação de 100% de conclusão para Metal Gear Solid V: The Phantom Pain. Siga-o no Twitter: @saveenr

Michael Kadaner é engenheiro de software chefe na Microsoft. Apesar de décadas de experiência em várias áreas de ciência da computação e desenvolvimento de software, ele afirma que escrever programas é uma arte precisa, e o software pode ser livre de bugs. Sua verdadeira paixão é resolver problemas complexos de algoritmos e engenharia e implementar as soluções em código conciso e elegante que é correto por design. Ele divide seu tempo livre entre a leitura e projetos do tipo "faça você mesmo".