Skip to main content

common_meta/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_telemetry::{error, info, warn};
20use common_time::Timestamp;
21use deadpool_postgres::{Manager, Pool};
22use snafu::{OptionExt, ResultExt, ensure};
23use tokio::sync::{RwLock, broadcast};
24use tokio::time::MissedTickBehavior;
25use tokio_postgres::Row;
26use tokio_postgres::types::ToSql;
27
28use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
29use crate::election::{
30    Election, ElectionRef, LeaderChangeMessage, LeaderValue, MetasrvNodeInfo, listen_leader_change,
31    send_leader_change_and_set_flags,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, ElectionNoLeaderSnafu, GetPostgresClientSnafu,
35    PostgresExecutionSnafu, Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu,
36    UnexpectedSnafu,
37};
38use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
39
/// Builds the SQL statements used by the PostgreSQL election for one advisory
/// lock id and one (optionally schema-qualified) table.
struct ElectionSqlFactory<'a> {
    /// Id interpolated into the `pg_try_advisory_lock` / `pg_advisory_unlock` statements.
    lock_id: u64,
    /// Optional schema name; when `None` or empty, the table name is used unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table that stores the `k` / `v` columns.
    table_name: &'a str,
}
45
// The complete set of SQL statements used by the election,
// as produced by `ElectionSqlFactory::build`.
struct ElectionSqlSet {
    // SQL to try to acquire the advisory lock (`pg_try_advisory_lock`).
    //
    // Takes no parameters.
    //
    // Returns:
    // column 0: whether the lock was acquired
    campaign: String,
    // SQL to release the advisory lock (`pg_advisory_unlock`).
    //
    // Takes no parameters.
    step_down: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: value,
    // `$3`: lease time in seconds
    //
    // Returns:
    // If the key already exists, return the previous value.
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: previous value,
    // `$3`: updated value,
    // `$4`: lease time in seconds
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // `$1`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: key deleted,
    // column 1: value deleted
    delete_value: String,
}
91
92impl<'a> ElectionSqlFactory<'a> {
93    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
94        Self {
95            lock_id,
96            schema_name,
97            table_name,
98        }
99    }
100
101    fn table_ident(&self) -> String {
102        match self.schema_name {
103            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
104            _ => format!("\"{}\"", self.table_name),
105        }
106    }
107
108    fn build(self) -> ElectionSqlSet {
109        ElectionSqlSet {
110            campaign: self.campaign_sql(),
111            step_down: self.step_down_sql(),
112            put_value_with_lease: self.put_value_with_lease_sql(),
113            update_value_with_lease: self.update_value_with_lease_sql(),
114            get_value_with_lease: self.get_value_with_lease_sql(),
115            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
116            delete_value: self.delete_value_sql(),
117        }
118    }
119
120    fn campaign_sql(&self) -> String {
121        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
122    }
123
124    fn step_down_sql(&self) -> String {
125        format!("SELECT pg_advisory_unlock({})", self.lock_id)
126    }
127
128    fn put_value_with_lease_sql(&self) -> String {
129        let table = self.table_ident();
130        format!(
131            r#"WITH prev AS (
132                SELECT k, v FROM {table} WHERE k = $1
133            ), insert AS (
134                INSERT INTO {table}
135                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
136                ON CONFLICT (k) DO NOTHING
137            )
138            SELECT k, v FROM prev;
139            "#,
140            table = table,
141            lease_sep = LEASE_SEP
142        )
143    }
144
145    fn update_value_with_lease_sql(&self) -> String {
146        let table = self.table_ident();
147        format!(
148            r#"UPDATE {table}
149               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
150               WHERE k = $1 AND v = $2"#,
151            table = table,
152            lease_sep = LEASE_SEP
153        )
154    }
155
156    fn get_value_with_lease_sql(&self) -> String {
157        let table = self.table_ident();
158        format!(
159            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
160            table = table
161        )
162    }
163
164    fn get_value_with_lease_by_prefix_sql(&self) -> String {
165        let table = self.table_ident();
166        format!(
167            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
168            table = table
169        )
170    }
171
172    fn delete_value_sql(&self) -> String {
173        let table = self.table_ident();
174        format!(
175            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
176            table = table
177        )
178    }
179}
180
/// PgClient for election.
///
/// Wraps a single connection taken from a pool and applies a client-side timeout
/// to every statement it runs.
pub struct ElectionPgClient {
    /// The connection all statements are issued on; `None` until one has been
    /// obtained from `pool`.
    current: Option<deadpool::managed::Object<Manager>>,
    /// The pool `current` is obtained from.
    pool: Pool,
    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The idle session timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a session remains idle for longer than this duration, the server will terminate it.
    idle_session_timeout: Duration,

