1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by the PostgreSQL-backed election.
///
/// The factory only borrows its inputs; `build` turns them into an owned [`ElectionSqlSet`].
struct ElectionSqlFactory<'a> {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema used to qualify the table. `None` or an empty string leaves the
    /// table name unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table that stores the `k` / `v` columns.
    table_name: &'a str,
}
44
/// Fully rendered SQL statements produced by [`ElectionSqlFactory`].
struct ElectionSqlSet {
    /// `SELECT pg_try_advisory_lock(<lock_id>)`.
    campaign: String,
    /// `SELECT pg_advisory_unlock(<lock_id>)`.
    step_down: String,
    /// Inserts `$1 -> $2 + lease separator + expire time` unless the key already exists and
    /// returns the row that was present before the statement ran (if any).
    /// `$3` is the lease duration in seconds.
    put_value_with_lease: String,
    /// Replaces the value of key `$1` with `$3` plus a fresh expire time (`$4` seconds from
    /// now), but only when the stored value still equals `$2`.
    update_value_with_lease: String,
    /// Selects the stored value of key `$1` together with the database's current timestamp.
    get_value_with_lease: String,
    /// Same as `get_value_with_lease`, but matches keys with `LIKE $1`.
    get_value_with_lease_by_prefix: String,
    /// Deletes key `$1` and returns the deleted row.
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93 Self {
94 lock_id,
95 schema_name,
96 table_name,
97 }
98 }
99
100 fn table_ident(&self) -> String {
101 match self.schema_name {
102 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103 _ => format!("\"{}\"", self.table_name),
104 }
105 }
106
107 fn build(self) -> ElectionSqlSet {
108 ElectionSqlSet {
109 campaign: self.campaign_sql(),
110 step_down: self.step_down_sql(),
111 put_value_with_lease: self.put_value_with_lease_sql(),
112 update_value_with_lease: self.update_value_with_lease_sql(),
113 get_value_with_lease: self.get_value_with_lease_sql(),
114 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115 delete_value: self.delete_value_sql(),
116 }
117 }
118
119 fn campaign_sql(&self) -> String {
120 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121 }
122
123 fn step_down_sql(&self) -> String {
124 format!("SELECT pg_advisory_unlock({})", self.lock_id)
125 }
126
127 fn put_value_with_lease_sql(&self) -> String {
128 let table = self.table_ident();
129 format!(
130 r#"WITH prev AS (
131 SELECT k, v FROM {table} WHERE k = $1
132 ), insert AS (
133 INSERT INTO {table}
134 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135 ON CONFLICT (k) DO NOTHING
136 )
137 SELECT k, v FROM prev;
138 "#,
139 table = table,
140 lease_sep = LEASE_SEP
141 )
142 }
143
144 fn update_value_with_lease_sql(&self) -> String {
145 let table = self.table_ident();
146 format!(
147 r#"UPDATE {table}
148 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149 WHERE k = $1 AND v = $2"#,
150 table = table,
151 lease_sep = LEASE_SEP
152 )
153 }
154
155 fn get_value_with_lease_sql(&self) -> String {
156 let table = self.table_ident();
157 format!(
158 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159 table = table
160 )
161 }
162
163 fn get_value_with_lease_by_prefix_sql(&self) -> String {
164 let table = self.table_ident();
165 format!(
166 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167 table = table
168 )
169 }
170
171 fn delete_value_sql(&self) -> String {
172 let table = self.table_ident();
173 format!(
174 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175 table = table
176 )
177 }
178}
179
/// PostgreSQL client used by the election, holding at most one pooled connection at a time.
pub struct ElectionPgClient {
    /// The connection currently in use; `None` until `maybe_init_client` has fetched one.
    current: Option<deadpool::managed::Object<Manager>>,
    /// Pool the connection is taken from.
    pool: Pool,
    /// Client-side limit applied to every statement through `tokio::time::timeout`.
    execution_timeout: Duration,

    /// Value sent to the server as `SET idle_session_timeout` when a connection is
    /// initialised (whole seconds).
    idle_session_timeout: Duration,

