meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31    Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Builder for the SQL statements used by the PostgreSQL election.
///
/// It only carries the identifiers that get interpolated into the statement
/// text; the statements themselves are produced by the `*_sql` methods.
struct ElectionSqlFactory<'a> {
    /// Id passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema; when `None` or empty the table name is left unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table holding the `k` / `v` rows the statements operate on.
    table_name: &'a str,
}
44
/// The full set of SQL statements produced by `ElectionSqlFactory::build`.
struct ElectionSqlSet {
    // SQL to try to take the advisory lock (`pg_try_advisory_lock`).
    //
    // Takes no parameters.
    campaign: String,
    // SQL to release the advisory lock (`pg_advisory_unlock`).
    //
    // Takes no parameters.
    step_down: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: value,
    // `$3`: lease time in seconds
    //
    // Returns:
    // If the key already exists, return the previous value.
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: previous value,
    // `$3`: updated value,
    // `$4`: lease time in seconds
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // `$1`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: key deleted,
    // column 1: value deleted
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93        Self {
94            lock_id,
95            schema_name,
96            table_name,
97        }
98    }
99
100    fn table_ident(&self) -> String {
101        match self.schema_name {
102            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103            _ => format!("\"{}\"", self.table_name),
104        }
105    }
106
107    fn build(self) -> ElectionSqlSet {
108        ElectionSqlSet {
109            campaign: self.campaign_sql(),
110            step_down: self.step_down_sql(),
111            put_value_with_lease: self.put_value_with_lease_sql(),
112            update_value_with_lease: self.update_value_with_lease_sql(),
113            get_value_with_lease: self.get_value_with_lease_sql(),
114            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115            delete_value: self.delete_value_sql(),
116        }
117    }
118
119    fn campaign_sql(&self) -> String {
120        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121    }
122
123    fn step_down_sql(&self) -> String {
124        format!("SELECT pg_advisory_unlock({})", self.lock_id)
125    }
126
127    fn put_value_with_lease_sql(&self) -> String {
128        let table = self.table_ident();
129        format!(
130            r#"WITH prev AS (
131                SELECT k, v FROM {table} WHERE k = $1
132            ), insert AS (
133                INSERT INTO {table}
134                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135                ON CONFLICT (k) DO NOTHING
136            )
137            SELECT k, v FROM prev;
138            "#,
139            table = table,
140            lease_sep = LEASE_SEP
141        )
142    }
143
144    fn update_value_with_lease_sql(&self) -> String {
145        let table = self.table_ident();
146        format!(
147            r#"UPDATE {table}
148               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149               WHERE k = $1 AND v = $2"#,
150            table = table,
151            lease_sep = LEASE_SEP
152        )
153    }
154
155    fn get_value_with_lease_sql(&self) -> String {
156        let table = self.table_ident();
157        format!(
158            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159            table = table
160        )
161    }
162
163    fn get_value_with_lease_by_prefix_sql(&self) -> String {
164        let table = self.table_ident();
165        format!(
166            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167            table = table
168        )
169    }
170
171    fn delete_value_sql(&self) -> String {
172        let table = self.table_ident();
173        format!(
174            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175            table = table
176        )
177    }
178}
179
/// PgClient for election.
///
/// Wraps a connection pool plus the single pooled connection currently in use,
/// and applies a client-side timeout to every statement it runs.
pub struct ElectionPgClient {
    /// The connection currently checked out of `pool`, or `None` when no
    /// connection has been initialised yet (or after it was cleared for a reset).
    current: Option<deadpool::managed::Object<Manager>>,
    /// Pool that `current` is obtained from.
    pool: Pool,
    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The idle session timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a session remains idle for longer than this duration, the server will terminate it.
    idle_session_timeout: Duration,

