meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_telemetry::{error, warn};
20use common_time::Timestamp;
21use deadpool_postgres::{Manager, Pool};
22use snafu::{ensure, OptionExt, ResultExt};
23use tokio::sync::{broadcast, RwLock};
24use tokio::time::MissedTickBehavior;
25use tokio_postgres::types::ToSql;
26use tokio_postgres::Row;
27
28use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
29use crate::election::{
30    listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
31    CANDIDATES_ROOT, ELECTION_KEY,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by [`PgElection`] for one advisory lock and one table.
struct ElectionSqlFactory<'t> {
    /// Name of the key-value table (columns `k` and `v`) the statements read and write.
    table_name: &'t str,
    /// Key passed to the PostgreSQL advisory lock functions.
    lock_id: u64,
}
43
/// Pre-rendered SQL statements used by the PostgreSQL election.
///
/// Keys and values are passed as byte strings. A value stored "with lease" is the text
/// `<value><LEASE_SEP><expire time>`, where the expire time is computed by the database.
struct ElectionSqlSet {
    /// Tries to acquire the election advisory lock without blocking.
    ///
    /// Returns one row with a single `bool` column: `true` if the lock was acquired.
    campaign: String,
    /// Releases the election advisory lock held by this session.
    step_down: String,
    /// Inserts a value with an expire time, unless the key already exists.
    ///
    /// Parameters:
    /// - `$1`: key,
    /// - `$2`: value,
    /// - `$3`: lease time in seconds (`float8`).
    ///
    /// Returns:
    /// the previous `k, v` row if the key already existed (nothing is inserted in that case),
    /// otherwise no rows.
    put_value_with_lease: String,
    /// Replaces a value and refreshes its expire time, only if the stored value still equals
    /// the expected previous one (compare-and-set).
    ///
    /// Parameters:
    /// - `$1`: key,
    /// - `$2`: previous value (the raw stored string, including the lease suffix),
    /// - `$3`: updated value,
    /// - `$4`: lease time in seconds (`float8`).
    ///
    /// The number of affected rows is `1` on success and `0` if the key is missing or the
    /// previous value did not match.
    update_value_with_lease: String,
    /// Reads a value with its expire time.
    ///
    /// Parameters:
    /// - `$1`: key.
    ///
    /// Returns:
    /// - column 0: stored value (with lease suffix),
    /// - column 1: current database timestamp.
    get_value_with_lease: String,
    /// Reads all values with their expire times whose key matches a `LIKE` pattern.
    ///
    /// Parameters:
    /// - `$1`: key pattern such as `'prefix%'`.
    ///
    /// Returns:
    /// - column 0: stored value (with lease suffix),
    /// - column 1: current database timestamp.
    get_value_with_lease_by_prefix: String,
    /// Deletes a key.
    ///
    /// Parameters:
    /// - `$1`: key.
    ///
    /// Returns:
    /// - column 0: deleted key,
    /// - column 1: deleted value.
    delete_value: String,
}
89
90impl<'a> ElectionSqlFactory<'a> {
91    fn new(lock_id: u64, table_name: &'a str) -> Self {
92        Self {
93            lock_id,
94            table_name,
95        }
96    }
97
98    fn build(self) -> ElectionSqlSet {
99        ElectionSqlSet {
100            campaign: self.campaign_sql(),
101            step_down: self.step_down_sql(),
102            put_value_with_lease: self.put_value_with_lease_sql(),
103            update_value_with_lease: self.update_value_with_lease_sql(),
104            get_value_with_lease: self.get_value_with_lease_sql(),
105            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
106            delete_value: self.delete_value_sql(),
107        }
108    }
109
110    fn campaign_sql(&self) -> String {
111        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
112    }
113
114    fn step_down_sql(&self) -> String {
115        format!("SELECT pg_advisory_unlock({})", self.lock_id)
116    }
117
118    fn put_value_with_lease_sql(&self) -> String {
119        format!(
120            r#"WITH prev AS (
121                SELECT k, v FROM "{}" WHERE k = $1
122            ), insert AS (
123                INSERT INTO "{}"
124                VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
125                ON CONFLICT (k) DO NOTHING
126            )
127            SELECT k, v FROM prev;
128            "#,
129            self.table_name, self.table_name, LEASE_SEP
130        )
131    }
132
133    fn update_value_with_lease_sql(&self) -> String {
134        format!(
135            r#"UPDATE "{}"
136               SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
137               WHERE k = $1 AND v = $2"#,
138            self.table_name, LEASE_SEP
139        )
140    }
141
142    fn get_value_with_lease_sql(&self) -> String {
143        format!(
144            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
145            self.table_name
146        )
147    }
148
149    fn get_value_with_lease_by_prefix_sql(&self) -> String {
150        format!(
151            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
152            self.table_name
153        )
154    }
155
156    fn delete_value_sql(&self) -> String {
157        format!(
158            "DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
159            self.table_name
160        )
161    }
162}
163
164/// PgClient for election.
165pub struct ElectionPgClient {
166    current: Option<deadpool::managed::Object<Manager>>,
167    pool: Pool,
168    /// The client-side timeout for statement execution.
169    ///
170    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
171    /// If a statement takes longer than this duration to execute, the client will abort the operation.
172    execution_timeout: Duration,
173
174    /// The idle session timeout.
175    ///
176    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
177    /// If a session remains idle for longer than this duration, the server will terminate it.
178    idle_session_timeout: Duration,
179
180    /// The statement timeout.
181    ///
182    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
183    /// If a statement takes longer than this duration to execute, the server will abort it.
184    statement_timeout: Duration,
185}
186
187impl ElectionPgClient {
188    pub fn new(
189        pool: Pool,
190        execution_timeout: Duration,
191        idle_session_timeout: Duration,
192        statement_timeout: Duration,
193    ) -> Result<ElectionPgClient> {
194        Ok(ElectionPgClient {
195            current: None,
196            pool,
197            execution_timeout,
198            idle_session_timeout,
199            statement_timeout,
200        })
201    }
202
203    fn set_idle_session_timeout_sql(&self) -> String {
204        format!(
205            "SET idle_session_timeout = '{}s';",
206            self.idle_session_timeout.as_secs()
207        )
208    }
209
210    fn set_statement_timeout_sql(&self) -> String {
211        format!(
212            "SET statement_timeout = '{}s';",
213            self.statement_timeout.as_secs()
214        )
215    }
216
217    async fn reset_client(&mut self) -> Result<()> {
218        self.current = None;
219        self.maybe_init_client().await
220    }
221
222    async fn maybe_init_client(&mut self) -> Result<()> {
223        if self.current.is_none() {
224            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
225
226            self.current = Some(client);
227            // Set idle session timeout and statement timeout.
228            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
229            self.execute(&idle_session_timeout_sql, &[]).await?;
230            let statement_timeout_sql = self.set_statement_timeout_sql();
231            self.execute(&statement_timeout_sql, &[]).await?;
232        }
233
234        Ok(())
235    }
236
237    /// Returns the result of the query.
238    ///
239    /// # Panics
240    /// if `current` is `None`.
241    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
242        let result = tokio::time::timeout(
243            self.execution_timeout,
244            self.current.as_ref().unwrap().execute(sql, params),
245        )
246        .await
247        .map_err(|_| {
248            SqlExecutionTimeoutSnafu {
249                sql: sql.to_string(),
250                duration: self.execution_timeout,
251            }
252            .build()
253        })?;
254
255        result.context(PostgresExecutionSnafu { sql })
256    }
257
258    /// Returns the result of the query.
259    ///
260    /// # Panics
261    /// if `current` is `None`.
262    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
263        let result = tokio::time::timeout(
264            self.execution_timeout,
265            self.current.as_ref().unwrap().query(sql, params),
266        )
267        .await
268        .map_err(|_| {
269            SqlExecutionTimeoutSnafu {
270                sql: sql.to_string(),
271                duration: self.execution_timeout,
272            }
273            .build()
274        })?;
275
276        result.context(PostgresExecutionSnafu { sql })
277    }
278}
279
280/// PostgreSql implementation of Election.
281pub struct PgElection {
282    leader_value: String,
283    pg_client: RwLock<ElectionPgClient>,
284    is_leader: AtomicBool,
285    leader_infancy: AtomicBool,
286    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
287    store_key_prefix: String,
288    candidate_lease_ttl: Duration,
289    meta_lease_ttl: Duration,
290    sql_set: ElectionSqlSet,
291}
292
293impl PgElection {
294    async fn maybe_init_client(&self) -> Result<()> {
295        if self.pg_client.read().await.current.is_none() {
296            self.pg_client.write().await.maybe_init_client().await?;
297        }
298
299        Ok(())
300    }
301
302    pub async fn with_pg_client(
303        leader_value: String,
304        pg_client: ElectionPgClient,
305        store_key_prefix: String,
306        candidate_lease_ttl: Duration,
307        meta_lease_ttl: Duration,
308        table_name: &str,
309        lock_id: u64,
310    ) -> Result<ElectionRef> {
311        let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
312
313        let tx = listen_leader_change(leader_value.clone());
314        Ok(Arc::new(Self {
315            leader_value,
316            pg_client: RwLock::new(pg_client),
317            is_leader: AtomicBool::new(false),
318            leader_infancy: AtomicBool::new(false),
319            leader_watcher: tx,
320            store_key_prefix,
321            candidate_lease_ttl,
322            meta_lease_ttl,
323            sql_set: sql_factory.build(),
324        }))
325    }
326
327    fn election_key(&self) -> String {
328        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
329    }
330
331    fn candidate_root(&self) -> String {
332        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
333    }
334
335    fn candidate_key(&self) -> String {
336        format!("{}{}", self.candidate_root(), self.leader_value)
337    }
338}
339
340#[async_trait::async_trait]
341impl Election for PgElection {
342    type Leader = LeaderValue;
343
344    fn is_leader(&self) -> bool {
345        self.is_leader.load(Ordering::Relaxed)
346    }
347
348    fn in_leader_infancy(&self) -> bool {
349        self.leader_infancy
350            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
351            .is_ok()
352    }
353
354    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
355        let key = self.candidate_key();
356        let node_info =
357            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
358                input: format!("{node_info:?}"),
359            })?;
360        let res = self
361            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
362            .await?;
363        // May registered before, just update the lease.
364        if !res {
365            self.delete_value(&key).await?;
366            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
367                .await?;
368        }
369
370        // Check if the current lease has expired and renew the lease.
371        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
372        loop {
373            let _ = keep_alive_interval.tick().await;
374
375            let lease = self
376                .get_value_with_lease(&key)
377                .await?
378                .context(UnexpectedSnafu {
379                    violated: format!("Failed to get lease for key: {:?}", key),
380                })?;
381
382            ensure!(
383                lease.expire_time > lease.current,
384                UnexpectedSnafu {
385                    violated: format!(
386                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
387                        lease.expire_time, lease.current, key
388                    ),
389                }
390            );
391
392            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
393            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
394                .await?;
395        }
396    }
397
398    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
399        let key_prefix = self.candidate_root();
400        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
401        // Remove expired candidates
402        candidates.retain(|c| c.1 > current);
403        let mut valid_candidates = Vec::with_capacity(candidates.len());
404        for (c, _) in candidates {
405            let node_info: MetasrvNodeInfo =
406                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
407                    input: format!("{:?}", c),
408                })?;
409            valid_candidates.push(node_info);
410        }
411        Ok(valid_candidates)
412    }
413
414    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
415    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
416    ///
417    /// The function operates in a loop, where it:
418    ///
419    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
420    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
421    /// 3. Checks the result of the query:
422    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
423    ///      to perform actions as the leader.
424    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
425    ///      to perform actions as a follower.
426    async fn campaign(&self) -> Result<()> {
427        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
428        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
429
430        self.maybe_init_client().await?;
431        loop {
432            let res = self
433                .pg_client
434                .read()
435                .await
436                .query(&self.sql_set.campaign, &[])
437                .await?;
438            let row = res.first().context(UnexpectedSnafu {
439                violated: "Failed to get the result of acquiring advisory lock",
440            })?;
441            let is_leader = row.try_get(0).map_err(|_| {
442                UnexpectedSnafu {
443                    violated: "Failed to get the result of get lock",
444                }
445                .build()
446            })?;
447            if is_leader {
448                self.leader_action().await?;
449            } else {
450                self.follower_action().await?;
451            }
452            let _ = keep_alive_interval.tick().await;
453        }
454    }
455
456    async fn reset_campaign(&self) {
457        if let Err(err) = self.pg_client.write().await.reset_client().await {
458            error!(err; "Failed to reset client");
459        }
460    }
461
462    async fn leader(&self) -> Result<Self::Leader> {
463        if self.is_leader.load(Ordering::Relaxed) {
464            Ok(self.leader_value.as_bytes().into())
465        } else {
466            let key = self.election_key();
467            if let Some(lease) = self.get_value_with_lease(&key).await? {
468                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
469                Ok(lease.leader_value.as_bytes().into())
470            } else {
471                NoLeaderSnafu.fail()
472            }
473        }
474    }
475
476    async fn resign(&self) -> Result<()> {
477        todo!()
478    }
479
480    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
481        self.leader_watcher.subscribe()
482    }
483}
484
485impl PgElection {
486    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
487    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
488        let key = key.as_bytes();
489        self.maybe_init_client().await?;
490        let res = self
491            .pg_client
492            .read()
493            .await
494            .query(&self.sql_set.get_value_with_lease, &[&key])
495            .await?;
496
497        if res.is_empty() {
498            Ok(None)
499        } else {
500            // Safety: Checked if res is empty above.
501            let current_time_str = res[0].try_get(1).unwrap_or_default();
502            let current_time = match Timestamp::from_str(current_time_str, None) {
503                Ok(ts) => ts,
504                Err(_) => UnexpectedSnafu {
505                    violated: format!("Invalid timestamp: {}", current_time_str),
506                }
507                .fail()?,
508            };
509            // Safety: Checked if res is empty above.
510            let value_and_expire_time =
511                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
512            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
513
514            Ok(Some(Lease {
515                leader_value: value,
516                expire_time,
517                current: current_time,
518                origin: value_and_expire_time.to_string(),
519            }))
520        }
521    }
522
523    /// Returns all values and expire time with the given key prefix. Also returns the current time.
524    async fn get_value_with_lease_by_prefix(
525        &self,
526        key_prefix: &str,
527    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
528        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
529        self.maybe_init_client().await?;
530        let res = self
531            .pg_client
532            .read()
533            .await
534            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
535            .await?;
536
537        let mut values_with_leases = vec![];
538        let mut current = Timestamp::default();
539        for row in res {
540            let current_time_str = row.try_get(1).unwrap_or_default();
541            current = match Timestamp::from_str(current_time_str, None) {
542                Ok(ts) => ts,
543                Err(_) => UnexpectedSnafu {
544                    violated: format!("Invalid timestamp: {}", current_time_str),
545                }
546                .fail()?,
547            };
548
549            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
550            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
551
552            values_with_leases.push((value, expire_time));
553        }
554        Ok((values_with_leases, current))
555    }
556
557    async fn update_value_with_lease(
558        &self,
559        key: &str,
560        prev: &str,
561        updated: &str,
562        lease_ttl: Duration,
563    ) -> Result<()> {
564        let key = key.as_bytes();
565        let prev = prev.as_bytes();
566        self.maybe_init_client().await?;
567        let lease_ttl_secs = lease_ttl.as_secs() as f64;
568        let res = self
569            .pg_client
570            .read()
571            .await
572            .execute(
573                &self.sql_set.update_value_with_lease,
574                &[&key, &prev, &updated, &lease_ttl_secs],
575            )
576            .await?;
577
578        ensure!(
579            res == 1,
580            UnexpectedSnafu {
581                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
582            }
583        );
584
585        Ok(())
586    }
587
588    /// Returns `true` if the insertion is successful
589    async fn put_value_with_lease(
590        &self,
591        key: &str,
592        value: &str,
593        lease_ttl: Duration,
594    ) -> Result<bool> {
595        let key = key.as_bytes();
596        let lease_ttl_secs = lease_ttl.as_secs() as f64;
597        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
598        self.maybe_init_client().await?;
599        let res = self
600            .pg_client
601            .read()
602            .await
603            .query(&self.sql_set.put_value_with_lease, &params)
604            .await?;
605        Ok(res.is_empty())
606    }
607
608    /// Returns `true` if the deletion is successful.
609    /// Caution: Should only delete the key if the lease is expired.
610    async fn delete_value(&self, key: &str) -> Result<bool> {
611        let key = key.as_bytes();
612        self.maybe_init_client().await?;
613        let res = self
614            .pg_client
615            .read()
616            .await
617            .query(&self.sql_set.delete_value, &[&key])
618            .await?;
619
620        Ok(res.len() == 1)
621    }
622
623    /// Handles the actions of a leader in the election process.
624    ///
625    /// This function performs the following checks and actions:
626    ///
627    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
628    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
629    ///   or steps down if it has expired.
630    ///
631    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
632    ///     by updating the value associated with the election key.
633    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
634    ///     and steps down, initiating a new campaign for leadership.
635    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
636    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
637    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
638    ///     after a period of time during which other leaders have been elected and stepped down.
639    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
640    ///
641    /// - **Case 2**: If the current instance is not leader previously, it calls the
642    ///   `elected` method as a newly elected leader.
643    async fn leader_action(&self) -> Result<()> {
644        let key = self.election_key();
645        // Case 1
646        if self.is_leader() {
647            match self.get_value_with_lease(&key).await? {
648                Some(lease) => {
649                    match (
650                        lease.leader_value == self.leader_value,
651                        lease.expire_time > lease.current,
652                    ) {
653                        // Case 1.1
654                        (true, true) => {
655                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
656                            self.update_value_with_lease(
657                                &key,
658                                &lease.origin,
659                                &self.leader_value,
660                                self.meta_lease_ttl,
661                            )
662                            .await?;
663                        }
664                        // Case 1.2
665                        (true, false) => {
666                            warn!("Leader lease expired, now stepping down.");
667                            self.step_down().await?;
668                        }
669                        // Case 1.3
670                        (false, _) => {
671                            warn!("Leader lease not found, but still hold the lock. Now stepping down.");
672                            self.step_down().await?;
673                        }
674                    }
675                }
676                // Case 1.4
677                None => {
678                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
679                    self.step_down().await?;
680                }
681            }
682        // Case 2
683        } else {
684            self.elected().await?;
685        }
686        Ok(())
687    }
688
689    /// Handles the actions of a follower in the election process.
690    ///
691    /// This function performs the following checks and actions:
692    ///
693    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
694    ///   it steps down without deleting the key.
695    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
696    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
697    ///   will be released.
698    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
699    async fn follower_action(&self) -> Result<()> {
700        let key = self.election_key();
701        // Case 1
702        if self.is_leader() {
703            self.step_down_without_lock().await?;
704        }
705        let lease = self
706            .get_value_with_lease(&key)
707            .await?
708            .context(NoLeaderSnafu)?;
709        // Case 2
710        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
711        // Case 3
712        Ok(())
713    }
714
715    /// Step down the leader. The leader should delete the key and notify the leader watcher.
716    ///
717    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
718    ///
719    /// ## Caution:
720    /// Should only step down while holding the advisory lock.
721    async fn step_down(&self) -> Result<()> {
722        let key = self.election_key();
723        let leader_key = RdsLeaderKey {
724            name: self.leader_value.clone().into_bytes(),
725            key: key.clone().into_bytes(),
726            ..Default::default()
727        };
728        self.delete_value(&key).await?;
729        self.maybe_init_client().await?;
730        self.pg_client
731            .read()
732            .await
733            .query(&self.sql_set.step_down, &[])
734            .await?;
735        send_leader_change_and_set_flags(
736            &self.is_leader,
737            &self.leader_infancy,
738            &self.leader_watcher,
739            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
740        );
741        Ok(())
742    }
743
744    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
745    async fn step_down_without_lock(&self) -> Result<()> {
746        let key = self.election_key().into_bytes();
747        let leader_key = RdsLeaderKey {
748            name: self.leader_value.clone().into_bytes(),
749            key: key.clone(),
750            ..Default::default()
751        };
752        if self
753            .is_leader
754            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
755            .is_ok()
756        {
757            if let Err(e) = self
758                .leader_watcher
759                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
760            {
761                error!(e; "Failed to send leader change message");
762            }
763        }
764        Ok(())
765    }
766
767    /// Elected as leader. The leader should put the key and notify the leader watcher.
768    /// Caution: Should only elected while holding the advisory lock.
769    async fn elected(&self) -> Result<()> {
770        let key = self.election_key();
771        let leader_key = RdsLeaderKey {
772            name: self.leader_value.clone().into_bytes(),
773            key: key.clone().into_bytes(),
774            ..Default::default()
775        };
776        self.delete_value(&key).await?;
777        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
778            .await?;
779
780        if self
781            .is_leader
782            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
783            .is_ok()
784        {
785            self.leader_infancy.store(true, Ordering::Release);
786
787            if let Err(e) = self
788                .leader_watcher
789                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
790            {
791                error!(e; "Failed to send leader change message");
792            }
793        }
794        Ok(())
795    }
796}
797
798#[cfg(test)]
799mod tests {
800    use std::assert_matches::assert_matches;
801    use std::env;
802
803    use common_meta::maybe_skip_postgres_integration_test;
804
805    use super::*;
806    use crate::bootstrap::create_postgres_pool;
807    use crate::error;
808
809    async fn create_postgres_client(
810        table_name: Option<&str>,
811        execution_timeout: Duration,
812        idle_session_timeout: Duration,
813        statement_timeout: Duration,
814    ) -> Result<ElectionPgClient> {
815        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
816        if endpoint.is_empty() {
817            return UnexpectedSnafu {
818                violated: "Postgres endpoint is empty".to_string(),
819            }
820            .fail();
821        }
822        let pool = create_postgres_pool(&[endpoint]).await.unwrap();
823        let mut pg_client = ElectionPgClient::new(
824            pool,
825            execution_timeout,
826            idle_session_timeout,
827            statement_timeout,
828        )
829        .unwrap();
830        pg_client.maybe_init_client().await?;
831        if let Some(table_name) = table_name {
832            let create_table_sql = format!(
833                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
834                table_name
835            );
836            pg_client.execute(&create_table_sql, &[]).await?;
837        }
838        Ok(pg_client)
839    }
840
841    async fn drop_table(pg_election: &PgElection, table_name: &str) {
842        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
843        pg_election
844            .pg_client
845            .read()
846            .await
847            .execute(&sql, &[])
848            .await
849            .unwrap();
850    }
851
852    #[tokio::test]
853    async fn test_postgres_crud() {
854        maybe_skip_postgres_integration_test!();
855        let key = "test_key".to_string();
856        let value = "test_value".to_string();
857
858        let uuid = uuid::Uuid::new_v4().to_string();
859        let table_name = "test_postgres_crud_greptime_metakv";
860        let candidate_lease_ttl = Duration::from_secs(10);
861        let execution_timeout = Duration::from_secs(10);
862        let statement_timeout = Duration::from_secs(10);
863        let meta_lease_ttl = Duration::from_secs(2);
864        let idle_session_timeout = Duration::from_secs(0);
865        let client = create_postgres_client(
866            Some(table_name),
867            execution_timeout,
868            idle_session_timeout,
869            statement_timeout,
870        )
871        .await
872        .unwrap();
873
874        let (tx, _) = broadcast::channel(100);
875        let pg_election = PgElection {
876            leader_value: "test_leader".to_string(),
877            pg_client: RwLock::new(client),
878            is_leader: AtomicBool::new(false),
879            leader_infancy: AtomicBool::new(true),
880            leader_watcher: tx,
881            store_key_prefix: uuid,
882            candidate_lease_ttl,
883            meta_lease_ttl,
884            sql_set: ElectionSqlFactory::new(28319, table_name).build(),
885        };
886
887        let res = pg_election
888            .put_value_with_lease(&key, &value, candidate_lease_ttl)
889            .await
890            .unwrap();
891        assert!(res);
892
893        let lease = pg_election
894            .get_value_with_lease(&key)
895            .await
896            .unwrap()
897            .unwrap();
898        assert_eq!(lease.leader_value, value);
899
900        pg_election
901            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
902            .await
903            .unwrap();
904
905        let res = pg_election.delete_value(&key).await.unwrap();
906        assert!(res);
907
908        let res = pg_election.get_value_with_lease(&key).await.unwrap();
909        assert!(res.is_none());
910
911        for i in 0..10 {
912            let key = format!("test_key_{}", i);
913            let value = format!("test_value_{}", i);
914            pg_election
915                .put_value_with_lease(&key, &value, candidate_lease_ttl)
916                .await
917                .unwrap();
918        }
919
920        let key_prefix = "test_key".to_string();
921        let (res, _) = pg_election
922            .get_value_with_lease_by_prefix(&key_prefix)
923            .await
924            .unwrap();
925        assert_eq!(res.len(), 10);
926
927        for i in 0..10 {
928            let key = format!("test_key_{}", i);
929            let res = pg_election.delete_value(&key).await.unwrap();
930            assert!(res);
931        }
932
933        let (res, current) = pg_election
934            .get_value_with_lease_by_prefix(&key_prefix)
935            .await
936            .unwrap();
937        assert!(res.is_empty());
938        assert!(current == Timestamp::default());
939
940        drop_table(&pg_election, table_name).await;
941    }
942
943    async fn candidate(
944        leader_value: String,
945        candidate_lease_ttl: Duration,
946        store_key_prefix: String,
947        table_name: String,
948    ) {
949        let execution_timeout = Duration::from_secs(10);
950        let statement_timeout = Duration::from_secs(10);
951        let meta_lease_ttl = Duration::from_secs(2);
952        let idle_session_timeout = Duration::from_secs(0);
953        let client = create_postgres_client(
954            None,
955            execution_timeout,
956            idle_session_timeout,
957            statement_timeout,
958        )
959        .await
960        .unwrap();
961
962        let (tx, _) = broadcast::channel(100);
963        let pg_election = PgElection {
964            leader_value,
965            pg_client: RwLock::new(client),
966            is_leader: AtomicBool::new(false),
967            leader_infancy: AtomicBool::new(true),
968            leader_watcher: tx,
969            store_key_prefix,
970            candidate_lease_ttl,
971            meta_lease_ttl,
972            sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
973        };
974
975        let node_info = MetasrvNodeInfo {
976            addr: "test_addr".to_string(),
977            version: "test_version".to_string(),
978            git_commit: "test_git_commit".to_string(),
979            start_time_ms: 0,
980        };
981        pg_election.register_candidate(&node_info).await.unwrap();
982    }
983
984    #[tokio::test]
985    async fn test_candidate_registration() {
986        maybe_skip_postgres_integration_test!();
987        let leader_value_prefix = "test_leader".to_string();
988        let uuid = uuid::Uuid::new_v4().to_string();
989        let table_name = "test_candidate_registration_greptime_metakv";
990        let mut handles = vec![];
991        let candidate_lease_ttl = Duration::from_secs(5);
992        let execution_timeout = Duration::from_secs(10);
993        let statement_timeout = Duration::from_secs(10);
994        let meta_lease_ttl = Duration::from_secs(2);
995        let idle_session_timeout = Duration::from_secs(0);
996        let client = create_postgres_client(
997            Some(table_name),
998            execution_timeout,
999            idle_session_timeout,
1000            statement_timeout,
1001        )
1002        .await
1003        .unwrap();
1004
1005        for i in 0..10 {
1006            let leader_value = format!("{}{}", leader_value_prefix, i);
1007            let handle = tokio::spawn(candidate(
1008                leader_value,
1009                candidate_lease_ttl,
1010                uuid.clone(),
1011                table_name.to_string(),
1012            ));
1013            handles.push(handle);
1014        }
1015        // Wait for candidates to registrate themselves and renew their leases at least once.
1016        tokio::time::sleep(Duration::from_secs(3)).await;
1017
1018        let (tx, _) = broadcast::channel(100);
1019        let leader_value = "test_leader".to_string();
1020        let pg_election = PgElection {
1021            leader_value,
1022            pg_client: RwLock::new(client),
1023            is_leader: AtomicBool::new(false),
1024            leader_infancy: AtomicBool::new(true),
1025            leader_watcher: tx,
1026            store_key_prefix: uuid.clone(),
1027            candidate_lease_ttl,
1028            meta_lease_ttl,
1029            sql_set: ElectionSqlFactory::new(28319, table_name).build(),
1030        };
1031
1032        let candidates = pg_election.all_candidates().await.unwrap();
1033        assert_eq!(candidates.len(), 10);
1034
1035        for handle in handles {
1036            handle.abort();
1037        }
1038
1039        // Wait for the candidate leases to expire.
1040        tokio::time::sleep(Duration::from_secs(5)).await;
1041        let candidates = pg_election.all_candidates().await.unwrap();
1042        assert!(candidates.is_empty());
1043
1044        // Garbage collection
1045        for i in 0..10 {
1046            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1047            let res = pg_election.delete_value(&key).await.unwrap();
1048            assert!(res);
1049        }
1050
1051        drop_table(&pg_election, table_name).await;
1052    }
1053
1054    #[tokio::test]
1055    async fn test_elected_and_step_down() {
1056        maybe_skip_postgres_integration_test!();
1057        let leader_value = "test_leader".to_string();
1058        let uuid = uuid::Uuid::new_v4().to_string();
1059        let table_name = "test_elected_and_step_down_greptime_metakv";
1060        let candidate_lease_ttl = Duration::from_secs(5);
1061        let execution_timeout = Duration::from_secs(10);
1062        let statement_timeout = Duration::from_secs(10);
1063        let meta_lease_ttl = Duration::from_secs(2);
1064        let idle_session_timeout = Duration::from_secs(0);
1065        let client = create_postgres_client(
1066            Some(table_name),
1067            execution_timeout,
1068            idle_session_timeout,
1069            statement_timeout,
1070        )
1071        .await
1072        .unwrap();
1073
1074        let (tx, mut rx) = broadcast::channel(100);
1075        let leader_pg_election = PgElection {
1076            leader_value: leader_value.clone(),
1077            pg_client: RwLock::new(client),
1078            is_leader: AtomicBool::new(false),
1079            leader_infancy: AtomicBool::new(true),
1080            leader_watcher: tx,
1081            store_key_prefix: uuid,
1082            candidate_lease_ttl,
1083            meta_lease_ttl,
1084            sql_set: ElectionSqlFactory::new(28320, table_name).build(),
1085        };
1086
1087        leader_pg_election.elected().await.unwrap();
1088        let lease = leader_pg_election
1089            .get_value_with_lease(&leader_pg_election.election_key())
1090            .await
1091            .unwrap()
1092            .unwrap();
1093        assert!(lease.leader_value == leader_value);
1094        assert!(lease.expire_time > lease.current);
1095        assert!(leader_pg_election.is_leader());
1096
1097        match rx.recv().await {
1098            Ok(LeaderChangeMessage::Elected(key)) => {
1099                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1100                assert_eq!(
1101                    String::from_utf8_lossy(key.key()),
1102                    leader_pg_election.election_key()
1103                );
1104                assert_eq!(key.lease_id(), i64::default());
1105                assert_eq!(key.revision(), i64::default());
1106            }
1107            _ => panic!("Expected LeaderChangeMessage::Elected"),
1108        }
1109
1110        leader_pg_election.step_down_without_lock().await.unwrap();
1111        let lease = leader_pg_election
1112            .get_value_with_lease(&leader_pg_election.election_key())
1113            .await
1114            .unwrap()
1115            .unwrap();
1116        assert!(lease.leader_value == leader_value);
1117        assert!(!leader_pg_election.is_leader());
1118
1119        match rx.recv().await {
1120            Ok(LeaderChangeMessage::StepDown(key)) => {
1121                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1122                assert_eq!(
1123                    String::from_utf8_lossy(key.key()),
1124                    leader_pg_election.election_key()
1125                );
1126                assert_eq!(key.lease_id(), i64::default());
1127                assert_eq!(key.revision(), i64::default());
1128            }
1129            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1130        }
1131
1132        leader_pg_election.elected().await.unwrap();
1133        let lease = leader_pg_election
1134            .get_value_with_lease(&leader_pg_election.election_key())
1135            .await
1136            .unwrap()
1137            .unwrap();
1138        assert!(lease.leader_value == leader_value);
1139        assert!(lease.expire_time > lease.current);
1140        assert!(leader_pg_election.is_leader());
1141
1142        match rx.recv().await {
1143            Ok(LeaderChangeMessage::Elected(key)) => {
1144                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1145                assert_eq!(
1146                    String::from_utf8_lossy(key.key()),
1147                    leader_pg_election.election_key()
1148                );
1149                assert_eq!(key.lease_id(), i64::default());
1150                assert_eq!(key.revision(), i64::default());
1151            }
1152            _ => panic!("Expected LeaderChangeMessage::Elected"),
1153        }
1154
1155        leader_pg_election.step_down().await.unwrap();
1156        let res = leader_pg_election
1157            .get_value_with_lease(&leader_pg_election.election_key())
1158            .await
1159            .unwrap();
1160        assert!(res.is_none());
1161        assert!(!leader_pg_election.is_leader());
1162
1163        match rx.recv().await {
1164            Ok(LeaderChangeMessage::StepDown(key)) => {
1165                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1166                assert_eq!(
1167                    String::from_utf8_lossy(key.key()),
1168                    leader_pg_election.election_key()
1169                );
1170                assert_eq!(key.lease_id(), i64::default());
1171                assert_eq!(key.revision(), i64::default());
1172            }
1173            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1174        }
1175
1176        drop_table(&leader_pg_election, table_name).await;
1177    }
1178
1179    #[tokio::test]
1180    async fn test_leader_action() {
1181        maybe_skip_postgres_integration_test!();
1182        let leader_value = "test_leader".to_string();
1183        let uuid = uuid::Uuid::new_v4().to_string();
1184        let table_name = "test_leader_action_greptime_metakv";
1185        let candidate_lease_ttl = Duration::from_secs(5);
1186        let execution_timeout = Duration::from_secs(10);
1187        let statement_timeout = Duration::from_secs(10);
1188        let meta_lease_ttl = Duration::from_secs(2);
1189        let idle_session_timeout = Duration::from_secs(0);
1190        let client = create_postgres_client(
1191            Some(table_name),
1192            execution_timeout,
1193            idle_session_timeout,
1194            statement_timeout,
1195        )
1196        .await
1197        .unwrap();
1198
1199        let (tx, mut rx) = broadcast::channel(100);
1200        let leader_pg_election = PgElection {
1201            leader_value: leader_value.clone(),
1202            pg_client: RwLock::new(client),
1203            is_leader: AtomicBool::new(false),
1204            leader_infancy: AtomicBool::new(true),
1205            leader_watcher: tx,
1206            store_key_prefix: uuid,
1207            candidate_lease_ttl,
1208            meta_lease_ttl,
1209            sql_set: ElectionSqlFactory::new(28321, table_name).build(),
1210        };
1211
1212        // Step 1: No leader exists, campaign and elected.
1213        let res = leader_pg_election
1214            .pg_client
1215            .read()
1216            .await
1217            .query(&leader_pg_election.sql_set.campaign, &[])
1218            .await
1219            .unwrap();
1220        let res: bool = res[0].get(0);
1221        assert!(res);
1222        leader_pg_election.leader_action().await.unwrap();
1223        let lease = leader_pg_election
1224            .get_value_with_lease(&leader_pg_election.election_key())
1225            .await
1226            .unwrap()
1227            .unwrap();
1228        assert!(lease.leader_value == leader_value);
1229        assert!(lease.expire_time > lease.current);
1230        assert!(leader_pg_election.is_leader());
1231
1232        match rx.recv().await {
1233            Ok(LeaderChangeMessage::Elected(key)) => {
1234                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1235                assert_eq!(
1236                    String::from_utf8_lossy(key.key()),
1237                    leader_pg_election.election_key()
1238                );
1239                assert_eq!(key.lease_id(), i64::default());
1240                assert_eq!(key.revision(), i64::default());
1241            }
1242            _ => panic!("Expected LeaderChangeMessage::Elected"),
1243        }
1244
1245        // Step 2: As a leader, renew the lease.
1246        let res = leader_pg_election
1247            .pg_client
1248            .read()
1249            .await
1250            .query(&leader_pg_election.sql_set.campaign, &[])
1251            .await
1252            .unwrap();
1253        let res: bool = res[0].get(0);
1254        assert!(res);
1255        leader_pg_election.leader_action().await.unwrap();
1256        let new_lease = leader_pg_election
1257            .get_value_with_lease(&leader_pg_election.election_key())
1258            .await
1259            .unwrap()
1260            .unwrap();
1261        assert!(new_lease.leader_value == leader_value);
1262        assert!(
1263            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1264        );
1265        assert!(leader_pg_election.is_leader());
1266
1267        // Step 3: Something wrong, the leader lease expired.
1268        tokio::time::sleep(Duration::from_secs(2)).await;
1269
1270        let res = leader_pg_election
1271            .pg_client
1272            .read()
1273            .await
1274            .query(&leader_pg_election.sql_set.campaign, &[])
1275            .await
1276            .unwrap();
1277        let res: bool = res[0].get(0);
1278        assert!(res);
1279        leader_pg_election.leader_action().await.unwrap();
1280        let res = leader_pg_election
1281            .get_value_with_lease(&leader_pg_election.election_key())
1282            .await
1283            .unwrap();
1284        assert!(res.is_none());
1285
1286        match rx.recv().await {
1287            Ok(LeaderChangeMessage::StepDown(key)) => {
1288                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1289                assert_eq!(
1290                    String::from_utf8_lossy(key.key()),
1291                    leader_pg_election.election_key()
1292                );
1293                assert_eq!(key.lease_id(), i64::default());
1294                assert_eq!(key.revision(), i64::default());
1295            }
1296            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1297        }
1298
1299        // Step 4: Re-campaign and elected.
1300        let res = leader_pg_election
1301            .pg_client
1302            .read()
1303            .await
1304            .query(&leader_pg_election.sql_set.campaign, &[])
1305            .await
1306            .unwrap();
1307        let res: bool = res[0].get(0);
1308        assert!(res);
1309        leader_pg_election.leader_action().await.unwrap();
1310        let lease = leader_pg_election
1311            .get_value_with_lease(&leader_pg_election.election_key())
1312            .await
1313            .unwrap()
1314            .unwrap();
1315        assert!(lease.leader_value == leader_value);
1316        assert!(lease.expire_time > lease.current);
1317        assert!(leader_pg_election.is_leader());
1318
1319        match rx.recv().await {
1320            Ok(LeaderChangeMessage::Elected(key)) => {
1321                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1322                assert_eq!(
1323                    String::from_utf8_lossy(key.key()),
1324                    leader_pg_election.election_key()
1325                );
1326                assert_eq!(key.lease_id(), i64::default());
1327                assert_eq!(key.revision(), i64::default());
1328            }
1329            _ => panic!("Expected LeaderChangeMessage::Elected"),
1330        }
1331
1332        // Step 5: Something wrong, the leader key is deleted by other followers.
1333        leader_pg_election
1334            .delete_value(&leader_pg_election.election_key())
1335            .await
1336            .unwrap();
1337        leader_pg_election.leader_action().await.unwrap();
1338        let res = leader_pg_election
1339            .get_value_with_lease(&leader_pg_election.election_key())
1340            .await
1341            .unwrap();
1342        assert!(res.is_none());
1343        assert!(!leader_pg_election.is_leader());
1344
1345        match rx.recv().await {
1346            Ok(LeaderChangeMessage::StepDown(key)) => {
1347                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1348                assert_eq!(
1349                    String::from_utf8_lossy(key.key()),
1350                    leader_pg_election.election_key()
1351                );
1352                assert_eq!(key.lease_id(), i64::default());
1353                assert_eq!(key.revision(), i64::default());
1354            }
1355            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1356        }
1357
1358        // Step 6: Re-campaign and elected.
1359        let res = leader_pg_election
1360            .pg_client
1361            .read()
1362            .await
1363            .query(&leader_pg_election.sql_set.campaign, &[])
1364            .await
1365            .unwrap();
1366        let res: bool = res[0].get(0);
1367        assert!(res);
1368        leader_pg_election.leader_action().await.unwrap();
1369        let lease = leader_pg_election
1370            .get_value_with_lease(&leader_pg_election.election_key())
1371            .await
1372            .unwrap()
1373            .unwrap();
1374        assert!(lease.leader_value == leader_value);
1375        assert!(lease.expire_time > lease.current);
1376        assert!(leader_pg_election.is_leader());
1377
1378        match rx.recv().await {
1379            Ok(LeaderChangeMessage::Elected(key)) => {
1380                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1381                assert_eq!(
1382                    String::from_utf8_lossy(key.key()),
1383                    leader_pg_election.election_key()
1384                );
1385                assert_eq!(key.lease_id(), i64::default());
1386                assert_eq!(key.revision(), i64::default());
1387            }
1388            _ => panic!("Expected LeaderChangeMessage::Elected"),
1389        }
1390
1391        // Step 7: Something wrong, the leader key changed by others.
1392        let res = leader_pg_election
1393            .pg_client
1394            .read()
1395            .await
1396            .query(&leader_pg_election.sql_set.campaign, &[])
1397            .await
1398            .unwrap();
1399        let res: bool = res[0].get(0);
1400        assert!(res);
1401        leader_pg_election
1402            .delete_value(&leader_pg_election.election_key())
1403            .await
1404            .unwrap();
1405        leader_pg_election
1406            .put_value_with_lease(
1407                &leader_pg_election.election_key(),
1408                "test",
1409                Duration::from_secs(10),
1410            )
1411            .await
1412            .unwrap();
1413        leader_pg_election.leader_action().await.unwrap();
1414        let res = leader_pg_election
1415            .get_value_with_lease(&leader_pg_election.election_key())
1416            .await
1417            .unwrap();
1418        assert!(res.is_none());
1419        assert!(!leader_pg_election.is_leader());
1420
1421        match rx.recv().await {
1422            Ok(LeaderChangeMessage::StepDown(key)) => {
1423                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1424                assert_eq!(
1425                    String::from_utf8_lossy(key.key()),
1426                    leader_pg_election.election_key()
1427                );
1428                assert_eq!(key.lease_id(), i64::default());
1429                assert_eq!(key.revision(), i64::default());
1430            }
1431            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1432        }
1433
1434        // Clean up
1435        leader_pg_election
1436            .pg_client
1437            .read()
1438            .await
1439            .query(&leader_pg_election.sql_set.step_down, &[])
1440            .await
1441            .unwrap();
1442
1443        drop_table(&leader_pg_election, table_name).await;
1444    }
1445
1446    #[tokio::test]
1447    async fn test_follower_action() {
1448        maybe_skip_postgres_integration_test!();
1449        common_telemetry::init_default_ut_logging();
1450        let uuid = uuid::Uuid::new_v4().to_string();
1451        let table_name = "test_follower_action_greptime_metakv";
1452
1453        let candidate_lease_ttl = Duration::from_secs(5);
1454        let execution_timeout = Duration::from_secs(10);
1455        let statement_timeout = Duration::from_secs(10);
1456        let meta_lease_ttl = Duration::from_secs(2);
1457        let idle_session_timeout = Duration::from_secs(0);
1458        let follower_client = create_postgres_client(
1459            Some(table_name),
1460            execution_timeout,
1461            idle_session_timeout,
1462            statement_timeout,
1463        )
1464        .await
1465        .unwrap();
1466        let (tx, mut rx) = broadcast::channel(100);
1467        let follower_pg_election = PgElection {
1468            leader_value: "test_follower".to_string(),
1469            pg_client: RwLock::new(follower_client),
1470            is_leader: AtomicBool::new(false),
1471            leader_infancy: AtomicBool::new(true),
1472            leader_watcher: tx,
1473            store_key_prefix: uuid.clone(),
1474            candidate_lease_ttl,
1475            meta_lease_ttl,
1476            sql_set: ElectionSqlFactory::new(28322, table_name).build(),
1477        };
1478
1479        let leader_client = create_postgres_client(
1480            Some(table_name),
1481            execution_timeout,
1482            idle_session_timeout,
1483            statement_timeout,
1484        )
1485        .await
1486        .unwrap();
1487        let (tx, _) = broadcast::channel(100);
1488        let leader_pg_election = PgElection {
1489            leader_value: "test_leader".to_string(),
1490            pg_client: RwLock::new(leader_client),
1491            is_leader: AtomicBool::new(false),
1492            leader_infancy: AtomicBool::new(true),
1493            leader_watcher: tx,
1494            store_key_prefix: uuid,
1495            candidate_lease_ttl,
1496            meta_lease_ttl,
1497            sql_set: ElectionSqlFactory::new(28322, table_name).build(),
1498        };
1499
1500        leader_pg_election
1501            .pg_client
1502            .read()
1503            .await
1504            .query(&leader_pg_election.sql_set.campaign, &[])
1505            .await
1506            .unwrap();
1507        leader_pg_election.elected().await.unwrap();
1508
1509        // Step 1: As a follower, the leader exists and the lease is not expired.
1510        follower_pg_election.follower_action().await.unwrap();
1511
1512        // Step 2: As a follower, the leader exists but the lease expired.
1513        tokio::time::sleep(Duration::from_secs(2)).await;
1514        assert!(follower_pg_election.follower_action().await.is_err());
1515
1516        // Step 3: As a follower, the leader does not exist.
1517        leader_pg_election
1518            .delete_value(&leader_pg_election.election_key())
1519            .await
1520            .unwrap();
1521        assert!(follower_pg_election.follower_action().await.is_err());
1522
1523        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1524        follower_pg_election
1525            .is_leader
1526            .store(true, Ordering::Relaxed);
1527        assert!(follower_pg_election.follower_action().await.is_err());
1528
1529        match rx.recv().await {
1530            Ok(LeaderChangeMessage::StepDown(key)) => {
1531                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1532                assert_eq!(
1533                    String::from_utf8_lossy(key.key()),
1534                    follower_pg_election.election_key()
1535                );
1536                assert_eq!(key.lease_id(), i64::default());
1537                assert_eq!(key.revision(), i64::default());
1538            }
1539            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1540        }
1541
1542        // Clean up
1543        leader_pg_election
1544            .pg_client
1545            .read()
1546            .await
1547            .query(&leader_pg_election.sql_set.step_down, &[])
1548            .await
1549            .unwrap();
1550
1551        drop_table(&follower_pg_election, table_name).await;
1552    }
1553
1554    #[tokio::test]
1555    async fn test_idle_session_timeout() {
1556        maybe_skip_postgres_integration_test!();
1557        common_telemetry::init_default_ut_logging();
1558        let execution_timeout = Duration::from_secs(10);
1559        let statement_timeout = Duration::from_secs(10);
1560        let idle_session_timeout = Duration::from_secs(1);
1561        let mut client = create_postgres_client(
1562            None,
1563            execution_timeout,
1564            idle_session_timeout,
1565            statement_timeout,
1566        )
1567        .await
1568        .unwrap();
1569        tokio::time::sleep(Duration::from_millis(1100)).await;
1570        // Wait for the idle session timeout.
1571        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1572        assert_matches!(err, error::Error::PostgresExecution { .. });
1573        let error::Error::PostgresExecution { error, .. } = err else {
1574            panic!("Expected PostgresExecution error");
1575        };
1576        assert!(error.is_closed());
1577        // Reset the client and try again.
1578        client.reset_client().await.unwrap();
1579        let _ = client.query("SELECT 1", &[]).await.unwrap();
1580    }
1581}