meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31    Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by the PostgreSQL election.
///
/// The borrowed names are only read while the statements are formatted.
struct ElectionSqlFactory<'a> {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema used to qualify the table. `None` or an empty string leaves the
    /// table name unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table holding the `k` / `v` columns.
    table_name: &'a str,
}
44
/// The rendered SQL statements used by the election, one per operation.
struct ElectionSqlSet {
    // SQL to try to take the advisory lock (`pg_try_advisory_lock`).
    //
    // Returns:
    // column 0: whether the lock was acquired
    campaign: String,
    // SQL to release the advisory lock (`pg_advisory_unlock`).
    step_down: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: value,
    // `$3`: lease time in seconds
    //
    // Returns:
    // If the key already exists, return the existing row (`k`, `v`) and leave it untouched;
    // otherwise no row is returned.
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: previous value,
    // `$3`: updated value,
    // `$4`: lease time in seconds
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // `$1`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: key deleted,
    // column 1: value deleted
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93        Self {
94            lock_id,
95            schema_name,
96            table_name,
97        }
98    }
99
100    fn table_ident(&self) -> String {
101        match self.schema_name {
102            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103            _ => format!("\"{}\"", self.table_name),
104        }
105    }
106
107    fn build(self) -> ElectionSqlSet {
108        ElectionSqlSet {
109            campaign: self.campaign_sql(),
110            step_down: self.step_down_sql(),
111            put_value_with_lease: self.put_value_with_lease_sql(),
112            update_value_with_lease: self.update_value_with_lease_sql(),
113            get_value_with_lease: self.get_value_with_lease_sql(),
114            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115            delete_value: self.delete_value_sql(),
116        }
117    }
118
119    fn campaign_sql(&self) -> String {
120        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121    }
122
123    fn step_down_sql(&self) -> String {
124        format!("SELECT pg_advisory_unlock({})", self.lock_id)
125    }
126
127    fn put_value_with_lease_sql(&self) -> String {
128        let table = self.table_ident();
129        format!(
130            r#"WITH prev AS (
131                SELECT k, v FROM {table} WHERE k = $1
132            ), insert AS (
133                INSERT INTO {table}
134                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135                ON CONFLICT (k) DO NOTHING
136            )
137            SELECT k, v FROM prev;
138            "#,
139            table = table,
140            lease_sep = LEASE_SEP
141        )
142    }
143
144    fn update_value_with_lease_sql(&self) -> String {
145        let table = self.table_ident();
146        format!(
147            r#"UPDATE {table}
148               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149               WHERE k = $1 AND v = $2"#,
150            table = table,
151            lease_sep = LEASE_SEP
152        )
153    }
154
155    fn get_value_with_lease_sql(&self) -> String {
156        let table = self.table_ident();
157        format!(
158            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159            table = table
160        )
161    }
162
163    fn get_value_with_lease_by_prefix_sql(&self) -> String {
164        let table = self.table_ident();
165        format!(
166            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167            table = table
168        )
169    }
170
171    fn delete_value_sql(&self) -> String {
172        let table = self.table_ident();
173        format!(
174            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175            table = table
176        )
177    }
178}
179
/// PgClient for election.
pub struct ElectionPgClient {
    /// The pooled connection currently in use. `None` until `maybe_init_client` takes one
    /// from `pool`; `reset_client` clears it before taking a new one.
    current: Option<deadpool::managed::Object<Manager>>,
    /// The pool that connections are taken from.
    pool: Pool,
    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The idle session timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a session remains idle for longer than this duration, the server will terminate it.
    idle_session_timeout: Duration,

