建立具有恢復功能的 StreamInsight 應用程式
此主題描述建立具有恢復功能之 StreamInsight 應用程式的步驟。
恢復功能僅能在 Premium 版的 StreamInsight 中使用。如需詳細資訊,請參閱<選擇 StreamInsight 版本>。
如需包含重新執行和重複資料刪除之具有恢復功能的應用程式端對端程式碼範例,請參閱 Codeplex 上的 StreamInsight 範例頁面中的<檢查點檢查範例>。
本主題內容
步驟 1:設定具有恢復功能的伺服器
步驟 2:定義具有恢復功能的查詢
步驟 3:擷取檢查點
步驟 4:在輸入配接器中重新執行事件
步驟 5:排除輸出配接器中的重複項目
步驟 6:從失敗復原
在不停用復原的情況下關機
範例
步驟 1:設定具有恢復功能的伺服器
必要的設定
若要設定具有恢復功能的伺服器,請在建立伺服器時提供下列組態設定的值:
中繼資料儲存區:您必須使用 SQL Server Compact 儲存伺服器的中繼資料;您無法將中繼資料儲存在記憶體中。
記錄檔路徑:此設定會決定是否要針對具有恢復功能的查詢儲存檢查點資料。路徑的預設值為 StreamInsight 處理序的工作目錄。相關的設定 (CreateLogPathIfMissing) 會決定是否要建立指定的目錄 (如果該目錄不存在)。
設定具有恢復功能的伺服器時,可以擷取檢查點,但不會造成檢查點遭到擷取。如需有關叫用檢查點的詳細資訊,請參閱步驟 3:擷取檢查點。
管理檢查點記錄檔路徑
為避免未經授權讀取或竄改檢查點檔案,請確實設定包含資料夾的權限,讓只有受信任的實體擁有存取權。
每個 StreamInsight 執行個體都應該擁有自己的記錄檔路徑。
請確定主控 StreamInsight 的處理序可以讀取和寫入指定的資料夾。
請勿編輯資料夾的內容。不再需要檢查點檔案時,StreamInsight 會刪除它們。
跨處理序伺服器
若是用戶端透過呼叫 Server.Connect 連接的跨處理序伺服器,佈建伺服器的人員會提供恢復功能組態。如果跨處理序伺服器擁有恢復功能組態,則用戶端可以在設定恢復功能時使用它;如果伺服器沒有恢復功能組態,則用戶端無法使用恢復功能。
指定恢復功能選項的方法
您可以使用下列其中一個方法來指定恢復功能設定:
當您呼叫 Server.Create 時提供恢復功能組態,以使用程式設計方式指定設定。
在應用程式組態檔中,以宣告方式指定設定。
若是同處理序伺服器,這是 app.config 檔案。
若是跨處理序伺服器,這是 StreamInsightHost.exe.config 檔案,這個檔案可以在 StreamInsight 安裝資料夾底下的 Host 資料夾中找到。
如果您同時使用兩種方法,則您在 API 呼叫中指定的設定會覆寫組態檔中的設定。
以程式設計方式建立具有恢復功能的伺服器
下列範例說明如何以程式設計方式建立具有恢復功能的同處理序伺服器。如需更詳細的範例,請參閱範例。當您呼叫 Server.Create 時,請嘗試擷取將導致檢查點檢查失敗的所有例外狀況。
SqlCeMetadataProviderConfiguration metadataConfig = new SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
CheckpointConfiguration recoveryConfig = new CheckpointConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server =
Server.Create("Default", metadataConfig, recoveryConfig))
以宣告方式建立具有恢復功能的伺服器
下列範例示範如何使用組態檔,以宣告方式建立具有恢復功能的伺服器。
<?xml version="1.0" encoding="utf-8"?>
<configuration>
…
<appSettings>
<add key="InstanceName" value="Default"/>
<add key="CreateSqlCeMetadataFileIfMissing" value="true"/>
<add key="SQLCEMetadataFile" value="CepMetadata.sdf"/>
<add key="CheckpointEnabled" value="true"/>
<add key="CheckpointLogPath" value="CepLogPath"/>
<add key="CreateCheckpointLogPathIfMissing" value="true"/>
</appSettings>
<runtime>
<gcServer enabled="true"/>
</runtime>
</configuration>
回到頂端
步驟 2:定義具有恢復功能的查詢
若要建立具有恢復功能的查詢,請在程式碼中加入下列步驟。
在建立新查詢之前,請先確認該查詢是否已存在於中繼資料中。如果查詢已經存在,這表示應用程式已經從失敗復原。您的程式碼必須重新啟動查詢,而不是重新建立查詢。
如果查詢不存在於中繼資料中,請為 ToQuery 方法的 IsResilient 參數指定 true,以建立該查詢並將其定義為具有恢復功能。您也可以搭配 IsResilient 參數來呼叫 Application.CreateQuery 方法。
設定具有恢復功能的查詢時,可以擷取檢查點,但不會造成檢查點遭到擷取。如需有關叫用檢查點的詳細資訊,請參閱步驟 3:擷取檢查點。
定義具有恢復功能之查詢的範例
如需更詳細的範例,請參閱範例。
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true);
回到頂端
步驟 3:擷取檢查點
當一個或多個查詢正在執行時,定期擷取檢查點以記錄查詢的狀態。
支援檢查點檢查的 API 方法會遵循一般模式來進行非同步作業。
若要叫用檢查點,請呼叫 BeginCheckpoint 方法。如果您提供選用的 AsyncCallback,當檢查點完成時,將會呼叫它。從呼叫傳回到 BeginCheckpoint 的 IAsyncResult 會識別此檢查點要求,而且稍後可以在 EndCheckpoint 或 CancelCheckpoint 的呼叫中使用。
/// <summary> /// Take an asynchronous checkpoint for the query. /// </summary> /// <param name="query">The query to checkpoint.</param> /// <param name="asyncCallback">An optional asynchronous callback, to be called when the checkpoint is complete.</param> /// <param name="asyncState">A user-provided object that distinguishes this particular asynchronous checkpoint request from other requests.</param> /// <returns></returns> IAsyncResult BeginCheckpoint( Query query, AsyncCallback asyncCallback, Object asyncState);
在檢查點作業完成之前,會封鎖 EndCheckpoint 方法。如果檢查點作業成功,此呼叫會傳回 true;如果發生錯誤,此呼叫會引發例外狀況。
/// <summary> /// Waits for the pending asynchronous checkpoint request to complete. /// </summary> /// <param name="asyncResult">The reference to the pending asynchronous request to finish.</param> /// <returns>True if the checkpoint succeeded, false if it was canceled.</returns> bool EndCheckpoint( IAsyncResult asyncResult);
您也可以呼叫 CancelCheckpoint 來取消檢查點檢查程序。當 CancelCheckpoint 的呼叫成功時,後續對 EndCheckpoint 的呼叫會傳回 false。
/// <summary> /// Cancels the pending asynchronous checkpoint request. /// </summary> /// <param name="asyncResult">The asyncResult handle identifying the call.</param> void CancelCheckpoint( IAsyncResult asyncResult);
這個非同步模式可以透過不同的方式使用:
BeginCheckpoint 的呼叫可以緊接在 EndCheckpoint 的呼叫之後。接著,在檢查點作業完成前,會封鎖 EndCheckpoint,然後傳回結果 (或例外狀況)。使用這個模式時,通常不會使用 asyncCallback 和 asyncState。
使用者可以呼叫 BeginCheckpoint,然後輪詢傳回之 IAsyncResult 的 IsCompleted 屬性。當 IsCompleted 為 true 時,可以呼叫 EndCheckpoint 來擷取結果。使用這個模式時,通常不會使用 asyncCallback 和 asyncState。
您可以使用回撥方法呼叫 BeginCheckpoint。在此情況下,可以使用 asyncState 識別呼叫,並將任何必要資訊傳回到回撥方法。當回撥執行時,它會呼叫 EndCheckpoint 來擷取結果。
不管使用哪一種模式 (甚至在取消檢查點時),都必須呼叫 EndCheckpoint 方法。此方法是使用者從呼叫取得傳回值的唯一方式,也是 StreamInsight 得知呼叫完成的唯一方式。在呼叫 EndCheckpoint 之前,您無法開始其他檢查點。
在檢查點檢查程序中發生的錯誤不會停止或影響相關聯的查詢。如果您在檢查點作業正在進行時停止查詢,就會取消檢查點。
回到頂端
步驟 4:在輸入配接器中重新執行事件
為支援在復原期間重新執行事件,輸入配接器 Factory 必須實作 IHighWaterMarkInputAdapterFactory 或 IHighWaterMarkTypedInputAdapterFactory 介面。接著,對配接器 Factory 之 Create 方法的呼叫會提供上限標準,協助配接器識別要重新執行的事件。
若要確認已完成的輸出,所有輸入配接器都必須重新執行實體資料流中的所有事件,這個實體資料流會在上限標準表示之位置所在或之後發生。
回到頂端
步驟 5:排除輸出配接器中的重複項目
為支援在復原期間排除重複項目,輸出配接器 Factory 必須實作 IHighWaterMarkOutputAdapterFactory 或 IHighWaterMarkTypedOutputAdapterFactory 介面。接著,對配接器 Factory 之 Create 方法的呼叫會提供上限標準與位移值,協助配接器識別重複的值。這個位移是必要的,因為在輸出資料流中,對應到檢查點的位置可能會落在資料流中的任何點上。
第一次啟動查詢時,會在沒有上限標準和位移的情況下,呼叫配接器 Factory 的 Create 方法。如果伺服器尚未擷取查詢的任何檢查點,則會使用 DateTime.MinValue 的上限標準以及位移 0 (零),呼叫配接器 Factory 的 Create 方法。
如果有正確重新執行查詢,在最後一個檢查點之後產生的任何事件都會遭到擷取,但是在過時之前,將會在重新啟動時再次產生。這些是輸出配接器必須移除的重複項目。如何移除這些重複項目,取決於輸出配接器:您可以放棄原始副本,或者忽略重複副本。
若要確認相等的輸出,所有輸入配接器都必須正確重新執行輸入事件,而且所有輸出配接器都必須移除實體資料流中的所有重複事件,這個實體資料流會在過時前發生,以及在上限標準位移表示之位置所在或之後發生。
回到頂端
步驟 6:從失敗復原
伺服器會在啟動時自動執行復原,並使所有查詢恢復一致的狀態。這是一個非同步作業;因此,在復原完成之前,會傳回 Server.Create 的呼叫。
不具有恢復功能的查詢會置於「已停止」狀態。此行為尚未變更。
具有恢復功能的查詢會置於「正在初始化」狀態。接著,伺服器會載入已儲存的檢查點資訊。
您可以在此時呼叫 Start 來重新啟動查詢。一旦初始化完成之後,便會重新啟動具有恢復功能的查詢。
啟動程式碼必須執行下列步驟以便從失敗復原:
從中繼資料擷取應用程式之查詢的清單。
針對每個查詢,請先確認該查詢是否已存在於中繼資料中。
如果查詢已經存在,請重新啟動它。
如果查詢不存在於中繼資料中,請建立該查詢並將其定義為具有恢復功能,如上述步驟 2:定義具有恢復功能的查詢底下所述。
如果在復原期間本身發生問題,您可以重新啟動沒有恢復功能的伺服器。
回到頂端
在不停用復原的情況下關機
您可以關閉伺服器,而不必透過呼叫 Server 的 Dispose 方法來停用復原。
不具有恢復功能的查詢已停止。
具有恢復功能的查詢已暫停。當您重新啟動伺服器時,伺服器會嘗試復原已暫停之查詢的狀態。為防止此行為,請在關機前停止查詢。
以此方式關閉伺服器時,會同時保留不具有恢復功能和具有恢復功能的中繼資料。
回到頂端
範例
如需包含重新執行和重複資料刪除之具有恢復功能的應用程式端對端程式碼範例,請參閱 Codeplex 上的 StreamInsight 範例頁面中的<檢查點檢查範例>。
回到頂端
使用明確的開發模型定義具有恢復功能的查詢
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// SQL CE was available as an optional metadata provider in v1.1
// For the server to support recovery, this becomes mandatory
// A log path is also a mandatory requirement.
SqlCeMetadataProviderConfiguration metadataConfig = new
SqlCeMetadataProviderConfiguration();
metadataConfig.CreateDataSourceIfMissing = true;
metadataConfig.DataSource = "C:\\CepMetadata.sdf";
ServerRecoveryConfiguration recoveryConfig = new ServerRecoveryConfiguration();
recoveryConfig.CreateLogPathIfMissing = true;
recoveryConfig.LogPath = "C:\\CepLogPath";
using (EmbeddedServer server = Server.Create(
"Default", metadataConfig, recoveryConfig))
{
try
{
Application application = server.CreateApplication("TrafficJoinSample");
QueryTemplate queryTemplate = CreateQueryTemplate(application);
InputAdapter csvInputAdapter =
application.CreateInputAdapter<TextFileReaderFactory>(
"CSV Input", "Reading tuples from a CSV file");
OutputAdapter csvOutputAdapter =
application.CreateOutputAdapter<TextFileWriterFactory>(
"CSV Output", "Writing result events to a CSV file");
// bind query to event producers and consumers
QueryBinder queryBinder = BindQuery(
csvInputAdapter, csvOutputAdapter, queryTemplate);
// Create bound query that can be run
Console.WriteLine("Registering bound query");
Query query = application.CreateQuery(
"TrafficSensorQuery",
"Minute average count, filtered by location threshold",
queryBinder,
true); // v1.2 addition - Specify the query as resilient
// Start the query
// v1.2 has additional semantics during recovery
query.Start();
// submit a checkpoint request
// query.Stop();
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}
檢查點檢查 - 回撥 Rendezvous 模型
namespace StreamInsight.Samples.TrafficJoinQuery
{
using...
internal class EmbeddedCepServer
{
internal static void Main()
{
// Same code through query start …
{
try
{
// Start the query
query.Start();
// submit a checkpoint request
IAsyncResult result = server.BeginCheckpoint(query,
r => {
if (server.EndCheckpoint(r))
{
// the checkpoint succeeded
}
else
{
// the checkpoint was canceled
}
},
null);
}
catch (Exception e)
{
Console.WriteLine(e);
Console.ReadLine();
}
}
Console.WriteLine("\npress enter to exit application");
Console.ReadLine();
}