meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31    Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by the PostgreSQL-backed election.
///
/// The factory only borrows the schema and table names; `build` turns them into an
/// owned `ElectionSqlSet`.
struct ElectionSqlFactory<'names> {
    /// Identifier passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema used to qualify `table_name`; `None` or an empty string leaves the
    /// table name unqualified.
    schema_name: Option<&'names str>,
    /// Name of the table that stores the `k`/`v` rows.
    table_name: &'names str,
}
44
/// The fully rendered SQL statements used by `PgElection`.
struct ElectionSqlSet {
    /// SQL that tries to take the advisory lock (`pg_try_advisory_lock`).
    campaign: String,
    /// SQL that releases the advisory lock (`pg_advisory_unlock`).
    step_down: String,
    /// SQL to put a value with expire time.
    ///
    /// Parameters for the query:
    /// `$1`: key,
    /// `$2`: value,
    /// `$3`: lease time in seconds
    ///
    /// Returns:
    /// If the key already exists, return the previous value.
    put_value_with_lease: String,
    /// SQL to update a value with expire time.
    ///
    /// Parameters for the query:
    /// `$1`: key,
    /// `$2`: previous value,
    /// `$3`: updated value,
    /// `$4`: lease time in seconds
    update_value_with_lease: String,
    /// SQL to get a value with expire time.
    ///
    /// Parameters:
    /// `$1`: key
    ///
    /// Returns:
    /// column 0: value,
    /// column 1: current timestamp
    get_value_with_lease: String,
    /// SQL to get all values with expire time with the given key prefix.
    ///
    /// Parameters:
    /// `$1`: key prefix like 'prefix%'
    ///
    /// Returns:
    /// column 0: value,
    /// column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    /// SQL to delete a value.
    ///
    /// Parameters:
    /// `$1`: key
    ///
    /// Returns:
    /// column 0: key deleted,
    /// column 1: value deleted
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93        Self {
94            lock_id,
95            schema_name,
96            table_name,
97        }
98    }
99
100    fn table_ident(&self) -> String {
101        match self.schema_name {
102            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103            _ => format!("\"{}\"", self.table_name),
104        }
105    }
106
107    fn build(self) -> ElectionSqlSet {
108        ElectionSqlSet {
109            campaign: self.campaign_sql(),
110            step_down: self.step_down_sql(),
111            put_value_with_lease: self.put_value_with_lease_sql(),
112            update_value_with_lease: self.update_value_with_lease_sql(),
113            get_value_with_lease: self.get_value_with_lease_sql(),
114            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115            delete_value: self.delete_value_sql(),
116        }
117    }
118
119    fn campaign_sql(&self) -> String {
120        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121    }
122
123    fn step_down_sql(&self) -> String {
124        format!("SELECT pg_advisory_unlock({})", self.lock_id)
125    }
126
127    fn put_value_with_lease_sql(&self) -> String {
128        let table = self.table_ident();
129        format!(
130            r#"WITH prev AS (
131                SELECT k, v FROM {table} WHERE k = $1
132            ), insert AS (
133                INSERT INTO {table}
134                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135                ON CONFLICT (k) DO NOTHING
136            )
137            SELECT k, v FROM prev;
138            "#,
139            table = table,
140            lease_sep = LEASE_SEP
141        )
142    }
143
144    fn update_value_with_lease_sql(&self) -> String {
145        let table = self.table_ident();
146        format!(
147            r#"UPDATE {table}
148               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149               WHERE k = $1 AND v = $2"#,
150            table = table,
151            lease_sep = LEASE_SEP
152        )
153    }
154
155    fn get_value_with_lease_sql(&self) -> String {
156        let table = self.table_ident();
157        format!(
158            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159            table = table
160        )
161    }
162
163    fn get_value_with_lease_by_prefix_sql(&self) -> String {
164        let table = self.table_ident();
165        format!(
166            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167            table = table
168        )
169    }
170
171    fn delete_value_sql(&self) -> String {
172        let table = self.table_ident();
173        format!(
174            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175            table = table
176        )
177    }
178}
179
180/// PgClient for election.
181pub struct ElectionPgClient {
182    current: Option<deadpool::managed::Object<Manager>>,
183    pool: Pool,
184    /// The client-side timeout for statement execution.
185    ///
186    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
187    /// If a statement takes longer than this duration to execute, the client will abort the operation.
188    execution_timeout: Duration,
189
190    /// The idle session timeout.
191    ///
192    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
193    /// If a session remains idle for longer than this duration, the server will terminate it.
194    idle_session_timeout: Duration,
195
196    /// The statement timeout.
197    ///
198    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
199    /// If a statement takes longer than this duration to execute, the server will abort it.
200    statement_timeout: Duration,
201}
202
203impl ElectionPgClient {
204    pub fn new(
205        pool: Pool,
206        execution_timeout: Duration,
207        idle_session_timeout: Duration,
208        statement_timeout: Duration,
209    ) -> Result<ElectionPgClient> {
210        Ok(ElectionPgClient {
211            current: None,
212            pool,
213            execution_timeout,
214            idle_session_timeout,
215            statement_timeout,
216        })
217    }
218
219    fn set_idle_session_timeout_sql(&self) -> String {
220        format!(
221            "SET idle_session_timeout = '{}s';",
222            self.idle_session_timeout.as_secs()
223        )
224    }
225
226    fn set_statement_timeout_sql(&self) -> String {
227        format!(
228            "SET statement_timeout = '{}s';",
229            self.statement_timeout.as_secs()
230        )
231    }
232
233    async fn reset_client(&mut self) -> Result<()> {
234        self.current = None;
235        self.maybe_init_client().await
236    }
237
238    async fn maybe_init_client(&mut self) -> Result<()> {
239        if self.current.is_none() {
240            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242            self.current = Some(client);
243            // Set idle session timeout and statement timeout.
244            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245            self.execute(&idle_session_timeout_sql, &[]).await?;
246            let statement_timeout_sql = self.set_statement_timeout_sql();
247            self.execute(&statement_timeout_sql, &[]).await?;
248        }
249
250        Ok(())
251    }
252
253    /// Returns the result of the query.
254    ///
255    /// # Panics
256    /// if `current` is `None`.
257    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258        let result = tokio::time::timeout(
259            self.execution_timeout,
260            self.current.as_ref().unwrap().execute(sql, params),
261        )
262        .await
263        .map_err(|_| {
264            SqlExecutionTimeoutSnafu {
265                sql: sql.to_string(),
266                duration: self.execution_timeout,
267            }
268            .build()
269        })?;
270
271        result.context(PostgresExecutionSnafu { sql })
272    }
273
274    /// Returns the result of the query.
275    ///
276    /// # Panics
277    /// if `current` is `None`.
278    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279        let result = tokio::time::timeout(
280            self.execution_timeout,
281            self.current.as_ref().unwrap().query(sql, params),
282        )
283        .await
284        .map_err(|_| {
285            SqlExecutionTimeoutSnafu {
286                sql: sql.to_string(),
287                duration: self.execution_timeout,
288            }
289            .build()
290        })?;
291
292        result.context(PostgresExecutionSnafu { sql })
293    }
294}
295
296/// PostgreSql implementation of Election.
297pub struct PgElection {
298    leader_value: String,
299    pg_client: RwLock<ElectionPgClient>,
300    is_leader: AtomicBool,
301    leader_infancy: AtomicBool,
302    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
303    store_key_prefix: String,
304    candidate_lease_ttl: Duration,
305    meta_lease_ttl: Duration,
306    sql_set: ElectionSqlSet,
307}
308
309impl PgElection {
310    async fn maybe_init_client(&self) -> Result<()> {
311        if self.pg_client.read().await.current.is_none() {
312            self.pg_client.write().await.maybe_init_client().await?;
313        }
314
315        Ok(())
316    }
317
318    #[allow(clippy::too_many_arguments)]
319    pub async fn with_pg_client(
320        leader_value: String,
321        pg_client: ElectionPgClient,
322        store_key_prefix: String,
323        candidate_lease_ttl: Duration,
324        meta_lease_ttl: Duration,
325        schema_name: Option<&str>,
326        table_name: &str,
327        lock_id: u64,
328    ) -> Result<ElectionRef> {
329        if let Some(s) = schema_name {
330            common_telemetry::info!("PgElection uses schema: {}", s);
331        } else {
332            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333        }
334        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336        let tx = listen_leader_change(leader_value.clone());
337        Ok(Arc::new(Self {
338            leader_value,
339            pg_client: RwLock::new(pg_client),
340            is_leader: AtomicBool::new(false),
341            leader_infancy: AtomicBool::new(false),
342            leader_watcher: tx,
343            store_key_prefix,
344            candidate_lease_ttl,
345            meta_lease_ttl,
346            sql_set: sql_factory.build(),
347        }))
348    }
349
350    fn election_key(&self) -> String {
351        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352    }
353
354    fn candidate_root(&self) -> String {
355        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356    }
357
358    fn candidate_key(&self) -> String {
359        format!("{}{}", self.candidate_root(), self.leader_value)
360    }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365    type Leader = LeaderValue;
366
367    fn is_leader(&self) -> bool {
368        self.is_leader.load(Ordering::Relaxed)
369    }
370
371    fn in_leader_infancy(&self) -> bool {
372        self.leader_infancy
373            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374            .is_ok()
375    }
376
377    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378        let key = self.candidate_key();
379        let node_info =
380            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381                input: format!("{node_info:?}"),
382            })?;
383        let res = self
384            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385            .await?;
386        // May registered before, just update the lease.
387        if !res {
388            self.delete_value(&key).await?;
389            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390                .await?;
391        }
392
393        // Check if the current lease has expired and renew the lease.
394        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395        loop {
396            let _ = keep_alive_interval.tick().await;
397
398            let lease = self
399                .get_value_with_lease(&key)
400                .await?
401                .context(UnexpectedSnafu {
402                    violated: format!("Failed to get lease for key: {:?}", key),
403                })?;
404
405            ensure!(
406                lease.expire_time > lease.current,
407                UnexpectedSnafu {
408                    violated: format!(
409                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410                        lease.expire_time, lease.current, key
411                    ),
412                }
413            );
414
415            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
416            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417                .await?;
418        }
419    }
420
421    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422        let key_prefix = self.candidate_root();
423        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424        // Remove expired candidates
425        candidates.retain(|c| c.1 > current);
426        let mut valid_candidates = Vec::with_capacity(candidates.len());
427        for (c, _) in candidates {
428            let node_info: MetasrvNodeInfo =
429                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430                    input: format!("{:?}", c),
431                })?;
432            valid_candidates.push(node_info);
433        }
434        Ok(valid_candidates)
435    }
436
437    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
438    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
439    ///
440    /// The function operates in a loop, where it:
441    ///
442    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
443    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
444    /// 3. Checks the result of the query:
445    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
446    ///      to perform actions as the leader.
447    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
448    ///      to perform actions as a follower.
449    async fn campaign(&self) -> Result<()> {
450        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453        self.maybe_init_client().await?;
454        loop {
455            let res = self
456                .pg_client
457                .read()
458                .await
459                .query(&self.sql_set.campaign, &[])
460                .await?;
461            let row = res.first().context(UnexpectedSnafu {
462                violated: "Failed to get the result of acquiring advisory lock",
463            })?;
464            let is_leader = row.try_get(0).map_err(|_| {
465                UnexpectedSnafu {
466                    violated: "Failed to get the result of get lock",
467                }
468                .build()
469            })?;
470            if is_leader {
471                self.leader_action().await?;
472            } else {
473                self.follower_action().await?;
474            }
475            let _ = keep_alive_interval.tick().await;
476        }
477    }
478
479    async fn reset_campaign(&self) {
480        if let Err(err) = self.pg_client.write().await.reset_client().await {
481            error!(err; "Failed to reset client");
482        }
483    }
484
485    async fn leader(&self) -> Result<Self::Leader> {
486        if self.is_leader.load(Ordering::Relaxed) {
487            Ok(self.leader_value.as_bytes().into())
488        } else {
489            let key = self.election_key();
490            if let Some(lease) = self.get_value_with_lease(&key).await? {
491                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492                Ok(lease.leader_value.as_bytes().into())
493            } else {
494                NoLeaderSnafu.fail()
495            }
496        }
497    }
498
499    async fn resign(&self) -> Result<()> {
500        todo!()
501    }
502
503    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504        self.leader_watcher.subscribe()
505    }
506}
507
508impl PgElection {
509    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
510    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511        let key = key.as_bytes();
512        self.maybe_init_client().await?;
513        let res = self
514            .pg_client
515            .read()
516            .await
517            .query(&self.sql_set.get_value_with_lease, &[&key])
518            .await?;
519
520        if res.is_empty() {
521            Ok(None)
522        } else {
523            // Safety: Checked if res is empty above.
524            let current_time_str = res[0].try_get(1).unwrap_or_default();
525            let current_time = match Timestamp::from_str(current_time_str, None) {
526                Ok(ts) => ts,
527                Err(_) => UnexpectedSnafu {
528                    violated: format!("Invalid timestamp: {}", current_time_str),
529                }
530                .fail()?,
531            };
532            // Safety: Checked if res is empty above.
533            let value_and_expire_time =
534                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537            Ok(Some(Lease {
538                leader_value: value,
539                expire_time,
540                current: current_time,
541                origin: value_and_expire_time.to_string(),
542            }))
543        }
544    }
545
546    /// Returns all values and expire time with the given key prefix. Also returns the current time.
547    async fn get_value_with_lease_by_prefix(
548        &self,
549        key_prefix: &str,
550    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552        self.maybe_init_client().await?;
553        let res = self
554            .pg_client
555            .read()
556            .await
557            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558            .await?;
559
560        let mut values_with_leases = vec![];
561        let mut current = Timestamp::default();
562        for row in res {
563            let current_time_str = row.try_get(1).unwrap_or_default();
564            current = match Timestamp::from_str(current_time_str, None) {
565                Ok(ts) => ts,
566                Err(_) => UnexpectedSnafu {
567                    violated: format!("Invalid timestamp: {}", current_time_str),
568                }
569                .fail()?,
570            };
571
572            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575            values_with_leases.push((value, expire_time));
576        }
577        Ok((values_with_leases, current))
578    }
579
580    async fn update_value_with_lease(
581        &self,
582        key: &str,
583        prev: &str,
584        updated: &str,
585        lease_ttl: Duration,
586    ) -> Result<()> {
587        let key = key.as_bytes();
588        let prev = prev.as_bytes();
589        self.maybe_init_client().await?;
590        let lease_ttl_secs = lease_ttl.as_secs() as f64;
591        let res = self
592            .pg_client
593            .read()
594            .await
595            .execute(
596                &self.sql_set.update_value_with_lease,
597                &[&key, &prev, &updated, &lease_ttl_secs],
598            )
599            .await?;
600
601        ensure!(
602            res == 1,
603            UnexpectedSnafu {
604                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605            }
606        );
607
608        Ok(())
609    }
610
611    /// Returns `true` if the insertion is successful
612    async fn put_value_with_lease(
613        &self,
614        key: &str,
615        value: &str,
616        lease_ttl: Duration,
617    ) -> Result<bool> {
618        let key = key.as_bytes();
619        let lease_ttl_secs = lease_ttl.as_secs() as f64;
620        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621        self.maybe_init_client().await?;
622        let res = self
623            .pg_client
624            .read()
625            .await
626            .query(&self.sql_set.put_value_with_lease, &params)
627            .await?;
628        Ok(res.is_empty())
629    }
630
631    /// Returns `true` if the deletion is successful.
632    /// Caution: Should only delete the key if the lease is expired.
633    async fn delete_value(&self, key: &str) -> Result<bool> {
634        let key = key.as_bytes();
635        self.maybe_init_client().await?;
636        let res = self
637            .pg_client
638            .read()
639            .await
640            .query(&self.sql_set.delete_value, &[&key])
641            .await?;
642
643        Ok(res.len() == 1)
644    }
645
646    /// Handles the actions of a leader in the election process.
647    ///
648    /// This function performs the following checks and actions:
649    ///
650    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
651    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
652    ///   or steps down if it has expired.
653    ///
654    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
655    ///     by updating the value associated with the election key.
656    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
657    ///     and steps down, initiating a new campaign for leadership.
658    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
659    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
660    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
661    ///     after a period of time during which other leaders have been elected and stepped down.
662    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
663    ///
664    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
665    ///   as a newly elected leader.
666    async fn leader_action(&self) -> Result<()> {
667        let key = self.election_key();
668        // Case 1
669        if self.is_leader() {
670            match self.get_value_with_lease(&key).await? {
671                Some(lease) => {
672                    match (
673                        lease.leader_value == self.leader_value,
674                        lease.expire_time > lease.current,
675                    ) {
676                        // Case 1.1
677                        (true, true) => {
678                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
679                            self.update_value_with_lease(
680                                &key,
681                                &lease.origin,
682                                &self.leader_value,
683                                self.meta_lease_ttl,
684                            )
685                            .await?;
686                        }
687                        // Case 1.2
688                        (true, false) => {
689                            warn!("Leader lease expired, now stepping down.");
690                            self.step_down().await?;
691                        }
692                        // Case 1.3
693                        (false, _) => {
694                            warn!(
695                                "Leader lease not found, but still hold the lock. Now stepping down."
696                            );
697                            self.step_down().await?;
698                        }
699                    }
700                }
701                // Case 1.4
702                None => {
703                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
704                    self.step_down().await?;
705                }
706            }
707        // Case 2
708        } else {
709            self.elected().await?;
710        }
711        Ok(())
712    }
713
714    /// Handles the actions of a follower in the election process.
715    ///
716    /// This function performs the following checks and actions:
717    ///
718    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
719    ///   it steps down without deleting the key.
720    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
721    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
722    ///   will be released.
723    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
724    async fn follower_action(&self) -> Result<()> {
725        let key = self.election_key();
726        // Case 1
727        if self.is_leader() {
728            self.step_down_without_lock().await?;
729        }
730        let lease = self
731            .get_value_with_lease(&key)
732            .await?
733            .context(NoLeaderSnafu)?;
734        // Case 2
735        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
736        // Case 3
737        Ok(())
738    }
739
740    /// Step down the leader. The leader should delete the key and notify the leader watcher.
741    ///
742    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
743    ///
744    /// ## Caution:
745    /// Should only step down while holding the advisory lock.
746    async fn step_down(&self) -> Result<()> {
747        let key = self.election_key();
748        let leader_key = RdsLeaderKey {
749            name: self.leader_value.clone().into_bytes(),
750            key: key.clone().into_bytes(),
751            ..Default::default()
752        };
753        self.delete_value(&key).await?;
754        self.maybe_init_client().await?;
755        self.pg_client
756            .read()
757            .await
758            .query(&self.sql_set.step_down, &[])
759            .await?;
760        send_leader_change_and_set_flags(
761            &self.is_leader,
762            &self.leader_infancy,
763            &self.leader_watcher,
764            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
765        );
766        Ok(())
767    }
768
769    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
770    async fn step_down_without_lock(&self) -> Result<()> {
771        let key = self.election_key().into_bytes();
772        let leader_key = RdsLeaderKey {
773            name: self.leader_value.clone().into_bytes(),
774            key: key.clone(),
775            ..Default::default()
776        };
777        if self
778            .is_leader
779            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
780            .is_ok()
781            && let Err(e) = self
782                .leader_watcher
783                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
784        {
785            error!(e; "Failed to send leader change message");
786        }
787        Ok(())
788    }
789
790    /// Elected as leader. The leader should put the key and notify the leader watcher.
791    /// Caution: Should only elected while holding the advisory lock.
792    async fn elected(&self) -> Result<()> {
793        let key = self.election_key();
794        let leader_key = RdsLeaderKey {
795            name: self.leader_value.clone().into_bytes(),
796            key: key.clone().into_bytes(),
797            ..Default::default()
798        };
799        self.delete_value(&key).await?;
800        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801            .await?;
802
803        if self
804            .is_leader
805            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806            .is_ok()
807        {
808            self.leader_infancy.store(true, Ordering::Release);
809
810            if let Err(e) = self
811                .leader_watcher
812                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813            {
814                error!(e; "Failed to send leader change message");
815            }
816        }
817        Ok(())
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use std::assert_matches::assert_matches;
824    use std::env;
825
826    use common_meta::maybe_skip_postgres_integration_test;
827
828    use super::*;
829    use crate::error;
830    use crate::utils::postgres::create_postgres_pool;
831
832    async fn create_postgres_client(
833        table_name: Option<&str>,
834        execution_timeout: Duration,
835        idle_session_timeout: Duration,
836        statement_timeout: Duration,
837    ) -> Result<ElectionPgClient> {
838        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839        if endpoint.is_empty() {
840            return UnexpectedSnafu {
841                violated: "Postgres endpoint is empty".to_string(),
842            }
843            .fail();
844        }
845        let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
846        let mut pg_client = ElectionPgClient::new(
847            pool,
848            execution_timeout,
849            idle_session_timeout,
850            statement_timeout,
851        )
852        .unwrap();
853        pg_client.maybe_init_client().await?;
854        if let Some(table_name) = table_name {
855            let create_table_sql = format!(
856                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857                table_name
858            );
859            pg_client.execute(&create_table_sql, &[]).await?;
860        }
861        Ok(pg_client)
862    }
863
864    async fn drop_table(pg_election: &PgElection, table_name: &str) {
865        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866        pg_election
867            .pg_client
868            .read()
869            .await
870            .execute(&sql, &[])
871            .await
872            .unwrap();
873    }
874
875    #[tokio::test]
876    async fn test_postgres_crud() {
877        maybe_skip_postgres_integration_test!();
878        let key = "test_key".to_string();
879        let value = "test_value".to_string();
880
881        let uuid = uuid::Uuid::new_v4().to_string();
882        let table_name = "test_postgres_crud_greptime_metakv";
883        let candidate_lease_ttl = Duration::from_secs(10);
884        let execution_timeout = Duration::from_secs(10);
885        let statement_timeout = Duration::from_secs(10);
886        let meta_lease_ttl = Duration::from_secs(2);
887        let idle_session_timeout = Duration::from_secs(0);
888        let client = create_postgres_client(
889            Some(table_name),
890            execution_timeout,
891            idle_session_timeout,
892            statement_timeout,
893        )
894        .await
895        .unwrap();
896
897        let (tx, _) = broadcast::channel(100);
898        let pg_election = PgElection {
899            leader_value: "test_leader".to_string(),
900            pg_client: RwLock::new(client),
901            is_leader: AtomicBool::new(false),
902            leader_infancy: AtomicBool::new(true),
903            leader_watcher: tx,
904            store_key_prefix: uuid,
905            candidate_lease_ttl,
906            meta_lease_ttl,
907            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
908        };
909
910        let res = pg_election
911            .put_value_with_lease(&key, &value, candidate_lease_ttl)
912            .await
913            .unwrap();
914        assert!(res);
915
916        let lease = pg_election
917            .get_value_with_lease(&key)
918            .await
919            .unwrap()
920            .unwrap();
921        assert_eq!(lease.leader_value, value);
922
923        pg_election
924            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
925            .await
926            .unwrap();
927
928        let res = pg_election.delete_value(&key).await.unwrap();
929        assert!(res);
930
931        let res = pg_election.get_value_with_lease(&key).await.unwrap();
932        assert!(res.is_none());
933
934        for i in 0..10 {
935            let key = format!("test_key_{}", i);
936            let value = format!("test_value_{}", i);
937            pg_election
938                .put_value_with_lease(&key, &value, candidate_lease_ttl)
939                .await
940                .unwrap();
941        }
942
943        let key_prefix = "test_key".to_string();
944        let (res, _) = pg_election
945            .get_value_with_lease_by_prefix(&key_prefix)
946            .await
947            .unwrap();
948        assert_eq!(res.len(), 10);
949
950        for i in 0..10 {
951            let key = format!("test_key_{}", i);
952            let res = pg_election.delete_value(&key).await.unwrap();
953            assert!(res);
954        }
955
956        let (res, current) = pg_election
957            .get_value_with_lease_by_prefix(&key_prefix)
958            .await
959            .unwrap();
960        assert!(res.is_empty());
961        assert!(current == Timestamp::default());
962
963        drop_table(&pg_election, table_name).await;
964    }
965
966    async fn candidate(
967        leader_value: String,
968        candidate_lease_ttl: Duration,
969        store_key_prefix: String,
970        table_name: String,
971    ) {
972        let execution_timeout = Duration::from_secs(10);
973        let statement_timeout = Duration::from_secs(10);
974        let meta_lease_ttl = Duration::from_secs(2);
975        let idle_session_timeout = Duration::from_secs(0);
976        let client = create_postgres_client(
977            None,
978            execution_timeout,
979            idle_session_timeout,
980            statement_timeout,
981        )
982        .await
983        .unwrap();
984
985        let (tx, _) = broadcast::channel(100);
986        let pg_election = PgElection {
987            leader_value,
988            pg_client: RwLock::new(client),
989            is_leader: AtomicBool::new(false),
990            leader_infancy: AtomicBool::new(true),
991            leader_watcher: tx,
992            store_key_prefix,
993            candidate_lease_ttl,
994            meta_lease_ttl,
995            sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996        };
997
998        let node_info = MetasrvNodeInfo {
999            addr: "test_addr".to_string(),
1000            version: "test_version".to_string(),
1001            git_commit: "test_git_commit".to_string(),
1002            start_time_ms: 0,
1003            cpus: 0,
1004            memory_bytes: 0,
1005        };
1006        pg_election.register_candidate(&node_info).await.unwrap();
1007    }
1008
1009    #[tokio::test]
1010    async fn test_candidate_registration() {
1011        maybe_skip_postgres_integration_test!();
1012        let leader_value_prefix = "test_leader".to_string();
1013        let uuid = uuid::Uuid::new_v4().to_string();
1014        let table_name = "test_candidate_registration_greptime_metakv";
1015        let mut handles = vec![];
1016        let candidate_lease_ttl = Duration::from_secs(5);
1017        let execution_timeout = Duration::from_secs(10);
1018        let statement_timeout = Duration::from_secs(10);
1019        let meta_lease_ttl = Duration::from_secs(2);
1020        let idle_session_timeout = Duration::from_secs(0);
1021        let client = create_postgres_client(
1022            Some(table_name),
1023            execution_timeout,
1024            idle_session_timeout,
1025            statement_timeout,
1026        )
1027        .await
1028        .unwrap();
1029
1030        for i in 0..10 {
1031            let leader_value = format!("{}{}", leader_value_prefix, i);
1032            let handle = tokio::spawn(candidate(
1033                leader_value,
1034                candidate_lease_ttl,
1035                uuid.clone(),
1036                table_name.to_string(),
1037            ));
1038            handles.push(handle);
1039        }
1040        // Wait for candidates to register themselves and renew their leases at least once.
1041        tokio::time::sleep(Duration::from_secs(3)).await;
1042
1043        let (tx, _) = broadcast::channel(100);
1044        let leader_value = "test_leader".to_string();
1045        let pg_election = PgElection {
1046            leader_value,
1047            pg_client: RwLock::new(client),
1048            is_leader: AtomicBool::new(false),
1049            leader_infancy: AtomicBool::new(true),
1050            leader_watcher: tx,
1051            store_key_prefix: uuid.clone(),
1052            candidate_lease_ttl,
1053            meta_lease_ttl,
1054            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1055        };
1056
1057        let candidates = pg_election.all_candidates().await.unwrap();
1058        assert_eq!(candidates.len(), 10);
1059
1060        for handle in handles {
1061            handle.abort();
1062        }
1063
1064        // Wait for the candidate leases to expire.
1065        tokio::time::sleep(Duration::from_secs(5)).await;
1066        let candidates = pg_election.all_candidates().await.unwrap();
1067        assert!(candidates.is_empty());
1068
1069        // Garbage collection
1070        for i in 0..10 {
1071            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1072            let res = pg_election.delete_value(&key).await.unwrap();
1073            assert!(res);
1074        }
1075
1076        drop_table(&pg_election, table_name).await;
1077    }
1078
1079    #[tokio::test]
1080    async fn test_elected_and_step_down() {
1081        maybe_skip_postgres_integration_test!();
1082        let leader_value = "test_leader".to_string();
1083        let uuid = uuid::Uuid::new_v4().to_string();
1084        let table_name = "test_elected_and_step_down_greptime_metakv";
1085        let candidate_lease_ttl = Duration::from_secs(5);
1086        let execution_timeout = Duration::from_secs(10);
1087        let statement_timeout = Duration::from_secs(10);
1088        let meta_lease_ttl = Duration::from_secs(2);
1089        let idle_session_timeout = Duration::from_secs(0);
1090        let client = create_postgres_client(
1091            Some(table_name),
1092            execution_timeout,
1093            idle_session_timeout,
1094            statement_timeout,
1095        )
1096        .await
1097        .unwrap();
1098
1099        let (tx, mut rx) = broadcast::channel(100);
1100        let leader_pg_election = PgElection {
1101            leader_value: leader_value.clone(),
1102            pg_client: RwLock::new(client),
1103            is_leader: AtomicBool::new(false),
1104            leader_infancy: AtomicBool::new(true),
1105            leader_watcher: tx,
1106            store_key_prefix: uuid,
1107            candidate_lease_ttl,
1108            meta_lease_ttl,
1109            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1110        };
1111
1112        leader_pg_election.elected().await.unwrap();
1113        let lease = leader_pg_election
1114            .get_value_with_lease(&leader_pg_election.election_key())
1115            .await
1116            .unwrap()
1117            .unwrap();
1118        assert!(lease.leader_value == leader_value);
1119        assert!(lease.expire_time > lease.current);
1120        assert!(leader_pg_election.is_leader());
1121
1122        match rx.recv().await {
1123            Ok(LeaderChangeMessage::Elected(key)) => {
1124                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1125                assert_eq!(
1126                    String::from_utf8_lossy(key.key()),
1127                    leader_pg_election.election_key()
1128                );
1129                assert_eq!(key.lease_id(), i64::default());
1130                assert_eq!(key.revision(), i64::default());
1131            }
1132            _ => panic!("Expected LeaderChangeMessage::Elected"),
1133        }
1134
1135        leader_pg_election.step_down_without_lock().await.unwrap();
1136        let lease = leader_pg_election
1137            .get_value_with_lease(&leader_pg_election.election_key())
1138            .await
1139            .unwrap()
1140            .unwrap();
1141        assert!(lease.leader_value == leader_value);
1142        assert!(!leader_pg_election.is_leader());
1143
1144        match rx.recv().await {
1145            Ok(LeaderChangeMessage::StepDown(key)) => {
1146                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1147                assert_eq!(
1148                    String::from_utf8_lossy(key.key()),
1149                    leader_pg_election.election_key()
1150                );
1151                assert_eq!(key.lease_id(), i64::default());
1152                assert_eq!(key.revision(), i64::default());
1153            }
1154            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1155        }
1156
1157        leader_pg_election.elected().await.unwrap();
1158        let lease = leader_pg_election
1159            .get_value_with_lease(&leader_pg_election.election_key())
1160            .await
1161            .unwrap()
1162            .unwrap();
1163        assert!(lease.leader_value == leader_value);
1164        assert!(lease.expire_time > lease.current);
1165        assert!(leader_pg_election.is_leader());
1166
1167        match rx.recv().await {
1168            Ok(LeaderChangeMessage::Elected(key)) => {
1169                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1170                assert_eq!(
1171                    String::from_utf8_lossy(key.key()),
1172                    leader_pg_election.election_key()
1173                );
1174                assert_eq!(key.lease_id(), i64::default());
1175                assert_eq!(key.revision(), i64::default());
1176            }
1177            _ => panic!("Expected LeaderChangeMessage::Elected"),
1178        }
1179
1180        leader_pg_election.step_down().await.unwrap();
1181        let res = leader_pg_election
1182            .get_value_with_lease(&leader_pg_election.election_key())
1183            .await
1184            .unwrap();
1185        assert!(res.is_none());
1186        assert!(!leader_pg_election.is_leader());
1187
1188        match rx.recv().await {
1189            Ok(LeaderChangeMessage::StepDown(key)) => {
1190                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1191                assert_eq!(
1192                    String::from_utf8_lossy(key.key()),
1193                    leader_pg_election.election_key()
1194                );
1195                assert_eq!(key.lease_id(), i64::default());
1196                assert_eq!(key.revision(), i64::default());
1197            }
1198            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1199        }
1200
1201        drop_table(&leader_pg_election, table_name).await;
1202    }
1203
1204    #[tokio::test]
1205    async fn test_leader_action() {
1206        maybe_skip_postgres_integration_test!();
1207        let leader_value = "test_leader".to_string();
1208        let uuid = uuid::Uuid::new_v4().to_string();
1209        let table_name = "test_leader_action_greptime_metakv";
1210        let candidate_lease_ttl = Duration::from_secs(5);
1211        let execution_timeout = Duration::from_secs(10);
1212        let statement_timeout = Duration::from_secs(10);
1213        let meta_lease_ttl = Duration::from_secs(2);
1214        let idle_session_timeout = Duration::from_secs(0);
1215        let client = create_postgres_client(
1216            Some(table_name),
1217            execution_timeout,
1218            idle_session_timeout,
1219            statement_timeout,
1220        )
1221        .await
1222        .unwrap();
1223
1224        let (tx, mut rx) = broadcast::channel(100);
1225        let leader_pg_election = PgElection {
1226            leader_value: leader_value.clone(),
1227            pg_client: RwLock::new(client),
1228            is_leader: AtomicBool::new(false),
1229            leader_infancy: AtomicBool::new(true),
1230            leader_watcher: tx,
1231            store_key_prefix: uuid,
1232            candidate_lease_ttl,
1233            meta_lease_ttl,
1234            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1235        };
1236
1237        // Step 1: No leader exists, campaign and elected.
1238        let res = leader_pg_election
1239            .pg_client
1240            .read()
1241            .await
1242            .query(&leader_pg_election.sql_set.campaign, &[])
1243            .await
1244            .unwrap();
1245        let res: bool = res[0].get(0);
1246        assert!(res);
1247        leader_pg_election.leader_action().await.unwrap();
1248        let lease = leader_pg_election
1249            .get_value_with_lease(&leader_pg_election.election_key())
1250            .await
1251            .unwrap()
1252            .unwrap();
1253        assert!(lease.leader_value == leader_value);
1254        assert!(lease.expire_time > lease.current);
1255        assert!(leader_pg_election.is_leader());
1256
1257        match rx.recv().await {
1258            Ok(LeaderChangeMessage::Elected(key)) => {
1259                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1260                assert_eq!(
1261                    String::from_utf8_lossy(key.key()),
1262                    leader_pg_election.election_key()
1263                );
1264                assert_eq!(key.lease_id(), i64::default());
1265                assert_eq!(key.revision(), i64::default());
1266            }
1267            _ => panic!("Expected LeaderChangeMessage::Elected"),
1268        }
1269
1270        // Step 2: As a leader, renew the lease.
1271        let res = leader_pg_election
1272            .pg_client
1273            .read()
1274            .await
1275            .query(&leader_pg_election.sql_set.campaign, &[])
1276            .await
1277            .unwrap();
1278        let res: bool = res[0].get(0);
1279        assert!(res);
1280        leader_pg_election.leader_action().await.unwrap();
1281        let new_lease = leader_pg_election
1282            .get_value_with_lease(&leader_pg_election.election_key())
1283            .await
1284            .unwrap()
1285            .unwrap();
1286        assert!(new_lease.leader_value == leader_value);
1287        assert!(
1288            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1289        );
1290        assert!(leader_pg_election.is_leader());
1291
1292        // Step 3: Something wrong, the leader lease expired.
1293        tokio::time::sleep(Duration::from_secs(2)).await;
1294
1295        let res = leader_pg_election
1296            .pg_client
1297            .read()
1298            .await
1299            .query(&leader_pg_election.sql_set.campaign, &[])
1300            .await
1301            .unwrap();
1302        let res: bool = res[0].get(0);
1303        assert!(res);
1304        leader_pg_election.leader_action().await.unwrap();
1305        let res = leader_pg_election
1306            .get_value_with_lease(&leader_pg_election.election_key())
1307            .await
1308            .unwrap();
1309        assert!(res.is_none());
1310
1311        match rx.recv().await {
1312            Ok(LeaderChangeMessage::StepDown(key)) => {
1313                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1314                assert_eq!(
1315                    String::from_utf8_lossy(key.key()),
1316                    leader_pg_election.election_key()
1317                );
1318                assert_eq!(key.lease_id(), i64::default());
1319                assert_eq!(key.revision(), i64::default());
1320            }
1321            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1322        }
1323
1324        // Step 4: Re-campaign and elected.
1325        let res = leader_pg_election
1326            .pg_client
1327            .read()
1328            .await
1329            .query(&leader_pg_election.sql_set.campaign, &[])
1330            .await
1331            .unwrap();
1332        let res: bool = res[0].get(0);
1333        assert!(res);
1334        leader_pg_election.leader_action().await.unwrap();
1335        let lease = leader_pg_election
1336            .get_value_with_lease(&leader_pg_election.election_key())
1337            .await
1338            .unwrap()
1339            .unwrap();
1340        assert!(lease.leader_value == leader_value);
1341        assert!(lease.expire_time > lease.current);
1342        assert!(leader_pg_election.is_leader());
1343
1344        match rx.recv().await {
1345            Ok(LeaderChangeMessage::Elected(key)) => {
1346                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1347                assert_eq!(
1348                    String::from_utf8_lossy(key.key()),
1349                    leader_pg_election.election_key()
1350                );
1351                assert_eq!(key.lease_id(), i64::default());
1352                assert_eq!(key.revision(), i64::default());
1353            }
1354            _ => panic!("Expected LeaderChangeMessage::Elected"),
1355        }
1356
1357        // Step 5: Something wrong, the leader key is deleted by other followers.
1358        leader_pg_election
1359            .delete_value(&leader_pg_election.election_key())
1360            .await
1361            .unwrap();
1362        leader_pg_election.leader_action().await.unwrap();
1363        let res = leader_pg_election
1364            .get_value_with_lease(&leader_pg_election.election_key())
1365            .await
1366            .unwrap();
1367        assert!(res.is_none());
1368        assert!(!leader_pg_election.is_leader());
1369
1370        match rx.recv().await {
1371            Ok(LeaderChangeMessage::StepDown(key)) => {
1372                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1373                assert_eq!(
1374                    String::from_utf8_lossy(key.key()),
1375                    leader_pg_election.election_key()
1376                );
1377                assert_eq!(key.lease_id(), i64::default());
1378                assert_eq!(key.revision(), i64::default());
1379            }
1380            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1381        }
1382
1383        // Step 6: Re-campaign and elected.
1384        let res = leader_pg_election
1385            .pg_client
1386            .read()
1387            .await
1388            .query(&leader_pg_election.sql_set.campaign, &[])
1389            .await
1390            .unwrap();
1391        let res: bool = res[0].get(0);
1392        assert!(res);
1393        leader_pg_election.leader_action().await.unwrap();
1394        let lease = leader_pg_election
1395            .get_value_with_lease(&leader_pg_election.election_key())
1396            .await
1397            .unwrap()
1398            .unwrap();
1399        assert!(lease.leader_value == leader_value);
1400        assert!(lease.expire_time > lease.current);
1401        assert!(leader_pg_election.is_leader());
1402
1403        match rx.recv().await {
1404            Ok(LeaderChangeMessage::Elected(key)) => {
1405                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1406                assert_eq!(
1407                    String::from_utf8_lossy(key.key()),
1408                    leader_pg_election.election_key()
1409                );
1410                assert_eq!(key.lease_id(), i64::default());
1411                assert_eq!(key.revision(), i64::default());
1412            }
1413            _ => panic!("Expected LeaderChangeMessage::Elected"),
1414        }
1415
1416        // Step 7: Something wrong, the leader key changed by others.
1417        let res = leader_pg_election
1418            .pg_client
1419            .read()
1420            .await
1421            .query(&leader_pg_election.sql_set.campaign, &[])
1422            .await
1423            .unwrap();
1424        let res: bool = res[0].get(0);
1425        assert!(res);
1426        leader_pg_election
1427            .delete_value(&leader_pg_election.election_key())
1428            .await
1429            .unwrap();
1430        leader_pg_election
1431            .put_value_with_lease(
1432                &leader_pg_election.election_key(),
1433                "test",
1434                Duration::from_secs(10),
1435            )
1436            .await
1437            .unwrap();
1438        leader_pg_election.leader_action().await.unwrap();
1439        let res = leader_pg_election
1440            .get_value_with_lease(&leader_pg_election.election_key())
1441            .await
1442            .unwrap();
1443        assert!(res.is_none());
1444        assert!(!leader_pg_election.is_leader());
1445
1446        match rx.recv().await {
1447            Ok(LeaderChangeMessage::StepDown(key)) => {
1448                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1449                assert_eq!(
1450                    String::from_utf8_lossy(key.key()),
1451                    leader_pg_election.election_key()
1452                );
1453                assert_eq!(key.lease_id(), i64::default());
1454                assert_eq!(key.revision(), i64::default());
1455            }
1456            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1457        }
1458
1459        // Clean up
1460        leader_pg_election
1461            .pg_client
1462            .read()
1463            .await
1464            .query(&leader_pg_election.sql_set.step_down, &[])
1465            .await
1466            .unwrap();
1467
1468        drop_table(&leader_pg_election, table_name).await;
1469    }
1470
1471    #[tokio::test]
1472    async fn test_follower_action() {
1473        maybe_skip_postgres_integration_test!();
1474        common_telemetry::init_default_ut_logging();
1475        let uuid = uuid::Uuid::new_v4().to_string();
1476        let table_name = "test_follower_action_greptime_metakv";
1477
1478        let candidate_lease_ttl = Duration::from_secs(5);
1479        let execution_timeout = Duration::from_secs(10);
1480        let statement_timeout = Duration::from_secs(10);
1481        let meta_lease_ttl = Duration::from_secs(2);
1482        let idle_session_timeout = Duration::from_secs(0);
1483        let follower_client = create_postgres_client(
1484            Some(table_name),
1485            execution_timeout,
1486            idle_session_timeout,
1487            statement_timeout,
1488        )
1489        .await
1490        .unwrap();
1491        let (tx, mut rx) = broadcast::channel(100);
1492        let follower_pg_election = PgElection {
1493            leader_value: "test_follower".to_string(),
1494            pg_client: RwLock::new(follower_client),
1495            is_leader: AtomicBool::new(false),
1496            leader_infancy: AtomicBool::new(true),
1497            leader_watcher: tx,
1498            store_key_prefix: uuid.clone(),
1499            candidate_lease_ttl,
1500            meta_lease_ttl,
1501            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1502        };
1503
1504        let leader_client = create_postgres_client(
1505            Some(table_name),
1506            execution_timeout,
1507            idle_session_timeout,
1508            statement_timeout,
1509        )
1510        .await
1511        .unwrap();
1512        let (tx, _) = broadcast::channel(100);
1513        let leader_pg_election = PgElection {
1514            leader_value: "test_leader".to_string(),
1515            pg_client: RwLock::new(leader_client),
1516            is_leader: AtomicBool::new(false),
1517            leader_infancy: AtomicBool::new(true),
1518            leader_watcher: tx,
1519            store_key_prefix: uuid,
1520            candidate_lease_ttl,
1521            meta_lease_ttl,
1522            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1523        };
1524
1525        leader_pg_election
1526            .pg_client
1527            .read()
1528            .await
1529            .query(&leader_pg_election.sql_set.campaign, &[])
1530            .await
1531            .unwrap();
1532        leader_pg_election.elected().await.unwrap();
1533
1534        // Step 1: As a follower, the leader exists and the lease is not expired.
1535        follower_pg_election.follower_action().await.unwrap();
1536
1537        // Step 2: As a follower, the leader exists but the lease expired.
1538        tokio::time::sleep(Duration::from_secs(2)).await;
1539        assert!(follower_pg_election.follower_action().await.is_err());
1540
1541        // Step 3: As a follower, the leader does not exist.
1542        leader_pg_election
1543            .delete_value(&leader_pg_election.election_key())
1544            .await
1545            .unwrap();
1546        assert!(follower_pg_election.follower_action().await.is_err());
1547
1548        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1549        follower_pg_election
1550            .is_leader
1551            .store(true, Ordering::Relaxed);
1552        assert!(follower_pg_election.follower_action().await.is_err());
1553
1554        match rx.recv().await {
1555            Ok(LeaderChangeMessage::StepDown(key)) => {
1556                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1557                assert_eq!(
1558                    String::from_utf8_lossy(key.key()),
1559                    follower_pg_election.election_key()
1560                );
1561                assert_eq!(key.lease_id(), i64::default());
1562                assert_eq!(key.revision(), i64::default());
1563            }
1564            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1565        }
1566
1567        // Clean up
1568        leader_pg_election
1569            .pg_client
1570            .read()
1571            .await
1572            .query(&leader_pg_election.sql_set.step_down, &[])
1573            .await
1574            .unwrap();
1575
1576        drop_table(&follower_pg_election, table_name).await;
1577    }
1578
1579    #[tokio::test]
1580    async fn test_idle_session_timeout() {
1581        maybe_skip_postgres_integration_test!();
1582        common_telemetry::init_default_ut_logging();
1583        let execution_timeout = Duration::from_secs(10);
1584        let statement_timeout = Duration::from_secs(10);
1585        let idle_session_timeout = Duration::from_secs(1);
1586        let mut client = create_postgres_client(
1587            None,
1588            execution_timeout,
1589            idle_session_timeout,
1590            statement_timeout,
1591        )
1592        .await
1593        .unwrap();
1594        tokio::time::sleep(Duration::from_millis(1100)).await;
1595        // Wait for the idle session timeout.
1596        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1597        assert_matches!(err, error::Error::PostgresExecution { .. });
1598        let error::Error::PostgresExecution { error, .. } = err else {
1599            panic!("Expected PostgresExecution error");
1600        };
1601        assert!(error.is_closed());
1602        // Reset the client and try again.
1603        client.reset_client().await.unwrap();
1604        let _ = client.query("SELECT 1", &[]).await.unwrap();
1605    }
1606
1607    #[test]
1608    fn test_election_sql_with_schema() {
1609        let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1610        let s = f.build();
1611        assert!(s.campaign.contains("pg_try_advisory_lock"));
1612        assert!(
1613            s.put_value_with_lease
1614                .contains("\"test_schema\".\"greptime_metakv\"")
1615        );
1616        assert!(
1617            s.update_value_with_lease
1618                .contains("\"test_schema\".\"greptime_metakv\"")
1619        );
1620        assert!(
1621            s.get_value_with_lease
1622                .contains("\"test_schema\".\"greptime_metakv\"")
1623        );
1624        assert!(
1625            s.get_value_with_lease_by_prefix
1626                .contains("\"test_schema\".\"greptime_metakv\"")
1627        );
1628        assert!(
1629            s.delete_value
1630                .contains("\"test_schema\".\"greptime_metakv\"")
1631        );
1632    }
1633}