Considerations for High Volume ETL Using SQL Server Integration Services
by Pat Martin, Senior SQL Server Premier Field Engineer, Microsoft New Zealand.
Providing for high volume extract, transformation, and load (ETL) throughput requires consideration of both best practices for standard relational databases, and of high performance techniques for ETL.
This article presents a number of suggestions relating to high volume ETL throughput, specifically using SQL Server Integration Services. Aspects of both ETL performance and management are described.
Note. This article is based on SQL Server 2005 Integration Services. However its suggestions are also applicable to SQL Server 2008 Integration Services.
Many of these suggestions are taken from best practices documented as part of the Microsoft Project REAL reference implementation, and from Microsoft’s Business Intelligence Architecture and Design Guide.
High volume ETL throughput and management require specific considerations and implementation techniques. SQL Server Integration Services provides a number of specific benefits for meeting the requirements of high-volume ETL. Often, technology choices have to be made based on available infrastructure and expertise. Many of these alternative options are covered below. This article focuses on data extraction and loading; transformation techniques are not discussed.
The main functions to be performed in an ETL data warehouse update process are:
- Load data to staging from holding tables with audits and row counts
- Check for consistency of dimension and fact data
- Check for changes to dimension members and handle these according to business rules
- Add new dimension members
- Create partitions for newly arriving fact data
- Add fact table entries
- Maintain audit history
- Log metadata
- Report errors
These functions are discussed below.
SQL Server Integration Services is a high throughput ETL system for managing high-performance data transfers. There are a large number of data transformation and cleansing capabilities available to the ETL designer as part of the Integration Services design ‘toolbox’.
The following diagram shows the Integration Services design used within the main processing flow for Project REAL, which is Microsoft’s Business Intelligence reference implementation. Project REAL is freely available on the web and includes all source code and numerous best practice whitepapers. I strongly recommend this guide as a pattern on which to base a best practice BI implementation.
Figure 1: Control flow - Project REAL Data Warehouse Load
Batch windows for ETL data processing are generally time-constrained, particularly when managing large volumes of data. Options for optimising transfers include using the SQL Server destination as the Integration Services destination adapter, and setting Fast Load properties on the transfer. These solutions are further discussed in the ETL performance section.
Audits and row counts are cheap to implement in Integration Services from a performance perspective. Audit rows can be written at the start and end of a transfer to provide simple record count based auditing. Row Count transformations have negligible impact on the data flow.
Consideration can be given to comparing rows written against a control count value supplied as part of the ETL transfer logic. Syntactically invalid rows will need to be written to a transfer table and sent back to the source provider for correction and resubmission. Each load should be associated with metadata describing the load transfer status, throughput metrics, and a unique Load ID for downstream reconciliation purposes.
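The control count comparison and load metadata described above can be sketched in a few lines. This is a minimal illustration in Python rather than an Integration Services package; the `LoadAudit` record and all of its field names are hypothetical, and the figures are invented for the example.

```python
from dataclasses import dataclass
from uuid import uuid4


@dataclass
class LoadAudit:
    """Hypothetical audit record written once per load."""
    load_id: str            # unique Load ID for downstream reconciliation
    control_count: int      # row count supplied by the source provider
    rows_written: int       # rows that reached staging
    rows_rejected: int      # invalid rows returned to the source for correction
    elapsed_seconds: float

    @property
    def status(self) -> str:
        # A load reconciles only when every control row is accounted for.
        accounted = self.rows_written + self.rows_rejected
        return "OK" if accounted == self.control_count else "MISMATCH"

    @property
    def rows_per_second(self) -> float:
        # Simple throughput metric stored alongside the load status.
        if self.elapsed_seconds == 0:
            return 0.0
        return self.rows_written / self.elapsed_seconds


audit = LoadAudit(load_id=str(uuid4()), control_count=1000,
                  rows_written=990, rows_rejected=10, elapsed_seconds=4.0)
print(audit.load_id, audit.status, audit.rows_per_second)
```

Treating rejected rows as "accounted for" is one possible reconciliation rule; a stricter process might flag any rejection as a failed load.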
A best practice recommendation from Project REAL is to disable foreign key constraints in the data mart and to rely on the ETL for managing consistency during the load process. This can significantly improve query and load times. However it moves the responsibility for data integrity to the ETL process. Again, whether the ETL process can be responsible for this will be determined in part by the time window available for processing.
All fact table records must refer to parent dimension records either in a standard way or as inferred or unknown dimension members. Orphan queries should be run as part of the ETL process to ensure that there are no orphan fact rows. Orphan fact rows are unacceptable in a dimensional model, as they directly impact the quality of data analysis.
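An orphan query is usually a left outer join from the fact table to the dimension, keeping only the rows with no parent. The sketch below uses an in-memory SQLite database as a stand-in for the data warehouse; the table and column names (`dim_item`, `fact_sales`, `item_key`) and the use of -1 as the unknown member key are assumptions made for the example.

```python
import sqlite3

# In-memory SQLite stands in for the data warehouse; names are illustrative.
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_item (item_key INTEGER PRIMARY KEY, item_code TEXT);
    CREATE TABLE fact_sales (sale_id INTEGER PRIMARY KEY, item_key INTEGER, amount REAL);
    -- Key -1 plays the role of the 'unknown' dimension member.
    INSERT INTO dim_item VALUES (-1, 'UNKNOWN'), (1, 'A100'), (2, 'B200');
    INSERT INTO fact_sales VALUES (10, 1, 5.0), (11, 2, 7.5), (12, 99, 3.0), (13, -1, 1.0);
""")

# Fact rows whose dimension key has no parent row are orphans.
orphans = conn.execute("""
    SELECT f.sale_id, f.item_key
    FROM fact_sales AS f
    LEFT JOIN dim_item AS d ON d.item_key = f.item_key
    WHERE d.item_key IS NULL
    ORDER BY f.sale_id
""").fetchall()

print(orphans)
```

The fact row pointing at the unknown member is not reported, because it still has a parent dimension row; only the row with key 99 is an orphan.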
Managing changes to dimensions
There are a number of scenarios to consider with respect to managing dimensions as part of an ETL process, such as inferred member management and dimension pruning. The focus here, however, is on managing changes to dimensions. High volume throughput for an ETL process requires careful consideration of how dimensional changes are to be applied.
There are a number of techniques which can be used to manage dimensional changes in the ETL process using Integration Services. The implementation choice depends upon the volume of data to be processed and the time window in which this processing needs to take place. Project REAL provides excellent examples and supporting detail as to the relative merits of each. The next section presents these choices in summary format. It would be prudent to model each approach in a test lab with a representative data load prior to making an implementation decision.
Slowly Changing Dimension Wizard
Integration Services contains a tool for managing Slowly Changing Dimensions called the Slowly Changing Dimension Wizard. The ETL developer supplies the names of the columns in the data flow which are used to signify new and / or changed records. This tool then automatically generates the ETL components required for generating new and changed records as part of the ETL data flow. When requirements change, the logic can be modified without breaking downstream components. This component can be used to manage the addition of new members, changes to attributes in existing members, and historical (type 2) changes, in addition to the handling of inferred members.
A consideration for the use of this tool is that it processes dimensional changes on a row by row basis. While this design performs adequately, it does have scalability limits. For Project REAL, where the Item dimension contained 6 million members with tens of thousands of changes per day (across 100 attributes), an alternative approach was taken. These alternatives are discussed below.
Integration Services provides a Lookup transformation task which can be used to look for a dimension record in a reference table. This task can be used to bring the entire dimension into memory so that all columns are available for change type comparisons. In this way both type 1 and type 2 changes can be handled. This approach was not taken in Project REAL due to the high number of dimension records processed. Several gigabytes of RAM would have been required, and loading these records into memory would take considerable time. However the SQL Server 2008 Integration Services Lookup Transformation has seen significant improvements. It may be that the Project REAL implementation would have used this component had it been available.
Merge Join Transformation
A left outer Merge Join followed by a Conditional Split transformation can be used to manage dimensional changes in the ETL data flow. New records (identified by a null surrogate key resulting from the join) are written to the dimension table. SCD type 1 and SCD type 2 changes are managed using the conditional split transformation by piping the output to appropriate downstream components. Set-based logic is then used to perform the actual updates. This high volume dimension processing mechanism is in fact the solution implemented by Project REAL, and is shown below.
Figure 2: Data flow - Project REAL Large Dimension Processing
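The shape of this data flow can be illustrated outside Integration Services. The following Python sketch is not the Project REAL package; it only mimics a left outer merge join over two inputs that are already sorted on the business key, followed by a conditional split. The column names and the choice of which columns count as type 1 or type 2 are hypothetical.

```python
from typing import Any, Dict, Iterable, Iterator, List, Optional, Tuple

Row = Dict[str, Any]


def left_merge_join(source: Iterable[Row], dimension: Iterable[Row],
                    key: str) -> Iterator[Tuple[Row, Optional[Row]]]:
    """Left outer join of two inputs ALREADY sorted on a unique `key`."""
    dim_iter = iter(dimension)
    dim_row = next(dim_iter, None)
    for src in source:
        # Advance the dimension side until it catches up with the source key.
        while dim_row is not None and dim_row[key] < src[key]:
            dim_row = next(dim_iter, None)
        matched = dim_row is not None and dim_row[key] == src[key]
        yield src, (dim_row if matched else None)


def conditional_split(pairs: Iterable[Tuple[Row, Optional[Row]]],
                      type1_cols: List[str],
                      type2_cols: List[str]) -> Dict[str, List[Row]]:
    outputs: Dict[str, List[Row]] = {"new": [], "type2": [], "type1": [], "unchanged": []}
    for src, dim in pairs:
        if dim is None:                                    # no surrogate key: new member
            outputs["new"].append(src)
        elif any(src[c] != dim[c] for c in type2_cols):    # historical change
            outputs["type2"].append(src)
        elif any(src[c] != dim[c] for c in type1_cols):    # overwrite in place
            outputs["type1"].append(src)
        else:
            outputs["unchanged"].append(src)
    return outputs


source = [
    {"item_code": "A100", "description": "Widget", "price_band": "LOW"},
    {"item_code": "B200", "description": "Gadget v2", "price_band": "MID"},
    {"item_code": "C300", "description": "Gizmo", "price_band": "HIGH"},
    {"item_code": "D400", "description": "Doohickey", "price_band": "LOW"},
]
dimension = [
    {"item_key": 1, "item_code": "A100", "description": "Widget", "price_band": "LOW"},
    {"item_key": 2, "item_code": "B200", "description": "Gadget", "price_band": "MID"},
    {"item_key": 3, "item_code": "C300", "description": "Gizmo", "price_band": "MID"},
]

outputs = conditional_split(left_merge_join(source, dimension, "item_code"),
                            type1_cols=["description"], type2_cols=["price_band"])
for name, rows in outputs.items():
    print(name, [r["item_code"] for r in rows])
```

Each output list would then be written to its own staging table so that the inserts and updates can be applied as set-based statements.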
Using best practices for high-volume fact loading
This section details some of the techniques and best practices associated with managing fact loading in high volume scenarios.
Loading Only Changed Rows: Delta Detection
A key consideration is to reduce the volume of facts that are loaded by an individual ETL process load. The most significant way to achieve this is to extract from the source system only those rows which are new or have changed since the previous ETL run. Determining the minimum amount of data required for loading is termed delta detection. Data should not be fully loaded to staging and then filtered - this is usually not practicable.
A number of options are available for delta detection. The most efficient approach is to capture a snapshot of changed records at the source which captures added and changed records directly from master and / or transactional changes. This process requires a modification date and time of changes; intermediate changes are not captured. The ETL process includes metadata for a last ETL run date. This date is used together with the last modification date in the source system in determining which source records are to be transferred.
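The comparison between the last ETL run date and the source modification date amounts to a simple filter. The sketch below shows the idea in Python; the row layout, the `modified_at` column, and the dates are all invented for illustration, and in practice the filter would be pushed into the source query rather than applied after extraction.

```python
from datetime import datetime

# Hypothetical source rows carrying a last-modified timestamp.
source_rows = [
    {"order_id": 1, "modified_at": datetime(2008, 3, 1, 9, 0)},
    {"order_id": 2, "modified_at": datetime(2008, 3, 2, 14, 30)},
    {"order_id": 3, "modified_at": datetime(2008, 3, 3, 8, 15)},
]

# ETL metadata: the point up to which the previous run extracted.
last_etl_run = datetime(2008, 3, 2, 0, 0)


def detect_delta(rows, since):
    """Return only rows added or changed after the previous ETL run."""
    return [row for row in rows if row["modified_at"] > since]


delta = detect_delta(source_rows, last_etl_run)

# Design choice for this sketch: store the highest timestamp actually
# extracted rather than the wall-clock time of the run.
next_etl_run_marker = max((row["modified_at"] for row in delta), default=last_etl_run)

print([row["order_id"] for row in delta], next_etl_run_marker)
```

Storing the highest extracted timestamp is only one way to maintain the marker; it is used here so that rows modified while an extract is running are not skipped by the next run.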
Delta detection can also be implemented at the source by producing extract tables populated since the last ETL run date. This approach trades processor cycles at the staging server for processing cycles at the data source. This mechanism works well for updates and new rows, but not for deleted rows. Deletes need to be managed by using ‘soft deletion’, that is, deletion indicators. Rows in fact tables are typically neither deleted nor updated; corrections are instead effected using contra entries.
In addition to delta detection, loading of fact data should be ‘batched’ wherever possible. This can be achieved in two ways, either by reducing the extraction interval (that is, running the data extraction more frequently) or by logically partitioning the rows to be extracted. Differentiating data logically - for example, by country or customer location - would allow for extractions on different schedules, thereby reducing the amount of data transferred per ETL fact load execution. This reduces concurrent network load and resource contention at the staging database. Data extractions can proceed in parallel for logically partitioned source data, subject to infrastructure capabilities.
Other Considerations for Fact Loading
A further consideration introduced by high volume load requirements is the need to include a data staging component which allows for restarting ETL processes without going back to the source system.
Other considerations include:
- The time required to calculate measures. This should be done at source where possible for large fact loads.
- The need to reprocess aggregation tables when data changes and the need to keep the data warehouse physically optimised. This optimisation includes ensuring that index fragmentation and statistics are managed appropriately.
A key optimisation technique for fact loading is to use partition switching. This minimises data warehouse downtime during fact loading and is covered in detail in the Project REAL Partitioning whitepaper.
This section summarises a number of best practices used by Project REAL or suggested in the Patterns and Practices Business Intelligence reference documentation. These suggestions are categorised according to subject area.
For the Item dimension as described in the previous section, Project REAL uses a staging table to load the business keys from the source system. Then it joins to these keys rather than using the Integration Services Lookup transformation. This approach avoids the length of time and high RAM usage that would be required for the Lookup transformation to load the entire dimension into memory. Instead this narrow table is populated during fact staging, also by using a data flow that contains a single source and destination, extracting only the business keys from the transactional source. 4.4 million business keys were staged in 55 seconds using this approach.
These staged keys are then used to filter the query used by the Lookup transformation when loading its cache. The business keys in the dimension are already indexed, so joining these staged keys to the dimension table is a high-performance mechanism for cache loading.
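The effect of filtering the cache query by staged keys can be shown with a small example. This sketch uses an in-memory SQLite database in place of SQL Server, and a Python dictionary in place of the Lookup cache; the table names (`dim_item`, `etl_staged_keys`) and the key format are assumptions for the example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dim_item (item_key INTEGER PRIMARY KEY, item_code TEXT UNIQUE);
    CREATE TABLE etl_staged_keys (item_code TEXT PRIMARY KEY);
""")

# A dimension with 1,000 members, indexed on the business key.
conn.executemany("INSERT INTO dim_item VALUES (?, ?)",
                 [(i, f"ITEM{i:05d}") for i in range(1, 1001)])

# Only the business keys present in the current fact extract are staged.
conn.executemany("INSERT INTO etl_staged_keys VALUES (?)",
                 [("ITEM00007",), ("ITEM00042",), ("ITEM00999",)])

# The cache is loaded from the join, not from the whole dimension.
lookup_cache = dict(conn.execute("""
    SELECT d.item_code, d.item_key
    FROM dim_item AS d
    JOIN etl_staged_keys AS s ON s.item_code = d.item_code
"""))

print(len(lookup_cache), lookup_cache)
```

The cache holds three entries rather than one thousand, which is the memory saving the staged keys are intended to deliver.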
In some cases a Merge Join transformation is used to compare source and destination data. This is particularly favoured over use of the Lookup transformation when there are many (perhaps dozens) of columns to be compared, given the Lookup component’s memory constraints.
A significant further performance benefit is obtained by the fact that the staging table is in the same database as the data warehouse. This removes the performance overhead involved in cross-database joins. A separate schema, ETL, is used for staging objects.
Important. Targeted staging is to be preferred in high volume ETL throughput scenarios over joining directly to the source data or using joins across databases. The staging tables should be in the same database as the data warehouse under a different schema.
Limiting Asynchronous Data Transformations
Data transformation components in Integration Services are either asynchronous or synchronous, meaning that they either block the data flow and wait for all rows to arrive before they operate, or they operate on a row by row basis. Two key asynchronous transformations are the Sort transformation and the Aggregate transformation. Both of these operations require all rows to be present. For example, to effect a sort, the process needs all the rows to be sorted. While this is acceptable for small data sets, blocking the data flow for large transformations would place back pressure upstream on the Integration Services pipeline. This back pressure can filter back to the source connections, thereby slowing down the extraction process.
Important. For High Volume ETL scenarios, blocking transformations should be avoided wherever possible.
For the Merge Join scenarios in Project REAL, the data used in Merge Joins is pre-sorted in the back end by stored procedures and / or select queries which deliver the data. Given that joins are on business keys which are indexed (according to best practices), the execution plan for the sort at the relational engine is optimised. The properties of the Merge Join input stream are set to indicate that the sort keys are preset, allowing the merge join to bypass the sort. For implementation specifics refer to the Project REAL reference implementation.
Important. For High Volume ETL scenarios, prefer stored procedures to inline selects, and ensure that join keys are sufficiently indexed. Pre-sort data passed to Merge Join transformations.
Handling Common Scenarios before Exceptions
Always consider the ordering of outputs within, for example, a Conditional Split transformation. The first output stream should be used to handle the common cases that apply to the most rows in the data flow. Specify the order in which the conditions are evaluated. Order is significant, because a row is sent to the output corresponding to the first condition that evaluates to true. Handle the exceptions after the common cases.
Another variant of this technique is to split data wherever the majority of the data can be handled by a high speed process like the cached Lookup or the SQL Server destination. An example would be to use a Multicast to two downstream outputs, one which employed high speed techniques, the other less optimised techniques. Then rejoin the data flows (by using a Union All transformation) subsequent to the downstream processes.
Important. For High Volume ETL scenarios, order Conditional Split outputs according to probability of the match expression, with the highest probability being the first output and the lowest the last, or default output.
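The cost of condition ordering can be made concrete by counting how many expressions are evaluated. The sketch below imitates first-match routing in Python; the row contents and the 90/9/1 distribution are invented purely to illustrate the point.

```python
def conditional_split(rows, conditions, default="default"):
    """Send each row to the output of the FIRST condition that is true."""
    outputs = {name: [] for name, _ in conditions}
    outputs[default] = []
    evaluations = 0
    for row in rows:
        for name, predicate in conditions:
            evaluations += 1
            if predicate(row):
                outputs[name].append(row)
                break
        else:
            # No condition matched: the row goes to the default output.
            outputs[default].append(row)
    return outputs, evaluations


# 90 common rows, 9 less common rows, and 1 exception.
rows = ([{"status": "unchanged"}] * 90
        + [{"status": "changed"}] * 9
        + [{"status": "bad"}])

is_unchanged = ("unchanged", lambda r: r["status"] == "unchanged")
is_changed = ("changed", lambda r: r["status"] == "changed")

_, common_first = conditional_split(rows, [is_unchanged, is_changed])
_, rare_first = conditional_split(rows, [is_changed, is_unchanged])

print(common_first, rare_first)
```

With the common case first, 110 expressions are evaluated for the 100 rows; with the rare case first, the same rows need 191 evaluations and produce identical outputs.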
Favoring Batch Updates over the OLE DB Command Transformation
Large volume updates to fact tables present serious issues for high-throughput ETL processes. Some systems avoid fact table updates completely by creating change records in the fact table. This approach, while simplifying the ETL, can complicate downstream analytical reporting interfaces, as they have to take these compensating fact rows into account.
For large update processes in Integration Services, there are two approaches which can be taken:
- Use the OLE DB Command transformation with a parameterised query.
- Load the changed records to a staging table and perform a set-based update in the relational database with a stored procedure.
The first approach executes a SQL statement (which can be a procedural call) using the data from the current row in the data flow. This operation executes on a row by row basis, and therefore may be impractical when there are millions of rows passing through the ETL process.
The second approach, that taken by Project REAL, is to stage the data and then use the relational engine to perform the update by joining the staged table with the destination table in an update statement. There are resource costs associated with using a staging environment, and the impact on the data warehouse has to be considered during the update process. However, the ETL load process should not be running concurrently with data warehouse usage, to protect against resource contention such as table, page and row locks.
By staging the data required for the update, the destination transformation is optimised as a set-based operation that performs significantly better than one based on a row by row approach. (Project REAL also uses this approach for adding new rows.) Finally, by carrying out these updates (and insertions) in back end stored procedures, considerable attention can be given to the execution plan generated for these relational operations.
Important. For High Volume ETL scenarios, prefer the use of batched stored procedural updates over use of the OLE DB Command transformation. Always place these set operations in back end stored procedures.
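The stage-then-update pattern can be sketched as follows. SQLite is used here as a stand-in for SQL Server, so the update is written with a correlated subquery; in Transact-SQL it would more typically be an UPDATE that joins the staging table to the destination, wrapped in a stored procedure. The table names (`fact_sales`, `etl_sales_changes`) are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE fact_sales (sale_id INTEGER PRIMARY KEY, amount REAL);
    CREATE TABLE etl_sales_changes (sale_id INTEGER PRIMARY KEY, amount REAL);
    INSERT INTO fact_sales VALUES (1, 10.0), (2, 20.0), (3, 30.0);
""")

changed_rows = [(2, 25.0), (3, 35.0)]

# Step 1: bulk load the changed rows into the staging table.
conn.executemany("INSERT INTO etl_sales_changes VALUES (?, ?)", changed_rows)

# Step 2: a single set-based statement updates every matching row,
# instead of issuing one UPDATE per row in the data flow.
cursor = conn.execute("""
    UPDATE fact_sales
    SET amount = (SELECT s.amount
                  FROM etl_sales_changes AS s
                  WHERE s.sale_id = fact_sales.sale_id)
    WHERE sale_id IN (SELECT sale_id FROM etl_sales_changes)
""")
conn.commit()

rows_updated = cursor.rowcount
result = conn.execute(
    "SELECT sale_id, amount FROM fact_sales ORDER BY sale_id").fetchall()
print(rows_updated, result)
```

The number of statements sent to the database stays constant as the number of changed rows grows, which is the property that makes this approach scale.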
Specific Considerations for Integration Services
There are a number of performance enhancement techniques specific to Integration Services which can be adopted in the Data Flow task to improve ETL throughput. This section describes and summarises the suggestions made by Kirk Haselden (who was the development manager for SQL Server 2005 Integration Services) in his book “SQL Server 2005 Integration Services”. Kirk states that, for data flows managing less than a gigabyte, the Data Flow task will perform adequately out of the box without recourse to performance optimisations.
Eliminating Unnecessary Work
This is the most fundamental suggestion - don’t do anything in the data flow that you don’t need to do. This sounds obvious, however most performance issues that I have seen in Integration Services relate to the processing of unnecessary columns in the data flow.
Important. Removing unneeded columns makes the Integration Services buffer rows smaller, which increases the number of rows which can fit into a single buffer. This allows rows to be processed more efficiently.
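A rough calculation shows why row width matters. The model below is a deliberate simplification of how the data flow engine sizes buffers, and the buffer size, row cap, and row widths are illustrative assumptions rather than measured values; check the actual Data Flow task properties in your own packages.

```python
# Assumed limits for this sketch only.
BUFFER_SIZE_BYTES = 10 * 1024 * 1024   # illustrative 10 MB buffer
MAX_ROWS_PER_BUFFER = 10_000           # illustrative cap on rows per buffer


def rows_per_buffer(row_width_bytes: int) -> int:
    """Rows that fit in one buffer, limited by both size and row cap."""
    return min(MAX_ROWS_PER_BUFFER, BUFFER_SIZE_BYTES // row_width_bytes)


def buffers_needed(total_rows: int, row_width_bytes: int) -> int:
    per_buffer = rows_per_buffer(row_width_bytes)
    return -(-total_rows // per_buffer)   # ceiling division


wide = buffers_needed(1_000_000, 4_000)    # all source columns carried through
narrow = buffers_needed(1_000_000, 1_200)  # unneeded columns removed

print(rows_per_buffer(4_000), rows_per_buffer(1_200), wide, narrow)
```

Under these assumptions, trimming the row from 4,000 to 1,200 bytes cuts the number of buffers for a million rows from 382 to 115.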
Using a SQL Statement to Retrieve Data from a View
Avoid using the table or view access mode in the OLE DB Source, as this opens a rowset both to retrieve column metadata and to retrieve rows. A select statement can be faster by an order of magnitude.
Important. Use select statements to retrieve source data as these can be optimised at the server with respect to the generated execution plan. Optimise all queries at the source. Integration Services does not optimise SQL queries.
Optimising the Data Flow
There are a number of specific data flow optimisations which can be considered:
- Use set based operations at the server.
- Monitor memory intensive transformations by using the performance counters provided by Integration Services. Specifically the Buffers Spooled counter indicates when buffers are being swapped to disk, which is to be avoided whenever possible.
- Reduce the columns covered by the Lookup transformation in fact processing to the minimal possible set, specifically the natural key and the surrogate key. Project REAL takes this approach.
- Use the Merge Join transformation in preference to the Lookup transformation when possible.
- Increase the values of the DefaultBufferSize and DefaultBufferMaxRows properties of Data Flow tasks. This serves to reduce the number of buffers moving through the data flow. Be careful however, that this does not lead to a situation where the data flow engine is swapping buffers to disk, as this would be counter-productive.
- Implement parallel execution. Experiment with the EngineThreads property of Data Flow tasks and the MaxConcurrentExecutables property of packages to optimise throughput in SMP architectures. Test throughput and reset values accordingly.
Increasing the Performance of Inserts
Using the SQL Server destination can significantly improve performance, sometimes by up to 25%. Note that to use this component the Integration Services package must execute on the same physical server as the target SQL Server. This is because the connection between the package and SQL Server is an in memory loopback connection.
Important. Use the SQL Server destination when Integration Services is running on the same physical server as the target destination SQL Server.
Use Fast Load options in the OLE DB destination wherever possible as this explicitly sets the batch commit size. A default commit size of zero attempts to commit all rows in a single batch. Fast Load can also turn on table locking and be used to disable constraints.
Important. Use Fast Load options in the OLE DB destination, as this significantly improves ETL throughput.
The following suggestions relate to settings in the back end relational engine rather than in the Integration Services pipeline. Both areas need to be considered as part of ETL optimisation.
Index Statistics must be kept up to date. Indexes are used to optimise query plans used by SQL Server to execute queries. Back end staging queries, particularly those involving joins between source transaction and reference data, must perform well. Keeping statistics up to date is very important here. Equally important is the need to maintain indexes in order to reduce fragmentation. It is also necessary to ensure that optimal indexes are available to support the SQL queries used during ETL execution. My suggestion is to create a batch (or multiple batches) containing all the SQL statements that are used as part of the ETL process and subject this batch to query tuning and performance optimisation.
Important. Optimise all SQL queries used in the ETL load by following standard SQL optimisation best practices.
Rerun statistics updates after the ETL load completes. Consider reorganising indexes after loading significant data volumes.
Avoid cross-database joins as these slow down ETL extracts considerably. This can be achieved by bringing new data into staging tables within the data warehouse itself. SQL Server schemas can be used to separate staging data from data warehouse data.
Transaction Isolation can be reduced in data warehouse settings. This is because data is only updated during the ETL load process. Therefore reducing transaction isolation will not compromise data quality for users by introducing dirty reads.
Important. Set Transaction Isolation Level Read Uncommitted during normal data warehouse usage. Reset it to the SQL Server default of Read Committed during ETL loads.
Project REAL carries out a column by column comparison of type 2 changes for the Item dimension within the Conditional Split transformation used to distinguish inserts from updates. Alternatives are to compute and compare hash values (using Transact-SQL binary checksums) across dimension rows. This approach should be tested and benchmarked for performance.
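The hash comparison alternative can be sketched as follows. Note that this example swaps in SHA-256 from Python's `hashlib` for the Transact-SQL binary checksum mentioned above, purely to keep the illustration self-contained; the tracked column names are hypothetical.

```python
import hashlib

# Hypothetical columns whose changes should trigger a type 2 update.
TYPE2_COLUMNS = ["description", "price_band", "supplier"]


def row_hash(row, columns):
    """Hash the tracked columns of a row into a single comparable value."""
    # repr() keeps None distinct from '', and the unit separator stops
    # ('ab', 'c') from hashing the same as ('a', 'bc').
    payload = "\x1f".join(repr(row[c]) for c in columns)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


existing = {"item_code": "A100", "description": "Widget",
            "price_band": "LOW", "supplier": "Acme"}
incoming_same = dict(existing)
incoming_changed = dict(existing, price_band="MID")

# The stored hash would normally be persisted on the dimension row.
stored_hash = row_hash(existing, TYPE2_COLUMNS)

print(row_hash(incoming_same, TYPE2_COLUMNS) == stored_hash)
print(row_hash(incoming_changed, TYPE2_COLUMNS) == stored_hash)
```

One comparison of hash values replaces a comparison per tracked column, at the price of computing the hash for every incoming row, which is why benchmarking against the column by column approach is advisable.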
For huge fact tables, enforce referential constraints in load procedures rather than declaratively. One approach to consider is to drop referential constraints between fact and dimension tables during the loading process, and then to run orphan query checks after the load process is complete. This topic is the subject of much debate in the relational database community.
Use partitioning for large data loads to optimise both the load time and the query time.
In addition to performance considerations, specific areas of ETL management have to be considered for high volume data transfers. This section details a number of these factors.
Some ETL processes avoid staging and load directly to data warehouse structures. This is inadvisable, particularly in the case of large data extracts. Large extracts run for long periods of time. These processes require restart points so that, if errors occur, then the process can be restarted from a save point rather than from the beginning of the extract. There are two ways to address this requirement:
- You can use a staging database to store intermediate load results for restarts.
- You can use the checkpoint feature provided by Integration Services, which facilitates restarting an ETL package from a checkpointed state. Integration Services checkpoints should be included in the ETL design.
Important. Integration Services checkpoints only relate to the control flow and not the data flow. Therefore, when designing a restartable process in Integration Services, many individual data flow tasks should be added to the control flow. Each call out to a data flow task can act as a potential restart point.
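The idea of restarting at task granularity can be illustrated with a small Python sketch. This is not how Integration Services stores its checkpoint state; the JSON file, the task names, and the `run_with_checkpoints` function are all hypothetical and only mirror the principle that each completed task becomes a restart point.

```python
import json
import os
import tempfile


def run_with_checkpoints(tasks, checkpoint_path):
    """Run (name, callable) tasks in order, skipping those already completed."""
    completed = []
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as fh:
            completed = json.load(fh)
    for name, task in tasks:
        if name in completed:
            continue                  # finished in an earlier run
        task()                        # an exception here leaves the file intact
        completed.append(name)
        with open(checkpoint_path, "w") as fh:
            json.dump(completed, fh)
    os.remove(checkpoint_path)        # a clean finish clears the restart state


executed = []
fail_once = {"pending": True}


def load_facts():
    # Fails on the first attempt to simulate an error part-way through a load.
    if fail_once["pending"]:
        fail_once["pending"] = False
        raise RuntimeError("simulated failure")
    executed.append("load_facts")


tasks = [
    ("stage_keys", lambda: executed.append("stage_keys")),
    ("load_dimensions", lambda: executed.append("load_dimensions")),
    ("load_facts", load_facts),
]

checkpoint_file = os.path.join(tempfile.mkdtemp(), "etl_checkpoint.json")
try:
    run_with_checkpoints(tasks, checkpoint_file)
except RuntimeError:
    pass                                        # first run stops at load_facts
run_with_checkpoints(tasks, checkpoint_file)    # restart resumes at load_facts

print(executed)
```

The two earlier tasks run only once even though the package is started twice. A single large task would offer no such restart point, which is the reason for splitting the work into many smaller tasks.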
Another reason to stage data is if there is a time lag between the extraction and the loading process. Extracted data has to be stored before it can be loaded in this situation.
Having a staging area decouples the extraction processes from operational processes. This serves to minimise the load placed on the source systems by the extraction processes. Additionally, using staging areas allows for multiple source systems to deliver to staging which can act as a consolidation point prior to processing the transformations.
This approach follows the architectural best practice of maintaining loose coupling between systems. A mechanism will have to be developed for identifying the sequence of holding tables to be used, together with a method for identifying new, updated and deleted data.
The use of a staging area allows for data maintenance and exploration should any degree of data auditing be required. Data auditing may be required either for reconciliation or for problem resolution purposes.
A physical staging design will depend on data volumes and extract schedules. A ‘chunked accumulated’ staging approach is often adopted. This allows for multiple extracts to run at different times of the working day with the actual load process being done once at the end of the day. This chunking might be based on logically separate information such as time zones or country codes.
For the reasons described in the previous section, having the staging tables in the same database as the data warehouse (under a different schema) can be hugely beneficial for the performance of the ETL load. However in this case the extract runs would share data warehouse SQL Server usage with standard warehouse users if they take place during the working day. The impact of these extracts on the data warehouse server would therefore require careful monitoring.
Consideration should also be given to archiving the staging structures according to business requirements.
Data extraction mechanisms are either push or pull in nature.
- In a push model, data is pushed by the source system to the staging structures at an interval determined by the source system.
- In a pull model, data is pulled from the source system to the staging structures at an interval determined by the staging system.
The push approach is commonly used for text files which are pushed to ETL systems as they become available, often by FTP from a mainframe source.
A pull model is more commonly used where SQL statements can be issued to retrieve the source system information. A pull method is the most common type of data extraction process, as this minimises changes and resource usage at the source server. When using a pull method, the extraction process is managed by the staging server which handles scheduling, configuration and metadata management for the extract.
Auditing and logging need to be considered and managed during the ETL process. For high data volume loads, it will be necessary to measure throughput at a detailed level in order to assess the performance impact of any system changes. Counting the number of rows passed through various stages of the ETL flow provides for rudimentary checks against expected counts. Errors will need to be logged to the Windows Event Log. This is particularly important for error management as Windows-based management tools such as Microsoft Operations Manager are driven by Event Log entries.
In Integration Services packages, events can be logged to text files in addition to SQL Server. Error handling should be centralised by using the package level OnError event handler provided by Integration Services. SQL Server provides the best target for logging events as SQL tables provide for a simple, yet powerful, query mechanism. Again the impact of logging will have to be carefully considered. It may prove beneficial to write log entries to a separate logging server, subject to testing and measurement.
Project REAL implements an auditing mechanism which allows for drill down reporting against captured ETL metadata. This includes job execution times and states, row counts etc. For implementation details and for the reports themselves refer to Project REAL.
A further auditing function provided by Project REAL is support for data lineage. A batch identifier is added to the data flow immediately after the data extract by using a Derived Column transformation. This metadata is therefore available to all downstream transformations for update, insert and tracking purposes. The batch identifier groups auditing metadata together for reporting purposes and is also used within the data warehouse to identify the record source. Every dimension and fact record originates from a specific data load and this batch identifier identifies that load. This is useful for data lineage and validation purposes as well as any manual corrections that may be needed in case of data corruption. For example, it may be necessary to reverse out a data load from the data warehouse. This data lineage provides for that capability.
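Reversing out a load becomes a single statement once every row carries its batch identifier. The sketch below uses SQLite in place of the data warehouse; the `fact_sales` table, the `batch_id` column, and the helper functions are hypothetical and do not reproduce the Project REAL schema.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE fact_sales (sale_id INTEGER, amount REAL, batch_id INTEGER)")


def load_batch(conn, batch_id, rows):
    # Every row is stamped with the batch identifier as it is loaded,
    # in the spirit of a derived column added straight after the extract.
    conn.executemany("INSERT INTO fact_sales VALUES (?, ?, ?)",
                     [(sale_id, amount, batch_id) for sale_id, amount in rows])


def reverse_batch(conn, batch_id):
    """Remove every row that originated from one specific load."""
    cursor = conn.execute("DELETE FROM fact_sales WHERE batch_id = ?", (batch_id,))
    return cursor.rowcount


load_batch(conn, 101, [(1, 10.0), (2, 20.0)])
load_batch(conn, 102, [(3, 30.0), (4, 40.0), (5, 50.0)])

removed = reverse_batch(conn, 102)
remaining = conn.execute(
    "SELECT sale_id, batch_id FROM fact_sales ORDER BY sale_id").fetchall()
print(removed, remaining)
```

Rows from batch 101 are untouched. Reversing type 2 dimension changes or aggregated data would need more than a delete, so this covers only the simplest case.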
All packages should be dynamically and centrally configured by using a SQL Server database as a configuration source. There should never be any hard-coded server connection strings or business rules within an ETL package. Microsoft strongly suggests not to hard-code package details, as this makes them difficult to migrate from one server to another – for example, from a development server to a test server. Integration Services best practices for configuration are to place configuration data in SQL Server and to include an indirect reference to that server’s location either through the use of an XML configuration file or an Environment Variable setting. Again, refer to Project REAL for best practice implementation details.
In addition to the performance considerations relating to loading data in the previous performance section, loads must be verified for correctness and any errors handled appropriately. Test scripts will have to be developed for ETL processes and are of two types - data count checks and data validation checks.
- Data count checks simply cross check that the number of rows loaded for each dimension and fact table match the number of rows delivered to staging from the source system. This row count checking may need to take into account the mechanisms for denoting new and changed rows as this may be done in a logical rather than a physical manner.
- Data validation checks verify codes and descriptions against reference tables, together with any required reconciliation checks.
Records may fail to load successfully for a number of reasons including:
- Schema changes in the data warehouse.
- Data Type mismatches.
- Measure Limit exceptions.
- Surrogate Key Limit exceptions.
- Foreign Key Constraint violations.
- Unexpected Data.
- Null data in non nullable fields.
- Server Unavailable.
- Logic errors, and so forth.
The key point here is that errors will occur. Failed records should be captured and reported together with appropriate error messages, allowing for immediate error resolution and future error avoidance.
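Capturing failed records alongside their error messages can be sketched as below. This Python example only imitates the idea of redirecting error rows; the validation rules, column names, and measure limit are assumptions chosen to mirror a few of the failure reasons listed above.

```python
def load_rows(rows, max_amount=1_000_000):
    """Split rows into those that load cleanly and those that fail."""
    loaded, failed = [], []
    for row in rows:
        try:
            if row["customer_key"] is None:
                raise ValueError("null in non-nullable field customer_key")
            amount = float(row["amount"])      # a data type mismatch raises here
            if amount > max_amount:
                raise ValueError("measure limit exceeded")
            loaded.append({**row, "amount": amount})
        except (ValueError, TypeError) as exc:
            # Keep the original row and the reason together for reporting.
            failed.append({"row": row, "error": str(exc)})
    return loaded, failed


rows = [
    {"customer_key": 1, "amount": "12.50"},
    {"customer_key": None, "amount": "3"},
    {"customer_key": 2, "amount": "abc"},
    {"customer_key": 3, "amount": "5000000"},
]

loaded, failed = load_rows(rows)
print(len(loaded), "loaded")
for failure in failed:
    print(failure["row"], "->", failure["error"])
```

Good rows continue to load while each failure is recorded with enough context to correct the source data and resubmit it.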
As stated in the introduction, providing for high-volume ETL throughput requires consideration of both standard relational database best practices and ETL high performance techniques.
By optimising SQL Server data access and adopting Integration Services best practices, ETL throughput can be significantly enhanced. By following best practices for measuring ETL throughput, the effect of change can be carefully documented. By following best practices for ETL management, such as careful scheduling and ETL package configuration, Integration Services deployments can be both well supported and effective in maintaining large volume data warehouses on SQL Server.
About the author. Pat Martin has spent the last six years as an MCS Consultant with Microsoft in New Zealand, specialising in SQL Server and Business Intelligence related technologies. Prior to that he worked as a C and C++ developer on multiple hardware platforms. Pat has a small farm in New Zealand and of course, sheep.