1use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{ensure, OptionExt, ResultExt};
24use tokio::sync::{broadcast, RwLock};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::types::ToSql;
27use tokio_postgres::Row;
28
29use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
30use crate::election::{
31 listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by the Postgres-backed election.
///
/// The factory only borrows its inputs; `build` turns them into an owned
/// [`ElectionSqlSet`].
struct ElectionSqlFactory<'a> {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock`.
    lock_id: u64,
    /// Optional schema used to qualify the table. `None` or an empty string leaves the
    /// table name unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table (columns `k` and `v`) that the statements read and write.
    table_name: &'a str,
}
44
/// The fully rendered SQL statements produced by [`ElectionSqlFactory::build`].
struct ElectionSqlSet {
    /// `SELECT pg_try_advisory_lock(<lock_id>)`.
    campaign: String,
    /// `SELECT pg_advisory_unlock(<lock_id>)`.
    step_down: String,
    /// Inserts `$1 -> $2 + separator + expire time` unless the key already exists, and
    /// returns the row that existed before the statement (if any).
    /// Parameters: `$1` key, `$2` value, `$3` lease TTL in seconds.
    put_value_with_lease: String,
    /// Rewrites the value and expire time of key `$1`, but only if the stored value still
    /// equals `$2`. Parameters: `$1` key, `$2` previous raw value, `$3` new value,
    /// `$4` lease TTL in seconds.
    update_value_with_lease: String,
    /// Selects the raw value of key `$1` together with the database's current timestamp.
    get_value_with_lease: String,
    /// Same as `get_value_with_lease`, but matches keys with `k LIKE $1`.
    get_value_with_lease_by_prefix: String,
    /// Deletes key `$1` and returns the deleted row.
    delete_value: String,
}
90
91impl<'a> ElectionSqlFactory<'a> {
92 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93 Self {
94 lock_id,
95 schema_name,
96 table_name,
97 }
98 }
99
100 fn table_ident(&self) -> String {
101 match self.schema_name {
102 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103 _ => format!("\"{}\"", self.table_name),
104 }
105 }
106
107 fn build(self) -> ElectionSqlSet {
108 ElectionSqlSet {
109 campaign: self.campaign_sql(),
110 step_down: self.step_down_sql(),
111 put_value_with_lease: self.put_value_with_lease_sql(),
112 update_value_with_lease: self.update_value_with_lease_sql(),
113 get_value_with_lease: self.get_value_with_lease_sql(),
114 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115 delete_value: self.delete_value_sql(),
116 }
117 }
118
119 fn campaign_sql(&self) -> String {
120 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121 }
122
123 fn step_down_sql(&self) -> String {
124 format!("SELECT pg_advisory_unlock({})", self.lock_id)
125 }
126
127 fn put_value_with_lease_sql(&self) -> String {
128 let table = self.table_ident();
129 format!(
130 r#"WITH prev AS (
131 SELECT k, v FROM {table} WHERE k = $1
132 ), insert AS (
133 INSERT INTO {table}
134 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135 ON CONFLICT (k) DO NOTHING
136 )
137 SELECT k, v FROM prev;
138 "#,
139 table = table,
140 lease_sep = LEASE_SEP
141 )
142 }
143
144 fn update_value_with_lease_sql(&self) -> String {
145 let table = self.table_ident();
146 format!(
147 r#"UPDATE {table}
148 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149 WHERE k = $1 AND v = $2"#,
150 table = table,
151 lease_sep = LEASE_SEP
152 )
153 }
154
155 fn get_value_with_lease_sql(&self) -> String {
156 let table = self.table_ident();
157 format!(
158 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159 table = table
160 )
161 }
162
163 fn get_value_with_lease_by_prefix_sql(&self) -> String {
164 let table = self.table_ident();
165 format!(
166 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167 table = table
168 )
169 }
170
171 fn delete_value_sql(&self) -> String {
172 let table = self.table_ident();
173 format!(
174 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175 table = table
176 )
177 }
178}
179
/// A Postgres client dedicated to the election, backed by a connection pool.
pub struct ElectionPgClient {
    /// The pooled connection currently in use; `None` until `maybe_init_client` has
    /// obtained one, and again right after `reset_client` clears it.
    current: Option<deadpool::managed::Object<Manager>>,
    /// Pool from which `current` is obtained.
    pool: Pool,
    /// Client-side limit applied to every `execute` / `query` call through
    /// `tokio::time::timeout`.
    execution_timeout: Duration,

    /// Value sent to the server as `SET idle_session_timeout` when a connection is
    /// initialized (rendered in whole seconds).
    idle_session_timeout: Duration,

