pub struct PgElection {
leader_value: String,
pg_client: RwLock<ElectionPgClient>,
is_leader: AtomicBool,
leader_infancy: AtomicBool,
leader_watcher: Sender<LeaderChangeMessage>,
store_key_prefix: String,
candidate_lease_ttl: Duration,
meta_lease_ttl: Duration,
sql_set: ElectionSqlSet,
}
Expand description
PostgreSql implementation of Election.
Fields§
§leader_value: String
§pg_client: RwLock<ElectionPgClient>
§is_leader: AtomicBool
§leader_infancy: AtomicBool
§leader_watcher: Sender<LeaderChangeMessage>
§store_key_prefix: String
§candidate_lease_ttl: Duration
§meta_lease_ttl: Duration
§sql_set: ElectionSqlSet
Implementations§
Source§impl PgElection
impl PgElection
async fn maybe_init_client(&self) -> Result<()>
pub async fn with_pg_client( leader_value: String, pg_client: ElectionPgClient, store_key_prefix: String, candidate_lease_ttl: Duration, meta_lease_ttl: Duration, table_name: &str, lock_id: u64, ) -> Result<ElectionRef>
fn election_key(&self) -> String
fn candidate_root(&self) -> String
fn candidate_key(&self) -> String
Source§impl PgElection
impl PgElection
Sourceasync fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>>
async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>>
Returns value, expire time and current time. If with_origin
is true, the origin string is also returned.
Sourceasync fn get_value_with_lease_by_prefix(
&self,
key_prefix: &str,
) -> Result<(Vec<(String, Timestamp)>, Timestamp)>
async fn get_value_with_lease_by_prefix( &self, key_prefix: &str, ) -> Result<(Vec<(String, Timestamp)>, Timestamp)>
Returns all values and expire time with the given key prefix. Also returns the current time.
async fn update_value_with_lease( &self, key: &str, prev: &str, updated: &str, lease_ttl: Duration, ) -> Result<()>
Sourceasync fn put_value_with_lease(
&self,
key: &str,
value: &str,
lease_ttl: Duration,
) -> Result<bool>
async fn put_value_with_lease( &self, key: &str, value: &str, lease_ttl: Duration, ) -> Result<bool>
Returns true
if the insertion is successful
Sourceasync fn delete_value(&self, key: &str) -> Result<bool>
async fn delete_value(&self, key: &str) -> Result<bool>
Returns true
if the deletion is successful.
Caution: Should only delete the key if the lease is expired.
Sourceasync fn leader_action(&self) -> Result<()>
async fn leader_action(&self) -> Result<()>
Handles the actions of a leader in the election process.
This function performs the following checks and actions:
-
Case 1: If the current instance believes it is the leader from the previous term, it attempts to renew the lease. It checks if the lease is still valid and either renews it or steps down if it has expired.
- Case 1.1: If the instance is still the leader and the lease is valid, it renews the lease by updating the value associated with the election key.
- Case 1.2: If the instance is still the leader but the lease has expired, it logs a warning and steps down, initiating a new campaign for leadership.
- Case 1.3: If the instance is not the leader (which is a rare scenario), it logs a warning indicating that it still holds the lock and steps down to re-initiate the campaign. This may happen if the leader has failed to renew the lease and the session has expired, and recovery after a period of time during which other leaders have been elected and stepped down.
- Case 1.4: If no lease information is found, it also steps down and re-initiates the campaign.
-
Case 2: If the current instance is not leader previously, it calls the
elected
method as a newly elected leader.
Sourceasync fn follower_action(&self) -> Result<()>
async fn follower_action(&self) -> Result<()>
Handles the actions of a follower in the election process.
This function performs the following checks and actions:
- Case 1: If the current instance believes it is the leader from the previous term, it steps down without deleting the key.
- Case 2: If the current instance is not the leader but the lease has expired, it raises an error to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock will be released.
- Case 3: If all checks pass, the function returns without performing any actions.
Sourceasync fn step_down(&self) -> Result<()>
async fn step_down(&self) -> Result<()>
Step down the leader. The leader should delete the key and notify the leader watcher.
DO NOT check if the deletion is successful, since the key may be deleted by others elected.
§Caution:
Should only step down while holding the advisory lock.
Sourceasync fn step_down_without_lock(&self) -> Result<()>
async fn step_down_without_lock(&self) -> Result<()>
Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
Trait Implementations§
Source§impl Election for PgElection
impl Election for PgElection
Source§fn campaign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn campaign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Attempts to acquire leadership by executing a campaign. This function continuously checks if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
The function operates in a loop, where it:
- Waits for a predefined interval before attempting to acquire the lock again.
- Executes the
CAMPAIGN
SQL query to try to acquire the advisory lock. - Checks the result of the query:
- If the lock is successfully acquired (result is true), it calls the
leader_action
method to perform actions as the leader. - If the lock is not acquired (result is false), it calls the
follower_action
method to perform actions as a follower.
- If the lock is successfully acquired (result is true), it calls the
type Leader = LeaderValue
Source§fn in_leader_infancy(&self) -> bool
fn in_leader_infancy(&self) -> bool
Source§fn register_candidate<'life0, 'life1, 'async_trait>(
&'life0 self,
node_info: &'life1 MetasrvNodeInfo,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
fn register_candidate<'life0, 'life1, 'async_trait>(
&'life0 self,
node_info: &'life1 MetasrvNodeInfo,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
'life1: 'async_trait,
Source§fn all_candidates<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MetasrvNodeInfo>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn all_candidates<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Vec<MetasrvNodeInfo>>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn reset_campaign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn reset_campaign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = ()> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn leader<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Self::Leader>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn leader<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<Self::Leader>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
Source§fn resign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn resign<'life0, 'async_trait>(
&'life0 self,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'async_trait>>where
Self: 'async_trait,
'life0: 'async_trait,
fn subscribe_leader_change(&self) -> Receiver<LeaderChangeMessage>
Auto Trait Implementations§
impl !Freeze for PgElection
impl !RefUnwindSafe for PgElection
impl Send for PgElection
impl Sync for PgElection
impl Unpin for PgElection
impl !UnwindSafe for PgElection
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T, V> Convert<T> for Vwhere
V: Into<T>,
impl<T, V> Convert<T> for Vwhere
V: Into<T>,
fn convert(value: Self) -> T
fn convert_box(value: Box<Self>) -> Box<T>
fn convert_vec(value: Vec<Self>) -> Vec<T>
fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>
fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>
fn convert_option(value: Option<Self>) -> Option<T>
fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>
fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>
§impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
impl<Choices> CoproductSubsetter<CNil, HNil> for Choices
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
Source§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
Source§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
impl<T, U, I> LiftInto<U, I> for Twhere
U: LiftFrom<T, I>,
§impl<Source, Target> OctetsInto<Target> for Sourcewhere
Target: OctetsFrom<Source>,
impl<Source, Target> OctetsInto<Target> for Sourcewhere
Target: OctetsFrom<Source>,
type Error = <Target as OctetsFrom<Source>>::Error
§fn try_octets_into(
self,
) -> Result<Target, <Source as OctetsInto<Target>>::Error>
fn try_octets_into( self, ) -> Result<Target, <Source as OctetsInto<Target>>::Error>
§fn octets_into(self) -> Targetwhere
Self::Error: Into<Infallible>,
fn octets_into(self) -> Targetwhere
Self::Error: Into<Infallible>,
§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> Pointable for T
impl<T> Pointable for T
§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<Source> Sculptor<HNil, HNil> for Source
impl<Source> Sculptor<HNil, HNil> for Source
§impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
impl<SS, SP> SupersetOf<SS> for SPwhere
SS: SubsetOf<SP>,
§fn to_subset(&self) -> Option<SS>
fn to_subset(&self) -> Option<SS>
self
from the equivalent element of its
superset. Read more§fn is_in_subset(&self) -> bool
fn is_in_subset(&self) -> bool
self
is actually part of its subset T
(and can be converted to it).§fn to_subset_unchecked(&self) -> SS
fn to_subset_unchecked(&self) -> SS
self.to_subset
but without any property checks. Always succeeds.§fn from_subset(element: &SS) -> SP
fn from_subset(element: &SS) -> SP
self
to the equivalent element of its superset.§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.