    /// The statement timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a statement takes longer than this duration to execute, the server will abort it.
    statement_timeout: Duration,
}
203
204impl ElectionPgClient {
205    pub fn new(
206        pool: Pool,
207        execution_timeout: Duration,
208        idle_session_timeout: Duration,
209        statement_timeout: Duration,
210    ) -> Result<ElectionPgClient> {
211        Ok(ElectionPgClient {
212            current: None,
213            pool,
214            execution_timeout,
215            idle_session_timeout,
216            statement_timeout,
217        })
218    }
219
220    fn set_idle_session_timeout_sql(&self) -> String {
221        format!(
222            "SET idle_session_timeout = '{}s';",
223            self.idle_session_timeout.as_secs()
224        )
225    }
226
227    fn set_statement_timeout_sql(&self) -> String {
228        format!(
229            "SET statement_timeout = '{}s';",
230            self.statement_timeout.as_secs()
231        )
232    }
233
234    async fn reset_client(&mut self) -> Result<()> {
235        if let Some(client) = self.current.take() {
236            // Remove the connection from deadpool and drop it,
237            // forcing TCP close and backend termination.
238            let inner = deadpool::managed::Object::<deadpool_postgres::Manager>::take(client);
239            drop(inner);
240        }
241        self.maybe_init_client().await
242    }
243
244    async fn maybe_init_client(&mut self) -> Result<()> {
245        if self.current.is_none() {
246            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
247
248            self.current = Some(client);
249            // Set idle session timeout and statement timeout.
250            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
251            self.execute(&idle_session_timeout_sql, &[]).await?;
252            let statement_timeout_sql = self.set_statement_timeout_sql();
253            self.execute(&statement_timeout_sql, &[]).await?;
254        }
255
256        Ok(())
257    }
258
259    /// Returns the result of the query.
260    ///
261    /// # Panics
262    /// if `current` is `None`.
263    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
264        let result = tokio::time::timeout(
265            self.execution_timeout,
266            self.current.as_ref().unwrap().execute(sql, params),
267        )
268        .await
269        .map_err(|_| {
270            SqlExecutionTimeoutSnafu {
271                sql: sql.to_string(),
272                duration: self.execution_timeout,
273            }
274            .build()
275        })?;
276
277        result.context(PostgresExecutionSnafu { sql })
278    }
279
280    /// Returns the result of the query.
281    ///
282    /// # Panics
283    /// if `current` is `None`.
284    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
285        let result = tokio::time::timeout(
286            self.execution_timeout,
287            self.current.as_ref().unwrap().query(sql, params),
288        )
289        .await
290        .map_err(|_| {
291            SqlExecutionTimeoutSnafu {
292                sql: sql.to_string(),
293                duration: self.execution_timeout,
294            }
295            .build()
296        })?;
297
298        result.context(PostgresExecutionSnafu { sql })
299    }
300}
301
/// PostgreSql implementation of Election.
///
/// Leadership is decided by a PostgreSQL advisory lock; the leader and the candidates
/// additionally store lease-carrying values in a key/value table.
pub struct PgElection {
    /// Value identifying this node: stored under the election key when elected and
    /// appended to the candidate root to form this node's candidate key.
    leader_value: String,
    /// Client all statements are issued through; the write lock is taken to
    /// initialize or reset it.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this instance currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this instance has just been elected; cleared by the first
    /// `in_leader_infancy` call that observes it.
    leader_infancy: AtomicBool,
    /// Broadcast sender used to publish leader change messages.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and to the candidate root.
    store_key_prefix: String,
    /// Lease duration of a candidate registration; renewed every half of it.
    candidate_lease_ttl: Duration,
    /// Lease duration of the leader key; the campaign loop ticks every half of it.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
314
315impl PgElection {
316    async fn maybe_init_client(&self) -> Result<()> {
317        if self.pg_client.read().await.current.is_none() {
318            self.pg_client.write().await.maybe_init_client().await?;
319        }
320
321        Ok(())
322    }
323
324    #[allow(clippy::too_many_arguments)]
325    pub async fn with_pg_client(
326        leader_value: String,
327        pg_client: ElectionPgClient,
328        store_key_prefix: String,
329        candidate_lease_ttl: Duration,
330        meta_lease_ttl: Duration,
331        schema_name: Option<&str>,
332        table_name: &str,
333        lock_id: u64,
334    ) -> Result<ElectionRef> {
335        if let Some(s) = schema_name {
336            common_telemetry::info!("PgElection uses schema: {}", s);
337        } else {
338            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
339        }
340        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
341
342        let tx = listen_leader_change(leader_value.clone());
343        Ok(Arc::new(Self {
344            leader_value,
345            pg_client: RwLock::new(pg_client),
346            is_leader: AtomicBool::new(false),
347            leader_infancy: AtomicBool::new(false),
348            leader_watcher: tx,
349            store_key_prefix,
350            candidate_lease_ttl,
351            meta_lease_ttl,
352            sql_set: sql_factory.build(),
353        }))
354    }
355
356    fn election_key(&self) -> String {
357        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
358    }
359
360    fn candidate_root(&self) -> String {
361        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
362    }
363
364    fn candidate_key(&self) -> String {
365        format!("{}{}", self.candidate_root(), self.leader_value)
366    }
367}
368
369#[async_trait::async_trait]
370impl Election for PgElection {
371    type Leader = LeaderValue;
372
    /// Returns whether this instance currently considers itself the leader.
    ///
    /// Only the local flag is read (relaxed load); the database is not consulted.
    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }
376
377    fn in_leader_infancy(&self) -> bool {
378        self.leader_infancy
379            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
380            .is_ok()
381    }
382
383    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
384        let key = self.candidate_key();
385        let node_info =
386            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
387                input: format!("{node_info:?}"),
388            })?;
389        let res = self
390            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
391            .await?;
392        // May registered before, just update the lease.
393        if !res {
394            self.delete_value(&key).await?;
395            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
396                .await?;
397        }
398
399        // Check if the current lease has expired and renew the lease.
400        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
401        loop {
402            let _ = keep_alive_interval.tick().await;
403
404            let lease = self
405                .get_value_with_lease(&key)
406                .await?
407                .context(UnexpectedSnafu {
408                    err_msg: format!("Failed to get lease for key: {:?}", key),
409                })?;
410
411            ensure!(
412                lease.expire_time > lease.current,
413                UnexpectedSnafu {
414                    err_msg: format!(
415                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
416                        lease.expire_time, lease.current, key
417                    ),
418                }
419            );
420
421            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
422            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
423                .await?;
424        }
425    }
426
427    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
428        let key_prefix = self.candidate_root();
429        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
430        // Remove expired candidates
431        candidates.retain(|c| c.1 > current);
432        let mut valid_candidates = Vec::with_capacity(candidates.len());
433        for (c, _) in candidates {
434            let node_info: MetasrvNodeInfo =
435                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
436                    input: format!("{:?}", c),
437                })?;
438            valid_candidates.push(node_info);
439        }
440        Ok(valid_candidates)
441    }
442
443    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
444    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
445    ///
446    /// The function operates in a loop, where it:
447    ///
448    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
449    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
450    /// 3. Checks the result of the query:
451    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
452    ///      to perform actions as the leader.
453    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
454    ///      to perform actions as a follower.
455    async fn campaign(&self) -> Result<()> {
456        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
457        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
458
459        self.maybe_init_client().await?;
460        loop {
461            let res = self
462                .pg_client
463                .read()
464                .await
465                .query(&self.sql_set.campaign, &[])
466                .await?;
467            let row = res.first().context(UnexpectedSnafu {
468                err_msg: "Failed to get the result of acquiring advisory lock".to_string(),
469            })?;
470            let is_leader = row.try_get(0).map_err(|_| {
471                UnexpectedSnafu {
472                    err_msg: "Failed to get the result of get lock".to_string(),
473                }
474                .build()
475            })?;
476            if is_leader {
477                self.leader_action().await?;
478            } else {
479                self.follower_action().await?;
480            }
481            let _ = keep_alive_interval.tick().await;
482        }
483    }
484
485    async fn reset_campaign(&self) {
486        info!("Resetting campaign");
487        if self.is_leader.load(Ordering::Relaxed) {
488            if let Err(err) = self.step_down_without_lock().await {
489                error!(err; "Failed to step down without lock");
490            }
491            info!("Step down without lock successfully, due to reset campaign");
492        }
493        if let Err(err) = self.pg_client.write().await.reset_client().await {
494            error!(err; "Failed to reset client");
495        }
496    }
497
498    async fn leader(&self) -> Result<Self::Leader> {
499        if self.is_leader.load(Ordering::Relaxed) {
500            Ok(self.leader_value.as_bytes().into())
501        } else {
502            let key = self.election_key();
503            if let Some(lease) = self.get_value_with_lease(&key).await? {
504                ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
505                Ok(lease.leader_value.as_bytes().into())
506            } else {
507                ElectionNoLeaderSnafu.fail()
508            }
509        }
510    }
511
    /// Not implemented yet.
    ///
    /// # Panics
    /// Always panics via `todo!()`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
