1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
39struct ElectionSqlFactory<'a> {
40 lock_id: u64,
41 schema_name: Option<&'a str>,
42 table_name: &'a str,
43}
44
45struct ElectionSqlSet {
46 campaign: String,
47 step_down: String,
48 put_value_with_lease: String,
58 update_value_with_lease: String,
66 get_value_with_lease: String,
71 get_value_with_lease_by_prefix: String,
80 delete_value: String,
89}
90
91impl<'a> ElectionSqlFactory<'a> {
92 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93 Self {
94 lock_id,
95 schema_name,
96 table_name,
97 }
98 }
99
100 fn table_ident(&self) -> String {
101 match self.schema_name {
102 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103 _ => format!("\"{}\"", self.table_name),
104 }
105 }
106
107 fn build(self) -> ElectionSqlSet {
108 ElectionSqlSet {
109 campaign: self.campaign_sql(),
110 step_down: self.step_down_sql(),
111 put_value_with_lease: self.put_value_with_lease_sql(),
112 update_value_with_lease: self.update_value_with_lease_sql(),
113 get_value_with_lease: self.get_value_with_lease_sql(),
114 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115 delete_value: self.delete_value_sql(),
116 }
117 }
118
119 fn campaign_sql(&self) -> String {
120 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121 }
122
123 fn step_down_sql(&self) -> String {
124 format!("SELECT pg_advisory_unlock({})", self.lock_id)
125 }
126
127 fn put_value_with_lease_sql(&self) -> String {
128 let table = self.table_ident();
129 format!(
130 r#"WITH prev AS (
131 SELECT k, v FROM {table} WHERE k = $1
132 ), insert AS (
133 INSERT INTO {table}
134 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135 ON CONFLICT (k) DO NOTHING
136 )
137 SELECT k, v FROM prev;
138 "#,
139 table = table,
140 lease_sep = LEASE_SEP
141 )
142 }
143
144 fn update_value_with_lease_sql(&self) -> String {
145 let table = self.table_ident();
146 format!(
147 r#"UPDATE {table}
148 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149 WHERE k = $1 AND v = $2"#,
150 table = table,
151 lease_sep = LEASE_SEP
152 )
153 }
154
155 fn get_value_with_lease_sql(&self) -> String {
156 let table = self.table_ident();
157 format!(
158 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159 table = table
160 )
161 }
162
163 fn get_value_with_lease_by_prefix_sql(&self) -> String {
164 let table = self.table_ident();
165 format!(
166 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167 table = table
168 )
169 }
170
171 fn delete_value_sql(&self) -> String {
172 let table = self.table_ident();
173 format!(
174 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175 table = table
176 )
177 }
178}
179
180pub struct ElectionPgClient {
182 current: Option<deadpool::managed::Object<Manager>>,
183 pool: Pool,
184 execution_timeout: Duration,
189
190 idle_session_timeout: Duration,
195
196 statement_timeout: Duration,
201}
202
203impl ElectionPgClient {
204 pub fn new(
205 pool: Pool,
206 execution_timeout: Duration,
207 idle_session_timeout: Duration,
208 statement_timeout: Duration,
209 ) -> Result<ElectionPgClient> {
210 Ok(ElectionPgClient {
211 current: None,
212 pool,
213 execution_timeout,
214 idle_session_timeout,
215 statement_timeout,
216 })
217 }
218
219 fn set_idle_session_timeout_sql(&self) -> String {
220 format!(
221 "SET idle_session_timeout = '{}s';",
222 self.idle_session_timeout.as_secs()
223 )
224 }
225
226 fn set_statement_timeout_sql(&self) -> String {
227 format!(
228 "SET statement_timeout = '{}s';",
229 self.statement_timeout.as_secs()
230 )
231 }
232
233 async fn reset_client(&mut self) -> Result<()> {
234 self.current = None;
235 self.maybe_init_client().await
236 }
237
238 async fn maybe_init_client(&mut self) -> Result<()> {
239 if self.current.is_none() {
240 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242 self.current = Some(client);
243 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245 self.execute(&idle_session_timeout_sql, &[]).await?;
246 let statement_timeout_sql = self.set_statement_timeout_sql();
247 self.execute(&statement_timeout_sql, &[]).await?;
248 }
249
250 Ok(())
251 }
252
253 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258 let result = tokio::time::timeout(
259 self.execution_timeout,
260 self.current.as_ref().unwrap().execute(sql, params),
261 )
262 .await
263 .map_err(|_| {
264 SqlExecutionTimeoutSnafu {
265 sql: sql.to_string(),
266 duration: self.execution_timeout,
267 }
268 .build()
269 })?;
270
271 result.context(PostgresExecutionSnafu { sql })
272 }
273
274 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279 let result = tokio::time::timeout(
280 self.execution_timeout,
281 self.current.as_ref().unwrap().query(sql, params),
282 )
283 .await
284 .map_err(|_| {
285 SqlExecutionTimeoutSnafu {
286 sql: sql.to_string(),
287 duration: self.execution_timeout,
288 }
289 .build()
290 })?;
291
292 result.context(PostgresExecutionSnafu { sql })
293 }
294}
295
296pub struct PgElection {
298 leader_value: String,
299 pg_client: RwLock<ElectionPgClient>,
300 is_leader: AtomicBool,
301 leader_infancy: AtomicBool,
302 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
303 store_key_prefix: String,
304 candidate_lease_ttl: Duration,
305 meta_lease_ttl: Duration,
306 sql_set: ElectionSqlSet,
307}
308
309impl PgElection {
310 async fn maybe_init_client(&self) -> Result<()> {
311 if self.pg_client.read().await.current.is_none() {
312 self.pg_client.write().await.maybe_init_client().await?;
313 }
314
315 Ok(())
316 }
317
318 #[allow(clippy::too_many_arguments)]
319 pub async fn with_pg_client(
320 leader_value: String,
321 pg_client: ElectionPgClient,
322 store_key_prefix: String,
323 candidate_lease_ttl: Duration,
324 meta_lease_ttl: Duration,
325 schema_name: Option<&str>,
326 table_name: &str,
327 lock_id: u64,
328 ) -> Result<ElectionRef> {
329 if let Some(s) = schema_name {
330 common_telemetry::info!("PgElection uses schema: {}", s);
331 } else {
332 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333 }
334 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336 let tx = listen_leader_change(leader_value.clone());
337 Ok(Arc::new(Self {
338 leader_value,
339 pg_client: RwLock::new(pg_client),
340 is_leader: AtomicBool::new(false),
341 leader_infancy: AtomicBool::new(false),
342 leader_watcher: tx,
343 store_key_prefix,
344 candidate_lease_ttl,
345 meta_lease_ttl,
346 sql_set: sql_factory.build(),
347 }))
348 }
349
350 fn election_key(&self) -> String {
351 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352 }
353
354 fn candidate_root(&self) -> String {
355 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356 }
357
358 fn candidate_key(&self) -> String {
359 format!("{}{}", self.candidate_root(), self.leader_value)
360 }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365 type Leader = LeaderValue;
366
367 fn is_leader(&self) -> bool {
368 self.is_leader.load(Ordering::Relaxed)
369 }
370
371 fn in_leader_infancy(&self) -> bool {
372 self.leader_infancy
373 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374 .is_ok()
375 }
376
377 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378 let key = self.candidate_key();
379 let node_info =
380 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381 input: format!("{node_info:?}"),
382 })?;
383 let res = self
384 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385 .await?;
386 if !res {
388 self.delete_value(&key).await?;
389 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390 .await?;
391 }
392
393 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395 loop {
396 let _ = keep_alive_interval.tick().await;
397
398 let lease = self
399 .get_value_with_lease(&key)
400 .await?
401 .context(UnexpectedSnafu {
402 violated: format!("Failed to get lease for key: {:?}", key),
403 })?;
404
405 ensure!(
406 lease.expire_time > lease.current,
407 UnexpectedSnafu {
408 violated: format!(
409 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410 lease.expire_time, lease.current, key
411 ),
412 }
413 );
414
415 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417 .await?;
418 }
419 }
420
421 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422 let key_prefix = self.candidate_root();
423 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424 candidates.retain(|c| c.1 > current);
426 let mut valid_candidates = Vec::with_capacity(candidates.len());
427 for (c, _) in candidates {
428 let node_info: MetasrvNodeInfo =
429 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430 input: format!("{:?}", c),
431 })?;
432 valid_candidates.push(node_info);
433 }
434 Ok(valid_candidates)
435 }
436
437 async fn campaign(&self) -> Result<()> {
450 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453 self.maybe_init_client().await?;
454 loop {
455 let res = self
456 .pg_client
457 .read()
458 .await
459 .query(&self.sql_set.campaign, &[])
460 .await?;
461 let row = res.first().context(UnexpectedSnafu {
462 violated: "Failed to get the result of acquiring advisory lock",
463 })?;
464 let is_leader = row.try_get(0).map_err(|_| {
465 UnexpectedSnafu {
466 violated: "Failed to get the result of get lock",
467 }
468 .build()
469 })?;
470 if is_leader {
471 self.leader_action().await?;
472 } else {
473 self.follower_action().await?;
474 }
475 let _ = keep_alive_interval.tick().await;
476 }
477 }
478
479 async fn reset_campaign(&self) {
480 info!("Resetting campaign");
481 if self.is_leader.load(Ordering::Relaxed) {
482 if let Err(err) = self.step_down_without_lock().await {
483 error!(err; "Failed to step down without lock");
484 }
485 info!("Step down without lock successfully, due to reset campaign");
486 }
487 if let Err(err) = self.pg_client.write().await.reset_client().await {
488 error!(err; "Failed to reset client");
489 }
490 }
491
492 async fn leader(&self) -> Result<Self::Leader> {
493 if self.is_leader.load(Ordering::Relaxed) {
494 Ok(self.leader_value.as_bytes().into())
495 } else {
496 let key = self.election_key();
497 if let Some(lease) = self.get_value_with_lease(&key).await? {
498 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
499 Ok(lease.leader_value.as_bytes().into())
500 } else {
501 NoLeaderSnafu.fail()
502 }
503 }
504 }
505
506 async fn resign(&self) -> Result<()> {
507 todo!()
508 }
509
510 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
511 self.leader_watcher.subscribe()
512 }
513}
514
515impl PgElection {
516 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
518 let key = key.as_bytes();
519 self.maybe_init_client().await?;
520 let res = self
521 .pg_client
522 .read()
523 .await
524 .query(&self.sql_set.get_value_with_lease, &[&key])
525 .await?;
526
527 if res.is_empty() {
528 Ok(None)
529 } else {
530 let current_time_str = res[0].try_get(1).unwrap_or_default();
532 let current_time = match Timestamp::from_str(current_time_str, None) {
533 Ok(ts) => ts,
534 Err(_) => UnexpectedSnafu {
535 violated: format!("Invalid timestamp: {}", current_time_str),
536 }
537 .fail()?,
538 };
539 let value_and_expire_time =
541 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
542 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
543
544 Ok(Some(Lease {
545 leader_value: value,
546 expire_time,
547 current: current_time,
548 origin: value_and_expire_time.to_string(),
549 }))
550 }
551 }
552
553 async fn get_value_with_lease_by_prefix(
555 &self,
556 key_prefix: &str,
557 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
558 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
559 self.maybe_init_client().await?;
560 let res = self
561 .pg_client
562 .read()
563 .await
564 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
565 .await?;
566
567 let mut values_with_leases = vec![];
568 let mut current = Timestamp::default();
569 for row in res {
570 let current_time_str = row.try_get(1).unwrap_or_default();
571 current = match Timestamp::from_str(current_time_str, None) {
572 Ok(ts) => ts,
573 Err(_) => UnexpectedSnafu {
574 violated: format!("Invalid timestamp: {}", current_time_str),
575 }
576 .fail()?,
577 };
578
579 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
580 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
581
582 values_with_leases.push((value, expire_time));
583 }
584 Ok((values_with_leases, current))
585 }
586
587 async fn update_value_with_lease(
588 &self,
589 key: &str,
590 prev: &str,
591 updated: &str,
592 lease_ttl: Duration,
593 ) -> Result<()> {
594 let key = key.as_bytes();
595 let prev = prev.as_bytes();
596 self.maybe_init_client().await?;
597 let lease_ttl_secs = lease_ttl.as_secs() as f64;
598 let res = self
599 .pg_client
600 .read()
601 .await
602 .execute(
603 &self.sql_set.update_value_with_lease,
604 &[&key, &prev, &updated, &lease_ttl_secs],
605 )
606 .await?;
607
608 ensure!(
609 res == 1,
610 UnexpectedSnafu {
611 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
612 }
613 );
614
615 Ok(())
616 }
617
618 async fn put_value_with_lease(
620 &self,
621 key: &str,
622 value: &str,
623 lease_ttl: Duration,
624 ) -> Result<bool> {
625 let key = key.as_bytes();
626 let lease_ttl_secs = lease_ttl.as_secs() as f64;
627 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
628 self.maybe_init_client().await?;
629 let res = self
630 .pg_client
631 .read()
632 .await
633 .query(&self.sql_set.put_value_with_lease, ¶ms)
634 .await?;
635 Ok(res.is_empty())
636 }
637
638 async fn delete_value(&self, key: &str) -> Result<bool> {
641 let key = key.as_bytes();
642 self.maybe_init_client().await?;
643 let res = self
644 .pg_client
645 .read()
646 .await
647 .query(&self.sql_set.delete_value, &[&key])
648 .await?;
649
650 Ok(res.len() == 1)
651 }
652
653 async fn leader_action(&self) -> Result<()> {
674 let key = self.election_key();
675 if self.is_leader() {
677 match self.get_value_with_lease(&key).await? {
678 Some(lease) => {
679 match (
680 lease.leader_value == self.leader_value,
681 lease.expire_time > lease.current,
682 ) {
683 (true, true) => {
685 self.update_value_with_lease(
687 &key,
688 &lease.origin,
689 &self.leader_value,
690 self.meta_lease_ttl,
691 )
692 .await?;
693 }
694 (true, false) => {
696 warn!("Leader lease expired, now stepping down.");
697 self.step_down().await?;
698 }
699 (false, _) => {
701 warn!(
702 "Leader lease not found, but still hold the lock. Now stepping down."
703 );
704 self.step_down().await?;
705 }
706 }
707 }
708 None => {
710 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
711 self.step_down().await?;
712 }
713 }
714 } else {
716 self.elected().await?;
717 }
718 Ok(())
719 }
720
721 async fn follower_action(&self) -> Result<()> {
732 let key = self.election_key();
733 if self.is_leader() {
735 self.step_down_without_lock().await?;
736 }
737 let lease = self
738 .get_value_with_lease(&key)
739 .await?
740 .context(NoLeaderSnafu)?;
741 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
743 Ok(())
745 }
746
747 async fn step_down(&self) -> Result<()> {
754 let key = self.election_key();
755 let leader_key = RdsLeaderKey {
756 name: self.leader_value.clone().into_bytes(),
757 key: key.clone().into_bytes(),
758 ..Default::default()
759 };
760 self.delete_value(&key).await?;
761 self.maybe_init_client().await?;
762 self.pg_client
763 .read()
764 .await
765 .query(&self.sql_set.step_down, &[])
766 .await?;
767 send_leader_change_and_set_flags(
768 &self.is_leader,
769 &self.leader_infancy,
770 &self.leader_watcher,
771 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
772 );
773 Ok(())
774 }
775
776 async fn step_down_without_lock(&self) -> Result<()> {
778 let key = self.election_key().into_bytes();
779 let leader_key = RdsLeaderKey {
780 name: self.leader_value.clone().into_bytes(),
781 key: key.clone(),
782 ..Default::default()
783 };
784 send_leader_change_and_set_flags(
785 &self.is_leader,
786 &self.leader_infancy,
787 &self.leader_watcher,
788 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
789 );
790 Ok(())
791 }
792
793 async fn elected(&self) -> Result<()> {
796 let key = self.election_key();
797 let leader_key = RdsLeaderKey {
798 name: self.leader_value.clone().into_bytes(),
799 key: key.clone().into_bytes(),
800 ..Default::default()
801 };
802 self.delete_value(&key).await?;
803 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
804 .await?;
805
806 if self
807 .is_leader
808 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
809 .is_ok()
810 {
811 self.leader_infancy.store(true, Ordering::Release);
812
813 if let Err(e) = self
814 .leader_watcher
815 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
816 {
817 error!(e; "Failed to send leader change message");
818 }
819 }
820 Ok(())
821 }
822}
823
824#[cfg(test)]
825mod tests {
826 use std::assert_matches::assert_matches;
827 use std::env;
828
829 use common_meta::maybe_skip_postgres_integration_test;
830
831 use super::*;
832 use crate::error;
833 use crate::utils::postgres::create_postgres_pool;
834
835 async fn create_postgres_client(
836 table_name: Option<&str>,
837 execution_timeout: Duration,
838 idle_session_timeout: Duration,
839 statement_timeout: Duration,
840 ) -> Result<ElectionPgClient> {
841 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
842 if endpoint.is_empty() {
843 return UnexpectedSnafu {
844 violated: "Postgres endpoint is empty".to_string(),
845 }
846 .fail();
847 }
848 let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
849 let mut pg_client = ElectionPgClient::new(
850 pool,
851 execution_timeout,
852 idle_session_timeout,
853 statement_timeout,
854 )
855 .unwrap();
856 pg_client.maybe_init_client().await?;
857 if let Some(table_name) = table_name {
858 let create_table_sql = format!(
859 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
860 table_name
861 );
862 pg_client.execute(&create_table_sql, &[]).await?;
863 }
864 Ok(pg_client)
865 }
866
867 async fn drop_table(pg_election: &PgElection, table_name: &str) {
868 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
869 pg_election
870 .pg_client
871 .read()
872 .await
873 .execute(&sql, &[])
874 .await
875 .unwrap();
876 }
877
878 #[tokio::test]
879 async fn test_postgres_crud() {
880 maybe_skip_postgres_integration_test!();
881 let key = "test_key".to_string();
882 let value = "test_value".to_string();
883
884 let uuid = uuid::Uuid::new_v4().to_string();
885 let table_name = "test_postgres_crud_greptime_metakv";
886 let candidate_lease_ttl = Duration::from_secs(10);
887 let execution_timeout = Duration::from_secs(10);
888 let statement_timeout = Duration::from_secs(10);
889 let meta_lease_ttl = Duration::from_secs(2);
890 let idle_session_timeout = Duration::from_secs(0);
891 let client = create_postgres_client(
892 Some(table_name),
893 execution_timeout,
894 idle_session_timeout,
895 statement_timeout,
896 )
897 .await
898 .unwrap();
899
900 let (tx, _) = broadcast::channel(100);
901 let pg_election = PgElection {
902 leader_value: "test_leader".to_string(),
903 pg_client: RwLock::new(client),
904 is_leader: AtomicBool::new(false),
905 leader_infancy: AtomicBool::new(true),
906 leader_watcher: tx,
907 store_key_prefix: uuid,
908 candidate_lease_ttl,
909 meta_lease_ttl,
910 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
911 };
912
913 let res = pg_election
914 .put_value_with_lease(&key, &value, candidate_lease_ttl)
915 .await
916 .unwrap();
917 assert!(res);
918
919 let lease = pg_election
920 .get_value_with_lease(&key)
921 .await
922 .unwrap()
923 .unwrap();
924 assert_eq!(lease.leader_value, value);
925
926 pg_election
927 .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
928 .await
929 .unwrap();
930
931 let res = pg_election.delete_value(&key).await.unwrap();
932 assert!(res);
933
934 let res = pg_election.get_value_with_lease(&key).await.unwrap();
935 assert!(res.is_none());
936
937 for i in 0..10 {
938 let key = format!("test_key_{}", i);
939 let value = format!("test_value_{}", i);
940 pg_election
941 .put_value_with_lease(&key, &value, candidate_lease_ttl)
942 .await
943 .unwrap();
944 }
945
946 let key_prefix = "test_key".to_string();
947 let (res, _) = pg_election
948 .get_value_with_lease_by_prefix(&key_prefix)
949 .await
950 .unwrap();
951 assert_eq!(res.len(), 10);
952
953 for i in 0..10 {
954 let key = format!("test_key_{}", i);
955 let res = pg_election.delete_value(&key).await.unwrap();
956 assert!(res);
957 }
958
959 let (res, current) = pg_election
960 .get_value_with_lease_by_prefix(&key_prefix)
961 .await
962 .unwrap();
963 assert!(res.is_empty());
964 assert!(current == Timestamp::default());
965
966 drop_table(&pg_election, table_name).await;
967 }
968
969 async fn candidate(
970 leader_value: String,
971 candidate_lease_ttl: Duration,
972 store_key_prefix: String,
973 table_name: String,
974 ) {
975 let execution_timeout = Duration::from_secs(10);
976 let statement_timeout = Duration::from_secs(10);
977 let meta_lease_ttl = Duration::from_secs(2);
978 let idle_session_timeout = Duration::from_secs(0);
979 let client = create_postgres_client(
980 None,
981 execution_timeout,
982 idle_session_timeout,
983 statement_timeout,
984 )
985 .await
986 .unwrap();
987
988 let (tx, _) = broadcast::channel(100);
989 let pg_election = PgElection {
990 leader_value,
991 pg_client: RwLock::new(client),
992 is_leader: AtomicBool::new(false),
993 leader_infancy: AtomicBool::new(true),
994 leader_watcher: tx,
995 store_key_prefix,
996 candidate_lease_ttl,
997 meta_lease_ttl,
998 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
999 };
1000
1001 let node_info = MetasrvNodeInfo {
1002 addr: "test_addr".to_string(),
1003 version: "test_version".to_string(),
1004 git_commit: "test_git_commit".to_string(),
1005 start_time_ms: 0,
1006 total_cpu_millicores: 0,
1007 total_memory_bytes: 0,
1008 cpu_usage_millicores: 0,
1009 memory_usage_bytes: 0,
1010 hostname: "test_hostname".to_string(),
1011 };
1012 pg_election.register_candidate(&node_info).await.unwrap();
1013 }
1014
1015 #[tokio::test]
1016 async fn test_candidate_registration() {
1017 maybe_skip_postgres_integration_test!();
1018 let leader_value_prefix = "test_leader".to_string();
1019 let uuid = uuid::Uuid::new_v4().to_string();
1020 let table_name = "test_candidate_registration_greptime_metakv";
1021 let mut handles = vec![];
1022 let candidate_lease_ttl = Duration::from_secs(5);
1023 let execution_timeout = Duration::from_secs(10);
1024 let statement_timeout = Duration::from_secs(10);
1025 let meta_lease_ttl = Duration::from_secs(2);
1026 let idle_session_timeout = Duration::from_secs(0);
1027 let client = create_postgres_client(
1028 Some(table_name),
1029 execution_timeout,
1030 idle_session_timeout,
1031 statement_timeout,
1032 )
1033 .await
1034 .unwrap();
1035
1036 for i in 0..10 {
1037 let leader_value = format!("{}{}", leader_value_prefix, i);
1038 let handle = tokio::spawn(candidate(
1039 leader_value,
1040 candidate_lease_ttl,
1041 uuid.clone(),
1042 table_name.to_string(),
1043 ));
1044 handles.push(handle);
1045 }
1046 tokio::time::sleep(Duration::from_secs(3)).await;
1048
1049 let (tx, _) = broadcast::channel(100);
1050 let leader_value = "test_leader".to_string();
1051 let pg_election = PgElection {
1052 leader_value,
1053 pg_client: RwLock::new(client),
1054 is_leader: AtomicBool::new(false),
1055 leader_infancy: AtomicBool::new(true),
1056 leader_watcher: tx,
1057 store_key_prefix: uuid.clone(),
1058 candidate_lease_ttl,
1059 meta_lease_ttl,
1060 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1061 };
1062
1063 let candidates = pg_election.all_candidates().await.unwrap();
1064 assert_eq!(candidates.len(), 10);
1065
1066 for handle in handles {
1067 handle.abort();
1068 }
1069
1070 tokio::time::sleep(Duration::from_secs(5)).await;
1072 let candidates = pg_election.all_candidates().await.unwrap();
1073 assert!(candidates.is_empty());
1074
1075 for i in 0..10 {
1077 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1078 let res = pg_election.delete_value(&key).await.unwrap();
1079 assert!(res);
1080 }
1081
1082 drop_table(&pg_election, table_name).await;
1083 }
1084
1085 #[tokio::test]
1086 async fn test_elected_and_step_down() {
1087 maybe_skip_postgres_integration_test!();
1088 let leader_value = "test_leader".to_string();
1089 let uuid = uuid::Uuid::new_v4().to_string();
1090 let table_name = "test_elected_and_step_down_greptime_metakv";
1091 let candidate_lease_ttl = Duration::from_secs(5);
1092 let execution_timeout = Duration::from_secs(10);
1093 let statement_timeout = Duration::from_secs(10);
1094 let meta_lease_ttl = Duration::from_secs(2);
1095 let idle_session_timeout = Duration::from_secs(0);
1096 let client = create_postgres_client(
1097 Some(table_name),
1098 execution_timeout,
1099 idle_session_timeout,
1100 statement_timeout,
1101 )
1102 .await
1103 .unwrap();
1104
1105 let (tx, mut rx) = broadcast::channel(100);
1106 let leader_pg_election = PgElection {
1107 leader_value: leader_value.clone(),
1108 pg_client: RwLock::new(client),
1109 is_leader: AtomicBool::new(false),
1110 leader_infancy: AtomicBool::new(true),
1111 leader_watcher: tx,
1112 store_key_prefix: uuid,
1113 candidate_lease_ttl,
1114 meta_lease_ttl,
1115 sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1116 };
1117
1118 leader_pg_election.elected().await.unwrap();
1119 let lease = leader_pg_election
1120 .get_value_with_lease(&leader_pg_election.election_key())
1121 .await
1122 .unwrap()
1123 .unwrap();
1124 assert!(lease.leader_value == leader_value);
1125 assert!(lease.expire_time > lease.current);
1126 assert!(leader_pg_election.is_leader());
1127
1128 match rx.recv().await {
1129 Ok(LeaderChangeMessage::Elected(key)) => {
1130 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1131 assert_eq!(
1132 String::from_utf8_lossy(key.key()),
1133 leader_pg_election.election_key()
1134 );
1135 assert_eq!(key.lease_id(), i64::default());
1136 assert_eq!(key.revision(), i64::default());
1137 }
1138 _ => panic!("Expected LeaderChangeMessage::Elected"),
1139 }
1140
1141 leader_pg_election.step_down_without_lock().await.unwrap();
1142 let lease = leader_pg_election
1143 .get_value_with_lease(&leader_pg_election.election_key())
1144 .await
1145 .unwrap()
1146 .unwrap();
1147 assert!(lease.leader_value == leader_value);
1148 assert!(!leader_pg_election.is_leader());
1149
1150 match rx.recv().await {
1151 Ok(LeaderChangeMessage::StepDown(key)) => {
1152 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1153 assert_eq!(
1154 String::from_utf8_lossy(key.key()),
1155 leader_pg_election.election_key()
1156 );
1157 assert_eq!(key.lease_id(), i64::default());
1158 assert_eq!(key.revision(), i64::default());
1159 }
1160 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1161 }
1162
1163 leader_pg_election.elected().await.unwrap();
1164 let lease = leader_pg_election
1165 .get_value_with_lease(&leader_pg_election.election_key())
1166 .await
1167 .unwrap()
1168 .unwrap();
1169 assert!(lease.leader_value == leader_value);
1170 assert!(lease.expire_time > lease.current);
1171 assert!(leader_pg_election.is_leader());
1172
1173 match rx.recv().await {
1174 Ok(LeaderChangeMessage::Elected(key)) => {
1175 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1176 assert_eq!(
1177 String::from_utf8_lossy(key.key()),
1178 leader_pg_election.election_key()
1179 );
1180 assert_eq!(key.lease_id(), i64::default());
1181 assert_eq!(key.revision(), i64::default());
1182 }
1183 _ => panic!("Expected LeaderChangeMessage::Elected"),
1184 }
1185
1186 leader_pg_election.step_down().await.unwrap();
1187 let res = leader_pg_election
1188 .get_value_with_lease(&leader_pg_election.election_key())
1189 .await
1190 .unwrap();
1191 assert!(res.is_none());
1192 assert!(!leader_pg_election.is_leader());
1193
1194 match rx.recv().await {
1195 Ok(LeaderChangeMessage::StepDown(key)) => {
1196 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1197 assert_eq!(
1198 String::from_utf8_lossy(key.key()),
1199 leader_pg_election.election_key()
1200 );
1201 assert_eq!(key.lease_id(), i64::default());
1202 assert_eq!(key.revision(), i64::default());
1203 }
1204 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1205 }
1206
1207 drop_table(&leader_pg_election, table_name).await;
1208 }
1209
1210 #[tokio::test]
1211 async fn test_leader_action() {
1212 maybe_skip_postgres_integration_test!();
1213 let leader_value = "test_leader".to_string();
1214 let uuid = uuid::Uuid::new_v4().to_string();
1215 let table_name = "test_leader_action_greptime_metakv";
1216 let candidate_lease_ttl = Duration::from_secs(5);
1217 let execution_timeout = Duration::from_secs(10);
1218 let statement_timeout = Duration::from_secs(10);
1219 let meta_lease_ttl = Duration::from_secs(2);
1220 let idle_session_timeout = Duration::from_secs(0);
1221 let client = create_postgres_client(
1222 Some(table_name),
1223 execution_timeout,
1224 idle_session_timeout,
1225 statement_timeout,
1226 )
1227 .await
1228 .unwrap();
1229
1230 let (tx, mut rx) = broadcast::channel(100);
1231 let leader_pg_election = PgElection {
1232 leader_value: leader_value.clone(),
1233 pg_client: RwLock::new(client),
1234 is_leader: AtomicBool::new(false),
1235 leader_infancy: AtomicBool::new(true),
1236 leader_watcher: tx,
1237 store_key_prefix: uuid,
1238 candidate_lease_ttl,
1239 meta_lease_ttl,
1240 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1241 };
1242
1243 let res = leader_pg_election
1245 .pg_client
1246 .read()
1247 .await
1248 .query(&leader_pg_election.sql_set.campaign, &[])
1249 .await
1250 .unwrap();
1251 let res: bool = res[0].get(0);
1252 assert!(res);
1253 leader_pg_election.leader_action().await.unwrap();
1254 let lease = leader_pg_election
1255 .get_value_with_lease(&leader_pg_election.election_key())
1256 .await
1257 .unwrap()
1258 .unwrap();
1259 assert!(lease.leader_value == leader_value);
1260 assert!(lease.expire_time > lease.current);
1261 assert!(leader_pg_election.is_leader());
1262
1263 match rx.recv().await {
1264 Ok(LeaderChangeMessage::Elected(key)) => {
1265 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1266 assert_eq!(
1267 String::from_utf8_lossy(key.key()),
1268 leader_pg_election.election_key()
1269 );
1270 assert_eq!(key.lease_id(), i64::default());
1271 assert_eq!(key.revision(), i64::default());
1272 }
1273 _ => panic!("Expected LeaderChangeMessage::Elected"),
1274 }
1275
1276 let res = leader_pg_election
1278 .pg_client
1279 .read()
1280 .await
1281 .query(&leader_pg_election.sql_set.campaign, &[])
1282 .await
1283 .unwrap();
1284 let res: bool = res[0].get(0);
1285 assert!(res);
1286 leader_pg_election.leader_action().await.unwrap();
1287 let new_lease = leader_pg_election
1288 .get_value_with_lease(&leader_pg_election.election_key())
1289 .await
1290 .unwrap()
1291 .unwrap();
1292 assert!(new_lease.leader_value == leader_value);
1293 assert!(
1294 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1295 );
1296 assert!(leader_pg_election.is_leader());
1297
1298 tokio::time::sleep(Duration::from_secs(2)).await;
1300
1301 let res = leader_pg_election
1302 .pg_client
1303 .read()
1304 .await
1305 .query(&leader_pg_election.sql_set.campaign, &[])
1306 .await
1307 .unwrap();
1308 let res: bool = res[0].get(0);
1309 assert!(res);
1310 leader_pg_election.leader_action().await.unwrap();
1311 let res = leader_pg_election
1312 .get_value_with_lease(&leader_pg_election.election_key())
1313 .await
1314 .unwrap();
1315 assert!(res.is_none());
1316
1317 match rx.recv().await {
1318 Ok(LeaderChangeMessage::StepDown(key)) => {
1319 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1320 assert_eq!(
1321 String::from_utf8_lossy(key.key()),
1322 leader_pg_election.election_key()
1323 );
1324 assert_eq!(key.lease_id(), i64::default());
1325 assert_eq!(key.revision(), i64::default());
1326 }
1327 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1328 }
1329
1330 let res = leader_pg_election
1332 .pg_client
1333 .read()
1334 .await
1335 .query(&leader_pg_election.sql_set.campaign, &[])
1336 .await
1337 .unwrap();
1338 let res: bool = res[0].get(0);
1339 assert!(res);
1340 leader_pg_election.leader_action().await.unwrap();
1341 let lease = leader_pg_election
1342 .get_value_with_lease(&leader_pg_election.election_key())
1343 .await
1344 .unwrap()
1345 .unwrap();
1346 assert!(lease.leader_value == leader_value);
1347 assert!(lease.expire_time > lease.current);
1348 assert!(leader_pg_election.is_leader());
1349
1350 match rx.recv().await {
1351 Ok(LeaderChangeMessage::Elected(key)) => {
1352 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1353 assert_eq!(
1354 String::from_utf8_lossy(key.key()),
1355 leader_pg_election.election_key()
1356 );
1357 assert_eq!(key.lease_id(), i64::default());
1358 assert_eq!(key.revision(), i64::default());
1359 }
1360 _ => panic!("Expected LeaderChangeMessage::Elected"),
1361 }
1362
1363 leader_pg_election
1365 .delete_value(&leader_pg_election.election_key())
1366 .await
1367 .unwrap();
1368 leader_pg_election.leader_action().await.unwrap();
1369 let res = leader_pg_election
1370 .get_value_with_lease(&leader_pg_election.election_key())
1371 .await
1372 .unwrap();
1373 assert!(res.is_none());
1374 assert!(!leader_pg_election.is_leader());
1375
1376 match rx.recv().await {
1377 Ok(LeaderChangeMessage::StepDown(key)) => {
1378 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1379 assert_eq!(
1380 String::from_utf8_lossy(key.key()),
1381 leader_pg_election.election_key()
1382 );
1383 assert_eq!(key.lease_id(), i64::default());
1384 assert_eq!(key.revision(), i64::default());
1385 }
1386 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1387 }
1388
1389 let res = leader_pg_election
1391 .pg_client
1392 .read()
1393 .await
1394 .query(&leader_pg_election.sql_set.campaign, &[])
1395 .await
1396 .unwrap();
1397 let res: bool = res[0].get(0);
1398 assert!(res);
1399 leader_pg_election.leader_action().await.unwrap();
1400 let lease = leader_pg_election
1401 .get_value_with_lease(&leader_pg_election.election_key())
1402 .await
1403 .unwrap()
1404 .unwrap();
1405 assert!(lease.leader_value == leader_value);
1406 assert!(lease.expire_time > lease.current);
1407 assert!(leader_pg_election.is_leader());
1408
1409 match rx.recv().await {
1410 Ok(LeaderChangeMessage::Elected(key)) => {
1411 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1412 assert_eq!(
1413 String::from_utf8_lossy(key.key()),
1414 leader_pg_election.election_key()
1415 );
1416 assert_eq!(key.lease_id(), i64::default());
1417 assert_eq!(key.revision(), i64::default());
1418 }
1419 _ => panic!("Expected LeaderChangeMessage::Elected"),
1420 }
1421
1422 let res = leader_pg_election
1424 .pg_client
1425 .read()
1426 .await
1427 .query(&leader_pg_election.sql_set.campaign, &[])
1428 .await
1429 .unwrap();
1430 let res: bool = res[0].get(0);
1431 assert!(res);
1432 leader_pg_election
1433 .delete_value(&leader_pg_election.election_key())
1434 .await
1435 .unwrap();
1436 leader_pg_election
1437 .put_value_with_lease(
1438 &leader_pg_election.election_key(),
1439 "test",
1440 Duration::from_secs(10),
1441 )
1442 .await
1443 .unwrap();
1444 leader_pg_election.leader_action().await.unwrap();
1445 let res = leader_pg_election
1446 .get_value_with_lease(&leader_pg_election.election_key())
1447 .await
1448 .unwrap();
1449 assert!(res.is_none());
1450 assert!(!leader_pg_election.is_leader());
1451
1452 match rx.recv().await {
1453 Ok(LeaderChangeMessage::StepDown(key)) => {
1454 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1455 assert_eq!(
1456 String::from_utf8_lossy(key.key()),
1457 leader_pg_election.election_key()
1458 );
1459 assert_eq!(key.lease_id(), i64::default());
1460 assert_eq!(key.revision(), i64::default());
1461 }
1462 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1463 }
1464
1465 leader_pg_election
1467 .pg_client
1468 .read()
1469 .await
1470 .query(&leader_pg_election.sql_set.step_down, &[])
1471 .await
1472 .unwrap();
1473
1474 drop_table(&leader_pg_election, table_name).await;
1475 }
1476
1477 #[tokio::test]
1478 async fn test_follower_action() {
1479 maybe_skip_postgres_integration_test!();
1480 common_telemetry::init_default_ut_logging();
1481 let uuid = uuid::Uuid::new_v4().to_string();
1482 let table_name = "test_follower_action_greptime_metakv";
1483
1484 let candidate_lease_ttl = Duration::from_secs(5);
1485 let execution_timeout = Duration::from_secs(10);
1486 let statement_timeout = Duration::from_secs(10);
1487 let meta_lease_ttl = Duration::from_secs(2);
1488 let idle_session_timeout = Duration::from_secs(0);
1489 let follower_client = create_postgres_client(
1490 Some(table_name),
1491 execution_timeout,
1492 idle_session_timeout,
1493 statement_timeout,
1494 )
1495 .await
1496 .unwrap();
1497 let (tx, mut rx) = broadcast::channel(100);
1498 let follower_pg_election = PgElection {
1499 leader_value: "test_follower".to_string(),
1500 pg_client: RwLock::new(follower_client),
1501 is_leader: AtomicBool::new(false),
1502 leader_infancy: AtomicBool::new(true),
1503 leader_watcher: tx,
1504 store_key_prefix: uuid.clone(),
1505 candidate_lease_ttl,
1506 meta_lease_ttl,
1507 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1508 };
1509
1510 let leader_client = create_postgres_client(
1511 Some(table_name),
1512 execution_timeout,
1513 idle_session_timeout,
1514 statement_timeout,
1515 )
1516 .await
1517 .unwrap();
1518 let (tx, _) = broadcast::channel(100);
1519 let leader_pg_election = PgElection {
1520 leader_value: "test_leader".to_string(),
1521 pg_client: RwLock::new(leader_client),
1522 is_leader: AtomicBool::new(false),
1523 leader_infancy: AtomicBool::new(true),
1524 leader_watcher: tx,
1525 store_key_prefix: uuid,
1526 candidate_lease_ttl,
1527 meta_lease_ttl,
1528 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1529 };
1530
1531 leader_pg_election
1532 .pg_client
1533 .read()
1534 .await
1535 .query(&leader_pg_election.sql_set.campaign, &[])
1536 .await
1537 .unwrap();
1538 leader_pg_election.elected().await.unwrap();
1539
1540 follower_pg_election.follower_action().await.unwrap();
1542
1543 tokio::time::sleep(Duration::from_secs(2)).await;
1545 assert!(follower_pg_election.follower_action().await.is_err());
1546
1547 leader_pg_election
1549 .delete_value(&leader_pg_election.election_key())
1550 .await
1551 .unwrap();
1552 assert!(follower_pg_election.follower_action().await.is_err());
1553
1554 follower_pg_election
1556 .is_leader
1557 .store(true, Ordering::Relaxed);
1558 assert!(follower_pg_election.follower_action().await.is_err());
1559
1560 match rx.recv().await {
1561 Ok(LeaderChangeMessage::StepDown(key)) => {
1562 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1563 assert_eq!(
1564 String::from_utf8_lossy(key.key()),
1565 follower_pg_election.election_key()
1566 );
1567 assert_eq!(key.lease_id(), i64::default());
1568 assert_eq!(key.revision(), i64::default());
1569 }
1570 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1571 }
1572
1573 leader_pg_election
1575 .pg_client
1576 .read()
1577 .await
1578 .query(&leader_pg_election.sql_set.step_down, &[])
1579 .await
1580 .unwrap();
1581
1582 drop_table(&follower_pg_election, table_name).await;
1583 }
1584
1585 #[tokio::test]
1586 async fn test_idle_session_timeout() {
1587 maybe_skip_postgres_integration_test!();
1588 common_telemetry::init_default_ut_logging();
1589 let execution_timeout = Duration::from_secs(10);
1590 let statement_timeout = Duration::from_secs(10);
1591 let idle_session_timeout = Duration::from_secs(1);
1592 let mut client = create_postgres_client(
1593 None,
1594 execution_timeout,
1595 idle_session_timeout,
1596 statement_timeout,
1597 )
1598 .await
1599 .unwrap();
1600 tokio::time::sleep(Duration::from_millis(1100)).await;
1601 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1603 assert_matches!(err, error::Error::PostgresExecution { .. });
1604 let error::Error::PostgresExecution { error, .. } = err else {
1605 panic!("Expected PostgresExecution error");
1606 };
1607 assert!(error.is_closed());
1608 client.reset_client().await.unwrap();
1610 let _ = client.query("SELECT 1", &[]).await.unwrap();
1611 }
1612
1613 #[test]
1614 fn test_election_sql_with_schema() {
1615 let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1616 let s = f.build();
1617 assert!(s.campaign.contains("pg_try_advisory_lock"));
1618 assert!(
1619 s.put_value_with_lease
1620 .contains("\"test_schema\".\"greptime_metakv\"")
1621 );
1622 assert!(
1623 s.update_value_with_lease
1624 .contains("\"test_schema\".\"greptime_metakv\"")
1625 );
1626 assert!(
1627 s.get_value_with_lease
1628 .contains("\"test_schema\".\"greptime_metakv\"")
1629 );
1630 assert!(
1631 s.get_value_with_lease_by_prefix
1632 .contains("\"test_schema\".\"greptime_metakv\"")
1633 );
1634 assert!(
1635 s.delete_value
1636 .contains("\"test_schema\".\"greptime_metakv\"")
1637 );
1638 }
1639}