    /// The statement timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a statement takes longer than this duration to execute, the server will abort it.
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204    pub fn new(
205        pool: Pool,
206        execution_timeout: Duration,
207        idle_session_timeout: Duration,
208        statement_timeout: Duration,
209    ) -> Result<ElectionPgClient> {
210        Ok(ElectionPgClient {
211            current: None,
212            pool,
213            execution_timeout,
214            idle_session_timeout,
215            statement_timeout,
216        })
217    }
218
219    fn set_idle_session_timeout_sql(&self) -> String {
220        format!(
221            "SET idle_session_timeout = '{}s';",
222            self.idle_session_timeout.as_secs()
223        )
224    }
225
226    fn set_statement_timeout_sql(&self) -> String {
227        format!(
228            "SET statement_timeout = '{}s';",
229            self.statement_timeout.as_secs()
230        )
231    }
232
233    async fn reset_client(&mut self) -> Result<()> {
234        self.current = None;
235        self.maybe_init_client().await
236    }
237
238    async fn maybe_init_client(&mut self) -> Result<()> {
239        if self.current.is_none() {
240            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242            self.current = Some(client);
243            // Set idle session timeout and statement timeout.
244            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245            self.execute(&idle_session_timeout_sql, &[]).await?;
246            let statement_timeout_sql = self.set_statement_timeout_sql();
247            self.execute(&statement_timeout_sql, &[]).await?;
248        }
249
250        Ok(())
251    }
252
253    /// Returns the result of the query.
254    ///
255    /// # Panics
256    /// if `current` is `None`.
257    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258        let result = tokio::time::timeout(
259            self.execution_timeout,
260            self.current.as_ref().unwrap().execute(sql, params),
261        )
262        .await
263        .map_err(|_| {
264            SqlExecutionTimeoutSnafu {
265                sql: sql.to_string(),
266                duration: self.execution_timeout,
267            }
268            .build()
269        })?;
270
271        result.context(PostgresExecutionSnafu { sql })
272    }
273
274    /// Returns the result of the query.
275    ///
276    /// # Panics
277    /// if `current` is `None`.
278    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279        let result = tokio::time::timeout(
280            self.execution_timeout,
281            self.current.as_ref().unwrap().query(sql, params),
282        )
283        .await
284        .map_err(|_| {
285            SqlExecutionTimeoutSnafu {
286                sql: sql.to_string(),
287                duration: self.execution_timeout,
288            }
289            .build()
290        })?;
291
292        result.context(PostgresExecutionSnafu { sql })
293    }
294}
295
/// PostgreSql implementation of Election.
///
/// Leadership is decided by a PostgreSQL advisory lock; leader and candidate
/// leases are stored as rows in a key/value table.
pub struct PgElection {
    /// Value identifying this node; written as the leader value and used as
    /// the suffix of this node's candidate key.
    leader_value: String,
    /// Client used for every statement; behind an `RwLock` so it can be
    /// re-initialised through a write guard.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this node has just been elected; cleared by the first call to
    /// `in_leader_infancy`.
    leader_infancy: AtomicBool,
    /// Broadcast channel on which leader change messages are published.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease duration used for candidate registrations.
    candidate_lease_ttl: Duration,
    /// Lease duration used for the leader key.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
308
309impl PgElection {
310    async fn maybe_init_client(&self) -> Result<()> {
311        if self.pg_client.read().await.current.is_none() {
312            self.pg_client.write().await.maybe_init_client().await?;
313        }
314
315        Ok(())
316    }
317
318    #[allow(clippy::too_many_arguments)]
319    pub async fn with_pg_client(
320        leader_value: String,
321        pg_client: ElectionPgClient,
322        store_key_prefix: String,
323        candidate_lease_ttl: Duration,
324        meta_lease_ttl: Duration,
325        schema_name: Option<&str>,
326        table_name: &str,
327        lock_id: u64,
328    ) -> Result<ElectionRef> {
329        if let Some(s) = schema_name {
330            common_telemetry::info!("PgElection uses schema: {}", s);
331        } else {
332            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333        }
334        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336        let tx = listen_leader_change(leader_value.clone());
337        Ok(Arc::new(Self {
338            leader_value,
339            pg_client: RwLock::new(pg_client),
340            is_leader: AtomicBool::new(false),
341            leader_infancy: AtomicBool::new(false),
342            leader_watcher: tx,
343            store_key_prefix,
344            candidate_lease_ttl,
345            meta_lease_ttl,
346            sql_set: sql_factory.build(),
347        }))
348    }
349
350    fn election_key(&self) -> String {
351        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352    }
353
354    fn candidate_root(&self) -> String {
355        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356    }
357
358    fn candidate_key(&self) -> String {
359        format!("{}{}", self.candidate_root(), self.leader_value)
360    }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365    type Leader = LeaderValue;
366
367    fn is_leader(&self) -> bool {
368        self.is_leader.load(Ordering::Relaxed)
369    }
370
371    fn in_leader_infancy(&self) -> bool {
372        self.leader_infancy
373            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374            .is_ok()
375    }
376
377    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378        let key = self.candidate_key();
379        let node_info =
380            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381                input: format!("{node_info:?}"),
382            })?;
383        let res = self
384            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385            .await?;
386        // May registered before, just update the lease.
387        if !res {
388            self.delete_value(&key).await?;
389            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390                .await?;
391        }
392
393        // Check if the current lease has expired and renew the lease.
394        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395        loop {
396            let _ = keep_alive_interval.tick().await;
397
398            let lease = self
399                .get_value_with_lease(&key)
400                .await?
401                .context(UnexpectedSnafu {
402                    violated: format!("Failed to get lease for key: {:?}", key),
403                })?;
404
405            ensure!(
406                lease.expire_time > lease.current,
407                UnexpectedSnafu {
408                    violated: format!(
409                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410                        lease.expire_time, lease.current, key
411                    ),
412                }
413            );
414
415            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
416            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417                .await?;
418        }
419    }
420
421    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422        let key_prefix = self.candidate_root();
423        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424        // Remove expired candidates
425        candidates.retain(|c| c.1 > current);
426        let mut valid_candidates = Vec::with_capacity(candidates.len());
427        for (c, _) in candidates {
428            let node_info: MetasrvNodeInfo =
429                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430                    input: format!("{:?}", c),
431                })?;
432            valid_candidates.push(node_info);
433        }
434        Ok(valid_candidates)
435    }
436
437    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
438    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
439    ///
440    /// The function operates in a loop, where it:
441    ///
442    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
443    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
444    /// 3. Checks the result of the query:
445    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
446    ///      to perform actions as the leader.
447    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
448    ///      to perform actions as a follower.
449    async fn campaign(&self) -> Result<()> {
450        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453        self.maybe_init_client().await?;
454        loop {
455            let res = self
456                .pg_client
457                .read()
458                .await
459                .query(&self.sql_set.campaign, &[])
460                .await?;
461            let row = res.first().context(UnexpectedSnafu {
462                violated: "Failed to get the result of acquiring advisory lock",
463            })?;
464            let is_leader = row.try_get(0).map_err(|_| {
465                UnexpectedSnafu {
466                    violated: "Failed to get the result of get lock",
467                }
468                .build()
469            })?;
470            if is_leader {
471                self.leader_action().await?;
472            } else {
473                self.follower_action().await?;
474            }
475            let _ = keep_alive_interval.tick().await;
476        }
477    }
478
479    async fn reset_campaign(&self) {
480        info!("Resetting campaign");
481        if self.is_leader.load(Ordering::Relaxed) {
482            if let Err(err) = self.step_down_without_lock().await {
483                error!(err; "Failed to step down without lock");
484            }
485            info!("Step down without lock successfully, due to reset campaign");
486        }
487        if let Err(err) = self.pg_client.write().await.reset_client().await {
488            error!(err; "Failed to reset client");
489        }
490    }
491
492    async fn leader(&self) -> Result<Self::Leader> {
493        if self.is_leader.load(Ordering::Relaxed) {
494            Ok(self.leader_value.as_bytes().into())
495        } else {
496            let key = self.election_key();
497            if let Some(lease) = self.get_value_with_lease(&key).await? {
498                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
499                Ok(lease.leader_value.as_bytes().into())
500            } else {
501                NoLeaderSnafu.fail()
502            }
503        }
504    }
505
    /// Resigning is not implemented for the PostgreSQL election.
    ///
    /// # Panics
    /// Always panics, because the body is `todo!()`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
