meta_srv/election/rds/
postgres.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::sync::{broadcast, RwLock};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::types::ToSql;
27use tokio_postgres::Row;
28
29use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
30use crate::election::{
31    listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
32};
33use crate::error::{
34    DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35    Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Inputs needed to render the SQL statements used by the PostgreSQL election.
///
/// The factory only borrows the schema and table names; calling `build` turns
/// it into an owned [`ElectionSqlSet`].
struct ElectionSqlFactory<'a> {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema that qualifies the table. `None` or an empty string
    /// leaves the table name unqualified.
    schema_name: Option<&'a str>,
    /// Name of the key-value table (columns `k` and `v`) the statements run against.
    table_name: &'a str,
}
44
/// The fully rendered SQL statements used by the PostgreSQL election.
///
/// Every statement is rendered once by `ElectionSqlFactory::build` and then
/// reused with bound parameters.
struct ElectionSqlSet {
    // SQL to try to acquire the advisory lock without blocking.
    //
    // Parameters: none (the lock id is rendered into the statement).
    //
    // Returns:
    // column 0: the result of `pg_try_advisory_lock`
    campaign: String,
    // SQL to release the advisory lock.
    //
    // Parameters: none (the lock id is rendered into the statement).
    step_down: String,
    // SQL to put a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: value,
    // `$3`: lease time in seconds
    //
    // Returns:
    // If the key already exists, return the previous value.
    put_value_with_lease: String,
    // SQL to update a value with expire time.
    //
    // Parameters for the query:
    // `$1`: key,
    // `$2`: previous value,
    // `$3`: updated value,
    // `$4`: lease time in seconds
    update_value_with_lease: String,
    // SQL to get a value with expire time.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease: String,
    // SQL to get all values with expire time with the given key prefix.
    //
    // Parameters:
    // `$1`: key prefix like 'prefix%'
    //
    // Returns:
    // column 0: value,
    // column 1: current timestamp
    get_value_with_lease_by_prefix: String,
    // SQL to delete a value.
    //
    // Parameters:
    // `$1`: key
    //
    // Returns:
    // column 0: key deleted,
    // column 1: value deleted
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92    fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93        Self {
94            lock_id,
95            schema_name,
96            table_name,
97        }
98    }
99
100    fn table_ident(&self) -> String {
101        match self.schema_name {
102            Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103            _ => format!("\"{}\"", self.table_name),
104        }
105    }
106
107    fn build(self) -> ElectionSqlSet {
108        ElectionSqlSet {
109            campaign: self.campaign_sql(),
110            step_down: self.step_down_sql(),
111            put_value_with_lease: self.put_value_with_lease_sql(),
112            update_value_with_lease: self.update_value_with_lease_sql(),
113            get_value_with_lease: self.get_value_with_lease_sql(),
114            get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115            delete_value: self.delete_value_sql(),
116        }
117    }
118
119    fn campaign_sql(&self) -> String {
120        format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121    }
122
123    fn step_down_sql(&self) -> String {
124        format!("SELECT pg_advisory_unlock({})", self.lock_id)
125    }
126
127    fn put_value_with_lease_sql(&self) -> String {
128        let table = self.table_ident();
129        format!(
130            r#"WITH prev AS (
131                SELECT k, v FROM {table} WHERE k = $1
132            ), insert AS (
133                INSERT INTO {table}
134                VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135                ON CONFLICT (k) DO NOTHING
136            )
137            SELECT k, v FROM prev;
138            "#,
139            table = table,
140            lease_sep = LEASE_SEP
141        )
142    }
143
144    fn update_value_with_lease_sql(&self) -> String {
145        let table = self.table_ident();
146        format!(
147            r#"UPDATE {table}
148               SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149               WHERE k = $1 AND v = $2"#,
150            table = table,
151            lease_sep = LEASE_SEP
152        )
153    }
154
155    fn get_value_with_lease_sql(&self) -> String {
156        let table = self.table_ident();
157        format!(
158            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159            table = table
160        )
161    }
162
163    fn get_value_with_lease_by_prefix_sql(&self) -> String {
164        let table = self.table_ident();
165        format!(
166            r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167            table = table
168        )
169    }
170
171    fn delete_value_sql(&self) -> String {
172        let table = self.table_ident();
173        format!(
174            "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175            table = table
176        )
177    }
178}
179
/// PgClient for election.
pub struct ElectionPgClient {
    /// The connection currently checked out from `pool`.
    ///
    /// `None` until `maybe_init_client` has obtained a connection, and reset
    /// to `None` at the start of `reset_client`.
    current: Option<deadpool::managed::Object<Manager>>,
    /// The connection pool that `current` is obtained from.
    pool: Pool,
    /// The client-side timeout for statement execution.
    ///
    /// This timeout is enforced by the client application and is independent of any server-side timeouts.
    /// If a statement takes longer than this duration to execute, the client will abort the operation.
    execution_timeout: Duration,

    /// The idle session timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a session remains idle for longer than this duration, the server will terminate it.
    idle_session_timeout: Duration,

