1use std::sync::Arc;
16use std::sync::atomic::{AtomicBool, Ordering};
17use std::time::Duration;
18
19use common_telemetry::{error, info, warn};
20use common_time::Timestamp;
21use deadpool_postgres::{Manager, Pool};
22use snafu::{OptionExt, ResultExt, ensure};
23use tokio::sync::{RwLock, broadcast};
24use tokio::time::MissedTickBehavior;
25use tokio_postgres::Row;
26use tokio_postgres::types::ToSql;
27
28use crate::election::rds::{LEASE_SEP, Lease, RdsLeaderKey, parse_value_and_expire_time};
29use crate::election::{
30 Election, ElectionRef, LeaderChangeMessage, LeaderValue, MetasrvNodeInfo, listen_leader_change,
31 send_leader_change_and_set_flags,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, ElectionNoLeaderSnafu, GetPostgresClientSnafu,
35 PostgresExecutionSnafu, Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu,
36 UnexpectedSnafu,
37};
38use crate::key::{CANDIDATES_ROOT, ELECTION_KEY};
39
/// Builds the SQL statements used by the PostgreSQL election for one
/// advisory lock id and one key-value table.
struct ElectionSqlFactory<'a> {
    /// Advisory lock id interpolated into the campaign and step-down statements.
    lock_id: u64,
    /// Optional schema used to qualify the table; `None` or an empty string
    /// leaves the table name unqualified.
    schema_name: Option<&'a str>,
    /// Name of the table that the generated statements read and write.
    table_name: &'a str,
}
45
/// The fully rendered SQL statements produced by `ElectionSqlFactory::build`.
struct ElectionSqlSet {
    /// Calls `pg_try_advisory_lock` with the configured lock id.
    campaign: String,
    /// Calls `pg_advisory_unlock` with the configured lock id.
    step_down: String,
    /// Inserts `$1 -> $2 + separator + expire time` unless the key already exists
    /// (`ON CONFLICT (k) DO NOTHING`) and returns the previously stored row, if any.
    /// `$3` is the lease length in seconds.
    put_value_with_lease: String,
    /// Replaces the value of key `$1` with `$3 + separator + new expire time`,
    /// but only when the stored value still equals `$2`. `$4` is the lease
    /// length in seconds.
    update_value_with_lease: String,
    /// Selects the value stored under key `$1` plus the database's current timestamp.
    get_value_with_lease: String,
    /// Selects every value whose key matches the `LIKE` pattern `$1`, plus the
    /// database's current timestamp.
    get_value_with_lease_by_prefix: String,
    /// Deletes key `$1` and returns the removed row.
    delete_value: String,
}
91
92impl<'a> ElectionSqlFactory<'a> {
93 fn new(lock_id: u64, schema_name: Option<&'a str>, table_name: &'a str) -> Self {
94 Self {
95 lock_id,
96 schema_name,
97 table_name,
98 }
99 }
100
101 fn table_ident(&self) -> String {
102 match self.schema_name {
103 Some(s) if !s.is_empty() => format!("\"{}\".\"{}\"", s, self.table_name),
104 _ => format!("\"{}\"", self.table_name),
105 }
106 }
107
108 fn build(self) -> ElectionSqlSet {
109 ElectionSqlSet {
110 campaign: self.campaign_sql(),
111 step_down: self.step_down_sql(),
112 put_value_with_lease: self.put_value_with_lease_sql(),
113 update_value_with_lease: self.update_value_with_lease_sql(),
114 get_value_with_lease: self.get_value_with_lease_sql(),
115 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
116 delete_value: self.delete_value_sql(),
117 }
118 }
119
120 fn campaign_sql(&self) -> String {
121 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
122 }
123
124 fn step_down_sql(&self) -> String {
125 format!("SELECT pg_advisory_unlock({})", self.lock_id)
126 }
127
128 fn put_value_with_lease_sql(&self) -> String {
129 let table = self.table_ident();
130 format!(
131 r#"WITH prev AS (
132 SELECT k, v FROM {table} WHERE k = $1
133 ), insert AS (
134 INSERT INTO {table}
135 VALUES($1, convert_to($2 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
136 ON CONFLICT (k) DO NOTHING
137 )
138 SELECT k, v FROM prev;
139 "#,
140 table = table,
141 lease_sep = LEASE_SEP
142 )
143 }
144
145 fn update_value_with_lease_sql(&self) -> String {
146 let table = self.table_ident();
147 format!(
148 r#"UPDATE {table}
149 SET v = convert_to($3 || '{lease_sep}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
150 WHERE k = $1 AND v = $2"#,
151 table = table,
152 lease_sep = LEASE_SEP
153 )
154 }
155
156 fn get_value_with_lease_sql(&self) -> String {
157 let table = self.table_ident();
158 format!(
159 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k = $1"#,
160 table = table
161 )
162 }
163
164 fn get_value_with_lease_by_prefix_sql(&self) -> String {
165 let table = self.table_ident();
166 format!(
167 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM {table} WHERE k LIKE $1"#,
168 table = table
169 )
170 }
171
172 fn delete_value_sql(&self) -> String {
173 let table = self.table_ident();
174 format!(
175 "DELETE FROM {table} WHERE k = $1 RETURNING k,v;",
176 table = table
177 )
178 }
179}
180
/// A PostgreSQL client for the election that keeps at most one pooled
/// connection checked out and applies timeouts to every statement it runs.
pub struct ElectionPgClient {
    /// The connection currently checked out of `pool`, or `None` before
    /// initialization. `execute` and `query` run against this connection.
    current: Option<deadpool::managed::Object<Manager>>,
    /// Pool from which `current` is obtained.
    pool: Pool,
    /// Client-side limit on how long `execute` / `query` wait for a statement
    /// (enforced with `tokio::time::timeout`).
    execution_timeout: Duration,

    /// Value, in whole seconds, sent as `SET idle_session_timeout` when a
    /// connection is initialized.
    idle_session_timeout: Duration,

    /// Value, in whole seconds, sent as `SET statement_timeout` when a
    /// connection is initialized.
    statement_timeout: Duration,
}
203
204impl ElectionPgClient {
205 pub fn new(
206 pool: Pool,
207 execution_timeout: Duration,
208 idle_session_timeout: Duration,
209 statement_timeout: Duration,
210 ) -> Result<ElectionPgClient> {
211 Ok(ElectionPgClient {
212 current: None,
213 pool,
214 execution_timeout,
215 idle_session_timeout,
216 statement_timeout,
217 })
218 }
219
220 fn set_idle_session_timeout_sql(&self) -> String {
221 format!(
222 "SET idle_session_timeout = '{}s';",
223 self.idle_session_timeout.as_secs()
224 )
225 }
226
227 fn set_statement_timeout_sql(&self) -> String {
228 format!(
229 "SET statement_timeout = '{}s';",
230 self.statement_timeout.as_secs()
231 )
232 }
233
234 async fn reset_client(&mut self) -> Result<()> {
235 if let Some(client) = self.current.take() {
236 let inner = deadpool::managed::Object::<deadpool_postgres::Manager>::take(client);
239 drop(inner);
240 }
241 self.maybe_init_client().await
242 }
243
244 async fn maybe_init_client(&mut self) -> Result<()> {
245 if self.current.is_none() {
246 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
247
248 self.current = Some(client);
249 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
251 self.execute(&idle_session_timeout_sql, &[]).await?;
252 let statement_timeout_sql = self.set_statement_timeout_sql();
253 self.execute(&statement_timeout_sql, &[]).await?;
254 }
255
256 Ok(())
257 }
258
259 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
264 let result = tokio::time::timeout(
265 self.execution_timeout,
266 self.current.as_ref().unwrap().execute(sql, params),
267 )
268 .await
269 .map_err(|_| {
270 SqlExecutionTimeoutSnafu {
271 sql: sql.to_string(),
272 duration: self.execution_timeout,
273 }
274 .build()
275 })?;
276
277 result.context(PostgresExecutionSnafu { sql })
278 }
279
280 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
285 let result = tokio::time::timeout(
286 self.execution_timeout,
287 self.current.as_ref().unwrap().query(sql, params),
288 )
289 .await
290 .map_err(|_| {
291 SqlExecutionTimeoutSnafu {
292 sql: sql.to_string(),
293 duration: self.execution_timeout,
294 }
295 .build()
296 })?;
297
298 result.context(PostgresExecutionSnafu { sql })
299 }
300}
301
/// Leader election backed by a PostgreSQL advisory lock plus lease rows in a
/// key-value table.
pub struct PgElection {
    /// Value identifying this node; stored under the election key when elected
    /// and appended to the candidate root to form this node's candidate key.
    leader_value: String,
    /// Shared PostgreSQL client; the write lock is taken only to (re)initialize it.
    pg_client: RwLock<ElectionPgClient>,
    /// Locally cached flag telling whether this node currently acts as leader.
    is_leader: AtomicBool,
    /// Set when this node has just been elected; `in_leader_infancy` clears it
    /// on the first read.
    leader_infancy: AtomicBool,
    /// Broadcast channel on which `Elected` / `StepDown` messages are published.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease length used for this node's candidate registration.
    candidate_lease_ttl: Duration,
    /// Lease length used for the leader's election key.
    meta_lease_ttl: Duration,
    /// Pre-rendered SQL statements.
    sql_set: ElectionSqlSet,
}
314
315impl PgElection {
316 async fn maybe_init_client(&self) -> Result<()> {
317 if self.pg_client.read().await.current.is_none() {
318 self.pg_client.write().await.maybe_init_client().await?;
319 }
320
321 Ok(())
322 }
323
324 #[allow(clippy::too_many_arguments)]
325 pub async fn with_pg_client(
326 leader_value: String,
327 pg_client: ElectionPgClient,
328 store_key_prefix: String,
329 candidate_lease_ttl: Duration,
330 meta_lease_ttl: Duration,
331 schema_name: Option<&str>,
332 table_name: &str,
333 lock_id: u64,
334 ) -> Result<ElectionRef> {
335 if let Some(s) = schema_name {
336 common_telemetry::info!("PgElection uses schema: {}", s);
337 } else {
338 common_telemetry::info!("PgElection uses default search_path (no schema provided)");
339 }
340 let sql_factory = ElectionSqlFactory::new(lock_id, schema_name, table_name);
341
342 let tx = listen_leader_change(leader_value.clone());
343 Ok(Arc::new(Self {
344 leader_value,
345 pg_client: RwLock::new(pg_client),
346 is_leader: AtomicBool::new(false),
347 leader_infancy: AtomicBool::new(false),
348 leader_watcher: tx,
349 store_key_prefix,
350 candidate_lease_ttl,
351 meta_lease_ttl,
352 sql_set: sql_factory.build(),
353 }))
354 }
355
356 fn election_key(&self) -> String {
357 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
358 }
359
360 fn candidate_root(&self) -> String {
361 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
362 }
363
364 fn candidate_key(&self) -> String {
365 format!("{}{}", self.candidate_root(), self.leader_value)
366 }
367}
368
369#[async_trait::async_trait]
370impl Election for PgElection {
371 type Leader = LeaderValue;
372
373 fn is_leader(&self) -> bool {
374 self.is_leader.load(Ordering::Relaxed)
375 }
376
377 fn in_leader_infancy(&self) -> bool {
378 self.leader_infancy
379 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
380 .is_ok()
381 }
382
383 async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
384 let key = self.candidate_key();
385 let node_info =
386 serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
387 input: format!("{node_info:?}"),
388 })?;
389 let res = self
390 .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
391 .await?;
392 if !res {
394 self.delete_value(&key).await?;
395 self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
396 .await?;
397 }
398
399 let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
401 loop {
402 let _ = keep_alive_interval.tick().await;
403
404 let lease = self
405 .get_value_with_lease(&key)
406 .await?
407 .context(UnexpectedSnafu {
408 err_msg: format!("Failed to get lease for key: {:?}", key),
409 })?;
410
411 ensure!(
412 lease.expire_time > lease.current,
413 UnexpectedSnafu {
414 err_msg: format!(
415 "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
416 lease.expire_time, lease.current, key
417 ),
418 }
419 );
420
421 self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
423 .await?;
424 }
425 }
426
427 async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
428 let key_prefix = self.candidate_root();
429 let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
430 candidates.retain(|c| c.1 > current);
432 let mut valid_candidates = Vec::with_capacity(candidates.len());
433 for (c, _) in candidates {
434 let node_info: MetasrvNodeInfo =
435 serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
436 input: format!("{:?}", c),
437 })?;
438 valid_candidates.push(node_info);
439 }
440 Ok(valid_candidates)
441 }
442
443 async fn campaign(&self) -> Result<()> {
456 let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
457 keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);
458
459 self.maybe_init_client().await?;
460 loop {
461 let res = self
462 .pg_client
463 .read()
464 .await
465 .query(&self.sql_set.campaign, &[])
466 .await?;
467 let row = res.first().context(UnexpectedSnafu {
468 err_msg: "Failed to get the result of acquiring advisory lock".to_string(),
469 })?;
470 let is_leader = row.try_get(0).map_err(|_| {
471 UnexpectedSnafu {
472 err_msg: "Failed to get the result of get lock".to_string(),
473 }
474 .build()
475 })?;
476 if is_leader {
477 self.leader_action().await?;
478 } else {
479 self.follower_action().await?;
480 }
481 let _ = keep_alive_interval.tick().await;
482 }
483 }
484
485 async fn reset_campaign(&self) {
486 info!("Resetting campaign");
487 if self.is_leader.load(Ordering::Relaxed) {
488 if let Err(err) = self.step_down_without_lock().await {
489 error!(err; "Failed to step down without lock");
490 }
491 info!("Step down without lock successfully, due to reset campaign");
492 }
493 if let Err(err) = self.pg_client.write().await.reset_client().await {
494 error!(err; "Failed to reset client");
495 }
496 }
497
498 async fn leader(&self) -> Result<Self::Leader> {
499 if self.is_leader.load(Ordering::Relaxed) {
500 Ok(self.leader_value.as_bytes().into())
501 } else {
502 let key = self.election_key();
503 if let Some(lease) = self.get_value_with_lease(&key).await? {
504 ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
505 Ok(lease.leader_value.as_bytes().into())
506 } else {
507 ElectionNoLeaderSnafu.fail()
508 }
509 }
510 }
511
512 async fn resign(&self) -> Result<()> {
513 todo!()
514 }
515
516 fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
517 self.leader_watcher.subscribe()
518 }
519}
520
521impl PgElection {
522 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
524 let key = key.as_bytes();
525 self.maybe_init_client().await?;
526 let res = self
527 .pg_client
528 .read()
529 .await
530 .query(&self.sql_set.get_value_with_lease, &[&key])
531 .await?;
532
533 if res.is_empty() {
534 Ok(None)
535 } else {
536 let current_time_str = res[0].try_get(1).unwrap_or_default();
538 let current_time = match Timestamp::from_str(current_time_str, None) {
539 Ok(ts) => ts,
540 Err(_) => UnexpectedSnafu {
541 err_msg: format!("Invalid timestamp: {}", current_time_str),
542 }
543 .fail()?,
544 };
545 let value_and_expire_time =
547 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
548 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
549
550 Ok(Some(Lease {
551 leader_value: value,
552 expire_time,
553 current: current_time,
554 origin: value_and_expire_time.to_string(),
555 }))
556 }
557 }
558
559 async fn get_value_with_lease_by_prefix(
561 &self,
562 key_prefix: &str,
563 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
564 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
565 self.maybe_init_client().await?;
566 let res = self
567 .pg_client
568 .read()
569 .await
570 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
571 .await?;
572
573 let mut values_with_leases = vec![];
574 let mut current = Timestamp::default();
575 for row in res {
576 let current_time_str = row.try_get(1).unwrap_or_default();
577 current = match Timestamp::from_str(current_time_str, None) {
578 Ok(ts) => ts,
579 Err(_) => UnexpectedSnafu {
580 err_msg: format!("Invalid timestamp: {}", current_time_str),
581 }
582 .fail()?,
583 };
584
585 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
586 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
587
588 values_with_leases.push((value, expire_time));
589 }
590 Ok((values_with_leases, current))
591 }
592
593 async fn update_value_with_lease(
594 &self,
595 key: &str,
596 prev: &str,
597 updated: &str,
598 lease_ttl: Duration,
599 ) -> Result<()> {
600 let key = key.as_bytes();
601 let prev = prev.as_bytes();
602 self.maybe_init_client().await?;
603 let lease_ttl_secs = lease_ttl.as_secs() as f64;
604 let res = self
605 .pg_client
606 .read()
607 .await
608 .execute(
609 &self.sql_set.update_value_with_lease,
610 &[&key, &prev, &updated, &lease_ttl_secs],
611 )
612 .await?;
613
614 ensure!(
615 res == 1,
616 UnexpectedSnafu {
617 err_msg: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
618 }
619 );
620
621 Ok(())
622 }
623
624 async fn put_value_with_lease(
626 &self,
627 key: &str,
628 value: &str,
629 lease_ttl: Duration,
630 ) -> Result<bool> {
631 let key = key.as_bytes();
632 let lease_ttl_secs = lease_ttl.as_secs() as f64;
633 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
634 self.maybe_init_client().await?;
635 let res = self
636 .pg_client
637 .read()
638 .await
639 .query(&self.sql_set.put_value_with_lease, ¶ms)
640 .await?;
641 Ok(res.is_empty())
642 }
643
644 async fn delete_value(&self, key: &str) -> Result<bool> {
647 let key = key.as_bytes();
648 self.maybe_init_client().await?;
649 let res = self
650 .pg_client
651 .read()
652 .await
653 .query(&self.sql_set.delete_value, &[&key])
654 .await?;
655
656 Ok(res.len() == 1)
657 }
658
659 async fn leader_action(&self) -> Result<()> {
680 let key = self.election_key();
681 if self.is_leader() {
683 match self.get_value_with_lease(&key).await? {
684 Some(lease) => {
685 match (
686 lease.leader_value == self.leader_value,
687 lease.expire_time > lease.current,
688 ) {
689 (true, true) => {
691 self.update_value_with_lease(
693 &key,
694 &lease.origin,
695 &self.leader_value,
696 self.meta_lease_ttl,
697 )
698 .await?;
699 }
700 (true, false) => {
702 warn!("Leader lease expired, now stepping down.");
703 self.step_down().await?;
704 }
705 (false, _) => {
707 warn!(
708 "Leader lease not found, but still hold the lock. Now stepping down."
709 );
710 self.step_down().await?;
711 }
712 }
713 }
714 None => {
716 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
717 self.step_down().await?;
718 }
719 }
720 } else {
722 self.elected().await?;
723 }
724 Ok(())
725 }
726
727 async fn follower_action(&self) -> Result<()> {
738 let key = self.election_key();
739 if self.is_leader() {
741 self.step_down_without_lock().await?;
742 }
743 let lease = self
744 .get_value_with_lease(&key)
745 .await?
746 .context(ElectionNoLeaderSnafu)?;
747 ensure!(lease.expire_time > lease.current, ElectionNoLeaderSnafu);
749 Ok(())
751 }
752
753 async fn step_down(&self) -> Result<()> {
760 let key = self.election_key();
761 let leader_key = RdsLeaderKey {
762 name: self.leader_value.clone().into_bytes(),
763 key: key.clone().into_bytes(),
764 ..Default::default()
765 };
766 self.delete_value(&key).await?;
767 self.maybe_init_client().await?;
768 self.pg_client
769 .read()
770 .await
771 .query(&self.sql_set.step_down, &[])
772 .await?;
773 send_leader_change_and_set_flags(
774 &self.is_leader,
775 &self.leader_infancy,
776 &self.leader_watcher,
777 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
778 );
779 Ok(())
780 }
781
782 async fn step_down_without_lock(&self) -> Result<()> {
784 let key = self.election_key().into_bytes();
785 let leader_key = RdsLeaderKey {
786 name: self.leader_value.clone().into_bytes(),
787 key: key.clone(),
788 ..Default::default()
789 };
790 send_leader_change_and_set_flags(
791 &self.is_leader,
792 &self.leader_infancy,
793 &self.leader_watcher,
794 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
795 );
796 Ok(())
797 }
798
799 async fn elected(&self) -> Result<()> {
802 let key = self.election_key();
803 let leader_key = RdsLeaderKey {
804 name: self.leader_value.clone().into_bytes(),
805 key: key.clone().into_bytes(),
806 ..Default::default()
807 };
808 self.delete_value(&key).await?;
809 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
810 .await?;
811
812 if self
813 .is_leader
814 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
815 .is_ok()
816 {
817 self.leader_infancy.store(true, Ordering::Release);
818
819 if let Err(e) = self
820 .leader_watcher
821 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
822 {
823 error!(e; "Failed to send leader change message");
824 }
825 }
826 Ok(())
827 }
828}
829
830#[cfg(test)]
831mod tests {
832 use std::{assert_matches, env};
833
834 use deadpool_postgres::{Config, Runtime};
835 use tokio_postgres::NoTls;
836
837 use super::*;
838 use crate::{error, maybe_skip_postgres_integration_test};
839
840 async fn create_postgres_client(
841 table_name: Option<&str>,
842 execution_timeout: Duration,
843 idle_session_timeout: Duration,
844 statement_timeout: Duration,
845 ) -> Result<ElectionPgClient> {
846 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
847 if endpoint.is_empty() {
848 return UnexpectedSnafu {
849 err_msg: "Postgres endpoint is empty".to_string(),
850 }
851 .fail();
852 }
853 let mut cfg = Config::new();
854 cfg.url = Some(endpoint);
855 let pool = cfg.create_pool(Some(Runtime::Tokio1), NoTls).unwrap();
856 let mut pg_client = ElectionPgClient::new(
857 pool,
858 execution_timeout,
859 idle_session_timeout,
860 statement_timeout,
861 )
862 .unwrap();
863 pg_client.maybe_init_client().await?;
864 if let Some(table_name) = table_name {
865 let create_table_sql = format!(
866 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
867 table_name
868 );
869 pg_client.execute(&create_table_sql, &[]).await?;
870 }
871 Ok(pg_client)
872 }
873
874 async fn drop_table(pg_election: &PgElection, table_name: &str) {
875 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
876 pg_election
877 .pg_client
878 .read()
879 .await
880 .execute(&sql, &[])
881 .await
882 .unwrap();
883 }
884
/// Integration test for the low-level storage helpers: put, get, update,
/// delete and prefix listing against a real PostgreSQL table.
#[tokio::test]
async fn test_postgres_crud() {
    maybe_skip_postgres_integration_test!();
    let key = "test_key".to_string();
    let value = "test_value".to_string();

    let uuid = uuid::Uuid::new_v4().to_string();
    let table_name = "test_postgres_crud_greptime_metakv";
    let candidate_lease_ttl = Duration::from_secs(10);
    let execution_timeout = Duration::from_secs(10);
    let statement_timeout = Duration::from_secs(10);
    let meta_lease_ttl = Duration::from_secs(2);
    let idle_session_timeout = Duration::from_secs(0);
    let client = create_postgres_client(
        Some(table_name),
        execution_timeout,
        idle_session_timeout,
        statement_timeout,
    )
    .await
    .unwrap();

    let (tx, _) = broadcast::channel(100);
    let pg_election = PgElection {
        leader_value: "test_leader".to_string(),
        pg_client: RwLock::new(client),
        is_leader: AtomicBool::new(false),
        leader_infancy: AtomicBool::new(true),
        leader_watcher: tx,
        store_key_prefix: uuid,
        candidate_lease_ttl,
        meta_lease_ttl,
        sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
    };

    // First put on a fresh key: no previous row, so the helper returns `true`.
    let res = pg_election
        .put_value_with_lease(&key, &value, candidate_lease_ttl)
        .await
        .unwrap();
    assert!(res);

    let lease = pg_election
        .get_value_with_lease(&key)
        .await
        .unwrap()
        .unwrap();
    assert_eq!(lease.leader_value, value);

    // The update is a compare-and-set against the raw stored value (`origin`).
    pg_election
        .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
        .await
        .unwrap();

    let res = pg_election.delete_value(&key).await.unwrap();
    assert!(res);

    let res = pg_election.get_value_with_lease(&key).await.unwrap();
    assert!(res.is_none());

    // Insert ten keys sharing the "test_key" prefix.
    for i in 0..10 {
        let key = format!("test_key_{}", i);
        let value = format!("test_value_{}", i);
        pg_election
            .put_value_with_lease(&key, &value, candidate_lease_ttl)
            .await
            .unwrap();
    }

    let key_prefix = "test_key".to_string();
    let (res, _) = pg_election
        .get_value_with_lease_by_prefix(&key_prefix)
        .await
        .unwrap();
    assert_eq!(res.len(), 10);

    for i in 0..10 {
        let key = format!("test_key_{}", i);
        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);
    }

    // With no matching rows the helper never reads a timestamp, so the
    // returned "current" time stays at its default value.
    let (res, current) = pg_election
        .get_value_with_lease_by_prefix(&key_prefix)
        .await
        .unwrap();
    assert!(res.is_empty());
    assert!(current == Timestamp::default());

    drop_table(&pg_election, table_name).await;
}
975
976 async fn candidate(
977 leader_value: String,
978 candidate_lease_ttl: Duration,
979 store_key_prefix: String,
980 table_name: String,
981 ) {
982 let execution_timeout = Duration::from_secs(10);
983 let statement_timeout = Duration::from_secs(10);
984 let meta_lease_ttl = Duration::from_secs(2);
985 let idle_session_timeout = Duration::from_secs(0);
986 let client = create_postgres_client(
987 None,
988 execution_timeout,
989 idle_session_timeout,
990 statement_timeout,
991 )
992 .await
993 .unwrap();
994
995 let (tx, _) = broadcast::channel(100);
996 let pg_election = PgElection {
997 leader_value,
998 pg_client: RwLock::new(client),
999 is_leader: AtomicBool::new(false),
1000 leader_infancy: AtomicBool::new(true),
1001 leader_watcher: tx,
1002 store_key_prefix,
1003 candidate_lease_ttl,
1004 meta_lease_ttl,
1005 sql_set: ElectionSqlFactory::new(28319, None, &table_name).build(),
1006 };
1007
1008 let node_info = MetasrvNodeInfo {
1009 addr: "test_addr".to_string(),
1010 version: "test_version".to_string(),
1011 git_commit: "test_git_commit".to_string(),
1012 start_time_ms: 0,
1013 total_cpu_millicores: 0,
1014 total_memory_bytes: 0,
1015 cpu_usage_millicores: 0,
1016 memory_usage_bytes: 0,
1017 hostname: "test_hostname".to_string(),
1018 };
1019 pg_election.register_candidate(&node_info).await.unwrap();
1020 }
1021
/// Integration test: ten spawned candidates register themselves, are all
/// listed while alive, and drop out of the listing once their tasks are
/// aborted and their leases have run out.
#[tokio::test]
async fn test_candidate_registration() {
    maybe_skip_postgres_integration_test!();
    let leader_value_prefix = "test_leader".to_string();
    let uuid = uuid::Uuid::new_v4().to_string();
    let table_name = "test_candidate_registration_greptime_metakv";
    let mut handles = vec![];
    let candidate_lease_ttl = Duration::from_secs(5);
    let execution_timeout = Duration::from_secs(10);
    let statement_timeout = Duration::from_secs(10);
    let meta_lease_ttl = Duration::from_secs(2);
    let idle_session_timeout = Duration::from_secs(0);
    // Creates the table before any candidate task starts using it.
    let client = create_postgres_client(
        Some(table_name),
        execution_timeout,
        idle_session_timeout,
        statement_timeout,
    )
    .await
    .unwrap();

    for i in 0..10 {
        let leader_value = format!("{}{}", leader_value_prefix, i);
        let handle = tokio::spawn(candidate(
            leader_value,
            candidate_lease_ttl,
            uuid.clone(),
            table_name.to_string(),
        ));
        handles.push(handle);
    }
    // Give the spawned candidates time to register.
    tokio::time::sleep(Duration::from_secs(3)).await;

    let (tx, _) = broadcast::channel(100);
    let leader_value = "test_leader".to_string();
    let pg_election = PgElection {
        leader_value,
        pg_client: RwLock::new(client),
        is_leader: AtomicBool::new(false),
        leader_infancy: AtomicBool::new(true),
        leader_watcher: tx,
        store_key_prefix: uuid.clone(),
        candidate_lease_ttl,
        meta_lease_ttl,
        sql_set: ElectionSqlFactory::new(28319, None, table_name).build(),
    };

    let candidates = pg_election.all_candidates().await.unwrap();
    assert_eq!(candidates.len(), 10);

    // Stop the keep-alive loops so that the leases are no longer renewed.
    for handle in handles {
        handle.abort();
    }

    // Wait one full candidate lease TTL; `all_candidates` filters out expired leases.
    tokio::time::sleep(Duration::from_secs(5)).await;
    let candidates = pg_election.all_candidates().await.unwrap();
    assert!(candidates.is_empty());

    // The expired rows are still stored, so each delete removes exactly one row.
    for i in 0..10 {
        let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
        let res = pg_election.delete_value(&key).await.unwrap();
        assert!(res);
    }

    drop_table(&pg_election, table_name).await;
}
1091
/// Integration test for the leader state transitions: `elected`,
/// `step_down_without_lock`, `elected` again and finally `step_down`,
/// checking the stored lease, the local flag and the broadcast message after
/// each step.
#[tokio::test]
async fn test_elected_and_step_down() {
    maybe_skip_postgres_integration_test!();
    let leader_value = "test_leader".to_string();
    let uuid = uuid::Uuid::new_v4().to_string();
    let table_name = "test_elected_and_step_down_greptime_metakv";
    let candidate_lease_ttl = Duration::from_secs(5);
    let execution_timeout = Duration::from_secs(10);
    let statement_timeout = Duration::from_secs(10);
    let meta_lease_ttl = Duration::from_secs(2);
    let idle_session_timeout = Duration::from_secs(0);
    let client = create_postgres_client(
        Some(table_name),
        execution_timeout,
        idle_session_timeout,
        statement_timeout,
    )
    .await
    .unwrap();

    let (tx, mut rx) = broadcast::channel(100);
    let leader_pg_election = PgElection {
        leader_value: leader_value.clone(),
        pg_client: RwLock::new(client),
        is_leader: AtomicBool::new(false),
        leader_infancy: AtomicBool::new(true),
        leader_watcher: tx,
        store_key_prefix: uuid,
        candidate_lease_ttl,
        meta_lease_ttl,
        sql_set: ElectionSqlFactory::new(28320, None, table_name).build(),
    };

    // Step 1: become leader; a valid lease must be stored and `Elected` broadcast.
    leader_pg_election.elected().await.unwrap();
    let lease = leader_pg_election
        .get_value_with_lease(&leader_pg_election.election_key())
        .await
        .unwrap()
        .unwrap();
    assert!(lease.leader_value == leader_value);
    assert!(lease.expire_time > lease.current);
    assert!(leader_pg_election.is_leader());

    match rx.recv().await {
        Ok(LeaderChangeMessage::Elected(key)) => {
            assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
            assert_eq!(
                String::from_utf8_lossy(key.key()),
                leader_pg_election.election_key()
            );
            assert_eq!(key.lease_id(), i64::default());
            assert_eq!(key.revision(), i64::default());
        }
        _ => panic!("Expected LeaderChangeMessage::Elected"),
    }

    // Step 2: local-only step down; the stored lease row is left in place.
    leader_pg_election.step_down_without_lock().await.unwrap();
    let lease = leader_pg_election
        .get_value_with_lease(&leader_pg_election.election_key())
        .await
        .unwrap()
        .unwrap();
    assert!(lease.leader_value == leader_value);
    assert!(!leader_pg_election.is_leader());

    match rx.recv().await {
        Ok(LeaderChangeMessage::StepDown(key)) => {
            assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
            assert_eq!(
                String::from_utf8_lossy(key.key()),
                leader_pg_election.election_key()
            );
            assert_eq!(key.lease_id(), i64::default());
            assert_eq!(key.revision(), i64::default());
        }
        _ => panic!("Expected LeaderChangeMessage::StepDown"),
    }

    // Step 3: become leader again.
    leader_pg_election.elected().await.unwrap();
    let lease = leader_pg_election
        .get_value_with_lease(&leader_pg_election.election_key())
        .await
        .unwrap()
        .unwrap();
    assert!(lease.leader_value == leader_value);
    assert!(lease.expire_time > lease.current);
    assert!(leader_pg_election.is_leader());

    match rx.recv().await {
        Ok(LeaderChangeMessage::Elected(key)) => {
            assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
            assert_eq!(
                String::from_utf8_lossy(key.key()),
                leader_pg_election.election_key()
            );
            assert_eq!(key.lease_id(), i64::default());
            assert_eq!(key.revision(), i64::default());
        }
        _ => panic!("Expected LeaderChangeMessage::Elected"),
    }

    // Step 4: full step down; the election key is deleted from the table.
    leader_pg_election.step_down().await.unwrap();
    let res = leader_pg_election
        .get_value_with_lease(&leader_pg_election.election_key())
        .await
        .unwrap();
    assert!(res.is_none());
    assert!(!leader_pg_election.is_leader());

    match rx.recv().await {
        Ok(LeaderChangeMessage::StepDown(key)) => {
            assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
            assert_eq!(
                String::from_utf8_lossy(key.key()),
                leader_pg_election.election_key()
            );
            assert_eq!(key.lease_id(), i64::default());
            assert_eq!(key.revision(), i64::default());
        }
        _ => panic!("Expected LeaderChangeMessage::StepDown"),
    }

    drop_table(&leader_pg_election, table_name).await;
}
1216
1217 #[tokio::test]
1218 async fn test_leader_action() {
1219 maybe_skip_postgres_integration_test!();
1220 let leader_value = "test_leader".to_string();
1221 let uuid = uuid::Uuid::new_v4().to_string();
1222 let table_name = "test_leader_action_greptime_metakv";
1223 let candidate_lease_ttl = Duration::from_secs(5);
1224 let execution_timeout = Duration::from_secs(10);
1225 let statement_timeout = Duration::from_secs(10);
1226 let meta_lease_ttl = Duration::from_secs(2);
1227 let idle_session_timeout = Duration::from_secs(0);
1228 let client = create_postgres_client(
1229 Some(table_name),
1230 execution_timeout,
1231 idle_session_timeout,
1232 statement_timeout,
1233 )
1234 .await
1235 .unwrap();
1236
1237 let (tx, mut rx) = broadcast::channel(100);
1238 let leader_pg_election = PgElection {
1239 leader_value: leader_value.clone(),
1240 pg_client: RwLock::new(client),
1241 is_leader: AtomicBool::new(false),
1242 leader_infancy: AtomicBool::new(true),
1243 leader_watcher: tx,
1244 store_key_prefix: uuid,
1245 candidate_lease_ttl,
1246 meta_lease_ttl,
1247 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1248 };
1249
1250 let res = leader_pg_election
1252 .pg_client
1253 .read()
1254 .await
1255 .query(&leader_pg_election.sql_set.campaign, &[])
1256 .await
1257 .unwrap();
1258 let res: bool = res[0].get(0);
1259 assert!(res);
1260 leader_pg_election.leader_action().await.unwrap();
1261 let lease = leader_pg_election
1262 .get_value_with_lease(&leader_pg_election.election_key())
1263 .await
1264 .unwrap()
1265 .unwrap();
1266 assert!(lease.leader_value == leader_value);
1267 assert!(lease.expire_time > lease.current);
1268 assert!(leader_pg_election.is_leader());
1269
1270 match rx.recv().await {
1271 Ok(LeaderChangeMessage::Elected(key)) => {
1272 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1273 assert_eq!(
1274 String::from_utf8_lossy(key.key()),
1275 leader_pg_election.election_key()
1276 );
1277 assert_eq!(key.lease_id(), i64::default());
1278 assert_eq!(key.revision(), i64::default());
1279 }
1280 _ => panic!("Expected LeaderChangeMessage::Elected"),
1281 }
1282
1283 let res = leader_pg_election
1285 .pg_client
1286 .read()
1287 .await
1288 .query(&leader_pg_election.sql_set.campaign, &[])
1289 .await
1290 .unwrap();
1291 let res: bool = res[0].get(0);
1292 assert!(res);
1293 leader_pg_election.leader_action().await.unwrap();
1294 let new_lease = leader_pg_election
1295 .get_value_with_lease(&leader_pg_election.election_key())
1296 .await
1297 .unwrap()
1298 .unwrap();
1299 assert!(new_lease.leader_value == leader_value);
1300 assert!(
1301 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1302 );
1303 assert!(leader_pg_election.is_leader());
1304
1305 tokio::time::sleep(Duration::from_secs(2)).await;
1307
1308 let res = leader_pg_election
1309 .pg_client
1310 .read()
1311 .await
1312 .query(&leader_pg_election.sql_set.campaign, &[])
1313 .await
1314 .unwrap();
1315 let res: bool = res[0].get(0);
1316 assert!(res);
1317 leader_pg_election.leader_action().await.unwrap();
1318 let res = leader_pg_election
1319 .get_value_with_lease(&leader_pg_election.election_key())
1320 .await
1321 .unwrap();
1322 assert!(res.is_none());
1323
1324 match rx.recv().await {
1325 Ok(LeaderChangeMessage::StepDown(key)) => {
1326 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1327 assert_eq!(
1328 String::from_utf8_lossy(key.key()),
1329 leader_pg_election.election_key()
1330 );
1331 assert_eq!(key.lease_id(), i64::default());
1332 assert_eq!(key.revision(), i64::default());
1333 }
1334 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1335 }
1336
1337 let res = leader_pg_election
1339 .pg_client
1340 .read()
1341 .await
1342 .query(&leader_pg_election.sql_set.campaign, &[])
1343 .await
1344 .unwrap();
1345 let res: bool = res[0].get(0);
1346 assert!(res);
1347 leader_pg_election.leader_action().await.unwrap();
1348 let lease = leader_pg_election
1349 .get_value_with_lease(&leader_pg_election.election_key())
1350 .await
1351 .unwrap()
1352 .unwrap();
1353 assert!(lease.leader_value == leader_value);
1354 assert!(lease.expire_time > lease.current);
1355 assert!(leader_pg_election.is_leader());
1356
1357 match rx.recv().await {
1358 Ok(LeaderChangeMessage::Elected(key)) => {
1359 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1360 assert_eq!(
1361 String::from_utf8_lossy(key.key()),
1362 leader_pg_election.election_key()
1363 );
1364 assert_eq!(key.lease_id(), i64::default());
1365 assert_eq!(key.revision(), i64::default());
1366 }
1367 _ => panic!("Expected LeaderChangeMessage::Elected"),
1368 }
1369
1370 leader_pg_election
1372 .delete_value(&leader_pg_election.election_key())
1373 .await
1374 .unwrap();
1375 leader_pg_election.leader_action().await.unwrap();
1376 let res = leader_pg_election
1377 .get_value_with_lease(&leader_pg_election.election_key())
1378 .await
1379 .unwrap();
1380 assert!(res.is_none());
1381 assert!(!leader_pg_election.is_leader());
1382
1383 match rx.recv().await {
1384 Ok(LeaderChangeMessage::StepDown(key)) => {
1385 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1386 assert_eq!(
1387 String::from_utf8_lossy(key.key()),
1388 leader_pg_election.election_key()
1389 );
1390 assert_eq!(key.lease_id(), i64::default());
1391 assert_eq!(key.revision(), i64::default());
1392 }
1393 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1394 }
1395
1396 let res = leader_pg_election
1398 .pg_client
1399 .read()
1400 .await
1401 .query(&leader_pg_election.sql_set.campaign, &[])
1402 .await
1403 .unwrap();
1404 let res: bool = res[0].get(0);
1405 assert!(res);
1406 leader_pg_election.leader_action().await.unwrap();
1407 let lease = leader_pg_election
1408 .get_value_with_lease(&leader_pg_election.election_key())
1409 .await
1410 .unwrap()
1411 .unwrap();
1412 assert!(lease.leader_value == leader_value);
1413 assert!(lease.expire_time > lease.current);
1414 assert!(leader_pg_election.is_leader());
1415
1416 match rx.recv().await {
1417 Ok(LeaderChangeMessage::Elected(key)) => {
1418 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1419 assert_eq!(
1420 String::from_utf8_lossy(key.key()),
1421 leader_pg_election.election_key()
1422 );
1423 assert_eq!(key.lease_id(), i64::default());
1424 assert_eq!(key.revision(), i64::default());
1425 }
1426 _ => panic!("Expected LeaderChangeMessage::Elected"),
1427 }
1428
1429 let res = leader_pg_election
1431 .pg_client
1432 .read()
1433 .await
1434 .query(&leader_pg_election.sql_set.campaign, &[])
1435 .await
1436 .unwrap();
1437 let res: bool = res[0].get(0);
1438 assert!(res);
1439 leader_pg_election
1440 .delete_value(&leader_pg_election.election_key())
1441 .await
1442 .unwrap();
1443 leader_pg_election
1444 .put_value_with_lease(
1445 &leader_pg_election.election_key(),
1446 "test",
1447 Duration::from_secs(10),
1448 )
1449 .await
1450 .unwrap();
1451 leader_pg_election.leader_action().await.unwrap();
1452 let res = leader_pg_election
1453 .get_value_with_lease(&leader_pg_election.election_key())
1454 .await
1455 .unwrap();
1456 assert!(res.is_none());
1457 assert!(!leader_pg_election.is_leader());
1458
1459 match rx.recv().await {
1460 Ok(LeaderChangeMessage::StepDown(key)) => {
1461 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1462 assert_eq!(
1463 String::from_utf8_lossy(key.key()),
1464 leader_pg_election.election_key()
1465 );
1466 assert_eq!(key.lease_id(), i64::default());
1467 assert_eq!(key.revision(), i64::default());
1468 }
1469 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1470 }
1471
1472 leader_pg_election
1474 .pg_client
1475 .read()
1476 .await
1477 .query(&leader_pg_election.sql_set.step_down, &[])
1478 .await
1479 .unwrap();
1480
1481 drop_table(&leader_pg_election, table_name).await;
1482 }
1483
1484 #[tokio::test]
1485 async fn test_follower_action() {
1486 maybe_skip_postgres_integration_test!();
1487 common_telemetry::init_default_ut_logging();
1488 let uuid = uuid::Uuid::new_v4().to_string();
1489 let table_name = "test_follower_action_greptime_metakv";
1490
1491 let candidate_lease_ttl = Duration::from_secs(5);
1492 let execution_timeout = Duration::from_secs(10);
1493 let statement_timeout = Duration::from_secs(10);
1494 let meta_lease_ttl = Duration::from_secs(2);
1495 let idle_session_timeout = Duration::from_secs(0);
1496 let follower_client = create_postgres_client(
1497 Some(table_name),
1498 execution_timeout,
1499 idle_session_timeout,
1500 statement_timeout,
1501 )
1502 .await
1503 .unwrap();
1504 let (tx, mut rx) = broadcast::channel(100);
1505 let follower_pg_election = PgElection {
1506 leader_value: "test_follower".to_string(),
1507 pg_client: RwLock::new(follower_client),
1508 is_leader: AtomicBool::new(false),
1509 leader_infancy: AtomicBool::new(true),
1510 leader_watcher: tx,
1511 store_key_prefix: uuid.clone(),
1512 candidate_lease_ttl,
1513 meta_lease_ttl,
1514 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1515 };
1516
1517 let leader_client = create_postgres_client(
1518 Some(table_name),
1519 execution_timeout,
1520 idle_session_timeout,
1521 statement_timeout,
1522 )
1523 .await
1524 .unwrap();
1525 let (tx, _) = broadcast::channel(100);
1526 let leader_pg_election = PgElection {
1527 leader_value: "test_leader".to_string(),
1528 pg_client: RwLock::new(leader_client),
1529 is_leader: AtomicBool::new(false),
1530 leader_infancy: AtomicBool::new(true),
1531 leader_watcher: tx,
1532 store_key_prefix: uuid,
1533 candidate_lease_ttl,
1534 meta_lease_ttl,
1535 sql_set: ElectionSqlFactory::new(28322, None, table_name).build(),
1536 };
1537
1538 leader_pg_election
1539 .pg_client
1540 .read()
1541 .await
1542 .query(&leader_pg_election.sql_set.campaign, &[])
1543 .await
1544 .unwrap();
1545 leader_pg_election.elected().await.unwrap();
1546
1547 follower_pg_election.follower_action().await.unwrap();
1549
1550 tokio::time::sleep(Duration::from_secs(2)).await;
1552 assert!(follower_pg_election.follower_action().await.is_err());
1553
1554 leader_pg_election
1556 .delete_value(&leader_pg_election.election_key())
1557 .await
1558 .unwrap();
1559 assert!(follower_pg_election.follower_action().await.is_err());
1560
1561 follower_pg_election
1563 .is_leader
1564 .store(true, Ordering::Relaxed);
1565 assert!(follower_pg_election.follower_action().await.is_err());
1566
1567 match rx.recv().await {
1568 Ok(LeaderChangeMessage::StepDown(key)) => {
1569 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1570 assert_eq!(
1571 String::from_utf8_lossy(key.key()),
1572 follower_pg_election.election_key()
1573 );
1574 assert_eq!(key.lease_id(), i64::default());
1575 assert_eq!(key.revision(), i64::default());
1576 }
1577 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1578 }
1579
1580 leader_pg_election
1582 .pg_client
1583 .read()
1584 .await
1585 .query(&leader_pg_election.sql_set.step_down, &[])
1586 .await
1587 .unwrap();
1588
1589 drop_table(&follower_pg_election, table_name).await;
1590 }
1591
1592 #[tokio::test]
1593 async fn test_reset_campaign() {
1594 maybe_skip_postgres_integration_test!();
1595 let leader_value = "test_leader".to_string();
1596 let uuid = uuid::Uuid::new_v4().to_string();
1597 let table_name = "test_reset_campaign_greptime_metakv";
1598 let candidate_lease_ttl = Duration::from_secs(5);
1599 let execution_timeout = Duration::from_secs(10);
1600 let statement_timeout = Duration::from_secs(10);
1601 let meta_lease_ttl = Duration::from_secs(2);
1602 let idle_session_timeout = Duration::from_secs(0);
1603 let client = create_postgres_client(
1604 Some(table_name),
1605 execution_timeout,
1606 idle_session_timeout,
1607 statement_timeout,
1608 )
1609 .await
1610 .unwrap();
1611
1612 let (tx, _) = broadcast::channel(100);
1613 let leader_pg_election = PgElection {
1614 leader_value,
1615 pg_client: RwLock::new(client),
1616 is_leader: AtomicBool::new(false),
1617 leader_infancy: AtomicBool::new(true),
1618 leader_watcher: tx,
1619 store_key_prefix: uuid,
1620 candidate_lease_ttl,
1621 meta_lease_ttl,
1622 sql_set: ElectionSqlFactory::new(28321, None, table_name).build(),
1623 };
1624 leader_pg_election.is_leader.store(true, Ordering::Relaxed);
1625 leader_pg_election.reset_campaign().await;
1626 assert!(!leader_pg_election.is_leader());
1627 drop_table(&leader_pg_election, table_name).await;
1628 }
1629
1630 #[tokio::test]
1631 async fn test_idle_session_timeout() {
1632 maybe_skip_postgres_integration_test!();
1633 common_telemetry::init_default_ut_logging();
1634 let execution_timeout = Duration::from_secs(10);
1635 let statement_timeout = Duration::from_secs(10);
1636 let idle_session_timeout = Duration::from_secs(1);
1637 let mut client = create_postgres_client(
1638 None,
1639 execution_timeout,
1640 idle_session_timeout,
1641 statement_timeout,
1642 )
1643 .await
1644 .unwrap();
1645 tokio::time::sleep(Duration::from_millis(1100)).await;
1646 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1648 assert_matches!(err, error::Error::PostgresExecution { .. });
1649 let error::Error::PostgresExecution { error, .. } = err else {
1650 panic!("Expected PostgresExecution error");
1651 };
1652 assert!(error.is_closed());
1653 client.reset_client().await.unwrap();
1655 let _ = client.query("SELECT 1", &[]).await.unwrap();
1656 }
1657
1658 #[test]
1659 fn test_election_sql_with_schema() {
1660 let f = ElectionSqlFactory::new(42, Some("test_schema"), "greptime_metakv");
1661 let s = f.build();
1662 assert!(s.campaign.contains("pg_try_advisory_lock"));
1663 assert!(
1664 s.put_value_with_lease
1665 .contains("\"test_schema\".\"greptime_metakv\"")
1666 );
1667 assert!(
1668 s.update_value_with_lease
1669 .contains("\"test_schema\".\"greptime_metakv\"")
1670 );
1671 assert!(
1672 s.get_value_with_lease
1673 .contains("\"test_schema\".\"greptime_metakv\"")
1674 );
1675 assert!(
1676 s.get_value_with_lease_by_prefix
1677 .contains("\"test_schema\".\"greptime_metakv\"")
1678 );
1679 assert!(
1680 s.delete_value
1681 .contains("\"test_schema\".\"greptime_metakv\"")
1682 );
1683 }
1684}