515
    /// Returns a new receiver subscribed to this election's leader change broadcast.
    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
        self.leader_watcher.subscribe()
    }
519}
520
521impl PgElection {
522    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
523    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
524        let key = key.as_bytes();
525        self.maybe_init_client().await?;
526        let res = self
527            .pg_client
528            .read()
529            .await
530            .query(&self.sql_set.get_value_with_lease, &[&key])
531            .await?;
532
533        if res.is_empty() {
534            Ok(None)
535        } else {
536            // Safety: Checked if res is empty above.
537            let current_time_str = res[0].try_get(1).unwrap_or_default();
538            let current_time = match Timestamp::from_str(current_time_str, None) {
539                Ok(ts) => ts,
540                Err(_) => UnexpectedSnafu {
541                    err_msg: format!("Invalid timestamp: {}", current_time_str),
542                }
543                .fail()?,
544            };
545            // Safety: Checked if res is empty above.
546            let value_and_expire_time =
547                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
548            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
549
550            Ok(Some(Lease {
551                leader_value: value,
552                expire_time,
553                current: current_time,
554                origin: value_and_expire_time.to_string(),
555            }))
556        }
557    }
558
559    /// Returns all values and expire time with the given key prefix. Also returns the current time.
560    async fn get_value_with_lease_by_prefix(
561        &self,
562        key_prefix: &str,
563    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
564        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
565        self.maybe_init_client().await?;
566        let res = self
567            .pg_client
568            .read()
569            .await
570            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
571            .await?;
572
573        let mut values_with_leases = vec![];
574        let mut current = Timestamp::default();
575        for row in res {
576            let current_time_str = row.try_get(1).unwrap_or_default();
577            current = match Timestamp::from_str(current_time_str, None) {
578                Ok(ts) => ts,
579                Err(_) => UnexpectedSnafu {
580                    err_msg: format!("Invalid timestamp: {}", current_time_str),
581                }
582                .fail()?,
583            };
584
585            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
586            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
587
588            values_with_leases.push((value, expire_time));
589        }
590        Ok((values_with_leases, current))
591    }
592
593    async fn update_value_with_lease(
594        &self,
595        key: &str,
596        prev: &str,
597        updated: &str,
598        lease_ttl: Duration,
599    ) -> Result<()> {
600        let key = key.as_bytes();
601        let prev = prev.as_bytes();
602        self.maybe_init_client().await?;
603        let lease_ttl_secs = lease_ttl.as_secs() as f64;
604        let res = self
605            .pg_client
606            .read()
607            .await
608            .execute(
609                &self.sql_set.update_value_with_lease,
610                &[&key, &prev, &updated, &lease_ttl_secs],
611            )
612            .await?;
613
614        ensure!(
615            res == 1,
616            UnexpectedSnafu {
617                err_msg: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
618            }
619        );
620
621        Ok(())
622    }
623
624    /// Returns `true` if the insertion is successful
625    async fn put_value_with_lease(
626        &self,
627        key: &str,
628        value: &str,
629        lease_ttl: Duration,
630    ) -> Result<bool> {
631        let key = key.as_bytes();
632        let lease_ttl_secs = lease_ttl.as_secs() as f64;
633        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
634        self.maybe_init_client().await?;
635        let res = self
636            .pg_client
637            .read()
638            .await
639            .query(&self.sql_set.put_value_with_lease, &params)
640            .await?;
641        Ok(res.is_empty())
642    }
643
644    /// Returns `true` if the deletion is successful.
645    /// Caution: Should only delete the key if the lease is expired.
646    async fn delete_value(&self, key: &str) -> Result<bool> {
647        let key = key.as_bytes();
648        self.maybe_init_client().await?;
649        let res = self
650            .pg_client
651            .read()
652            .await
653            .query(&self.sql_set.delete_value, &[&key])
654            .await?;
655
656        Ok(res.len() == 1)
657    }
658
659    /// Handles the actions of a leader in the election process.
660    ///
661    /// This function performs the following checks and actions:
662    ///
663    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
664    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
665    ///   or steps down if it has expired.
666    ///
667    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
668    ///     by updating the value associated with the election key.
669    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
670    ///     and steps down, initiating a new campaign for leadership.
671    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
672    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
673    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
674    ///     after a period of time during which other leaders have been elected and stepped down.
675    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
676    ///
677    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
678    ///   as a newly elected leader.
679    async fn leader_action(&self) -> Result<()> {
680        let key = self.election_key();
681        // Case 1
682        if self.is_leader() {
683            match self.get_value_with_lease(&key).await? {
684                Some(lease) => {
685                    match (
686                        lease.leader_value == self.leader_value,
687                        lease.expire_time > lease.current,
688                    ) {
689                        // Case 1.1
690                        (true, true) => {
691                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
692                            self.update_value_with_lease(
693                                &key,
694                                &lease.origin,
695                                &self.leader_value,
696                                self.meta_lease_ttl,
697                            )
698                            .await?;
699                        }
700                        // Case 1.2
701                        (true, false) => {
702                            warn!("Leader lease expired, now stepping down.");
703                            self.step_down().await?;
704                        }
705                        // Case 1.3
706                        (false, _) => {
707                            warn!(
708                                "Leader lease not found, but still hold the lock. Now stepping down."
709                            );
710                            self.step_down().await?;
711                        }
712                    }
713                }
714                // Case 1.4
715                None => {
716                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
717                    self.step_down().await?;
718                }
719            }
720        // Case 2
721        } else {
722            self.elected().await?;
723        }
724        Ok(())
725    }
726
727    /// Handles the actions of a follower in the election process.
728    ///
729    /// This function performs the following checks and actions:
730    ///
731    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
732    ///   it steps down without deleting the key.
733    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
734    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
735    ///   will be released.
736    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
737    async fn follower_action(&self) -> Result<()> {
738        let key = self.election_key();
739        // Case 1
740        if self.is_leader() {
741            self.step_down_without_lock().await?;
742        }
743        let lease = self
744            .get_value_with_lease(&key)
745            .await?
746            .context(ElectionNoLeaderSnafu)?;
747        // Case 2
748        ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
749        // Case 3
750        Ok(())
751    }
752
753    /// Step down the leader. The leader should delete the key and notify the leader watcher.
754    ///
755    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
756    ///
757    /// ## Caution:
758    /// Should only step down while holding the advisory lock.
759    async fn step_down(&self) -> Result<()> {
760        let key = self.election_key();
761        let leader_key = RdsLeaderKey {
762            name: self.leader_value.clone().into_bytes(),
763            key: key.clone().into_bytes(),
764            ..Default::default()
765        };
766        self.delete_value(&key).await?;
767        self.maybe_init_client().await?;
768        self.pg_client
769            .read()
770            .await
771            .query(&self.sql_set.step_down, &[])
772            .await?;
773        send_leader_change_and_set_flags(
774            &self.is_leader,
775            &self.leader_infancy,
776            &self.leader_watcher,
777            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
778        );
779        Ok(())
780    }
781
782    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
783    async fn step_down_without_lock(&self) -> Result<()> {
784        let key = self.election_key().into_bytes();
785        let leader_key = RdsLeaderKey {
786            name: self.leader_value.clone().into_bytes(),
787            key: key.clone(),
788            ..Default::default()
789        };
790        send_leader_change_and_set_flags(
791            &self.is_leader,
792            &self.leader_infancy,
793            &self.leader_watcher,
794            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
795        );
796        Ok(())
797    }
798
799    /// Elected as leader. The leader should put the key and notify the leader watcher.
800    /// Caution: Should only elected while holding the advisory lock.
801    async fn elected(&self) -> Result<()> {
802        let key = self.election_key();
803        let leader_key = RdsLeaderKey {
804            name: self.leader_value.clone().into_bytes(),
805            key: key.clone().into_bytes(),
806            ..Default::default()
807        };
808        self.delete_value(&key).await?;
809        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
810            .await?;
811
812        if self
813            .is_leader
814            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
815            .is_ok()
816        {
817            self.leader_infancy.store(true, Ordering::Release);
818
819            if let Err(e) = self
820                .leader_watcher
821                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
822            {
823                error!(e; "Failed to send leader change message");
824            }
825        }
826        Ok(())
827    }
828}
829
#[cfg(test)]
mod tests {
    use std::{assert_matches, env};

