pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use api::v1::meta::{HeartbeatConfig, Role};
use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants::{
    self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_provider::WalProviderRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";

/// The metadata store backend implementation for metasrv.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as the metadata storage.
    #[default]
    EtcdStore,
    /// In-memory metadata storage (volatile; contents are lost on restart).
    MemoryStore,
    /// PostgreSQL as the metadata storage.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// MySQL as the metadata storage.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
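
// Illustrative sketch (not compiled here): with `rename_all = "snake_case"`,
// config values map to the variants as plain snake_case strings. A JSON
// payload is assumed purely for demonstration:
//
//     let backend: BackendImpl = serde_json::from_str("\"etcd_store\"").unwrap();
//     assert_eq!(backend, BackendImpl::EtcdStore);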

/// Options for stats persistence.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// TTL for persisted stats.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// Interval at which stats are persisted.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            interval: Duration::from_secs(10 * 60),
        }
    }
}
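
// Sketch of the corresponding TOML section, assuming humantime-style duration
// strings (the section name matches the `stats_persistence` field in
// `MetasrvOptions`; values shown mirror the defaults above):
//
//     [stats_persistence]
//     ttl = "0s"
//     interval = "10m"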

/// Heartbeat options for a specific node role.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct HeartbeatOptions {
    /// Interval between heartbeats.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
    /// Interval between retries when a heartbeat fails.
    #[serde(with = "humantime_serde")]
    pub retry_interval: Duration,
}

impl Default for HeartbeatOptions {
    fn default() -> Self {
        Self {
            interval: BASE_HEARTBEAT_INTERVAL,
            retry_interval: BASE_HEARTBEAT_INTERVAL,
        }
    }
}

impl HeartbeatOptions {
    /// Builds datanode heartbeat options from the base interval.
    pub fn datanode_from(base_interval: Duration) -> Self {
        Self {
            interval: base_interval,
            retry_interval: base_interval,
        }
    }

    /// Builds frontend heartbeat options from the base interval; the frontend
    /// interval is derived via `frontend_heartbeat_interval`.
    pub fn frontend_from(base_interval: Duration) -> Self {
        Self {
            interval: frontend_heartbeat_interval(base_interval),
            retry_interval: base_interval,
        }
    }

    /// Builds flownode heartbeat options from the base interval.
    pub fn flownode_from(base_interval: Duration) -> Self {
        Self {
            interval: base_interval,
            retry_interval: base_interval,
        }
    }
}
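
// Illustrative only: all three constructors take the same base interval; only
// the frontend derives a different heartbeat interval:
//
//     let base = BASE_HEARTBEAT_INTERVAL;
//     let datanode = HeartbeatOptions::datanode_from(base);
//     assert_eq!(datanode.interval, base);
//     let frontend = HeartbeatOptions::frontend_from(base);
//     assert_eq!(frontend.interval, frontend_heartbeat_interval(base));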

impl From<HeartbeatOptions> for HeartbeatConfig {
    fn from(opts: HeartbeatOptions) -> Self {
        Self {
            heartbeat_interval_ms: opts.interval.as_millis() as u64,
            retry_interval_ms: opts.retry_interval.as_millis() as u64,
        }
    }
}
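
// Sketch: the conversion is a plain millisecond projection, e.g.
//
//     let cfg: HeartbeatConfig = HeartbeatOptions::default().into();
//     assert_eq!(cfg.heartbeat_interval_ms, BASE_HEARTBEAT_INTERVAL.as_millis() as u64);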

/// Client options for connections to the metadata store backend.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    /// Timeout for keep-alive probes.
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    /// Interval between keep-alive probes.
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    /// Timeout for establishing a connection.
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}

impl Default for BackendClientOptions {
    fn default() -> Self {
        Self {
            keep_alive_interval: Duration::from_secs(10),
            keep_alive_timeout: Duration::from_secs(3),
            connect_timeout: Duration::from_secs(3),
        }
    }
}
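
// Sketch of the corresponding TOML (values shown mirror the defaults above):
//
//     [backend_client]
//     keep_alive_interval = "10s"
//     keep_alive_timeout = "3s"
//     connect_timeout = "3s"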

/// Options for the metasrv.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the metasrv gRPC server binds to.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the metasrv advertises to clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The addresses of the metadata store, e.g. etcd endpoints.
    pub store_addrs: Vec<String>,
    /// TLS options for connections to the metadata store backend.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// Client options for connections to the metadata store backend.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// The datanode selector type.
    pub selector: SelectorType,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The base heartbeat interval; per-role heartbeat options are derived
    /// from it (see `HeartbeatOptions`).
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// The delay before initializing region failure detectors.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover when regions use a local WAL.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable greptimedb telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The prefix applied to all keys in the metadata store.
    pub store_key_prefix: String,
    /// The maximum number of operations permitted in a single transaction
    /// against the metadata store.
    pub max_txn_ops: usize,
    /// The factor controlling how frequently node stats are flushed.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The metadata store backend implementation.
    pub backend: BackendImpl,
    /// The table name used to store metadata when an RDS backend is enabled.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// The PostgreSQL advisory lock id used for leader election.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// The PostgreSQL schema holding the metadata table.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// Whether to automatically create the schema if it does not exist.
    #[cfg(feature = "pg_kvbackend")]
    pub auto_create_schema: bool,
    /// The maximum idle time before a node's info is removed from in-memory state.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("backend_client", &self.backend_client);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: Some(TlsOption::prefer()),
            selector: SelectorType::default(),
            enable_region_failover: false,
            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            #[cfg(feature = "pg_kvbackend")]
            auto_create_schema: true,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
            backend_client: BackendClientOptions::default(),
        }
    }
}
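
// Sketch of a minimal metasrv TOML overriding a few of these defaults; the
// keys mirror the struct fields, values here are illustrative:
//
//     store_addrs = ["127.0.0.1:2379"]
//     backend = "etcd_store"
//     max_txn_ops = 128
//
//     [grpc]
//     bind_addr = "127.0.0.1:3002"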

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}
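
// Sketch: the keys above may be provided as comma-separated values via
// environment variables. The variable naming below is illustrative only, not
// the verified mapping:
//
//     GREPTIMEDB_METASRV__STORE_ADDRS="10.0.0.1:2379,10.0.0.2:2379"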

impl MetasrvOptions {
    /// Returns `store_addrs` with credentials sanitized, safe for logging.
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}

pub struct MetasrvInfo {
    pub server_addr: String,
}

#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
    pub heartbeat_interval: Duration,
    pub is_handshake: bool,
}

impl Context {
    /// Resets the in-memory backend and the leader region registry.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }

    /// Sets whether this context serves a handshake request.
    pub fn with_handshake(mut self, is_handshake: bool) -> Self {
        self.is_handshake = is_handshake;
        self
    }

    /// Returns the heartbeat options for the given node role, derived from the
    /// base heartbeat interval.
    pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
        match role {
            Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
            Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
            Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
        }
    }
}
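
// Illustrative: datanode and flownode reuse the base interval unchanged, so
// for some context `ctx`:
//
//     assert_eq!(
//         ctx.heartbeat_options_for(Role::Datanode).interval,
//         ctx.heartbeat_interval
//     );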

pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}
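
// Sketch: any byte slice converts, with invalid UTF-8 replaced lossily:
//
//     let leader: LeaderValue = b"meta-0:3002".into();
//     assert_eq!(leader.0, "meta-0:3002");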

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The node address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp, in milliseconds.
    pub start_time_ms: u64,
    /// The total CPU capacity of the node, in millicores.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// The total memory of the node, in bytes.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// The current CPU usage of the node, in millicores.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// The current memory usage of the node, in bytes.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// The node hostname.
    #[serde(default)]
    pub hostname: String,
}

#[allow(deprecated)]
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            // The deprecated top-level fields stay populated for backward
            // compatibility; the same data also lives in the nested `info`.
            version: node_info.version.clone(),
            git_commit: node_info.git_commit.clone(),
            start_time_ms: node_info.start_time_ms,
            cpus: node_info.total_cpu_millicores as u32,
            memory_bytes: node_info.total_memory_bytes as u64,
            info: Some(api::v1::meta::NodeInfo {
                version: node_info.version,
                git_commit: node_info.git_commit,
                start_time_ms: node_info.start_time_ms,
                total_cpu_millicores: node_info.total_cpu_millicores,
                total_memory_bytes: node_info.total_memory_bytes,
                cpu_usage_millicores: node_info.cpu_usage_millicores,
                memory_usage_bytes: node_info.memory_usage_bytes,
                cpus: node_info.total_cpu_millicores as u32,
                memory_bytes: node_info.total_memory_bytes as u64,
                hostname: node_info.hostname,
            }),
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    mailbox: MailboxRef,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.mailbox.reset().await;
        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_provider: WalProviderRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_allocator: ResourceIdAllocatorRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_provider.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
                mailbox: self.mailbox.clone(),
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly; is the election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            // The receiver lagged too far behind the sender; exit
                            // the loop and fall through to `on_leader_stop` below.
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            // Register this node as an election candidate in a background loop
            // until shutdown.
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            // Campaign for leadership in a loop, resetting and re-initiating the
            // campaign after each round until shutdown.
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiating election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_provider.start().await {
                error!(e; "Failed to start wal provider");
            }
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
            hostname: hostname::get()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
        }
    }

    /// Looks up a datanode peer by `peer_id`, returning it only while the peer
    /// is still alive, i.e. within the datanode lease.
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            default_distributed_time_constants().datanode_lease,
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
        &self.table_id_allocator
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

    pub fn gc_ticker(&self) -> Option<GcTickerRef> {
        self.gc_ticker.as_ref().cloned()
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
            heartbeat_interval: self.options().heartbeat_interval,
            is_handshake: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
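
    #[test]
    fn test_stat_fields_default_when_absent() {
        // A small sketch mirroring the test above: the stat fields carry
        // `#[serde(default)]`, so payloads from older nodes that lack them
        // must still deserialize with zeroed values.
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.total_cpu_millicores, 0);
        assert_eq!(node_info.total_memory_bytes, 0);
        assert_eq!(node_info.cpu_usage_millicores, 0);
        assert_eq!(node_info.memory_usage_bytes, 0);
        assert!(node_info.hostname.is_empty());
    }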
}