    /// The statement timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a statement takes longer than this duration to execute, the server will abort it.
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204    pub fn new(
205        pool: Pool,
206        execution_timeout: Duration,
207        idle_session_timeout: Duration,
208        statement_timeout: Duration,
209    ) -> Result<ElectionPgClient> {
210        Ok(ElectionPgClient {
211            current: None,
212            pool,
213            execution_timeout,
214            idle_session_timeout,
215            statement_timeout,
216        })
217    }
218
219    fn set_idle_session_timeout_sql(&self) -> String {
220        format!(
221            "SET idle_session_timeout = '{}s';",
222            self.idle_session_timeout.as_secs()
223        )
224    }
225
226    fn set_statement_timeout_sql(&self) -> String {
227        format!(
228            "SET statement_timeout = '{}s';",
229            self.statement_timeout.as_secs()
230        )
231    }
232
233    async fn reset_client(&mut self) -> Result<()> {
234        self.current = None;
235        self.maybe_init_client().await
236    }
237
238    async fn maybe_init_client(&mut self) -> Result<()> {
239        if self.current.is_none() {
240            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242            self.current = Some(client);
243            // Set idle session timeout and statement timeout.
244            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245            self.execute(&idle_session_timeout_sql, &[]).await?;
246            let statement_timeout_sql = self.set_statement_timeout_sql();
247            self.execute(&statement_timeout_sql, &[]).await?;
248        }
249
250        Ok(())
251    }
252
253    /// Returns the result of the query.
254    ///
255    /// # Panics
256    /// if `current` is `None`.
257    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258        let result = tokio::time::timeout(
259            self.execution_timeout,
260            self.current.as_ref().unwrap().execute(sql, params),
261        )
262        .await
263        .map_err(|_| {
264            SqlExecutionTimeoutSnafu {
265                sql: sql.to_string(),
266                duration: self.execution_timeout,
267            }
268            .build()
269        })?;
270
271        result.context(PostgresExecutionSnafu { sql })
272    }
273
274    /// Returns the result of the query.
275    ///
276    /// # Panics
277    /// if `current` is `None`.
278    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279        let result = tokio::time::timeout(
280            self.execution_timeout,
281            self.current.as_ref().unwrap().query(sql, params),
282        )
283        .await
284        .map_err(|_| {
285            SqlExecutionTimeoutSnafu {
286                sql: sql.to_string(),
287                duration: self.execution_timeout,
288            }
289            .build()
290        })?;
291
292        result.context(PostgresExecutionSnafu { sql })
293    }
294}
295
/// PostgreSql implementation of Election.
pub struct PgElection {
    /// The value identifying this node. It is stored under the election key when this
    /// node is elected and is appended to the candidate root to form the candidate key.
    leader_value: String,
    /// The PostgreSQL client, guarded by an async read-write lock.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this node is newly elected; cleared by `in_leader_infancy`.
    leader_infancy: AtomicBool,
    /// Sender used to broadcast leader change messages to subscribers.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease duration used for candidate registrations.
    candidate_lease_ttl: Duration,
    /// Lease duration used for the leader's election key.
    meta_lease_ttl: Duration,
    /// The rendered SQL statements.
    sql_set: ElectionSqlSet,
}
308
309impl PgElection {
310    async fn maybe_init_client(&self) -> Result<()> {
311        if self.pg_client.read().await.current.is_none() {
312            self.pg_client.write().await.maybe_init_client().await?;
313        }
314
315        Ok(())
316    }
317
318    #[allow(clippy::too_many_arguments)]
319    pub async fn with_pg_client(
320        leader_value: String,
321        pg_client: ElectionPgClient,
322        store_key_prefix: String,
323        candidate_lease_ttl: Duration,
324        meta_lease_ttl: Duration,
325        schema_name: Option<&str>,
326        table_name: &str,
327        lock_id: u64,
328    ) -> Result<ElectionRef> {
329        if let Some(s) = schema_name {
330            common_telemetry::info!("PgElection uses schema: {}", s);
331        } else {
332            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333        }
334        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336        let tx = listen_leader_change(leader_value.clone());
337        Ok(Arc::new(Self {
338            leader_value,
339            pg_client: RwLock::new(pg_client),
340            is_leader: AtomicBool::new(false),
341            leader_infancy: AtomicBool::new(false),
342            leader_watcher: tx,
343            store_key_prefix,
344            candidate_lease_ttl,
345            meta_lease_ttl,
346            sql_set: sql_factory.build(),
347        }))
348    }
349
350    fn election_key(&self) -> String {
351        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352    }
353
354    fn candidate_root(&self) -> String {
355        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356    }
357
358    fn candidate_key(&self) -> String {
359        format!("{}{}", self.candidate_root(), self.leader_value)
360    }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365    type Leader = LeaderValue;
366
367    fn is_leader(&self) -> bool {
368        self.is_leader.load(Ordering::Relaxed)
369    }
370
371    fn in_leader_infancy(&self) -> bool {
372        self.leader_infancy
373            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374            .is_ok()
375    }
376
377    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378        let key = self.candidate_key();
379        let node_info =
380            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381                input: format!("{node_info:?}"),
382            })?;
383        let res = self
384            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385            .await?;
386        // May registered before, just update the lease.
387        if !res {
388            self.delete_value(&key).await?;
389            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390                .await?;
391        }
392
393        // Check if the current lease has expired and renew the lease.
394        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395        loop {
396            let _ = keep_alive_interval.tick().await;
397
398            let lease = self
399                .get_value_with_lease(&key)
400                .await?
401                .context(UnexpectedSnafu {
402                    violated: format!("Failed to get lease for key: {:?}", key),
403                })?;
404
405            ensure!(
406                lease.expire_time > lease.current,
407                UnexpectedSnafu {
408                    violated: format!(
409                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410                        lease.expire_time, lease.current, key
411                    ),
412                }
413            );
414
415            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
416            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417                .await?;
418        }
419    }
420
421    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422        let key_prefix = self.candidate_root();
423        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424        // Remove expired candidates
425        candidates.retain(|c| c.1 > current);
426        let mut valid_candidates = Vec::with_capacity(candidates.len());
427        for (c, _) in candidates {
428            let node_info: MetasrvNodeInfo =
429                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430                    input: format!("{:?}", c),
431                })?;
432            valid_candidates.push(node_info);
433        }
434        Ok(valid_candidates)
435    }
436
437    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
438    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
439    ///
440    /// The function operates in a loop, where it:
441    ///
442    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
443    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
444    /// 3. Checks the result of the query:
445    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
446    ///      to perform actions as the leader.
447    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
448    ///      to perform actions as a follower.
449    async fn campaign(&self) -> Result<()> {
450        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453        self.maybe_init_client().await?;
454        loop {
455            let res = self
456                .pg_client
457                .read()
458                .await
459                .query(&self.sql_set.campaign, &[])
460                .await?;
461            let row = res.first().context(UnexpectedSnafu {
462                violated: "Failed to get the result of acquiring advisory lock",
463            })?;
464            let is_leader = row.try_get(0).map_err(|_| {
465                UnexpectedSnafu {
466                    violated: "Failed to get the result of get lock",
467                }
468                .build()
469            })?;
470            if is_leader {
471                self.leader_action().await?;
472            } else {
473                self.follower_action().await?;
474            }
475            let _ = keep_alive_interval.tick().await;
476        }
477    }
478
479    async fn reset_campaign(&self) {
480        if let Err(err) = self.pg_client.write().await.reset_client().await {
481            error!(err; "Failed to reset client");
482        }
483    }
484
485    async fn leader(&self) -> Result<Self::Leader> {
486        if self.is_leader.load(Ordering::Relaxed) {
487            Ok(self.leader_value.as_bytes().into())
488        } else {
489            let key = self.election_key();
490            if let Some(lease) = self.get_value_with_lease(&key).await? {
491                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492                Ok(lease.leader_value.as_bytes().into())
493            } else {
494                NoLeaderSnafu.fail()
495            }
496        }
497    }
498
    /// Resigning is not implemented for the PostgreSQL election.
    ///
    /// # Panics
    /// Always panics, since the body is `todo!()`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
