1use std::sync::atomic::{AtomicBool, Ordering};
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_telemetry::{error, warn};
20use common_time::Timestamp;
21use deadpool_postgres::{Manager, Pool};
22use snafu::{ensure, OptionExt, ResultExt};
23use tokio::sync::{broadcast, RwLock};
24use tokio::time::MissedTickBehavior;
25use tokio_postgres::types::ToSql;
26use tokio_postgres::Row;
27
28use crate::election::rds::{parse_value_and_expire_time, Lease, RdsLeaderKey, LEASE_SEP};
29use crate::election::{
30 listen_leader_change, send_leader_change_and_set_flags, Election, LeaderChangeMessage,
31 CANDIDATES_ROOT, ELECTION_KEY,
32};
33use crate::error::{
34 DeserializeFromJsonSnafu, GetPostgresClientSnafu, NoLeaderSnafu, PostgresExecutionSnafu,
35 Result, SerializeToJsonSnafu, SqlExecutionTimeoutSnafu, UnexpectedSnafu,
36};
37use crate::metasrv::{ElectionRef, LeaderValue, MetasrvNodeInfo};
38
/// Renders the SQL statements used by [`PgElection`] for one election table and lock.
///
/// The statements are rendered once by [`ElectionSqlFactory::build`] into an
/// [`ElectionSqlSet`].
struct ElectionSqlFactory<'a> {
    /// Key passed to `pg_try_advisory_lock` / `pg_advisory_unlock` by the campaign and
    /// step-down statements.
    lock_id: u64,
    /// Name of the key-value table; it is interpolated double-quoted into every statement.
    table_name: &'a str,
}
43
/// Pre-rendered SQL statements produced by [`ElectionSqlFactory::build`].
///
/// Values are stored as `convert_to(value || LEASE_SEP || expire_time, 'UTF8')`, where
/// `expire_time` is the database's `CURRENT_TIMESTAMP` plus the lease TTL, formatted as
/// `YYYY-MM-DD HH24:MI:SS.MS`.
struct ElectionSqlSet {
    /// `SELECT pg_try_advisory_lock(<lock_id>)`: a single boolean column telling whether the
    /// lock was acquired.
    campaign: String,
    /// `SELECT pg_advisory_unlock(<lock_id>)`: releases the advisory lock.
    step_down: String,
    /// Inserts key `$1` with value `$2` and a lease of `$3` seconds unless the key already
    /// exists (`ON CONFLICT DO NOTHING`); returns the previous `(k, v)` row, if any.
    put_value_with_lease: String,
    /// Sets the value of key `$1` to `$3` with a lease of `$4` seconds, only if the stored
    /// value still equals `$2` (compare-and-swap).
    update_value_with_lease: String,
    /// Selects the raw stored value of key `$1` and the database's current timestamp.
    get_value_with_lease: String,
    /// Selects every raw stored value whose key matches the `LIKE` pattern `$1`, plus the
    /// database's current timestamp.
    get_value_with_lease_by_prefix: String,
    /// Deletes key `$1`, returning the deleted `(k, v)` row if there was one.
    delete_value: String,
}
89
90impl<'a> ElectionSqlFactory<'a> {
91 fn new(lock_id: u64, table_name: &'a str) -> Self {
92 Self {
93 lock_id,
94 table_name,
95 }
96 }
97
98 fn build(self) -> ElectionSqlSet {
99 ElectionSqlSet {
100 campaign: self.campaign_sql(),
101 step_down: self.step_down_sql(),
102 put_value_with_lease: self.put_value_with_lease_sql(),
103 update_value_with_lease: self.update_value_with_lease_sql(),
104 get_value_with_lease: self.get_value_with_lease_sql(),
105 get_value_with_lease_by_prefix: self.get_value_with_lease_by_prefix_sql(),
106 delete_value: self.delete_value_sql(),
107 }
108 }
109
110 fn campaign_sql(&self) -> String {
111 format!("SELECT pg_try_advisory_lock({})", self.lock_id)
112 }
113
114 fn step_down_sql(&self) -> String {
115 format!("SELECT pg_advisory_unlock({})", self.lock_id)
116 }
117
118 fn put_value_with_lease_sql(&self) -> String {
119 format!(
120 r#"WITH prev AS (
121 SELECT k, v FROM "{}" WHERE k = $1
122 ), insert AS (
123 INSERT INTO "{}"
124 VALUES($1, convert_to($2 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $3, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8'))
125 ON CONFLICT (k) DO NOTHING
126 )
127 SELECT k, v FROM prev;
128 "#,
129 self.table_name, self.table_name, LEASE_SEP
130 )
131 }
132
133 fn update_value_with_lease_sql(&self) -> String {
134 format!(
135 r#"UPDATE "{}"
136 SET v = convert_to($3 || '{}' || TO_CHAR(CURRENT_TIMESTAMP + INTERVAL '1 second' * $4, 'YYYY-MM-DD HH24:MI:SS.MS'), 'UTF8')
137 WHERE k = $1 AND v = $2"#,
138 self.table_name, LEASE_SEP
139 )
140 }
141
142 fn get_value_with_lease_sql(&self) -> String {
143 format!(
144 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k = $1"#,
145 self.table_name
146 )
147 }
148
149 fn get_value_with_lease_by_prefix_sql(&self) -> String {
150 format!(
151 r#"SELECT v, TO_CHAR(CURRENT_TIMESTAMP, 'YYYY-MM-DD HH24:MI:SS.MS') FROM "{}" WHERE k LIKE $1"#,
152 self.table_name
153 )
154 }
155
156 fn delete_value_sql(&self) -> String {
157 format!(
158 "DELETE FROM \"{}\" WHERE k = $1 RETURNING k,v;",
159 self.table_name
160 )
161 }
162}
163
/// Postgres client used by [`PgElection`], holding at most one pooled connection at a time.
///
/// The connection is taken from `pool` lazily and kept in `current` until
/// `reset_client` replaces it.
pub struct ElectionPgClient {
    /// The connection currently in use; `None` until the client is initialized.
    current: Option<deadpool::managed::Object<Manager>>,
    /// Pool that connections are taken from.
    pool: Pool,
    /// Client-side timeout applied to every `execute` / `query` call.
    execution_timeout: Duration,

    /// Value written to the session's `idle_session_timeout` when a connection is initialized.
    idle_session_timeout: Duration,

    /// Value written to the session's `statement_timeout` when a connection is initialized.
    statement_timeout: Duration,
}
186
187impl ElectionPgClient {
188 pub fn new(
189 pool: Pool,
190 execution_timeout: Duration,
191 idle_session_timeout: Duration,
192 statement_timeout: Duration,
193 ) -> Result<ElectionPgClient> {
194 Ok(ElectionPgClient {
195 current: None,
196 pool,
197 execution_timeout,
198 idle_session_timeout,
199 statement_timeout,
200 })
201 }
202
203 fn set_idle_session_timeout_sql(&self) -> String {
204 format!(
205 "SET idle_session_timeout = '{}s';",
206 self.idle_session_timeout.as_secs()
207 )
208 }
209
210 fn set_statement_timeout_sql(&self) -> String {
211 format!(
212 "SET statement_timeout = '{}s';",
213 self.statement_timeout.as_secs()
214 )
215 }
216
217 async fn reset_client(&mut self) -> Result<()> {
218 self.current = None;
219 self.maybe_init_client().await
220 }
221
222 async fn maybe_init_client(&mut self) -> Result<()> {
223 if self.current.is_none() {
224 let client = self.pool.get().await.context(GetPostgresClientSnafu)?;
225
226 self.current = Some(client);
227 let idle_session_timeout_sql = self.set_idle_session_timeout_sql();
229 self.execute(&idle_session_timeout_sql, &[]).await?;
230 let statement_timeout_sql = self.set_statement_timeout_sql();
231 self.execute(&statement_timeout_sql, &[]).await?;
232 }
233
234 Ok(())
235 }
236
237 async fn execute(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<u64> {
242 let result = tokio::time::timeout(
243 self.execution_timeout,
244 self.current.as_ref().unwrap().execute(sql, params),
245 )
246 .await
247 .map_err(|_| {
248 SqlExecutionTimeoutSnafu {
249 sql: sql.to_string(),
250 duration: self.execution_timeout,
251 }
252 .build()
253 })?;
254
255 result.context(PostgresExecutionSnafu { sql })
256 }
257
258 async fn query(&self, sql: &str, params: &[&(dyn ToSql + Sync)]) -> Result<Vec<Row>> {
263 let result = tokio::time::timeout(
264 self.execution_timeout,
265 self.current.as_ref().unwrap().query(sql, params),
266 )
267 .await
268 .map_err(|_| {
269 SqlExecutionTimeoutSnafu {
270 sql: sql.to_string(),
271 duration: self.execution_timeout,
272 }
273 .build()
274 })?;
275
276 result.context(PostgresExecutionSnafu { sql })
277 }
278}
279
/// Leader election backed by a PostgreSQL advisory lock plus a key-value table of leases.
pub struct PgElection {
    /// This node's identity; written as the leader value and used in the candidate key.
    leader_value: String,
    /// Shared client. The write lock is only taken to (re)initialize the connection.
    pg_client: RwLock<ElectionPgClient>,
    /// Whether this node currently considers itself the leader.
    is_leader: AtomicBool,
    /// Set when this node is elected; cleared by the first successful `in_leader_infancy`.
    leader_infancy: AtomicBool,
    /// Sender for `Elected` / `StepDown` notifications.
    leader_watcher: broadcast::Sender<LeaderChangeMessage>,
    /// Prefix prepended to the election key and the candidate root.
    store_key_prefix: String,
    /// Lease TTL of candidate registrations.
    candidate_lease_ttl: Duration,
    /// Lease TTL of the leader key.
    meta_lease_ttl: Duration,
    /// SQL statements rendered for this election's table and lock id.
    sql_set: ElectionSqlSet,
}
292
293impl PgElection {
294 async fn maybe_init_client(&self) -> Result<()> {
295 if self.pg_client.read().await.current.is_none() {
296 self.pg_client.write().await.maybe_init_client().await?;
297 }
298
299 Ok(())
300 }
301
302 pub async fn with_pg_client(
303 leader_value: String,
304 pg_client: ElectionPgClient,
305 store_key_prefix: String,
306 candidate_lease_ttl: Duration,
307 meta_lease_ttl: Duration,
308 table_name: &str,
309 lock_id: u64,
310 ) -> Result<ElectionRef> {
311 let sql_factory = ElectionSqlFactory::new(lock_id, table_name);
312
313 let tx = listen_leader_change(leader_value.clone());
314 Ok(Arc::new(Self {
315 leader_value,
316 pg_client: RwLock::new(pg_client),
317 is_leader: AtomicBool::new(false),
318 leader_infancy: AtomicBool::new(false),
319 leader_watcher: tx,
320 store_key_prefix,
321 candidate_lease_ttl,
322 meta_lease_ttl,
323 sql_set: sql_factory.build(),
324 }))
325 }
326
327 fn election_key(&self) -> String {
328 format!("{}{}", self.store_key_prefix, ELECTION_KEY)
329 }
330
331 fn candidate_root(&self) -> String {
332 format!("{}{}", self.store_key_prefix, CANDIDATES_ROOT)
333 }
334
335 fn candidate_key(&self) -> String {
336 format!("{}{}", self.candidate_root(), self.leader_value)
337 }
338}
339
#[async_trait::async_trait]
impl Election for PgElection {
    type Leader = LeaderValue;

    /// Returns whether this node currently considers itself the leader.
    fn is_leader(&self) -> bool {
        self.is_leader.load(Ordering::Relaxed)
    }

    /// Returns `true` if the infancy flag was set, clearing it atomically, so each time the
    /// flag is set at most one caller observes `true`.
    fn in_leader_infancy(&self) -> bool {
        self.leader_infancy
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_ok()
    }

    /// Registers this node as a candidate and keeps its lease alive; only returns on error.
    ///
    /// The candidate key stores `node_info` as JSON with a `candidate_lease_ttl` lease. The
    /// lease is refreshed every half TTL with a compare-and-swap on the stored value.
    ///
    /// # Errors
    /// Fails on serialization or database errors, or if the lease is found missing or
    /// expired during keep-alive.
    async fn register_candidate(&self, node_info: &MetasrvNodeInfo) -> Result<()> {
        let key = self.candidate_key();
        let node_info =
            serde_json::to_string(node_info).with_context(|_| SerializeToJsonSnafu {
                input: format!("{node_info:?}"),
            })?;
        let res = self
            .put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
            .await?;
        // `false` means a previous value already existed and nothing was written: replace it
        // so this node's info and a fresh lease are stored.
        if !res {
            self.delete_value(&key).await?;
            self.put_value_with_lease(&key, &node_info, self.candidate_lease_ttl)
                .await?;
        }

        let mut keep_alive_interval = tokio::time::interval(self.candidate_lease_ttl / 2);
        loop {
            let _ = keep_alive_interval.tick().await;

            let lease = self
                .get_value_with_lease(&key)
                .await?
                .context(UnexpectedSnafu {
                    violated: format!("Failed to get lease for key: {:?}", key),
                })?;

            // Expiry is judged against the database clock returned with the lease.
            ensure!(
                lease.expire_time > lease.current,
                UnexpectedSnafu {
                    violated: format!(
                        "Candidate lease expired at {:?} (current time {:?}), key: {:?}",
                        lease.expire_time, lease.current, key
                    ),
                }
            );

            // CAS on the raw stored value so a concurrent overwrite makes this fail.
            self.update_value_with_lease(&key, &lease.origin, &node_info, self.candidate_lease_ttl)
                .await?;
        }
    }

    /// Lists the registered candidates whose leases have not expired (by database time).
    ///
    /// # Errors
    /// Fails on database errors or if a stored value is not valid `MetasrvNodeInfo` JSON.
    async fn all_candidates(&self) -> Result<Vec<MetasrvNodeInfo>> {
        let key_prefix = self.candidate_root();
        let (mut candidates, current) = self.get_value_with_lease_by_prefix(&key_prefix).await?;
        // Keep only candidates whose expire time is still in the future.
        candidates.retain(|c| c.1 > current);
        let mut valid_candidates = Vec::with_capacity(candidates.len());
        for (c, _) in candidates {
            let node_info: MetasrvNodeInfo =
                serde_json::from_str(&c).with_context(|_| DeserializeFromJsonSnafu {
                    input: format!("{:?}", c),
                })?;
            valid_candidates.push(node_info);
        }
        Ok(valid_candidates)
    }

    /// Runs the leader campaign loop; only returns on error.
    ///
    /// Each iteration tries to take the advisory lock. On success `leader_action` establishes
    /// or refreshes the leader lease; otherwise `follower_action` runs. Iterations are paced
    /// by an interval of half `meta_lease_ttl`, with missed ticks delayed rather than burst.
    ///
    /// NOTE(review): PostgreSQL advisory locks are reentrant per session. Each successful
    /// `pg_try_advisory_lock` while the lock is already held increments its hold count, but
    /// `step_down` issues a single `pg_advisory_unlock`. Confirm that the lock is actually
    /// released after stepping down from a leadership that lasted several ticks.
    async fn campaign(&self) -> Result<()> {
        let mut keep_alive_interval = tokio::time::interval(self.meta_lease_ttl / 2);
        keep_alive_interval.set_missed_tick_behavior(MissedTickBehavior::Delay);

        self.maybe_init_client().await?;
        loop {
            let res = self
                .pg_client
                .read()
                .await
                .query(&self.sql_set.campaign, &[])
                .await?;
            let row = res.first().context(UnexpectedSnafu {
                violated: "Failed to get the result of acquiring advisory lock",
            })?;
            let is_leader = row.try_get(0).map_err(|_| {
                UnexpectedSnafu {
                    violated: "Failed to get the result of get lock",
                }
                .build()
            })?;
            if is_leader {
                self.leader_action().await?;
            } else {
                self.follower_action().await?;
            }
            let _ = keep_alive_interval.tick().await;
        }
    }

    /// Replaces the client's connection with a fresh one; failures are logged, not returned.
    async fn reset_campaign(&self) {
        if let Err(err) = self.pg_client.write().await.reset_client().await {
            error!(err; "Failed to reset client");
        }
    }

    /// Returns the current leader's value.
    ///
    /// A leader answers with its own value. Any other node reads the election key and fails
    /// with `NoLeader` when the key is missing or its lease has expired (by database time).
    async fn leader(&self) -> Result<Self::Leader> {
        if self.is_leader.load(Ordering::Relaxed) {
            Ok(self.leader_value.as_bytes().into())
        } else {
            let key = self.election_key();
            if let Some(lease) = self.get_value_with_lease(&key).await? {
                ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
                Ok(lease.leader_value.as_bytes().into())
            } else {
                NoLeaderSnafu.fail()
            }
        }
    }

    /// Not implemented yet: calling it panics via `todo!()`.
    async fn resign(&self) -> Result<()> {
        todo!()
    }

    /// Subscribes to `Elected` / `StepDown` notifications sent by this election.
    fn subscribe_leader_change(&self) -> broadcast::Receiver<LeaderChangeMessage> {
        self.leader_watcher.subscribe()
    }
}
484
485impl PgElection {
486 async fn get_value_with_lease(&self, key: &str) -> Result<Option<Lease>> {
488 let key = key.as_bytes();
489 self.maybe_init_client().await?;
490 let res = self
491 .pg_client
492 .read()
493 .await
494 .query(&self.sql_set.get_value_with_lease, &[&key])
495 .await?;
496
497 if res.is_empty() {
498 Ok(None)
499 } else {
500 let current_time_str = res[0].try_get(1).unwrap_or_default();
502 let current_time = match Timestamp::from_str(current_time_str, None) {
503 Ok(ts) => ts,
504 Err(_) => UnexpectedSnafu {
505 violated: format!("Invalid timestamp: {}", current_time_str),
506 }
507 .fail()?,
508 };
509 let value_and_expire_time =
511 String::from_utf8_lossy(res[0].try_get(0).unwrap_or_default());
512 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
513
514 Ok(Some(Lease {
515 leader_value: value,
516 expire_time,
517 current: current_time,
518 origin: value_and_expire_time.to_string(),
519 }))
520 }
521 }
522
523 async fn get_value_with_lease_by_prefix(
525 &self,
526 key_prefix: &str,
527 ) -> Result<(Vec<(String, Timestamp)>, Timestamp)> {
528 let key_prefix = format!("{}%", key_prefix).as_bytes().to_vec();
529 self.maybe_init_client().await?;
530 let res = self
531 .pg_client
532 .read()
533 .await
534 .query(&self.sql_set.get_value_with_lease_by_prefix, &[&key_prefix])
535 .await?;
536
537 let mut values_with_leases = vec![];
538 let mut current = Timestamp::default();
539 for row in res {
540 let current_time_str = row.try_get(1).unwrap_or_default();
541 current = match Timestamp::from_str(current_time_str, None) {
542 Ok(ts) => ts,
543 Err(_) => UnexpectedSnafu {
544 violated: format!("Invalid timestamp: {}", current_time_str),
545 }
546 .fail()?,
547 };
548
549 let value_and_expire_time = String::from_utf8_lossy(row.try_get(0).unwrap_or_default());
550 let (value, expire_time) = parse_value_and_expire_time(&value_and_expire_time)?;
551
552 values_with_leases.push((value, expire_time));
553 }
554 Ok((values_with_leases, current))
555 }
556
557 async fn update_value_with_lease(
558 &self,
559 key: &str,
560 prev: &str,
561 updated: &str,
562 lease_ttl: Duration,
563 ) -> Result<()> {
564 let key = key.as_bytes();
565 let prev = prev.as_bytes();
566 self.maybe_init_client().await?;
567 let lease_ttl_secs = lease_ttl.as_secs() as f64;
568 let res = self
569 .pg_client
570 .read()
571 .await
572 .execute(
573 &self.sql_set.update_value_with_lease,
574 &[&key, &prev, &updated, &lease_ttl_secs],
575 )
576 .await?;
577
578 ensure!(
579 res == 1,
580 UnexpectedSnafu {
581 violated: format!("Failed to update key: {}", String::from_utf8_lossy(key)),
582 }
583 );
584
585 Ok(())
586 }
587
588 async fn put_value_with_lease(
590 &self,
591 key: &str,
592 value: &str,
593 lease_ttl: Duration,
594 ) -> Result<bool> {
595 let key = key.as_bytes();
596 let lease_ttl_secs = lease_ttl.as_secs() as f64;
597 let params: Vec<&(dyn ToSql + Sync)> = vec![&key, &value, &lease_ttl_secs];
598 self.maybe_init_client().await?;
599 let res = self
600 .pg_client
601 .read()
602 .await
603 .query(&self.sql_set.put_value_with_lease, ¶ms)
604 .await?;
605 Ok(res.is_empty())
606 }
607
608 async fn delete_value(&self, key: &str) -> Result<bool> {
611 let key = key.as_bytes();
612 self.maybe_init_client().await?;
613 let res = self
614 .pg_client
615 .read()
616 .await
617 .query(&self.sql_set.delete_value, &[&key])
618 .await?;
619
620 Ok(res.len() == 1)
621 }
622
623 async fn leader_action(&self) -> Result<()> {
644 let key = self.election_key();
645 if self.is_leader() {
647 match self.get_value_with_lease(&key).await? {
648 Some(lease) => {
649 match (
650 lease.leader_value == self.leader_value,
651 lease.expire_time > lease.current,
652 ) {
653 (true, true) => {
655 self.update_value_with_lease(
657 &key,
658 &lease.origin,
659 &self.leader_value,
660 self.meta_lease_ttl,
661 )
662 .await?;
663 }
664 (true, false) => {
666 warn!("Leader lease expired, now stepping down.");
667 self.step_down().await?;
668 }
669 (false, _) => {
671 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
672 self.step_down().await?;
673 }
674 }
675 }
676 None => {
678 warn!("Leader lease not found, but still hold the lock. Now stepping down.");
679 self.step_down().await?;
680 }
681 }
682 } else {
684 self.elected().await?;
685 }
686 Ok(())
687 }
688
689 async fn follower_action(&self) -> Result<()> {
700 let key = self.election_key();
701 if self.is_leader() {
703 self.step_down_without_lock().await?;
704 }
705 let lease = self
706 .get_value_with_lease(&key)
707 .await?
708 .context(NoLeaderSnafu)?;
709 ensure!(lease.expire_time > lease.current, NoLeaderSnafu);
711 Ok(())
713 }
714
715 async fn step_down(&self) -> Result<()> {
722 let key = self.election_key();
723 let leader_key = RdsLeaderKey {
724 name: self.leader_value.clone().into_bytes(),
725 key: key.clone().into_bytes(),
726 ..Default::default()
727 };
728 self.delete_value(&key).await?;
729 self.maybe_init_client().await?;
730 self.pg_client
731 .read()
732 .await
733 .query(&self.sql_set.step_down, &[])
734 .await?;
735 send_leader_change_and_set_flags(
736 &self.is_leader,
737 &self.leader_infancy,
738 &self.leader_watcher,
739 LeaderChangeMessage::StepDown(Arc::new(leader_key)),
740 );
741 Ok(())
742 }
743
744 async fn step_down_without_lock(&self) -> Result<()> {
746 let key = self.election_key().into_bytes();
747 let leader_key = RdsLeaderKey {
748 name: self.leader_value.clone().into_bytes(),
749 key: key.clone(),
750 ..Default::default()
751 };
752 if self
753 .is_leader
754 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
755 .is_ok()
756 {
757 if let Err(e) = self
758 .leader_watcher
759 .send(LeaderChangeMessage::StepDown(Arc::new(leader_key)))
760 {
761 error!(e; "Failed to send leader change message");
762 }
763 }
764 Ok(())
765 }
766
767 async fn elected(&self) -> Result<()> {
770 let key = self.election_key();
771 let leader_key = RdsLeaderKey {
772 name: self.leader_value.clone().into_bytes(),
773 key: key.clone().into_bytes(),
774 ..Default::default()
775 };
776 self.delete_value(&key).await?;
777 self.put_value_with_lease(&key, &self.leader_value, self.meta_lease_ttl)
778 .await?;
779
780 if self
781 .is_leader
782 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
783 .is_ok()
784 {
785 self.leader_infancy.store(true, Ordering::Release);
786
787 if let Err(e) = self
788 .leader_watcher
789 .send(LeaderChangeMessage::Elected(Arc::new(leader_key)))
790 {
791 error!(e; "Failed to send leader change message");
792 }
793 }
794 Ok(())
795 }
796}
797
798#[cfg(test)]
799mod tests {
800 use std::assert_matches::assert_matches;
801 use std::env;
802
803 use common_meta::maybe_skip_postgres_integration_test;
804
805 use super::*;
806 use crate::bootstrap::create_postgres_pool;
807 use crate::error;
808
809 async fn create_postgres_client(
810 table_name: Option<&str>,
811 execution_timeout: Duration,
812 idle_session_timeout: Duration,
813 statement_timeout: Duration,
814 ) -> Result<ElectionPgClient> {
815 let endpoint = env::var("GT_POSTGRES_ENDPOINTS").unwrap_or_default();
816 if endpoint.is_empty() {
817 return UnexpectedSnafu {
818 violated: "Postgres endpoint is empty".to_string(),
819 }
820 .fail();
821 }
822 let pool = create_postgres_pool(&[endpoint]).await.unwrap();
823 let mut pg_client = ElectionPgClient::new(
824 pool,
825 execution_timeout,
826 idle_session_timeout,
827 statement_timeout,
828 )
829 .unwrap();
830 pg_client.maybe_init_client().await?;
831 if let Some(table_name) = table_name {
832 let create_table_sql = format!(
833 "CREATE TABLE IF NOT EXISTS \"{}\"(k bytea PRIMARY KEY, v bytea);",
834 table_name
835 );
836 pg_client.execute(&create_table_sql, &[]).await?;
837 }
838 Ok(pg_client)
839 }
840
841 async fn drop_table(pg_election: &PgElection, table_name: &str) {
842 let sql = format!("DROP TABLE IF EXISTS \"{}\";", table_name);
843 pg_election
844 .pg_client
845 .read()
846 .await
847 .execute(&sql, &[])
848 .await
849 .unwrap();
850 }
851
852 #[tokio::test]
853 async fn test_postgres_crud() {
854 maybe_skip_postgres_integration_test!();
855 let key = "test_key".to_string();
856 let value = "test_value".to_string();
857
858 let uuid = uuid::Uuid::new_v4().to_string();
859 let table_name = "test_postgres_crud_greptime_metakv";
860 let candidate_lease_ttl = Duration::from_secs(10);
861 let execution_timeout = Duration::from_secs(10);
862 let statement_timeout = Duration::from_secs(10);
863 let meta_lease_ttl = Duration::from_secs(2);
864 let idle_session_timeout = Duration::from_secs(0);
865 let client = create_postgres_client(
866 Some(table_name),
867 execution_timeout,
868 idle_session_timeout,
869 statement_timeout,
870 )
871 .await
872 .unwrap();
873
874 let (tx, _) = broadcast::channel(100);
875 let pg_election = PgElection {
876 leader_value: "test_leader".to_string(),
877 pg_client: RwLock::new(client),
878 is_leader: AtomicBool::new(false),
879 leader_infancy: AtomicBool::new(true),
880 leader_watcher: tx,
881 store_key_prefix: uuid,
882 candidate_lease_ttl,
883 meta_lease_ttl,
884 sql_set: ElectionSqlFactory::new(28319, table_name).build(),
885 };
886
887 let res = pg_election
888 .put_value_with_lease(&key, &value, candidate_lease_ttl)
889 .await
890 .unwrap();
891 assert!(res);
892
893 let lease = pg_election
894 .get_value_with_lease(&key)
895 .await
896 .unwrap()
897 .unwrap();
898 assert_eq!(lease.leader_value, value);
899
900 pg_election
901 .update_value_with_lease(&key, &lease.origin, &value, pg_election.meta_lease_ttl)
902 .await
903 .unwrap();
904
905 let res = pg_election.delete_value(&key).await.unwrap();
906 assert!(res);
907
908 let res = pg_election.get_value_with_lease(&key).await.unwrap();
909 assert!(res.is_none());
910
911 for i in 0..10 {
912 let key = format!("test_key_{}", i);
913 let value = format!("test_value_{}", i);
914 pg_election
915 .put_value_with_lease(&key, &value, candidate_lease_ttl)
916 .await
917 .unwrap();
918 }
919
920 let key_prefix = "test_key".to_string();
921 let (res, _) = pg_election
922 .get_value_with_lease_by_prefix(&key_prefix)
923 .await
924 .unwrap();
925 assert_eq!(res.len(), 10);
926
927 for i in 0..10 {
928 let key = format!("test_key_{}", i);
929 let res = pg_election.delete_value(&key).await.unwrap();
930 assert!(res);
931 }
932
933 let (res, current) = pg_election
934 .get_value_with_lease_by_prefix(&key_prefix)
935 .await
936 .unwrap();
937 assert!(res.is_empty());
938 assert!(current == Timestamp::default());
939
940 drop_table(&pg_election, table_name).await;
941 }
942
943 async fn candidate(
944 leader_value: String,
945 candidate_lease_ttl: Duration,
946 store_key_prefix: String,
947 table_name: String,
948 ) {
949 let execution_timeout = Duration::from_secs(10);
950 let statement_timeout = Duration::from_secs(10);
951 let meta_lease_ttl = Duration::from_secs(2);
952 let idle_session_timeout = Duration::from_secs(0);
953 let client = create_postgres_client(
954 None,
955 execution_timeout,
956 idle_session_timeout,
957 statement_timeout,
958 )
959 .await
960 .unwrap();
961
962 let (tx, _) = broadcast::channel(100);
963 let pg_election = PgElection {
964 leader_value,
965 pg_client: RwLock::new(client),
966 is_leader: AtomicBool::new(false),
967 leader_infancy: AtomicBool::new(true),
968 leader_watcher: tx,
969 store_key_prefix,
970 candidate_lease_ttl,
971 meta_lease_ttl,
972 sql_set: ElectionSqlFactory::new(28319, &table_name).build(),
973 };
974
975 let node_info = MetasrvNodeInfo {
976 addr: "test_addr".to_string(),
977 version: "test_version".to_string(),
978 git_commit: "test_git_commit".to_string(),
979 start_time_ms: 0,
980 };
981 pg_election.register_candidate(&node_info).await.unwrap();
982 }
983
984 #[tokio::test]
985 async fn test_candidate_registration() {
986 maybe_skip_postgres_integration_test!();
987 let leader_value_prefix = "test_leader".to_string();
988 let uuid = uuid::Uuid::new_v4().to_string();
989 let table_name = "test_candidate_registration_greptime_metakv";
990 let mut handles = vec![];
991 let candidate_lease_ttl = Duration::from_secs(5);
992 let execution_timeout = Duration::from_secs(10);
993 let statement_timeout = Duration::from_secs(10);
994 let meta_lease_ttl = Duration::from_secs(2);
995 let idle_session_timeout = Duration::from_secs(0);
996 let client = create_postgres_client(
997 Some(table_name),
998 execution_timeout,
999 idle_session_timeout,
1000 statement_timeout,
1001 )
1002 .await
1003 .unwrap();
1004
1005 for i in 0..10 {
1006 let leader_value = format!("{}{}", leader_value_prefix, i);
1007 let handle = tokio::spawn(candidate(
1008 leader_value,
1009 candidate_lease_ttl,
1010 uuid.clone(),
1011 table_name.to_string(),
1012 ));
1013 handles.push(handle);
1014 }
1015 tokio::time::sleep(Duration::from_secs(3)).await;
1017
1018 let (tx, _) = broadcast::channel(100);
1019 let leader_value = "test_leader".to_string();
1020 let pg_election = PgElection {
1021 leader_value,
1022 pg_client: RwLock::new(client),
1023 is_leader: AtomicBool::new(false),
1024 leader_infancy: AtomicBool::new(true),
1025 leader_watcher: tx,
1026 store_key_prefix: uuid.clone(),
1027 candidate_lease_ttl,
1028 meta_lease_ttl,
1029 sql_set: ElectionSqlFactory::new(28319, table_name).build(),
1030 };
1031
1032 let candidates = pg_election.all_candidates().await.unwrap();
1033 assert_eq!(candidates.len(), 10);
1034
1035 for handle in handles {
1036 handle.abort();
1037 }
1038
1039 tokio::time::sleep(Duration::from_secs(5)).await;
1041 let candidates = pg_election.all_candidates().await.unwrap();
1042 assert!(candidates.is_empty());
1043
1044 for i in 0..10 {
1046 let key = format!("{}{}{}{}", uuid, CANDIDATES_ROOT, leader_value_prefix, i);
1047 let res = pg_election.delete_value(&key).await.unwrap();
1048 assert!(res);
1049 }
1050
1051 drop_table(&pg_election, table_name).await;
1052 }
1053
1054 #[tokio::test]
1055 async fn test_elected_and_step_down() {
1056 maybe_skip_postgres_integration_test!();
1057 let leader_value = "test_leader".to_string();
1058 let uuid = uuid::Uuid::new_v4().to_string();
1059 let table_name = "test_elected_and_step_down_greptime_metakv";
1060 let candidate_lease_ttl = Duration::from_secs(5);
1061 let execution_timeout = Duration::from_secs(10);
1062 let statement_timeout = Duration::from_secs(10);
1063 let meta_lease_ttl = Duration::from_secs(2);
1064 let idle_session_timeout = Duration::from_secs(0);
1065 let client = create_postgres_client(
1066 Some(table_name),
1067 execution_timeout,
1068 idle_session_timeout,
1069 statement_timeout,
1070 )
1071 .await
1072 .unwrap();
1073
1074 let (tx, mut rx) = broadcast::channel(100);
1075 let leader_pg_election = PgElection {
1076 leader_value: leader_value.clone(),
1077 pg_client: RwLock::new(client),
1078 is_leader: AtomicBool::new(false),
1079 leader_infancy: AtomicBool::new(true),
1080 leader_watcher: tx,
1081 store_key_prefix: uuid,
1082 candidate_lease_ttl,
1083 meta_lease_ttl,
1084 sql_set: ElectionSqlFactory::new(28320, table_name).build(),
1085 };
1086
1087 leader_pg_election.elected().await.unwrap();
1088 let lease = leader_pg_election
1089 .get_value_with_lease(&leader_pg_election.election_key())
1090 .await
1091 .unwrap()
1092 .unwrap();
1093 assert!(lease.leader_value == leader_value);
1094 assert!(lease.expire_time > lease.current);
1095 assert!(leader_pg_election.is_leader());
1096
1097 match rx.recv().await {
1098 Ok(LeaderChangeMessage::Elected(key)) => {
1099 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1100 assert_eq!(
1101 String::from_utf8_lossy(key.key()),
1102 leader_pg_election.election_key()
1103 );
1104 assert_eq!(key.lease_id(), i64::default());
1105 assert_eq!(key.revision(), i64::default());
1106 }
1107 _ => panic!("Expected LeaderChangeMessage::Elected"),
1108 }
1109
1110 leader_pg_election.step_down_without_lock().await.unwrap();
1111 let lease = leader_pg_election
1112 .get_value_with_lease(&leader_pg_election.election_key())
1113 .await
1114 .unwrap()
1115 .unwrap();
1116 assert!(lease.leader_value == leader_value);
1117 assert!(!leader_pg_election.is_leader());
1118
1119 match rx.recv().await {
1120 Ok(LeaderChangeMessage::StepDown(key)) => {
1121 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1122 assert_eq!(
1123 String::from_utf8_lossy(key.key()),
1124 leader_pg_election.election_key()
1125 );
1126 assert_eq!(key.lease_id(), i64::default());
1127 assert_eq!(key.revision(), i64::default());
1128 }
1129 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1130 }
1131
1132 leader_pg_election.elected().await.unwrap();
1133 let lease = leader_pg_election
1134 .get_value_with_lease(&leader_pg_election.election_key())
1135 .await
1136 .unwrap()
1137 .unwrap();
1138 assert!(lease.leader_value == leader_value);
1139 assert!(lease.expire_time > lease.current);
1140 assert!(leader_pg_election.is_leader());
1141
1142 match rx.recv().await {
1143 Ok(LeaderChangeMessage::Elected(key)) => {
1144 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1145 assert_eq!(
1146 String::from_utf8_lossy(key.key()),
1147 leader_pg_election.election_key()
1148 );
1149 assert_eq!(key.lease_id(), i64::default());
1150 assert_eq!(key.revision(), i64::default());
1151 }
1152 _ => panic!("Expected LeaderChangeMessage::Elected"),
1153 }
1154
1155 leader_pg_election.step_down().await.unwrap();
1156 let res = leader_pg_election
1157 .get_value_with_lease(&leader_pg_election.election_key())
1158 .await
1159 .unwrap();
1160 assert!(res.is_none());
1161 assert!(!leader_pg_election.is_leader());
1162
1163 match rx.recv().await {
1164 Ok(LeaderChangeMessage::StepDown(key)) => {
1165 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1166 assert_eq!(
1167 String::from_utf8_lossy(key.key()),
1168 leader_pg_election.election_key()
1169 );
1170 assert_eq!(key.lease_id(), i64::default());
1171 assert_eq!(key.revision(), i64::default());
1172 }
1173 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1174 }
1175
1176 drop_table(&leader_pg_election, table_name).await;
1177 }
1178
1179 #[tokio::test]
1180 async fn test_leader_action() {
1181 maybe_skip_postgres_integration_test!();
1182 let leader_value = "test_leader".to_string();
1183 let uuid = uuid::Uuid::new_v4().to_string();
1184 let table_name = "test_leader_action_greptime_metakv";
1185 let candidate_lease_ttl = Duration::from_secs(5);
1186 let execution_timeout = Duration::from_secs(10);
1187 let statement_timeout = Duration::from_secs(10);
1188 let meta_lease_ttl = Duration::from_secs(2);
1189 let idle_session_timeout = Duration::from_secs(0);
1190 let client = create_postgres_client(
1191 Some(table_name),
1192 execution_timeout,
1193 idle_session_timeout,
1194 statement_timeout,
1195 )
1196 .await
1197 .unwrap();
1198
1199 let (tx, mut rx) = broadcast::channel(100);
1200 let leader_pg_election = PgElection {
1201 leader_value: leader_value.clone(),
1202 pg_client: RwLock::new(client),
1203 is_leader: AtomicBool::new(false),
1204 leader_infancy: AtomicBool::new(true),
1205 leader_watcher: tx,
1206 store_key_prefix: uuid,
1207 candidate_lease_ttl,
1208 meta_lease_ttl,
1209 sql_set: ElectionSqlFactory::new(28321, table_name).build(),
1210 };
1211
1212 let res = leader_pg_election
1214 .pg_client
1215 .read()
1216 .await
1217 .query(&leader_pg_election.sql_set.campaign, &[])
1218 .await
1219 .unwrap();
1220 let res: bool = res[0].get(0);
1221 assert!(res);
1222 leader_pg_election.leader_action().await.unwrap();
1223 let lease = leader_pg_election
1224 .get_value_with_lease(&leader_pg_election.election_key())
1225 .await
1226 .unwrap()
1227 .unwrap();
1228 assert!(lease.leader_value == leader_value);
1229 assert!(lease.expire_time > lease.current);
1230 assert!(leader_pg_election.is_leader());
1231
1232 match rx.recv().await {
1233 Ok(LeaderChangeMessage::Elected(key)) => {
1234 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1235 assert_eq!(
1236 String::from_utf8_lossy(key.key()),
1237 leader_pg_election.election_key()
1238 );
1239 assert_eq!(key.lease_id(), i64::default());
1240 assert_eq!(key.revision(), i64::default());
1241 }
1242 _ => panic!("Expected LeaderChangeMessage::Elected"),
1243 }
1244
1245 let res = leader_pg_election
1247 .pg_client
1248 .read()
1249 .await
1250 .query(&leader_pg_election.sql_set.campaign, &[])
1251 .await
1252 .unwrap();
1253 let res: bool = res[0].get(0);
1254 assert!(res);
1255 leader_pg_election.leader_action().await.unwrap();
1256 let new_lease = leader_pg_election
1257 .get_value_with_lease(&leader_pg_election.election_key())
1258 .await
1259 .unwrap()
1260 .unwrap();
1261 assert!(new_lease.leader_value == leader_value);
1262 assert!(
1263 new_lease.expire_time > new_lease.current && new_lease.expire_time > lease.expire_time
1264 );
1265 assert!(leader_pg_election.is_leader());
1266
1267 tokio::time::sleep(Duration::from_secs(2)).await;
1269
1270 let res = leader_pg_election
1271 .pg_client
1272 .read()
1273 .await
1274 .query(&leader_pg_election.sql_set.campaign, &[])
1275 .await
1276 .unwrap();
1277 let res: bool = res[0].get(0);
1278 assert!(res);
1279 leader_pg_election.leader_action().await.unwrap();
1280 let res = leader_pg_election
1281 .get_value_with_lease(&leader_pg_election.election_key())
1282 .await
1283 .unwrap();
1284 assert!(res.is_none());
1285
1286 match rx.recv().await {
1287 Ok(LeaderChangeMessage::StepDown(key)) => {
1288 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1289 assert_eq!(
1290 String::from_utf8_lossy(key.key()),
1291 leader_pg_election.election_key()
1292 );
1293 assert_eq!(key.lease_id(), i64::default());
1294 assert_eq!(key.revision(), i64::default());
1295 }
1296 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1297 }
1298
1299 let res = leader_pg_election
1301 .pg_client
1302 .read()
1303 .await
1304 .query(&leader_pg_election.sql_set.campaign, &[])
1305 .await
1306 .unwrap();
1307 let res: bool = res[0].get(0);
1308 assert!(res);
1309 leader_pg_election.leader_action().await.unwrap();
1310 let lease = leader_pg_election
1311 .get_value_with_lease(&leader_pg_election.election_key())
1312 .await
1313 .unwrap()
1314 .unwrap();
1315 assert!(lease.leader_value == leader_value);
1316 assert!(lease.expire_time > lease.current);
1317 assert!(leader_pg_election.is_leader());
1318
1319 match rx.recv().await {
1320 Ok(LeaderChangeMessage::Elected(key)) => {
1321 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1322 assert_eq!(
1323 String::from_utf8_lossy(key.key()),
1324 leader_pg_election.election_key()
1325 );
1326 assert_eq!(key.lease_id(), i64::default());
1327 assert_eq!(key.revision(), i64::default());
1328 }
1329 _ => panic!("Expected LeaderChangeMessage::Elected"),
1330 }
1331
1332 leader_pg_election
1334 .delete_value(&leader_pg_election.election_key())
1335 .await
1336 .unwrap();
1337 leader_pg_election.leader_action().await.unwrap();
1338 let res = leader_pg_election
1339 .get_value_with_lease(&leader_pg_election.election_key())
1340 .await
1341 .unwrap();
1342 assert!(res.is_none());
1343 assert!(!leader_pg_election.is_leader());
1344
1345 match rx.recv().await {
1346 Ok(LeaderChangeMessage::StepDown(key)) => {
1347 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1348 assert_eq!(
1349 String::from_utf8_lossy(key.key()),
1350 leader_pg_election.election_key()
1351 );
1352 assert_eq!(key.lease_id(), i64::default());
1353 assert_eq!(key.revision(), i64::default());
1354 }
1355 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1356 }
1357
1358 let res = leader_pg_election
1360 .pg_client
1361 .read()
1362 .await
1363 .query(&leader_pg_election.sql_set.campaign, &[])
1364 .await
1365 .unwrap();
1366 let res: bool = res[0].get(0);
1367 assert!(res);
1368 leader_pg_election.leader_action().await.unwrap();
1369 let lease = leader_pg_election
1370 .get_value_with_lease(&leader_pg_election.election_key())
1371 .await
1372 .unwrap()
1373 .unwrap();
1374 assert!(lease.leader_value == leader_value);
1375 assert!(lease.expire_time > lease.current);
1376 assert!(leader_pg_election.is_leader());
1377
1378 match rx.recv().await {
1379 Ok(LeaderChangeMessage::Elected(key)) => {
1380 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1381 assert_eq!(
1382 String::from_utf8_lossy(key.key()),
1383 leader_pg_election.election_key()
1384 );
1385 assert_eq!(key.lease_id(), i64::default());
1386 assert_eq!(key.revision(), i64::default());
1387 }
1388 _ => panic!("Expected LeaderChangeMessage::Elected"),
1389 }
1390
1391 let res = leader_pg_election
1393 .pg_client
1394 .read()
1395 .await
1396 .query(&leader_pg_election.sql_set.campaign, &[])
1397 .await
1398 .unwrap();
1399 let res: bool = res[0].get(0);
1400 assert!(res);
1401 leader_pg_election
1402 .delete_value(&leader_pg_election.election_key())
1403 .await
1404 .unwrap();
1405 leader_pg_election
1406 .put_value_with_lease(
1407 &leader_pg_election.election_key(),
1408 "test",
1409 Duration::from_secs(10),
1410 )
1411 .await
1412 .unwrap();
1413 leader_pg_election.leader_action().await.unwrap();
1414 let res = leader_pg_election
1415 .get_value_with_lease(&leader_pg_election.election_key())
1416 .await
1417 .unwrap();
1418 assert!(res.is_none());
1419 assert!(!leader_pg_election.is_leader());
1420
1421 match rx.recv().await {
1422 Ok(LeaderChangeMessage::StepDown(key)) => {
1423 assert_eq!(String::from_utf8_lossy(key.name()), leader_value);
1424 assert_eq!(
1425 String::from_utf8_lossy(key.key()),
1426 leader_pg_election.election_key()
1427 );
1428 assert_eq!(key.lease_id(), i64::default());
1429 assert_eq!(key.revision(), i64::default());
1430 }
1431 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1432 }
1433
1434 leader_pg_election
1436 .pg_client
1437 .read()
1438 .await
1439 .query(&leader_pg_election.sql_set.step_down, &[])
1440 .await
1441 .unwrap();
1442
1443 drop_table(&leader_pg_election, table_name).await;
1444 }
1445
1446 #[tokio::test]
1447 async fn test_follower_action() {
1448 maybe_skip_postgres_integration_test!();
1449 common_telemetry::init_default_ut_logging();
1450 let uuid = uuid::Uuid::new_v4().to_string();
1451 let table_name = "test_follower_action_greptime_metakv";
1452
1453 let candidate_lease_ttl = Duration::from_secs(5);
1454 let execution_timeout = Duration::from_secs(10);
1455 let statement_timeout = Duration::from_secs(10);
1456 let meta_lease_ttl = Duration::from_secs(2);
1457 let idle_session_timeout = Duration::from_secs(0);
1458 let follower_client = create_postgres_client(
1459 Some(table_name),
1460 execution_timeout,
1461 idle_session_timeout,
1462 statement_timeout,
1463 )
1464 .await
1465 .unwrap();
1466 let (tx, mut rx) = broadcast::channel(100);
1467 let follower_pg_election = PgElection {
1468 leader_value: "test_follower".to_string(),
1469 pg_client: RwLock::new(follower_client),
1470 is_leader: AtomicBool::new(false),
1471 leader_infancy: AtomicBool::new(true),
1472 leader_watcher: tx,
1473 store_key_prefix: uuid.clone(),
1474 candidate_lease_ttl,
1475 meta_lease_ttl,
1476 sql_set: ElectionSqlFactory::new(28322, table_name).build(),
1477 };
1478
1479 let leader_client = create_postgres_client(
1480 Some(table_name),
1481 execution_timeout,
1482 idle_session_timeout,
1483 statement_timeout,
1484 )
1485 .await
1486 .unwrap();
1487 let (tx, _) = broadcast::channel(100);
1488 let leader_pg_election = PgElection {
1489 leader_value: "test_leader".to_string(),
1490 pg_client: RwLock::new(leader_client),
1491 is_leader: AtomicBool::new(false),
1492 leader_infancy: AtomicBool::new(true),
1493 leader_watcher: tx,
1494 store_key_prefix: uuid,
1495 candidate_lease_ttl,
1496 meta_lease_ttl,
1497 sql_set: ElectionSqlFactory::new(28322, table_name).build(),
1498 };
1499
1500 leader_pg_election
1501 .pg_client
1502 .read()
1503 .await
1504 .query(&leader_pg_election.sql_set.campaign, &[])
1505 .await
1506 .unwrap();
1507 leader_pg_election.elected().await.unwrap();
1508
1509 follower_pg_election.follower_action().await.unwrap();
1511
1512 tokio::time::sleep(Duration::from_secs(2)).await;
1514 assert!(follower_pg_election.follower_action().await.is_err());
1515
1516 leader_pg_election
1518 .delete_value(&leader_pg_election.election_key())
1519 .await
1520 .unwrap();
1521 assert!(follower_pg_election.follower_action().await.is_err());
1522
1523 follower_pg_election
1525 .is_leader
1526 .store(true, Ordering::Relaxed);
1527 assert!(follower_pg_election.follower_action().await.is_err());
1528
1529 match rx.recv().await {
1530 Ok(LeaderChangeMessage::StepDown(key)) => {
1531 assert_eq!(String::from_utf8_lossy(key.name()), "test_follower");
1532 assert_eq!(
1533 String::from_utf8_lossy(key.key()),
1534 follower_pg_election.election_key()
1535 );
1536 assert_eq!(key.lease_id(), i64::default());
1537 assert_eq!(key.revision(), i64::default());
1538 }
1539 _ => panic!("Expected LeaderChangeMessage::StepDown"),
1540 }
1541
1542 leader_pg_election
1544 .pg_client
1545 .read()
1546 .await
1547 .query(&leader_pg_election.sql_set.step_down, &[])
1548 .await
1549 .unwrap();
1550
1551 drop_table(&follower_pg_election, table_name).await;
1552 }
1553
1554 #[tokio::test]
1555 async fn test_idle_session_timeout() {
1556 maybe_skip_postgres_integration_test!();
1557 common_telemetry::init_default_ut_logging();
1558 let execution_timeout = Duration::from_secs(10);
1559 let statement_timeout = Duration::from_secs(10);
1560 let idle_session_timeout = Duration::from_secs(1);
1561 let mut client = create_postgres_client(
1562 None,
1563 execution_timeout,
1564 idle_session_timeout,
1565 statement_timeout,
1566 )
1567 .await
1568 .unwrap();
1569 tokio::time::sleep(Duration::from_millis(1100)).await;
1570 let err = client.query("SELECT 1", &[]).await.unwrap_err();
1572 assert_matches!(err, error::Error::PostgresExecution { .. });
1573 let error::Error::PostgresExecution { error, .. } = err else {
1574 panic!("Expected PostgresExecution error");
1575 };
1576 assert!(error.is_closed());
1577 client.reset_client().await.unwrap();
1579 let _ = client.query("SELECT 1", &[]).await.unwrap();
1580 }
1581}