    use deadpool_postgres::{Config, Runtime};
    use tokio_postgres::NoTls;

    use super::*;
    use crate::{error, maybe_skip_postgres_integration_test};

    /// Execution timeout handed to every test client.
    const EXECUTION_TIMEOUT: Duration = Duration::from_secs(10);
    /// Statement timeout handed to every test client.
    const STATEMENT_TIMEOUT: Duration = Duration::from_secs(10);
    /// Idle session timeout used by every test except `test_idle_session_timeout`.
    const IDLE_SESSION_TIMEOUT: Duration = Duration::from_secs(0);
    /// Leader lease TTL used by every test election.
    const META_LEASE_TTL: Duration = Duration::from_secs(2);

    /// Receives the next message from `$rx` and asserts that it is the `$variant` leader
    /// change for `$election`: the key name is the election's leader value, the key is
    /// its election key, and lease id / revision are the `i64` defaults.
    macro_rules! assert_leader_change {
        ($rx:expr, $variant:ident, $election:expr) => {
            match $rx.recv().await {
                Ok(LeaderChangeMessage::$variant(key)) => {
                    assert_eq!(
                        String::from_utf8_lossy(key.name()),
                        $election.leader_value
                    );
                    assert_eq!(
                        String::from_utf8_lossy(key.key()),
                        $election.election_key()
                    );
                    assert_eq!(key.lease_id(), i64::default());
                    assert_eq!(key.revision(), i64::default());
                }
                _ => panic!(concat!(
                    "Expected LeaderChangeMessage::",
                    stringify!($variant)
                )),
            }
        };
    }