502
503    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504        self.leader_watcher.subscribe()
505    }
506}
507
508impl PgElection {
509    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
510    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511        let key = key.as_bytes();
512        self.maybe_init_client().await?;
513        let res = self
514            .pg_client
515            .read()
516            .await
517            .query(&self.sql_set.get_value_with_lease, &[&key])
518            .await?;
519
520        if res.is_empty() {
521            Ok(None)
522        } else {
523            // Safety: Checked if res is empty above.
524            let current_time_str = res[0].try_get(1).unwrap_or_default();
525            let current_time = match Timestamp::from_str(current_time_str, None) {
526                Ok(ts) => ts,
527                Err(_) => UnexpectedSnafu {
528                    violated: format!("Invalid timestamp: {}", current_time_str),
529                }
530                .fail()?,
531            };
532            // Safety: Checked if res is empty above.
533            let value_and_expire_time =
534                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537            Ok(Some(Lease {
538                leader_value: value,
539                expire_time,
540                current: current_time,
541                origin: value_and_expire_time.to_string(),
542            }))
543        }
544    }
545
546    /// Returns all values and expire time with the given key prefix. Also returns the current time.
547    async fn get_value_with_lease_by_prefix(
548        &self,
549        key_prefix: &str,
550    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552        self.maybe_init_client().await?;
553        let res = self
554            .pg_client
555            .read()
556            .await
557            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558            .await?;
559
560        let mut values_with_leases = vec![];
561        let mut current = Timestamp::default();
562        for row in res {
563            let current_time_str = row.try_get(1).unwrap_or_default();
564            current = match Timestamp::from_str(current_time_str, None) {
565                Ok(ts) => ts,
566                Err(_) => UnexpectedSnafu {
567                    violated: format!("Invalid timestamp: {}", current_time_str),
568                }
569                .fail()?,
570            };
571
572            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575            values_with_leases.push((value, expire_time));
576        }
577        Ok((values_with_leases, current))
578    }
579
580    async fn update_value_with_lease(
581        &self,
582        key: &str,
583        prev: &str,
584        updated: &str,
585        lease_ttl: Duration,
586    ) -> Result<()> {
587        let key = key.as_bytes();
588        let prev = prev.as_bytes();
589        self.maybe_init_client().await?;
590        let lease_ttl_secs = lease_ttl.as_secs() as f64;
591        let res = self
592            .pg_client
593            .read()
594            .await
595            .execute(
596                &self.sql_set.update_value_with_lease,
597                &[&key, &prev, &updated, &lease_ttl_secs],
598            )
599            .await?;
600
601        ensure!(
602            res == 1,
603            UnexpectedSnafu {
604                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605            }
606        );
607
608        Ok(())
609    }
610
611    /// Returns `true` if the insertion is successful
612    async fn put_value_with_lease(
613        &self,
614        key: &str,
615        value: &str,
616        lease_ttl: Duration,
617    ) -> Result<bool> {
618        let key = key.as_bytes();
619        let lease_ttl_secs = lease_ttl.as_secs() as f64;
620        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621        self.maybe_init_client().await?;
622        let res = self
623            .pg_client
624            .read()
625            .await
626            .query(&self.sql_set.put_value_with_lease, &params)
627            .await?;
628        Ok(res.is_empty())
629    }
630
631    /// Returns `true` if the deletion is successful.
632    /// Caution: Should only delete the key if the lease is expired.
633    async fn delete_value(&self, key: &str) -> Result<bool> {
634        let key = key.as_bytes();
635        self.maybe_init_client().await?;
636        let res = self
637            .pg_client
638            .read()
639            .await
640            .query(&self.sql_set.delete_value, &[&key])
641            .await?;
642
643        Ok(res.len() == 1)
644    }
645
646    /// Handles the actions of a leader in the election process.
647    ///
648    /// This function performs the following checks and actions:
649    ///
650    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
651    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
652    ///   or steps down if it has expired.
653    ///
654    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
655    ///     by updating the value associated with the election key.
656    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
657    ///     and steps down, initiating a new campaign for leadership.
658    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
659    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
660    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
661    ///     after a period of time during which other leaders have been elected and stepped down.
662    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
663    ///
664    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
665    ///   as a newly elected leader.
666    async fn leader_action(&self) -> Result<()> {
667        let key = self.election_key();
668        // Case 1
669        if self.is_leader() {
670            match self.get_value_with_lease(&key).await? {
671                Some(lease) => {
672                    match (
673                        lease.leader_value == self.leader_value,
674                        lease.expire_time > lease.current,
675                    ) {
676                        // Case 1.1
677                        (true, true) => {
678                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
679                            self.update_value_with_lease(
680                                &key,
681                                &lease.origin,
682                                &self.leader_value,
683                                self.meta_lease_ttl,
684                            )
685                            .await?;
686                        }
687                        // Case 1.2
688                        (true, false) => {
689                            warn!("Leader lease expired, now stepping down.");
690                            self.step_down().await?;
691                        }
692                        // Case 1.3
693                        (false, _) => {
694                            warn!(
695                                "Leader lease not found, but still hold the lock. Now stepping down."
696                            );
697                            self.step_down().await?;
698                        }
699                    }
700                }
701                // Case 1.4
702                None => {
703                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
704                    self.step_down().await?;
705                }
706            }
707        // Case 2
708        } else {
709            self.elected().await?;
710        }
711        Ok(())
712    }
713
714    /// Handles the actions of a follower in the election process.
715    ///
716    /// This function performs the following checks and actions:
717    ///
718    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
719    ///   it steps down without deleting the key.
720    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
721    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
722    ///   will be released.
723    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
724    async fn follower_action(&self) -> Result<()> {
725        let key = self.election_key();
726        // Case 1
727        if self.is_leader() {
728            self.step_down_without_lock().await?;
729        }
730        let lease = self
731            .get_value_with_lease(&key)
732            .await?
733            .context(NoLeaderSnafu)?;
734        // Case 2
735        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
736        // Case 3
737        Ok(())
738    }
739
740    /// Step down the leader. The leader should delete the key and notify the leader watcher.
741    ///
742    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
743    ///
744    /// ## Caution:
745    /// Should only step down while holding the advisory lock.
746    async fn step_down(&self) -> Result<()> {
747        let key = self.election_key();
748        let leader_key = RdsLeaderKey {
749            name: self.leader_value.clone().into_bytes(),
750            key: key.clone().into_bytes(),
751            ..Default::default()
752        };
753        self.delete_value(&key).await?;
754        self.maybe_init_client().await?;
755        self.pg_client
756            .read()
757            .await
758            .query(&self.sql_set.step_down, &[])
759            .await?;
760        send_leader_change_and_set_flags(
761            &self.is_leader,
762            &self.leader_infancy,
763            &self.leader_watcher,
764            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
765        );
766        Ok(())
767    }
768
769    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
770    async fn step_down_without_lock(&self) -> Result<()> {
771        let key = self.election_key().into_bytes();
772        let leader_key = RdsLeaderKey {
773            name: self.leader_value.clone().into_bytes(),
774            key: key.clone(),
775            ..Default::default()
776        };
777        if self
778            .is_leader
779            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
780            .is_ok()
781            && let Err(e) = self
782                .leader_watcher
783                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
784        {
785            error!(e; "Failed to send leader change message");
786        }
787        Ok(())
788    }
789
790    /// Elected as leader. The leader should put the key and notify the leader watcher.
791    /// Caution: Should only elected while holding the advisory lock.
792    async fn elected(&self) -> Result<()> {
793        let key = self.election_key();
794        let leader_key = RdsLeaderKey {
795            name: self.leader_value.clone().into_bytes(),
796            key: key.clone().into_bytes(),
797            ..Default::default()
798        };
799        self.delete_value(&key).await?;
800        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801            .await?;
802
803        if self
804            .is_leader
805            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806            .is_ok()
807        {
808            self.leader_infancy.store(true, Ordering::Release);
809
810            if let Err(e) = self
811                .leader_watcher
812                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813            {
814                error!(e; "Failed to send leader change message");
815            }
816        }
817        Ok(())
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use std::assert_matches::assert_matches;
824    use std::env;
825
826    use common_meta::maybe_skip_postgres_integration_test;
827
828    use super::*;
829    use crate::error;
830    use crate::utils::postgres::create_postgres_pool;
831
832    async fn create_postgres_client(
833        table_name: Option<&str>,
834        execution_timeout: Duration,
835        idle_session_timeout: Duration,
836        statement_timeout: Duration,
837    ) -> Result<ElectionPgClient> {
838        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839        if endpoint.is_empty() {
840            return UnexpectedSnafu {
841                violated: "Postgres endpoint is empty".to_string(),
842            }
843            .fail();
844        }
845        let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
846        let mut pg_client = ElectionPgClient::new(
847            pool,
848            execution_timeout,
849            idle_session_timeout,
850            statement_timeout,
851        )
852        .unwrap();
853        pg_client.maybe_init_client().await?;
854        if let Some(table_name) = table_name {
855            let create_table_sql = format!(
856                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857                table_name
858            );
859            pg_client.execute(&create_table_sql, &[]).await?;
860        }
861        Ok(pg_client)
862    }
863
864    async fn drop_table(pg_election: &PgElection, table_name: &str) {
865        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866        pg_election
867            .pg_client
868            .read()
869            .await
870            .execute(&sql, &[])
871            .await
872            .unwrap();
873    }
874
875    #[tokio::test]
876    async fn test_postgres_crud() {
877        maybe_skip_postgres_integration_test!();
878        let key = "test_key".to_string();
879        let value = "test_value".to_string();
880
881        let uuid = uuid::Uuid::new_v4().to_string();
882        let table_name = "test_postgres_crud_greptime_metakv";
883        let candidate_lease_ttl = Duration::from_secs(10);
884        let execution_timeout = Duration::from_secs(10);
885        let statement_timeout = Duration::from_secs(10);
886        let meta_lease_ttl = Duration::from_secs(2);
887        let idle_session_timeout = Duration::from_secs(0);
888        let client = create_postgres_client(
889            Some(table_name),
890            execution_timeout,
891            idle_session_timeout,
892            statement_timeout,
893        )
894        .await
895        .unwrap();
896
897        let (tx, _) = broadcast::channel(100);
898        let pg_election = PgElection {
899            leader_value: "test_leader".to_string(),
900            pg_client: RwLock::new(client),
901            is_leader: AtomicBool::new(false),
902            leader_infancy: AtomicBool::new(true),
903            leader_watcher: tx,
904            store_key_prefix: uuid,
905            candidate_lease_ttl,
906            meta_lease_ttl,
907            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
908        };
909
910        let res = pg_election
911            .put_value_with_lease(&key, &value, candidate_lease_ttl)
912            .await
913            .unwrap();
914        assert!(res);
915
916        let lease = pg_election
917            .get_value_with_lease(&key)
918            .await
919            .unwrap()
920            .unwrap();
921        assert_eq!(lease.leader_value, value);
922
923        pg_election
924            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
925            .await
926            .unwrap();
927
928        let res = pg_election.delete_value(&key).await.unwrap();
929        assert!(res);
930
931        let res = pg_election.get_value_with_lease(&key).await.unwrap();
932        assert!(res.is_none());
933
934        for i in 0..10 {
935            let key = format!("test_key_{}", i);
936            let value = format!("test_value_{}", i);
937            pg_election
938                .put_value_with_lease(&key, &value, candidate_lease_ttl)
939                .await
940                .unwrap();
941        }
942
943        let key_prefix = "test_key".to_string();
944        let (res, _) = pg_election
945            .get_value_with_lease_by_prefix(&key_prefix)
946            .await
947            .unwrap();
948        assert_eq!(res.len(), 10);
949
950        for i in 0..10 {
951            let key = format!("test_key_{}", i);
952            let res = pg_election.delete_value(&key).await.unwrap();
953            assert!(res);
954        }
955
956        let (res, current) = pg_election
957            .get_value_with_lease_by_prefix(&key_prefix)
958            .await
959            .unwrap();
960        assert!(res.is_empty());
961        assert!(current == Timestamp::default());
962
963        drop_table(&pg_election, table_name).await;
964    }
965
966    async fn candidate(
967        leader_value: String,
968        candidate_lease_ttl: Duration,
969        store_key_prefix: String,
970        table_name: String,
971    ) {
972        let execution_timeout = Duration::from_secs(10);
973        let statement_timeout = Duration::from_secs(10);
974        let meta_lease_ttl = Duration::from_secs(2);
975        let idle_session_timeout = Duration::from_secs(0);
976        let client = create_postgres_client(
977            None,
978            execution_timeout,
979            idle_session_timeout,
980            statement_timeout,
981        )
982        .await
983        .unwrap();
984
985        let (tx, _) = broadcast::channel(100);
986        let pg_election = PgElection {
987            leader_value,
988            pg_client: RwLock::new(client),
989            is_leader: AtomicBool::new(false),
990            leader_infancy: AtomicBool::new(true),
991            leader_watcher: tx,
992            store_key_prefix,
993            candidate_lease_ttl,
994            meta_lease_ttl,
995            sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996        };
997
998        let node_info = MetasrvNodeInfo {
999            addr: "test_addr".to_string(),
1000            version: "test_version".to_string(),
1001            git_commit: "test_git_commit".to_string(),
1002            start_time_ms: 0,
1003            total_cpu_millicores: 0,
1004            total_memory_bytes: 0,
1005            cpu_usage_millicores: 0,
1006            memory_usage_bytes: 0,
1007            hostname: "test_hostname".to_string(),
1008        };
1009        pg_election.register_candidate(&node_info).await.unwrap();
1010    }
1011
1012    #[tokio::test]
1013    async fn test_candidate_registration() {
1014        maybe_skip_postgres_integration_test!();
1015        let leader_value_prefix = "test_leader".to_string();
1016        let uuid = uuid::Uuid::new_v4().to_string();
1017        let table_name = "test_candidate_registration_greptime_metakv";
1018        let mut handles = vec![];
1019        let candidate_lease_ttl = Duration::from_secs(5);
1020        let execution_timeout = Duration::from_secs(10);
1021        let statement_timeout = Duration::from_secs(10);
1022        let meta_lease_ttl = Duration::from_secs(2);
1023        let idle_session_timeout = Duration::from_secs(0);
1024        let client = create_postgres_client(
1025            Some(table_name),
1026            execution_timeout,
1027            idle_session_timeout,
1028            statement_timeout,
1029        )
1030        .await
1031        .unwrap();
1032
1033        for i in 0..10 {
1034            let leader_value = format!("{}{}", leader_value_prefix, i);
1035            let handle = tokio::spawn(candidate(
1036                leader_value,
1037                candidate_lease_ttl,
1038                uuid.clone(),
1039                table_name.to_string(),
1040            ));
1041            handles.push(handle);
1042        }
1043        // Wait for candidates to register themselves and renew their leases at least once.
1044        tokio::time::sleep(Duration::from_secs(3)).await;
1045
1046        let (tx, _) = broadcast::channel(100);
1047        let leader_value = "test_leader".to_string();
1048        let pg_election = PgElection {
1049            leader_value,
1050            pg_client: RwLock::new(client),
1051            is_leader: AtomicBool::new(false),
1052            leader_infancy: AtomicBool::new(true),
1053            leader_watcher: tx,
1054            store_key_prefix: uuid.clone(),
1055            candidate_lease_ttl,
1056            meta_lease_ttl,
1057            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1058        };
1059
1060        let candidates = pg_election.all_candidates().await.unwrap();
1061        assert_eq!(candidates.len(), 10);
1062
1063        for handle in handles {
1064            handle.abort();
1065        }
1066
1067        // Wait for the candidate leases to expire.
1068        tokio::time::sleep(Duration::from_secs(5)).await;
1069        let candidates = pg_election.all_candidates().await.unwrap();
1070        assert!(candidates.is_empty());
1071
1072        // Garbage collection
1073        for i in 0..10 {
1074            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1075            let res = pg_election.delete_value(&key).await.unwrap();
1076            assert!(res);
1077        }
1078
1079        drop_table(&pg_election, table_name).await;
1080    }
1081
1082    #[tokio::test]
1083    async fn test_elected_and_step_down() {
1084        maybe_skip_postgres_integration_test!();
1085        let leader_value = "test_leader".to_string();
1086        let uuid = uuid::Uuid::new_v4().to_string();
1087        let table_name = "test_elected_and_step_down_greptime_metakv";
1088        let candidate_lease_ttl = Duration::from_secs(5);
1089        let execution_timeout = Duration::from_secs(10);
1090        let statement_timeout = Duration::from_secs(10);
1091        let meta_lease_ttl = Duration::from_secs(2);
1092        let idle_session_timeout = Duration::from_secs(0);
1093        let client = create_postgres_client(
1094            Some(table_name),
1095            execution_timeout,
1096            idle_session_timeout,
1097            statement_timeout,
1098        )
1099        .await
1100        .unwrap();
1101
1102        let (tx, mut rx) = broadcast::channel(100);
1103        let leader_pg_election = PgElection {
1104            leader_value: leader_value.clone(),
1105            pg_client: RwLock::new(client),
1106            is_leader: AtomicBool::new(false),
1107            leader_infancy: AtomicBool::new(true),
1108            leader_watcher: tx,
1109            store_key_prefix: uuid,
1110            candidate_lease_ttl,
1111            meta_lease_ttl,
1112            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1113        };
1114
1115        leader_pg_election.elected().await.unwrap();
1116        let lease = leader_pg_election
1117            .get_value_with_lease(&leader_pg_election.election_key())
1118            .await
1119            .unwrap()
1120            .unwrap();
1121        assert!(lease.leader_value == leader_value);
1122        assert!(lease.expire_time > lease.current);
1123        assert!(leader_pg_election.is_leader());
1124
1125        match rx.recv().await {
1126            Ok(LeaderChangeMessage::Elected(key)) => {
1127                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1128                assert_eq!(
1129                    String::from_utf8_lossy(key.key()),
1130                    leader_pg_election.election_key()
1131                );
1132                assert_eq!(key.lease_id(), i64::default());
1133                assert_eq!(key.revision(), i64::default());
1134            }
1135            _ => panic!("Expected LeaderChangeMessage::Elected"),
1136        }
1137
1138        leader_pg_election.step_down_without_lock().await.unwrap();
1139        let lease = leader_pg_election
1140            .get_value_with_lease(&leader_pg_election.election_key())
1141            .await
1142            .unwrap()
1143            .unwrap();
1144        assert!(lease.leader_value == leader_value);
1145        assert!(!leader_pg_election.is_leader());
1146
1147        match rx.recv().await {
1148            Ok(LeaderChangeMessage::StepDown(key)) => {
1149                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1150                assert_eq!(
1151                    String::from_utf8_lossy(key.key()),
1152                    leader_pg_election.election_key()
1153                );
1154                assert_eq!(key.lease_id(), i64::default());
1155                assert_eq!(key.revision(), i64::default());
1156            }
1157            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1158        }
1159
1160        leader_pg_election.elected().await.unwrap();
1161        let lease = leader_pg_election
1162            .get_value_with_lease(&leader_pg_election.election_key())
1163            .await
1164            .unwrap()
1165            .unwrap();
1166        assert!(lease.leader_value == leader_value);
1167        assert!(lease.expire_time > lease.current);
1168        assert!(leader_pg_election.is_leader());
1169
1170        match rx.recv().await {
1171            Ok(LeaderChangeMessage::Elected(key)) => {
1172                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1173                assert_eq!(
1174                    String::from_utf8_lossy(key.key()),
1175                    leader_pg_election.election_key()
1176                );
1177                assert_eq!(key.lease_id(), i64::default());
1178                assert_eq!(key.revision(), i64::default());
1179            }
1180            _ => panic!("Expected LeaderChangeMessage::Elected"),
1181        }
1182
1183        leader_pg_election.step_down().await.unwrap();
1184        let res = leader_pg_election
1185            .get_value_with_lease(&leader_pg_election.election_key())
1186            .await
1187            .unwrap();
1188        assert!(res.is_none());
1189        assert!(!leader_pg_election.is_leader());
1190
1191        match rx.recv().await {
1192            Ok(LeaderChangeMessage::StepDown(key)) => {
1193                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1194                assert_eq!(
1195                    String::from_utf8_lossy(key.key()),
1196                    leader_pg_election.election_key()
1197                );
1198                assert_eq!(key.lease_id(), i64::default());
1199                assert_eq!(key.revision(), i64::default());
1200            }
1201            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1202        }
1203
1204        drop_table(&leader_pg_election, table_name).await;
1205    }
1206
1207    #[tokio::test]
1208    async fn test_leader_action() {
1209        maybe_skip_postgres_integration_test!();
1210        let leader_value = "test_leader".to_string();
1211        let uuid = uuid::Uuid::new_v4().to_string();
1212        let table_name = "test_leader_action_greptime_metakv";
1213        let candidate_lease_ttl = Duration::from_secs(5);
1214        let execution_timeout = Duration::from_secs(10);
1215        let statement_timeout = Duration::from_secs(10);
1216        let meta_lease_ttl = Duration::from_secs(2);
1217        let idle_session_timeout = Duration::from_secs(0);
1218        let client = create_postgres_client(
1219            Some(table_name),
1220            execution_timeout,
1221            idle_session_timeout,
1222            statement_timeout,
1223        )
1224        .await
1225        .unwrap();
1226
1227        let (tx, mut rx) = broadcast::channel(100);
1228        let leader_pg_election = PgElection {
1229            leader_value: leader_value.clone(),
1230            pg_client: RwLock::new(client),
1231            is_leader: AtomicBool::new(false),
1232            leader_infancy: AtomicBool::new(true),
1233            leader_watcher: tx,
1234            store_key_prefix: uuid,
1235            candidate_lease_ttl,
1236            meta_lease_ttl,
1237            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1238        };
1239
1240        // Step 1: No leader exists, campaign and elected.
1241        let res = leader_pg_election
1242            .pg_client
1243            .read()
1244            .await
1245            .query(&leader_pg_election.sql_set.campaign, &[])
1246            .await
1247            .unwrap();
1248        let res: bool = res[0].get(0);
1249        assert!(res);
1250        leader_pg_election.leader_action().await.unwrap();
1251        let lease = leader_pg_election
1252            .get_value_with_lease(&leader_pg_election.election_key())
1253            .await
1254            .unwrap()
1255            .unwrap();
1256        assert!(lease.leader_value == leader_value);
1257        assert!(lease.expire_time > lease.current);
1258        assert!(leader_pg_election.is_leader());
1259
1260        match rx.recv().await {
1261            Ok(LeaderChangeMessage::Elected(key)) => {
1262                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1263                assert_eq!(
1264                    String::from_utf8_lossy(key.key()),
1265                    leader_pg_election.election_key()
1266                );
1267                assert_eq!(key.lease_id(), i64::default());
1268                assert_eq!(key.revision(), i64::default());
1269            }
1270            _ => panic!("Expected LeaderChangeMessage::Elected"),
1271        }
1272
1273        // Step 2: As a leader, renew the lease.
1274        let res = leader_pg_election
1275            .pg_client
1276            .read()
1277            .await
1278            .query(&leader_pg_election.sql_set.campaign, &[])
1279            .await
1280            .unwrap();
1281        let res: bool = res[0].get(0);
1282        assert!(res);
1283        leader_pg_election.leader_action().await.unwrap();
1284        let new_lease = leader_pg_election
1285            .get_value_with_lease(&leader_pg_election.election_key())
1286            .await
1287            .unwrap()
1288            .unwrap();
1289        assert!(new_lease.leader_value == leader_value);
1290        assert!(
1291            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1292        );
1293        assert!(leader_pg_election.is_leader());
1294
1295        // Step 3: Something wrong, the leader lease expired.
1296        tokio::time::sleep(Duration::from_secs(2)).await;
1297
1298        let res = leader_pg_election
1299            .pg_client
1300            .read()
1301            .await
1302            .query(&leader_pg_election.sql_set.campaign, &[])
1303            .await
1304            .unwrap();
1305        let res: bool = res[0].get(0);
1306        assert!(res);
1307        leader_pg_election.leader_action().await.unwrap();
1308        let res = leader_pg_election
1309            .get_value_with_lease(&leader_pg_election.election_key())
1310            .await
1311            .unwrap();
1312        assert!(res.is_none());
1313
1314        match rx.recv().await {
1315            Ok(LeaderChangeMessage::StepDown(key)) => {
1316                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1317                assert_eq!(
1318                    String::from_utf8_lossy(key.key()),
1319                    leader_pg_election.election_key()
1320                );
1321                assert_eq!(key.lease_id(), i64::default());
1322                assert_eq!(key.revision(), i64::default());
1323            }
1324            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1325        }
1326
1327        // Step 4: Re-campaign and elected.
1328        let res = leader_pg_election
1329            .pg_client
1330            .read()
1331            .await
1332            .query(&leader_pg_election.sql_set.campaign, &[])
1333            .await
1334            .unwrap();
1335        let res: bool = res[0].get(0);
1336        assert!(res);
1337        leader_pg_election.leader_action().await.unwrap();
1338        let lease = leader_pg_election
1339            .get_value_with_lease(&leader_pg_election.election_key())
1340            .await
1341            .unwrap()
1342            .unwrap();
1343        assert!(lease.leader_value == leader_value);
1344        assert!(lease.expire_time > lease.current);
1345        assert!(leader_pg_election.is_leader());
1346
1347        match rx.recv().await {
1348            Ok(LeaderChangeMessage::Elected(key)) => {
1349                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1350                assert_eq!(
1351                    String::from_utf8_lossy(key.key()),
1352                    leader_pg_election.election_key()
1353                );
1354                assert_eq!(key.lease_id(), i64::default());
1355                assert_eq!(key.revision(), i64::default());
1356            }
1357            _ => panic!("Expected LeaderChangeMessage::Elected"),
1358        }
1359
1360        // Step 5: Something wrong, the leader key is deleted by other followers.
1361        leader_pg_election
1362            .delete_value(&leader_pg_election.election_key())
1363            .await
1364            .unwrap();
1365        leader_pg_election.leader_action().await.unwrap();
1366        let res = leader_pg_election
1367            .get_value_with_lease(&leader_pg_election.election_key())
1368            .await
1369            .unwrap();
1370        assert!(res.is_none());
1371        assert!(!leader_pg_election.is_leader());
1372
1373        match rx.recv().await {
1374            Ok(LeaderChangeMessage::StepDown(key)) => {
1375                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1376                assert_eq!(
1377                    String::from_utf8_lossy(key.key()),
1378                    leader_pg_election.election_key()
1379                );
1380                assert_eq!(key.lease_id(), i64::default());
1381                assert_eq!(key.revision(), i64::default());
1382            }
1383            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1384        }
1385
1386        // Step 6: Re-campaign and elected.
1387        let res = leader_pg_election
1388            .pg_client
1389            .read()
1390            .await
1391            .query(&leader_pg_election.sql_set.campaign, &[])
1392            .await
1393            .unwrap();
1394        let res: bool = res[0].get(0);
1395        assert!(res);
1396        leader_pg_election.leader_action().await.unwrap();
1397        let lease = leader_pg_election
1398            .get_value_with_lease(&leader_pg_election.election_key())
1399            .await
1400            .unwrap()
1401            .unwrap();
1402        assert!(lease.leader_value == leader_value);
1403        assert!(lease.expire_time > lease.current);
1404        assert!(leader_pg_election.is_leader());
1405
1406        match rx.recv().await {
1407            Ok(LeaderChangeMessage::Elected(key)) => {
1408                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1409                assert_eq!(
1410                    String::from_utf8_lossy(key.key()),
1411                    leader_pg_election.election_key()
1412                );
1413                assert_eq!(key.lease_id(), i64::default());
1414                assert_eq!(key.revision(), i64::default());
1415            }
1416            _ => panic!("Expected LeaderChangeMessage::Elected"),
1417        }
1418
1419        // Step 7: Something wrong, the leader key changed by others.
1420        let res = leader_pg_election
1421            .pg_client
1422            .read()
1423            .await
1424            .query(&leader_pg_election.sql_set.campaign, &[])
1425            .await
1426            .unwrap();
1427        let res: bool = res[0].get(0);
1428        assert!(res);
1429        leader_pg_election
1430            .delete_value(&leader_pg_election.election_key())
1431            .await
1432            .unwrap();
1433        leader_pg_election
1434            .put_value_with_lease(
1435                &leader_pg_election.election_key(),
1436                "test",
1437                Duration::from_secs(10),
1438            )
1439            .await
1440            .unwrap();
1441        leader_pg_election.leader_action().await.unwrap();
1442        let res = leader_pg_election
1443            .get_value_with_lease(&leader_pg_election.election_key())
1444            .await
1445            .unwrap();
1446        assert!(res.is_none());
1447        assert!(!leader_pg_election.is_leader());
1448
1449        match rx.recv().await {
1450            Ok(LeaderChangeMessage::StepDown(key)) => {
1451                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1452                assert_eq!(
1453                    String::from_utf8_lossy(key.key()),
1454                    leader_pg_election.election_key()
1455                );
1456                assert_eq!(key.lease_id(), i64::default());
1457                assert_eq!(key.revision(), i64::default());
1458            }
1459            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1460        }
1461
1462        // Clean up
1463        leader_pg_election
1464            .pg_client
1465            .read()
1466            .await
1467            .query(&leader_pg_election.sql_set.step_down, &[])
1468            .await
1469            .unwrap();
1470
1471        drop_table(&leader_pg_election, table_name).await;
1472    }
1473
1474    #[tokio::test]
1475    async fn test_follower_action() {
1476        maybe_skip_postgres_integration_test!();
1477        common_telemetry::init_default_ut_logging();
1478        let uuid = uuid::Uuid::new_v4().to_string();
1479        let table_name = "test_follower_action_greptime_metakv";
1480
1481        let candidate_lease_ttl = Duration::from_secs(5);
1482        let execution_timeout = Duration::from_secs(10);
1483        let statement_timeout = Duration::from_secs(10);
1484        let meta_lease_ttl = Duration::from_secs(2);
1485        let idle_session_timeout = Duration::from_secs(0);
1486        let follower_client = create_postgres_client(
1487            Some(table_name),
1488            execution_timeout,
1489            idle_session_timeout,
1490            statement_timeout,
1491        )
1492        .await
1493        .unwrap();
1494        let (tx, mut rx) = broadcast::channel(100);
1495        let follower_pg_election = PgElection {
1496            leader_value: "test_follower".to_string(),
1497            pg_client: RwLock::new(follower_client),
1498            is_leader: AtomicBool::new(false),
1499            leader_infancy: AtomicBool::new(true),
1500            leader_watcher: tx,
1501            store_key_prefix: uuid.clone(),
1502            candidate_lease_ttl,
1503            meta_lease_ttl,
1504            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1505        };
1506
1507        let leader_client = create_postgres_client(
1508            Some(table_name),
1509            execution_timeout,
1510            idle_session_timeout,
1511            statement_timeout,
1512        )
1513        .await
1514        .unwrap();
1515        let (tx, _) = broadcast::channel(100);
1516        let leader_pg_election = PgElection {
1517            leader_value: "test_leader".to_string(),
1518            pg_client: RwLock::new(leader_client),
1519            is_leader: AtomicBool::new(false),
1520            leader_infancy: AtomicBool::new(true),
1521            leader_watcher: tx,
1522            store_key_prefix: uuid,
1523            candidate_lease_ttl,
1524            meta_lease_ttl,
1525            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1526        };
1527
1528        leader_pg_election
1529            .pg_client
1530            .read()
1531            .await
1532            .query(&leader_pg_election.sql_set.campaign, &[])
1533            .await
1534            .unwrap();
1535        leader_pg_election.elected().await.unwrap();
1536
1537        // Step 1: As a follower, the leader exists and the lease is not expired.
1538        follower_pg_election.follower_action().await.unwrap();
1539
1540        // Step 2: As a follower, the leader exists but the lease expired.
1541        tokio::time::sleep(Duration::from_secs(2)).await;
1542        assert!(follower_pg_election.follower_action().await.is_err());
1543
1544        // Step 3: As a follower, the leader does not exist.
1545        leader_pg_election
1546            .delete_value(&leader_pg_election.election_key())
1547            .await
1548            .unwrap();
1549        assert!(follower_pg_election.follower_action().await.is_err());
1550
1551        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1552        follower_pg_election
1553            .is_leader
1554            .store(true, Ordering::Relaxed);
1555        assert!(follower_pg_election.follower_action().await.is_err());
1556
1557        match rx.recv().await {
1558            Ok(LeaderChangeMessage::StepDown(key)) => {
1559                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1560                assert_eq!(
1561                    String::from_utf8_lossy(key.key()),
1562                    follower_pg_election.election_key()
1563                );
1564                assert_eq!(key.lease_id(), i64::default());
1565                assert_eq!(key.revision(), i64::default());
1566            }
1567            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1568        }
1569
1570        // Clean up
1571        leader_pg_election
1572            .pg_client
1573            .read()
1574            .await
1575            .query(&leader_pg_election.sql_set.step_down, &[])
1576            .await
1577            .unwrap();
1578
1579        drop_table(&follower_pg_election, table_name).await;
1580    }
1581
1582    #[tokio::test]
1583    async fn test_idle_session_timeout() {
1584        maybe_skip_postgres_integration_test!();
1585        common_telemetry::init_default_ut_logging();
1586        let execution_timeout = Duration::from_secs(10);
1587        let statement_timeout = Duration::from_secs(10);
1588        let idle_session_timeout = Duration::from_secs(1);
1589        let mut client = create_postgres_client(
1590            None,
1591            execution_timeout,
1592            idle_session_timeout,
1593            statement_timeout,
1594        )
1595        .await
1596        .unwrap();
1597        tokio::time::sleep(Duration::from_millis(1100)).await;
1598        // Wait for the idle session timeout.
1599        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1600        assert_matches!(err, error::Error::PostgresExecution { .. });
1601        let error::Error::PostgresExecution { error, .. } = err else {
1602            panic!("Expected PostgresExecution error");
1603        };
1604        assert!(error.is_closed());
1605        // Reset the client and try again.
1606        client.reset_client().await.unwrap();
1607        let _ = client.query("SELECT 1", &[]).await.unwrap();
1608    }
1609
1610    #[test]
1611    fn test_election_sql_with_schema() {
1612        let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1613        let s = f.build();
1614        assert!(s.campaign.contains("pg_try_advisory_lock"));
1615        assert!(
1616            s.put_value_with_lease
1617                .contains("\"test_schema\".\"greptime_metakv\"")
1618        );
1619        assert!(
1620            s.update_value_with_lease
1621                .contains("\"test_schema\".\"greptime_metakv\"")
1622        );
1623        assert!(
1624            s.get_value_with_lease
1625                .contains("\"test_schema\".\"greptime_metakv\"")
1626        );
1627        assert!(
1628            s.get_value_with_lease_by_prefix
1629                .contains("\"test_schema\".\"greptime_metakv\"")
1630        );
1631        assert!(
1632            s.delete_value
1633                .contains("\"test_schema\".\"greptime_metakv\"")
1634        );
1635    }
1636}