PartitionManager Class

Manages the partition event pump execution.

Inheritance
builtins.object
PartitionManager

Constructor

PartitionManager(host)

Parameters

host

Methods

attempt_renew_lease

Make attempt_renew_lease async call sync.

attempt_renew_lease_async

Attempts to renew a potential lease if possible and marks in the queue as none adds to adds to the queue.

check_and_add_pump_async

Updates the lease on an exisiting pump.

count_leases_by_owner

Returns a dictionary of leases by current owner.

create_new_pump_async

Create a new pump thread with a given lease.

get_partition_ids_async

Returns a list of all the event hub partition IDs.

initialize_stores_async

Intializes the partition checkpoint and lease store ensures that a checkpoint exists for all partitions. Note in this case checkpoint and lease stores are the same storage manager construct.

remove_all_pumps_async

Stops all partition pumps (Note this might be wrong and need to await all tasks before returning done).

remove_pump_async

Stops a single partiton pump.

retry

Make attempt_renew_lease async call sync.

retry_async

Throws if it runs out of retries. If it returns, action succeeded.

run_async

Starts the run loop and manages exceptions and cleanup.

run_loop_async

This is the main execution loop for allocating and manging pumps.

start_async

Intializes the partition checkpoint and lease store and then calls run async.

stop_async

Terminiates the partition manger.

which_lease_to_steal

Determines and return which lease to steal If the number of leases is a multiple of the number of hosts, then the desired configuration is that all hosts own the name number of leases, and the difference between the "biggest" owner and any other is 0.

If the number of leases is not a multiple of the number of hosts, then the most even configurationpossible is for some hosts to have (self, leases/hosts) leases and others to have (self, (self, leases/hosts) + 1). For example, for 16 partitions distributed over five hosts, the distribution would be 4, 3, 3, 3, 3, or any of the possible reorderings.

In either case, if the difference between this host and the biggest owner is 2 or more, then thesystem is not in the most evenly-distributed configuration, so steal one lease from the biggest. If there is a tie for biggest, we pick whichever appears first in the list because it doesn't really matter which "biggest" is trimmed down.

Stealing one at a time prevents flapping because it reduces the difference between the biggest and this host by two at a time. If the starting difference is two or greater, then the difference cannot end up below 0. This host may become tied for biggest, but it cannot become larger than the host that it is stealing from.

attempt_renew_lease

Make attempt_renew_lease async call sync.

attempt_renew_lease(lease_task, owned_by_others_q, lease_manager)

Parameters

lease_task
Required
owned_by_others_q
Required
lease_manager
Required

attempt_renew_lease_async

Attempts to renew a potential lease if possible and marks in the queue as none adds to adds to the queue.

async attempt_renew_lease_async(lease_task, owned_by_others_q, lease_manager)

Parameters

lease_task
Required
owned_by_others_q
Required
lease_manager
Required

check_and_add_pump_async

Updates the lease on an exisiting pump.

async check_and_add_pump_async(partition_id, lease)

Parameters

partition_id
str
Required

The partition ID.

lease
Lease
Required

The lease to be used.

count_leases_by_owner

Returns a dictionary of leases by current owner.

count_leases_by_owner(leases)

Parameters

leases
Required

create_new_pump_async

Create a new pump thread with a given lease.

async create_new_pump_async(partition_id, lease)

Parameters

partition_id
str
Required

The partition ID.

lease
Lease
Required

The lease to be used.

get_partition_ids_async

Returns a list of all the event hub partition IDs.

async get_partition_ids_async()

Return type

initialize_stores_async

Intializes the partition checkpoint and lease store ensures that a checkpoint exists for all partitions. Note in this case checkpoint and lease stores are the same storage manager construct.

async initialize_stores_async()

Returns

Returns the number of partitions.

Return type

int

remove_all_pumps_async

Stops all partition pumps (Note this might be wrong and need to await all tasks before returning done).

async remove_all_pumps_async(reason)

Parameters

reason
str
Required

A reason for closing.

Return type

remove_pump_async

Stops a single partiton pump.

async remove_pump_async(partition_id, reason)

Parameters

partition_id
str
Required

The partition ID.

reason
str
Required

A reason for closing.

retry

Make attempt_renew_lease async call sync.

retry(func, partition_id, retry_message, final_failure_message, max_retries, host_id)

Parameters

func
Required
partition_id
Required
retry_message
Required
final_failure_message
Required
max_retries
Required
host_id
Required

retry_async

Throws if it runs out of retries. If it returns, action succeeded.

async retry_async(func, partition_id, retry_message, final_failure_message, max_retries, host_id)

Parameters

func
Required
partition_id
Required
retry_message
Required
final_failure_message
Required
max_retries
Required
host_id
Required

run_async

Starts the run loop and manages exceptions and cleanup.

async run_async()

run_loop_async

This is the main execution loop for allocating and manging pumps.

async run_loop_async()

start_async

Intializes the partition checkpoint and lease store and then calls run async.

async start_async()

stop_async

Terminiates the partition manger.

async stop_async()

which_lease_to_steal

Determines and return which lease to steal If the number of leases is a multiple of the number of hosts, then the desired configuration is that all hosts own the name number of leases, and the difference between the "biggest" owner and any other is 0.

If the number of leases is not a multiple of the number of hosts, then the most even configurationpossible is for some hosts to have (self, leases/hosts) leases and others to have (self, (self, leases/hosts) + 1). For example, for 16 partitions distributed over five hosts, the distribution would be 4, 3, 3, 3, 3, or any of the possible reorderings.

In either case, if the difference between this host and the biggest owner is 2 or more, then thesystem is not in the most evenly-distributed configuration, so steal one lease from the biggest. If there is a tie for biggest, we pick whichever appears first in the list because it doesn't really matter which "biggest" is trimmed down.

Stealing one at a time prevents flapping because it reduces the difference between the biggest and this host by two at a time. If the starting difference is two or greater, then the difference cannot end up below 0. This host may become tied for biggest, but it cannot become larger than the host that it is stealing from.

which_lease_to_steal(stealable_leases, have_lease_count)

Parameters

stealable_leases
list[Lease]
Required

List of leases to determine which can be stolen.

have_lease_count
int
Required

Lease count.

Return type