    /// Builds an initialized [`ElectionPgClient`] from the `GT_POSTGRES_ENDPOINTS`
    /// environment variable.
    ///
    /// When `table_name` is given, the key-value table is created if it does not exist.
    /// Fails with an `Unexpected` error when the endpoint variable is unset or empty.
    async fn create_postgres_client(
        table_name: Option<&str>,
        execution_timeout: Duration,
        idle_session_timeout: Duration,
        statement_timeout: Duration,
    ) -> Result<ElectionPgClient> {
        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
        if endpoint.is_empty() {
            return UnexpectedSnafu {
                err_msg: "Postgres endpoint is empty".to_string(),
            }
            .fail();
        }
        let mut cfg = Config::new();
        cfg.url = Some(endpoint);
        let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
        let mut pg_client = ElectionPgClient::new(
            pool,
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .unwrap();
        pg_client.maybe_init_client().await?;
        if let Some(table_name) = table_name {
            let create_table_sql = format!(
                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
                table_name
            );
            pg_client.execute(&create_table_sql, &[]).await?;
        }
        Ok(pg_client)
    }

    /// Creates a client with the default test timeouts, panicking on failure.
    async fn create_test_client(table_name: Option<&str>) -> ElectionPgClient {
        create_postgres_client(
            table_name,
            EXECUTION_TIMEOUT,
            IDLE_SESSION_TIMEOUT,
            STATEMENT_TIMEOUT,
        )
        .await
        .unwrap()
    }

    /// Builds a [`PgElection`] in its initial state (not leader, in leader infancy) and
    /// returns it with a receiver subscribed to its leader watcher.
    ///
    /// Callers that do not observe leader changes should bind the receiver to `_` so it
    /// is dropped right away.
    fn new_election(
        client: ElectionPgClient,
        leader_value: impl Into<String>,
        store_key_prefix: impl Into<String>,
        candidate_lease_ttl: Duration,
        lock_id: u64,
        table_name: &str,
    ) -> (PgElection, broadcast::Receiver<LeaderChangeMessage>) {
        let (tx, rx) = broadcast::channel(100);
        let election = PgElection {
            leader_value: leader_value.into(),
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: store_key_prefix.into(),
            candidate_lease_ttl,
            meta_lease_ttl: META_LEASE_TTL,
            sql_set: ElectionSqlFactory::new(lock_id, None, table_name).build(),
        };
        (election, rx)
    }

    /// Drops the test table through the election's client.
    async fn drop_table(pg_election: &PgElection, table_name: &str) {
        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
        pg_election
            .pg_client
            .read()
            .await
            .execute(&sql, &[])
            .await
            .unwrap();
    }

    /// Runs the campaign statement on the election's client and returns the boolean in
    /// the first column of the first row.
    async fn campaign(election: &PgElection) -> bool {
        let rows = election
            .pg_client
            .read()
            .await
            .query(&election.sql_set.campaign, &[])
            .await
            .unwrap();
        rows[0].get(0)
    }

    /// Runs the step-down statement on the election's client.
    async fn release_lock(election: &PgElection) {
        election
            .pg_client
            .read()
            .await
            .query(&election.sql_set.step_down, &[])
            .await
            .unwrap();
    }

    /// Reads the lease stored under the election key, if any.
    async fn leader_lease(election: &PgElection) -> Option<Lease> {
        election
            .get_value_with_lease(&election.election_key())
            .await
            .unwrap()
    }

    /// Asserts that `election` considers itself the leader and that the stored lease
    /// carries its leader value and has not expired. Returns the lease.
    async fn assert_holds_leader_lease(election: &PgElection) -> Lease {
        let lease = leader_lease(election).await.unwrap();
        assert_eq!(lease.leader_value, election.leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(election.is_leader());
        lease
    }

    /// Asserts that the leader key is gone and `election` no longer considers itself
    /// the leader.
    async fn assert_stepped_down(election: &PgElection) {
        assert!(leader_lease(election).await.is_none());
        assert!(!election.is_leader());
    }

    /// Exercises put / get / update / delete and the prefix scan on a dedicated table.
    #[tokio::test]
    async fn test_postgres_crud() {
        maybe_skip_postgres_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_postgres_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let client = create_test_client(Some(table_name)).await;
        let (pg_election, _) = new_election(
            client,
            "test_leader",
            uuid,
            candidate_lease_ttl,
            28319,
            table_name,
        );

        let res = pg_election
            .put_value_with_lease(&key, &value, candidate_lease_ttl)
            .await
            .unwrap();
        assert!(res);

        let lease = pg_election
            .get_value_with_lease(&key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        pg_election
            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
            .await
            .unwrap();

        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);

        let res = pg_election.get_value_with_lease(&key).await.unwrap();
        assert!(res.is_none());

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            pg_election
                .put_value_with_lease(&key, &value, candidate_lease_ttl)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        let (res, current) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert_eq!(current, Timestamp::default());

        drop_table(&pg_election, table_name).await;
    }

    /// Registers one candidate on its own client. The test below spawns this as a task
    /// and aborts it to stop the candidate.
    async fn candidate(
        leader_value: String,
        candidate_lease_ttl: Duration,
        store_key_prefix: String,
        table_name: String,
    ) {
        // The table is created by the spawning test, so no table name is passed here.
        let client = create_test_client(None).await;
        let (pg_election, _) = new_election(
            client,
            leader_value,
            store_key_prefix,
            candidate_lease_ttl,
            28319,
            &table_name,
        );

        let node_info = MetasrvNodeInfo {
            addr: "test_addr".to_string(),
            version: "test_version".to_string(),
            git_commit: "test_git_commit".to_string(),
            start_time_ms: 0,
            total_cpu_millicores: 0,
            total_memory_bytes: 0,
            cpu_usage_millicores: 0,
            memory_usage_bytes: 0,
            hostname: "test_hostname".to_string(),
        };
        pg_election.register_candidate(&node_info).await.unwrap();
    }

    /// Ten candidates register themselves, are all visible, and disappear from
    /// `all_candidates` once their tasks are aborted and the leases have expired.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_postgres_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let client = create_test_client(Some(table_name)).await;

        let mut handles = vec![];
        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Wait for candidates to register themselves and renew their leases at least once.
        tokio::time::sleep(Duration::from_secs(3)).await;

        let (pg_election, _) = new_election(
            client,
            "test_leader",
            uuid.clone(),
            candidate_lease_ttl,
            28319,
            table_name,
        );

        let candidates = pg_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        for handle in handles {
            handle.abort();
        }

        // Wait for the candidate leases to expire.
        tokio::time::sleep(Duration::from_secs(5)).await;
        let candidates = pg_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // Garbage collection: the expired rows are still stored and must be deleted.
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        drop_table(&pg_election, table_name).await;
    }

    /// Drives `elected`, `step_down_without_lock` and `step_down` directly and checks the
    /// stored lease, the leader flag and the emitted leader change messages.
    #[tokio::test]
    async fn test_elected_and_step_down() {
        maybe_skip_postgres_integration_test!();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_and_step_down_greptime_metakv";
        let client = create_test_client(Some(table_name)).await;
        let (leader_pg_election, mut rx) = new_election(
            client,
            "test_leader",
            uuid,
            Duration::from_secs(5),
            28320,
            table_name,
        );

        leader_pg_election.elected().await.unwrap();
        assert_holds_leader_lease(&leader_pg_election).await;
        assert_leader_change!(rx, Elected, leader_pg_election);

        // Stepping down without the lock keeps the key but clears the leader flag.
        leader_pg_election.step_down_without_lock().await.unwrap();
        let lease = leader_lease(&leader_pg_election).await.unwrap();
        assert_eq!(lease.leader_value, leader_pg_election.leader_value);
        assert!(!leader_pg_election.is_leader());
        assert_leader_change!(rx, StepDown, leader_pg_election);

        leader_pg_election.elected().await.unwrap();
        assert_holds_leader_lease(&leader_pg_election).await;
        assert_leader_change!(rx, Elected, leader_pg_election);

        // A regular step down removes the key as well.
        leader_pg_election.step_down().await.unwrap();
        assert_stepped_down(&leader_pg_election).await;
        assert_leader_change!(rx, StepDown, leader_pg_election);

        drop_table(&leader_pg_election, table_name).await;
    }

    /// Walks `leader_action` through election, lease renewal and the three step-down
    /// causes (expired lease, deleted key, key overwritten by someone else).
    #[tokio::test]
    async fn test_leader_action() {
        maybe_skip_postgres_integration_test!();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_leader_action_greptime_metakv";
        let client = create_test_client(Some(table_name)).await;
        let (leader_pg_election, mut rx) = new_election(
            client,
            "test_leader",
            uuid,
            Duration::from_secs(5),
            28321,
            table_name,
        );

        // Step 1: No leader exists, campaign and elected.
        assert!(campaign(&leader_pg_election).await);
        leader_pg_election.leader_action().await.unwrap();
        let lease = assert_holds_leader_lease(&leader_pg_election).await;
        assert_leader_change!(rx, Elected, leader_pg_election);

        // Step 2: As a leader, renew the lease.
        assert!(campaign(&leader_pg_election).await);
        leader_pg_election.leader_action().await.unwrap();
        let new_lease = assert_holds_leader_lease(&leader_pg_election).await;
        assert!(new_lease.expire_time > lease.expire_time);

        // Step 3: Something wrong, the leader lease expired.
        tokio::time::sleep(META_LEASE_TTL).await;

        assert!(campaign(&leader_pg_election).await);
        leader_pg_election.leader_action().await.unwrap();
        // Also checks the leader flag, consistent with the other step-down steps.
        assert_stepped_down(&leader_pg_election).await;
        assert_leader_change!(rx, StepDown, leader_pg_election);

        // Step 4: Re-campaign and elected.
        assert!(campaign(&leader_pg_election).await);
        leader_pg_election.leader_action().await.unwrap();
        assert_holds_leader_lease(&leader_pg_election).await;
        assert_leader_change!(rx, Elected, leader_pg_election);

        // Step 5: Something wrong, the leader key is deleted by other followers.
        leader_pg_election
            .delete_value(&leader_pg_election.election_key())
            .await
            .unwrap();
        leader_pg_election.leader_action().await.unwrap();
        assert_stepped_down(&leader_pg_election).await;
        assert_leader_change!(rx, StepDown, leader_pg_election);

        // Step 6: Re-campaign and elected.
        assert!(campaign(&leader_pg_election).await);
        leader_pg_election.leader_action().await.unwrap();
        assert_holds_leader_lease(&leader_pg_election).await;
        assert_leader_change!(rx, Elected, leader_pg_election);

        // Step 7: Something wrong, the leader key changed by others.
        assert!(campaign(&leader_pg_election).await);
        leader_pg_election
            .delete_value(&leader_pg_election.election_key())
            .await
            .unwrap();
        leader_pg_election
            .put_value_with_lease(
                &leader_pg_election.election_key(),
                "test",
                Duration::from_secs(10),
            )
            .await
            .unwrap();
        leader_pg_election.leader_action().await.unwrap();
        assert_stepped_down(&leader_pg_election).await;
        assert_leader_change!(rx, StepDown, leader_pg_election);

        // Clean up
        release_lock(&leader_pg_election).await;

        drop_table(&leader_pg_election, table_name).await;
    }

    /// Checks `follower_action` against a live leader, an expired lease, a missing
    /// leader key, and a follower that wrongly believes it is the leader.
    #[tokio::test]
    async fn test_follower_action() {
        maybe_skip_postgres_integration_test!();
        common_telemetry::init_default_ut_logging();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_follower_action_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);

        // Follower and leader use separate clients but share prefix, lock id and table.
        let follower_client = create_test_client(Some(table_name)).await;
        let (follower_pg_election, mut rx) = new_election(
            follower_client,
            "test_follower",
            uuid.clone(),
            candidate_lease_ttl,
            28322,
            table_name,
        );

        let leader_client = create_test_client(Some(table_name)).await;
        let (leader_pg_election, _) = new_election(
            leader_client,
            "test_leader",
            uuid,
            candidate_lease_ttl,
            28322,
            table_name,
        );

        // The campaign result is deliberately not asserted here.
        let _ = campaign(&leader_pg_election).await;
        leader_pg_election.elected().await.unwrap();

        // Step 1: As a follower, the leader exists and the lease is not expired.
        follower_pg_election.follower_action().await.unwrap();

        // Step 2: As a follower, the leader exists but the lease expired.
        tokio::time::sleep(META_LEASE_TTL).await;
        assert!(follower_pg_election.follower_action().await.is_err());

        // Step 3: As a follower, the leader does not exist.
        leader_pg_election
            .delete_value(&leader_pg_election.election_key())
            .await
            .unwrap();
        assert!(follower_pg_election.follower_action().await.is_err());

        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
        follower_pg_election
            .is_leader
            .store(true, Ordering::Relaxed);
        assert!(follower_pg_election.follower_action().await.is_err());
        assert_leader_change!(rx, StepDown, follower_pg_election);

        // Clean up
        release_lock(&leader_pg_election).await;

        drop_table(&follower_pg_election, table_name).await;
    }

