1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_meta::key::{CANDIDATES_ROOT, ELECTION_KEY};
20use common_telemetry::{error, info, warn};
21use common_time::Timestamp;
22use deadpool_postgres::{Manager, Pool};
23use snafu::{OptionExt, ResultExt, ensure};
24use tokio::sync::{RwLock, broadcast};
25use tokio::time::MissedTickBehavior;
26use tokio_postgres::Row;
27use tokio_postgres::types::ToSql;
28
29use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
30use crate::election::{
31 Election, LeaderChangeMessage, listen_leader_change, send_leader_change_and_set_flags,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
39struct ElectionSqlFactory<'a> {
40 lock_id: u64,
41 schema_name: Option<&'a str>,
42 table_name: &'a str,
43}
44
45struct ElectionSqlSet {
46 campaign: String,
47 step_down: String,
48 put_value_with_lease: String,
58 update_value_with_lease: String,
66 get_value_with_lease: String,
71 get_value_with_lease_by_prefix: String,
80 delete_value: String,
89}
90
91impl<'a> ElectionSqlFactory<'a> {
92 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
93 Self {
94 lock_id,
95 schema_name,
96 table_name,
97 }
98 }
99
100 fn table_ident(&self) -> String {
101 match self.schema_name {
102 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
103 _ => format!("\"{}\"", self.table_name),
104 }
105 }
106
107 fn build(self) -> ElectionSqlSet {
108 ElectionSqlSet {
109 campaign: self.campaign_sql(),
110 step_down: self.step_down_sql(),
111 put_value_with_lease: self.put_value_with_lease_sql(),
112 update_value_with_lease: self.update_value_with_lease_sql(),
113 get_value_with_lease: self.get_value_with_lease_sql(),
114 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
115 delete_value: self.delete_value_sql(),
116 }
117 }
118
119 fn campaign_sql(&self) -> String {
120 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
121 }
122
123 fn step_down_sql(&self) -> String {
124 format!("SELECT pg_advisory_unlock({})", self.lock_id)
125 }
126
127 fn put_value_with_lease_sql(&self) -> String {
128 let table = self.table_ident();
129 format!(
130 r#"WITH prev AS (
131 SELECT k, v FROM {table} WHERE k = $1
132 ), insert AS (
133 INSERT INTO {table}
134 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
135 ON CONFLICT (k) DO NOTHING
136 )
137 SELECT k, v FROM prev;
138 "#,
139 table = table,
140 lease_sep = LEASE_SEP
141 )
142 }
143
144 fn update_value_with_lease_sql(&self) -> String {
145 let table = self.table_ident();
146 format!(
147 r#"UPDATE {table}
148 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
149 WHERE k = $1 AND v = $2"#,
150 table = table,
151 lease_sep = LEASE_SEP
152 )
153 }
154
155 fn get_value_with_lease_sql(&self) -> String {
156 let table = self.table_ident();
157 format!(
158 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
159 table = table
160 )
161 }
162
163 fn get_value_with_lease_by_prefix_sql(&self) -> String {
164 let table = self.table_ident();
165 format!(
166 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
167 table = table
168 )
169 }
170
171 fn delete_value_sql(&self) -> String {
172 let table = self.table_ident();
173 format!(
174 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
175 table = table
176 )
177 }
178}
179
180pub struct ElectionPgClient {
182 current: Option<deadpool::managed::Object<Manager>>,
183 pool: Pool,
184 execution_timeout: Duration,
189
190 idle_session_timeout: Duration,
195
196 statement_timeout: Duration,
201}
202
203impl ElectionPgClient {
204 pub fn new(
205 pool: Pool,
206 execution_timeout: Duration,
207 idle_session_timeout: Duration,
208 statement_timeout: Duration,
209 ) -> Result<ElectionPgClient> {
210 Ok(ElectionPgClient {
211 current: None,
212 pool,
213 execution_timeout,
214 idle_session_timeout,
215 statement_timeout,
216 })
217 }
218
219 fn set_idle_session_timeout_sql(&self) -> String {
220 format!(
221 "SET idle_session_timeout = '{}s';",
222 self.idle_session_timeout.as_secs()
223 )
224 }
225
226 fn set_statement_timeout_sql(&self) -> String {
227 format!(
228 "SET statement_timeout = '{}s';",
229 self.statement_timeout.as_secs()
230 )
231 }
232
233 async fn reset_client(&mut self) -> Result<()> {
234 if let Some(client) = self.current.take() {
235 let inner = deadpool::managed::Object::<deadpool_postgres::Manager>::take(client);
238 drop(inner);
239 }
240 self.maybe_init_client().await
241 }
242
243 async fn maybe_init_client(&mut self) -> Result<()> {
244 if self.current.is_none() {
245 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
246
247 self.current = Some(client);
248 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
250 self.execute(&idle_session_timeout_sql, &[]).await?;
251 let statement_timeout_sql = self.set_statement_timeout_sql();
252 self.execute(&statement_timeout_sql, &[]).await?;
253 }
254
255 Ok(())
256 }
257
258 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
263 let result = tokio::time::timeout(
264 self.execution_timeout,
265 self.current.as_ref().unwrap().execute(sql, params),
266 )
267 .await
268 .map_err(|_| {
269 SqlExecutionTimeoutSnafu {
270 sql: sql.to_string(),
271 duration: self.execution_timeout,
272 }
273 .build()
274 })?;
275
276 result.context(PostgresExecutionSnafu { sql })
277 }
278
279 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
284 let result = tokio::time::timeout(
285 self.execution_timeout,
286 self.current.as_ref().unwrap().query(sql, params),
287 )
288 .await
289 .map_err(|_| {
290 SqlExecutionTimeoutSnafu {
291 sql: sql.to_string(),
292 duration: self.execution_timeout,
293 }
294 .build()
295 })?;
296
297 result.context(PostgresExecutionSnafu { sql })
298 }
299}
300
301pub struct PgElection {
303 leader_value: String,
304 pg_client: RwLock<ElectionPgClient>,
305 is_leader: AtomicBool,
306 leader_infancy: AtomicBool,
307 leader_watcher: broadcast::Sender<LeaderChangeMessage>,
308 store_key_prefix: String,
309 candidate_lease_ttl: Duration,
310 meta_lease_ttl: Duration,
311 sql_set: ElectionSqlSet,
312}
313
314impl PgElection {
315 async fn maybe_init_client(&self) -> Result<()> {
316 if self.pg_client.read().await.current.is_none() {
317 self.pg_client.write().await.maybe_init_client().await?;
318 }
319
320 Ok(())
321 }
322
323 #[allow(clippy::too_many_arguments)]
324 pub async fn with_pg_client(
325 leader_value: String,
326 pg_client: ElectionPgClient,
327 store_key_prefix: String,
328 candidate_lease_ttl: Duration,
329 meta_lease_ttl: Duration,
330 schema_name: Option<&str>,
331 table_name: &str,
332 lock_id: u64,
333 ) -> Result<ElectionRef> {
334 if let Some(s) = schema_name {
335 common_telemetry::info!("PgElection uses schema: {}", s);
336 } else {
337 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
338 }
339 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
340
341 let tx = listen_leader_change(leader_value.clone());
342 Ok(Arc::new(Self {
343 leader_value,
344 pg_client: RwLock::new(pg_client),
345 is_leader: AtomicBool::new(false),
346 leader_infancy: AtomicBool::new(false),
347 leader_watcher: tx,
348 store_key_prefix,
349 candidate_lease_ttl,
350 meta_lease_ttl,
351 sql_set: sql_factory.build(),
352 }))
353 }
354
355 fn election_key(&self) -> String {
356 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
357 }
358
359 fn candidate_root(&self) -> String {
360 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
361 }
362
363 fn candidate_key(&self) -> String {
364 format!("{}{}", self.candidate_root(), self.leader_value)
365 }
366}
367
368#[async_trait::async_trait]
369impl Election for PgElection {
370 type Leader = LeaderValue;
371
372 fn is_leader(&self) -> bool {
373 self.is_leader.load(Ordering::Relaxed)
374 }
375
376 fn in_leader_infancy(&self) -> bool {
377 self.leader_infancy
378 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
379 .is_ok()
380 }
381
382 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
383 let key = self.candidate_key();
384 let node_info =
385 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
386 input: format!("{node_info:?}"),
387 })?;
388 let res = self
389 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
390 .await?;
391 if !res {
393 self.delete_value(&key).await?;
394 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
395 .await?;
396 }
397
398 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
400 loop {
401 let _ = keep_alive_interval.tick().await;
402
403 let lease = self
404 .get_value_with_lease(&key)
405 .await?
406 .context(UnexpectedSnafu {
407 violated: format!("Failed to get lease for key: {:?}", key),
408 })?;
409
410 ensure!(
411 lease.expire_time > lease.current,
412 UnexpectedSnafu {
413 violated: format!(
414 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
415 lease.expire_time, lease.current, key
416 ),
417 }
418 );
419
420 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
422 .await?;
423 }
424 }
425
426 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
427 let key_prefix = self.candidate_root();
428 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
429 candidates.retain(|c| c.1 > current);
431 let mut valid_candidates = Vec::with_capacity(candidates.len());
432 for (c, _) in candidates {
433 let node_info: MetasrvNodeInfo =
434 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
435 input: format!("{:?}", c),
436 })?;
437 valid_candidates.push(node_info);
438 }
439 Ok(valid_candidates)
440 }
441
442 async fn campaign(&self) -> Result<()> {
455 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
456 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
457
458 self.maybe_init_client().await?;
459 loop {
460 let res = self
461 .pg_client
462 .read()
463 .await
464 .query(&self.sql_set.campaign, &[])
465 .await?;
466 let row = res.first().context(UnexpectedSnafu {
467 violated: "Failed to get the result of acquiring advisory lock",
468 })?;
469 let is_leader = row.try_get(0).map_err(|_| {
470 UnexpectedSnafu {
471 violated: "Failed to get the result of get lock",
472 }
473 .build()
474 })?;
475 if is_leader {
476 self.leader_action().await?;
477 } else {
478 self.follower_action().await?;
479 }
480 let _ = keep_alive_interval.tick().await;
481 }
482 }
483
484 async fn reset_campaign(&self) {
485 info!("Resetting campaign");
486 if self.is_leader.load(Ordering::Relaxed) {
487 if let Err(err) = self.step_down_without_lock().await {
488 error!(err; "Failed to step down without lock");
489 }
490 info!("Step down without lock successfully, due to reset campaign");
491 }
492 if let Err(err) = self.pg_client.write().await.reset_client().await {
493 error!(err; "Failed to reset client");
494 }
495 }
496
497 async fn leader(&self) -> Result<Self::Leader> {
498 if self.is_leader.load(Ordering::Relaxed) {
499 Ok(self.leader_value.as_bytes().into())
500 } else {
501 let key = self.election_key();
502 if let Some(lease) = self.get_value_with_lease(&key).await? {
503 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
504 Ok(lease.leader_value.as_bytes().into())
505 } else {
506 NoLeaderSnafu.fail()
507 }
508 }
509 }
510
511 async fn resign(&self) -> Result<()> {
512 todo!()
513 }
514
515 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
516 self.leader_watcher.subscribe()
517 }
518}
519
520impl PgElection {
521 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
523 let key = key.as_bytes();
524 self.maybe_init_client().await?;
525 let res = self
526 .pg_client
527 .read()
528 .await
529 .query(&self.sql_set.get_value_with_lease, &[&key])
530 .await?;
531
532 if res.is_empty() {
533 Ok(None)
534 } else {
535 let current_time_str = res[0].try_get(1).unwrap_or_default();
537 let current_time = match Timestamp::from_str(current_time_str, None) {
538 Ok(ts) => ts,
539 Err(_) => UnexpectedSnafu {
540 violated: format!("Invalid timestamp: {}", current_time_str),
541 }
542 .fail()?,
543 };
544 let value_and_expire_time =
546 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
547 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
548
549 Ok(Some(Lease {
550 leader_value: value,
551 expire_time,
552 current: current_time,
553 origin: value_and_expire_time.to_string(),
554 }))
555 }
556 }
557
558 async fn get_value_with_lease_by_prefix(
560 &self,
561 key_prefix: &str,
562 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
563 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
564 self.maybe_init_client().await?;
565 let res = self
566 .pg_client
567 .read()
568 .await
569 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
570 .await?;
571
572 let mut values_with_leases = vec![];
573 let mut current = Timestamp::default();
574 for row in res {
575 let current_time_str = row.try_get(1).unwrap_or_default();
576 current = match Timestamp::from_str(current_time_str, None) {
577 Ok(ts) => ts,
578 Err(_) => UnexpectedSnafu {
579 violated: format!("Invalid timestamp: {}", current_time_str),
580 }
581 .fail()?,
582 };
583
584 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
585 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
586
587 values_with_leases.push((value, expire_time));
588 }
589 Ok((values_with_leases, current))
590 }
591
592 async fn update_value_with_lease(
593 &self,
594 key: &str,
595 prev: &str,
596 updated: &str,
597 lease_ttl: Duration,
598 ) -> Result<()> {
599 let key = key.as_bytes();
600 let prev = prev.as_bytes();
601 self.maybe_init_client().await?;
602 let lease_ttl_secs = lease_ttl.as_secs() as f64;
603 let res = self
604 .pg_client
605 .read()
606 .await
607 .execute(
608 &self.sql_set.update_value_with_lease,
609 &[&key, &prev, &updated, &lease_ttl_secs],
610 )
611 .await?;
612
613 ensure!(
614 res == 1,
615 UnexpectedSnafu {
616 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
617 }
618 );
619
620 Ok(())
621 }
622
623 async fn put_value_with_lease(
625 &self,
626 key: &str,
627 value: &str,
628 lease_ttl: Duration,
629 ) -> Result<bool> {
630 let key = key.as_bytes();
631 let lease_ttl_secs = lease_ttl.as_secs() as f64;
632 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
633 self.maybe_init_client().await?;
634 let res = self
635 .pg_client
636 .read()
637 .await
638 .query(&self.sql_set.put_value_with_lease, ¶ms)
639 .await?;
640 Ok(res.is_empty())
641 }
642
643 async fn delete_value(&self, key: &str) -> Result<bool> {
646 let key = key.as_bytes();
647 self.maybe_init_client().await?;
648 let res = self
649 .pg_client
650 .read()
651 .await
652 .query(&self.sql_set.delete_value, &[&key])
653 .await?;
654
655 Ok(res.len() == 1)
656 }
657
658 async fn leader_action(&self) -> Result<()> {
679 let key = self.election_key();
680 if self.is_leader() {
682 match self.get_value_with_lease(&key).await? {
683 Some(lease) => {
684 match (
685 lease.leader_value == self.leader_value,
686 lease.expire_time > lease.current,
687 ) {
688 (true, true) => {
690 self.update_value_with_lease(
692 &key,
693 &lease.origin,
694 &self.leader_value,
695 self.meta_lease_ttl,
696 )
697 .await?;
698 }
699 (true, false) => {
701 warn!("Leader lease expired, now stepping down.");
702 self.step_down().await?;
703 }
704 (false, _) => {
706 warn!(
707 "Leader lease not found, but still hold the lock. Now stepping down."
708 );
709 self.step_down().await?;
710 }
711 }
712 }
713 None => {
715 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
716 self.step_down().await?;
717 }
718 }
719 } else {
721 self.elected().await?;
722 }
723 Ok(())
724 }
725
726 async fn follower_action(&self) -> Result<()> {
737 let key = self.election_key();
738 if self.is_leader() {
740 self.step_down_without_lock().await?;
741 }
742 let lease = self
743 .get_value_with_lease(&key)
744 .await?
745 .context(NoLeaderSnafu)?;
746 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
748 Ok(())
750 }
751
752 async fn step_down(&self) -> Result<()> {
759 let key = self.election_key();
760 let leader_key = RdsLeaderKey {
761 name: self.leader_value.clone().into_bytes(),
762 key: key.clone().into_bytes(),
763 ..Default::default()
764 };
765 self.delete_value(&key).await?;
766 self.maybe_init_client().await?;
767 self.pg_client
768 .read()
769 .await
770 .query(&self.sql_set.step_down, &[])
771 .await?;
772 send_leader_change_and_set_flags(
773 &self.is_leader,
774 &self.leader_infancy,
775 &self.leader_watcher,
776 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
777 );
778 Ok(())
779 }
780
781 async fn step_down_without_lock(&self) -> Result<()> {
783 let key = self.election_key().into_bytes();
784 let leader_key = RdsLeaderKey {
785 name: self.leader_value.clone().into_bytes(),
786 key: key.clone(),
787 ..Default::default()
788 };
789 send_leader_change_and_set_flags(
790 &self.is_leader,
791 &self.leader_infancy,
792 &self.leader_watcher,
793 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
794 );
795 Ok(())
796 }
797
798 async fn elected(&self) -> Result<()> {
801 let key = self.election_key();
802 let leader_key = RdsLeaderKey {
803 name: self.leader_value.clone().into_bytes(),
804 key: key.clone().into_bytes(),
805 ..Default::default()
806 };
807 self.delete_value(&key).await?;
808 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
809 .await?;
810
811 if self
812 .is_leader
813 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
814 .is_ok()
815 {
816 self.leader_infancy.store(true, Ordering::Release);
817
818 if let Err(e) = self
819 .leader_watcher
820 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
821 {
822 error!(e; "Failed to send leader change message");
823 }
824 }
825 Ok(())
826 }
827}
828
829#[cfg(test)]
830mod tests {
831 use std::assert_matches::assert_matches;
832 use std::env;
833
834 use common_meta::maybe_skip_postgres_integration_test;
835
836 use super::*;
837 use crate::error;
838 use crate::utils::postgres::create_postgres_pool;
839
840 async fn create_postgres_client(
841 table_name: Option<&str>,
842 execution_timeout: Duration,
843 idle_session_timeout: Duration,
844 statement_timeout: Duration,
845 ) -> Result<ElectionPgClient> {
846 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
847 if endpoint.is_empty() {
848 return UnexpectedSnafu {
849 violated: "Postgres endpoint is empty".to_string(),
850 }
851 .fail();
852 }
853 let pool = create_postgres_pool(&[endpoint], None, None).await.unwrap();
854 let mut pg_client = ElectionPgClient::new(
855 pool,
856 execution_timeout,
857 idle_session_timeout,
858 statement_timeout,
859 )
860 .unwrap();
861 pg_client.maybe_init_client().await?;
862 if let Some(table_name) = table_name {
863 let create_table_sql = format!(
864 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
865 table_name
866 );
867 pg_client.execute(&create_table_sql, &[]).await?;
868 }
869 Ok(pg_client)
870 }
871
872 async fn drop_table(pg_election: &PgElection, table_name: &str) {
873 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
874 pg_election
875 .pg_client
876 .read()
877 .await
878 .execute(&sql, &[])
879 .await
880 .unwrap();
881 }
882
883 #[tokio::test]
884 async fn test_postgres_crud() {
885 maybe_skip_postgres_integration_test!();
886 let key = "test_key".to_string();
887 let value = "test_value".to_string();
888
889 let uuid = uuid::Uuid::new_v4().to_string();
890 let table_name = "test_postgres_crud_greptime_metakv";
891 let candidate_lease_ttl = Duration::from_secs(10);
892 let execution_timeout = Duration::from_secs(10);
893 let statement_timeout = Duration::from_secs(10);
894 let meta_lease_ttl = Duration::from_secs(2);
895 let idle_session_timeout = Duration::from_secs(0);
896 let client = create_postgres_client(
897 Some(table_name),
898 execution_timeout,
899 idle_session_timeout,
900 statement_timeout,
901 )
902 .await
903 .unwrap();
904
905 let (tx, _) = broadcast::channel(100);
906 let pg_election = PgElection {
907 leader_value: "test_leader".to_string(),
908 pg_client: RwLock::new(client),
909 is_leader: AtomicBool::new(false),
910 leader_infancy: AtomicBool::new(true),
911 leader_watcher: tx,
912 store_key_prefix: uuid,
913 candidate_lease_ttl,
914 meta_lease_ttl,
915 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
916 };
917
918 let res = pg_election
919 .put_value_with_lease(&key, &value, candidate_lease_ttl)
920 .await
921 .unwrap();
922 assert!(res);
923
924 let lease = pg_election
925 .get_value_with_lease(&key)
926 .await
927 .unwrap()
928 .unwrap();
929 assert_eq!(lease.leader_value, value);
930
931 pg_election
932 .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
933 .await
934 .unwrap();
935
936 let res = pg_election.delete_value(&key).await.unwrap();
937 assert!(res);
938
939 let res = pg_election.get_value_with_lease(&key).await.unwrap();
940 assert!(res.is_none());
941
942 for i in 0..10 {
943 let key = format!("test_key_{}", i);
944 let value = format!("test_value_{}", i);
945 pg_election
946 .put_value_with_lease(&key, &value, candidate_lease_ttl)
947 .await
948 .unwrap();
949 }
950
951 let key_prefix = "test_key".to_string();
952 let (res, _) = pg_election
953 .get_value_with_lease_by_prefix(&key_prefix)
954 .await
955 .unwrap();
956 assert_eq!(res.len(), 10);
957
958 for i in 0..10 {
959 let key = format!("test_key_{}", i);
960 let res = pg_election.delete_value(&key).await.unwrap();
961 assert!(res);
962 }
963
964 let (res, current) = pg_election
965 .get_value_with_lease_by_prefix(&key_prefix)
966 .await
967 .unwrap();
968 assert!(res.is_empty());
969 assert!(current == Timestamp::default());
970
971 drop_table(&pg_election, table_name).await;
972 }
973
974 async fn candidate(
975 leader_value: String,
976 candidate_lease_ttl: Duration,
977 store_key_prefix: String,
978 table_name: String,
979 ) {
980 let execution_timeout = Duration::from_secs(10);
981 let statement_timeout = Duration::from_secs(10);
982 let meta_lease_ttl = Duration::from_secs(2);
983 let idle_session_timeout = Duration::from_secs(0);
984 let client = create_postgres_client(
985 None,
986 execution_timeout,
987 idle_session_timeout,
988 statement_timeout,
989 )
990 .await
991 .unwrap();
992
993 let (tx, _) = broadcast::channel(100);
994 let pg_election = PgElection {
995 leader_value,
996 pg_client: RwLock::new(client),
997 is_leader: AtomicBool::new(false),
998 leader_infancy: AtomicBool::new(true),
999 leader_watcher: tx,
1000 store_key_prefix,
1001 candidate_lease_ttl,
1002 meta_lease_ttl,
1003 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
1004 };
1005
1006 let node_info = MetasrvNodeInfo {
1007 addr: "test_addr".to_string(),
1008 version: "test_version".to_string(),
1009 git_commit: "test_git_commit".to_string(),
1010 start_time_ms: 0,
1011 total_cpu_millicores: 0,
1012 total_memory_bytes: 0,
1013 cpu_usage_millicores: 0,
1014 memory_usage_bytes: 0,
1015 hostname: "test_hostname".to_string(),
1016 };
1017 pg_election.register_candidate(&node_info).await.unwrap();
1018 }
1019
1020 #[tokio::test]
1021 async fn test_candidate_registration() {
1022 maybe_skip_postgres_integration_test!();
1023 let leader_value_prefix = "test_leader".to_string();
1024 let uuid = uuid::Uuid::new_v4().to_string();
1025 let table_name = "test_candidate_registration_greptime_metakv";
1026 let mut handles = vec![];
1027 let candidate_lease_ttl = Duration::from_secs(5);
1028 let execution_timeout = Duration::from_secs(10);
1029 let statement_timeout = Duration::from_secs(10);
1030 let meta_lease_ttl = Duration::from_secs(2);
1031 let idle_session_timeout = Duration::from_secs(0);
1032 let client = create_postgres_client(
1033 Some(table_name),
1034 execution_timeout,
1035 idle_session_timeout,
1036 statement_timeout,
1037 )
1038 .await
1039 .unwrap();
1040
1041 for i in 0..10 {
1042 let leader_value = format!("{}{}", leader_value_prefix, i);
1043 let handle = tokio::spawn(candidate(
1044 leader_value,
1045 candidate_lease_ttl,
1046 uuid.clone(),
1047 table_name.to_string(),
1048 ));
1049 handles.push(handle);
1050 }
1051 tokio::time::sleep(Duration::from_secs(3)).await;
1053
1054 let (tx, _) = broadcast::channel(100);
1055 let leader_value = "test_leader".to_string();
1056 let pg_election = PgElection {
1057 leader_value,
1058 pg_client: RwLock::new(client),
1059 is_leader: AtomicBool::new(false),
1060 leader_infancy: AtomicBool::new(true),
1061 leader_watcher: tx,
1062 store_key_prefix: uuid.clone(),
1063 candidate_lease_ttl,
1064 meta_lease_ttl,
1065 sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
1066 };
1067
1068 let candidates = pg_election.all_candidates().await.unwrap();
1069 assert_eq!(candidates.len(), 10);
1070
1071 for handle in handles {
1072 handle.abort();
1073 }
1074
1075 tokio::time::sleep(Duration::from_secs(5)).await;
1077 let candidates = pg_election.all_candidates().await.unwrap();
1078 assert!(candidates.is_empty());
1079
1080 for i in 0..10 {
1082 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1083 let res = pg_election.delete_value(&key).await.unwrap();
1084 assert!(res);
1085 }
1086
1087 drop_table(&pg_election, table_name).await;
1088 }
1089
1090 #[tokio::test]
1091 async fn test_elected_and_step_down() {
1092 maybe_skip_postgres_integration_test!();
1093 let leader_value = "test_leader".to_string();
1094 let uuid = uuid::Uuid::new_v4().to_string();
1095 let table_name = "test_elected_and_step_down_greptime_metakv";
1096 let candidate_lease_ttl = Duration::from_secs(5);
1097 let execution_timeout = Duration::from_secs(10);
1098 let statement_timeout = Duration::from_secs(10);
1099 let meta_lease_ttl = Duration::from_secs(2);
1100 let idle_session_timeout = Duration::from_secs(0);
1101 let client = create_postgres_client(
1102 Some(table_name),
1103 execution_timeout,
1104 idle_session_timeout,
1105 statement_timeout,
1106 )
1107 .await
1108 .unwrap();
1109
1110 let (tx, mut rx) = broadcast::channel(100);
1111 let leader_pg_election = PgElection {
1112 leader_value: leader_value.clone(),
1113 pg_client: RwLock::new(client),
1114 is_leader: AtomicBool::new(false),
1115 leader_infancy: AtomicBool::new(true),
1116 leader_watcher: tx,
1117 store_key_prefix: uuid,
1118 candidate_lease_ttl,
1119 meta_lease_ttl,
1120 sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
1121 };
1122
1123 leader_pg_election.elected().await.unwrap();
1124 let lease = leader_pg_election
1125 .get_value_with_lease(&leader_pg_election.election_key())
1126 .await
1127 .unwrap()
1128 .unwrap();
1129 assert!(lease.leader_value == leader_value);
1130 assert!(lease.expire_time > lease.current);
1131 assert!(leader_pg_election.is_leader());
1132
1133 match rx.recv().await {
1134 Ok(LeaderChangeMessage::Elected(key)) => {
1135 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1136 assert_eq!(
1137 String::from_utf8_lossy(key.key()),
1138 leader_pg_election.election_key()
1139 );
1140 assert_eq!(key.lease_id(), i64::default());
1141 assert_eq!(key.revision(), i64::default());
1142 }
1143 _ => panic!("Expected LeaderChangeMessage::Elected"),
1144 }
1145
1146 leader_pg_election.step_down_without_lock().await.unwrap();
1147 let lease = leader_pg_election
1148 .get_value_with_lease(&leader_pg_election.election_key())
1149 .await
1150 .unwrap()
1151 .unwrap();
1152 assert!(lease.leader_value == leader_value);
1153 assert!(!leader_pg_election.is_leader());
1154
1155 match rx.recv().await {
1156 Ok(LeaderChangeMessage::StepDown(key)) => {
1157 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1158 assert_eq!(
1159 String::from_utf8_lossy(key.key()),
1160 leader_pg_election.election_key()
1161 );
1162 assert_eq!(key.lease_id(), i64::default());
1163 assert_eq!(key.revision(), i64::default());
1164 }
1165 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1166 }
1167
1168 leader_pg_election.elected().await.unwrap();
1169 let lease = leader_pg_election
1170 .get_value_with_lease(&leader_pg_election.election_key())
1171 .await
1172 .unwrap()
1173 .unwrap();
1174 assert!(lease.leader_value == leader_value);
1175 assert!(lease.expire_time > lease.current);
1176 assert!(leader_pg_election.is_leader());
1177
1178 match rx.recv().await {
1179 Ok(LeaderChangeMessage::Elected(key)) => {
1180 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1181 assert_eq!(
1182 String::from_utf8_lossy(key.key()),
1183 leader_pg_election.election_key()
1184 );
1185 assert_eq!(key.lease_id(), i64::default());
1186 assert_eq!(key.revision(), i64::default());
1187 }
1188 _ => panic!("Expected LeaderChangeMessage::Elected"),
1189 }
1190
1191 leader_pg_election.step_down().await.unwrap();
1192 let res = leader_pg_election
1193 .get_value_with_lease(&leader_pg_election.election_key())
1194 .await
1195 .unwrap();
1196 assert!(res.is_none());
1197 assert!(!leader_pg_election.is_leader());
1198
1199 match rx.recv().await {
1200 Ok(LeaderChangeMessage::StepDown(key)) => {
1201 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1202 assert_eq!(
1203 String::from_utf8_lossy(key.key()),
1204 leader_pg_election.election_key()
1205 );
1206 assert_eq!(key.lease_id(), i64::default());
1207 assert_eq!(key.revision(), i64::default());
1208 }
1209 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1210 }
1211
1212 drop_table(&leader_pg_election, table_name).await;
1213 }
1214
1215 #[tokio::test]
1216 async fn test_leader_action() {
1217 maybe_skip_postgres_integration_test!();
1218 let leader_value = "test_leader".to_string();
1219 let uuid = uuid::Uuid::new_v4().to_string();
1220 let table_name = "test_leader_action_greptime_metakv";
1221 let candidate_lease_ttl = Duration::from_secs(5);
1222 let execution_timeout = Duration::from_secs(10);
1223 let statement_timeout = Duration::from_secs(10);
1224 let meta_lease_ttl = Duration::from_secs(2);
1225 let idle_session_timeout = Duration::from_secs(0);
1226 let client = create_postgres_client(
1227 Some(table_name),
1228 execution_timeout,
1229 idle_session_timeout,
1230 statement_timeout,
1231 )
1232 .await
1233 .unwrap();
1234
1235 let (tx, mut rx) = broadcast::channel(100);
1236 let leader_pg_election = PgElection {
1237 leader_value: leader_value.clone(),
1238 pg_client: RwLock::new(client),
1239 is_leader: AtomicBool::new(false),
1240 leader_infancy: AtomicBool::new(true),
1241 leader_watcher: tx,
1242 store_key_prefix: uuid,
1243 candidate_lease_ttl,
1244 meta_lease_ttl,
1245 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1246 };
1247
1248 let res = leader_pg_election
1250 .pg_client
1251 .read()
1252 .await
1253 .query(&leader_pg_election.sql_set.campaign, &[])
1254 .await
1255 .unwrap();
1256 let res: bool = res[0].get(0);
1257 assert!(res);
1258 leader_pg_election.leader_action().await.unwrap();
1259 let lease = leader_pg_election
1260 .get_value_with_lease(&leader_pg_election.election_key())
1261 .await
1262 .unwrap()
1263 .unwrap();
1264 assert!(lease.leader_value == leader_value);
1265 assert!(lease.expire_time > lease.current);
1266 assert!(leader_pg_election.is_leader());
1267
1268 match rx.recv().await {
1269 Ok(LeaderChangeMessage::Elected(key)) => {
1270 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1271 assert_eq!(
1272 String::from_utf8_lossy(key.key()),
1273 leader_pg_election.election_key()
1274 );
1275 assert_eq!(key.lease_id(), i64::default());
1276 assert_eq!(key.revision(), i64::default());
1277 }
1278 _ => panic!("Expected LeaderChangeMessage::Elected"),
1279 }
1280
1281 let res = leader_pg_election
1283 .pg_client
1284 .read()
1285 .await
1286 .query(&leader_pg_election.sql_set.campaign, &[])
1287 .await
1288 .unwrap();
1289 let res: bool = res[0].get(0);
1290 assert!(res);
1291 leader_pg_election.leader_action().await.unwrap();
1292 let new_lease = leader_pg_election
1293 .get_value_with_lease(&leader_pg_election.election_key())
1294 .await
1295 .unwrap()
1296 .unwrap();
1297 assert!(new_lease.leader_value == leader_value);
1298 assert!(
1299 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1300 );
1301 assert!(leader_pg_election.is_leader());
1302
1303 tokio::time::sleep(Duration::from_secs(2)).await;
1305
1306 let res = leader_pg_election
1307 .pg_client
1308 .read()
1309 .await
1310 .query(&leader_pg_election.sql_set.campaign, &[])
1311 .await
1312 .unwrap();
1313 let res: bool = res[0].get(0);
1314 assert!(res);
1315 leader_pg_election.leader_action().await.unwrap();
1316 let res = leader_pg_election
1317 .get_value_with_lease(&leader_pg_election.election_key())
1318 .await
1319 .unwrap();
1320 assert!(res.is_none());
1321
1322 match rx.recv().await {
1323 Ok(LeaderChangeMessage::StepDown(key)) => {
1324 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1325 assert_eq!(
1326 String::from_utf8_lossy(key.key()),
1327 leader_pg_election.election_key()
1328 );
1329 assert_eq!(key.lease_id(), i64::default());
1330 assert_eq!(key.revision(), i64::default());
1331 }
1332 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1333 }
1334
1335 let res = leader_pg_election
1337 .pg_client
1338 .read()
1339 .await
1340 .query(&leader_pg_election.sql_set.campaign, &[])
1341 .await
1342 .unwrap();
1343 let res: bool = res[0].get(0);
1344 assert!(res);
1345 leader_pg_election.leader_action().await.unwrap();
1346 let lease = leader_pg_election
1347 .get_value_with_lease(&leader_pg_election.election_key())
1348 .await
1349 .unwrap()
1350 .unwrap();
1351 assert!(lease.leader_value == leader_value);
1352 assert!(lease.expire_time > lease.current);
1353 assert!(leader_pg_election.is_leader());
1354
1355 match rx.recv().await {
1356 Ok(LeaderChangeMessage::Elected(key)) => {
1357 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1358 assert_eq!(
1359 String::from_utf8_lossy(key.key()),
1360 leader_pg_election.election_key()
1361 );
1362 assert_eq!(key.lease_id(), i64::default());
1363 assert_eq!(key.revision(), i64::default());
1364 }
1365 _ => panic!("Expected LeaderChangeMessage::Elected"),
1366 }
1367
1368 leader_pg_election
1370 .delete_value(&leader_pg_election.election_key())
1371 .await
1372 .unwrap();
1373 leader_pg_election.leader_action().await.unwrap();
1374 let res = leader_pg_election
1375 .get_value_with_lease(&leader_pg_election.election_key())
1376 .await
1377 .unwrap();
1378 assert!(res.is_none());
1379 assert!(!leader_pg_election.is_leader());
1380
1381 match rx.recv().await {
1382 Ok(LeaderChangeMessage::StepDown(key)) => {
1383 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1384 assert_eq!(
1385 String::from_utf8_lossy(key.key()),
1386 leader_pg_election.election_key()
1387 );
1388 assert_eq!(key.lease_id(), i64::default());
1389 assert_eq!(key.revision(), i64::default());
1390 }
1391 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1392 }
1393
1394 let res = leader_pg_election
1396 .pg_client
1397 .read()
1398 .await
1399 .query(&leader_pg_election.sql_set.campaign, &[])
1400 .await
1401 .unwrap();
1402 let res: bool = res[0].get(0);
1403 assert!(res);
1404 leader_pg_election.leader_action().await.unwrap();
1405 let lease = leader_pg_election
1406 .get_value_with_lease(&leader_pg_election.election_key())
1407 .await
1408 .unwrap()
1409 .unwrap();
1410 assert!(lease.leader_value == leader_value);
1411 assert!(lease.expire_time > lease.current);
1412 assert!(leader_pg_election.is_leader());
1413
1414 match rx.recv().await {
1415 Ok(LeaderChangeMessage::Elected(key)) => {
1416 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1417 assert_eq!(
1418 String::from_utf8_lossy(key.key()),
1419 leader_pg_election.election_key()
1420 );
1421 assert_eq!(key.lease_id(), i64::default());
1422 assert_eq!(key.revision(), i64::default());
1423 }
1424 _ => panic!("Expected LeaderChangeMessage::Elected"),
1425 }
1426
1427 let res = leader_pg_election
1429 .pg_client
1430 .read()
1431 .await
1432 .query(&leader_pg_election.sql_set.campaign, &[])
1433 .await
1434 .unwrap();
1435 let res: bool = res[0].get(0);
1436 assert!(res);
1437 leader_pg_election
1438 .delete_value(&leader_pg_election.election_key())
1439 .await
1440 .unwrap();
1441 leader_pg_election
1442 .put_value_with_lease(
1443 &leader_pg_election.election_key(),
1444 "test",
1445 Duration::from_secs(10),
1446 )
1447 .await
1448 .unwrap();
1449 leader_pg_election.leader_action().await.unwrap();
1450 let res = leader_pg_election
1451 .get_value_with_lease(&leader_pg_election.election_key())
1452 .await
1453 .unwrap();
1454 assert!(res.is_none());
1455 assert!(!leader_pg_election.is_leader());
1456
1457 match rx.recv().await {
1458 Ok(LeaderChangeMessage::StepDown(key)) => {
1459 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1460 assert_eq!(
1461 String::from_utf8_lossy(key.key()),
1462 leader_pg_election.election_key()
1463 );
1464 assert_eq!(key.lease_id(), i64::default());
1465 assert_eq!(key.revision(), i64::default());
1466 }
1467 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1468 }
1469
1470 leader_pg_election
1472 .pg_client
1473 .read()
1474 .await
1475 .query(&leader_pg_election.sql_set.step_down, &[])
1476 .await
1477 .unwrap();
1478
1479 drop_table(&leader_pg_election, table_name).await;
1480 }
1481
1482 #[tokio::test]
1483 async fn test_follower_action() {
1484 maybe_skip_postgres_integration_test!();
1485 common_telemetry::init_default_ut_logging();
1486 let uuid = uuid::Uuid::new_v4().to_string();
1487 let table_name = "test_follower_action_greptime_metakv";
1488
1489 let candidate_lease_ttl = Duration::from_secs(5);
1490 let execution_timeout = Duration::from_secs(10);
1491 let statement_timeout = Duration::from_secs(10);
1492 let meta_lease_ttl = Duration::from_secs(2);
1493 let idle_session_timeout = Duration::from_secs(0);
1494 let follower_client = create_postgres_client(
1495 Some(table_name),
1496 execution_timeout,
1497 idle_session_timeout,
1498 statement_timeout,
1499 )
1500 .await
1501 .unwrap();
1502 let (tx, mut rx) = broadcast::channel(100);
1503 let follower_pg_election = PgElection {
1504 leader_value: "test_follower".to_string(),
1505 pg_client: RwLock::new(follower_client),
1506 is_leader: AtomicBool::new(false),
1507 leader_infancy: AtomicBool::new(true),
1508 leader_watcher: tx,
1509 store_key_prefix: uuid.clone(),
1510 candidate_lease_ttl,
1511 meta_lease_ttl,
1512 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1513 };
1514
1515 let leader_client = create_postgres_client(
1516 Some(table_name),
1517 execution_timeout,
1518 idle_session_timeout,
1519 statement_timeout,
1520 )
1521 .await
1522 .unwrap();
1523 let (tx, _) = broadcast::channel(100);
1524 let leader_pg_election = PgElection {
1525 leader_value: "test_leader".to_string(),
1526 pg_client: RwLock::new(leader_client),
1527 is_leader: AtomicBool::new(false),
1528 leader_infancy: AtomicBool::new(true),
1529 leader_watcher: tx,
1530 store_key_prefix: uuid,
1531 candidate_lease_ttl,
1532 meta_lease_ttl,
1533 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1534 };
1535
1536 leader_pg_election
1537 .pg_client
1538 .read()
1539 .await
1540 .query(&leader_pg_election.sql_set.campaign, &[])
1541 .await
1542 .unwrap();
1543 leader_pg_election.elected().await.unwrap();
1544
1545 follower_pg_election.follower_action().await.unwrap();
1547
1548 tokio::time::sleep(Duration::from_secs(2)).await;
1550 assert!(follower_pg_election.follower_action().await.is_err());
1551
1552 leader_pg_election
1554 .delete_value(&leader_pg_election.election_key())
1555 .await
1556 .unwrap();
1557 assert!(follower_pg_election.follower_action().await.is_err());
1558
1559 follower_pg_election
1561 .is_leader
1562 .store(true, Ordering::Relaxed);
1563 assert!(follower_pg_election.follower_action().await.is_err());
1564
1565 match rx.recv().await {
1566 Ok(LeaderChangeMessage::StepDown(key)) => {
1567 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1568 assert_eq!(
1569 String::from_utf8_lossy(key.key()),
1570 follower_pg_election.election_key()
1571 );
1572 assert_eq!(key.lease_id(), i64::default());
1573 assert_eq!(key.revision(), i64::default());
1574 }
1575 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1576 }
1577
1578 leader_pg_election
1580 .pg_client
1581 .read()
1582 .await
1583 .query(&leader_pg_election.sql_set.step_down, &[])
1584 .await
1585 .unwrap();
1586
1587 drop_table(&follower_pg_election, table_name).await;
1588 }
1589
1590 #[tokio::test]
1591 async fn test_reset_campaign() {
1592 maybe_skip_postgres_integration_test!();
1593 let leader_value = "test_leader".to_string();
1594 let uuid = uuid::Uuid::new_v4().to_string();
1595 let table_name = "test_reset_campaign_greptime_metakv";
1596 let candidate_lease_ttl = Duration::from_secs(5);
1597 let execution_timeout = Duration::from_secs(10);
1598 let statement_timeout = Duration::from_secs(10);
1599 let meta_lease_ttl = Duration::from_secs(2);
1600 let idle_session_timeout = Duration::from_secs(0);
1601 let client = create_postgres_client(
1602 Some(table_name),
1603 execution_timeout,
1604 idle_session_timeout,
1605 statement_timeout,
1606 )
1607 .await
1608 .unwrap();
1609
1610 let (tx, _) = broadcast::channel(100);
1611 let leader_pg_election = PgElection {
1612 leader_value,
1613 pg_client: RwLock::new(client),
1614 is_leader: AtomicBool::new(false),
1615 leader_infancy: AtomicBool::new(true),
1616 leader_watcher: tx,
1617 store_key_prefix: uuid,
1618 candidate_lease_ttl,
1619 meta_lease_ttl,
1620 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1621 };
1622 leader_pg_election.is_leader.store(true, Ordering::Relaxed);
1623 leader_pg_election.reset_campaign().await;
1624 assert!(!leader_pg_election.is_leader());
1625 drop_table(&leader_pg_election, table_name).await;
1626 }
1627
1628 #[tokio::test]
1629 async fn test_idle_session_timeout() {
1630 maybe_skip_postgres_integration_test!();
1631 common_telemetry::init_default_ut_logging();
1632 let execution_timeout = Duration::from_secs(10);
1633 let statement_timeout = Duration::from_secs(10);
1634 let idle_session_timeout = Duration::from_secs(1);
1635 let mut client = create_postgres_client(
1636 None,
1637 execution_timeout,
1638 idle_session_timeout,
1639 statement_timeout,
1640 )
1641 .await
1642 .unwrap();
1643 tokio::time::sleep(Duration::from_millis(1100)).await;
1644 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1646 assert_matches!(err, error::Error::PostgresExecution { .. });
1647 let error::Error::PostgresExecution { error, .. } = err else {
1648 panic!("Expected PostgresExecution error");
1649 };
1650 assert!(error.is_closed());
1651 client.reset_client().await.unwrap();
1653 let _ = client.query("SELECT 1", &[]).await.unwrap();
1654 }
1655
/// Checks the SQL built with a schema name: the campaign statement must use
/// `pg_try_advisory_lock`, and the put/update/get/get-by-prefix/delete statements must
/// reference the quoted, schema-qualified table identifier.
#[test]
fn test_election_sql_with_schema() {
    let sql_set = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv").build();
    let uses_qualified_table = |sql: &str| sql.contains("\"test_schema\".\"greptime_metakv\"");

    assert!(sql_set.campaign.contains("pg_try_advisory_lock"));

    // One assertion per statement so a failure points at the offending SQL.
    assert!(uses_qualified_table(&sql_set.put_value_with_lease));
    assert!(uses_qualified_table(&sql_set.update_value_with_lease));
    assert!(uses_qualified_table(&sql_set.get_value_with_lease));
    assert!(uses_qualified_table(&sql_set.get_value_with_lease_by_prefix));
    assert!(uses_qualified_table(&sql_set.delete_value));
}
1682}