交易和大量複製作業
大量複製作業可以做為隔離作業或做為多步驟交易的一部分執行。 這第二個選項可供執行相同交易內的多項大量複製作業,以及執行其他插入、更新和刪除等資料庫作業,同時仍然能夠認可或回復整個交易。
根據預設,大量複製作業會作為隔離作業完成。 該複製作業會以非交易的方式進行,且沒有機會復原。 如果需要在發生錯誤時復原全部或部分大量複製,您可以:
使用 SqlBulkCopy 管理的交易
在現有交易中執行大量複製作業
在 System.TransactionsTransaction 中登錄。
執行非交易大量複製作業
對於非交易大量複製作業在作業過程遭遇錯誤時的狀況,下列主控台應用程式顯示此時會發生什麼事。
在範例中,來源資料表及目標資料表都包含名為 ProductID 的 Identity
資料行。 程式碼會先準備目的地資料表,方法是刪除所有資料列,然後插入單一資料列,且其 ProductID 已知存在於來源資料表中。 根據預設,在目的地資料表中,會針對每個新增的資料列產生 Identity
資料行的新值。 在此範例中,當此連接開啟時,且其強制大量載入處理序改為使用來源資料表中的 Identity
值時,就會設定某個選項。
此大量複製作業執行時的 BatchSize 屬性設定為 10。 當此作業遇到無效的資料列時,就會擲回例外狀況。 在第一個範例中,大量複製作業會為非交易作業。 所有出錯之前的複製批次都會被認可。 包含重複索引鍵的批次會回復,而大量複製作業會暫停,直到處理完任何剩餘的批次。
注意
除非您已如大量複製範例設定中所述建立工作資料表,否則此範例不會執行。 這個程式碼僅是為了示範使用 SqlBulkCopy 的語法而提供。 如果來源和目的地資料表位於相同的 SQL Server 執行個體,則使用 Transact-SQL INSERT … SELECT
陳述式來複製資料會更方便且快速。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object using the KeepIdentity option.
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
connectionString, SqlBulkCopyOptions.KeepIdentity))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
// Note that for this scenario, the value will
// not be equal to the RowsCopied property.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}
在交易中執行專用大量複製作業
根據預設,大量複製作業為其自己的交易。 當要執行專用大量複製作業時,請建立具有連接字串的 SqlBulkCopy 執行個體,或使用不含使用中交易的現有 SqlConnection 物件。 大量複製作業會先在每個案例中建立交易,再認可或回復此交易。
您可在 SqlBulkCopy 類別的建構函式中明確指定 UseInternalTransaction 選項,以在其自己的交易中執行大量複製作業。 每個批次的作業會在個別交易內執行。
注意
由於不同的批次在不同交易中執行,如果大量複製作業期間發生錯誤時,目前的批次中的所有資料列將會回復,但是先前批次的資料列將保留在資料庫中。
在下方的主控台應用程式中,除了大量複製作業會自行管理本身的交易外,其餘部分均與前述範例類似。 所有出錯之前的複製批次都會被認可。 包含重複索引鍵的批次會回復,而大量複製作業會暫停,直到處理完任何剩餘的批次。
重要
除非您已如大量複製範例設定中所述建立工作資料表,否則此範例不會執行。 這個程式碼僅是為了示範使用 SqlBulkCopy 的語法而提供。 如果來源和目的地資料表位於相同的 SQL Server 執行個體,則使用 Transact-SQL INSERT … SELECT
陳述式來複製資料會更方便且快速。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
// Set up the bulk copy object.
// Note that when specifying the UseInternalTransaction
// option, you cannot also specify an external transaction.
// Therefore, you must use the SqlBulkCopy construct that
// requires a string for the connection, rather than an
// existing SqlConnection object.
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
connectionString, SqlBulkCopyOptions.KeepIdentity |
SqlBulkCopyOptions.UseInternalTransaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error
// after some of the batches have been copied.
try
{
bulkCopy.WriteToServer(reader);
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
}
finally
{
reader.Close();
}
}
// Perform a final count on the destination
// table to see how many rows were added.
// Note that for this scenario, the value will
// not be equal to the RowsCopied property.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}
使用現有的交易
您可以在 SqlBulkCopy 建構函式中,將現有的 SqlTransaction 物件指定為參數。 在此情況下,大量複製作業會在現有的交易中完成,且不會變更交易狀態,也就是其尚未認可或中止。 此方法可供應用程式在其他資料庫作業的交易中包含大量複製作業。 不過,如果您未指定 SqlTransaction 物件並傳遞了 Null 參考,而且連線具有使用中交易,則會擲回例外狀況。
如果因為發生錯誤,或大量複製應該要以可復原之較大處理序的一部分來執行,而使得您需要復原整個大量複製作業,則您可以將 SqlTransaction 物件提供給 SqlBulkCopy 建構函式。
下列主控台應用程式與第一個 (非交易) 範例類似,但有一項例外:在此範例中,大量複製作業包含在較大的外部交易中。 當發生主要索引鍵違規錯誤時,整個交易便會回復,並且沒有任何資料列會加入目的地資料表。
重要
除非您已如大量複製範例設定中所述建立工作資料表,否則此範例不會執行。 這個程式碼僅是為了示範使用 SqlBulkCopy 的語法而提供。 如果來源和目的地資料表位於相同的 SQL Server 執行個體,則使用 Transact-SQL INSERT … SELECT
陳述式來複製資料會更方便且快速。
using Microsoft.Data.SqlClient;
class Program
{
static void Main()
{
string connectionString = GetConnectionString();
// Open a sourceConnection to the AdventureWorks database.
using (SqlConnection sourceConnection =
new SqlConnection(connectionString))
{
sourceConnection.Open();
// Delete all from the destination table.
SqlCommand commandDelete = new SqlCommand();
commandDelete.Connection = sourceConnection;
commandDelete.CommandText =
"DELETE FROM dbo.BulkCopyDemoMatchingColumns";
commandDelete.ExecuteNonQuery();
// Add a single row that will result in duplicate key
// when all rows from source are bulk copied.
// Note that this technique will only be successful in
// illustrating the point if a row with ProductID = 446
// exists in the AdventureWorks Production.Products table.
// If you have made changes to the data in this table, change
// the SQL statement in the code to add a ProductID that
// does exist in your version of the Production.Products
// table. Choose any ProductID in the middle of the table
// (not first or last row) to best illustrate the result.
SqlCommand commandInsert = new SqlCommand();
commandInsert.Connection = sourceConnection;
commandInsert.CommandText =
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns ON;" +
"INSERT INTO " + "dbo.BulkCopyDemoMatchingColumns " +
"([ProductID], [Name] ,[ProductNumber]) " +
"VALUES(446, 'Lock Nut 23','LN-3416');" +
"SET IDENTITY_INSERT dbo.BulkCopyDemoMatchingColumns OFF";
commandInsert.ExecuteNonQuery();
// Perform an initial count on the destination table.
SqlCommand commandRowCount = new SqlCommand(
"SELECT COUNT(*) FROM dbo.BulkCopyDemoMatchingColumns;",
sourceConnection);
long countStart = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Starting row count = {0}", countStart);
// Get data from the source table as a SqlDataReader.
SqlCommand commandSourceData = new SqlCommand(
"SELECT ProductID, Name, ProductNumber " +
"FROM Production.Product;", sourceConnection);
SqlDataReader reader = commandSourceData.ExecuteReader();
//Set up the bulk copy object inside the transaction.
using (SqlConnection destinationConnection =
new SqlConnection(connectionString))
{
destinationConnection.Open();
using (SqlTransaction transaction =
destinationConnection.BeginTransaction())
{
using (SqlBulkCopy bulkCopy = new SqlBulkCopy(
destinationConnection, SqlBulkCopyOptions.KeepIdentity,
transaction))
{
bulkCopy.BatchSize = 10;
bulkCopy.DestinationTableName =
"dbo.BulkCopyDemoMatchingColumns";
// Write from the source to the destination.
// This should fail with a duplicate key error.
try
{
bulkCopy.WriteToServer(reader);
transaction.Commit();
}
catch (Exception ex)
{
// Print the number of rows processed using the
// RowsCopied property.
Console.WriteLine("{0} rows were processed.",
bulkCopy.RowsCopied);
Console.WriteLine(ex.Message);
transaction.Rollback();
}
finally
{
reader.Close();
}
}
}
}
// Perform a final count on the destination
// table to see how many rows were added.
long countEnd = System.Convert.ToInt32(
commandRowCount.ExecuteScalar());
Console.WriteLine("Ending row count = {0}", countEnd);
Console.WriteLine("{0} rows were added.", countEnd - countStart);
Console.WriteLine("Press Enter to finish.");
Console.ReadLine();
}
}
private static string GetConnectionString()
// To avoid storing the sourceConnection string in your code,
// you can retrieve it from a configuration file.
{
return "Data Source=(local); " +
" Integrated Security=true;" +
"Initial Catalog=AdventureWorks;";
}
}
後續步驟
意見反應
https://aka.ms/ContentUserFeedback。
即將登場:在 2024 年,我們將逐步淘汰 GitHub 問題作為內容的意見反應機制,並將它取代為新的意見反應系統。 如需詳細資訊,請參閱:提交並檢視相關的意見反應