Volume 31 Number 1
Making Big Data Batch Analytics Easier Using U-SQL
By Michael Rys | January 2016
The new Microsoft Azure Data Lake services for analytics in the cloud (bit.ly/1VcCkaH) include a hyper-scale repository; a new analytics service built on YARN (bit.ly/1iS8xvP) that lets data developers and data scientists analyze all data; and HDInsight (bit.ly/1KFywqg), a fully managed Hadoop, Spark, Storm and HBase service. Azure Data Lake Analytics also includes U-SQL, a language that unifies the benefits of SQL with the expressive power of your own code. U-SQL's scalable distributed query capability enables you to efficiently analyze data in the store and across relational stores such as Azure SQL Database. In this article, I'll outline the motivation for U-SQL, some of the inspiration and design philosophy behind the language, and give a few examples of the major aspects of the language.
If you analyze the characteristics of Big Data analytics, several requirements arise naturally for an easy-to-use, yet powerful language:
- Process any type of data. From analyzing BotNet attack patterns from security logs to extracting features from images and videos for machine learning, the language needs to enable you to work on any data.
- Use custom code easily to express complex, often proprietary business algorithms. The example scenarios in the first bullet point might all require custom processing that’s often not easily expressed in standard query languages, ranging from user-defined functions, to custom input and output formats.
- Scale efficiently to any size of data without focusing on scale-out topologies, plumbing code or limitations of a specific distributed infrastructure.
How Do Existing Big Data Languages Stack Up to These Requirements?
SQL-based languages such as Hive (hive.apache.org) provide a declarative approach that natively handles scaling, parallel execution and optimization. This makes them easy to use, familiar to a wide range of developers, and powerful for many standard types of analytics and warehousing. However, their extensibility model and support for non-structured data and files are often bolted on and more difficult to use. For example, even if you just want to quickly explore data in a file or remote data source, you need to create catalog objects to schematize file data or remote sources before you can query them, which reduces agility. And although SQL-based languages often have several extensibility points for custom formatters, user-defined functions, and aggregators, they're rather complex to build, integrate and maintain, with varying degrees of consistency in the programming models.
Programming language-based approaches to process Big Data, for their part, provide an easy way to add your custom code. However, a programmer often has to explicitly code for scale and performance, often down to managing the execution topology and workflow such as the synchronization between the different execution stages or the scale-out architecture. This code can be difficult to write correctly and even harder to optimize for performance. Some frameworks support declarative components such as language-integrated queries or support for embedded SQL. However, this SQL may be integrated as literal strings lacking tool support. Therefore, the extensibility integration might be limited or—due to the procedural code that doesn’t guard against side effects—hard to optimize and difficult to reuse.
Satisfying the Requirements with U-SQL
Taking the issues of both SQL-based and procedural languages into account, U-SQL was designed from the ground up as an evolution of the declarative SQL language with native extensibility through user code written in C#. U-SQL unifies the following:
- The declarative and imperative coding paradigms and experiences
- The experience around extending your language capabilities with custom user code
- Processing over all data, whether unstructured or structured
- Data processing in the Azure Data Lake and other Azure data sources using federated queries
U-SQL is built on lessons learned from the Microsoft-internal experience with a declarative and extensible scripting language (bit.ly/1OGUNIY) called Structured Computations Optimized for Parallel Execution, or SCOPE, and from existing languages such as T-SQL, ANSI SQL and Hive. For example, we base our SQL and programming language integration and the execution and optimization framework for U-SQL on SCOPE, which currently runs hundreds of thousands of jobs each day internally. We also align the metadata system (databases, tables and so on), the SQL syntax and language semantics with T-SQL and ANSI SQL, the query languages with which most of our SQL Server customers are familiar. And we use C# data types and the C# expression language so you can seamlessly write C# predicates and expressions inside SELECT statements and use C# to add your custom logic. Finally, we looked to Hive and other Big Data languages to identify patterns and data processing requirements and integrate them into our framework.
In short, basing U-SQL language on these existing languages and experiences should represent a true evolution in the Big Data query language space and make it easy for you to get started, yet powerful enough for the most difficult problems.
Show Me U-SQL
Let’s assume that I’ve downloaded my Twitter history of all my tweets, retweets and mentions as a CSV file and placed it into my Azure Data Lake store. I can use Azure Data Lake Tools for Visual Studio to preview the file and understand the structure. I can do this using the Server Explorer to find my Data Lake Storage accounts and open the explorer on my default storage account. Figure 1 shows a data stream (in this case my CSV file). In U-SQL I use a late binding of format to my data streams, so this preview just shows columns based on a delimiter, but doesn’t specify type.
Figure 1 Data Stream Preview in Visual Studio
In this case, I know the schema of the data I want to process and for starters I want to just count the number of tweets for each of the authors in the tweet “network.” The U-SQL script in Figure 2 shows the three major steps of processing data with U-SQL:
- Extract data from your sources into rowsets. Note that you just schematize it in your query with the EXTRACT statement. The datatypes are based on C# datatypes and the script uses the built-in Extractors library to read and schematize the CSV file.
- Transform the rowset using U-SQL or user-defined operators in a series of rowset manipulations, each often building on one or more previous rowsets. In the example in Figure 2, it’s a familiar SQL expression that does a GROUP BY aggregation on the rowset generated by the EXTRACT expression.
- Output the resulting rowsets either into files or into U-SQL tables to store it for further processing.
Figure 2 U-SQL Script for Extracting Tweets from CSV
 1 @t = EXTRACT date string
 2             , time string
 3             , author string
 4             , tweet string
 5      FROM "/input/MyTwitterHistory.csv"
 6      USING Extractors.Csv();
 7
 8 @res = SELECT author
 9        , COUNT(*) AS tweetcount
10        FROM @t
11        GROUP BY author;
12
13 OUTPUT @res TO "/output/MyTwitterAnalysis.csv"
14 ORDER BY tweetcount DESC
15 USING Outputters.Csv();
Note that SQL keywords in U-SQL have to be uppercase to differentiate them syntactically from C# expressions that use the same keywords with a different meaning. A common error in early scripts is to use “as” instead of “AS” when assigning an alias to a column in a SELECT statement. The correct form is the uppercase AS of the SQL expression syntax. Fortunately, the error is caught at compile time.
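For example, a minimal hypothetical pair of statements illustrates the rule: the first compiles, while the second fails because the lowercase “as” is parsed as a C# token rather than as the SQL alias keyword:

```sql
// Correct: the SQL alias keyword is uppercase.
@res = SELECT author AS tweeter FROM @t;

// Compile-time error: lowercase "as" is not the SQL alias keyword.
@res = SELECT author as tweeter FROM @t;
```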
Also notice that each of the expressions is assigned to a variable (@t and @res). This lets U-SQL incrementally transform and combine data step-by-step expressed as an expression flow using functional lambda composition (similar to what you find in the Pig [pig.apache.org] language). The execution framework, then, composes the expressions together into a single expression. That single expression can then be globally optimized and scaled out in a way that isn’t possible if expressions are being executed line by line. Figure 3 shows the execution graph generated for one of the later queries in this article. The graph represents the globally optimized execution stages arrived at by the compiler and optimizer.
Figure 3 Job Execution Graph in Visual Studio
C# Integration in U-SQL
Going back to my example, I now want to add additional information about the people mentioned in the tweets and extend my aggregation to return how often people in my tweet network are authoring tweets and how often they’re being mentioned. For this I’m taking a look at C# integration in U-SQL.
The core reliance of U-SQL on C# for its type system and scalar expression language provides the query writer access to the wealth of the C# and CLR libraries of classes, methods, functions, operators and types. All C# operators except for the assignment operators (=, += and so on) are valid in U-SQL scalar expressions. In particular, all comparison operators such as ==, !=, <, >, the ternary comparison cond ? true-expression : false-expression, the null coalesce operator ?? are supported. Even lambda expressions using => can be used inside U-SQL expressions.
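As a minimal sketch of what this looks like in practice (the rowset @t and its columns are illustrative, not from the article's data set), such C# operators can appear directly in SELECT and WHERE clauses:

```sql
// Illustrative rowset @t with string columns author and tweet.
@r = SELECT author ?? "unknown" AS handle                // null-coalescing operator
     , (tweet.Length > 140 ? "long" : "short") AS size   // ternary conditional
     FROM @t
     WHERE tweet != null && tweet.Contains("@");         // C# comparisons and logic
```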
This tight integration and seamless programming experience is also enabled syntactically by U-SQL’s integration with the C# Roslyn compiler (bit.ly/1BsPced). In fact, the U-SQL integration is probably the most complex application of the C# Roslyn compiler platform.
There are several ways C# code can be used to extend U-SQL expressions:
- Provide inline C# expressions in your U-SQL script: This often makes sense if a small set of C# methods needs to be applied to process one of the scalar values—a string type method or a math function.
- Write user-defined functions in a C# assembly and reference them in the U-SQL script: This is preferred for more complex functions, if the logic of the function requires the full power of C# beyond its expression language, such as procedural logic or recursion.
- Write user-defined aggregators in a C# assembly and reference them in the U-SQL script: By providing user-defined aggregators, custom aggregation logic can be plugged into U-SQL’s processing of aggregation with a GROUP BY clause.
- Write user-defined operators in a C# assembly and reference them in the U-SQL script: User-defined Operators (UDO) are U-SQL custom-coded rowset operators. They’re written in C# and provide the ability to generate, process and consume rowsets.
For the user-defined functions, aggregators and operators, the C# assembly will have to be loaded into the U-SQL metadata catalog with CREATE ASSEMBLY (U-SQL) and then referenced in the script with REFERENCE ASSEMBLY. The Azure Data Lake Tools for Visual Studio makes the registration process easy and even provides a so-called codebehind experience where you just write the code into a special C# file attached to a given script and on submission the tool takes care of all the plumbing.
Figure 4 shows an expanded query for the aggregation I just mentioned. I use a SELECT statement with an inline C# LINQ expression to extract the “mentions” from each tweet into an ARRAY in lines 8 to 10. @m (line 8) now contains a rowset of ARRAYs. I use the EXPLODE function in line 14 to turn each array into a rowset containing one row per array item. I do this EXPLODE as part of a CROSS APPLY, which means it’s applied to each row of the rowset @m. The resulting new rowset (line 12) contains one “mention” per row. Note that I reuse the same name @m. I need to drop the leading @ sign to align it with my existing author values, which is done with another C# expression that takes the Substring starting at position 1 in line 12. Finally, I union the authors with the mentions in lines 16 to 21, and extend my COUNT aggregation to group by a case-insensitive Twitter handle and the category (lines 23 to 27) before outputting the result ordered by descending tweet count in lines 29 to 31.
Figure 4 Tweet Manipulation Using C# Methods
 1 @t = EXTRACT date string
 2             , time string
 3             , author string
 4             , tweet string
 5      FROM "/input/MyTwitterHistory.csv"
 6      USING Extractors.Csv();
 7
 8 @m = SELECT new SQL.ARRAY<string>(
 9          tweet.Split(' ').Where(x => x.StartsWith("@"))) AS refs
10      FROM @t;
11
12 @m = SELECT r.Substring(1) AS r
13      , "referenced" AS category
14      FROM @m CROSS APPLY EXPLODE(refs) AS t(r);
15
16 @t = SELECT author, "authored" AS category
17      FROM @t
18      UNION ALL
19      SELECT *
20      FROM @m
21      WHERE r != null && r != "";
22
23 @res = SELECT author.ToLowerInvariant() AS author
24        , category
25        , COUNT( * ) AS tweetcount
26        FROM @t
27        GROUP BY author.ToLowerInvariant(), category;
28
29 OUTPUT @res TO "/output/MyTwitterAnalysis.csv"
30 ORDER BY tweetcount DESC
31 USING Outputters.Csv();
Let’s look at some of these parts in more detail to see the power of closely integrating C# into U-SQL.
The bolded parts of the script in Figure 4 show some of the places where U-SQL expects and accepts C# expressions. As you can see, you can use C# expressions in the EXTRACT and OUTPUT USING clauses, in the SELECT and WHERE clauses, as well as in the GROUP BY clause, ORDER BY clause and EXPLODE function, although in this example I just refer to a column name in the latter two cases.
An important aspect of the integration is that the C# expressions have seamless, full access to the scalar values inside the query expression because those values are typed with C# types. For example, the columns tweet in line 9, r in lines 12 and 21, and author in lines 23 and 27 are all integrated into the C# expressions without the need for extra wrapper syntax.
The EXTRACT and OUTPUT USING clauses in lines 6 and 31 take C# expressions resulting in a user-defined operator instance. The two built-in expressions are calls to factory methods that return an extractor and an outputter instance, respectively.
Let’s look at the C# expression from lines 8 and 9 in a bit more detail:
new SQL.ARRAY<string>(tweet.Split(' ').Where(x => x.StartsWith("@")))
This is a great example of the use of C#: The U-SQL built-in type SQL.ARRAY<T> is actually a C# object type that provides the expected SQL/Hive capabilities without the side-effecting update functions of the existing C# Array type. You just use the C# new operator to create an instance. The instance is created by applying one of the many string operations to the column tweet, which is typed as string, to break it into words. There’s no more wondering where to find a certain piece of string functionality as in normal SQL dialects: You have the full power of the CLR at your fingertips.
It gets even better. The Split method returns an IEnumerable<string>. So any further processing, such as filtering to get the “mentions” from the tweet words can be done with a LINQ expression and using a lambda expression as the predicate. Now try that with your standard SQL language!
Let’s also look at the WHERE clause in line 21. Again, I can simply provide the C# expression referring to the columns in the rowset. The expression in this context has to result in a value of type bool. Now C# has two-valued logic and not three-valued logic the way SQL does it. Thus, the comparison r != null will return true if r is not null or false if it is null. By using the C# logical operator &&, I get the guarantee of the C# execution order being preserved and, more important, the short-cutting that will not execute the right comparison if the first will evaluate to false. U-SQL also supports the SQL-based AND and OR conjunctions that do not provide short-cutting, but let the predicates be reordered for better performance. All of this gives the developer the choice between more efficient execution and semantic safeguards.
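The distinction can be sketched as follows (the predicates are illustrative): the first WHERE clause relies on C# short-circuiting to be safe, while the SQL-style version leaves the optimizer free to reorder the predicates:

```sql
// && guarantees left-to-right evaluation: Substring is never
// called when r is null.
@safe = SELECT r FROM @m WHERE r != null && r.Substring(1) != "";

// AND allows the optimizer to reorder predicates for performance,
// so only side-effect-free, null-safe predicates belong here.
@fast = SELECT r FROM @m WHERE r != null AND r != "";
```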
U-SQL, Visual Studio Codebehind Capabilities and Assemblies
As a next step I can use the Azure Data Lake Tools for Visual Studio to refactor the C# code into C# functions using the tool’s codebehind functionality, as shown in Figure 5. When I submit the script, the tool automatically deploys the code in the associated .cs file to the service. In order to refer to the methods, types and functions from U-SQL, the classes have to be defined as public and the methods as public static.
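A codebehind file for the tweet functions used later in this article might look like the following sketch; the namespace, the method bodies and the use of the SqlArray type are my assumptions, and only the public/public static requirement comes from the text:

```csharp
using System.Linq;
using Microsoft.Analytics.Types.Sql;  // assumed home of the SqlArray type

namespace Tweets
{
    // The class must be public to be visible to U-SQL.
    public static class Udfs
    {
        // Methods must be public static to be callable from U-SQL expressions.
        public static SqlArray<string> get_mentions(string tweet)
        {
            // Split the tweet into words and keep the @-handles.
            return new SqlArray<string>(
                tweet.Split(' ').Where(x => x.StartsWith("@")));
        }

        public static string cleanup_mentions(string mention)
        {
            // Drop the leading @ sign and trailing punctuation (illustrative cleanup).
            return mention.Substring(1).TrimEnd(':', ',', '.');
        }
    }
}
```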
Figure 5 Codebehind in Visual Studio
The tool follows these three steps:
- The .cs file is compiled into an assembly file.
- The user’s U-SQL script gets augmented with a header containing a CREATE ASSEMBLY statement that registers the assembly file’s binary content in your U-SQL metadata catalog.
- It adds a cleanup at the end of the script that removes the registered assembly with a DROP ASSEMBLY statement.
I can also deploy and register the code as an assembly in my U-SQL metadata catalog myself explicitly. This lets me and other people use the code in future scripts. It’s also the preferred way to manage your user-defined functions, aggregators and operators, if you have more complex code that you want to maintain separately, where you may want to include existing code that may have been written in other contexts (like your XML or JSON libraries) or even call out to external executables.
Similar to relational databases such as SQL Server, U-SQL provides a metadata catalog and supports the standard database objects such as databases, schemas, tables and so on. One of the objects is an assembly metadata object. By using the CREATE ASSEMBLY statement, you can register an assembly in the database. Assemblies are objects scoped to a database; the assembly DLL file gets placed into the assembly folder inside the relevant database folder inside the catalog folder in your primary Azure Data Lake storage account.
In addition to storing your assembly, you can specify additional files that will be stored together with your assembly and will be included when you reference the assemblies. The CREATE ASSEMBLY statement syntax grammar looks like this (see the U-SQL language reference documentation at bit.ly/1HWw0cc for more details):
Create_Assembly_Statement :=
    'CREATE' 'ASSEMBLY' ['IF' 'NOT' 'EXISTS'] Assembly_Name
    'FROM' Assembly_Source
    ['WITH' 'ADDITIONAL_FILES' '=' '(' Assembly_Additional_File_List ')'].

Assembly_Name := Quoted_or_Unquoted_Identifier.

Assembly_Source := Static_String_Expression | lexical_binary_value.
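A registration following this grammar might look like the following sketch; the database name, DLL path and additional file are illustrative:

```sql
USE DATABASE TweetDB;   // assumed database; assemblies are scoped to a database
CREATE ASSEMBLY IF NOT EXISTS TweetAnalysis
FROM "/Assemblies/TweetAnalysis.dll"
WITH ADDITIONAL_FILES = ("/Assemblies/SlangDictionary.txt");
```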
Speaking of referencing an assembly, U-SQL has a small set of pre-loaded System assemblies and namespaces, including System and System.Linq. The set is kept small to keep the compilation time and the job’s resource utilization lower. If you want to refer to a different system assembly, you can just include it with the following statement, which adds System.Xml:
REFERENCE SYSTEM ASSEMBLY [System.Xml];
Once the assembly containing the tweet-analysis functions has been registered under the name TweetAnalysis, it can be referenced and used as in Figure 6. Because I need to do a bit more cleanup around the mentions than just dropping the @ sign, the assembly contains a cleanup_mentions function that performs that additional processing.
Figure 6 Assembly References in U-SQL
 1 REFERENCE ASSEMBLY TweetAnalysis;
 2
 3 @t = EXTRACT date string
 4             , time string
 5             , author string
 6             , tweet string
 7      FROM "/input/MyTwitterHistory.csv"
 8      USING Extractors.Csv();
 9
10 @m = SELECT Tweets.Udfs.get_mentions(tweet) AS refs
11      FROM @t;
12
13 @t = SELECT author, "authored" AS category
14      FROM @t
15      UNION ALL
16      SELECT Tweets.Udfs.cleanup_mentions(r) AS r, "mentioned" AS category
17      FROM @m CROSS APPLY EXPLODE(refs) AS Refs(r);
18
19 @res = SELECT author.ToLowerInvariant() AS author
20        , category
21        , COUNT(*) AS tweetcount
22        FROM @t
23        GROUP BY author.ToLowerInvariant(), category;
24
25 OUTPUT @res
26 TO "/output/MyTwitterAnalysis.csv"
27 ORDER BY tweetcount DESC
28 USING Outputters.Csv();
U-SQL Unifies Structured and Unstructured Data
As seen so far, U-SQL makes it very easy to schematize a file on read using the EXTRACT expression. However, once the data preparation has reached a stage where the schema is known, it makes sense to either wrap the EXTRACT into a view or a table-valued function that provides a multi-statement, parameterized view.
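For instance, the view alternative can be sketched as follows, fixing both the schema and the file location under a single name (the view name is illustrative):

```sql
CREATE VIEW TweetHistory AS
    EXTRACT date string
          , time string
          , author string
          , tweet string
    FROM "/input/MyTwitterHistory.csv"
    USING Extractors.Csv();
```

A view suffices when no parameters are needed; the table-valued function shown next adds parameterization on top of the same idea.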
Figure 7 shows the new code. The CREATE FUNCTION statement in line 2 creates the U-SQL table-valued function Tweet_Authors_Mentions with the parameter @file that has a provided default value (line 4) and returns the @res rowset of the table type with the three columns author, category and tweetcount (lines 6 to 11). The parameter gets referenced in line 20 and the last assignment to the @res result in line 34 will be returned.
Figure 7 Parameterized Table-Valued Function
 1 DROP FUNCTION IF EXISTS Tweet_Authors_Mentions;
 2 CREATE FUNCTION Tweet_Authors_Mentions
 3 (
 4     @file string = "/Samples/Data/MyTwitterHistory.csv"
 5 )
 6 RETURNS @res TABLE
 7 (
 8     author string
 9     , category string
10     , tweetcount long?
11 )
12 AS BEGIN
13 REFERENCE ASSEMBLY TweetAnalysis;
14
15 @t =
16     EXTRACT date string
17           , time string
18           , author string
19           , tweet string
20     FROM @file
21     USING Extractors.Csv();
22
23 @m =
24     SELECT AzureConDemo.Udfs.get_ref(tweet) AS refs
25     FROM @t;
26
27 @t =
28     SELECT author, "authored" AS category
29     FROM @t
30     UNION ALL
31     SELECT AzureConDemo.Udfs.cleanup(r) AS r, "referenced" AS category
32     FROM @m CROSS APPLY EXPLODE(refs) AS t(r);
33
34 @res =
35     SELECT author.ToLowerInvariant() AS author
36          , category
37          , COUNT( * ) AS tweetcount
38     FROM @t
39     GROUP BY author.ToLowerInvariant(), category;
40 END;
Note that U-SQL table-valued functions are always inlined into the query script, so the U-SQL optimizer can reason and optimize across all the statements. The function can, for example, be called as follows:
1 OUTPUT Tweet_Authors_Mentions(DEFAULT)
2 TO "/Samples/Data/Output/MyTwitterAnalysis.csv"
3 ORDER BY tweetcount DESC
4 USING Outputters.Csv();
Often, though, the prepared data will be stored as structured data in a U-SQL table that provides additional storage optimizations such as a clustered index and the ability to partition the data. The statements here show how easy U-SQL makes it to create a table by using a CREATE TABLE AS query statement:
1 DROP TABLE IF EXISTS TweetAuthorsAndMentions;
2 CREATE TABLE TweetAuthorsAndMentions(INDEX idx
3     CLUSTERED(author ASC)
4     PARTITIONED BY HASH(author) INTO 5
5 )
6 AS Tweet_Authors_Mentions(DEFAULT);
Line 3 specifies that the index of the table is clustered by the author column in ascending order, and line 4 horizontally partitions the internal representation using a hash on the author values into five partitions. After this statement is run, the output of the Tweet_Authors_Mentions function will be saved as a new table with these characteristics. U-SQL also supports round-robin and range partitioning for horizontal partitioning schemes, as well as vertical partitioning that allows managing the partitions individually.
Now the table can be used by others to query the data and perform further analysis. The query in Figure 8 uses the built-in U-SQL ranking functions together with a windowing expression to calculate the median count of tweets per author and category. It also calculates the relative and absolute ranking position of the authors and categories that have more than 50 tweets.
Figure 8 Doing Analytics with U-SQL Windowing Expressions
 1 @res =
 2     SELECT DISTINCT
 3         author, category, tweetcount
 4     , PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY tweetcount ASC)
 5       OVER (PARTITION BY category) AS median_tweetcount_perhandle_category
 6     , PERCENT_RANK() OVER
 7       (PARTITION BY category ORDER BY tweetcount ASC) AS relative_rank
 8     , ROW_NUMBER() OVER
 9       (PARTITION BY category ORDER BY tweetcount DESC) AS absolute_rank
10     FROM TweetAuthorsAndMentions
11     WHERE tweetcount > 50;
12
13 OUTPUT @res
14 TO "/Output/Demo/tweeter_ranking.csv"
15 ORDER BY absolute_rank, category ASC
16 USING Outputters.Csv();
To better understand the SQL-based windowing expressions, let’s take a closer look at the expressions in lines 4 to 9. The OVER expression applies the two analytics functions PERCENTILE_DISC and PERCENT_RANK and the ranking function ROW_NUMBER, each to a partitioning of the rowset (the so-called windows) specified with the PARTITION BY clause (in all three cases the same partitioning based on the category). The two functions that calculate ranks also need an ordering of the data within each window to determine where a value is placed. PERCENT_RANK calculates the relative rank of a row within a group of rows specified by the windowing expression, while ROW_NUMBER calculates the position of the row within that group. The PERCENTILE_DISC function computes a specific percentile for sorted values in the specified window based on a discrete distribution of the column values; PERCENTILE_DISC(0.5) computes the 50th percentile (that is, the median) within each window as ordered by the tweet counts in ascending order.
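To make the semantics concrete, here’s a small self-contained sketch using a constant rowset (the VALUES rowset constructor; the handles and counts are made up). For a single window with tweet counts 60, 80 and 100, PERCENT_RANK ascending yields 0.0, 0.5 and 1.0; ROW_NUMBER descending yields 1 through 3; and PERCENTILE_DISC(0.5) picks 80 as the median:

```sql
@sample = SELECT * FROM (VALUES
      ("alice", "authored", 100L)
    , ("bob",   "authored",  80L)
    , ("carol", "authored",  60L)
    ) AS T(author, category, tweetcount);

@ranked = SELECT author
    , PERCENT_RANK() OVER
      (PARTITION BY category ORDER BY tweetcount ASC) AS relative_rank
    , ROW_NUMBER() OVER
      (PARTITION BY category ORDER BY tweetcount DESC) AS absolute_rank
    , PERCENTILE_DISC(0.5) WITHIN GROUP (ORDER BY tweetcount ASC)
      OVER (PARTITION BY category) AS median_tweetcount
    FROM @sample;
```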
This Is Why U-SQL!
I hope you got a glimpse at why U-SQL makes it easy to query and process Big Data and that you understand the thinking behind the language. The language offers many additional capabilities such as:
- Operating over sets of files with patterns
- Using vertically partitioned tables
- Federated queries against Azure SQL DB, SQL Data Warehouse and SQL Server in Azure VMs
- Encapsulating your U-SQL code with views and procedures
- More SQL windowing functions
- Programming with C# user-defined operators (custom extractors, processors)
- More complex types (MAP, ARRAY)
Please refer to the U-SQL reference documentation for details at bit.ly/1HWw0cc.
To summarize, U-SQL makes Big Data processing easy because it:
- Unifies declarative queries with the expressiveness of your user code
- Unifies querying structured and unstructured data
- Unifies local and remote queries
- Increases productivity and agility from day one
U-SQL is just one of the ways Microsoft is working to make Azure Data Lake services the most productive environment for authoring, debugging and optimizing analytics at any scale. With rich support for authoring and monitoring Hive jobs, a C#-based authoring model for building Storm (storm.apache.org) jobs for real-time streaming, and support for every stage of the job lifecycle from development to operations, Azure Data Lake services let you focus more on the questions you want to answer than on debugging distributed infrastructure. The goal is to make Big Data technology simpler and more accessible to as many people as possible: Big Data professionals, engineers, data scientists, analysts and application developers.
Michael Rys is a principal program manager at Microsoft. He has been doing data processing and query languages since the 1980s. He has represented Microsoft on the XQuery and SQL design committees and has taken SQL Server beyond relational with XML, Geospatial and Semantic Search. Currently he’s working on Big Data query languages such as SCOPE and U-SQL when he’s not enjoying time with his family underwater or at autocross. Follow him on Twitter: @MikeDoesBigData.
Thanks to the following Microsoft technical experts for reviewing this article: Omid Afnan and Ed Triou