509
510    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
511        self.leader_watcher.subscribe()
512    }
513}
514
515impl PgElection {
516    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
517    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
518        let key = key.as_bytes();
519        self.maybe_init_client().await?;
520        let res = self
521            .pg_client
522            .read()
523            .await
524            .query(&self.sql_set.get_value_with_lease, &[&key])
525            .await?;
526
527        if res.is_empty() {
528            Ok(None)
529        } else {
530            // Safety: Checked if res is empty above.
531            let current_time_str = res[0].try_get(1).unwrap_or_default();
532            let current_time = match Timestamp::from_str(current_time_str, None) {
533                Ok(ts) => ts,
534                Err(_) => UnexpectedSnafu {
535                    violated: format!("Invalid timestamp: {}", current_time_str),
536                }
537                .fail()?,
538            };
539            // Safety: Checked if res is empty above.
540            let value_and_expire_time =
541                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
542            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
543
544            Ok(Some(Lease {
545                leader_value: value,
546                expire_time,
547                current: current_time,
548                origin: value_and_expire_time.to_string(),
549            }))
550        }
551    }
552
553    /// Returns all values and expire time with the given key prefix. Also returns the current time.
554    async fn get_value_with_lease_by_prefix(
555        &self,
556        key_prefix: &str,
557    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
558        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
559        self.maybe_init_client().await?;
560        let res = self
561            .pg_client
562            .read()
563            .await
564            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
565            .await?;
566
567        let mut values_with_leases = vec![];
568        let mut current = Timestamp::default();
569        for row in res {
570            let current_time_str = row.try_get(1).unwrap_or_default();
571            current = match Timestamp::from_str(current_time_str, None) {
572                Ok(ts) => ts,
573                Err(_) => UnexpectedSnafu {
574                    violated: format!("Invalid timestamp: {}", current_time_str),
575                }
576                .fail()?,
577            };
578
579            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
580            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
581
582            values_with_leases.push((value, expire_time));
583        }
584        Ok((values_with_leases, current))
585    }
586
587    async fn update_value_with_lease(
588        &self,
589        key: &str,
590        prev: &str,
591        updated: &str,
592        lease_ttl: Duration,
593    ) -> Result<()> {
594        let key = key.as_bytes();
595        let prev = prev.as_bytes();
596        self.maybe_init_client().await?;
597        let lease_ttl_secs = lease_ttl.as_secs() as f64;
598        let res = self
599            .pg_client
600            .read()
601            .await
602            .execute(
603                &self.sql_set.update_value_with_lease,
604                &[&key, &prev, &updated, &lease_ttl_secs],
605            )
606            .await?;
607
608        ensure!(
609            res == 1,
610            UnexpectedSnafu {
611                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
612            }
613        );
614
615        Ok(())
616    }
617
618    /// Returns `true` if the insertion is successful
619    async fn put_value_with_lease(
620        &self,
621        key: &str,
622        value: &str,
623        lease_ttl: Duration,
624    ) -> Result<bool> {
625        let key = key.as_bytes();
626        let lease_ttl_secs = lease_ttl.as_secs() as f64;
627        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
628        self.maybe_init_client().await?;
629        let res = self
630            .pg_client
631            .read()
632            .await
633            .query(&self.sql_set.put_value_with_lease, &params)
634            .await?;
635        Ok(res.is_empty())
636    }
637
638    /// Returns `true` if the deletion is successful.
639    /// Caution: Should only delete the key if the lease is expired.
640    async fn delete_value(&self, key: &str) -> Result<bool> {
641        let key = key.as_bytes();
642        self.maybe_init_client().await?;
643        let res = self
644            .pg_client
645            .read()
646            .await
647            .query(&self.sql_set.delete_value, &[&key])
648            .await?;
649
650        Ok(res.len() == 1)
651    }
652
653    /// Handles the actions of a leader in the election process.
654    ///
655    /// This function performs the following checks and actions:
656    ///
657    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
658    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
659    ///   or steps down if it has expired.
660    ///
661    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
662    ///     by updating the value associated with the election key.
663    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
664    ///     and steps down, initiating a new campaign for leadership.
665    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
666    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
667    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
668    ///     after a period of time during which other leaders have been elected and stepped down.
669    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
670    ///
671    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
672    ///   as a newly elected leader.
673    async fn leader_action(&self) -> Result<()> {
674        let key = self.election_key();
675        // Case 1
676        if self.is_leader() {
677            match self.get_value_with_lease(&key).await? {
678                Some(lease) => {
679                    match (
680                        lease.leader_value == self.leader_value,
681                        lease.expire_time > lease.current,
682                    ) {
683                        // Case 1.1
684                        (true, true) => {
685                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
686                            self.update_value_with_lease(
687                                &key,
688                                &lease.origin,
689                                &self.leader_value,
690                                self.meta_lease_ttl,
691                            )
692                            .await?;
693                        }
694                        // Case 1.2
695                        (true, false) => {
696                            warn!("Leader lease expired, now stepping down.");
697                            self.step_down().await?;
698                        }
699                        // Case 1.3
700                        (false, _) => {
701                            warn!(
702                                "Leader lease not found, but still hold the lock. Now stepping down."
703                            );
704                            self.step_down().await?;
705                        }
706                    }
707                }
708                // Case 1.4
709                None => {
710                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
711                    self.step_down().await?;
712                }
713            }
714        // Case 2
715        } else {
716            self.elected().await?;
717        }
718        Ok(())
719    }
720
721    /// Handles the actions of a follower in the election process.
722    ///
723    /// This function performs the following checks and actions:
724    ///
725    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
726    ///   it steps down without deleting the key.
727    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
728    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
729    ///   will be released.
730    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
731    async fn follower_action(&self) -> Result<()> {
732        let key = self.election_key();
733        // Case 1
734        if self.is_leader() {
735            self.step_down_without_lock().await?;
736        }
737        let lease = self
738            .get_value_with_lease(&key)
739            .await?
740            .context(NoLeaderSnafu)?;
741        // Case 2
742        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
743        // Case 3
744        Ok(())
745    }
746
747    /// Step down the leader. The leader should delete the key and notify the leader watcher.
748    ///
749    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
750    ///
751    /// ## Caution:
752    /// Should only step down while holding the advisory lock.
753    async fn step_down(&self) -> Result<()> {
754        let key = self.election_key();
755        let leader_key = RdsLeaderKey {
756            name: self.leader_value.clone().into_bytes(),
757            key: key.clone().into_bytes(),
758            ..Default::default()
759        };
760        self.delete_value(&key).await?;
761        self.maybe_init_client().await?;
762        self.pg_client
763            .read()
764            .await
765            .query(&self.sql_set.step_down, &[])
766            .await?;
767        send_leader_change_and_set_flags(
768            &self.is_leader,
769            &self.leader_infancy,
770            &self.leader_watcher,
771            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
772        );
773        Ok(())
774    }
775
776    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
777    async fn step_down_without_lock(&self) -> Result<()> {
778        let key = self.election_key().into_bytes();
779        let leader_key = RdsLeaderKey {
780            name: self.leader_value.clone().into_bytes(),
781            key: key.clone(),
782            ..Default::default()
783        };
784        send_leader_change_and_set_flags(
785            &self.is_leader,
786            &self.leader_infancy,
787            &self.leader_watcher,
788            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
789        );
790        Ok(())
791    }
792
793    /// Elected as leader. The leader should put the key and notify the leader watcher.
794    /// Caution: Should only elected while holding the advisory lock.
795    async fn elected(&self) -> Result<()> {
796        let key = self.election_key();
797        let leader_key = RdsLeaderKey {
798            name: self.leader_value.clone().into_bytes(),
799            key: key.clone().into_bytes(),
800            ..Default::default()
801        };
802        self.delete_value(&key).await?;
803        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
804            .await?;
805
806        if self
807            .is_leader
808            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
809            .is_ok()
810        {
811            self.leader_infancy.store(true, Ordering::Release);
812
813            if let Err(e) = self
814                .leader_watcher
815                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
816            {
817                error!(e; "Failed to send leader change message");
818            }
819        }
820        Ok(())
821    }
822}
823
824#[cfg(test)]
825mod tests {
826    use std::assert_matches::assert_matches;
827    use std::env;
828
829    use common_meta::maybe_skip_postgres_integration_test;
830
831    use super::*;
832    use crate::error;
833    use crate::utils::postgres::create_postgres_pool;
834
835    async fn create_postgres_client(
836        table_name: Option<&str>,
837        execution_timeout: Duration,
838        idle_session_timeout: Duration,
839        statement_timeout: Duration,
840    ) -> Result<ElectionPgClient> {
841        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
842        if endpoint.is_empty() {
843            return UnexpectedSnafu {
844                violated: "Postgres endpoint is empty".to_string(),
845            }
846            .fail();
847        }
848        let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
849        let mut pg_client = ElectionPgClient::new(
850            pool,
851            execution_timeout,
852            idle_session_timeout,
853            statement_timeout,
854        )
855        .unwrap();
856        pg_client.maybe_init_client().await?;
857        if let Some(table_name) = table_name {
858            let create_table_sql = format!(
859                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
860                table_name
861            );
862            pg_client.execute(&create_table_sql, &[]).await?;
863        }
864        Ok(pg_client)
865    }
866
867    async fn drop_table(pg_election: &PgElection, table_name: &str) {
868        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
869        pg_election
870            .pg_client
871            .read()
872            .await
873            .execute(&sql, &[])
874            .await
875            .unwrap();
876    }
877
878    #[tokio::test]
879    async fn test_postgres_crud() {
880        maybe_skip_postgres_integration_test!();
881        let key = "test_key".to_string();
882        let value = "test_value".to_string();
883
884        let uuid = uuid::Uuid::new_v4().to_string();
885        let table_name = "test_postgres_crud_greptime_metakv";
886        let candidate_lease_ttl = Duration::from_secs(10);
887        let execution_timeout = Duration::from_secs(10);
888        let statement_timeout = Duration::from_secs(10);
889        let meta_lease_ttl = Duration::from_secs(2);
890        let idle_session_timeout = Duration::from_secs(0);
891        let client = create_postgres_client(
892            Some(table_name),
893            execution_timeout,
894            idle_session_timeout,
895            statement_timeout,
896        )
897        .await
898        .unwrap();
899
900        let (tx, _) = broadcast::channel(100);
901        let pg_election = PgElection {
902            leader_value: "test_leader".to_string(),
903            pg_client: RwLock::new(client),
904            is_leader: AtomicBool::new(false),
905            leader_infancy: AtomicBool::new(true),
906            leader_watcher: tx,
907            store_key_prefix: uuid,
908            candidate_lease_ttl,
909            meta_lease_ttl,
910            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
911        };
912
913        let res = pg_election
914            .put_value_with_lease(&key, &value, candidate_lease_ttl)
915            .await
916            .unwrap();
917        assert!(res);
918
919        let lease = pg_election
920            .get_value_with_lease(&key)
921            .await
922            .unwrap()
923            .unwrap();
924        assert_eq!(lease.leader_value, value);
925
926        pg_election
927            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
928            .await
929            .unwrap();
930
931        let res = pg_election.delete_value(&key).await.unwrap();
932        assert!(res);
933
934        let res = pg_election.get_value_with_lease(&key).await.unwrap();
935        assert!(res.is_none());
936
937        for i in 0..10 {
938            let key = format!("test_key_{}", i);
939            let value = format!("test_value_{}", i);
940            pg_election
941                .put_value_with_lease(&key, &value, candidate_lease_ttl)
942                .await
943                .unwrap();
944        }
945
946        let key_prefix = "test_key".to_string();
947        let (res, _) = pg_election
948            .get_value_with_lease_by_prefix(&key_prefix)
949            .await
950            .unwrap();
951        assert_eq!(res.len(), 10);
952
953        for i in 0..10 {
954            let key = format!("test_key_{}", i);
955            let res = pg_election.delete_value(&key).await.unwrap();
956            assert!(res);
957        }
958
959        let (res, current) = pg_election
960            .get_value_with_lease_by_prefix(&key_prefix)
961            .await
962            .unwrap();
963        assert!(res.is_empty());
964        assert!(current == Timestamp::default());
965
966        drop_table(&pg_election, table_name).await;
967    }
968
969    async fn candidate(
970        leader_value: String,
971        candidate_lease_ttl: Duration,
972        store_key_prefix: String,
973        table_name: String,
974    ) {
975        let execution_timeout = Duration::from_secs(10);
976        let statement_timeout = Duration::from_secs(10);
977        let meta_lease_ttl = Duration::from_secs(2);
978        let idle_session_timeout = Duration::from_secs(0);
979        let client = create_postgres_client(
980            None,
981            execution_timeout,
982            idle_session_timeout,
983            statement_timeout,
984        )
985        .await
986        .unwrap();
987
988        let (tx, _) = broadcast::channel(100);
989        let pg_election = PgElection {
990            leader_value,
991            pg_client: RwLock::new(client),
992            is_leader: AtomicBool::new(false),
993            leader_infancy: AtomicBool::new(true),
994            leader_watcher: tx,
995            store_key_prefix,
996            candidate_lease_ttl,
997            meta_lease_ttl,
998            sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
999        };
1000
1001        let node_info = MetasrvNodeInfo {
1002            addr: "test_addr".to_string(),
1003            version: "test_version".to_string(),
1004            git_commit: "test_git_commit".to_string(),
1005            start_time_ms: 0,
1006            total_cpu_millicores: 0,
1007            total_memory_bytes: 0,
1008            cpu_usage_millicores: 0,
1009            memory_usage_bytes: 0,
1010            hostname: "test_hostname".to_string(),
1011        };
1012        pg_election.register_candidate(&node_info).await.unwrap();
1013    }
1014
1015    #[tokio::test]
1016    async fn test_candidate_registration() {
1017        maybe_skip_postgres_integration_test!();
1018        let leader_value_prefix = "test_leader".to_string();
1019        let uuid = uuid::Uuid::new_v4().to_string();
1020        let table_name = "test_candidate_registration_greptime_metakv";
1021        let mut handles = vec![];
1022        let candidate_lease_ttl = Duration::from_secs(5);
1023        let execution_timeout = Duration::from_secs(10);
1024        let statement_timeout = Duration::from_secs(10);
1025        let meta_lease_ttl = Duration::from_secs(2);
1026        let idle_session_timeout = Duration::from_secs(0);
1027        let client = create_postgres_client(
1028            Some(table_name),
1029            execution_timeout,
1030            idle_session_timeout,
1031            statement_timeout,
1032        )
1033        .await
1034        .unwrap();
1035
1036        for i in 0..10 {
1037            let leader_value = format!("{}{}", leader_value_prefix, i);
1038            let handle = tokio::spawn(candidate(
1039                leader_value,
1040                candidate_lease_ttl,
1041                uuid.clone(),
1042                table_name.to_string(),
1043            ));
1044            handles.push(handle);
1045        }
1046        // Wait for candidates to register themselves and renew their leases at least once.
1047        tokio::time::sleep(Duration::from_secs(3)).await;
1048
1049        let (tx, _) = broadcast::channel(100);
1050        let leader_value = "test_leader".to_string();
1051        let pg_election = PgElection {
1052            leader_value,
1053            pg_client: RwLock::new(client),
1054            is_leader: AtomicBool::new(false),
1055            leader_infancy: AtomicBool::new(true),
1056            leader_watcher: tx,
1057            store_key_prefix: uuid.clone(),
1058            candidate_lease_ttl,
1059            meta_lease_ttl,
1060            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1061        };
1062
1063        let candidates = pg_election.all_candidates().await.unwrap();
1064        assert_eq!(candidates.len(), 10);
1065
1066        for handle in handles {
1067            handle.abort();
1068        }
1069
1070        // Wait for the candidate leases to expire.
1071        tokio::time::sleep(Duration::from_secs(5)).await;
1072        let candidates = pg_election.all_candidates().await.unwrap();
1073        assert!(candidates.is_empty());
1074
1075        // Garbage collection
1076        for i in 0..10 {
1077            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1078            let res = pg_election.delete_value(&key).await.unwrap();
1079            assert!(res);
1080        }
1081
1082        drop_table(&pg_election, table_name).await;
1083    }
1084
1085    #[tokio::test]
1086    async fn test_elected_and_step_down() {
1087        maybe_skip_postgres_integration_test!();
1088        let leader_value = "test_leader".to_string();
1089        let uuid = uuid::Uuid::new_v4().to_string();
1090        let table_name = "test_elected_and_step_down_greptime_metakv";
1091        let candidate_lease_ttl = Duration::from_secs(5);
1092        let execution_timeout = Duration::from_secs(10);
1093        let statement_timeout = Duration::from_secs(10);
1094        let meta_lease_ttl = Duration::from_secs(2);
1095        let idle_session_timeout = Duration::from_secs(0);
1096        let client = create_postgres_client(
1097            Some(table_name),
1098            execution_timeout,
1099            idle_session_timeout,
1100            statement_timeout,
1101        )
1102        .await
1103        .unwrap();
1104
1105        let (tx, mut rx) = broadcast::channel(100);
1106        let leader_pg_election = PgElection {
1107            leader_value: leader_value.clone(),
1108            pg_client: RwLock::new(client),
1109            is_leader: AtomicBool::new(false),
1110            leader_infancy: AtomicBool::new(true),
1111            leader_watcher: tx,
1112            store_key_prefix: uuid,
1113            candidate_lease_ttl,
1114            meta_lease_ttl,
1115            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1116        };
1117
1118        leader_pg_election.elected().await.unwrap();
1119        let lease = leader_pg_election
1120            .get_value_with_lease(&leader_pg_election.election_key())
1121            .await
1122            .unwrap()
1123            .unwrap();
1124        assert!(lease.leader_value == leader_value);
1125        assert!(lease.expire_time > lease.current);
1126        assert!(leader_pg_election.is_leader());
1127
1128        match rx.recv().await {
1129            Ok(LeaderChangeMessage::Elected(key)) => {
1130                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1131                assert_eq!(
1132                    String::from_utf8_lossy(key.key()),
1133                    leader_pg_election.election_key()
1134                );
1135                assert_eq!(key.lease_id(), i64::default());
1136                assert_eq!(key.revision(), i64::default());
1137            }
1138            _ => panic!("Expected LeaderChangeMessage::Elected"),
1139        }
1140
1141        leader_pg_election.step_down_without_lock().await.unwrap();
1142        let lease = leader_pg_election
1143            .get_value_with_lease(&leader_pg_election.election_key())
1144            .await
1145            .unwrap()
1146            .unwrap();
1147        assert!(lease.leader_value == leader_value);
1148        assert!(!leader_pg_election.is_leader());
1149
1150        match rx.recv().await {
1151            Ok(LeaderChangeMessage::StepDown(key)) => {
1152                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1153                assert_eq!(
1154                    String::from_utf8_lossy(key.key()),
1155                    leader_pg_election.election_key()
1156                );
1157                assert_eq!(key.lease_id(), i64::default());
1158                assert_eq!(key.revision(), i64::default());
1159            }
1160            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1161        }
1162
1163        leader_pg_election.elected().await.unwrap();
1164        let lease = leader_pg_election
1165            .get_value_with_lease(&leader_pg_election.election_key())
1166            .await
1167            .unwrap()
1168            .unwrap();
1169        assert!(lease.leader_value == leader_value);
1170        assert!(lease.expire_time > lease.current);
1171        assert!(leader_pg_election.is_leader());
1172
1173        match rx.recv().await {
1174            Ok(LeaderChangeMessage::Elected(key)) => {
1175                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1176                assert_eq!(
1177                    String::from_utf8_lossy(key.key()),
1178                    leader_pg_election.election_key()
1179                );
1180                assert_eq!(key.lease_id(), i64::default());
1181                assert_eq!(key.revision(), i64::default());
1182            }
1183            _ => panic!("Expected LeaderChangeMessage::Elected"),
1184        }
1185
1186        leader_pg_election.step_down().await.unwrap();
1187        let res = leader_pg_election
1188            .get_value_with_lease(&leader_pg_election.election_key())
1189            .await
1190            .unwrap();
1191        assert!(res.is_none());
1192        assert!(!leader_pg_election.is_leader());
1193
1194        match rx.recv().await {
1195            Ok(LeaderChangeMessage::StepDown(key)) => {
1196                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1197                assert_eq!(
1198                    String::from_utf8_lossy(key.key()),
1199                    leader_pg_election.election_key()
1200                );
1201                assert_eq!(key.lease_id(), i64::default());
1202                assert_eq!(key.revision(), i64::default());
1203            }
1204            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1205        }
1206
1207        drop_table(&leader_pg_election, table_name).await;
1208    }
1209
1210    #[tokio::test]
1211    async fn test_leader_action() {
1212        maybe_skip_postgres_integration_test!();
1213        let leader_value = "test_leader".to_string();
1214        let uuid = uuid::Uuid::new_v4().to_string();
1215        let table_name = "test_leader_action_greptime_metakv";
1216        let candidate_lease_ttl = Duration::from_secs(5);
1217        let execution_timeout = Duration::from_secs(10);
1218        let statement_timeout = Duration::from_secs(10);
1219        let meta_lease_ttl = Duration::from_secs(2);
1220        let idle_session_timeout = Duration::from_secs(0);
1221        let client = create_postgres_client(
1222            Some(table_name),
1223            execution_timeout,
1224            idle_session_timeout,
1225            statement_timeout,
1226        )
1227        .await
1228        .unwrap();
1229
1230        let (tx, mut rx) = broadcast::channel(100);
1231        let leader_pg_election = PgElection {
1232            leader_value: leader_value.clone(),
1233            pg_client: RwLock::new(client),
1234            is_leader: AtomicBool::new(false),
1235            leader_infancy: AtomicBool::new(true),
1236            leader_watcher: tx,
1237            store_key_prefix: uuid,
1238            candidate_lease_ttl,
1239            meta_lease_ttl,
1240            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1241        };
1242
1243        // Step 1: No leader exists, campaign and elected.
1244        let res = leader_pg_election
1245            .pg_client
1246            .read()
1247            .await
1248            .query(&leader_pg_election.sql_set.campaign, &[])
1249            .await
1250            .unwrap();
1251        let res: bool = res[0].get(0);
1252        assert!(res);
1253        leader_pg_election.leader_action().await.unwrap();
1254        let lease = leader_pg_election
1255            .get_value_with_lease(&leader_pg_election.election_key())
1256            .await
1257            .unwrap()
1258            .unwrap();
1259        assert!(lease.leader_value == leader_value);
1260        assert!(lease.expire_time > lease.current);
1261        assert!(leader_pg_election.is_leader());
1262
1263        match rx.recv().await {
1264            Ok(LeaderChangeMessage::Elected(key)) => {
1265                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1266                assert_eq!(
1267                    String::from_utf8_lossy(key.key()),
1268                    leader_pg_election.election_key()
1269                );
1270                assert_eq!(key.lease_id(), i64::default());
1271                assert_eq!(key.revision(), i64::default());
1272            }
1273            _ => panic!("Expected LeaderChangeMessage::Elected"),
1274        }
1275
1276        // Step 2: As a leader, renew the lease.
1277        let res = leader_pg_election
1278            .pg_client
1279            .read()
1280            .await
1281            .query(&leader_pg_election.sql_set.campaign, &[])
1282            .await
1283            .unwrap();
1284        let res: bool = res[0].get(0);
1285        assert!(res);
1286        leader_pg_election.leader_action().await.unwrap();
1287        let new_lease = leader_pg_election
1288            .get_value_with_lease(&leader_pg_election.election_key())
1289            .await
1290            .unwrap()
1291            .unwrap();
1292        assert!(new_lease.leader_value == leader_value);
1293        assert!(
1294            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1295        );
1296        assert!(leader_pg_election.is_leader());
1297
1298        // Step 3: Something wrong, the leader lease expired.
1299        tokio::time::sleep(Duration::from_secs(2)).await;
1300
1301        let res = leader_pg_election
1302            .pg_client
1303            .read()
1304            .await
1305            .query(&leader_pg_election.sql_set.campaign, &[])
1306            .await
1307            .unwrap();
1308        let res: bool = res[0].get(0);
1309        assert!(res);
1310        leader_pg_election.leader_action().await.unwrap();
1311        let res = leader_pg_election
1312            .get_value_with_lease(&leader_pg_election.election_key())
1313            .await
1314            .unwrap();
1315        assert!(res.is_none());
1316
1317        match rx.recv().await {
1318            Ok(LeaderChangeMessage::StepDown(key)) => {
1319                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1320                assert_eq!(
1321                    String::from_utf8_lossy(key.key()),
1322                    leader_pg_election.election_key()
1323                );
1324                assert_eq!(key.lease_id(), i64::default());
1325                assert_eq!(key.revision(), i64::default());
1326            }
1327            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1328        }
1329
1330        // Step 4: Re-campaign and elected.
1331        let res = leader_pg_election
1332            .pg_client
1333            .read()
1334            .await
1335            .query(&leader_pg_election.sql_set.campaign, &[])
1336            .await
1337            .unwrap();
1338        let res: bool = res[0].get(0);
1339        assert!(res);
1340        leader_pg_election.leader_action().await.unwrap();
1341        let lease = leader_pg_election
1342            .get_value_with_lease(&leader_pg_election.election_key())
1343            .await
1344            .unwrap()
1345            .unwrap();
1346        assert!(lease.leader_value == leader_value);
1347        assert!(lease.expire_time > lease.current);
1348        assert!(leader_pg_election.is_leader());
1349
1350        match rx.recv().await {
1351            Ok(LeaderChangeMessage::Elected(key)) => {
1352                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1353                assert_eq!(
1354                    String::from_utf8_lossy(key.key()),
1355                    leader_pg_election.election_key()
1356                );
1357                assert_eq!(key.lease_id(), i64::default());
1358                assert_eq!(key.revision(), i64::default());
1359            }
1360            _ => panic!("Expected LeaderChangeMessage::Elected"),
1361        }
1362
1363        // Step 5: Something wrong, the leader key is deleted by other followers.
1364        leader_pg_election
1365            .delete_value(&leader_pg_election.election_key())
1366            .await
1367            .unwrap();
1368        leader_pg_election.leader_action().await.unwrap();
1369        let res = leader_pg_election
1370            .get_value_with_lease(&leader_pg_election.election_key())
1371            .await
1372            .unwrap();
1373        assert!(res.is_none());
1374        assert!(!leader_pg_election.is_leader());
1375
1376        match rx.recv().await {
1377            Ok(LeaderChangeMessage::StepDown(key)) => {
1378                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1379                assert_eq!(
1380                    String::from_utf8_lossy(key.key()),
1381                    leader_pg_election.election_key()
1382                );
1383                assert_eq!(key.lease_id(), i64::default());
1384                assert_eq!(key.revision(), i64::default());
1385            }
1386            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1387        }
1388
1389        // Step 6: Re-campaign and elected.
1390        let res = leader_pg_election
1391            .pg_client
1392            .read()
1393            .await
1394            .query(&leader_pg_election.sql_set.campaign, &[])
1395            .await
1396            .unwrap();
1397        let res: bool = res[0].get(0);
1398        assert!(res);
1399        leader_pg_election.leader_action().await.unwrap();
1400        let lease = leader_pg_election
1401            .get_value_with_lease(&leader_pg_election.election_key())
1402            .await
1403            .unwrap()
1404            .unwrap();
1405        assert!(lease.leader_value == leader_value);
1406        assert!(lease.expire_time > lease.current);
1407        assert!(leader_pg_election.is_leader());
1408
1409        match rx.recv().await {
1410            Ok(LeaderChangeMessage::Elected(key)) => {
1411                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1412                assert_eq!(
1413                    String::from_utf8_lossy(key.key()),
1414                    leader_pg_election.election_key()
1415                );
1416                assert_eq!(key.lease_id(), i64::default());
1417                assert_eq!(key.revision(), i64::default());
1418            }
1419            _ => panic!("Expected LeaderChangeMessage::Elected"),
1420        }
1421
1422        // Step 7: Something wrong, the leader key changed by others.
1423        let res = leader_pg_election
1424            .pg_client
1425            .read()
1426            .await
1427            .query(&leader_pg_election.sql_set.campaign, &[])
1428            .await
1429            .unwrap();
1430        let res: bool = res[0].get(0);
1431        assert!(res);
1432        leader_pg_election
1433            .delete_value(&leader_pg_election.election_key())
1434            .await
1435            .unwrap();
1436        leader_pg_election
1437            .put_value_with_lease(
1438                &leader_pg_election.election_key(),
1439                "test",
1440                Duration::from_secs(10),
1441            )
1442            .await
1443            .unwrap();
1444        leader_pg_election.leader_action().await.unwrap();
1445        let res = leader_pg_election
1446            .get_value_with_lease(&leader_pg_election.election_key())
1447            .await
1448            .unwrap();
1449        assert!(res.is_none());
1450        assert!(!leader_pg_election.is_leader());
1451
1452        match rx.recv().await {
1453            Ok(LeaderChangeMessage::StepDown(key)) => {
1454                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1455                assert_eq!(
1456                    String::from_utf8_lossy(key.key()),
1457                    leader_pg_election.election_key()
1458                );
1459                assert_eq!(key.lease_id(), i64::default());
1460                assert_eq!(key.revision(), i64::default());
1461            }
1462            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1463        }
1464
1465        // Clean up
1466        leader_pg_election
1467            .pg_client
1468            .read()
1469            .await
1470            .query(&leader_pg_election.sql_set.step_down, &[])
1471            .await
1472            .unwrap();
1473
1474        drop_table(&leader_pg_election, table_name).await;
1475    }
1476
1477    #[tokio::test]
1478    async fn test_follower_action() {
1479        maybe_skip_postgres_integration_test!();
1480        common_telemetry::init_default_ut_logging();
1481        let uuid = uuid::Uuid::new_v4().to_string();
1482        let table_name = "test_follower_action_greptime_metakv";
1483
1484        let candidate_lease_ttl = Duration::from_secs(5);
1485        let execution_timeout = Duration::from_secs(10);
1486        let statement_timeout = Duration::from_secs(10);
1487        let meta_lease_ttl = Duration::from_secs(2);
1488        let idle_session_timeout = Duration::from_secs(0);
1489        let follower_client = create_postgres_client(
1490            Some(table_name),
1491            execution_timeout,
1492            idle_session_timeout,
1493            statement_timeout,
1494        )
1495        .await
1496        .unwrap();
1497        let (tx, mut rx) = broadcast::channel(100);
1498        let follower_pg_election = PgElection {
1499            leader_value: "test_follower".to_string(),
1500            pg_client: RwLock::new(follower_client),
1501            is_leader: AtomicBool::new(false),
1502            leader_infancy: AtomicBool::new(true),
1503            leader_watcher: tx,
1504            store_key_prefix: uuid.clone(),
1505            candidate_lease_ttl,
1506            meta_lease_ttl,
1507            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1508        };
1509
1510        let leader_client = create_postgres_client(
1511            Some(table_name),
1512            execution_timeout,
1513            idle_session_timeout,
1514            statement_timeout,
1515        )
1516        .await
1517        .unwrap();
1518        let (tx, _) = broadcast::channel(100);
1519        let leader_pg_election = PgElection {
1520            leader_value: "test_leader".to_string(),
1521            pg_client: RwLock::new(leader_client),
1522            is_leader: AtomicBool::new(false),
1523            leader_infancy: AtomicBool::new(true),
1524            leader_watcher: tx,
1525            store_key_prefix: uuid,
1526            candidate_lease_ttl,
1527            meta_lease_ttl,
1528            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1529        };
1530
1531        leader_pg_election
1532            .pg_client
1533            .read()
1534            .await
1535            .query(&leader_pg_election.sql_set.campaign, &[])
1536            .await
1537            .unwrap();
1538        leader_pg_election.elected().await.unwrap();
1539
1540        // Step 1: As a follower, the leader exists and the lease is not expired.
1541        follower_pg_election.follower_action().await.unwrap();
1542
1543        // Step 2: As a follower, the leader exists but the lease expired.
1544        tokio::time::sleep(Duration::from_secs(2)).await;
1545        assert!(follower_pg_election.follower_action().await.is_err());
1546
1547        // Step 3: As a follower, the leader does not exist.
1548        leader_pg_election
1549            .delete_value(&leader_pg_election.election_key())
1550            .await
1551            .unwrap();
1552        assert!(follower_pg_election.follower_action().await.is_err());
1553
1554        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1555        follower_pg_election
1556            .is_leader
1557            .store(true, Ordering::Relaxed);
1558        assert!(follower_pg_election.follower_action().await.is_err());
1559
1560        match rx.recv().await {
1561            Ok(LeaderChangeMessage::StepDown(key)) => {
1562                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1563                assert_eq!(
1564                    String::from_utf8_lossy(key.key()),
1565                    follower_pg_election.election_key()
1566                );
1567                assert_eq!(key.lease_id(), i64::default());
1568                assert_eq!(key.revision(), i64::default());
1569            }
1570            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1571        }
1572
1573        // Clean up
1574        leader_pg_election
1575            .pg_client
1576            .read()
1577            .await
1578            .query(&leader_pg_election.sql_set.step_down, &[])
1579            .await
1580            .unwrap();
1581
1582        drop_table(&follower_pg_election, table_name).await;
1583    }
1584
1585    #[tokio::test]
1586    async fn test_idle_session_timeout() {
1587        maybe_skip_postgres_integration_test!();
1588        common_telemetry::init_default_ut_logging();
1589        let execution_timeout = Duration::from_secs(10);
1590        let statement_timeout = Duration::from_secs(10);
1591        let idle_session_timeout = Duration::from_secs(1);
1592        let mut client = create_postgres_client(
1593            None,
1594            execution_timeout,
1595            idle_session_timeout,
1596            statement_timeout,
1597        )
1598        .await
1599        .unwrap();
1600        tokio::time::sleep(Duration::from_millis(1100)).await;
1601        // Wait for the idle session timeout.
1602        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1603        assert_matches!(err, error::Error::PostgresExecution { .. });
1604        let error::Error::PostgresExecution { error, .. } = err else {
1605            panic!("Expected PostgresExecution error");
1606        };
1607        assert!(error.is_closed());
1608        // Reset the client and try again.
1609        client.reset_client().await.unwrap();
1610        let _ = client.query("SELECT 1", &[]).await.unwrap();
1611    }
1612
1613    #[test]
1614    fn test_election_sql_with_schema() {
1615        let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1616        let s = f.build();
1617        assert!(s.campaign.contains("pg_try_advisory_lock"));
1618        assert!(
1619            s.put_value_with_lease
1620                .contains("\"test_schema\".\"greptime_metakv\"")
1621        );
1622        assert!(
1623            s.update_value_with_lease
1624                .contains("\"test_schema\".\"greptime_metakv\"")
1625        );
1626        assert!(
1627            s.get_value_with_lease
1628                .contains("\"test_schema\".\"greptime_metakv\"")
1629        );
1630        assert!(
1631            s.get_value_with_lease_by_prefix
1632                .contains("\"test_schema\".\"greptime_metakv\"")
1633        );
1634        assert!(
1635            s.delete_value
1636                .contains("\"test_schema\".\"greptime_metakv\"")
1637        );
1638    }
1639}