1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
39struct ElectionSqlFactory<'a> {
40 lock_id: u64,
41 schema_name: Option<&'a str>,
42 table_name: &'a str,
43}
44
45struct ElectionSqlSet {
46 campaign: String,
47 step_down: String,
48 put_value_with_lease: String,
58 update_value_with_lease: String,
66 get_value_with_lease: String,
71 get_value_with_lease_by_prefix: String,
80 delete_value: String,
89}
90
91impl<'a> ElectionSqlFactory<'a> {
92 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93 Self {
94 lock_id,
95 schema_name,
96 table_name,
97 }
98 }
99
100 fn table_ident(&self) -> String {
101 match self.schema_name {
102 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103 _ => format!("\"{}\"", self.table_name),
104 }
105 }
106
107 fn build(self) -> ElectionSqlSet {
108 ElectionSqlSet {
109 campaign: self.campaign_sql(),
110 step_down: self.step_down_sql(),
111 put_value_with_lease: self.put_value_with_lease_sql(),
112 update_value_with_lease: self.update_value_with_lease_sql(),
113 get_value_with_lease: self.get_value_with_lease_sql(),
114 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115 delete_value: self.delete_value_sql(),
116 }
117 }
118
119 fn campaign_sql(&self) -> String {
120 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121 }
122
123 fn step_down_sql(&self) -> String {
124 format!("SELECT pg_advisory_unlock({})", self.lock_id)
125 }
126
127 fn put_value_with_lease_sql(&self) -> String {
128 let table = self.table_ident();
129 format!(
130 r#"WITH prev AS (
131 SELECT k, v FROM {table} WHERE k = $1
132 ), insert AS (
133 INSERT INTO {table}
134 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135 ON CONFLICT (k) DO NOTHING
136 )
137 SELECT k, v FROM prev;
138 "#,
139 table = table,
140 lease_sep = LEASE_SEP
141 )
142 }
143
144 fn update_value_with_lease_sql(&self) -> String {
145 let table = self.table_ident();
146 format!(
147 r#"UPDATE {table}
148 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149 WHERE k = $1 AND v = $2"#,
150 table = table,
151 lease_sep = LEASE_SEP
152 )
153 }
154
155 fn get_value_with_lease_sql(&self) -> String {
156 let table = self.table_ident();
157 format!(
158 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159 table = table
160 )
161 }
162
163 fn get_value_with_lease_by_prefix_sql(&self) -> String {
164 let table = self.table_ident();
165 format!(
166 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167 table = table
168 )
169 }
170
171 fn delete_value_sql(&self) -> String {
172 let table = self.table_ident();
173 format!(
174 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175 table = table
176 )
177 }
178}
179
180pub struct ElectionPgClient {
182 current: Option<deadpool::managed::Object<Manager>>,
183 pool: Pool,
184 execution_timeout: Duration,
189
190 idle_session_timeout: Duration,
195
196 statement_timeout: Duration,
201}
202
203impl ElectionPgClient {
204 pub fn new(
205 pool: Pool,
206 execution_timeout: Duration,
207 idle_session_timeout: Duration,
208 statement_timeout: Duration,
209 ) -> Result<ElectionPgClient> {
210 Ok(ElectionPgClient {
211 current: None,
212 pool,
213 execution_timeout,
214 idle_session_timeout,
215 statement_timeout,
216 })
217 }
218
219 fn set_idle_session_timeout_sql(&self) -> String {
220 format!(
221 "SET idle_session_timeout = '{}s';",
222 self.idle_session_timeout.as_secs()
223 )
224 }
225
226 fn set_statement_timeout_sql(&self) -> String {
227 format!(
228 "SET statement_timeout = '{}s';",
229 self.statement_timeout.as_secs()
230 )
231 }
232
233 async fn reset_client(&mut self) -> Result<()> {
234 self.current = None;
235 self.maybe_init_client().await
236 }
237
238 async fn maybe_init_client(&mut self) -> Result<()> {
239 if self.current.is_none() {
240 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242 self.current = Some(client);
243 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245 self.execute(&idle_session_timeout_sql, &[]).await?;
246 let statement_timeout_sql = self.set_statement_timeout_sql();
247 self.execute(&statement_timeout_sql, &[]).await?;
248 }
249
250 Ok(())
251 }
252
253 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258 let result = tokio::time::timeout(
259 self.execution_timeout,
260 self.current.as_ref().unwrap().execute(sql, params),
261 )
262 .await
263 .map_err(|_| {
264 SqlExecutionTimeoutSnafu {
265 sql: sql.to_string(),
266 duration: self.execution_timeout,
267 }
268 .build()
269 })?;
270
271 result.context(PostgresExecutionSnafu { sql })
272 }
273
274 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279 let result = tokio::time::timeout(
280 self.execution_timeout,
281 self.current.as_ref().unwrap().query(sql, params),
282 )
283 .await
284 .map_err(|_| {
285 SqlExecutionTimeoutSnafu {
286 sql: sql.to_string(),
287 duration: self.execution_timeout,
288 }
289 .build()
290 })?;
291
292 result.context(PostgresExecutionSnafu { sql })
293 }
294}
295
296pub struct PgElection {
298 leader_value: String,
299 pg_client: RwLock<ElectionPgClient>,
300 is_leader: AtomicBool,
301 leader_infancy: AtomicBool,
302 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
303 store_key_prefix: String,
304 candidate_lease_ttl: Duration,
305 meta_lease_ttl: Duration,
306 sql_set: ElectionSqlSet,
307}
308
309impl PgElection {
310 async fn maybe_init_client(&self) -> Result<()> {
311 if self.pg_client.read().await.current.is_none() {
312 self.pg_client.write().await.maybe_init_client().await?;
313 }
314
315 Ok(())
316 }
317
318 #[allow(clippy::too_many_arguments)]
319 pub async fn with_pg_client(
320 leader_value: String,
321 pg_client: ElectionPgClient,
322 store_key_prefix: String,
323 candidate_lease_ttl: Duration,
324 meta_lease_ttl: Duration,
325 schema_name: Option<&str>,
326 table_name: &str,
327 lock_id: u64,
328 ) -> Result<ElectionRef> {
329 if let Some(s) = schema_name {
330 common_telemetry::info!("PgElection uses schema: {}", s);
331 } else {
332 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333 }
334 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336 let tx = listen_leader_change(leader_value.clone());
337 Ok(Arc::new(Self {
338 leader_value,
339 pg_client: RwLock::new(pg_client),
340 is_leader: AtomicBool::new(false),
341 leader_infancy: AtomicBool::new(false),
342 leader_watcher: tx,
343 store_key_prefix,
344 candidate_lease_ttl,
345 meta_lease_ttl,
346 sql_set: sql_factory.build(),
347 }))
348 }
349
350 fn election_key(&self) -> String {
351 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352 }
353
354 fn candidate_root(&self) -> String {
355 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356 }
357
358 fn candidate_key(&self) -> String {
359 format!("{}{}", self.candidate_root(), self.leader_value)
360 }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365 type Leader = LeaderValue;
366
367 fn is_leader(&self) -> bool {
368 self.is_leader.load(Ordering::Relaxed)
369 }
370
371 fn in_leader_infancy(&self) -> bool {
372 self.leader_infancy
373 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374 .is_ok()
375 }
376
377 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378 let key = self.candidate_key();
379 let node_info =
380 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381 input: format!("{node_info:?}"),
382 })?;
383 let res = self
384 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385 .await?;
386 if !res {
388 self.delete_value(&key).await?;
389 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390 .await?;
391 }
392
393 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395 loop {
396 let _ = keep_alive_interval.tick().await;
397
398 let lease = self
399 .get_value_with_lease(&key)
400 .await?
401 .context(UnexpectedSnafu {
402 violated: format!("Failed to get lease for key: {:?}", key),
403 })?;
404
405 ensure!(
406 lease.expire_time > lease.current,
407 UnexpectedSnafu {
408 violated: format!(
409 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410 lease.expire_time, lease.current, key
411 ),
412 }
413 );
414
415 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417 .await?;
418 }
419 }
420
421 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422 let key_prefix = self.candidate_root();
423 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424 candidates.retain(|c| c.1 > current);
426 let mut valid_candidates = Vec::with_capacity(candidates.len());
427 for (c, _) in candidates {
428 let node_info: MetasrvNodeInfo =
429 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430 input: format!("{:?}", c),
431 })?;
432 valid_candidates.push(node_info);
433 }
434 Ok(valid_candidates)
435 }
436
437 async fn campaign(&self) -> Result<()> {
450 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453 self.maybe_init_client().await?;
454 loop {
455 let res = self
456 .pg_client
457 .read()
458 .await
459 .query(&self.sql_set.campaign, &[])
460 .await?;
461 let row = res.first().context(UnexpectedSnafu {
462 violated: "Failed to get the result of acquiring advisory lock",
463 })?;
464 let is_leader = row.try_get(0).map_err(|_| {
465 UnexpectedSnafu {
466 violated: "Failed to get the result of get lock",
467 }
468 .build()
469 })?;
470 if is_leader {
471 self.leader_action().await?;
472 } else {
473 self.follower_action().await?;
474 }
475 let _ = keep_alive_interval.tick().await;
476 }
477 }
478
479 async fn reset_campaign(&self) {
480 if let Err(err) = self.pg_client.write().await.reset_client().await {
481 error!(err; "Failed to reset client");
482 }
483 }
484
485 async fn leader(&self) -> Result<Self::Leader> {
486 if self.is_leader.load(Ordering::Relaxed) {
487 Ok(self.leader_value.as_bytes().into())
488 } else {
489 let key = self.election_key();
490 if let Some(lease) = self.get_value_with_lease(&key).await? {
491 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492 Ok(lease.leader_value.as_bytes().into())
493 } else {
494 NoLeaderSnafu.fail()
495 }
496 }
497 }
498
499 async fn resign(&self) -> Result<()> {
500 todo!()
501 }
502
503 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504 self.leader_watcher.subscribe()
505 }
506}
507
508impl PgElection {
509 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511 let key = key.as_bytes();
512 self.maybe_init_client().await?;
513 let res = self
514 .pg_client
515 .read()
516 .await
517 .query(&self.sql_set.get_value_with_lease, &[&key])
518 .await?;
519
520 if res.is_empty() {
521 Ok(None)
522 } else {
523 let current_time_str = res[0].try_get(1).unwrap_or_default();
525 let current_time = match Timestamp::from_str(current_time_str, None) {
526 Ok(ts) => ts,
527 Err(_) => UnexpectedSnafu {
528 violated: format!("Invalid timestamp: {}", current_time_str),
529 }
530 .fail()?,
531 };
532 let value_and_expire_time =
534 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537 Ok(Some(Lease {
538 leader_value: value,
539 expire_time,
540 current: current_time,
541 origin: value_and_expire_time.to_string(),
542 }))
543 }
544 }
545
546 async fn get_value_with_lease_by_prefix(
548 &self,
549 key_prefix: &str,
550 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552 self.maybe_init_client().await?;
553 let res = self
554 .pg_client
555 .read()
556 .await
557 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558 .await?;
559
560 let mut values_with_leases = vec![];
561 let mut current = Timestamp::default();
562 for row in res {
563 let current_time_str = row.try_get(1).unwrap_or_default();
564 current = match Timestamp::from_str(current_time_str, None) {
565 Ok(ts) => ts,
566 Err(_) => UnexpectedSnafu {
567 violated: format!("Invalid timestamp: {}", current_time_str),
568 }
569 .fail()?,
570 };
571
572 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575 values_with_leases.push((value, expire_time));
576 }
577 Ok((values_with_leases, current))
578 }
579
580 async fn update_value_with_lease(
581 &self,
582 key: &str,
583 prev: &str,
584 updated: &str,
585 lease_ttl: Duration,
586 ) -> Result<()> {
587 let key = key.as_bytes();
588 let prev = prev.as_bytes();
589 self.maybe_init_client().await?;
590 let lease_ttl_secs = lease_ttl.as_secs() as f64;
591 let res = self
592 .pg_client
593 .read()
594 .await
595 .execute(
596 &self.sql_set.update_value_with_lease,
597 &[&key, &prev, &updated, &lease_ttl_secs],
598 )
599 .await?;
600
601 ensure!(
602 res == 1,
603 UnexpectedSnafu {
604 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605 }
606 );
607
608 Ok(())
609 }
610
611 async fn put_value_with_lease(
613 &self,
614 key: &str,
615 value: &str,
616 lease_ttl: Duration,
617 ) -> Result<bool> {
618 let key = key.as_bytes();
619 let lease_ttl_secs = lease_ttl.as_secs() as f64;
620 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621 self.maybe_init_client().await?;
622 let res = self
623 .pg_client
624 .read()
625 .await
626 .query(&self.sql_set.put_value_with_lease, ¶ms)
627 .await?;
628 Ok(res.is_empty())
629 }
630
631 async fn delete_value(&self, key: &str) -> Result<bool> {
634 let key = key.as_bytes();
635 self.maybe_init_client().await?;
636 let res = self
637 .pg_client
638 .read()
639 .await
640 .query(&self.sql_set.delete_value, &[&key])
641 .await?;
642
643 Ok(res.len() == 1)
644 }
645
646 async fn leader_action(&self) -> Result<()> {
667 let key = self.election_key();
668 if self.is_leader() {
670 match self.get_value_with_lease(&key).await? {
671 Some(lease) => {
672 match (
673 lease.leader_value == self.leader_value,
674 lease.expire_time > lease.current,
675 ) {
676 (true, true) => {
678 self.update_value_with_lease(
680 &key,
681 &lease.origin,
682 &self.leader_value,
683 self.meta_lease_ttl,
684 )
685 .await?;
686 }
687 (true, false) => {
689 warn!("Leader lease expired, now stepping down.");
690 self.step_down().await?;
691 }
692 (false, _) => {
694 warn!(
695 "Leader lease not found, but still hold the lock. Now stepping down."
696 );
697 self.step_down().await?;
698 }
699 }
700 }
701 None => {
703 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
704 self.step_down().await?;
705 }
706 }
707 } else {
709 self.elected().await?;
710 }
711 Ok(())
712 }
713
714 async fn follower_action(&self) -> Result<()> {
725 let key = self.election_key();
726 if self.is_leader() {
728 self.step_down_without_lock().await?;
729 }
730 let lease = self
731 .get_value_with_lease(&key)
732 .await?
733 .context(NoLeaderSnafu)?;
734 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
736 Ok(())
738 }
739
740 async fn step_down(&self) -> Result<()> {
747 let key = self.election_key();
748 let leader_key = RdsLeaderKey {
749 name: self.leader_value.clone().into_bytes(),
750 key: key.clone().into_bytes(),
751 ..Default::default()
752 };
753 self.delete_value(&key).await?;
754 self.maybe_init_client().await?;
755 self.pg_client
756 .read()
757 .await
758 .query(&self.sql_set.step_down, &[])
759 .await?;
760 send_leader_change_and_set_flags(
761 &self.is_leader,
762 &self.leader_infancy,
763 &self.leader_watcher,
764 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
765 );
766 Ok(())
767 }
768
769 async fn step_down_without_lock(&self) -> Result<()> {
771 let key = self.election_key().into_bytes();
772 let leader_key = RdsLeaderKey {
773 name: self.leader_value.clone().into_bytes(),
774 key: key.clone(),
775 ..Default::default()
776 };
777 if self
778 .is_leader
779 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
780 .is_ok()
781 && let Err(e) = self
782 .leader_watcher
783 .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
784 {
785 error!(e; "Failed to send leader change message");
786 }
787 Ok(())
788 }
789
790 async fn elected(&self) -> Result<()> {
793 let key = self.election_key();
794 let leader_key = RdsLeaderKey {
795 name: self.leader_value.clone().into_bytes(),
796 key: key.clone().into_bytes(),
797 ..Default::default()
798 };
799 self.delete_value(&key).await?;
800 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801 .await?;
802
803 if self
804 .is_leader
805 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806 .is_ok()
807 {
808 self.leader_infancy.store(true, Ordering::Release);
809
810 if let Err(e) = self
811 .leader_watcher
812 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813 {
814 error!(e; "Failed to send leader change message");
815 }
816 }
817 Ok(())
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use std::assert_matches::assert_matches;
824 use std::env;
825
826 use common_meta::maybe_skip_postgres_integration_test;
827
828 use super::*;
829 use crate::error;
830 use crate::utils::postgres::create_postgres_pool;
831
832 async fn create_postgres_client(
833 table_name: Option<&str>,
834 execution_timeout: Duration,
835 idle_session_timeout: Duration,
836 statement_timeout: Duration,
837 ) -> Result<ElectionPgClient> {
838 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839 if endpoint.is_empty() {
840 return UnexpectedSnafu {
841 violated: "Postgres endpoint is empty".to_string(),
842 }
843 .fail();
844 }
845 let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
846 let mut pg_client = ElectionPgClient::new(
847 pool,
848 execution_timeout,
849 idle_session_timeout,
850 statement_timeout,
851 )
852 .unwrap();
853 pg_client.maybe_init_client().await?;
854 if let Some(table_name) = table_name {
855 let create_table_sql = format!(
856 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857 table_name
858 );
859 pg_client.execute(&create_table_sql, &[]).await?;
860 }
861 Ok(pg_client)
862 }
863
864 async fn drop_table(pg_election: &PgElection, table_name: &str) {
865 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866 pg_election
867 .pg_client
868 .read()
869 .await
870 .execute(&sql, &[])
871 .await
872 .unwrap();
873 }
874
875 #[tokio::test]
876 async fn test_postgres_crud() {
877 maybe_skip_postgres_integration_test!();
878 let key = "test_key".to_string();
879 let value = "test_value".to_string();
880
881 let uuid = uuid::Uuid::new_v4().to_string();
882 let table_name = "test_postgres_crud_greptime_metakv";
883 let candidate_lease_ttl = Duration::from_secs(10);
884 let execution_timeout = Duration::from_secs(10);
885 let statement_timeout = Duration::from_secs(10);
886 let meta_lease_ttl = Duration::from_secs(2);
887 let idle_session_timeout = Duration::from_secs(0);
888 let client = create_postgres_client(
889 Some(table_name),
890 execution_timeout,
891 idle_session_timeout,
892 statement_timeout,
893 )
894 .await
895 .unwrap();
896
897 let (tx, _) = broadcast::channel(100);
898 let pg_election = PgElection {
899 leader_value: "test_leader".to_string(),
900 pg_client: RwLock::new(client),
901 is_leader: AtomicBool::new(false),
902 leader_infancy: AtomicBool::new(true),
903 leader_watcher: tx,
904 store_key_prefix: uuid,
905 candidate_lease_ttl,
906 meta_lease_ttl,
907 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
908 };
909
910 let res = pg_election
911 .put_value_with_lease(&key, &value, candidate_lease_ttl)
912 .await
913 .unwrap();
914 assert!(res);
915
916 let lease = pg_election
917 .get_value_with_lease(&key)
918 .await
919 .unwrap()
920 .unwrap();
921 assert_eq!(lease.leader_value, value);
922
923 pg_election
924 .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
925 .await
926 .unwrap();
927
928 let res = pg_election.delete_value(&key).await.unwrap();
929 assert!(res);
930
931 let res = pg_election.get_value_with_lease(&key).await.unwrap();
932 assert!(res.is_none());
933
934 for i in 0..10 {
935 let key = format!("test_key_{}", i);
936 let value = format!("test_value_{}", i);
937 pg_election
938 .put_value_with_lease(&key, &value, candidate_lease_ttl)
939 .await
940 .unwrap();
941 }
942
943 let key_prefix = "test_key".to_string();
944 let (res, _) = pg_election
945 .get_value_with_lease_by_prefix(&key_prefix)
946 .await
947 .unwrap();
948 assert_eq!(res.len(), 10);
949
950 for i in 0..10 {
951 let key = format!("test_key_{}", i);
952 let res = pg_election.delete_value(&key).await.unwrap();
953 assert!(res);
954 }
955
956 let (res, current) = pg_election
957 .get_value_with_lease_by_prefix(&key_prefix)
958 .await
959 .unwrap();
960 assert!(res.is_empty());
961 assert!(current == Timestamp::default());
962
963 drop_table(&pg_election, table_name).await;
964 }
965
966 async fn candidate(
967 leader_value: String,
968 candidate_lease_ttl: Duration,
969 store_key_prefix: String,
970 table_name: String,
971 ) {
972 let execution_timeout = Duration::from_secs(10);
973 let statement_timeout = Duration::from_secs(10);
974 let meta_lease_ttl = Duration::from_secs(2);
975 let idle_session_timeout = Duration::from_secs(0);
976 let client = create_postgres_client(
977 None,
978 execution_timeout,
979 idle_session_timeout,
980 statement_timeout,
981 )
982 .await
983 .unwrap();
984
985 let (tx, _) = broadcast::channel(100);
986 let pg_election = PgElection {
987 leader_value,
988 pg_client: RwLock::new(client),
989 is_leader: AtomicBool::new(false),
990 leader_infancy: AtomicBool::new(true),
991 leader_watcher: tx,
992 store_key_prefix,
993 candidate_lease_ttl,
994 meta_lease_ttl,
995 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996 };
997
998 let node_info = MetasrvNodeInfo {
999 addr: "test_addr".to_string(),
1000 version: "test_version".to_string(),
1001 git_commit: "test_git_commit".to_string(),
1002 start_time_ms: 0,
1003 total_cpu_millicores: 0,
1004 total_memory_bytes: 0,
1005 cpu_usage_millicores: 0,
1006 memory_usage_bytes: 0,
1007 hostname: "test_hostname".to_string(),
1008 };
1009 pg_election.register_candidate(&node_info).await.unwrap();
1010 }
1011
1012 #[tokio::test]
1013 async fn test_candidate_registration() {
1014 maybe_skip_postgres_integration_test!();
1015 let leader_value_prefix = "test_leader".to_string();
1016 let uuid = uuid::Uuid::new_v4().to_string();
1017 let table_name = "test_candidate_registration_greptime_metakv";
1018 let mut handles = vec![];
1019 let candidate_lease_ttl = Duration::from_secs(5);
1020 let execution_timeout = Duration::from_secs(10);
1021 let statement_timeout = Duration::from_secs(10);
1022 let meta_lease_ttl = Duration::from_secs(2);
1023 let idle_session_timeout = Duration::from_secs(0);
1024 let client = create_postgres_client(
1025 Some(table_name),
1026 execution_timeout,
1027 idle_session_timeout,
1028 statement_timeout,
1029 )
1030 .await
1031 .unwrap();
1032
1033 for i in 0..10 {
1034 let leader_value = format!("{}{}", leader_value_prefix, i);
1035 let handle = tokio::spawn(candidate(
1036 leader_value,
1037 candidate_lease_ttl,
1038 uuid.clone(),
1039 table_name.to_string(),
1040 ));
1041 handles.push(handle);
1042 }
1043 tokio::time::sleep(Duration::from_secs(3)).await;
1045
1046 let (tx, _) = broadcast::channel(100);
1047 let leader_value = "test_leader".to_string();
1048 let pg_election = PgElection {
1049 leader_value,
1050 pg_client: RwLock::new(client),
1051 is_leader: AtomicBool::new(false),
1052 leader_infancy: AtomicBool::new(true),
1053 leader_watcher: tx,
1054 store_key_prefix: uuid.clone(),
1055 candidate_lease_ttl,
1056 meta_lease_ttl,
1057 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1058 };
1059
1060 let candidates = pg_election.all_candidates().await.unwrap();
1061 assert_eq!(candidates.len(), 10);
1062
1063 for handle in handles {
1064 handle.abort();
1065 }
1066
1067 tokio::time::sleep(Duration::from_secs(5)).await;
1069 let candidates = pg_election.all_candidates().await.unwrap();
1070 assert!(candidates.is_empty());
1071
1072 for i in 0..10 {
1074 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1075 let res = pg_election.delete_value(&key).await.unwrap();
1076 assert!(res);
1077 }
1078
1079 drop_table(&pg_election, table_name).await;
1080 }
1081
1082 #[tokio::test]
1083 async fn test_elected_and_step_down() {
1084 maybe_skip_postgres_integration_test!();
1085 let leader_value = "test_leader".to_string();
1086 let uuid = uuid::Uuid::new_v4().to_string();
1087 let table_name = "test_elected_and_step_down_greptime_metakv";
1088 let candidate_lease_ttl = Duration::from_secs(5);
1089 let execution_timeout = Duration::from_secs(10);
1090 let statement_timeout = Duration::from_secs(10);
1091 let meta_lease_ttl = Duration::from_secs(2);
1092 let idle_session_timeout = Duration::from_secs(0);
1093 let client = create_postgres_client(
1094 Some(table_name),
1095 execution_timeout,
1096 idle_session_timeout,
1097 statement_timeout,
1098 )
1099 .await
1100 .unwrap();
1101
1102 let (tx, mut rx) = broadcast::channel(100);
1103 let leader_pg_election = PgElection {
1104 leader_value: leader_value.clone(),
1105 pg_client: RwLock::new(client),
1106 is_leader: AtomicBool::new(false),
1107 leader_infancy: AtomicBool::new(true),
1108 leader_watcher: tx,
1109 store_key_prefix: uuid,
1110 candidate_lease_ttl,
1111 meta_lease_ttl,
1112 sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1113 };
1114
1115 leader_pg_election.elected().await.unwrap();
1116 let lease = leader_pg_election
1117 .get_value_with_lease(&leader_pg_election.election_key())
1118 .await
1119 .unwrap()
1120 .unwrap();
1121 assert!(lease.leader_value == leader_value);
1122 assert!(lease.expire_time > lease.current);
1123 assert!(leader_pg_election.is_leader());
1124
1125 match rx.recv().await {
1126 Ok(LeaderChangeMessage::Elected(key)) => {
1127 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1128 assert_eq!(
1129 String::from_utf8_lossy(key.key()),
1130 leader_pg_election.election_key()
1131 );
1132 assert_eq!(key.lease_id(), i64::default());
1133 assert_eq!(key.revision(), i64::default());
1134 }
1135 _ => panic!("Expected LeaderChangeMessage::Elected"),
1136 }
1137
1138 leader_pg_election.step_down_without_lock().await.unwrap();
1139 let lease = leader_pg_election
1140 .get_value_with_lease(&leader_pg_election.election_key())
1141 .await
1142 .unwrap()
1143 .unwrap();
1144 assert!(lease.leader_value == leader_value);
1145 assert!(!leader_pg_election.is_leader());
1146
1147 match rx.recv().await {
1148 Ok(LeaderChangeMessage::StepDown(key)) => {
1149 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1150 assert_eq!(
1151 String::from_utf8_lossy(key.key()),
1152 leader_pg_election.election_key()
1153 );
1154 assert_eq!(key.lease_id(), i64::default());
1155 assert_eq!(key.revision(), i64::default());
1156 }
1157 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1158 }
1159
1160 leader_pg_election.elected().await.unwrap();
1161 let lease = leader_pg_election
1162 .get_value_with_lease(&leader_pg_election.election_key())
1163 .await
1164 .unwrap()
1165 .unwrap();
1166 assert!(lease.leader_value == leader_value);
1167 assert!(lease.expire_time > lease.current);
1168 assert!(leader_pg_election.is_leader());
1169
1170 match rx.recv().await {
1171 Ok(LeaderChangeMessage::Elected(key)) => {
1172 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1173 assert_eq!(
1174 String::from_utf8_lossy(key.key()),
1175 leader_pg_election.election_key()
1176 );
1177 assert_eq!(key.lease_id(), i64::default());
1178 assert_eq!(key.revision(), i64::default());
1179 }
1180 _ => panic!("Expected LeaderChangeMessage::Elected"),
1181 }
1182
1183 leader_pg_election.step_down().await.unwrap();
1184 let res = leader_pg_election
1185 .get_value_with_lease(&leader_pg_election.election_key())
1186 .await
1187 .unwrap();
1188 assert!(res.is_none());
1189 assert!(!leader_pg_election.is_leader());
1190
1191 match rx.recv().await {
1192 Ok(LeaderChangeMessage::StepDown(key)) => {
1193 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1194 assert_eq!(
1195 String::from_utf8_lossy(key.key()),
1196 leader_pg_election.election_key()
1197 );
1198 assert_eq!(key.lease_id(), i64::default());
1199 assert_eq!(key.revision(), i64::default());
1200 }
1201 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1202 }
1203
1204 drop_table(&leader_pg_election, table_name).await;
1205 }
1206
1207 #[tokio::test]
1208 async fn test_leader_action() {
1209 maybe_skip_postgres_integration_test!();
1210 let leader_value = "test_leader".to_string();
1211 let uuid = uuid::Uuid::new_v4().to_string();
1212 let table_name = "test_leader_action_greptime_metakv";
1213 let candidate_lease_ttl = Duration::from_secs(5);
1214 let execution_timeout = Duration::from_secs(10);
1215 let statement_timeout = Duration::from_secs(10);
1216 let meta_lease_ttl = Duration::from_secs(2);
1217 let idle_session_timeout = Duration::from_secs(0);
1218 let client = create_postgres_client(
1219 Some(table_name),
1220 execution_timeout,
1221 idle_session_timeout,
1222 statement_timeout,
1223 )
1224 .await
1225 .unwrap();
1226
1227 let (tx, mut rx) = broadcast::channel(100);
1228 let leader_pg_election = PgElection {
1229 leader_value: leader_value.clone(),
1230 pg_client: RwLock::new(client),
1231 is_leader: AtomicBool::new(false),
1232 leader_infancy: AtomicBool::new(true),
1233 leader_watcher: tx,
1234 store_key_prefix: uuid,
1235 candidate_lease_ttl,
1236 meta_lease_ttl,
1237 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1238 };
1239
1240 let res = leader_pg_election
1242 .pg_client
1243 .read()
1244 .await
1245 .query(&leader_pg_election.sql_set.campaign, &[])
1246 .await
1247 .unwrap();
1248 let res: bool = res[0].get(0);
1249 assert!(res);
1250 leader_pg_election.leader_action().await.unwrap();
1251 let lease = leader_pg_election
1252 .get_value_with_lease(&leader_pg_election.election_key())
1253 .await
1254 .unwrap()
1255 .unwrap();
1256 assert!(lease.leader_value == leader_value);
1257 assert!(lease.expire_time > lease.current);
1258 assert!(leader_pg_election.is_leader());
1259
1260 match rx.recv().await {
1261 Ok(LeaderChangeMessage::Elected(key)) => {
1262 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1263 assert_eq!(
1264 String::from_utf8_lossy(key.key()),
1265 leader_pg_election.election_key()
1266 );
1267 assert_eq!(key.lease_id(), i64::default());
1268 assert_eq!(key.revision(), i64::default());
1269 }
1270 _ => panic!("Expected LeaderChangeMessage::Elected"),
1271 }
1272
1273 let res = leader_pg_election
1275 .pg_client
1276 .read()
1277 .await
1278 .query(&leader_pg_election.sql_set.campaign, &[])
1279 .await
1280 .unwrap();
1281 let res: bool = res[0].get(0);
1282 assert!(res);
1283 leader_pg_election.leader_action().await.unwrap();
1284 let new_lease = leader_pg_election
1285 .get_value_with_lease(&leader_pg_election.election_key())
1286 .await
1287 .unwrap()
1288 .unwrap();
1289 assert!(new_lease.leader_value == leader_value);
1290 assert!(
1291 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1292 );
1293 assert!(leader_pg_election.is_leader());
1294
1295 tokio::time::sleep(Duration::from_secs(2)).await;
1297
1298 let res = leader_pg_election
1299 .pg_client
1300 .read()
1301 .await
1302 .query(&leader_pg_election.sql_set.campaign, &[])
1303 .await
1304 .unwrap();
1305 let res: bool = res[0].get(0);
1306 assert!(res);
1307 leader_pg_election.leader_action().await.unwrap();
1308 let res = leader_pg_election
1309 .get_value_with_lease(&leader_pg_election.election_key())
1310 .await
1311 .unwrap();
1312 assert!(res.is_none());
1313
1314 match rx.recv().await {
1315 Ok(LeaderChangeMessage::StepDown(key)) => {
1316 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1317 assert_eq!(
1318 String::from_utf8_lossy(key.key()),
1319 leader_pg_election.election_key()
1320 );
1321 assert_eq!(key.lease_id(), i64::default());
1322 assert_eq!(key.revision(), i64::default());
1323 }
1324 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1325 }
1326
1327 let res = leader_pg_election
1329 .pg_client
1330 .read()
1331 .await
1332 .query(&leader_pg_election.sql_set.campaign, &[])
1333 .await
1334 .unwrap();
1335 let res: bool = res[0].get(0);
1336 assert!(res);
1337 leader_pg_election.leader_action().await.unwrap();
1338 let lease = leader_pg_election
1339 .get_value_with_lease(&leader_pg_election.election_key())
1340 .await
1341 .unwrap()
1342 .unwrap();
1343 assert!(lease.leader_value == leader_value);
1344 assert!(lease.expire_time > lease.current);
1345 assert!(leader_pg_election.is_leader());
1346
1347 match rx.recv().await {
1348 Ok(LeaderChangeMessage::Elected(key)) => {
1349 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1350 assert_eq!(
1351 String::from_utf8_lossy(key.key()),
1352 leader_pg_election.election_key()
1353 );
1354 assert_eq!(key.lease_id(), i64::default());
1355 assert_eq!(key.revision(), i64::default());
1356 }
1357 _ => panic!("Expected LeaderChangeMessage::Elected"),
1358 }
1359
1360 leader_pg_election
1362 .delete_value(&leader_pg_election.election_key())
1363 .await
1364 .unwrap();
1365 leader_pg_election.leader_action().await.unwrap();
1366 let res = leader_pg_election
1367 .get_value_with_lease(&leader_pg_election.election_key())
1368 .await
1369 .unwrap();
1370 assert!(res.is_none());
1371 assert!(!leader_pg_election.is_leader());
1372
1373 match rx.recv().await {
1374 Ok(LeaderChangeMessage::StepDown(key)) => {
1375 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1376 assert_eq!(
1377 String::from_utf8_lossy(key.key()),
1378 leader_pg_election.election_key()
1379 );
1380 assert_eq!(key.lease_id(), i64::default());
1381 assert_eq!(key.revision(), i64::default());
1382 }
1383 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1384 }
1385
1386 let res = leader_pg_election
1388 .pg_client
1389 .read()
1390 .await
1391 .query(&leader_pg_election.sql_set.campaign, &[])
1392 .await
1393 .unwrap();
1394 let res: bool = res[0].get(0);
1395 assert!(res);
1396 leader_pg_election.leader_action().await.unwrap();
1397 let lease = leader_pg_election
1398 .get_value_with_lease(&leader_pg_election.election_key())
1399 .await
1400 .unwrap()
1401 .unwrap();
1402 assert!(lease.leader_value == leader_value);
1403 assert!(lease.expire_time > lease.current);
1404 assert!(leader_pg_election.is_leader());
1405
1406 match rx.recv().await {
1407 Ok(LeaderChangeMessage::Elected(key)) => {
1408 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1409 assert_eq!(
1410 String::from_utf8_lossy(key.key()),
1411 leader_pg_election.election_key()
1412 );
1413 assert_eq!(key.lease_id(), i64::default());
1414 assert_eq!(key.revision(), i64::default());
1415 }
1416 _ => panic!("Expected LeaderChangeMessage::Elected"),
1417 }
1418
1419 let res = leader_pg_election
1421 .pg_client
1422 .read()
1423 .await
1424 .query(&leader_pg_election.sql_set.campaign, &[])
1425 .await
1426 .unwrap();
1427 let res: bool = res[0].get(0);
1428 assert!(res);
1429 leader_pg_election
1430 .delete_value(&leader_pg_election.election_key())
1431 .await
1432 .unwrap();
1433 leader_pg_election
1434 .put_value_with_lease(
1435 &leader_pg_election.election_key(),
1436 "test",
1437 Duration::from_secs(10),
1438 )
1439 .await
1440 .unwrap();
1441 leader_pg_election.leader_action().await.unwrap();
1442 let res = leader_pg_election
1443 .get_value_with_lease(&leader_pg_election.election_key())
1444 .await
1445 .unwrap();
1446 assert!(res.is_none());
1447 assert!(!leader_pg_election.is_leader());
1448
1449 match rx.recv().await {
1450 Ok(LeaderChangeMessage::StepDown(key)) => {
1451 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1452 assert_eq!(
1453 String::from_utf8_lossy(key.key()),
1454 leader_pg_election.election_key()
1455 );
1456 assert_eq!(key.lease_id(), i64::default());
1457 assert_eq!(key.revision(), i64::default());
1458 }
1459 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1460 }
1461
1462 leader_pg_election
1464 .pg_client
1465 .read()
1466 .await
1467 .query(&leader_pg_election.sql_set.step_down, &[])
1468 .await
1469 .unwrap();
1470
1471 drop_table(&leader_pg_election, table_name).await;
1472 }
1473
1474 #[tokio::test]
1475 async fn test_follower_action() {
1476 maybe_skip_postgres_integration_test!();
1477 common_telemetry::init_default_ut_logging();
1478 let uuid = uuid::Uuid::new_v4().to_string();
1479 let table_name = "test_follower_action_greptime_metakv";
1480
1481 let candidate_lease_ttl = Duration::from_secs(5);
1482 let execution_timeout = Duration::from_secs(10);
1483 let statement_timeout = Duration::from_secs(10);
1484 let meta_lease_ttl = Duration::from_secs(2);
1485 let idle_session_timeout = Duration::from_secs(0);
1486 let follower_client = create_postgres_client(
1487 Some(table_name),
1488 execution_timeout,
1489 idle_session_timeout,
1490 statement_timeout,
1491 )
1492 .await
1493 .unwrap();
1494 let (tx, mut rx) = broadcast::channel(100);
1495 let follower_pg_election = PgElection {
1496 leader_value: "test_follower".to_string(),
1497 pg_client: RwLock::new(follower_client),
1498 is_leader: AtomicBool::new(false),
1499 leader_infancy: AtomicBool::new(true),
1500 leader_watcher: tx,
1501 store_key_prefix: uuid.clone(),
1502 candidate_lease_ttl,
1503 meta_lease_ttl,
1504 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1505 };
1506
1507 let leader_client = create_postgres_client(
1508 Some(table_name),
1509 execution_timeout,
1510 idle_session_timeout,
1511 statement_timeout,
1512 )
1513 .await
1514 .unwrap();
1515 let (tx, _) = broadcast::channel(100);
1516 let leader_pg_election = PgElection {
1517 leader_value: "test_leader".to_string(),
1518 pg_client: RwLock::new(leader_client),
1519 is_leader: AtomicBool::new(false),
1520 leader_infancy: AtomicBool::new(true),
1521 leader_watcher: tx,
1522 store_key_prefix: uuid,
1523 candidate_lease_ttl,
1524 meta_lease_ttl,
1525 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1526 };
1527
1528 leader_pg_election
1529 .pg_client
1530 .read()
1531 .await
1532 .query(&leader_pg_election.sql_set.campaign, &[])
1533 .await
1534 .unwrap();
1535 leader_pg_election.elected().await.unwrap();
1536
1537 follower_pg_election.follower_action().await.unwrap();
1539
1540 tokio::time::sleep(Duration::from_secs(2)).await;
1542 assert!(follower_pg_election.follower_action().await.is_err());
1543
1544 leader_pg_election
1546 .delete_value(&leader_pg_election.election_key())
1547 .await
1548 .unwrap();
1549 assert!(follower_pg_election.follower_action().await.is_err());
1550
1551 follower_pg_election
1553 .is_leader
1554 .store(true, Ordering::Relaxed);
1555 assert!(follower_pg_election.follower_action().await.is_err());
1556
1557 match rx.recv().await {
1558 Ok(LeaderChangeMessage::StepDown(key)) => {
1559 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1560 assert_eq!(
1561 String::from_utf8_lossy(key.key()),
1562 follower_pg_election.election_key()
1563 );
1564 assert_eq!(key.lease_id(), i64::default());
1565 assert_eq!(key.revision(), i64::default());
1566 }
1567 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1568 }
1569
1570 leader_pg_election
1572 .pg_client
1573 .read()
1574 .await
1575 .query(&leader_pg_election.sql_set.step_down, &[])
1576 .await
1577 .unwrap();
1578
1579 drop_table(&follower_pg_election, table_name).await;
1580 }
1581
1582 #[tokio::test]
1583 async fn test_idle_session_timeout() {
1584 maybe_skip_postgres_integration_test!();
1585 common_telemetry::init_default_ut_logging();
1586 let execution_timeout = Duration::from_secs(10);
1587 let statement_timeout = Duration::from_secs(10);
1588 let idle_session_timeout = Duration::from_secs(1);
1589 let mut client = create_postgres_client(
1590 None,
1591 execution_timeout,
1592 idle_session_timeout,
1593 statement_timeout,
1594 )
1595 .await
1596 .unwrap();
1597 tokio::time::sleep(Duration::from_millis(1100)).await;
1598 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1600 assert_matches!(err, error::Error::PostgresExecution { .. });
1601 let error::Error::PostgresExecution { error, .. } = err else {
1602 panic!("Expected PostgresExecution error");
1603 };
1604 assert!(error.is_closed());
1605 client.reset_client().await.unwrap();
1607 let _ = client.query("SELECT 1", &[]).await.unwrap();
1608 }
1609
1610 #[test]
1611 fn test_election_sql_with_schema() {
1612 let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1613 let s = f.build();
1614 assert!(s.campaign.contains("pg_try_advisory_lock"));
1615 assert!(
1616 s.put_value_with_lease
1617 .contains("\"test_schema\".\"greptime_metakv\"")
1618 );
1619 assert!(
1620 s.update_value_with_lease
1621 .contains("\"test_schema\".\"greptime_metakv\"")
1622 );
1623 assert!(
1624 s.get_value_with_lease
1625 .contains("\"test_schema\".\"greptime_metakv\"")
1626 );
1627 assert!(
1628 s.get_value_with_lease_by_prefix
1629 .contains("\"test_schema\".\"greptime_metakv\"")
1630 );
1631 assert!(
1632 s.delete_value
1633 .contains("\"test_schema\".\"greptime_metakv\"")
1634 );
1635 }
1636}