    /// Value sent to the server as `SET statement_timeout` when a connection is
    /// initialized (rendered in whole seconds).
    statement_timeout: Duration,
}
202
203impl ElectionPgClient {
204 pub fn new(
205 pool: Pool,
206 execution_timeout: Duration,
207 idle_session_timeout: Duration,
208 statement_timeout: Duration,
209 ) -> Result<ElectionPgClient> {
210 Ok(ElectionPgClient {
211 current: None,
212 pool,
213 execution_timeout,
214 idle_session_timeout,
215 statement_timeout,
216 })
217 }
218
219 fn set_idle_session_timeout_sql(&self) -> String {
220 format!(
221 "SET idle_session_timeout = '{}s';",
222 self.idle_session_timeout.as_secs()
223 )
224 }
225
226 fn set_statement_timeout_sql(&self) -> String {
227 format!(
228 "SET statement_timeout = '{}s';",
229 self.statement_timeout.as_secs()
230 )
231 }
232
233 async fn reset_client(&mut self) -> Result<()> {
234 self.current = None;
235 self.maybe_init_client().await
236 }
237
238 async fn maybe_init_client(&mut self) -> Result<()> {
239 if self.current.is_none() {
240 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
241
242 self.current = Some(client);
243 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
245 self.execute(&idle_session_timeout_sql, &[]).await?;
246 let statement_timeout_sql = self.set_statement_timeout_sql();
247 self.execute(&statement_timeout_sql, &[]).await?;
248 }
249
250 Ok(())
251 }
252
253 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
258 let result = tokio::time::timeout(
259 self.execution_timeout,
260 self.current.as_ref().unwrap().execute(sql, params),
261 )
262 .await
263 .map_err(|_| {
264 SqlExecutionTimeoutSnafu {
265 sql: sql.to_string(),
266 duration: self.execution_timeout,
267 }
268 .build()
269 })?;
270
271 result.context(PostgresExecutionSnafu { sql })
272 }
273
274 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
279 let result = tokio::time::timeout(
280 self.execution_timeout,
281 self.current.as_ref().unwrap().query(sql, params),
282 )
283 .await
284 .map_err(|_| {
285 SqlExecutionTimeoutSnafu {
286 sql: sql.to_string(),
287 duration: self.execution_timeout,
288 }
289 .build()
290 })?;
291
292 result.context(PostgresExecutionSnafu { sql })
293 }
294}
295
/// Election implementation that uses a Postgres advisory lock plus lease rows in a
/// key-value table.
pub struct PgElection {
    /// Value this node writes under the election key when elected; also the suffix of
    /// its candidate key.
    leader_value: String,
    /// Shared Postgres client; the write lock is taken only to (re)initialize it.
    pg_client: RwLock<ElectionPgClient>,
    /// Locally cached leadership flag.
    is_leader: AtomicBool,
    /// Set to `true` when this node becomes leader and flipped back to `false` by the
    /// first `in_leader_infancy` call that observes it.
    leader_infancy: AtomicBool,
    /// Broadcast channel on which `Elected` / `StepDown` messages are published.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidates root.
    store_key_prefix: String,
    /// Lease TTL for this node's candidate entry; renewed every half TTL.
    candidate_lease_ttl: Duration,
    /// Lease TTL for the leader entry; the campaign loop ticks every half TTL.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
308
309impl PgElection {
310 async fn maybe_init_client(&self) -> Result<()> {
311 if self.pg_client.read().await.current.is_none() {
312 self.pg_client.write().await.maybe_init_client().await?;
313 }
314
315 Ok(())
316 }
317
318 #[allow(clippy::too_many_arguments)]
319 pub async fn with_pg_client(
320 leader_value: String,
321 pg_client: ElectionPgClient,
322 store_key_prefix: String,
323 candidate_lease_ttl: Duration,
324 meta_lease_ttl: Duration,
325 schema_name: Option<&str>,
326 table_name: &str,
327 lock_id: u64,
328 ) -> Result<ElectionRef> {
329 if let Some(s) = schema_name {
330 common_telemetry::info!("PgElection uses schema: {}", s);
331 } else {
332 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
333 }
334 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
335
336 let tx = listen_leader_change(leader_value.clone());
337 Ok(Arc::new(Self {
338 leader_value,
339 pg_client: RwLock::new(pg_client),
340 is_leader: AtomicBool::new(false),
341 leader_infancy: AtomicBool::new(false),
342 leader_watcher: tx,
343 store_key_prefix,
344 candidate_lease_ttl,
345 meta_lease_ttl,
346 sql_set: sql_factory.build(),
347 }))
348 }
349
350 fn election_key(&self) -> String {
351 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
352 }
353
354 fn candidate_root(&self) -> String {
355 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
356 }
357
358 fn candidate_key(&self) -> String {
359 format!("{}{}", self.candidate_root(), self.leader_value)
360 }
361}
362
363#[async_trait::async_trait]
364impl Election for PgElection {
365 type Leader = LeaderValue;
366
367 fn is_leader(&self) -> bool {
368 self.is_leader.load(Ordering::Relaxed)
369 }
370
371 fn in_leader_infancy(&self) -> bool {
372 self.leader_infancy
373 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
374 .is_ok()
375 }
376
377 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
378 let key = self.candidate_key();
379 let node_info =
380 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
381 input: format!("{node_info:?}"),
382 })?;
383 let res = self
384 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
385 .await?;
386 if !res {
388 self.delete_value(&key).await?;
389 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390 .await?;
391 }
392
393 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
395 loop {
396 let _ = keep_alive_interval.tick().await;
397
398 let lease = self
399 .get_value_with_lease(&key)
400 .await?
401 .context(UnexpectedSnafu {
402 violated: format!("Failed to get lease for key: {:?}", key),
403 })?;
404
405 ensure!(
406 lease.expire_time > lease.current,
407 UnexpectedSnafu {
408 violated: format!(
409 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
410 lease.expire_time, lease.current, key
411 ),
412 }
413 );
414
415 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
417 .await?;
418 }
419 }
420
421 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
422 let key_prefix = self.candidate_root();
423 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
424 candidates.retain(|c| c.1 > current);
426 let mut valid_candidates = Vec::with_capacity(candidates.len());
427 for (c, _) in candidates {
428 let node_info: MetasrvNodeInfo =
429 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
430 input: format!("{:?}", c),
431 })?;
432 valid_candidates.push(node_info);
433 }
434 Ok(valid_candidates)
435 }
436
437 async fn campaign(&self) -> Result<()> {
450 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
451 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
452
453 self.maybe_init_client().await?;
454 loop {
455 let res = self
456 .pg_client
457 .read()
458 .await
459 .query(&self.sql_set.campaign, &[])
460 .await?;
461 let row = res.first().context(UnexpectedSnafu {
462 violated: "Failed to get the result of acquiring advisory lock",
463 })?;
464 let is_leader = row.try_get(0).map_err(|_| {
465 UnexpectedSnafu {
466 violated: "Failed to get the result of get lock",
467 }
468 .build()
469 })?;
470 if is_leader {
471 self.leader_action().await?;
472 } else {
473 self.follower_action().await?;
474 }
475 let _ = keep_alive_interval.tick().await;
476 }
477 }
478
479 async fn reset_campaign(&self) {
480 if let Err(err) = self.pg_client.write().await.reset_client().await {
481 error!(err; "Failed to reset client");
482 }
483 }
484
485 async fn leader(&self) -> Result<Self::Leader> {
486 if self.is_leader.load(Ordering::Relaxed) {
487 Ok(self.leader_value.as_bytes().into())
488 } else {
489 let key = self.election_key();
490 if let Some(lease) = self.get_value_with_lease(&key).await? {
491 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
492 Ok(lease.leader_value.as_bytes().into())
493 } else {
494 NoLeaderSnafu.fail()
495 }
496 }
497 }
498
499 async fn resign(&self) -> Result<()> {
500 todo!()
501 }
502
503 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
504 self.leader_watcher.subscribe()
505 }
506}
507
508impl PgElection {
509 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
511 let key = key.as_bytes();
512 self.maybe_init_client().await?;
513 let res = self
514 .pg_client
515 .read()
516 .await
517 .query(&self.sql_set.get_value_with_lease, &[&key])
518 .await?;
519
520 if res.is_empty() {
521 Ok(None)
522 } else {
523 let current_time_str = res[0].try_get(1).unwrap_or_default();
525 let current_time = match Timestamp::from_str(current_time_str, None) {
526 Ok(ts) => ts,
527 Err(_) => UnexpectedSnafu {
528 violated: format!("Invalid timestamp: {}", current_time_str),
529 }
530 .fail()?,
531 };
532 let value_and_expire_time =
534 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
535 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
536
537 Ok(Some(Lease {
538 leader_value: value,
539 expire_time,
540 current: current_time,
541 origin: value_and_expire_time.to_string(),
542 }))
543 }
544 }
545
546 async fn get_value_with_lease_by_prefix(
548 &self,
549 key_prefix: &str,
550 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
551 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
552 self.maybe_init_client().await?;
553 let res = self
554 .pg_client
555 .read()
556 .await
557 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
558 .await?;
559
560 let mut values_with_leases = vec![];
561 let mut current = Timestamp::default();
562 for row in res {
563 let current_time_str = row.try_get(1).unwrap_or_default();
564 current = match Timestamp::from_str(current_time_str, None) {
565 Ok(ts) => ts,
566 Err(_) => UnexpectedSnafu {
567 violated: format!("Invalid timestamp: {}", current_time_str),
568 }
569 .fail()?,
570 };
571
572 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
573 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
574
575 values_with_leases.push((value, expire_time));
576 }
577 Ok((values_with_leases, current))
578 }
579
580 async fn update_value_with_lease(
581 &self,
582 key: &str,
583 prev: &str,
584 updated: &str,
585 lease_ttl: Duration,
586 ) -> Result<()> {
587 let key = key.as_bytes();
588 let prev = prev.as_bytes();
589 self.maybe_init_client().await?;
590 let lease_ttl_secs = lease_ttl.as_secs() as f64;
591 let res = self
592 .pg_client
593 .read()
594 .await
595 .execute(
596 &self.sql_set.update_value_with_lease,
597 &[&key, &prev, &updated, &lease_ttl_secs],
598 )
599 .await?;
600
601 ensure!(
602 res == 1,
603 UnexpectedSnafu {
604 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
605 }
606 );
607
608 Ok(())
609 }
610
611 async fn put_value_with_lease(
613 &self,
614 key: &str,
615 value: &str,
616 lease_ttl: Duration,
617 ) -> Result<bool> {
618 let key = key.as_bytes();
619 let lease_ttl_secs = lease_ttl.as_secs() as f64;
620 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
621 self.maybe_init_client().await?;
622 let res = self
623 .pg_client
624 .read()
625 .await
626 .query(&self.sql_set.put_value_with_lease, ¶ms)
627 .await?;
628 Ok(res.is_empty())
629 }
630
631 async fn delete_value(&self, key: &str) -> Result<bool> {
634 let key = key.as_bytes();
635 self.maybe_init_client().await?;
636 let res = self
637 .pg_client
638 .read()
639 .await
640 .query(&self.sql_set.delete_value, &[&key])
641 .await?;
642
643 Ok(res.len() == 1)
644 }
645
646 async fn leader_action(&self) -> Result<()> {
667 let key = self.election_key();
668 if self.is_leader() {
670 match self.get_value_with_lease(&key).await? {
671 Some(lease) => {
672 match (
673 lease.leader_value == self.leader_value,
674 lease.expire_time > lease.current,
675 ) {
676 (true, true) => {
678 self.update_value_with_lease(
680 &key,
681 &lease.origin,
682 &self.leader_value,
683 self.meta_lease_ttl,
684 )
685 .await?;
686 }
687 (true, false) => {
689 warn!("Leader lease expired, now stepping down.");
690 self.step_down().await?;
691 }
692 (false, _) => {
694 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
695 self.step_down().await?;
696 }
697 }
698 }
699 None => {
701 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
702 self.step_down().await?;
703 }
704 }
705 } else {
707 self.elected().await?;
708 }
709 Ok(())
710 }
711
712 async fn follower_action(&self) -> Result<()> {
723 let key = self.election_key();
724 if self.is_leader() {
726 self.step_down_without_lock().await?;
727 }
728 let lease = self
729 .get_value_with_lease(&key)
730 .await?
731 .context(NoLeaderSnafu)?;
732 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
734 Ok(())
736 }
737
738 async fn step_down(&self) -> Result<()> {
745 let key = self.election_key();
746 let leader_key = RdsLeaderKey {
747 name: self.leader_value.clone().into_bytes(),
748 key: key.clone().into_bytes(),
749 ..Default::default()
750 };
751 self.delete_value(&key).await?;
752 self.maybe_init_client().await?;
753 self.pg_client
754 .read()
755 .await
756 .query(&self.sql_set.step_down, &[])
757 .await?;
758 send_leader_change_and_set_flags(
759 &self.is_leader,
760 &self.leader_infancy,
761 &self.leader_watcher,
762 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
763 );
764 Ok(())
765 }
766
767 async fn step_down_without_lock(&self) -> Result<()> {
769 let key = self.election_key().into_bytes();
770 let leader_key = RdsLeaderKey {
771 name: self.leader_value.clone().into_bytes(),
772 key: key.clone(),
773 ..Default::default()
774 };
775 if self
776 .is_leader
777 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
778 .is_ok()
779 {
780 if let Err(e) = self
781 .leader_watcher
782 .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
783 {
784 error!(e; "Failed to send leader change message");
785 }
786 }
787 Ok(())
788 }
789
790 async fn elected(&self) -> Result<()> {
793 let key = self.election_key();
794 let leader_key = RdsLeaderKey {
795 name: self.leader_value.clone().into_bytes(),
796 key: key.clone().into_bytes(),
797 ..Default::default()
798 };
799 self.delete_value(&key).await?;
800 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
801 .await?;
802
803 if self
804 .is_leader
805 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
806 .is_ok()
807 {
808 self.leader_infancy.store(true, Ordering::Release);
809
810 if let Err(e) = self
811 .leader_watcher
812 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
813 {
814 error!(e; "Failed to send leader change message");
815 }
816 }
817 Ok(())
818 }
819}
820
821#[cfg(test)]
822mod tests {
823 use std::assert_matches::assert_matches;
824 use std::env;
825
826 use common_meta::maybe_skip_postgres_integration_test;
827
828 use super::*;
829 use crate::bootstrap::create_postgres_pool;
830 use crate::error;
831
832 async fn create_postgres_client(
833 table_name: Option<&str>,
834 execution_timeout: Duration,
835 idle_session_timeout: Duration,
836 statement_timeout: Duration,
837 ) -> Result<ElectionPgClient> {
838 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
839 if endpoint.is_empty() {
840 return UnexpectedSnafu {
841 violated: "Postgres endpoint is empty".to_string(),
842 }
843 .fail();
844 }
845 let pool = create_postgres_pool(&[endpoint], None).await.unwrap();
846 let mut pg_client = ElectionPgClient::new(
847 pool,
848 execution_timeout,
849 idle_session_timeout,
850 statement_timeout,
851 )
852 .unwrap();
853 pg_client.maybe_init_client().await?;
854 if let Some(table_name) = table_name {
855 let create_table_sql = format!(
856 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
857 table_name
858 );
859 pg_client.execute(&create_table_sql, &[]).await?;
860 }
861 Ok(pg_client)
862 }
863
864 async fn drop_table(pg_election: &PgElection, table_name: &str) {
865 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
866 pg_election
867 .pg_client
868 .read()
869 .await
870 .execute(&sql, &[])
871 .await
872 .unwrap();
873 }
874
    /// Exercises put / get / update / delete and the prefix scan against a real
    /// Postgres instance. Skipped by `maybe_skip_postgres_integration_test!` when the
    /// integration environment is not available.
    #[tokio::test]
    async fn test_postgres_crud() {
        maybe_skip_postgres_integration_test!();
        let key = "test_key".to_string();
        let value = "test_value".to_string();

        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_postgres_crud_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(10);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        let (tx, _) = broadcast::channel(100);
        let pg_election = PgElection {
            leader_value: "test_leader".to_string(),
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
        };

        // First put of a fresh key reports that no previous row existed.
        let res = pg_election
            .put_value_with_lease(&key, &value, candidate_lease_ttl)
            .await
            .unwrap();
        assert!(res);

        let lease = pg_election
            .get_value_with_lease(&key)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(lease.leader_value, value);

        // The conditional update succeeds because `lease.origin` is the stored raw value.
        pg_election
            .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
            .await
            .unwrap();

        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);

        let res = pg_election.get_value_with_lease(&key).await.unwrap();
        assert!(res.is_none());

        // Insert ten keys sharing the "test_key" prefix.
        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let value = format!("test_value_{}", i);
            pg_election
                .put_value_with_lease(&key, &value, candidate_lease_ttl)
                .await
                .unwrap();
        }

        let key_prefix = "test_key".to_string();
        let (res, _) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert_eq!(res.len(), 10);

        for i in 0..10 {
            let key = format!("test_key_{}", i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        // With no matching rows the returned "current" time stays at its default.
        let (res, current) = pg_election
            .get_value_with_lease_by_prefix(&key_prefix)
            .await
            .unwrap();
        assert!(res.is_empty());
        assert!(current == Timestamp::default());

        drop_table(&pg_election, table_name).await;
    }
965
966 async fn candidate(
967 leader_value: String,
968 candidate_lease_ttl: Duration,
969 store_key_prefix: String,
970 table_name: String,
971 ) {
972 let execution_timeout = Duration::from_secs(10);
973 let statement_timeout = Duration::from_secs(10);
974 let meta_lease_ttl = Duration::from_secs(2);
975 let idle_session_timeout = Duration::from_secs(0);
976 let client = create_postgres_client(
977 None,
978 execution_timeout,
979 idle_session_timeout,
980 statement_timeout,
981 )
982 .await
983 .unwrap();
984
985 let (tx, _) = broadcast::channel(100);
986 let pg_election = PgElection {
987 leader_value,
988 pg_client: RwLock::new(client),
989 is_leader: AtomicBool::new(false),
990 leader_infancy: AtomicBool::new(true),
991 leader_watcher: tx,
992 store_key_prefix,
993 candidate_lease_ttl,
994 meta_lease_ttl,
995 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
996 };
997
998 let node_info = MetasrvNodeInfo {
999 addr: "test_addr".to_string(),
1000 version: "test_version".to_string(),
1001 git_commit: "test_git_commit".to_string(),
1002 start_time_ms: 0,
1003 };
1004 pg_election.register_candidate(&node_info).await.unwrap();
1005 }
1006
    /// Spawns ten candidates, checks that all of them are visible, then aborts them and
    /// checks that none is reported once their leases have had time to expire.
    #[tokio::test]
    async fn test_candidate_registration() {
        maybe_skip_postgres_integration_test!();
        let leader_value_prefix = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_candidate_registration_greptime_metakv";
        let mut handles = vec![];
        let candidate_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        // Creating this client also creates the table the spawned candidates rely on.
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        for i in 0..10 {
            let leader_value = format!("{}{}", leader_value_prefix, i);
            let handle = tokio::spawn(candidate(
                leader_value,
                candidate_lease_ttl,
                uuid.clone(),
                table_name.to_string(),
            ));
            handles.push(handle);
        }
        // Give the spawned candidates time to register.
        tokio::time::sleep(Duration::from_secs(3)).await;

        let (tx, _) = broadcast::channel(100);
        let leader_value = "test_leader".to_string();
        let pg_election = PgElection {
            leader_value,
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid.clone(),
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
        };

        let candidates = pg_election.all_candidates().await.unwrap();
        assert_eq!(candidates.len(), 10);

        // Stop the keep-alive loops so the leases are no longer renewed.
        for handle in handles {
            handle.abort();
        }

        // Wait one full candidate lease TTL; `all_candidates` filters out expired leases.
        tokio::time::sleep(Duration::from_secs(5)).await;
        let candidates = pg_election.all_candidates().await.unwrap();
        assert!(candidates.is_empty());

        // The expired rows are still stored, so each delete removes exactly one row.
        for i in 0..10 {
            let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
            let res = pg_election.delete_value(&key).await.unwrap();
            assert!(res);
        }

        drop_table(&pg_election, table_name).await;
    }
1076
    /// Walks one election through elected -> step down without lock -> elected ->
    /// step down, checking the stored lease, the `is_leader` flag and the broadcast
    /// message after every transition.
    #[tokio::test]
    async fn test_elected_and_step_down() {
        maybe_skip_postgres_integration_test!();
        let leader_value = "test_leader".to_string();
        let uuid = uuid::Uuid::new_v4().to_string();
        let table_name = "test_elected_and_step_down_greptime_metakv";
        let candidate_lease_ttl = Duration::from_secs(5);
        let execution_timeout = Duration::from_secs(10);
        let statement_timeout = Duration::from_secs(10);
        let meta_lease_ttl = Duration::from_secs(2);
        let idle_session_timeout = Duration::from_secs(0);
        let client = create_postgres_client(
            Some(table_name),
            execution_timeout,
            idle_session_timeout,
            statement_timeout,
        )
        .await
        .unwrap();

        let (tx, mut rx) = broadcast::channel(100);
        let leader_pg_election = PgElection {
            leader_value: leader_value.clone(),
            pg_client: RwLock::new(client),
            is_leader: AtomicBool::new(false),
            leader_infancy: AtomicBool::new(true),
            leader_watcher: tx,
            store_key_prefix: uuid,
            candidate_lease_ttl,
            meta_lease_ttl,
            sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
        };

        // Transition 1: become leader.
        leader_pg_election.elected().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Transition 2: local step down; the stored lease row is left in place.
        leader_pg_election.step_down_without_lock().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(!leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        // Transition 3: become leader again.
        leader_pg_election.elected().await.unwrap();
        let lease = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap()
            .unwrap();
        assert!(lease.leader_value == leader_value);
        assert!(lease.expire_time > lease.current);
        assert!(leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::Elected(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::Elected"),
        }

        // Transition 4: full step down, which also deletes the stored lease row.
        leader_pg_election.step_down().await.unwrap();
        let res = leader_pg_election
            .get_value_with_lease(&leader_pg_election.election_key())
            .await
            .unwrap();
        assert!(res.is_none());
        assert!(!leader_pg_election.is_leader());

        match rx.recv().await {
            Ok(LeaderChangeMessage::StepDown(key)) => {
                assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
                assert_eq!(
                    String::from_utf8_lossy(key.key()),
                    leader_pg_election.election_key()
                );
                assert_eq!(key.lease_id(), i64::default());
                assert_eq!(key.revision(), i64::default());
            }
            _ => panic!("Expected LeaderChangeMessage::StepDown"),
        }

        drop_table(&leader_pg_election, table_name).await;
    }
1201
1202 #[tokio::test]
1203 async fn test_leader_action() {
1204 maybe_skip_postgres_integration_test!();
1205 let leader_value = "test_leader".to_string();
1206 let uuid = uuid::Uuid::new_v4().to_string();
1207 let table_name = "test_leader_action_greptime_metakv";
1208 let candidate_lease_ttl = Duration::from_secs(5);
1209 let execution_timeout = Duration::from_secs(10);
1210 let statement_timeout = Duration::from_secs(10);
1211 let meta_lease_ttl = Duration::from_secs(2);
1212 let idle_session_timeout = Duration::from_secs(0);
1213 let client = create_postgres_client(
1214 Some(table_name),
1215 execution_timeout,
1216 idle_session_timeout,
1217 statement_timeout,
1218 )
1219 .await
1220 .unwrap();
1221
1222 let (tx, mut rx) = broadcast::channel(100);
1223 let leader_pg_election = PgElection {
1224 leader_value: leader_value.clone(),
1225 pg_client: RwLock::new(client),
1226 is_leader: AtomicBool::new(false),
1227 leader_infancy: AtomicBool::new(true),
1228 leader_watcher: tx,
1229 store_key_prefix: uuid,
1230 candidate_lease_ttl,
1231 meta_lease_ttl,
1232 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1233 };
1234
1235 let res = leader_pg_election
1237 .pg_client
1238 .read()
1239 .await
1240 .query(&leader_pg_election.sql_set.campaign, &[])
1241 .await
1242 .unwrap();
1243 let res: bool = res[0].get(0);
1244 assert!(res);
1245 leader_pg_election.leader_action().await.unwrap();
1246 let lease = leader_pg_election
1247 .get_value_with_lease(&leader_pg_election.election_key())
1248 .await
1249 .unwrap()
1250 .unwrap();
1251 assert!(lease.leader_value == leader_value);
1252 assert!(lease.expire_time > lease.current);
1253 assert!(leader_pg_election.is_leader());
1254
1255 match rx.recv().await {
1256 Ok(LeaderChangeMessage::Elected(key)) => {
1257 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1258 assert_eq!(
1259 String::from_utf8_lossy(key.key()),
1260 leader_pg_election.election_key()
1261 );
1262 assert_eq!(key.lease_id(), i64::default());
1263 assert_eq!(key.revision(), i64::default());
1264 }
1265 _ => panic!("Expected LeaderChangeMessage::Elected"),
1266 }
1267
1268 let res = leader_pg_election
1270 .pg_client
1271 .read()
1272 .await
1273 .query(&leader_pg_election.sql_set.campaign, &[])
1274 .await
1275 .unwrap();
1276 let res: bool = res[0].get(0);
1277 assert!(res);
1278 leader_pg_election.leader_action().await.unwrap();
1279 let new_lease = leader_pg_election
1280 .get_value_with_lease(&leader_pg_election.election_key())
1281 .await
1282 .unwrap()
1283 .unwrap();
1284 assert!(new_lease.leader_value == leader_value);
1285 assert!(
1286 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1287 );
1288 assert!(leader_pg_election.is_leader());
1289
1290 tokio::time::sleep(Duration::from_secs(2)).await;
1292
1293 let res = leader_pg_election
1294 .pg_client
1295 .read()
1296 .await
1297 .query(&leader_pg_election.sql_set.campaign, &[])
1298 .await
1299 .unwrap();
1300 let res: bool = res[0].get(0);
1301 assert!(res);
1302 leader_pg_election.leader_action().await.unwrap();
1303 let res = leader_pg_election
1304 .get_value_with_lease(&leader_pg_election.election_key())
1305 .await
1306 .unwrap();
1307 assert!(res.is_none());
1308
1309 match rx.recv().await {
1310 Ok(LeaderChangeMessage::StepDown(key)) => {
1311 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1312 assert_eq!(
1313 String::from_utf8_lossy(key.key()),
1314 leader_pg_election.election_key()
1315 );
1316 assert_eq!(key.lease_id(), i64::default());
1317 assert_eq!(key.revision(), i64::default());
1318 }
1319 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1320 }
1321
1322 let res = leader_pg_election
1324 .pg_client
1325 .read()
1326 .await
1327 .query(&leader_pg_election.sql_set.campaign, &[])
1328 .await
1329 .unwrap();
1330 let res: bool = res[0].get(0);
1331 assert!(res);
1332 leader_pg_election.leader_action().await.unwrap();
1333 let lease = leader_pg_election
1334 .get_value_with_lease(&leader_pg_election.election_key())
1335 .await
1336 .unwrap()
1337 .unwrap();
1338 assert!(lease.leader_value == leader_value);
1339 assert!(lease.expire_time > lease.current);
1340 assert!(leader_pg_election.is_leader());
1341
1342 match rx.recv().await {
1343 Ok(LeaderChangeMessage::Elected(key)) => {
1344 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1345 assert_eq!(
1346 String::from_utf8_lossy(key.key()),
1347 leader_pg_election.election_key()
1348 );
1349 assert_eq!(key.lease_id(), i64::default());
1350 assert_eq!(key.revision(), i64::default());
1351 }
1352 _ => panic!("Expected LeaderChangeMessage::Elected"),
1353 }
1354
1355 leader_pg_election
1357 .delete_value(&leader_pg_election.election_key())
1358 .await
1359 .unwrap();
1360 leader_pg_election.leader_action().await.unwrap();
1361 let res = leader_pg_election
1362 .get_value_with_lease(&leader_pg_election.election_key())
1363 .await
1364 .unwrap();
1365 assert!(res.is_none());
1366 assert!(!leader_pg_election.is_leader());
1367
1368 match rx.recv().await {
1369 Ok(LeaderChangeMessage::StepDown(key)) => {
1370 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1371 assert_eq!(
1372 String::from_utf8_lossy(key.key()),
1373 leader_pg_election.election_key()
1374 );
1375 assert_eq!(key.lease_id(), i64::default());
1376 assert_eq!(key.revision(), i64::default());
1377 }
1378 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1379 }
1380
1381 let res = leader_pg_election
1383 .pg_client
1384 .read()
1385 .await
1386 .query(&leader_pg_election.sql_set.campaign, &[])
1387 .await
1388 .unwrap();
1389 let res: bool = res[0].get(0);
1390 assert!(res);
1391 leader_pg_election.leader_action().await.unwrap();
1392 let lease = leader_pg_election
1393 .get_value_with_lease(&leader_pg_election.election_key())
1394 .await
1395 .unwrap()
1396 .unwrap();
1397 assert!(lease.leader_value == leader_value);
1398 assert!(lease.expire_time > lease.current);
1399 assert!(leader_pg_election.is_leader());
1400
1401 match rx.recv().await {
1402 Ok(LeaderChangeMessage::Elected(key)) => {
1403 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1404 assert_eq!(
1405 String::from_utf8_lossy(key.key()),
1406 leader_pg_election.election_key()
1407 );
1408 assert_eq!(key.lease_id(), i64::default());
1409 assert_eq!(key.revision(), i64::default());
1410 }
1411 _ => panic!("Expected LeaderChangeMessage::Elected"),
1412 }
1413
1414 let res = leader_pg_election
1416 .pg_client
1417 .read()
1418 .await
1419 .query(&leader_pg_election.sql_set.campaign, &[])
1420 .await
1421 .unwrap();
1422 let res: bool = res[0].get(0);
1423 assert!(res);
1424 leader_pg_election
1425 .delete_value(&leader_pg_election.election_key())
1426 .await
1427 .unwrap();
1428 leader_pg_election
1429 .put_value_with_lease(
1430 &leader_pg_election.election_key(),
1431 "test",
1432 Duration::from_secs(10),
1433 )
1434 .await
1435 .unwrap();
1436 leader_pg_election.leader_action().await.unwrap();
1437 let res = leader_pg_election
1438 .get_value_with_lease(&leader_pg_election.election_key())
1439 .await
1440 .unwrap();
1441 assert!(res.is_none());
1442 assert!(!leader_pg_election.is_leader());
1443
1444 match rx.recv().await {
1445 Ok(LeaderChangeMessage::StepDown(key)) => {
1446 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1447 assert_eq!(
1448 String::from_utf8_lossy(key.key()),
1449 leader_pg_election.election_key()
1450 );
1451 assert_eq!(key.lease_id(), i64::default());
1452 assert_eq!(key.revision(), i64::default());
1453 }
1454 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1455 }
1456
1457 leader_pg_election
1459 .pg_client
1460 .read()
1461 .await
1462 .query(&leader_pg_election.sql_set.step_down, &[])
1463 .await
1464 .unwrap();
1465
1466 drop_table(&leader_pg_election, table_name).await;
1467 }
1468
1469 #[tokio::test]
1470 async fn test_follower_action() {
1471 maybe_skip_postgres_integration_test!();
1472 common_telemetry::init_default_ut_logging();
1473 let uuid = uuid::Uuid::new_v4().to_string();
1474 let table_name = "test_follower_action_greptime_metakv";
1475
1476 let candidate_lease_ttl = Duration::from_secs(5);
1477 let execution_timeout = Duration::from_secs(10);
1478 let statement_timeout = Duration::from_secs(10);
1479 let meta_lease_ttl = Duration::from_secs(2);
1480 let idle_session_timeout = Duration::from_secs(0);
1481 let follower_client = create_postgres_client(
1482 Some(table_name),
1483 execution_timeout,
1484 idle_session_timeout,
1485 statement_timeout,
1486 )
1487 .await
1488 .unwrap();
1489 let (tx, mut rx) = broadcast::channel(100);
1490 let follower_pg_election = PgElection {
1491 leader_value: "test_follower".to_string(),
1492 pg_client: RwLock::new(follower_client),
1493 is_leader: AtomicBool::new(false),
1494 leader_infancy: AtomicBool::new(true),
1495 leader_watcher: tx,
1496 store_key_prefix: uuid.clone(),
1497 candidate_lease_ttl,
1498 meta_lease_ttl,
1499 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1500 };
1501
1502 let leader_client = create_postgres_client(
1503 Some(table_name),
1504 execution_timeout,
1505 idle_session_timeout,
1506 statement_timeout,
1507 )
1508 .await
1509 .unwrap();
1510 let (tx, _) = broadcast::channel(100);
1511 let leader_pg_election = PgElection {
1512 leader_value: "test_leader".to_string(),
1513 pg_client: RwLock::new(leader_client),
1514 is_leader: AtomicBool::new(false),
1515 leader_infancy: AtomicBool::new(true),
1516 leader_watcher: tx,
1517 store_key_prefix: uuid,
1518 candidate_lease_ttl,
1519 meta_lease_ttl,
1520 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1521 };
1522
1523 leader_pg_election
1524 .pg_client
1525 .read()
1526 .await
1527 .query(&leader_pg_election.sql_set.campaign, &[])
1528 .await
1529 .unwrap();
1530 leader_pg_election.elected().await.unwrap();
1531
1532 follower_pg_election.follower_action().await.unwrap();
1534
1535 tokio::time::sleep(Duration::from_secs(2)).await;
1537 assert!(follower_pg_election.follower_action().await.is_err());
1538
1539 leader_pg_election
1541 .delete_value(&leader_pg_election.election_key())
1542 .await
1543 .unwrap();
1544 assert!(follower_pg_election.follower_action().await.is_err());
1545
1546 follower_pg_election
1548 .is_leader
1549 .store(true, Ordering::Relaxed);
1550 assert!(follower_pg_election.follower_action().await.is_err());
1551
1552 match rx.recv().await {
1553 Ok(LeaderChangeMessage::StepDown(key)) => {
1554 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1555 assert_eq!(
1556 String::from_utf8_lossy(key.key()),
1557 follower_pg_election.election_key()
1558 );
1559 assert_eq!(key.lease_id(), i64::default());
1560 assert_eq!(key.revision(), i64::default());
1561 }
1562 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1563 }
1564
1565 leader_pg_election
1567 .pg_client
1568 .read()
1569 .await
1570 .query(&leader_pg_election.sql_set.step_down, &[])
1571 .await
1572 .unwrap();
1573
1574 drop_table(&follower_pg_election, table_name).await;
1575 }
1576
1577 #[tokio::test]
1578 async fn test_idle_session_timeout() {
1579 maybe_skip_postgres_integration_test!();
1580 common_telemetry::init_default_ut_logging();
1581 let execution_timeout = Duration::from_secs(10);
1582 let statement_timeout = Duration::from_secs(10);
1583 let idle_session_timeout = Duration::from_secs(1);
1584 let mut client = create_postgres_client(
1585 None,
1586 execution_timeout,
1587 idle_session_timeout,
1588 statement_timeout,
1589 )
1590 .await
1591 .unwrap();
1592 tokio::time::sleep(Duration::from_millis(1100)).await;
1593 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1595 assert_matches!(err, error::Error::PostgresExecution { .. });
1596 let error::Error::PostgresExecution { error, .. } = err else {
1597 panic!("Expected PostgresExecution error");
1598 };
1599 assert!(error.is_closed());
1600 client.reset_client().await.unwrap();
1602 let _ = client.query("SELECT 1", &[]).await.unwrap();
1603 }
1604
1605 #[test]
1606 fn test_election_sql_with_schema() {
1607 let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1608 let s = f.build();
1609 assert!(s.campaign.contains("pg_try_advisory_lock"));
1610 assert!(s
1611 .put_value_with_lease
1612 .contains("\"test_schema\".\"greptime_metakv\""));
1613 assert!(s
1614 .update_value_with_lease
1615 .contains("\"test_schema\".\"greptime_metakv\""));
1616 assert!(s
1617 .get_value_with_lease
1618 .contains("\"test_schema\".\"greptime_metakv\""));
1619 assert!(s
1620 .get_value_with_lease_by_prefix
1621 .contains("\"test_schema\".\"greptime_metakv\""));
1622 assert!(s
1623 .delete_value
1624 .contains("\"test_schema\".\"greptime_metakv\""));
1625 }
1626}