Malvarez-0276 asked:

Taking events from one event hub and sending them to another event hub

Hi everyone,
I have a Python program that reads a large volume of events from an event hub, does some complex processing, and sends the results to another event hub. We are using the async versions of EventHubConsumerClient and EventHubProducerClient to create the client connections.
We have tried two ways of creating the connections and are having issues with each of them:

First Option:
We create a new producer connection every time we receive an event batch from the source event hub. This option processes all of the events (at least, that is our belief), but CPU usage shoots up and we see memory leaks.

It would be something like:

 async def on_event_batch(partition_context, event_batch):
     producer = EventHubProducerClient.from_connection_string(...)
     async with producer:
         event_data_batch =  await producer.create_batch(partition_key=partition_context.partition_id)
         ...

Second Option:
We create a singleton producer and reuse it every time we send an event batch to the target event hub.
This option reduces CPU usage and the memory leaks, but we lose a considerable number of events.

It would be something like:

 producer = EventHubProducerClient.from_connection_string(...)
 async def on_event_batch(partition_context, event_batch):
     event_data_batch =  await producer.create_batch(partition_key=partition_context.partition_id) 
     ...


The second option seems more logical to us, but we cannot solve the lost-events issue.
Any help is really appreciated.
Best regards


A more complete piece of the code, using a single shared producer:

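 import asyncio
 import json
 
 from azure.eventhub import EventData
 from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
 from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
 # ... plus the project's own canonizer module and the configFilePath setting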
    
 with open(configFilePath) as json_data_file:
     config = json.load(json_data_file)
    
 producer = EventHubProducerClient.from_connection_string(conn_str = config.get('EventHub').get('Producer').get('connection_str')
                                                          , eventhub_name = config.get('EventHub').get('Producer').get('eventhub_name'))
 async def on_event_batch(partition_context, event_batch):
    
     print(partition_context.partition_id)  
     # Create a batch
     event_data_batch =  await producer.create_batch(partition_key=partition_context.partition_id) 
     for x in event_batch:
         can_event, devicetype = canonizer.main(x.body_as_str()) 
         ED_can_event = EventData(str(can_event))
         try: 
             event_data_batch.add(ED_can_event) # Add event data to the batch.
         except Exception as e: 
             print(e)
             try:
                 await producer.send_batch(event_data_batch)
                 event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
                 event_data_batch.add(ED_can_event)
             except Exception as e:
                 print(e)
                 event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
     try:
         await producer.send_batch(event_data_batch) # Send batch of events to the event hub.   
     except Exception as e:
         print(e)
         pass
        
     await partition_context.update_checkpoint()
    
    
 async def main(max_batch_size = 250):
    
     checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('connection_str'),config.get('container_name'))
        
     consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
                                                             , consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
                                                             , eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
                                                             , checkpoint_store=checkpoint_store)
     async with consumer,producer:
         await consumer.receive_batch(
             on_event_batch=on_event_batch,
             max_batch_size=max_batch_size,
         )
    
 if __name__ == '__main__':
     loop = asyncio.get_event_loop()
     loop.run_until_complete(main())


A more complete piece of the code, creating a new producer for every batch:

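 import asyncio
 import json
 import time
 
 from azure.eventhub import EventData
 from azure.eventhub.aio import EventHubConsumerClient, EventHubProducerClient
 from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
 # ... plus the project's own canonizer module and the configFilePath setting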
    
 with open(configFilePath) as json_data_file:
     config = json.load(json_data_file)
        
 async def on_event_batch(partition_context, event_batch):
        
     producer = EventHubProducerClient.from_connection_string(conn_str = config.get('connection_str')
                                                          , eventhub_name = config.get('eventhub_name'))
     async with producer:
         # Create a batch
         event_data_batch =  await producer.create_batch(partition_key=partition_context.partition_id) 
         for x in event_batch:
             can_event, devicetype = canonizer.main(x.body_as_str()) # The first element of the list is the frame, the second is the type of sending device
             ED_can_event = EventData(str(can_event))
             ED_can_event.properties  = x.properties
             ED_can_event.properties.update({"DeviceType": devicetype, "Canonized_At": int(time.time())})
    
             try: 
                 event_data_batch.add(ED_can_event)
    
             except Exception as e: 
                 print(e)
                 try:
                     await producer.send_batch(event_data_batch)
                     event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
                     event_data_batch.add(ED_can_event)
                 except Exception as e:
                     print(e)
                     event_data_batch = await producer.create_batch(partition_key=partition_context.partition_id)
         try:
             await producer.send_batch(event_data_batch) # Send batch of events to the event hub.   
         except Exception as e:
             print(e)
             pass            
         await partition_context.update_checkpoint()
    
 async def main(max_batch_size = 250):
     checkpoint_store = BlobCheckpointStore.from_connection_string(config.get('Storage').get('connection_str'),config.get('Storage').get('container_name'))    
     consumer = EventHubConsumerClient.from_connection_string(conn_str= config.get('EventHub').get('Consumer').get('connection_str')
                                                             , consumer_group = config.get('EventHub').get('Consumer').get('consumer_group')
                                                             , eventhub_name= config.get('EventHub').get('Consumer').get('eventhub_name')
                                                             , checkpoint_store=checkpoint_store)
     async with consumer:
         await consumer.receive_batch(
             on_event_batch=on_event_batch,
             max_batch_size=max_batch_size)
      
 if __name__ == '__main__':
     loop = asyncio.get_event_loop()
     loop.run_until_complete(main())
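For comparison, the batch-filling loop that both versions share can be sketched so that a failed send is never swallowed. In both listings, the `except Exception` branches print the error, drop the current event (or the whole batch), and then still call `update_checkpoint()`, which is one way events could go missing. The helper below is a hypothetical sketch, not part of the Azure SDK: `send_in_batches` is an illustrative name, and it only assumes what the listings already use, namely `create_batch(partition_key=...)`, `send_batch(...)`, and `EventDataBatch.add` raising `ValueError` when the batch is full.

```python
from typing import Any, Iterable


async def send_in_batches(producer: Any, events: Iterable[Any], partition_key: str) -> int:
    """Pack ``events`` into batches, send every batch, and return how many were sent.

    Hypothetical helper. Assumes an async producer whose ``create_batch``
    returns a batch whose ``add`` raises ValueError when the event does not fit.
    Errors from ``send_batch`` are deliberately not caught, so the caller can
    skip the checkpoint instead of silently losing events.
    """
    batch = await producer.create_batch(partition_key=partition_key)
    sent = 0
    for event in events:
        try:
            batch.add(event)
        except ValueError:
            if len(batch) == 0:
                # The event alone exceeds the maximum batch size; sending an
                # empty batch would not help, so surface the error.
                raise
            # The current batch is full: send it and retry the event in a new one.
            await producer.send_batch(batch)
            sent += 1
            batch = await producer.create_batch(partition_key=partition_key)
            batch.add(event)
    if len(batch) > 0:
        # Send the final, partially filled batch.
        await producer.send_batch(batch)
        sent += 1
    return sent
```

Inside `on_event_batch`, one would build the list of `EventData` objects, `await send_in_batches(producer, events, partition_context.partition_id)`, and only then `await partition_context.update_checkpoint()`. If a send raises, the checkpoint is not advanced, so those events remain after the last checkpoint and can be read again when the partition is reprocessed, at the cost of possible duplicates.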









azure-event-hubs

Hi @Malvarez-0276,
Thanks for using Microsoft Q&A!

I am checking internally on this issue and will get back to you.

Thanks
Saurabh


1 Answer

SaurabhSharma-msft answered:

Hi @malvarez-0276,

The first option is not advisable because it pays the cost of opening a TCP connection and negotiating SSL/TLS for every batch it handles. This incurs high CPU cost and high API latency.

The second option is not advisable because it runs into a single-client bottleneck.

I would recommend creating a pool of producer clients, maybe 10, and picking one at random for each batch. You can increase the pool size if you think you need more. This is a common practice with many TCP clients.
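A minimal sketch of such a pool follows. `ProducerPool` is a hypothetical helper, not an Azure SDK class; it only assumes that each client has an async `close()` method, which the async `EventHubProducerClient` provides. The default size of 10 mirrors the suggestion above and is not a tuned value.

```python
import asyncio
import random
from typing import Any, Callable, List


class ProducerPool:
    """Fixed-size pool of long-lived producer clients (hypothetical helper).

    ``factory`` is any zero-argument callable returning a client with an async
    ``close()`` method, for example a lambda wrapping
    ``EventHubProducerClient.from_connection_string(...)``.
    """

    def __init__(self, factory: Callable[[], Any], size: int = 10) -> None:
        if size < 1:
            raise ValueError("size must be at least 1")
        # Clients are created once and reused for the lifetime of the pool.
        self._clients: List[Any] = [factory() for _ in range(size)]

    def get(self) -> Any:
        # Pick a client at random so that load spreads across the connections.
        return random.choice(self._clients)

    async def close(self) -> None:
        # Close all clients concurrently.
        await asyncio.gather(*(client.close() for client in self._clients))

    async def __aenter__(self) -> "ProducerPool":
        return self

    async def __aexit__(self, *exc_info: Any) -> None:
        await self.close()
```

In the program from the question, the pool could be created once where the singleton producer is created now, for example `pool = ProducerPool(lambda: EventHubProducerClient.from_connection_string(conn_str=..., eventhub_name=...))`, and entered together with the consumer (`async with consumer, pool:`). Inside `on_event_batch`, a single `producer = pool.get()` would then be used for both `create_batch` and `send_batch` in that callback, so each batch is sent by the client that created it. Random selection is the simplest policy; a round-robin over the clients with `itertools.cycle` would also work.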

Hope this helps. Please let me know if you have any other questions.

Thanks
Saurabh


Please do not forget to "Accept the answer" wherever the information provided helps you, to help others in the community.


Thank you for your reply.
That makes perfect sense. We have created the pool of producers and there no longer appears to be a bottleneck. We have no memory leaks either, but we still see high CPU usage, which I believe can be reduced in other ways.
Thanks again for your help.
Miguel
