Olay işlemcisi konağı
Not
Bu makale, Azure Event Hubs SDK'nın eski sürümü için geçerlidir. SDK'nın geçerli sürümü için bkz. Uygulamanın birden çok örneğinde bölüm yükünü dengeleme. Kodunuzu SDK'nın yeni sürümüne geçirmeyi öğrenmek için bu geçiş kılavuzlarına bakın.
Azure Event Hubs, düşük maliyetle milyonlarca olay akışı yapmak için kullanılan güçlü bir telemetri alımı hizmetidir. Bu makalede, Olay İşleyicisi Ana Bilgisayarı (EPH) kullanılarak edilen olayların nasıl tüketilmesi açıklanmıştır; denetim noktası, kiralama ve paralel olay okuyucularının yönetimini basitleştiren akıllı bir tüketici aracısı.
Bu özellikler için ölçeklendirilen Event Hubs bölümlenmiş tüketiciler fikridir. Rakip tüketici deseninin aksine, bölümlenmiş tüketici düzeni, rekabet sorunlarını ortadan kaldırarak ve sona paralelliği kolaylaştırarak yüksek ölçek sağlar.
Ev güvenliği senaryosu
Örnek bir senaryo olarak, 100.000 ev izleyen bir ev güvenlik şirketi düşünün. Her dakikada bir evlere yüklenmiş hareket algılayıcısı, kapı/pencere açık algılayıcısı, cam kesme algılayıcısı vb. çeşitli algılayıcılardan veri alır. Şirket, yaşayanların evlerinin etkinliğini neredeyse gerçek zamanlı olarak izlemeleri için bir web sitesi sağlar.
Her algılayıcı verileri bir olay hub'ına iletir. Olay hub'ı 16 bölümle yapılandırılır. Tüketen uçta, bu olayları okuyabilen, birleştirerek (filtre, toplama vb.) ve ardından kullanıcı dostu bir web sayfasına projelenmiş olan bir depolama blobu için dökümü alan bir mekanizma gerekir.
Tüketici uygulamasını yazma
Tüketiciyi dağıtılmış bir ortamda tasarlarken senaryonun aşağıdaki gereksinimleri işlemesi gerekir:
- Ölçeklendirme: Her tüketicinin birkaç farklı bölümden okuma sahipliğini alarak birden Event Hubs oluşturun.
- Yük dengeleme: Tüketicileri dinamik olarak artırma veya azaltma. Örneğin, her bir ev için yeni bir algılayıcı türü (örneğin, karbon monoksid algılayıcısı) eklenerek olay sayısı artar. Bu durumda işleç (insan), tüketici örneği sayısını artırır. Ardından, tüketici havuzu sahip olduğu bölüm sayısını yeniden dengeler ve yükü yeni eklenen tüketicilerle paylaşabilir.
- Hatalarda sorunsuz sürdürme: Tüketici (tüketici A) başarısız olursa (örneğin, tüketiciyi barındıran sanal makine aniden kilitleniyorsa), diğer tüketicilerin A tüketicisi tarafından sahip olunan bölümleri alıp devam etmek zorunda olması gerekir. Ayrıca denetim noktası veya uzaklık olarak adlandırılan devamlılık noktası, tüketici A'nın başarısız olduğu tam noktada veya bundan biraz önce olabilir.
- Olayları tüketme: Önceki üç nokta tüketicinin yönetimiyle ilgilenirken, olayları tüketen ve yararlı bir şey yapmak için kod olması gerekir; Örneğin, toplama ve blob depolamaya yükleme.
Event Hubs, bunun için kendi çözümlerinizi Event Hubs IEventProcessor arabirimi ve EventProcessorHost sınıfı aracılığıyla sağlar.
IEventProcessor arabirimi
İlk olarak, uygulamaları kullanmak dört yöntemi olan IEventProcessor arabirimini kullanır: OpenAsync, CloseAsync, ProcessErrorAsync ve ProcessEventsAsync. Bu arabirim, gönderilebilecek olayları tüketen gerçek Event Hubs içerir. Aşağıdaki kod basit bir uygulama gösterir:
public class SimpleEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
}
return context.CheckpointAsync();
}
}
Ardından, bir EventProcessorHost örneği örneği örneği. Aşırı yüklemeye bağlı olarak, oluşturucuda EventProcessorHost örneği oluşturulurken aşağıdaki parametreler kullanılır:
- hostName: Her tüketici örneğinin adı. EventProcessorHost'ın her örneği bir tüketici grubu içinde bu değişken için benzersiz bir değere sahip olmalıdır, bu nedenle bu değeri sabit olarak kodlayın.
- eventHubPath: Olay hub'larının adı.
- consumerGroupName: Event Hubs varsayılan $Default grubu adı olarak bir tüketici grubu kullanır, ancak işlemeye özgü yönünüz için bir tüketici grubu oluşturmak iyi bir uygulamadır.
- eventHubConnectionString: Olay hub'ı bağlantı dizesi; bu dizeden Azure portal. Bu bağlantı dizesinin olay hub'sinde Dinleme izinlerine sahip olması gerekir.
- storageConnectionString: İç kaynak yönetimi için kullanılan depolama hesabı.
Önemli
- Denetim noktası deposu olarak kullanılan depolama hesabında yazılım silme özelliğini etkinleştirme.
- Denetim noktası deposu olarak hiyerarşik depolama (Azure Data Lake Depolama 2. Nesil) kullanmayın.
Son olarak, tüketiciler EventProcessorHost örneğini Event Hubs kaydedmektedir. EventProcessorHost örneğiyle bir olay işlemcisi sınıfı kaydetmek olay işlemeyi başlatır. Kayıt işlemi, Event Hubs hizmetine tüketici uygulamasının bazı bölümlerinin olaylarını tüketmesi ve tüketilmesi için olayları her göndermesinde IEventProcessor uygulama kodunu çağırması talimatını sağlar.
Not
consumerGroupName büyük/büyük/büyük harfe duyarlıdır. consumerGroupName'de yapılan değişiklikler, akışın başındaki tüm bölümlerin okunmasına neden olabilir.
Örnek
Örneğin, tüketen olaylara ayrılmış 5 sanal makine (VM) ve her vm'de gerçek tüketime uygun basit bir konsol uygulaması olduğunu düşünün. Her konsol uygulaması daha sonra bir EventProcessorHost örneği oluşturur ve bunu Event Hubs kaydettirmektedir.
Bu örnek senaryoda, 5 EventProcessorHost örneğine 16 bölümün ayrılmış olduğunu düşünün. Bazı EventProcessorHost örnekleri diğerlerine göre birkaç daha fazla bölüme sahip olabilir. EventProcessorHost örneğinin sahip olduğu her bölüm için sınıfının bir örneğini SimpleEventProcessor oluşturur. Bu nedenle, her bölüme bir tane SimpleEventProcessor atanmış, genel olarak 16 örnek vardır.
Aşağıdaki listede bu örnek özetlenmiştir:
- 16 Event Hubs bölüm.
- Her VM'de 5 VM, 1 tüketici uygulaması (Consumer.exe gibi).
- Kayıtlı 5 EPH örneği, sanal makineye göre 1 Consumer.exe.
- 5
SimpleEventProcessorEPH örneği tarafından oluşturulan 16 nesne. - 1 EPH Consumer.exe 4 bölüme sahip olabileceği için 1 uygulama
SimpleEventProcessor4 nesne içerebilir.
Bölüm sahipliği izleme
Bir EPH örneğinde (veya tüketicide) bir bölümün sahipliği, izleme için sağlanan Azure Depolama hesabı aracılığıyla izlenmiş olur. İzlemeyi basit bir tablo olarak aşağıdaki gibi görselleştirebilirsiniz. Sağlanan bir hesap altındaki blobları incelerken gerçek Depolama bakabilirsiniz:
| Tüketici grubu adı | Bölüm Kimliği | Ana bilgisayar adı (sahip) | Kira (veya sahiplik) alma süresi | Bölümde uzaklık (denetim noktası) |
|---|---|---|---|---|
| $Default | 0 | Tüketici _ VM3 | 2018-04-15T01:23:45 | 156 |
| $Default | 1 | Tüketici _ VM4 | 2018-04-15T01:22:13 | 734 |
| $Default | 2 | Tüketici _ VM0 | 2018-04-15T01:22:56 | 122 |
| : | ||||
| : | ||||
| $Default | 15 | Tüketici _ VM3 | 2018-04-15T01:22:56 | 976 |
Burada, her konak belirli bir süre (kira süresi) için bir bölümün sahipliğini alır. Bir konak başarısız olursa (VM kapanıyor), kiranın süresi dolar. Diğer konaklar bölümün sahipliğini almaya çalışsa da konaklardan biri başarılı olur. Bu işlem, bölümdeki kiralamayı yeni bir sahiple sıfırlar. Bu şekilde, bir tüketici grubu içindeki herhangi bir bölümden aynı anda yalnızca tek bir okuyucu okuyabilir.
İleti alma
Processeventsasync öğesine yapılan her çağrı bir olay koleksiyonu sunar. Bu olayları işlemek sizin sorumluluğunuzdadır. İşlemci konağının her iletiyi en az bir kez işlediğinden emin olmak istiyorsanız, kendi yeniden deneme kodu yazmanız gerekir. Ancak kired iletiler hakkında dikkatli olun.
Şeyleri görece hızlı yapmanız önerilir; diğer bir deyişle, mümkün olduğunca az işlem yapın. Bunun yerine, tüketici gruplarını kullanın. Depolama alanına yazmanız ve bazı yönlendirme yapmanız gerekiyorsa, iki Tüketici grubu kullanmak ve ayrı olarak çalışan iki ıeventprocessor uygulaması olması daha iyidir.
İşlem sırasında bir noktada, okuduğunuzu ve tamamlandığınızı izlemek isteyebilirsiniz. Okumayı yeniden başlatmanız gerekiyorsa, akışın başına dönmezseniz izlemenin kritik olması önemlidir. Eventprocessorhost , bu izlemeyi kontrol noktaları kullanarak basitleştirir. Bir denetim noktası, belirli bir tüketici grubundaki belirli bir bölüm için bir konum veya uzaklığa, bu noktada iletileri işletin karşılanmasını karşıladınız. Eventprocessorhost içindeki bir kontrol noktasının Işaretlenmesi, Partitioncontext nesnesinde checkpointasync yöntemi çağırarak gerçekleştirilir. Bu işlem Processeventsasync yöntemi içinde yapılır, ancak CloseAsynciçinde de yapılabilir.
Denetim noktası oluşturma
Checkpointasync yönteminin iki aşırı yüklemesi vardır: ilki parametre olmadan, Processeventsasynctarafından döndürülen koleksiyonda en yüksek olay kaydırmasına denetim noktası ekler. Bu konum "yüksek su" işaretidir; Bu, çağırdığınızda tüm son olayları işlediğinizi varsayar. Bu yöntemi bu şekilde kullanıyorsanız, diğer olay işleme kodunuz döndürülbaşladıktan sonra çağırmanız beklendiğine dikkat edin. İkinci aşırı yükleme, denetim noktası için bir eventdata örneği belirtmenizi sağlar. Bu yöntem, denetim noktası için farklı bir filigran türü kullanmanıza olanak sağlar. Bu filigranla, "düşük su" işareti uygulayabilirsiniz: en düşük sıralı olay işlendiğinde. Bu aşırı yükleme, fark yönetiminde esnekliği sağlamak için sağlanır.
Denetim noktası gerçekleştirildiğinde bölüme özgü bilgileri olan bir JSON dosyası (özellikle de, fark), oluşturucuda Eventprocessorhost'a sağlanan depolama hesabına yazılır. Bu dosya sürekli olarak güncelleştirilir. Bağlam üzerine işaret etmeyi göz önünde bulundurmanız önemlidir. her ileti için denetim noktası oluşturma işlemi geri alınamaz. checkişaret için kullanılan depolama hesabı, büyük olasılıkla bu yükü işlemez, ancak her bir olayın daha da önemlisi, bir Service Bus sırasının bir olay hub 'ından daha iyi bir seçenek olabileceği sıraya alınmış bir mesajlaşma deseninin göstergesi vardır. Event Hubs arkasındaki düşünce, harika ölçekte "en az bir kez" teslim almanızı öneririz. Aşağı akış sistemlerinizi ıdempotent yaparak, aynı olayların birden çok kez alınmasına neden olan hatalardan veya yeniden başlatmaları kurtarmak kolaydır.
İş parçacığı güvenliği ve işlemci örnekleri
Varsayılan olarak, Eventprocessorhost iş parçacığı güvenlidir ve ıeventprocessorörneğine göre zaman uyumlu bir şekilde davranır. Bir bölüme yönelik olaylar geldiğinde, bu bölüm için ıeventprocessor örneğinde processeventsasync çağrılır ve bölüm Için processeventsasync 'e yönelik daha fazla çağrı engellenir. İleti göndericisi, arka planda diğer iş parçacıklarında çalışmaya devam ettiğinden, izleyen iletiler ve arka planda gerçekleştirilen Processeventsasync kuyruğuna yapılan çağrılar. Bu iş parçacığı güvenliği, iş parçacığı açısından güvenli koleksiyonlar gereksinimini ortadan kaldırır ve performansı önemli ölçüde artırır.
Düzgün bir şekilde kapatın
Son olarak, eventprocessorhost. UnregisterEventProcessorAsync , tüm bölüm okuyucularının temiz bir şekilde kapatılmasını mümkün ve eventprocessorhostörneği kapatılırken her zaman çağrılmalıdır. Bunun yapılmaması, kira süre sonu ve dönem çakışmaları nedeniyle diğer Eventprocessorhost örnekleri başlatılırken gecikmelere neden olabilir. Dönem yönetimi, makalenin Dönem bölümünde ayrıntılı olarak ele alınmıştır.
Kira yönetimi
Bir olay işlemcisi sınıfını EventProcessorHost örneğiyle kaydetme olay işlemesini başlatır. Konak örneği, büyük olasılıkla diğer ana bilgisayar örneklerinden bazıları yakalayıp, diğer konak örneklerinden bazı bölümler arasında bir dağıtım elde etmekle bir şekilde, Olay Hub 'ının bazı bölümlerine kiralamalar edinir. Her kiralanmış bölüm için konak örneği, belirtilen olay işlemcisi sınıfının bir örneğini oluşturur, ardından bu bölümden olayları alır ve bunları olay işlemcisi örneğine geçirir. Daha fazla örnek eklendikçe ve daha fazla kira olduğundan, EventProcessorHost sonuç olarak yükü tüm tüketiciler arasında dengeler.
Daha önce açıklandığı gibi, izleme tablosu Eventprocessorhost. UnregisterEventProcessorAsyncöğesinin otomatik ölçeklendirme yapısını büyük ölçüde basitleştirir. Bir Eventprocessorhost örneği başlatıldıktan sonra, mümkün olduğunca fazla kira edinir ve olayları okumaya başlar. Süresi dolmak üzere olan kiralamalar, Eventprocessorhost , bir ayırma yerleştirerek bunları yenilemeye çalışır. Kira yenilemeye uygunsa, işlemci okumaya devam eder, ancak yoksa, okuyucu kapanır ve CloseAsync çağırılır. CloseAsync , bu bölüm için son temizleme işlemini gerçekleştirmek için uygun bir zamandır.
Eventprocessorhost , bir partitionmanageroptions özelliği içerir. Bu özellik, kiralama yönetimi üzerinde denetimi sunar. Ieventprocessor uygulamanızı kaydetmeden önce bu seçenekleri ayarlayın.
Denetim olayı Işlemcisi ana bilgisayar seçenekleri
Ayrıca, Registereventprocessorasync 'in bir aşırı yüklemesi parametre olarak bir eventprocessoroptions nesnesi alır. Eventprocessorhost. UnregisterEventProcessorAsync öğesinin davranışını denetlemek için bu parametreyi kullanın. Eventprocessoroptions dört özelliği ve bir olayı tanımlar:
- MaxBatchSize: Processeventsasyncçağrısında almak istediğiniz koleksiyonun en büyük boyutu. Bu boyut en az, yalnızca en büyük boyut değil. Alınacak daha az ileti varsa, Processeventsasync , kullanılabilir olduğu kadar çalışır.
- Prefetchcount: istemcinin kaç ileti alacağını öğrenmek için temel AMQP kanalı tarafından kullanılan bir değer. Bu değer MaxBatchSizedeğerinden büyük veya buna eşit olmalıdır.
- InvokeProcessorAfterReceiveTimeout: Bu parametre true ise, bir bölümdeki olayları almak için temeldeki çağrı zaman aşımına uğrarsa processeventsasync çağırılır. Bu yöntem, bölüm üzerinde işlem yapılmayan işlemler sırasında zaman tabanlı eylemler gerçekleştirmek için yararlıdır.
- InitialOffsetProvider: bir okuyucu bölüm okumaya başladığında ilk sapmayı sağlamak için çağrılan bir işlev işaretçisi veya lambda ifadesinin ayarlanmasını sağlar. Bu sapmayı belirtmeden, bir uzaklığa sahip bir JSON dosyası Eventprocessorhost oluşturucusuna sağlanan depolama hesabına zaten kaydedilmediği için okuyucu en eski olayda başlar. Bu yöntem, okuyucu başlatmanın davranışını değiştirmek istediğinizde yararlıdır. Bu yöntem çağrıldığında, nesne parametresi okuyucunun başlatıldığı bölüm KIMLIĞINI içerir.
- Exceptionreceivedeventargs: eventprocessorhostiçinde oluşan temeldeki tüm özel durumlarla ilgili bildirim almanızı sağlar. İstediğiniz gibi şeyler çalışmadıysanız, bu olay aramaya başlamak için iyi bir yerdir.
Süresinin
Alma süresinin nasıl çalıştığı aşağıda verilmiştir:
Dönem ile
Dönem, hizmetin kullandığı, Bölüm/kira sahipliğini zorlamak için benzersiz bir tanımlayıcıdır (dönem değeri). Createdönemler Chahize yöntemini kullanarak dönem tabanlı bir alıcı oluşturun. Bu yöntem, dönem tabanlı bir alıcı oluşturur. Alıcı, belirtilen tüketici grubundan belirli bir olay hub 'ı bölümü için oluşturulur.
Dönem özelliği, kullanıcılara, aşağıdaki kurallara göre herhangi bir zamanda bir tüketici grubunda yalnızca bir alıcı olmasını sağlar:
- Bir tüketici grubunda mevcut alıcı yoksa, Kullanıcı herhangi bir dönem değeri olan bir alıcı oluşturabilir.
- Bir dönem değeri olan bir alıcı varsa ve bir dönem değeri olan E2 (E1 <= E2) ile yeni bir alıcı oluşturulduysa, E1 ile alıcının bağlantısı otomatik olarak kesilir, E2 içeren alıcı başarıyla oluşturulur.
- Bir dönem değeri E1 olan bir alıcı varsa ve bir dönem değeri > E2 ile yeni bir alıcı oluşturulduysa, bu durumda E2 oluşturma işlemi şu hatayla başarısız olur: "dönemi olan bir alıcı zaten var.
Dönem yok
Createreceiver yöntemini kullanarak dönem tabanlı olmayan bir alıcı oluşturursunuz.
Akış işlemede, kullanıcıların tek bir tüketici grubunda birden çok alıcı oluşturmak istediğiniz bazı senaryolar vardır. Bu tür senaryoları desteklemek için, dönem olmadan bir alıcı oluşturabilir ve bu durumda tüketici grubunda 5 ' e kadar eşzamanlı alıcıya izin veririz.
Karma mod
Dönemi olan bir alıcı oluşturduğunuz ve sonra aynı tüketici grubunda dönem olmayan veya bunun tersini alan uygulama kullanımını önermiyoruz. Ancak, bu davranış oluştuğunda hizmet aşağıdaki kuralları kullanarak bunu işler:
- Zaten dönem E1 ile oluşturulmuş bir alıcı varsa ve etkin bir şekilde olay alıyorsa ve yeni bir alıcı, dönem olmadan oluşturulursa, yeni alıcı oluşturulması başarısız olur. Dönem alıcıları, her zaman sistemde önceliğe sahip olur.
- Zaten bir dönem E1 ile oluşturulmuş ve bağlantısı kesilen bir alıcı varsa ve yeni bir MessagingFactory üzerinde dönem olmadan yeni bir alıcı oluşturulduysa yeni alıcı oluşturma işlemi başarılı olur. Burada sistemimizin, yaklaşık 10 dakika sonra "alıcı bağlantısının kesilmesi" algılayacağı bir desteklenmediği uyarısıyla vardır.
- Dönem olmadan oluşturulmuş bir veya daha fazla alıcı varsa ve dönem E1 ile yeni bir alıcı oluşturulduysa, tüm eski alıcıların bağlantısı kesilir.
Not
Dönemleri kullanan uygulamalar için farklı tüketici grupları kullanmanızı ve hataları önlemek için dönemler kullanmayan dönemleri kullanmanızı öneririz.
Sonraki adımlar
Artık olay Işlemcisi konağını öğrenolduğunuza göre Event Hubs hakkında daha fazla bilgi edinmek için aşağıdaki makalelere bakın:
- Event Hubs kullanmaya başlayın