    /// `reset_campaign` must clear a leader flag that was set locally.
    #[tokio::test]
    async fn test_reset_campaign() {
        maybe_skip_postgres_integration_test!();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_reset_campaign_greptime_metakv";
        let client = create_test_client(Some(table_name)).await;
        let (leader_pg_election, _) = new_election(
            client,
            "test_leader",
            uuid,
            Duration::from_secs(5),
            28321,
            table_name,
        );

        leader_pg_election.is_leader.store(true, Ordering::Relaxed);
        leader_pg_election.reset_campaign().await;
        assert!(!leader_pg_election.is_leader());
        drop_table(&leader_pg_election, table_name).await;
    }

    /// A session idle for longer than the idle session timeout is closed, and
    /// `reset_client` makes the client usable again.
    #[tokio::test]
    async fn test_idle_session_timeout() {
        maybe_skip_postgres_integration_test!();
        common_telemetry::init_default_ut_logging();
        let idle_session_timeout = Duration::from_secs(1);
        let mut client = create_postgres_client(
            None,
            EXECUTION_TIMEOUT,
            idle_session_timeout,
            STATEMENT_TIMEOUT,
        )
        .await
        .unwrap();

        // Wait for the idle session timeout.
        tokio::time::sleep(Duration::from_millis(1100)).await;
        let err = client.query("SELECT 1", &[]).await.unwrap_err();
        assert_matches!(err, error::Error::PostgresExecution { .. });
        let error::Error::PostgresExecution { error, .. } = err else {
            panic!("Expected PostgresExecution error");
        };
        assert!(error.is_closed());

        // Reset the client and try again.
        client.reset_client().await.unwrap();
        let _ = client.query("SELECT 1", &[]).await.unwrap();
    }

    /// With a schema configured, every table statement must use the quoted,
    /// schema-qualified table name.
    #[test]
    fn test_election_sql_with_schema() {
        let qualified_table = "\"test_schema\".\"greptime_metakv\"";
        let s = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv").build();

        assert!(s.campaign.contains("pg_try_advisory_lock"));
        assert!(s.put_value_with_lease.contains(qualified_table));
        assert!(s.update_value_with_lease.contains(qualified_table));
        assert!(s.get_value_with_lease.contains(qualified_table));
        assert!(s.get_value_with_lease_by_prefix.contains(qualified_table));
        assert!(s.delete_value.contains(qualified_table));
    }
}