meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31    Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Builds the SQL statements used by the PostgreSQL election for one
/// advisory lock id and one (optionally schema-qualified) table.
struct ElectionSqlFactory<'a> {
    /// Id interpolated into the advisory lock/unlock statements.
    lock_id: u64,
    /// Optional schema; when `None` or empty the table name is used unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table the key/value statements operate on.
    table_name: &'a str,
}
44
/// The complete set of SQL statements used by the PostgreSQL election,
/// rendered once for a fixed lock id and table.
struct ElectionSqlSet {
    // SQL that tries to acquire the advisory lock (`pg_try_advisory_lock`).
    //
    // Takes no parameters.
    campaign: String,
    // SQL that releases the advisory lock (`pg_advisory_unlock`).
    //
    // Takes no parameters.
    step_down: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: value,
    // `$3`: lease time in seconds
    //
    // Returns:
    // If the key already exists, return the previous value.
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: previous value,
    // `$3`: updated value,
    // `$4`: lease time in seconds
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // `$1`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: key deleted,
    // column 1: value deleted
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93        Self {
94            lock_id,
95            schema_name,
96            table_name,
97        }
98    }
99
100    fn table_ident(&self) -> String {
101        match self.schema_name {
102            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103            _ => format!("\"{}\"", self.table_name),
104        }
105    }
106
107    fn build(self) -> ElectionSqlSet {
108        ElectionSqlSet {
109            campaign: self.campaign_sql(),
110            step_down: self.step_down_sql(),
111            put_value_with_lease: self.put_value_with_lease_sql(),
112            update_value_with_lease: self.update_value_with_lease_sql(),
113            get_value_with_lease: self.get_value_with_lease_sql(),
114            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115            delete_value: self.delete_value_sql(),
116        }
117    }
118
119    fn campaign_sql(&self) -> String {
120        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121    }
122
123    fn step_down_sql(&self) -> String {
124        format!("SELECT pg_advisory_unlock({})", self.lock_id)
125    }
126
127    fn put_value_with_lease_sql(&self) -> String {
128        let table = self.table_ident();
129        format!(
130            r#"WITH prev AS (
131                SELECT k, v FROM {table} WHERE k = $1
132            ), insert AS (
133                INSERT INTO {table}
134                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135                ON CONFLICT (k) DO NOTHING
136            )
137            SELECT k, v FROM prev;
138            "#,
139            table = table,
140            lease_sep = LEASE_SEP
141        )
142    }
143
144    fn update_value_with_lease_sql(&self) -> String {
145        let table = self.table_ident();
146        format!(
147            r#"UPDATE {table}
148               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149               WHERE k = $1 AND v = $2"#,
150            table = table,
151            lease_sep = LEASE_SEP
152        )
153    }
154
155    fn get_value_with_lease_sql(&self) -> String {
156        let table = self.table_ident();
157        format!(
158            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159            table = table
160        )
161    }
162
163    fn get_value_with_lease_by_prefix_sql(&self) -> String {
164        let table = self.table_ident();
165        format!(
166            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167            table = table
168        )
169    }
170
171    fn delete_value_sql(&self) -> String {
172        let table = self.table_ident();
173        format!(
174            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175            table = table
176        )
177    }
178}
179
/// PgClient for election.
///
/// Holds at most one connection taken from `pool`; all election statements
/// are issued through that single connection.
pub struct ElectionPgClient {
    /// The connection currently in use, or `None` before `maybe_init_client`
    /// has installed one (or after `reset_client` has taken it).
    current: Option<deadpool::managed::Object<Manager>>,
    /// The pool new connections are obtained from.
    pool: Pool,
    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The idle session timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a session remains idle for longer than this duration, the server will terminate it.
    idle_session_timeout: Duration,