    /// The statement timeout.
    ///
    /// This timeout is configured per client session and is enforced by the PostgreSQL server.
    /// If a statement takes longer than this duration to execute, the server will abort it.
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204    pub fn new(
205        pool: Pool,
206        execution_timeout: Duration,
207        idle_session_timeout: Duration,
208        statement_timeout: Duration,
209    ) -> Result<ElectionPgClient> {
210        Ok(ElectionPgClient {
211            current: None,
212            pool,
213            execution_timeout,
214            idle_session_timeout,
215            statement_timeout,
216        })
217    }
218
219    fn set_idle_session_timeout_sql(&self) -> String {
220        format!(
221            "SET idle_session_timeout = '{}s';",
222            self.idle_session_timeout.as_secs()
223        )
224    }
225
226    fn set_statement_timeout_sql(&self) -> String {
227        format!(
228            "SET statement_timeout = '{}s';",
229            self.statement_timeout.as_secs()
230        )
231    }
232
233    async fn reset_client(&mut self) -> Result<()> {
234        self.current = None;
235        self.maybe_init_client().await
236    }
237
238    async fn maybe_init_client(&mut self) -> Result<()> {
239        if self.current.is_none() {
240            let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242            self.current = Some(client);
243            // Set idle session timeout and statement timeout.
244            let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245            self.execute(&idle_session_timeout_sql, &[]).await?;
246            let statement_timeout_sql = self.set_statement_timeout_sql();
247            self.execute(&statement_timeout_sql, &[]).await?;
248        }
249
250        Ok(())
251    }
252
253    /// Returns the result of the query.
254    ///
255    /// # Panics
256    /// if `current` is `None`.
257    async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258        let result = tokio::time::timeout(
259            self.execution_timeout,
260            self.current.as_ref().unwrap().execute(sql, params),
261        )
262        .await
263        .map_err(|_| {
264            SqlExecutionTimeoutSnafu {
265                sql: sql.to_string(),
266                duration: self.execution_timeout,
267            }
268            .build()
269        })?;
270
271        result.context(PostgresExecutionSnafu { sql })
272    }
273
274    /// Returns the result of the query.
275    ///
276    /// # Panics
277    /// if `current` is `None`.
278    async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279        let result = tokio::time::timeout(
280            self.execution_timeout,
281            self.current.as_ref().unwrap().query(sql, params),
282        )
283        .await
284        .map_err(|_| {
285            SqlExecutionTimeoutSnafu {
286                sql: sql.to_string(),
287                duration: self.execution_timeout,
288            }
289            .build()
290        })?;
291
292        result.context(PostgresExecutionSnafu { sql })
293    }
294}
295
/// PostgreSql implementation of Election.
pub struct PgElection {
    /// The value this node writes under the election key when it is elected;
    /// it is also the suffix of this node's candidate key.
    leader_value: String,
    /// The PostgreSQL client shared by all election operations.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this node has just been elected; cleared again by the first
    /// `in_leader_infancy` call that observes it.
    leader_infancy: AtomicBool,
    /// Broadcasts `Elected` / `StepDown` messages to subscribers.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and to the candidate root.
    store_key_prefix: String,
    /// Lease duration of this node's candidate registration.
    candidate_lease_ttl: Duration,
    /// Lease duration of the leader's election key.
    meta_lease_ttl: Duration,
    /// The SQL statements rendered for the configured schema, table and lock id.
    sql_set: ElectionSqlSet,
}
308
309impl PgElection {
310    async fn maybe_init_client(&self) -> Result<()> {
311        if self.pg_client.read().await.current.is_none() {
312            self.pg_client.write().await.maybe_init_client().await?;
313        }
314
315        Ok(())
316    }
317
318    #[allow(clippy::too_many_arguments)]
319    pub async fn with_pg_client(
320        leader_value: String,
321        pg_client: ElectionPgClient,
322        store_key_prefix: String,
323        candidate_lease_ttl: Duration,
324        meta_lease_ttl: Duration,
325        schema_name: Option<&str>,
326        table_name: &str,
327        lock_id: u64,
328    ) -> Result<ElectionRef> {
329        if let Some(s) = schema_name {
330            common_telemetry::info!("PgElection uses schema: {}", s);
331        } else {
332            common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333        }
334        let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336        let tx = listen_leader_change(leader_value.clone());
337        Ok(Arc::new(Self {
338            leader_value,
339            pg_client: RwLock::new(pg_client),
340            is_leader: AtomicBool::new(false),
341            leader_infancy: AtomicBool::new(false),
342            leader_watcher: tx,
343            store_key_prefix,
344            candidate_lease_ttl,
345            meta_lease_ttl,
346            sql_set: sql_factory.build(),
347        }))
348    }
349
350    fn election_key(&self) -> String {
351        format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352    }
353
354    fn candidate_root(&self) -> String {
355        format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356    }
357
358    fn candidate_key(&self) -> String {
359        format!("{}{}", self.candidate_root(), self.leader_value)
360    }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365    type Leader = LeaderValue;
366
367    fn is_leader(&self) -> bool {
368        self.is_leader.load(Ordering::Relaxed)
369    }
370
371    fn in_leader_infancy(&self) -> bool {
372        self.leader_infancy
373            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374            .is_ok()
375    }
376
377    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378        let key = self.candidate_key();
379        let node_info =
380            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381                input: format!("{node_info:?}"),
382            })?;
383        let res = self
384            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385            .await?;
386        // May registered before, just update the lease.
387        if !res {
388            self.delete_value(&key).await?;
389            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390                .await?;
391        }
392
393        // Check if the current lease has expired and renew the lease.
394        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395        loop {
396            let _ = keep_alive_interval.tick().await;
397
398            let lease = self
399                .get_value_with_lease(&key)
400                .await?
401                .context(UnexpectedSnafu {
402                    violated: format!("Failed to get lease for key: {:?}", key),
403                })?;
404
405            ensure!(
406                lease.expire_time > lease.current,
407                UnexpectedSnafu {
408                    violated: format!(
409                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410                        lease.expire_time, lease.current, key
411                    ),
412                }
413            );
414
415            // Safety: origin is Some since we are using `get_value_with_lease` with `true`.
416            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417                .await?;
418        }
419    }
420
421    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422        let key_prefix = self.candidate_root();
423        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424        // Remove expired candidates
425        candidates.retain(|c| c.1 > current);
426        let mut valid_candidates = Vec::with_capacity(candidates.len());
427        for (c, _) in candidates {
428            let node_info: MetasrvNodeInfo =
429                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430                    input: format!("{:?}", c),
431                })?;
432            valid_candidates.push(node_info);
433        }
434        Ok(valid_candidates)
435    }
436
437    /// Attempts to acquire leadership by executing a campaign. This function continuously checks
438    /// if the current instance can become the leader by acquiring an advisory lock in the PostgreSQL database.
439    ///
440    /// The function operates in a loop, where it:
441    ///
442    /// 1. Waits for a predefined interval before attempting to acquire the lock again.
443    /// 2. Executes the `CAMPAIGN` SQL query to try to acquire the advisory lock.
444    /// 3. Checks the result of the query:
445    ///    - If the lock is successfully acquired (result is true), it calls the `leader_action` method
446    ///      to perform actions as the leader.
447    ///    - If the lock is not acquired (result is false), it calls the `follower_action` method
448    ///      to perform actions as a follower.
449    async fn campaign(&self) -> Result<()> {
450        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453        self.maybe_init_client().await?;
454        loop {
455            let res = self
456                .pg_client
457                .read()
458                .await
459                .query(&self.sql_set.campaign, &[])
460                .await?;
461            let row = res.first().context(UnexpectedSnafu {
462                violated: "Failed to get the result of acquiring advisory lock",
463            })?;
464            let is_leader = row.try_get(0).map_err(|_| {
465                UnexpectedSnafu {
466                    violated: "Failed to get the result of get lock",
467                }
468                .build()
469            })?;
470            if is_leader {
471                self.leader_action().await?;
472            } else {
473                self.follower_action().await?;
474            }
475            let _ = keep_alive_interval.tick().await;
476        }
477    }
478
479    async fn reset_campaign(&self) {
480        if let Err(err) = self.pg_client.write().await.reset_client().await {
481            error!(err; "Failed to reset client");
482        }
483    }
484
485    async fn leader(&self) -> Result<Self::Leader> {
486        if self.is_leader.load(Ordering::Relaxed) {
487            Ok(self.leader_value.as_bytes().into())
488        } else {
489            let key = self.election_key();
490            if let Some(lease) = self.get_value_with_lease(&key).await? {
491                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492                Ok(lease.leader_value.as_bytes().into())
493            } else {
494                NoLeaderSnafu.fail()
495            }
496        }
497    }
498
499    async fn resign(&self) -> Result<()> {
500        todo!()
501    }
502
503    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504        self.leader_watcher.subscribe()
505    }
506}
507
508impl PgElection {
509    /// Returns value, expire time and current time. If `with_origin` is true, the origin string is also returned.
510    async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511        let key = key.as_bytes();
512        self.maybe_init_client().await?;
513        let res = self
514            .pg_client
515            .read()
516            .await
517            .query(&self.sql_set.get_value_with_lease, &[&key])
518            .await?;
519
520        if res.is_empty() {
521            Ok(None)
522        } else {
523            // Safety: Checked if res is empty above.
524            let current_time_str = res[0].try_get(1).unwrap_or_default();
525            let current_time = match Timestamp::from_str(current_time_str, None) {
526                Ok(ts) => ts,
527                Err(_) => UnexpectedSnafu {
528                    violated: format!("Invalid timestamp: {}", current_time_str),
529                }
530                .fail()?,
531            };
532            // Safety: Checked if res is empty above.
533            let value_and_expire_time =
534                String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537            Ok(Some(Lease {
538                leader_value: value,
539                expire_time,
540                current: current_time,
541                origin: value_and_expire_time.to_string(),
542            }))
543        }
544    }
545
546    /// Returns all values and expire time with the given key prefix. Also returns the current time.
547    async fn get_value_with_lease_by_prefix(
548        &self,
549        key_prefix: &str,
550    ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551        let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552        self.maybe_init_client().await?;
553        let res = self
554            .pg_client
555            .read()
556            .await
557            .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558            .await?;
559
560        let mut values_with_leases = vec![];
561        let mut current = Timestamp::default();
562        for row in res {
563            let current_time_str = row.try_get(1).unwrap_or_default();
564            current = match Timestamp::from_str(current_time_str, None) {
565                Ok(ts) => ts,
566                Err(_) => UnexpectedSnafu {
567                    violated: format!("Invalid timestamp: {}", current_time_str),
568                }
569                .fail()?,
570            };
571
572            let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573            let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575            values_with_leases.push((value, expire_time));
576        }
577        Ok((values_with_leases, current))
578    }
579
580    async fn update_value_with_lease(
581        &self,
582        key: &str,
583        prev: &str,
584        updated: &str,
585        lease_ttl: Duration,
586    ) -> Result<()> {
587        let key = key.as_bytes();
588        let prev = prev.as_bytes();
589        self.maybe_init_client().await?;
590        let lease_ttl_secs = lease_ttl.as_secs() as f64;
591        let res = self
592            .pg_client
593            .read()
594            .await
595            .execute(
596                &self.sql_set.update_value_with_lease,
597                &[&key, &prev, &updated, &lease_ttl_secs],
598            )
599            .await?;
600
601        ensure!(
602            res == 1,
603            UnexpectedSnafu {
604                violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605            }
606        );
607
608        Ok(())
609    }
610
611    /// Returns `true` if the insertion is successful
612    async fn put_value_with_lease(
613        &self,
614        key: &str,
615        value: &str,
616        lease_ttl: Duration,
617    ) -> Result<bool> {
618        let key = key.as_bytes();
619        let lease_ttl_secs = lease_ttl.as_secs() as f64;
620        let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621        self.maybe_init_client().await?;
622        let res = self
623            .pg_client
624            .read()
625            .await
626            .query(&self.sql_set.put_value_with_lease, &params)
627            .await?;
628        Ok(res.is_empty())
629    }
630
631    /// Returns `true` if the deletion is successful.
632    /// Caution: Should only delete the key if the lease is expired.
633    async fn delete_value(&self, key: &str) -> Result<bool> {
634        let key = key.as_bytes();
635        self.maybe_init_client().await?;
636        let res = self
637            .pg_client
638            .read()
639            .await
640            .query(&self.sql_set.delete_value, &[&key])
641            .await?;
642
643        Ok(res.len() == 1)
644    }
645
646    /// Handles the actions of a leader in the election process.
647    ///
648    /// This function performs the following checks and actions:
649    ///
650    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
651    ///   it attempts to renew the lease. It checks if the lease is still valid and either renews it
652    ///   or steps down if it has expired.
653    ///
654    ///   - **Case 1.1**: If the instance is still the leader and the lease is valid, it renews the lease
655    ///     by updating the value associated with the election key.
656    ///   - **Case 1.2**: If the instance is still the leader but the lease has expired, it logs a warning
657    ///     and steps down, initiating a new campaign for leadership.
658    ///   - **Case 1.3**: If the instance is not the leader (which is a rare scenario), it logs a warning
659    ///     indicating that it still holds the lock and steps down to re-initiate the campaign. This may
660    ///     happen if the leader has failed to renew the lease and the session has expired, and recovery
661    ///     after a period of time during which other leaders have been elected and stepped down.
662    ///   - **Case 1.4**: If no lease information is found, it also steps down and re-initiates the campaign.
663    ///
664    /// - **Case 2**: If the current instance is not leader previously, it calls the `elected` method
665    ///   as a newly elected leader.
666    async fn leader_action(&self) -> Result<()> {
667        let key = self.election_key();
668        // Case 1
669        if self.is_leader() {
670            match self.get_value_with_lease(&key).await? {
671                Some(lease) => {
672                    match (
673                        lease.leader_value == self.leader_value,
674                        lease.expire_time > lease.current,
675                    ) {
676                        // Case 1.1
677                        (true, true) => {
678                            // Safety: prev is Some since we are using `get_value_with_lease` with `true`.
679                            self.update_value_with_lease(
680                                &key,
681                                &lease.origin,
682                                &self.leader_value,
683                                self.meta_lease_ttl,
684                            )
685                            .await?;
686                        }
687                        // Case 1.2
688                        (true, false) => {
689                            warn!("Leader lease expired, now stepping down.");
690                            self.step_down().await?;
691                        }
692                        // Case 1.3
693                        (false, _) => {
694                            warn!("Leader lease not found, but still hold the lock. Now stepping down.");
695                            self.step_down().await?;
696                        }
697                    }
698                }
699                // Case 1.4
700                None => {
701                    warn!("Leader lease not found, but still hold the lock. Now stepping down.");
702                    self.step_down().await?;
703                }
704            }
705        // Case 2
706        } else {
707            self.elected().await?;
708        }
709        Ok(())
710    }
711
712    /// Handles the actions of a follower in the election process.
713    ///
714    /// This function performs the following checks and actions:
715    ///
716    /// - **Case 1**: If the current instance believes it is the leader from the previous term,
717    ///   it steps down without deleting the key.
718    /// - **Case 2**: If the current instance is not the leader but the lease has expired, it raises an error
719    ///   to re-initiate the campaign. If the leader failed to renew the lease, its session will expire and the lock
720    ///   will be released.
721    /// - **Case 3**: If all checks pass, the function returns without performing any actions.
722    async fn follower_action(&self) -> Result<()> {
723        let key = self.election_key();
724        // Case 1
725        if self.is_leader() {
726            self.step_down_without_lock().await?;
727        }
728        let lease = self
729            .get_value_with_lease(&key)
730            .await?
731            .context(NoLeaderSnafu)?;
732        // Case 2
733        ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
734        // Case 3
735        Ok(())
736    }
737
738    /// Step down the leader. The leader should delete the key and notify the leader watcher.
739    ///
740    /// __DO NOT__ check if the deletion is successful, since the key may be deleted by others elected.
741    ///
742    /// ## Caution:
743    /// Should only step down while holding the advisory lock.
744    async fn step_down(&self) -> Result<()> {
745        let key = self.election_key();
746        let leader_key = RdsLeaderKey {
747            name: self.leader_value.clone().into_bytes(),
748            key: key.clone().into_bytes(),
749            ..Default::default()
750        };
751        self.delete_value(&key).await?;
752        self.maybe_init_client().await?;
753        self.pg_client
754            .read()
755            .await
756            .query(&self.sql_set.step_down, &[])
757            .await?;
758        send_leader_change_and_set_flags(
759            &self.is_leader,
760            &self.leader_infancy,
761            &self.leader_watcher,
762            LeaderChangeMessage::StepDown(Arc::new(leader_key)),
763        );
764        Ok(())
765    }
766
767    /// Still consider itself as the leader locally but failed to acquire the lock. Step down without deleting the key.
768    async fn step_down_without_lock(&self) -> Result<()> {
769        let key = self.election_key().into_bytes();
770        let leader_key = RdsLeaderKey {
771            name: self.leader_value.clone().into_bytes(),
772            key: key.clone(),
773            ..Default::default()
774        };
775        if self
776            .is_leader
777            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
778            .is_ok()
779        {
780            if let Err(e) = self
781                .leader_watcher
782                .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
783            {
784                error!(e; "Failed to send leader change message");
785            }
786        }
787        Ok(())
788    }
789
790    /// Elected as leader. The leader should put the key and notify the leader watcher.
791    /// Caution: Should only elected while holding the advisory lock.
792    async fn elected(&self) -> Result<()> {
793        let key = self.election_key();
794        let leader_key = RdsLeaderKey {
795            name: self.leader_value.clone().into_bytes(),
796            key: key.clone().into_bytes(),
797            ..Default::default()
798        };
799        self.delete_value(&key).await?;
800        self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801            .await?;
802
803        if self
804            .is_leader
805            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806            .is_ok()
807        {
808            self.leader_infancy.store(true, Ordering::Release);
809
810            if let Err(e) = self
811                .leader_watcher
812                .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813            {
814                error!(e; "Failed to send leader change message");
815            }
816        }
817        Ok(())
818    }
819}
820
821#[cfg(test)]
822mod tests {
823    use std::assert_matches::assert_matches;
824    use std::env;
825
826    use common_meta::maybe_skip_postgres_integration_test;
827
828    use super::*;
829    use crate::bootstrap::create_postgres_pool;
830    use crate::error;
831
832    async fn create_postgres_client(
833        table_name: Option<&str>,
834        execution_timeout: Duration,
835        idle_session_timeout: Duration,
836        statement_timeout: Duration,
837    ) -> Result<ElectionPgClient> {
838        let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839        if endpoint.is_empty() {
840            return UnexpectedSnafu {
841                violated: "Postgres endpoint is empty".to_string(),
842            }
843            .fail();
844        }
845        let pool = create_postgres_pool(&[endpoint], None).await.unwrap();
846        let mut pg_client = ElectionPgClient::new(
847            pool,
848            execution_timeout,
849            idle_session_timeout,
850            statement_timeout,
851        )
852        .unwrap();
853        pg_client.maybe_init_client().await?;
854        if let Some(table_name) = table_name {
855            let create_table_sql = format!(
856                "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857                table_name
858            );
859            pg_client.execute(&create_table_sql, &[]).await?;
860        }
861        Ok(pg_client)
862    }
863
864    async fn drop_table(pg_election: &PgElection, table_name: &str) {
865        let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866        pg_election
867            .pg_client
868            .read()
869            .await
870            .execute(&sql, &[])
871            .await
872            .unwrap();
873    }
874
875    #[tokio::test]
876    async fn test_postgres_crud() {
877        maybe_skip_postgres_integration_test!();
878        let key = "test_key".to_string();
879        let value = "test_value".to_string();
880
881        let uuid = uuid::Uuid::new_v4().to_string();
882        let table_name = "test_postgres_crud_greptime_metakv";
883        let candidate_lease_ttl = Duration::from_secs(10);
884        let execution_timeout = Duration::from_secs(10);
885        let statement_timeout = Duration::from_secs(10);
886        let meta_lease_ttl = Duration::from_secs(2);
887        let idle_session_timeout = Duration::from_secs(0);
888        let client = create_postgres_client(
889            Some(table_name),
890            execution_timeout,
891            idle_session_timeout,
892            statement_timeout,
893        )
894        .await
895        .unwrap();
896
897        let (tx, _) = broadcast::channel(100);
898        let pg_election = PgElection {
899            leader_value: "test_leader".to_string(),
900            pg_client: RwLock::new(client),
901            is_leader: AtomicBool::new(false),
902            leader_infancy: AtomicBool::new(true),
903            leader_watcher: tx,
904            store_key_prefix: uuid,
905            candidate_lease_ttl,
906            meta_lease_ttl,
907            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
908        };
909
910        let res = pg_election
911            .put_value_with_lease(&key, &value, candidate_lease_ttl)
912            .await
913            .unwrap();
914        assert!(res);
915
916        let lease = pg_election
917            .get_value_with_lease(&key)
918            .await
919            .unwrap()
920            .unwrap();
921        assert_eq!(lease.leader_value, value);
922
923        pg_election
924            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
925            .await
926            .unwrap();
927
928        let res = pg_election.delete_value(&key).await.unwrap();
929        assert!(res);
930
931        let res = pg_election.get_value_with_lease(&key).await.unwrap();
932        assert!(res.is_none());
933
934        for i in 0..10 {
935            let key = format!("test_key_{}", i);
936            let value = format!("test_value_{}", i);
937            pg_election
938                .put_value_with_lease(&key, &value, candidate_lease_ttl)
939                .await
940                .unwrap();
941        }
942
943        let key_prefix = "test_key".to_string();
944        let (res, _) = pg_election
945            .get_value_with_lease_by_prefix(&key_prefix)
946            .await
947            .unwrap();
948        assert_eq!(res.len(), 10);
949
950        for i in 0..10 {
951            let key = format!("test_key_{}", i);
952            let res = pg_election.delete_value(&key).await.unwrap();
953            assert!(res);
954        }
955
956        let (res, current) = pg_election
957            .get_value_with_lease_by_prefix(&key_prefix)
958            .await
959            .unwrap();
960        assert!(res.is_empty());
961        assert!(current == Timestamp::default());
962
963        drop_table(&pg_election, table_name).await;
964    }
965
966    async fn candidate(
967        leader_value: String,
968        candidate_lease_ttl: Duration,
969        store_key_prefix: String,
970        table_name: String,
971    ) {
972        let execution_timeout = Duration::from_secs(10);
973        let statement_timeout = Duration::from_secs(10);
974        let meta_lease_ttl = Duration::from_secs(2);
975        let idle_session_timeout = Duration::from_secs(0);
976        let client = create_postgres_client(
977            None,
978            execution_timeout,
979            idle_session_timeout,
980            statement_timeout,
981        )
982        .await
983        .unwrap();
984
985        let (tx, _) = broadcast::channel(100);
986        let pg_election = PgElection {
987            leader_value,
988            pg_client: RwLock::new(client),
989            is_leader: AtomicBool::new(false),
990            leader_infancy: AtomicBool::new(true),
991            leader_watcher: tx,
992            store_key_prefix,
993            candidate_lease_ttl,
994            meta_lease_ttl,
995            sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996        };
997
998        let node_info = MetasrvNodeInfo {
999            addr: "test_addr".to_string(),
1000            version: "test_version".to_string(),
1001            git_commit: "test_git_commit".to_string(),
1002            start_time_ms: 0,
1003        };
1004        pg_election.register_candidate(&node_info).await.unwrap();
1005    }
1006
1007    #[tokio::test]
1008    async fn test_candidate_registration() {
1009        maybe_skip_postgres_integration_test!();
1010        let leader_value_prefix = "test_leader".to_string();
1011        let uuid = uuid::Uuid::new_v4().to_string();
1012        let table_name = "test_candidate_registration_greptime_metakv";
1013        let mut handles = vec![];
1014        let candidate_lease_ttl = Duration::from_secs(5);
1015        let execution_timeout = Duration::from_secs(10);
1016        let statement_timeout = Duration::from_secs(10);
1017        let meta_lease_ttl = Duration::from_secs(2);
1018        let idle_session_timeout = Duration::from_secs(0);
1019        let client = create_postgres_client(
1020            Some(table_name),
1021            execution_timeout,
1022            idle_session_timeout,
1023            statement_timeout,
1024        )
1025        .await
1026        .unwrap();
1027
1028        for i in 0..10 {
1029            let leader_value = format!("{}{}", leader_value_prefix, i);
1030            let handle = tokio::spawn(candidate(
1031                leader_value,
1032                candidate_lease_ttl,
1033                uuid.clone(),
1034                table_name.to_string(),
1035            ));
1036            handles.push(handle);
1037        }
1038        // Wait for candidates to registrate themselves and renew their leases at least once.
1039        tokio::time::sleep(Duration::from_secs(3)).await;
1040
1041        let (tx, _) = broadcast::channel(100);
1042        let leader_value = "test_leader".to_string();
1043        let pg_election = PgElection {
1044            leader_value,
1045            pg_client: RwLock::new(client),
1046            is_leader: AtomicBool::new(false),
1047            leader_infancy: AtomicBool::new(true),
1048            leader_watcher: tx,
1049            store_key_prefix: uuid.clone(),
1050            candidate_lease_ttl,
1051            meta_lease_ttl,
1052            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1053        };
1054
1055        let candidates = pg_election.all_candidates().await.unwrap();
1056        assert_eq!(candidates.len(), 10);
1057
1058        for handle in handles {
1059            handle.abort();
1060        }
1061
1062        // Wait for the candidate leases to expire.
1063        tokio::time::sleep(Duration::from_secs(5)).await;
1064        let candidates = pg_election.all_candidates().await.unwrap();
1065        assert!(candidates.is_empty());
1066
1067        // Garbage collection
1068        for i in 0..10 {
1069            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1070            let res = pg_election.delete_value(&key).await.unwrap();
1071            assert!(res);
1072        }
1073
1074        drop_table(&pg_election, table_name).await;
1075    }
1076
1077    #[tokio::test]
1078    async fn test_elected_and_step_down() {
1079        maybe_skip_postgres_integration_test!();
1080        let leader_value = "test_leader".to_string();
1081        let uuid = uuid::Uuid::new_v4().to_string();
1082        let table_name = "test_elected_and_step_down_greptime_metakv";
1083        let candidate_lease_ttl = Duration::from_secs(5);
1084        let execution_timeout = Duration::from_secs(10);
1085        let statement_timeout = Duration::from_secs(10);
1086        let meta_lease_ttl = Duration::from_secs(2);
1087        let idle_session_timeout = Duration::from_secs(0);
1088        let client = create_postgres_client(
1089            Some(table_name),
1090            execution_timeout,
1091            idle_session_timeout,
1092            statement_timeout,
1093        )
1094        .await
1095        .unwrap();
1096
1097        let (tx, mut rx) = broadcast::channel(100);
1098        let leader_pg_election = PgElection {
1099            leader_value: leader_value.clone(),
1100            pg_client: RwLock::new(client),
1101            is_leader: AtomicBool::new(false),
1102            leader_infancy: AtomicBool::new(true),
1103            leader_watcher: tx,
1104            store_key_prefix: uuid,
1105            candidate_lease_ttl,
1106            meta_lease_ttl,
1107            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1108        };
1109
1110        leader_pg_election.elected().await.unwrap();
1111        let lease = leader_pg_election
1112            .get_value_with_lease(&leader_pg_election.election_key())
1113            .await
1114            .unwrap()
1115            .unwrap();
1116        assert!(lease.leader_value == leader_value);
1117        assert!(lease.expire_time > lease.current);
1118        assert!(leader_pg_election.is_leader());
1119
1120        match rx.recv().await {
1121            Ok(LeaderChangeMessage::Elected(key)) => {
1122                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1123                assert_eq!(
1124                    String::from_utf8_lossy(key.key()),
1125                    leader_pg_election.election_key()
1126                );
1127                assert_eq!(key.lease_id(), i64::default());
1128                assert_eq!(key.revision(), i64::default());
1129            }
1130            _ => panic!("Expected LeaderChangeMessage::Elected"),
1131        }
1132
1133        leader_pg_election.step_down_without_lock().await.unwrap();
1134        let lease = leader_pg_election
1135            .get_value_with_lease(&leader_pg_election.election_key())
1136            .await
1137            .unwrap()
1138            .unwrap();
1139        assert!(lease.leader_value == leader_value);
1140        assert!(!leader_pg_election.is_leader());
1141
1142        match rx.recv().await {
1143            Ok(LeaderChangeMessage::StepDown(key)) => {
1144                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1145                assert_eq!(
1146                    String::from_utf8_lossy(key.key()),
1147                    leader_pg_election.election_key()
1148                );
1149                assert_eq!(key.lease_id(), i64::default());
1150                assert_eq!(key.revision(), i64::default());
1151            }
1152            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1153        }
1154
1155        leader_pg_election.elected().await.unwrap();
1156        let lease = leader_pg_election
1157            .get_value_with_lease(&leader_pg_election.election_key())
1158            .await
1159            .unwrap()
1160            .unwrap();
1161        assert!(lease.leader_value == leader_value);
1162        assert!(lease.expire_time > lease.current);
1163        assert!(leader_pg_election.is_leader());
1164
1165        match rx.recv().await {
1166            Ok(LeaderChangeMessage::Elected(key)) => {
1167                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1168                assert_eq!(
1169                    String::from_utf8_lossy(key.key()),
1170                    leader_pg_election.election_key()
1171                );
1172                assert_eq!(key.lease_id(), i64::default());
1173                assert_eq!(key.revision(), i64::default());
1174            }
1175            _ => panic!("Expected LeaderChangeMessage::Elected"),
1176        }
1177
1178        leader_pg_election.step_down().await.unwrap();
1179        let res = leader_pg_election
1180            .get_value_with_lease(&leader_pg_election.election_key())
1181            .await
1182            .unwrap();
1183        assert!(res.is_none());
1184        assert!(!leader_pg_election.is_leader());
1185
1186        match rx.recv().await {
1187            Ok(LeaderChangeMessage::StepDown(key)) => {
1188                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1189                assert_eq!(
1190                    String::from_utf8_lossy(key.key()),
1191                    leader_pg_election.election_key()
1192                );
1193                assert_eq!(key.lease_id(), i64::default());
1194                assert_eq!(key.revision(), i64::default());
1195            }
1196            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1197        }
1198
1199        drop_table(&leader_pg_election, table_name).await;
1200    }
1201
1202    #[tokio::test]
1203    async fn test_leader_action() {
1204        maybe_skip_postgres_integration_test!();
1205        let leader_value = "test_leader".to_string();
1206        let uuid = uuid::Uuid::new_v4().to_string();
1207        let table_name = "test_leader_action_greptime_metakv";
1208        let candidate_lease_ttl = Duration::from_secs(5);
1209        let execution_timeout = Duration::from_secs(10);
1210        let statement_timeout = Duration::from_secs(10);
1211        let meta_lease_ttl = Duration::from_secs(2);
1212        let idle_session_timeout = Duration::from_secs(0);
1213        let client = create_postgres_client(
1214            Some(table_name),
1215            execution_timeout,
1216            idle_session_timeout,
1217            statement_timeout,
1218        )
1219        .await
1220        .unwrap();
1221
1222        let (tx, mut rx) = broadcast::channel(100);
1223        let leader_pg_election = PgElection {
1224            leader_value: leader_value.clone(),
1225            pg_client: RwLock::new(client),
1226            is_leader: AtomicBool::new(false),
1227            leader_infancy: AtomicBool::new(true),
1228            leader_watcher: tx,
1229            store_key_prefix: uuid,
1230            candidate_lease_ttl,
1231            meta_lease_ttl,
1232            sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1233        };
1234
1235        // Step 1: No leader exists, campaign and elected.
1236        let res = leader_pg_election
1237            .pg_client
1238            .read()
1239            .await
1240            .query(&leader_pg_election.sql_set.campaign, &[])
1241            .await
1242            .unwrap();
1243        let res: bool = res[0].get(0);
1244        assert!(res);
1245        leader_pg_election.leader_action().await.unwrap();
1246        let lease = leader_pg_election
1247            .get_value_with_lease(&leader_pg_election.election_key())
1248            .await
1249            .unwrap()
1250            .unwrap();
1251        assert!(lease.leader_value == leader_value);
1252        assert!(lease.expire_time > lease.current);
1253        assert!(leader_pg_election.is_leader());
1254
1255        match rx.recv().await {
1256            Ok(LeaderChangeMessage::Elected(key)) => {
1257                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1258                assert_eq!(
1259                    String::from_utf8_lossy(key.key()),
1260                    leader_pg_election.election_key()
1261                );
1262                assert_eq!(key.lease_id(), i64::default());
1263                assert_eq!(key.revision(), i64::default());
1264            }
1265            _ => panic!("Expected LeaderChangeMessage::Elected"),
1266        }
1267
1268        // Step 2: As a leader, renew the lease.
1269        let res = leader_pg_election
1270            .pg_client
1271            .read()
1272            .await
1273            .query(&leader_pg_election.sql_set.campaign, &[])
1274            .await
1275            .unwrap();
1276        let res: bool = res[0].get(0);
1277        assert!(res);
1278        leader_pg_election.leader_action().await.unwrap();
1279        let new_lease = leader_pg_election
1280            .get_value_with_lease(&leader_pg_election.election_key())
1281            .await
1282            .unwrap()
1283            .unwrap();
1284        assert!(new_lease.leader_value == leader_value);
1285        assert!(
1286            new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1287        );
1288        assert!(leader_pg_election.is_leader());
1289
1290        // Step 3: Something wrong, the leader lease expired.
1291        tokio::time::sleep(Duration::from_secs(2)).await;
1292
1293        let res = leader_pg_election
1294            .pg_client
1295            .read()
1296            .await
1297            .query(&leader_pg_election.sql_set.campaign, &[])
1298            .await
1299            .unwrap();
1300        let res: bool = res[0].get(0);
1301        assert!(res);
1302        leader_pg_election.leader_action().await.unwrap();
1303        let res = leader_pg_election
1304            .get_value_with_lease(&leader_pg_election.election_key())
1305            .await
1306            .unwrap();
1307        assert!(res.is_none());
1308
1309        match rx.recv().await {
1310            Ok(LeaderChangeMessage::StepDown(key)) => {
1311                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1312                assert_eq!(
1313                    String::from_utf8_lossy(key.key()),
1314                    leader_pg_election.election_key()
1315                );
1316                assert_eq!(key.lease_id(), i64::default());
1317                assert_eq!(key.revision(), i64::default());
1318            }
1319            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1320        }
1321
1322        // Step 4: Re-campaign and elected.
1323        let res = leader_pg_election
1324            .pg_client
1325            .read()
1326            .await
1327            .query(&leader_pg_election.sql_set.campaign, &[])
1328            .await
1329            .unwrap();
1330        let res: bool = res[0].get(0);
1331        assert!(res);
1332        leader_pg_election.leader_action().await.unwrap();
1333        let lease = leader_pg_election
1334            .get_value_with_lease(&leader_pg_election.election_key())
1335            .await
1336            .unwrap()
1337            .unwrap();
1338        assert!(lease.leader_value == leader_value);
1339        assert!(lease.expire_time > lease.current);
1340        assert!(leader_pg_election.is_leader());
1341
1342        match rx.recv().await {
1343            Ok(LeaderChangeMessage::Elected(key)) => {
1344                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1345                assert_eq!(
1346                    String::from_utf8_lossy(key.key()),
1347                    leader_pg_election.election_key()
1348                );
1349                assert_eq!(key.lease_id(), i64::default());
1350                assert_eq!(key.revision(), i64::default());
1351            }
1352            _ => panic!("Expected LeaderChangeMessage::Elected"),
1353        }
1354
1355        // Step 5: Something wrong, the leader key is deleted by other followers.
1356        leader_pg_election
1357            .delete_value(&leader_pg_election.election_key())
1358            .await
1359            .unwrap();
1360        leader_pg_election.leader_action().await.unwrap();
1361        let res = leader_pg_election
1362            .get_value_with_lease(&leader_pg_election.election_key())
1363            .await
1364            .unwrap();
1365        assert!(res.is_none());
1366        assert!(!leader_pg_election.is_leader());
1367
1368        match rx.recv().await {
1369            Ok(LeaderChangeMessage::StepDown(key)) => {
1370                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1371                assert_eq!(
1372                    String::from_utf8_lossy(key.key()),
1373                    leader_pg_election.election_key()
1374                );
1375                assert_eq!(key.lease_id(), i64::default());
1376                assert_eq!(key.revision(), i64::default());
1377            }
1378            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1379        }
1380
1381        // Step 6: Re-campaign and elected.
1382        let res = leader_pg_election
1383            .pg_client
1384            .read()
1385            .await
1386            .query(&leader_pg_election.sql_set.campaign, &[])
1387            .await
1388            .unwrap();
1389        let res: bool = res[0].get(0);
1390        assert!(res);
1391        leader_pg_election.leader_action().await.unwrap();
1392        let lease = leader_pg_election
1393            .get_value_with_lease(&leader_pg_election.election_key())
1394            .await
1395            .unwrap()
1396            .unwrap();
1397        assert!(lease.leader_value == leader_value);
1398        assert!(lease.expire_time > lease.current);
1399        assert!(leader_pg_election.is_leader());
1400
1401        match rx.recv().await {
1402            Ok(LeaderChangeMessage::Elected(key)) => {
1403                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1404                assert_eq!(
1405                    String::from_utf8_lossy(key.key()),
1406                    leader_pg_election.election_key()
1407                );
1408                assert_eq!(key.lease_id(), i64::default());
1409                assert_eq!(key.revision(), i64::default());
1410            }
1411            _ => panic!("Expected LeaderChangeMessage::Elected"),
1412        }
1413
1414        // Step 7: Something wrong, the leader key changed by others.
1415        let res = leader_pg_election
1416            .pg_client
1417            .read()
1418            .await
1419            .query(&leader_pg_election.sql_set.campaign, &[])
1420            .await
1421            .unwrap();
1422        let res: bool = res[0].get(0);
1423        assert!(res);
1424        leader_pg_election
1425            .delete_value(&leader_pg_election.election_key())
1426            .await
1427            .unwrap();
1428        leader_pg_election
1429            .put_value_with_lease(
1430                &leader_pg_election.election_key(),
1431                "test",
1432                Duration::from_secs(10),
1433            )
1434            .await
1435            .unwrap();
1436        leader_pg_election.leader_action().await.unwrap();
1437        let res = leader_pg_election
1438            .get_value_with_lease(&leader_pg_election.election_key())
1439            .await
1440            .unwrap();
1441        assert!(res.is_none());
1442        assert!(!leader_pg_election.is_leader());
1443
1444        match rx.recv().await {
1445            Ok(LeaderChangeMessage::StepDown(key)) => {
1446                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1447                assert_eq!(
1448                    String::from_utf8_lossy(key.key()),
1449                    leader_pg_election.election_key()
1450                );
1451                assert_eq!(key.lease_id(), i64::default());
1452                assert_eq!(key.revision(), i64::default());
1453            }
1454            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1455        }
1456
1457        // Clean up
1458        leader_pg_election
1459            .pg_client
1460            .read()
1461            .await
1462            .query(&leader_pg_election.sql_set.step_down, &[])
1463            .await
1464            .unwrap();
1465
1466        drop_table(&leader_pg_election, table_name).await;
1467    }
1468
1469    #[tokio::test]
1470    async fn test_follower_action() {
1471        maybe_skip_postgres_integration_test!();
1472        common_telemetry::init_default_ut_logging();
1473        let uuid = uuid::Uuid::new_v4().to_string();
1474        let table_name = "test_follower_action_greptime_metakv";
1475
1476        let candidate_lease_ttl = Duration::from_secs(5);
1477        let execution_timeout = Duration::from_secs(10);
1478        let statement_timeout = Duration::from_secs(10);
1479        let meta_lease_ttl = Duration::from_secs(2);
1480        let idle_session_timeout = Duration::from_secs(0);
1481        let follower_client = create_postgres_client(
1482            Some(table_name),
1483            execution_timeout,
1484            idle_session_timeout,
1485            statement_timeout,
1486        )
1487        .await
1488        .unwrap();
1489        let (tx, mut rx) = broadcast::channel(100);
1490        let follower_pg_election = PgElection {
1491            leader_value: "test_follower".to_string(),
1492            pg_client: RwLock::new(follower_client),
1493            is_leader: AtomicBool::new(false),
1494            leader_infancy: AtomicBool::new(true),
1495            leader_watcher: tx,
1496            store_key_prefix: uuid.clone(),
1497            candidate_lease_ttl,
1498            meta_lease_ttl,
1499            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1500        };
1501
1502        let leader_client = create_postgres_client(
1503            Some(table_name),
1504            execution_timeout,
1505            idle_session_timeout,
1506            statement_timeout,
1507        )
1508        .await
1509        .unwrap();
1510        let (tx, _) = broadcast::channel(100);
1511        let leader_pg_election = PgElection {
1512            leader_value: "test_leader".to_string(),
1513            pg_client: RwLock::new(leader_client),
1514            is_leader: AtomicBool::new(false),
1515            leader_infancy: AtomicBool::new(true),
1516            leader_watcher: tx,
1517            store_key_prefix: uuid,
1518            candidate_lease_ttl,
1519            meta_lease_ttl,
1520            sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1521        };
1522
1523        leader_pg_election
1524            .pg_client
1525            .read()
1526            .await
1527            .query(&leader_pg_election.sql_set.campaign, &[])
1528            .await
1529            .unwrap();
1530        leader_pg_election.elected().await.unwrap();
1531
1532        // Step 1: As a follower, the leader exists and the lease is not expired.
1533        follower_pg_election.follower_action().await.unwrap();
1534
1535        // Step 2: As a follower, the leader exists but the lease expired.
1536        tokio::time::sleep(Duration::from_secs(2)).await;
1537        assert!(follower_pg_election.follower_action().await.is_err());
1538
1539        // Step 3: As a follower, the leader does not exist.
1540        leader_pg_election
1541            .delete_value(&leader_pg_election.election_key())
1542            .await
1543            .unwrap();
1544        assert!(follower_pg_election.follower_action().await.is_err());
1545
1546        // Step 4: Follower thinks it's the leader but failed to acquire the lock.
1547        follower_pg_election
1548            .is_leader
1549            .store(true, Ordering::Relaxed);
1550        assert!(follower_pg_election.follower_action().await.is_err());
1551
1552        match rx.recv().await {
1553            Ok(LeaderChangeMessage::StepDown(key)) => {
1554                assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1555                assert_eq!(
1556                    String::from_utf8_lossy(key.key()),
1557                    follower_pg_election.election_key()
1558                );
1559                assert_eq!(key.lease_id(), i64::default());
1560                assert_eq!(key.revision(), i64::default());
1561            }
1562            _ => panic!("Expected LeaderChangeMessage::StepDown"),
1563        }
1564
1565        // Clean up
1566        leader_pg_election
1567            .pg_client
1568            .read()
1569            .await
1570            .query(&leader_pg_election.sql_set.step_down, &[])
1571            .await
1572            .unwrap();
1573
1574        drop_table(&follower_pg_election, table_name).await;
1575    }
1576
1577    #[tokio::test]
1578    async fn test_idle_session_timeout() {
1579        maybe_skip_postgres_integration_test!();
1580        common_telemetry::init_default_ut_logging();
1581        let execution_timeout = Duration::from_secs(10);
1582        let statement_timeout = Duration::from_secs(10);
1583        let idle_session_timeout = Duration::from_secs(1);
1584        let mut client = create_postgres_client(
1585            None,
1586            execution_timeout,
1587            idle_session_timeout,
1588            statement_timeout,
1589        )
1590        .await
1591        .unwrap();
1592        tokio::time::sleep(Duration::from_millis(1100)).await;
1593        // Wait for the idle session timeout.
1594        let err = client.query("SELECT 1", &[]).await.unwrap_err();
1595        assert_matches!(err, error::Error::PostgresExecution { .. });
1596        let error::Error::PostgresExecution { error, .. } = err else {
1597            panic!("Expected PostgresExecution error");
1598        };
1599        assert!(error.is_closed());
1600        // Reset the client and try again.
1601        client.reset_client().await.unwrap();
1602        let _ = client.query("SELECT 1", &[]).await.unwrap();
1603    }
1604
1605    #[test]
1606    fn test_election_sql_with_schema() {
1607        let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1608        let s = f.build();
1609        assert!(s.campaign.contains("pg_try_advisory_lock"));
1610        assert!(s
1611            .put_value_with_lease
1612            .contains("\"test_schema\".\"greptime_metakv\""));
1613        assert!(s
1614            .update_value_with_lease
1615            .contains("\"test_schema\".\"greptime_metakv\""));
1616        assert!(s
1617            .get_value_with_lease
1618            .contains("\"test_schema\".\"greptime_metakv\""));
1619        assert!(s
1620            .get_value_with_lease_by_prefix
1621            .contains("\"test_schema\".\"greptime_metakv\""));
1622        assert!(s
1623            .delete_value
1624            .contains("\"test_schema\".\"greptime_metakv\""));
1625    }
1626}