    /// Value sent to the server as `SET statement_timeout` when a connection is
    /// initialised (whole seconds).
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204 pub fn new(
205 pool: Pool,
206 execution_timeout: Duration,
207 idle_session_timeout: Duration,
208 statement_timeout: Duration,
209 ) -> Result<ElectionPgClient> {
210 Ok(ElectionPgClient {
211 current: None,
212 pool,
213 execution_timeout,
214 idle_session_timeout,
215 statement_timeout,
216 })
217 }
218
219 fn set_idle_session_timeout_sql(&self) -> String {
220 format!(
221 "SET idle_session_timeout = '{}s';",
222 self.idle_session_timeout.as_secs()
223 )
224 }
225
226 fn set_statement_timeout_sql(&self) -> String {
227 format!(
228 "SET statement_timeout = '{}s';",
229 self.statement_timeout.as_secs()
230 )
231 }
232
233 async fn reset_client(&mut self) -> Result<()> {
234 self.current = None;
235 self.maybe_init_client().await
236 }
237
238 async fn maybe_init_client(&mut self) -> Result<()> {
239 if self.current.is_none() {
240 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242 self.current = Some(client);
243 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245 self.execute(&idle_session_timeout_sql, &[]).await?;
246 let statement_timeout_sql = self.set_statement_timeout_sql();
247 self.execute(&statement_timeout_sql, &[]).await?;
248 }
249
250 Ok(())
251 }
252
253 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258 let result = tokio::time::timeout(
259 self.execution_timeout,
260 self.current.as_ref().unwrap().execute(sql, params),
261 )
262 .await
263 .map_err(|_| {
264 SqlExecutionTimeoutSnafu {
265 sql: sql.to_string(),
266 duration: self.execution_timeout,
267 }
268 .build()
269 })?;
270
271 result.context(PostgresExecutionSnafu { sql })
272 }
273
274 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279 let result = tokio::time::timeout(
280 self.execution_timeout,
281 self.current.as_ref().unwrap().query(sql, params),
282 )
283 .await
284 .map_err(|_| {
285 SqlExecutionTimeoutSnafu {
286 sql: sql.to_string(),
287 duration: self.execution_timeout,
288 }
289 .build()
290 })?;
291
292 result.context(PostgresExecutionSnafu { sql })
293 }
294}
295
/// Leader election backed by PostgreSQL: an advisory lock plus lease rows in a key/value table.
pub struct PgElection {
    /// Value this node writes into the election lease; also the suffix of its candidate key.
    leader_value: String,
    /// Client used for every statement.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when the node is elected and cleared by the first `in_leader_infancy` call.
    leader_infancy: AtomicBool,
    /// Broadcasts `Elected` / `StepDown` notifications.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease duration of the candidate entry; it is refreshed every half of this duration.
    candidate_lease_ttl: Duration,
    /// Lease duration of the leader entry; the campaign loop ticks every half of this duration.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
308
309impl PgElection {
310 async fn maybe_init_client(&self) -> Result<()> {
311 if self.pg_client.read().await.current.is_none() {
312 self.pg_client.write().await.maybe_init_client().await?;
313 }
314
315 Ok(())
316 }
317
318 #[allow(clippy::too_many_arguments)]
319 pub async fn with_pg_client(
320 leader_value: String,
321 pg_client: ElectionPgClient,
322 store_key_prefix: String,
323 candidate_lease_ttl: Duration,
324 meta_lease_ttl: Duration,
325 schema_name: Option<&str>,
326 table_name: &str,
327 lock_id: u64,
328 ) -> Result<ElectionRef> {
329 if let Some(s) = schema_name {
330 common_telemetry::info!("PgElection uses schema: {}", s);
331 } else {
332 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333 }
334 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336 let tx = listen_leader_change(leader_value.clone());
337 Ok(Arc::new(Self {
338 leader_value,
339 pg_client: RwLock::new(pg_client),
340 is_leader: AtomicBool::new(false),
341 leader_infancy: AtomicBool::new(false),
342 leader_watcher: tx,
343 store_key_prefix,
344 candidate_lease_ttl,
345 meta_lease_ttl,
346 sql_set: sql_factory.build(),
347 }))
348 }
349
350 fn election_key(&self) -> String {
351 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352 }
353
354 fn candidate_root(&self) -> String {
355 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356 }
357
358 fn candidate_key(&self) -> String {
359 format!("{}{}", self.candidate_root(), self.leader_value)
360 }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365 type Leader = LeaderValue;
366
367 fn is_leader(&self) -> bool {
368 self.is_leader.load(Ordering::Relaxed)
369 }
370
371 fn in_leader_infancy(&self) -> bool {
372 self.leader_infancy
373 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374 .is_ok()
375 }
376
377 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378 let key = self.candidate_key();
379 let node_info =
380 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381 input: format!("{node_info:?}"),
382 })?;
383 let res = self
384 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385 .await?;
386 if !res {
388 self.delete_value(&key).await?;
389 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390 .await?;
391 }
392
393 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395 loop {
396 let _ = keep_alive_interval.tick().await;
397
398 let lease = self
399 .get_value_with_lease(&key)
400 .await?
401 .context(UnexpectedSnafu {
402 violated: format!("Failed to get lease for key: {:?}", key),
403 })?;
404
405 ensure!(
406 lease.expire_time > lease.current,
407 UnexpectedSnafu {
408 violated: format!(
409 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410 lease.expire_time, lease.current, key
411 ),
412 }
413 );
414
415 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417 .await?;
418 }
419 }
420
421 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422 let key_prefix = self.candidate_root();
423 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424 candidates.retain(|c| c.1 > current);
426 let mut valid_candidates = Vec::with_capacity(candidates.len());
427 for (c, _) in candidates {
428 let node_info: MetasrvNodeInfo =
429 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430 input: format!("{:?}", c),
431 })?;
432 valid_candidates.push(node_info);
433 }
434 Ok(valid_candidates)
435 }
436
    /// Campaigns for leadership in an endless loop; the function only returns when a step
    /// fails.
    ///
    /// Every round runs the `campaign` statement (`pg_try_advisory_lock`), then performs
    /// `leader_action` or `follower_action` depending on whether the lock is held, and
    /// finally waits for the next tick.
    async fn campaign(&self) -> Result<()> {
        // One round per half of the leader lease TTL; a late tick delays the following ones
        // instead of firing them in a burst.
        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        self.maybe_init_client().await?;
        loop {
            let res = self
                .pg_client
                .read()
                .await
                .query(&self.sql_set.campaign, &[])
                .await?;
            let row = res.first().context(UnexpectedSnafu {
                violated: "Failed to get the result of acquiring advisory lock",
            })?;
            // The first column tells whether this session holds the advisory lock.
            let is_leader = row.try_get(0).map_err(|_| {
                UnexpectedSnafu {
                    violated: "Failed to get the result of get lock",
                }
                .build()
            })?;
            if is_leader {
                self.leader_action().await?;
            } else {
                self.follower_action().await?;
            }
            let _ = keep_alive_interval.tick().await;
        }
    }