    /// The statement timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a statement takes longer than this duration to execute, the server will abort it.
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204    pub fn new(
205        pool: Pool,
206        execution_timeout: Duration,
207        idle_session_timeout: Duration,
208        statement_timeout: Duration,
209    ) -> Result<ElectionPgClient> {
210        Ok(ElectionPgClient {
211            current: None,
212            pool,
213            execution_timeout,
214            idle_session_timeout,
215            statement_timeout,
216        })
217    }
218
219    fn set_idle_session_timeout_sql(&self) -> String {
220        format!(
221            "SET idle_session_timeout = '{}s';",
222            self.idle_session_timeout.as_secs()
223        )
224    }
225
226    fn set_statement_timeout_sql(&self) -> String {
227        format!(
228            "SET statement_timeout = '{}s';",
229            self.statement_timeout.as_secs()
230        )
231    }
232
233    async fn reset_client(&mut self) -> Result<()> {
234        if let Some(client) = self.current.take() {
235            // Remove the connection from deadpool and drop it,
236            // forcing TCP close and backend termination.
237            let inner = deadpool::managed::Object::<deadpool_postgres::Manager>::take(client);
238            drop(inner);
239        }
240        self.maybe_init_client().await
241    }
242
243    async fn maybe_init_client(&mut self) -> Result<()> {
244        if self.current.is_none() {
245            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
246
247            self.current = Some(client);
248            // Set idle session timeout and statement timeout.
249            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
250            self.execute(&idle_session_timeout_sql, &[]).await?;
251            let statement_timeout_sql = self.set_statement_timeout_sql();
252            self.execute(&statement_timeout_sql, &[]).await?;
253        }
254
255        Ok(())
256    }
257
258    /// Returns the result of the query.
259    ///
260    /// # Panics
261    /// if `current` is `None`.
262    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
263        let result = tokio::time::timeout(
264            self.execution_timeout,
265            self.current.as_ref().unwrap().execute(sql, params),
266        )
267        .await
268        .map_err(|_| {
269            SqlExecutionTimeoutSnafu {
270                sql: sql.to_string(),
271                duration: self.execution_timeout,
272            }
273            .build()
274        })?;
275
276        result.context(PostgresExecutionSnafu { sql })
277    }
278
279    /// Returns the result of the query.
280    ///
281    /// # Panics
282    /// if `current` is `None`.
283    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
284        let result = tokio::time::timeout(
285            self.execution_timeout,
286            self.current.as_ref().unwrap().query(sql, params),
287        )
288        .await
289        .map_err(|_| {
290            SqlExecutionTimeoutSnafu {
291                sql: sql.to_string(),
292                duration: self.execution_timeout,
293            }
294            .build()
295        })?;
296
297        result.context(PostgresExecutionSnafu { sql })
298    }
299}
300
/// PostgreSql implementation of Election.
pub struct PgElection {
    /// Value identifying this node: stored under the election key when elected
    /// and appended to the candidate root to form this node's candidate key.
    leader_value: String,
    /// The client all election statements go through.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this node is newly elected; cleared by `in_leader_infancy`.
    leader_infancy: AtomicBool,
    /// Channel on which leader change messages are broadcast.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease duration used for candidate registrations.
    candidate_lease_ttl: Duration,
    /// Lease duration used for the leader's election key.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
313
314impl PgElection {
315    async fn maybe_init_client(&self) -> Result<()> {
316        if self.pg_client.read().await.current.is_none() {
317            self.pg_client.write().await.maybe_init_client().await?;
318        }
319
320        Ok(())
321    }
322
323    #[allow(clippy::too_many_arguments)]
324    pub async fn with_pg_client(
325        leader_value: String,
326        pg_client: ElectionPgClient,
327        store_key_prefix: String,
328        candidate_lease_ttl: Duration,
329        meta_lease_ttl: Duration,
330        schema_name: Option<&str>,
331        table_name: &str,
332        lock_id: u64,
333    ) -> Result<ElectionRef> {
334        if let Some(s) = schema_name {
335            common_telemetry::info!("PgElection uses schema: {}", s);
336        } else {
337            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
338        }
339        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
340
341        let tx = listen_leader_change(leader_value.clone());
342        Ok(Arc::new(Self {
343            leader_value,
344            pg_client: RwLock::new(pg_client),
345            is_leader: AtomicBool::new(false),
346            leader_infancy: AtomicBool::new(false),
347            leader_watcher: tx,
348            store_key_prefix,
349            candidate_lease_ttl,
350            meta_lease_ttl,
351            sql_set: sql_factory.build(),
352        }))
353    }
354
355    fn election_key(&self) -> String {
356        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
357    }
358
359    fn candidate_root(&self) -> String {
360        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
361    }
362
363    fn candidate_key(&self) -> String {
364        format!("{}{}", self.candidate_root(), self.leader_value)
365    }
366}
367
368#[async_trait::async_trait]
369impl Election for PgElection {
370    type Leader = LeaderValue;
371
    /// Returns whether this node currently considers itself the leader
    /// (a relaxed read of the local flag; no database access).
    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }
375
376    fn in_leader_infancy(&self) -> bool {
377        self.leader_infancy
378            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
379            .is_ok()
380    }
381
382    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
383        let key = self.candidate_key();
384        let node_info =
385            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
386                input: format!("{node_info:?}"),
387            })?;
388        let res = self
389            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390            .await?;
391        // May registered before, just update the lease.
392        if !res {
393            self.delete_value(&key).await?;
394            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
395                .await?;
396        }
397
398        // Check if the current lease has expired and renew the lease.
399        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
400        loop {
401            let _ = keep_alive_interval.tick().await;
402
403            let lease = self
404                .get_value_with_lease(&key)
405                .await?
406                .context(UnexpectedSnafu {
407                    violated: format!("Failed to get lease for key: {:?}", key),
408                })?;
409
410            ensure!(
411                lease.expire_time > lease.current,
412                UnexpectedSnafu {
413                    violated: format!(
414                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
415                        lease.expire_time, lease.current, key
416                    ),
417                }
418            );
419
420            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
421            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
422                .await?;
423        }
424    }
425
426    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
427        let key_prefix = self.candidate_root();
428        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
429        // Remove expired candidates
430        candidates.retain(|c| c.1 > current);
431        let mut valid_candidates = Vec::with_capacity(candidates.len());
432        for (c, _) in candidates {
433            let node_info: MetasrvNodeInfo =
434                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
435                    input: format!("{:?}", c),
436                })?;
437            valid_candidates.push(node_info);
438        }
439        Ok(valid_candidates)
440    }
441
442    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
443    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
444    ///
445    /// The function operates in a loop, where it:
446    ///
447    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
448    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
449    /// 3. Checks the result of the query:
450    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
451    ///      to perform actions as the leader.
452    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
453    ///      to perform actions as a follower.
454    async fn campaign(&self) -> Result<()> {
455        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
456        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
457
458        self.maybe_init_client().await?;
459        loop {
460            let res = self
461                .pg_client
462                .read()
463                .await
464                .query(&self.sql_set.campaign, &[])
465                .await?;
466            let row = res.first().context(UnexpectedSnafu {
467                violated: "Failed to get the result of acquiring advisory lock",
468            })?;
469            let is_leader = row.try_get(0).map_err(|_| {
470                UnexpectedSnafu {
471                    violated: "Failed to get the result of get lock",
472                }
473                .build()
474            })?;
475            if is_leader {
476                self.leader_action().await?;
477            } else {
478                self.follower_action().await?;
479            }
480            let _ = keep_alive_interval.tick().await;
481        }
482    }
483
484    async fn reset_campaign(&self) {
485        info!("Resetting campaign");
486        if self.is_leader.load(Ordering::Relaxed) {
487            if let Err(err) = self.step_down_without_lock().await {
488                error!(err; "Failed to step down without lock");
489            }
490            info!("Step down without lock successfully, due to reset campaign");
491        }
492        if let Err(err) = self.pg_client.write().await.reset_client().await {
493            error!(err; "Failed to reset client");
494        }
495    }
496
497    async fn leader(&self) -> Result<Self::Leader> {
498        if self.is_leader.load(Ordering::Relaxed) {
499            Ok(self.leader_value.as_bytes().into())
500        } else {
501            let key = self.election_key();
502            if let Some(lease) = self.get_value_with_lease(&key).await? {
503                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
504                Ok(lease.leader_value.as_bytes().into())
505            } else {
506                NoLeaderSnafu.fail()
507            }
508        }
509    }
510
    /// Not implemented for the PostgreSQL election.
    ///
    /// # Panics
    /// Always panics via `todo!()` when awaited.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
514
    /// Returns a new receiver subscribed to this election's leader change broadcast channel.
    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
        self.leader_watcher.subscribe()
    }
518}
519
520impl PgElection {
521    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
522    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
523        let key = key.as_bytes();
524        self.maybe_init_client().await?;
525        let res = self
526            .pg_client
527            .read()
528            .await
529            .query(&self.sql_set.get_value_with_lease, &[&key])
530            .await?;
531
532        if res.is_empty() {
533            Ok(None)
534        } else {
535            // Safety: Checked if res is empty above.
536            let current_time_str = res[0].try_get(1).unwrap_or_default();
537            let current_time = match Timestamp::from_str(current_time_str, None) {
538                Ok(ts) => ts,
539                Err(_) => UnexpectedSnafu {
540                    violated: format!("Invalid timestamp: {}", current_time_str),
541                }
542                .fail()?,
543            };
544            // Safety: Checked if res is empty above.
545            let value_and_expire_time =
546                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
547            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
548
549            Ok(Some(Lease {
550                leader_value: value,
551                expire_time,
552                current: current_time,
553                origin: value_and_expire_time.to_string(),
554            }))
555        }
556    }
557
558    /// Returns all values and expire time with the given key prefix. Also returns the current time.
559    async fn get_value_with_lease_by_prefix(
560        &self,
561        key_prefix: &str,
562    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
563        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
564        self.maybe_init_client().await?;
565        let res = self
566            .pg_client
567            .read()
568            .await
569            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
570            .await?;
571
572        let mut values_with_leases = vec![];
573        let mut current = Timestamp::default();
574        for row in res {
575            let current_time_str = row.try_get(1).unwrap_or_default();
576            current = match Timestamp::from_str(current_time_str, None) {
577                Ok(ts) => ts,
578                Err(_) => UnexpectedSnafu {
579                    violated: format!("Invalid timestamp: {}", current_time_str),
580                }
581                .fail()?,
582            };
583
584            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
585            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
586
587            values_with_leases.push((value, expire_time));
588        }
589        Ok((values_with_leases, current))
590    }
591
592    async fn update_value_with_lease(
593        &self,
594        key: &str,
595        prev: &str,
596        updated: &str,
597        lease_ttl: Duration,
598    ) -> Result<()> {
599        let key = key.as_bytes();
600        let prev = prev.as_bytes();
601        self.maybe_init_client().await?;
602        let lease_ttl_secs = lease_ttl.as_secs() as f64;
603        let res = self
604            .pg_client
605            .read()
606            .await
607            .execute(
608                &self.sql_set.update_value_with_lease,
609                &[&key, &prev, &updated, &lease_ttl_secs],
610            )
611            .await?;
612
613        ensure!(
614            res == 1,
615            UnexpectedSnafu {
616                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
617            }
618        );
619
620        Ok(())
621    }
622
623    /// Returns `true` if the insertion is successful
624    async fn put_value_with_lease(
625        &self,
626        key: &str,
627        value: &str,
628        lease_ttl: Duration,
629    ) -> Result<bool> {
630        let key = key.as_bytes();
631        let lease_ttl_secs = lease_ttl.as_secs() as f64;
632        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
633        self.maybe_init_client().await?;
634        let res = self
635            .pg_client
636            .read()
637            .await
638            .query(&self.sql_set.put_value_with_lease, &params)
639            .await?;
640        Ok(res.is_empty())
641    }
642
643    /// Returns `true` if the deletion is successful.
644    /// Caution: Should only delete the key if the lease is expired.
645    async fn delete_value(&self, key: &str) -> Result<bool> {
646        let key = key.as_bytes();
647        self.maybe_init_client().await?;
648        let res = self
649            .pg_client
650            .read()
651            .await
652            .query(&self.sql_set.delete_value, &[&key])
653            .await?;
654
655        Ok(res.len() == 1)
656    }
657
658    /// Handles the actions of a leader in the election process.
659    ///
660    /// This function performs the following checks and actions:
661    ///
662    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
663    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
664    ///   or steps down if it has expired.
665    ///
666    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
667    ///     by updating the value associated with the election key.
668    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
669    ///     and steps down, initiating a new campaign for leadership.
670    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
671    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
672    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
673    ///     after a period of time during which other leaders have been elected and stepped down.
674    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
675    ///
676    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
677    ///   as a newly elected leader.
678    async fn leader_action(&self) -> Result<()> {
679        let key = self.election_key();
680        // Case 1
681        if self.is_leader() {
682            match self.get_value_with_lease(&key).await? {
683                Some(lease) => {
684                    match (
685                        lease.leader_value == self.leader_value,
686                        lease.expire_time > lease.current,
687                    ) {
688                        // Case 1.1
689                        (true, true) => {
690                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
691                            self.update_value_with_lease(
692                                &key,
693                                &lease.origin,
694                                &self.leader_value,
695                                self.meta_lease_ttl,
696                            )
697                            .await?;
698                        }
699                        // Case 1.2
700                        (true, false) => {
701                            warn!("Leader lease expired, now stepping down.");
702                            self.step_down().await?;
703                        }
704                        // Case 1.3
705                        (false, _) => {
706                            warn!(
707                                "Leader lease not found, but still hold the lock. Now stepping down."
708                            );
709                            self.step_down().await?;
710                        }
711                    }
712                }
713                // Case 1.4
714                None => {
715                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
716                    self.step_down().await?;
717                }
718            }
719        // Case 2
720        } else {
721            self.elected().await?;
722        }
723        Ok(())
724    }
725
726    /// Handles the actions of a follower in the election process.
727    ///
728    /// This function performs the following checks and actions:
729    ///
730    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
731    ///   it steps down without deleting the key.
732    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
733    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
734    ///   will be released.
735    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
736    async fn follower_action(&self) -> Result<()> {
737        let key = self.election_key();
738        // Case 1
739        if self.is_leader() {
740            self.step_down_without_lock().await?;
741        }
742        let lease = self
743            .get_value_with_lease(&key)
744            .await?
745            .context(NoLeaderSnafu)?;
746        // Case 2
747        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
748        // Case 3
749        Ok(())
750    }
751
752    /// Step down the leader. The leader should delete the key and notify the leader watcher.
753    ///
754    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
755    ///
756    /// ## Caution:
757    /// Should only step down while holding the advisory lock.
758    async fn step_down(&self) -> Result<()> {
759        let key = self.election_key();
760        let leader_key = RdsLeaderKey {
761            name: self.leader_value.clone().into_bytes(),
762            key: key.clone().into_bytes(),
763            ..Default::default()
764        };
765        self.delete_value(&key).await?;
766        self.maybe_init_client().await?;
767        self.pg_client
768            .read()
769            .await
770            .query(&self.sql_set.step_down, &[])
771            .await?;
772        send_leader_change_and_set_flags(
773            &self.is_leader,
774            &self.leader_infancy,
775            &self.leader_watcher,
776            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
777        );
778        Ok(())
779    }
780
781    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
782    async fn step_down_without_lock(&self) -> Result<()> {
783        let key = self.election_key().into_bytes();
784        let leader_key = RdsLeaderKey {
785            name: self.leader_value.clone().into_bytes(),
786            key: key.clone(),
787            ..Default::default()
788        };
789        send_leader_change_and_set_flags(
790            &self.is_leader,
791            &self.leader_infancy,
792            &self.leader_watcher,
793            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
794        );
795        Ok(())
796    }
797
798    /// Elected as leader. The leader should put the key and notify the leader watcher.
799    /// Caution: Should only elected while holding the advisory lock.
800    async fn elected(&self) -> Result<()> {
801        let key = self.election_key();
802        let leader_key = RdsLeaderKey {
803            name: self.leader_value.clone().into_bytes(),
804            key: key.clone().into_bytes(),
805            ..Default::default()
806        };
807        self.delete_value(&key).await?;
808        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
809            .await?;
810
811        if self
812            .is_leader
813            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
814            .is_ok()
815        {
816            self.leader_infancy.store(true, Ordering::Release);
817
818            if let Err(e) = self
819                .leader_watcher
820                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
821            {
822                error!(e; "Failed to send leader change message");
823            }
824        }
825        Ok(())
826    }
827}
828
829#[cfg(test)]
830mod tests {
831    use std::assert_matches::assert_matches;
832    use std::env;
833
834    use common_meta::maybe_skip_postgres_integration_test;
835
836    use super::*;
837    use crate::error;
838    use crate::utils::postgres::create_postgres_pool;
839
840    async fn create_postgres_client(
841        table_name: Option<&str>,
842        execution_timeout: Duration,
843        idle_session_timeout: Duration,
844        statement_timeout: Duration,
845    ) -> Result<ElectionPgClient> {
846        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
847        if endpoint.is_empty() {
848            return UnexpectedSnafu {
849                violated: "Postgres endpoint is empty".to_string(),
850            }
851            .fail();
852        }
853        let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
854        let mut pg_client = ElectionPgClient::new(
855            pool,
856            execution_timeout,
857            idle_session_timeout,
858            statement_timeout,
859        )
860        .unwrap();
861        pg_client.maybe_init_client().await?;
862        if let Some(table_name) = table_name {
863            let create_table_sql = format!(
864                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
865                table_name
866            );
867            pg_client.execute(&create_table_sql, &[]).await?;
868        }
869        Ok(pg_client)
870    }
871
872    async fn drop_table(pg_election: &PgElection, table_name: &str) {
873        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
874        pg_election
875            .pg_client
876            .read()
877            .await
878            .execute(&sql, &[])
879            .await
880            .unwrap();
881    }
882
883    #[tokio::test]
884    async fn test_postgres_crud() {
885        maybe_skip_postgres_integration_test!();
886        let key = "test_key".to_string();
887        let value = "test_value".to_string();
888
889        let uuid = uuid::Uuid::new_v4().to_string();
890        let table_name = "test_postgres_crud_greptime_metakv";
891        let candidate_lease_ttl = Duration::from_secs(10);
892        let execution_timeout = Duration::from_secs(10);
893        let statement_timeout = Duration::from_secs(10);
894        let meta_lease_ttl = Duration::from_secs(2);
895        let idle_session_timeout = Duration::from_secs(0);
896        let client = create_postgres_client(
897            Some(table_name),
898            execution_timeout,
899            idle_session_timeout,
900            statement_timeout,
901        )
902        .await
903        .unwrap();
904
905        let (tx, _) = broadcast::channel(100);
906        let pg_election = PgElection {
907            leader_value: "test_leader".to_string(),
908            pg_client: RwLock::new(client),
909            is_leader: AtomicBool::new(false),
910            leader_infancy: AtomicBool::new(true),
911            leader_watcher: tx,
912            store_key_prefix: uuid,
913            candidate_lease_ttl,
914            meta_lease_ttl,
915            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
916        };
917
918        let res = pg_election
919            .put_value_with_lease(&key, &value, candidate_lease_ttl)
920            .await
921            .unwrap();
922        assert!(res);
923
924        let lease = pg_election
925            .get_value_with_lease(&key)
926            .await
927            .unwrap()
928            .unwrap();
929        assert_eq!(lease.leader_value, value);
930
931        pg_election
932            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
933            .await
934            .unwrap();
935
936        let res = pg_election.delete_value(&key).await.unwrap();
937        assert!(res);
938
939        let res = pg_election.get_value_with_lease(&key).await.unwrap();
940        assert!(res.is_none());
941
942        for i in 0..10 {
943            let key = format!("test_key_{}", i);
944            let value = format!("test_value_{}", i);
945            pg_election
946                .put_value_with_lease(&key, &value, candidate_lease_ttl)
947                .await
948                .unwrap();
949        }
950
951        let key_prefix = "test_key".to_string();
952        let (res, _) = pg_election
953            .get_value_with_lease_by_prefix(&key_prefix)
954            .await
955            .unwrap();
956        assert_eq!(res.len(), 10);
957
958        for i in 0..10 {
959            let key = format!("test_key_{}", i);
960            let res = pg_election.delete_value(&key).await.unwrap();
961            assert!(res);
962        }
963
964        let (res, current) = pg_election
965            .get_value_with_lease_by_prefix(&key_prefix)
966            .await
967            .unwrap();
968        assert!(res.is_empty());
969        assert!(current == Timestamp::default());
970
971        drop_table(&pg_election, table_name).await;
972    }
973
974    async fn candidate(
975        leader_value: String,
976        candidate_lease_ttl: Duration,
977        store_key_prefix: String,
978        table_name: String,
979    ) {
980        let execution_timeout = Duration::from_secs(10);
981        let statement_timeout = Duration::from_secs(10);
982        let meta_lease_ttl = Duration::from_secs(2);
983        let idle_session_timeout = Duration::from_secs(0);
984        let client = create_postgres_client(
985            None,
986            execution_timeout,
987            idle_session_timeout,
988            statement_timeout,
989        )
990        .await
991        .unwrap();
992
993        let (tx, _) = broadcast::channel(100);
994        let pg_election = PgElection {
995            leader_value,
996            pg_client: RwLock::new(client),
997            is_leader: AtomicBool::new(false),
998            leader_infancy: AtomicBool::new(true),
999            leader_watcher: tx,
1000            store_key_prefix,
1001            candidate_lease_ttl,
1002            meta_lease_ttl,
1003            sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
1004        };
1005
1006        let node_info = MetasrvNodeInfo {
1007            addr: "test_addr".to_string(),
1008            version: "test_version".to_string(),
1009            git_commit: "test_git_commit".to_string(),
1010            start_time_ms: 0,
1011            total_cpu_millicores: 0,
1012            total_memory_bytes: 0,
1013            cpu_usage_millicores: 0,
1014            memory_usage_bytes: 0,
1015            hostname: "test_hostname".to_string(),
1016        };
1017        pg_election.register_candidate(&node_info).await.unwrap();
1018    }
1019
1020    #[tokio::test]
1021    async fn test_candidate_registration() {
1022        maybe_skip_postgres_integration_test!();
1023        let leader_value_prefix = "test_leader".to_string();
1024        let uuid = uuid::Uuid::new_v4().to_string();
1025        let table_name = "test_candidate_registration_greptime_metakv";
1026        let mut handles = vec![];
1027        let candidate_lease_ttl = Duration::from_secs(5);
1028        let execution_timeout = Duration::from_secs(10);
1029        let statement_timeout = Duration::from_secs(10);
1030        let meta_lease_ttl = Duration::from_secs(2);
1031        let idle_session_timeout = Duration::from_secs(0);
1032        let client = create_postgres_client(
1033            Some(table_name),
1034            execution_timeout,
1035            idle_session_timeout,
1036            statement_timeout,
1037        )
1038        .await
1039        .unwrap();
1040
1041        for i in 0..10 {
1042            let leader_value = format!("{}{}", leader_value_prefix, i);
1043            let handle = tokio::spawn(candidate(
1044                leader_value,
1045                candidate_lease_ttl,
1046                uuid.clone(),
1047                table_name.to_string(),
1048            ));
1049            handles.push(handle);
1050        }
1051        // Wait for candidates to register themselves and renew their leases at least once.
1052        tokio::time::sleep(Duration::from_secs(3)).await;
1053
1054        let (tx, _) = broadcast::channel(100);
1055        let leader_value = "test_leader".to_string();
1056        let pg_election = PgElection {
1057            leader_value,
1058            pg_client: RwLock::new(client),
1059            is_leader: AtomicBool::new(false),
1060            leader_infancy: AtomicBool::new(true),
1061            leader_watcher: tx,
1062            store_key_prefix: uuid.clone(),
1063            candidate_lease_ttl,
1064            meta_lease_ttl,
1065            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1066        };
1067
1068        let candidates = pg_election.all_candidates().await.unwrap();
1069        assert_eq!(candidates.len(), 10);
1070
1071        for handle in handles {
1072            handle.abort();
1073        }
1074
1075        // Wait for the candidate leases to expire.
1076        tokio::time::sleep(Duration::from_secs(5)).await;
1077        let candidates = pg_election.all_candidates().await.unwrap();
1078        assert!(candidates.is_empty());
1079
1080        // Garbage collection
1081        for i in 0..10 {
1082            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1083            let res = pg_election.delete_value(&key).await.unwrap();
1084            assert!(res);
1085        }
1086
1087        drop_table(&pg_election, table_name).await;
1088    }
1089
1090    #[tokio::test]
1091    async fn test_elected_and_step_down() {
1092        maybe_skip_postgres_integration_test!();
1093        let leader_value = "test_leader".to_string();
1094        let uuid = uuid::Uuid::new_v4().to_string();
1095        let table_name = "test_elected_and_step_down_greptime_metakv";
1096        let candidate_lease_ttl = Duration::from_secs(5);
1097        let execution_timeout = Duration::from_secs(10);
1098        let statement_timeout = Duration::from_secs(10);
1099        let meta_lease_ttl = Duration::from_secs(2);
1100        let idle_session_timeout = Duration::from_secs(0);
1101        let client = create_postgres_client(
1102            Some(table_name),
1103            execution_timeout,
1104            idle_session_timeout,
1105            statement_timeout,
1106        )
1107        .await
1108        .unwrap();
1109
1110        let (tx, mut rx) = broadcast::channel(100);
1111        let leader_pg_election = PgElection {
1112            leader_value: leader_value.clone(),
1113            pg_client: RwLock::new(client),
1114            is_leader: AtomicBool::new(false),
1115            leader_infancy: AtomicBool::new(true),
1116            leader_watcher: tx,
1117            store_key_prefix: uuid,
1118            candidate_lease_ttl,
1119            meta_lease_ttl,
1120            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1121        };
1122
1123        leader_pg_election.elected().await.unwrap();
1124        let lease = leader_pg_election
1125            .get_value_with_lease(&leader_pg_election.election_key())
1126            .await
1127            .unwrap()
1128            .unwrap();
1129        assert!(lease.leader_value == leader_value);
1130        assert!(lease.expire_time > lease.current);
1131        assert!(leader_pg_election.is_leader());
1132
1133        match rx.recv().await {
1134            Ok(LeaderChangeMessage::Elected(key)) => {
1135                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1136                assert_eq!(
1137                    String::from_utf8_lossy(key.key()),
1138                    leader_pg_election.election_key()
1139                );
1140                assert_eq!(key.lease_id(), i64::default());
1141                assert_eq!(key.revision(), i64::default());
1142            }
1143            _ => panic!("Expected LeaderChangeMessage::Elected"),
1144        }
1145
1146        leader_pg_election.step_down_without_lock().await.unwrap();
1147        let lease = leader_pg_election
1148            .get_value_with_lease(&leader_pg_election.election_key())
1149            .await
1150            .unwrap()
1151            .unwrap();
1152        assert!(lease.leader_value == leader_value);
1153        assert!(!leader_pg_election.is_leader());
1154
1155        match rx.recv().await {
1156            Ok(LeaderChangeMessage::StepDown(key)) => {
1157                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1158                assert_eq!(
1159                    String::from_utf8_lossy(key.key()),
1160                    leader_pg_election.election_key()
1161                );
1162                assert_eq!(key.lease_id(), i64::default());
1163                assert_eq!(key.revision(), i64::default());
1164            }
1165            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1166        }
1167
1168        leader_pg_election.elected().await.unwrap();
1169        let lease = leader_pg_election
1170            .get_value_with_lease(&leader_pg_election.election_key())
1171            .await
1172            .unwrap()
1173            .unwrap();
1174        assert!(lease.leader_value == leader_value);
1175        assert!(lease.expire_time > lease.current);
1176        assert!(leader_pg_election.is_leader());
1177
1178        match rx.recv().await {
1179            Ok(LeaderChangeMessage::Elected(key)) => {
1180                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1181                assert_eq!(
1182                    String::from_utf8_lossy(key.key()),
1183                    leader_pg_election.election_key()
1184                );
1185                assert_eq!(key.lease_id(), i64::default());
1186                assert_eq!(key.revision(), i64::default());
1187            }
1188            _ => panic!("Expected LeaderChangeMessage::Elected"),
1189        }
1190
1191        leader_pg_election.step_down().await.unwrap();
1192        let res = leader_pg_election
1193            .get_value_with_lease(&leader_pg_election.election_key())
1194            .await
1195            .unwrap();
1196        assert!(res.is_none());
1197        assert!(!leader_pg_election.is_leader());
1198
1199        match rx.recv().await {
1200            Ok(LeaderChangeMessage::StepDown(key)) => {
1201                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1202                assert_eq!(
1203                    String::from_utf8_lossy(key.key()),
1204                    leader_pg_election.election_key()
1205                );
1206                assert_eq!(key.lease_id(), i64::default());
1207                assert_eq!(key.revision(), i64::default());
1208            }
1209            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1210        }
1211
1212        drop_table(&leader_pg_election, table_name).await;
1213    }
1214
1215    #[tokio::test]
1216    async fn test_leader_action() {
1217        maybe_skip_postgres_integration_test!();
1218        let leader_value = "test_leader".to_string();
1219        let uuid = uuid::Uuid::new_v4().to_string();
1220        let table_name = "test_leader_action_greptime_metakv";
1221        let candidate_lease_ttl = Duration::from_secs(5);
1222        let execution_timeout = Duration::from_secs(10);
1223        let statement_timeout = Duration::from_secs(10);
1224        let meta_lease_ttl = Duration::from_secs(2);
1225        let idle_session_timeout = Duration::from_secs(0);
1226        let client = create_postgres_client(
1227            Some(table_name),
1228            execution_timeout,
1229            idle_session_timeout,
1230            statement_timeout,
1231        )
1232        .await
1233        .unwrap();
1234
1235        let (tx, mut rx) = broadcast::channel(100);
1236        let leader_pg_election = PgElection {
1237            leader_value: leader_value.clone(),
1238            pg_client: RwLock::new(client),
1239            is_leader: AtomicBool::new(false),
1240            leader_infancy: AtomicBool::new(true),
1241            leader_watcher: tx,
1242            store_key_prefix: uuid,
1243            candidate_lease_ttl,
1244            meta_lease_ttl,
1245            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1246        };
1247
1248        // Step 1: No leader exists, campaign and elected.
1249        let res = leader_pg_election
1250            .pg_client
1251            .read()
1252            .await
1253            .query(&leader_pg_election.sql_set.campaign, &[])
1254            .await
1255            .unwrap();
1256        let res: bool = res[0].get(0);
1257        assert!(res);
1258        leader_pg_election.leader_action().await.unwrap();
1259        let lease = leader_pg_election
1260            .get_value_with_lease(&leader_pg_election.election_key())
1261            .await
1262            .unwrap()
1263            .unwrap();
1264        assert!(lease.leader_value == leader_value);
1265        assert!(lease.expire_time > lease.current);
1266        assert!(leader_pg_election.is_leader());
1267
1268        match rx.recv().await {
1269            Ok(LeaderChangeMessage::Elected(key)) => {
1270                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1271                assert_eq!(
1272                    String::from_utf8_lossy(key.key()),
1273                    leader_pg_election.election_key()
1274                );
1275                assert_eq!(key.lease_id(), i64::default());
1276                assert_eq!(key.revision(), i64::default());
1277            }
1278            _ => panic!("Expected LeaderChangeMessage::Elected"),
1279        }
1280
1281        // Step 2: As a leader, renew the lease.
1282        let res = leader_pg_election
1283            .pg_client
1284            .read()
1285            .await
1286            .query(&leader_pg_election.sql_set.campaign, &[])
1287            .await
1288            .unwrap();
1289        let res: bool = res[0].get(0);
1290        assert!(res);
1291        leader_pg_election.leader_action().await.unwrap();
1292        let new_lease = leader_pg_election
1293            .get_value_with_lease(&leader_pg_election.election_key())
1294            .await
1295            .unwrap()
1296            .unwrap();
1297        assert!(new_lease.leader_value == leader_value);
1298        assert!(
1299            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1300        );
1301        assert!(leader_pg_election.is_leader());
1302
1303        // Step 3: Something wrong, the leader lease expired.
1304        tokio::time::sleep(Duration::from_secs(2)).await;
1305
1306        let res = leader_pg_election
1307            .pg_client
1308            .read()
1309            .await
1310            .query(&leader_pg_election.sql_set.campaign, &[])
1311            .await
1312            .unwrap();
1313        let res: bool = res[0].get(0);
1314        assert!(res);
1315        leader_pg_election.leader_action().await.unwrap();
1316        let res = leader_pg_election
1317            .get_value_with_lease(&leader_pg_election.election_key())
1318            .await
1319            .unwrap();
1320        assert!(res.is_none());
1321
1322        match rx.recv().await {
1323            Ok(LeaderChangeMessage::StepDown(key)) => {
1324                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1325                assert_eq!(
1326                    String::from_utf8_lossy(key.key()),
1327                    leader_pg_election.election_key()
1328                );
1329                assert_eq!(key.lease_id(), i64::default());
1330                assert_eq!(key.revision(), i64::default());
1331            }
1332            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1333        }
1334
1335        // Step 4: Re-campaign and elected.
1336        let res = leader_pg_election
1337            .pg_client
1338            .read()
1339            .await
1340            .query(&leader_pg_election.sql_set.campaign, &[])
1341            .await
1342            .unwrap();
1343        let res: bool = res[0].get(0);
1344        assert!(res);
1345        leader_pg_election.leader_action().await.unwrap();
1346        let lease = leader_pg_election
1347            .get_value_with_lease(&leader_pg_election.election_key())
1348            .await
1349            .unwrap()
1350            .unwrap();
1351        assert!(lease.leader_value == leader_value);
1352        assert!(lease.expire_time > lease.current);
1353        assert!(leader_pg_election.is_leader());
1354
1355        match rx.recv().await {
1356            Ok(LeaderChangeMessage::Elected(key)) => {
1357                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1358                assert_eq!(
1359                    String::from_utf8_lossy(key.key()),
1360                    leader_pg_election.election_key()
1361                );
1362                assert_eq!(key.lease_id(), i64::default());
1363                assert_eq!(key.revision(), i64::default());
1364            }
1365            _ => panic!("Expected LeaderChangeMessage::Elected"),
1366        }
1367
1368        // Step 5: Something wrong, the leader key is deleted by other followers.
1369        leader_pg_election
1370            .delete_value(&leader_pg_election.election_key())
1371            .await
1372            .unwrap();
1373        leader_pg_election.leader_action().await.unwrap();
1374        let res = leader_pg_election
1375            .get_value_with_lease(&leader_pg_election.election_key())
1376            .await
1377            .unwrap();
1378        assert!(res.is_none());
1379        assert!(!leader_pg_election.is_leader());
1380
1381        match rx.recv().await {
1382            Ok(LeaderChangeMessage::StepDown(key)) => {
1383                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1384                assert_eq!(
1385                    String::from_utf8_lossy(key.key()),
1386                    leader_pg_election.election_key()
1387                );
1388                assert_eq!(key.lease_id(), i64::default());
1389                assert_eq!(key.revision(), i64::default());
1390            }
1391            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1392        }
1393
1394        // Step 6: Re-campaign and elected.
1395        let res = leader_pg_election
1396            .pg_client
1397            .read()
1398            .await
1399            .query(&leader_pg_election.sql_set.campaign, &[])
1400            .await
1401            .unwrap();
1402        let res: bool = res[0].get(0);
1403        assert!(res);
1404        leader_pg_election.leader_action().await.unwrap();
1405        let lease = leader_pg_election
1406            .get_value_with_lease(&leader_pg_election.election_key())
1407            .await
1408            .unwrap()
1409            .unwrap();
1410        assert!(lease.leader_value == leader_value);
1411        assert!(lease.expire_time > lease.current);
1412        assert!(leader_pg_election.is_leader());
1413
1414        match rx.recv().await {
1415            Ok(LeaderChangeMessage::Elected(key)) => {
1416                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1417                assert_eq!(
1418                    String::from_utf8_lossy(key.key()),
1419                    leader_pg_election.election_key()
1420                );
1421                assert_eq!(key.lease_id(), i64::default());
1422                assert_eq!(key.revision(), i64::default());
1423            }
1424            _ => panic!("Expected LeaderChangeMessage::Elected"),
1425        }
1426
1427        // Step 7: Something wrong, the leader key changed by others.
1428        let res = leader_pg_election
1429            .pg_client
1430            .read()
1431            .await
1432            .query(&leader_pg_election.sql_set.campaign, &[])
1433            .await
1434            .unwrap();
1435        let res: bool = res[0].get(0);
1436        assert!(res);
1437        leader_pg_election
1438            .delete_value(&leader_pg_election.election_key())
1439            .await
1440            .unwrap();
1441        leader_pg_election
1442            .put_value_with_lease(
1443                &leader_pg_election.election_key(),
1444                "test",
1445                Duration::from_secs(10),
1446            )
1447            .await
1448            .unwrap();
1449        leader_pg_election.leader_action().await.unwrap();
1450        let res = leader_pg_election
1451            .get_value_with_lease(&leader_pg_election.election_key())
1452            .await
1453            .unwrap();
1454        assert!(res.is_none());
1455        assert!(!leader_pg_election.is_leader());
1456
1457        match rx.recv().await {
1458            Ok(LeaderChangeMessage::StepDown(key)) => {
1459                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1460                assert_eq!(
1461                    String::from_utf8_lossy(key.key()),
1462                    leader_pg_election.election_key()
1463                );
1464                assert_eq!(key.lease_id(), i64::default());
1465                assert_eq!(key.revision(), i64::default());
1466            }
1467            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1468        }
1469
1470        // Clean up
1471        leader_pg_election
1472            .pg_client
1473            .read()
1474            .await
1475            .query(&leader_pg_election.sql_set.step_down, &[])
1476            .await
1477            .unwrap();
1478
1479        drop_table(&leader_pg_election, table_name).await;
1480    }
1481
1482    #[tokio::test]
1483    async fn test_follower_action() {
1484        maybe_skip_postgres_integration_test!();
1485        common_telemetry::init_default_ut_logging();
1486        let uuid = uuid::Uuid::new_v4().to_string();
1487        let table_name = "test_follower_action_greptime_metakv";
1488
1489        let candidate_lease_ttl = Duration::from_secs(5);
1490        let execution_timeout = Duration::from_secs(10);
1491        let statement_timeout = Duration::from_secs(10);
1492        let meta_lease_ttl = Duration::from_secs(2);
1493        let idle_session_timeout = Duration::from_secs(0);
1494        let follower_client = create_postgres_client(
1495            Some(table_name),
1496            execution_timeout,
1497            idle_session_timeout,
1498            statement_timeout,
1499        )
1500        .await
1501        .unwrap();
1502        let (tx, mut rx) = broadcast::channel(100);
1503        let follower_pg_election = PgElection {
1504            leader_value: "test_follower".to_string(),
1505            pg_client: RwLock::new(follower_client),
1506            is_leader: AtomicBool::new(false),
1507            leader_infancy: AtomicBool::new(true),
1508            leader_watcher: tx,
1509            store_key_prefix: uuid.clone(),
1510            candidate_lease_ttl,
1511            meta_lease_ttl,
1512            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1513        };
1514
1515        let leader_client = create_postgres_client(
1516            Some(table_name),
1517            execution_timeout,
1518            idle_session_timeout,
1519            statement_timeout,
1520        )
1521        .await
1522        .unwrap();
1523        let (tx, _) = broadcast::channel(100);
1524        let leader_pg_election = PgElection {
1525            leader_value: "test_leader".to_string(),
1526            pg_client: RwLock::new(leader_client),
1527            is_leader: AtomicBool::new(false),
1528            leader_infancy: AtomicBool::new(true),
1529            leader_watcher: tx,
1530            store_key_prefix: uuid,
1531            candidate_lease_ttl,
1532            meta_lease_ttl,
1533            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1534        };
1535
1536        leader_pg_election
1537            .pg_client
1538            .read()
1539            .await
1540            .query(&leader_pg_election.sql_set.campaign, &[])
1541            .await
1542            .unwrap();
1543        leader_pg_election.elected().await.unwrap();
1544
1545        // Step 1: As a follower, the leader exists and the lease is not expired.
1546        follower_pg_election.follower_action().await.unwrap();
1547
1548        // Step 2: As a follower, the leader exists but the lease expired.
1549        tokio::time::sleep(Duration::from_secs(2)).await;
1550        assert!(follower_pg_election.follower_action().await.is_err());
1551
1552        // Step 3: As a follower, the leader does not exist.
1553        leader_pg_election
1554            .delete_value(&leader_pg_election.election_key())
1555            .await
1556            .unwrap();
1557        assert!(follower_pg_election.follower_action().await.is_err());
1558
1559        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1560        follower_pg_election
1561            .is_leader
1562            .store(true, Ordering::Relaxed);
1563        assert!(follower_pg_election.follower_action().await.is_err());
1564
1565        match rx.recv().await {
1566            Ok(LeaderChangeMessage::StepDown(key)) => {
1567                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1568                assert_eq!(
1569                    String::from_utf8_lossy(key.key()),
1570                    follower_pg_election.election_key()
1571                );
1572                assert_eq!(key.lease_id(), i64::default());
1573                assert_eq!(key.revision(), i64::default());
1574            }
1575            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1576        }
1577
1578        // Clean up
1579        leader_pg_election
1580            .pg_client
1581            .read()
1582            .await
1583            .query(&leader_pg_election.sql_set.step_down, &[])
1584            .await
1585            .unwrap();
1586
1587        drop_table(&follower_pg_election, table_name).await;
1588    }
1589
1590    #[tokio::test]
1591    async fn test_reset_campaign() {
1592        maybe_skip_postgres_integration_test!();
1593        let leader_value = "test_leader".to_string();
1594        let uuid = uuid::Uuid::new_v4().to_string();
1595        let table_name = "test_reset_campaign_greptime_metakv";
1596        let candidate_lease_ttl = Duration::from_secs(5);
1597        let execution_timeout = Duration::from_secs(10);
1598        let statement_timeout = Duration::from_secs(10);
1599        let meta_lease_ttl = Duration::from_secs(2);
1600        let idle_session_timeout = Duration::from_secs(0);
1601        let client = create_postgres_client(
1602            Some(table_name),
1603            execution_timeout,
1604            idle_session_timeout,
1605            statement_timeout,
1606        )
1607        .await
1608        .unwrap();
1609
1610        let (tx, _) = broadcast::channel(100);
1611        let leader_pg_election = PgElection {
1612            leader_value,
1613            pg_client: RwLock::new(client),
1614            is_leader: AtomicBool::new(false),
1615            leader_infancy: AtomicBool::new(true),
1616            leader_watcher: tx,
1617            store_key_prefix: uuid,
1618            candidate_lease_ttl,
1619            meta_lease_ttl,
1620            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1621        };
1622        leader_pg_election.is_leader.store(true, Ordering::Relaxed);
1623        leader_pg_election.reset_campaign().await;
1624        assert!(!leader_pg_election.is_leader());
1625        drop_table(&leader_pg_election, table_name).await;
1626    }
1627
1628    #[tokio::test]
1629    async fn test_idle_session_timeout() {
1630        maybe_skip_postgres_integration_test!();
1631        common_telemetry::init_default_ut_logging();
1632        let execution_timeout = Duration::from_secs(10);
1633        let statement_timeout = Duration::from_secs(10);
1634        let idle_session_timeout = Duration::from_secs(1);
1635        let mut client = create_postgres_client(
1636            None,
1637            execution_timeout,
1638            idle_session_timeout,
1639            statement_timeout,
1640        )
1641        .await
1642        .unwrap();
1643        tokio::time::sleep(Duration::from_millis(1100)).await;
1644        // Wait for the idle session timeout.
1645        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1646        assert_matches!(err, error::Error::PostgresExecution { .. });
1647        let error::Error::PostgresExecution { error, .. } = err else {
1648            panic!("Expected PostgresExecution error");
1649        };
1650        assert!(error.is_closed());
1651        // Reset the client and try again.
1652        client.reset_client().await.unwrap();
1653        let _ = client.query("SELECT 1", &[]).await.unwrap();
1654    }
1655
1656    #[test]
1657    fn test_election_sql_with_schema() {
1658        let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1659        let s = f.build();
1660        assert!(s.campaign.contains("pg_try_advisory_lock"));
1661        assert!(
1662            s.put_value_with_lease
1663                .contains("\"test_schema\".\"greptime_metakv\"")
1664        );
1665        assert!(
1666            s.update_value_with_lease
1667                .contains("\"test_schema\".\"greptime_metakv\"")
1668        );
1669        assert!(
1670            s.get_value_with_lease
1671                .contains("\"test_schema\".\"greptime_metakv\"")
1672        );
1673        assert!(
1674            s.get_value_with_lease_by_prefix
1675                .contains("\"test_schema\".\"greptime_metakv\"")
1676        );
1677        assert!(
1678            s.delete_value
1679                .contains("\"test_schema\".\"greptime_metakv\"")
1680        );
1681    }
1682}