478
479 async fn reset_campaign(&self) {
480 if let Err(err) = self.pg_client.write().await.reset_client().await {
481 error!(err; "Failed to reset client");
482 }
483 }
484
485 async fn leader(&self) -> Result<Self::Leader> {
486 if self.is_leader.load(Ordering::Relaxed) {
487 Ok(self.leader_value.as_bytes().into())
488 } else {
489 let key = self.election_key();
490 if let Some(lease) = self.get_value_with_lease(&key).await? {
491 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492 Ok(lease.leader_value.as_bytes().into())
493 } else {
494 NoLeaderSnafu.fail()
495 }
496 }
497 }
498
    /// Not implemented yet.
    ///
    /// # Panics
    /// Always panics, because the body is `todo!()`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }
502
503 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504 self.leader_watcher.subscribe()
505 }
506}
507
508impl PgElection {
509 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511 let key = key.as_bytes();
512 self.maybe_init_client().await?;
513 let res = self
514 .pg_client
515 .read()
516 .await
517 .query(&self.sql_set.get_value_with_lease, &[&key])
518 .await?;
519
520 if res.is_empty() {
521 Ok(None)
522 } else {
523 let current_time_str = res[0].try_get(1).unwrap_or_default();
525 let current_time = match Timestamp::from_str(current_time_str, None) {
526 Ok(ts) => ts,
527 Err(_) => UnexpectedSnafu {
528 violated: format!("Invalid timestamp: {}", current_time_str),
529 }
530 .fail()?,
531 };
532 let value_and_expire_time =
534 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537 Ok(Some(Lease {
538 leader_value: value,
539 expire_time,
540 current: current_time,
541 origin: value_and_expire_time.to_string(),
542 }))
543 }
544 }
545
546 async fn get_value_with_lease_by_prefix(
548 &self,
549 key_prefix: &str,
550 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552 self.maybe_init_client().await?;
553 let res = self
554 .pg_client
555 .read()
556 .await
557 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558 .await?;
559
560 let mut values_with_leases = vec![];
561 let mut current = Timestamp::default();
562 for row in res {
563 let current_time_str = row.try_get(1).unwrap_or_default();
564 current = match Timestamp::from_str(current_time_str, None) {
565 Ok(ts) => ts,
566 Err(_) => UnexpectedSnafu {
567 violated: format!("Invalid timestamp: {}", current_time_str),
568 }
569 .fail()?,
570 };
571
572 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575 values_with_leases.push((value, expire_time));
576 }
577 Ok((values_with_leases, current))
578 }
579
580 async fn update_value_with_lease(
581 &self,
582 key: &str,
583 prev: &str,
584 updated: &str,
585 lease_ttl: Duration,
586 ) -> Result<()> {
587 let key = key.as_bytes();
588 let prev = prev.as_bytes();
589 self.maybe_init_client().await?;
590 let lease_ttl_secs = lease_ttl.as_secs() as f64;
591 let res = self
592 .pg_client
593 .read()
594 .await
595 .execute(
596 &self.sql_set.update_value_with_lease,
597 &[&key, &prev, &updated, &lease_ttl_secs],
598 )
599 .await?;
600
601 ensure!(
602 res == 1,
603 UnexpectedSnafu {
604 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605 }
606 );
607
608 Ok(())
609 }
610
611 async fn put_value_with_lease(
613 &self,
614 key: &str,
615 value: &str,
616 lease_ttl: Duration,
617 ) -> Result<bool> {
618 let key = key.as_bytes();
619 let lease_ttl_secs = lease_ttl.as_secs() as f64;
620 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621 self.maybe_init_client().await?;
622 let res = self
623 .pg_client
624 .read()
625 .await
626 .query(&self.sql_set.put_value_with_lease, ¶ms)
627 .await?;
628 Ok(res.is_empty())
629 }
630
631 async fn delete_value(&self, key: &str) -> Result<bool> {
634 let key = key.as_bytes();
635 self.maybe_init_client().await?;
636 let res = self
637 .pg_client
638 .read()
639 .await
640 .query(&self.sql_set.delete_value, &[&key])
641 .await?;
642
643 Ok(res.len() == 1)
644 }
645
646 async fn leader_action(&self) -> Result<()> {
667 let key = self.election_key();
668 if self.is_leader() {
670 match self.get_value_with_lease(&key).await? {
671 Some(lease) => {
672 match (
673 lease.leader_value == self.leader_value,
674 lease.expire_time > lease.current,
675 ) {
676 (true, true) => {
678 self.update_value_with_lease(
680 &key,
681 &lease.origin,
682 &self.leader_value,
683 self.meta_lease_ttl,
684 )
685 .await?;
686 }
687 (true, false) => {
689 warn!("Leader lease expired, now stepping down.");
690 self.step_down().await?;
691 }
692 (false, _) => {
694 warn!(
695 "Leader lease not found, but still hold the lock. Now stepping down."
696 );
697 self.step_down().await?;
698 }
699 }
700 }
701 None => {
703 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
704 self.step_down().await?;
705 }
706 }
707 } else {
709 self.elected().await?;
710 }
711 Ok(())
712 }
713
714 async fn follower_action(&self) -> Result<()> {
725 let key = self.election_key();
726 if self.is_leader() {
728 self.step_down_without_lock().await?;
729 }
730 let lease = self
731 .get_value_with_lease(&key)
732 .await?
733 .context(NoLeaderSnafu)?;
734 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
736 Ok(())
738 }
739
740 async fn step_down(&self) -> Result<()> {
747 let key = self.election_key();
748 let leader_key = RdsLeaderKey {
749 name: self.leader_value.clone().into_bytes(),
750 key: key.clone().into_bytes(),
751 ..Default::default()
752 };
753 self.delete_value(&key).await?;
754 self.maybe_init_client().await?;
755 self.pg_client
756 .read()
757 .await
758 .query(&self.sql_set.step_down, &[])
759 .await?;
760 send_leader_change_and_set_flags(
761 &self.is_leader,
762 &self.leader_infancy,
763 &self.leader_watcher,
764 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
765 );
766 Ok(())
767 }
768
769 async fn step_down_without_lock(&self) -> Result<()> {
771 let key = self.election_key().into_bytes();
772 let leader_key = RdsLeaderKey {
773 name: self.leader_value.clone().into_bytes(),
774 key: key.clone(),
775 ..Default::default()
776 };
777 if self
778 .is_leader
779 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
780 .is_ok()
781 && let Err(e) = self
782 .leader_watcher
783 .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
784 {
785 error!(e; "Failed to send leader change message");
786 }
787 Ok(())
788 }
789
790 async fn elected(&self) -> Result<()> {
793 let key = self.election_key();
794 let leader_key = RdsLeaderKey {
795 name: self.leader_value.clone().into_bytes(),
796 key: key.clone().into_bytes(),
797 ..Default::default()
798 };
799 self.delete_value(&key).await?;
800 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801 .await?;
802
803 if self
804 .is_leader
805 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806 .is_ok()
807 {
808 self.leader_infancy.store(true, Ordering::Release);
809
810 if let Err(e) = self
811 .leader_watcher
812 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813 {
814 error!(e; "Failed to send leader change message");
815 }
816 }
817 Ok(())
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use std::assert_matches::assert_matches;
824 use std::env;
825
826 use common_meta::maybe_skip_postgres_integration_test;
827
828 use super::*;
829 use crate::error;
830 use crate::utils::postgres::create_postgres_pool;
831
832 async fn create_postgres_client(
833 table_name: Option<&str>,
834 execution_timeout: Duration,
835 idle_session_timeout: Duration,
836 statement_timeout: Duration,
837 ) -> Result<ElectionPgClient> {
838 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839 if endpoint.is_empty() {
840 return UnexpectedSnafu {
841 violated: "Postgres endpoint is empty".to_string(),
842 }
843 .fail();
844 }
845 let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
846 let mut pg_client = ElectionPgClient::new(
847 pool,
848 execution_timeout,
849 idle_session_timeout,
850 statement_timeout,
851 )
852 .unwrap();
853 pg_client.maybe_init_client().await?;
854 if let Some(table_name) = table_name {
855 let create_table_sql = format!(
856 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857 table_name
858 );
859 pg_client.execute(&create_table_sql, &[]).await?;
860 }
861 Ok(pg_client)
862 }
863
864 async fn drop_table(pg_election: &PgElection, table_name: &str) {
865 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866 pg_election
867 .pg_client
868 .read()
869 .await
870 .execute(&sql, &[])
871 .await
872 .unwrap();
873 }
874
    /// Exercises the put / get / update / delete helpers and the prefix scan against a real
    /// PostgreSQL instance.
    #[tokio::test]
    async fn test_postgres_crud() {
        maybe_skip_postgres_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_postgres_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        let (tx, _) = broadcast::channel(100);
        let pg_election = PgElection {
            leader_value: "test_leader".to_string(),
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
        };

        // A fresh key is inserted, so the put reports `true`.
        let res = pg_election
            .put_value_with_lease(&key, &value, candidate_lease_ttl)
            .await
            .unwrap();
        assert!(res);

        let lease = pg_election
            .get_value_with_lease(&key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        // Renewing with the value that was just read must succeed.
        pg_election
            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
            .await
            .unwrap();

        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);

        let res = pg_election.get_value_with_lease(&key).await.unwrap();
        assert!(res.is_none());

        // Ten keys sharing the "test_key" prefix.
        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            pg_election
                .put_value_with_lease(&key, &value, candidate_lease_ttl)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        // With no matching rows the returned timestamp keeps its default value.
        let (res, current) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        drop_table(&pg_election, table_name).await;
    }
965
966 async fn candidate(
967 leader_value: String,
968 candidate_lease_ttl: Duration,
969 store_key_prefix: String,
970 table_name: String,
971 ) {
972 let execution_timeout = Duration::from_secs(10);
973 let statement_timeout = Duration::from_secs(10);
974 let meta_lease_ttl = Duration::from_secs(2);
975 let idle_session_timeout = Duration::from_secs(0);
976 let client = create_postgres_client(
977 None,
978 execution_timeout,
979 idle_session_timeout,
980 statement_timeout,
981 )
982 .await
983 .unwrap();
984
985 let (tx, _) = broadcast::channel(100);
986 let pg_election = PgElection {
987 leader_value,
988 pg_client: RwLock::new(client),
989 is_leader: AtomicBool::new(false),
990 leader_infancy: AtomicBool::new(true),
991 leader_watcher: tx,
992 store_key_prefix,
993 candidate_lease_ttl,
994 meta_lease_ttl,
995 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996 };
997
998 let node_info = MetasrvNodeInfo {
999 addr: "test_addr".to_string(),
1000 version: "test_version".to_string(),
1001 git_commit: "test_git_commit".to_string(),
1002 start_time_ms: 0,
1003 cpus: 0,
1004 memory_bytes: 0,
1005 };
1006 pg_election.register_candidate(&node_info).await.unwrap();
1007 }
1008
    /// Spawns ten candidates, checks that all of them are listed, then aborts them and checks
    /// that none is listed once their leases have expired.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_postgres_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let mut handles = vec![];
        let candidate_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Give the spawned candidates time to register.
        tokio::time::sleep(Duration::from_secs(3)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let pg_election = PgElection {
            leader_value,
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
        };

        let candidates = pg_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Stop the keep-alive loops so the leases are no longer renewed.
        for handle in handles {
            handle.abort();
        }

        // Wait for one full candidate lease TTL so every lease has expired.
        tokio::time::sleep(Duration::from_secs(5)).await;
        let candidates = pg_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // The expired rows are still stored; remove them one by one.
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        drop_table(&pg_election, table_name).await;
    }
1078
    /// Walks through elected -> step down without lock -> elected -> step down and checks the
    /// stored lease, the leader flag and the broadcast message after every transition.
    #[tokio::test]
    async fn test_elected_and_step_down() {
        maybe_skip_postgres_integration_test!();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_and_step_down_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        let (tx, mut rx) = broadcast::channel(100);
        let leader_pg_election = PgElection {
            leader_value: leader_value.clone(),
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
        };

        // Step 1: elected -> a valid lease exists and the node is the leader.
        leader_pg_election.elected().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Step 2: step down without the lock -> the lease row stays, the flag is cleared.
        leader_pg_election.step_down_without_lock().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(!leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        // Step 3: elected again.
        leader_pg_election.elected().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Step 4: full step down -> the lease row is deleted as well.
        leader_pg_election.step_down().await.unwrap();
        let res = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap();
        assert!(res.is_none());
        assert!(!leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        drop_table(&leader_pg_election, table_name).await;
    }
1203
1204 #[tokio::test]
1205 async fn test_leader_action() {
1206 maybe_skip_postgres_integration_test!();
1207 let leader_value = "test_leader".to_string();
1208 let uuid = uuid::Uuid::new_v4().to_string();
1209 let table_name = "test_leader_action_greptime_metakv";
1210 let candidate_lease_ttl = Duration::from_secs(5);
1211 let execution_timeout = Duration::from_secs(10);
1212 let statement_timeout = Duration::from_secs(10);
1213 let meta_lease_ttl = Duration::from_secs(2);
1214 let idle_session_timeout = Duration::from_secs(0);
1215 let client = create_postgres_client(
1216 Some(table_name),
1217 execution_timeout,
1218 idle_session_timeout,
1219 statement_timeout,
1220 )
1221 .await
1222 .unwrap();
1223
1224 let (tx, mut rx) = broadcast::channel(100);
1225 let leader_pg_election = PgElection {
1226 leader_value: leader_value.clone(),
1227 pg_client: RwLock::new(client),
1228 is_leader: AtomicBool::new(false),
1229 leader_infancy: AtomicBool::new(true),
1230 leader_watcher: tx,
1231 store_key_prefix: uuid,
1232 candidate_lease_ttl,
1233 meta_lease_ttl,
1234 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1235 };
1236
1237 let res = leader_pg_election
1239 .pg_client
1240 .read()
1241 .await
1242 .query(&leader_pg_election.sql_set.campaign, &[])
1243 .await
1244 .unwrap();
1245 let res: bool = res[0].get(0);
1246 assert!(res);
1247 leader_pg_election.leader_action().await.unwrap();
1248 let lease = leader_pg_election
1249 .get_value_with_lease(&leader_pg_election.election_key())
1250 .await
1251 .unwrap()
1252 .unwrap();
1253 assert!(lease.leader_value == leader_value);
1254 assert!(lease.expire_time > lease.current);
1255 assert!(leader_pg_election.is_leader());
1256
1257 match rx.recv().await {
1258 Ok(LeaderChangeMessage::Elected(key)) => {
1259 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1260 assert_eq!(
1261 String::from_utf8_lossy(key.key()),
1262 leader_pg_election.election_key()
1263 );
1264 assert_eq!(key.lease_id(), i64::default());
1265 assert_eq!(key.revision(), i64::default());
1266 }
1267 _ => panic!("Expected LeaderChangeMessage::Elected"),
1268 }
1269
1270 let res = leader_pg_election
1272 .pg_client
1273 .read()
1274 .await
1275 .query(&leader_pg_election.sql_set.campaign, &[])
1276 .await
1277 .unwrap();
1278 let res: bool = res[0].get(0);
1279 assert!(res);
1280 leader_pg_election.leader_action().await.unwrap();
1281 let new_lease = leader_pg_election
1282 .get_value_with_lease(&leader_pg_election.election_key())
1283 .await
1284 .unwrap()
1285 .unwrap();
1286 assert!(new_lease.leader_value == leader_value);
1287 assert!(
1288 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1289 );
1290 assert!(leader_pg_election.is_leader());
1291
1292 tokio::time::sleep(Duration::from_secs(2)).await;
1294
1295 let res = leader_pg_election
1296 .pg_client
1297 .read()
1298 .await
1299 .query(&leader_pg_election.sql_set.campaign, &[])
1300 .await
1301 .unwrap();
1302 let res: bool = res[0].get(0);
1303 assert!(res);
1304 leader_pg_election.leader_action().await.unwrap();
1305 let res = leader_pg_election
1306 .get_value_with_lease(&leader_pg_election.election_key())
1307 .await
1308 .unwrap();
1309 assert!(res.is_none());
1310
1311 match rx.recv().await {
1312 Ok(LeaderChangeMessage::StepDown(key)) => {
1313 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1314 assert_eq!(
1315 String::from_utf8_lossy(key.key()),
1316 leader_pg_election.election_key()
1317 );
1318 assert_eq!(key.lease_id(), i64::default());
1319 assert_eq!(key.revision(), i64::default());
1320 }
1321 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1322 }
1323
1324 let res = leader_pg_election
1326 .pg_client
1327 .read()
1328 .await
1329 .query(&leader_pg_election.sql_set.campaign, &[])
1330 .await
1331 .unwrap();
1332 let res: bool = res[0].get(0);
1333 assert!(res);
1334 leader_pg_election.leader_action().await.unwrap();
1335 let lease = leader_pg_election
1336 .get_value_with_lease(&leader_pg_election.election_key())
1337 .await
1338 .unwrap()
1339 .unwrap();
1340 assert!(lease.leader_value == leader_value);
1341 assert!(lease.expire_time > lease.current);
1342 assert!(leader_pg_election.is_leader());
1343
1344 match rx.recv().await {
1345 Ok(LeaderChangeMessage::Elected(key)) => {
1346 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1347 assert_eq!(
1348 String::from_utf8_lossy(key.key()),
1349 leader_pg_election.election_key()
1350 );
1351 assert_eq!(key.lease_id(), i64::default());
1352 assert_eq!(key.revision(), i64::default());
1353 }
1354 _ => panic!("Expected LeaderChangeMessage::Elected"),
1355 }
1356
1357 leader_pg_election
1359 .delete_value(&leader_pg_election.election_key())
1360 .await
1361 .unwrap();
1362 leader_pg_election.leader_action().await.unwrap();
1363 let res = leader_pg_election
1364 .get_value_with_lease(&leader_pg_election.election_key())
1365 .await
1366 .unwrap();
1367 assert!(res.is_none());
1368 assert!(!leader_pg_election.is_leader());
1369
1370 match rx.recv().await {
1371 Ok(LeaderChangeMessage::StepDown(key)) => {
1372 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1373 assert_eq!(
1374 String::from_utf8_lossy(key.key()),
1375 leader_pg_election.election_key()
1376 );
1377 assert_eq!(key.lease_id(), i64::default());
1378 assert_eq!(key.revision(), i64::default());
1379 }
1380 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1381 }
1382
1383 let res = leader_pg_election
1385 .pg_client
1386 .read()
1387 .await
1388 .query(&leader_pg_election.sql_set.campaign, &[])
1389 .await
1390 .unwrap();
1391 let res: bool = res[0].get(0);
1392 assert!(res);
1393 leader_pg_election.leader_action().await.unwrap();
1394 let lease = leader_pg_election
1395 .get_value_with_lease(&leader_pg_election.election_key())
1396 .await
1397 .unwrap()
1398 .unwrap();
1399 assert!(lease.leader_value == leader_value);
1400 assert!(lease.expire_time > lease.current);
1401 assert!(leader_pg_election.is_leader());
1402
1403 match rx.recv().await {
1404 Ok(LeaderChangeMessage::Elected(key)) => {
1405 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1406 assert_eq!(
1407 String::from_utf8_lossy(key.key()),
1408 leader_pg_election.election_key()
1409 );
1410 assert_eq!(key.lease_id(), i64::default());
1411 assert_eq!(key.revision(), i64::default());
1412 }
1413 _ => panic!("Expected LeaderChangeMessage::Elected"),
1414 }
1415
1416 let res = leader_pg_election
1418 .pg_client
1419 .read()
1420 .await
1421 .query(&leader_pg_election.sql_set.campaign, &[])
1422 .await
1423 .unwrap();
1424 let res: bool = res[0].get(0);
1425 assert!(res);
1426 leader_pg_election
1427 .delete_value(&leader_pg_election.election_key())
1428 .await
1429 .unwrap();
1430 leader_pg_election
1431 .put_value_with_lease(
1432 &leader_pg_election.election_key(),
1433 "test",
1434 Duration::from_secs(10),
1435 )
1436 .await
1437 .unwrap();
1438 leader_pg_election.leader_action().await.unwrap();
1439 let res = leader_pg_election
1440 .get_value_with_lease(&leader_pg_election.election_key())
1441 .await
1442 .unwrap();
1443 assert!(res.is_none());
1444 assert!(!leader_pg_election.is_leader());
1445
1446 match rx.recv().await {
1447 Ok(LeaderChangeMessage::StepDown(key)) => {
1448 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1449 assert_eq!(
1450 String::from_utf8_lossy(key.key()),
1451 leader_pg_election.election_key()
1452 );
1453 assert_eq!(key.lease_id(), i64::default());
1454 assert_eq!(key.revision(), i64::default());
1455 }
1456 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1457 }
1458
1459 leader_pg_election
1461 .pg_client
1462 .read()
1463 .await
1464 .query(&leader_pg_election.sql_set.step_down, &[])
1465 .await
1466 .unwrap();
1467
1468 drop_table(&leader_pg_election, table_name).await;
1469 }
1470
1471 #[tokio::test]
1472 async fn test_follower_action() {
1473 maybe_skip_postgres_integration_test!();
1474 common_telemetry::init_default_ut_logging();
1475 let uuid = uuid::Uuid::new_v4().to_string();
1476 let table_name = "test_follower_action_greptime_metakv";
1477
1478 let candidate_lease_ttl = Duration::from_secs(5);
1479 let execution_timeout = Duration::from_secs(10);
1480 let statement_timeout = Duration::from_secs(10);
1481 let meta_lease_ttl = Duration::from_secs(2);
1482 let idle_session_timeout = Duration::from_secs(0);
1483 let follower_client = create_postgres_client(
1484 Some(table_name),
1485 execution_timeout,
1486 idle_session_timeout,
1487 statement_timeout,
1488 )
1489 .await
1490 .unwrap();
1491 let (tx, mut rx) = broadcast::channel(100);
1492 let follower_pg_election = PgElection {
1493 leader_value: "test_follower".to_string(),
1494 pg_client: RwLock::new(follower_client),
1495 is_leader: AtomicBool::new(false),
1496 leader_infancy: AtomicBool::new(true),
1497 leader_watcher: tx,
1498 store_key_prefix: uuid.clone(),
1499 candidate_lease_ttl,
1500 meta_lease_ttl,
1501 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1502 };
1503
1504 let leader_client = create_postgres_client(
1505 Some(table_name),
1506 execution_timeout,
1507 idle_session_timeout,
1508 statement_timeout,
1509 )
1510 .await
1511 .unwrap();
1512 let (tx, _) = broadcast::channel(100);
1513 let leader_pg_election = PgElection {
1514 leader_value: "test_leader".to_string(),
1515 pg_client: RwLock::new(leader_client),
1516 is_leader: AtomicBool::new(false),
1517 leader_infancy: AtomicBool::new(true),
1518 leader_watcher: tx,
1519 store_key_prefix: uuid,
1520 candidate_lease_ttl,
1521 meta_lease_ttl,
1522 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1523 };
1524
1525 leader_pg_election
1526 .pg_client
1527 .read()
1528 .await
1529 .query(&leader_pg_election.sql_set.campaign, &[])
1530 .await
1531 .unwrap();
1532 leader_pg_election.elected().await.unwrap();
1533
1534 follower_pg_election.follower_action().await.unwrap();
1536
1537 tokio::time::sleep(Duration::from_secs(2)).await;
1539 assert!(follower_pg_election.follower_action().await.is_err());
1540
1541 leader_pg_election
1543 .delete_value(&leader_pg_election.election_key())
1544 .await
1545 .unwrap();
1546 assert!(follower_pg_election.follower_action().await.is_err());
1547
1548 follower_pg_election
1550 .is_leader
1551 .store(true, Ordering::Relaxed);
1552 assert!(follower_pg_election.follower_action().await.is_err());
1553
1554 match rx.recv().await {
1555 Ok(LeaderChangeMessage::StepDown(key)) => {
1556 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1557 assert_eq!(
1558 String::from_utf8_lossy(key.key()),
1559 follower_pg_election.election_key()
1560 );
1561 assert_eq!(key.lease_id(), i64::default());
1562 assert_eq!(key.revision(), i64::default());
1563 }
1564 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1565 }
1566
1567 leader_pg_election
1569 .pg_client
1570 .read()
1571 .await
1572 .query(&leader_pg_election.sql_set.step_down, &[])
1573 .await
1574 .unwrap();
1575
1576 drop_table(&follower_pg_election, table_name).await;
1577 }
1578
1579 #[tokio::test]
1580 async fn test_idle_session_timeout() {
1581 maybe_skip_postgres_integration_test!();
1582 common_telemetry::init_default_ut_logging();
1583 let execution_timeout = Duration::from_secs(10);
1584 let statement_timeout = Duration::from_secs(10);
1585 let idle_session_timeout = Duration::from_secs(1);
1586 let mut client = create_postgres_client(
1587 None,
1588 execution_timeout,
1589 idle_session_timeout,
1590 statement_timeout,
1591 )
1592 .await
1593 .unwrap();
1594 tokio::time::sleep(Duration::from_millis(1100)).await;
1595 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1597 assert_matches!(err, error::Error::PostgresExecution { .. });
1598 let error::Error::PostgresExecution { error, .. } = err else {
1599 panic!("Expected PostgresExecution error");
1600 };
1601 assert!(error.is_closed());
1602 client.reset_client().await.unwrap();
1604 let _ = client.query("SELECT 1", &[]).await.unwrap();
1605 }
1606
1607 #[test]
1608 fn test_election_sql_with_schema() {
1609 let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1610 let s = f.build();
1611 assert!(s.campaign.contains("pg_try_advisory_lock"));
1612 assert!(
1613 s.put_value_with_lease
1614 .contains("\"test_schema\".\"greptime_metakv\"")
1615 );
1616 assert!(
1617 s.update_value_with_lease
1618 .contains("\"test_schema\".\"greptime_metakv\"")
1619 );
1620 assert!(
1621 s.get_value_with_lease
1622 .contains("\"test_schema\".\"greptime_metakv\"")
1623 );
1624 assert!(
1625 s.get_value_with_lease_by_prefix
1626 .contains("\"test_schema\".\"greptime_metakv\"")
1627 );
1628 assert!(
1629 s.delete_value
1630 .contains("\"test_schema\".\"greptime_metakv\"")
1631 );
1632 }
1633}