1pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use clap::ValueEnum;
23use common_base::Plugins;
24use common_base::readable_size::ReadableSize;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_event_recorder::EventRecorderOptions;
27use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
28use common_meta::cache_invalidator::CacheInvalidatorRef;
29use common_meta::ddl_manager::DdlManagerRef;
30use common_meta::distributed_time_constants::{self, default_distributed_time_constants};
31use common_meta::key::TableMetadataManagerRef;
32use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
33use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
34use common_meta::leadership_notifier::{
35 LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
36};
37use common_meta::node_expiry_listener::NodeExpiryListener;
38use common_meta::peer::{Peer, PeerDiscoveryRef};
39use common_meta::reconciliation::manager::ReconciliationManagerRef;
40use common_meta::region_keeper::MemoryRegionKeeperRef;
41use common_meta::region_registry::LeaderRegionRegistryRef;
42use common_meta::sequence::SequenceRef;
43use common_meta::stats::topic::TopicStatsRegistryRef;
44use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
45use common_options::datanode::DatanodeClientOptions;
46use common_options::memory::MemoryOptions;
47use common_procedure::ProcedureManagerRef;
48use common_procedure::options::ProcedureConfig;
49use common_stat::ResourceStatRef;
50use common_telemetry::logging::{LoggingOptions, TracingOptions};
51use common_telemetry::{error, info, warn};
52use common_time::util::DefaultSystemTimer;
53use common_wal::config::MetasrvWalConfig;
54use serde::{Deserialize, Serialize};
55use servers::grpc::GrpcOptions;
56use servers::http::HttpOptions;
57use servers::tls::TlsOption;
58use snafu::{OptionExt, ResultExt};
59use store_api::storage::RegionId;
60use tokio::sync::broadcast::error::RecvError;
61
62use crate::cluster::MetaPeerClientRef;
63use crate::discovery;
64use crate::election::{Election, LeaderChangeMessage};
65use crate::error::{
66 self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
67 StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
68};
69use crate::failure_detector::PhiAccrualFailureDetectorOptions;
70use crate::gc::{GcSchedulerOptions, GcTickerRef};
71use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
72use crate::procedure::ProcedureManagerListenerAdapter;
73use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
74use crate::procedure::wal_prune::manager::WalPruneTickerRef;
75use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
76use crate::region::flush_trigger::RegionFlushTickerRef;
77use crate::region::supervisor::RegionSupervisorTickerRef;
78use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
79use crate::service::mailbox::MailboxRef;
80use crate::service::store::cached_kv::LeaderCachedKvBackend;
81use crate::state::{StateRef, become_follower, become_leader};
82
/// Name of the sequence used to allocate table ids.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Name of the sequence used to allocate flow ids.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Subdirectory under the data home used for metasrv data.
pub const METASRV_DATA_DIR: &str = "metasrv";
86
/// The backend store implementation used by metasrv to keep metadata.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// etcd as the backend store (the default).
    #[default]
    EtcdStore,
    /// In-memory store; contents do not survive a restart.
    MemoryStore,
    /// PostgreSQL as the backend store.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// MySQL as the backend store.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
103
/// Options controlling persistence of statistics.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// Retention period (TTL) for persisted stats (humantime format).
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// Interval between persistence runs (humantime format).
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}
114
115impl Default for StatsPersistenceOptions {
116 fn default() -> Self {
117 Self {
118 ttl: Duration::ZERO,
119 interval: Duration::from_mins(10),
120 }
121 }
122}
123
/// Client-side connection options for the backend store.
#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    /// Timeout waiting for a keep-alive response (humantime format).
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    /// Interval between keep-alive probes (humantime format).
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    /// Timeout for establishing a connection (humantime format).
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}
134
135impl Default for BackendClientOptions {
136 fn default() -> Self {
137 Self {
138 keep_alive_interval: Duration::from_secs(10),
139 keep_alive_timeout: Duration::from_secs(3),
140 connect_timeout: Duration::from_secs(3),
141 }
142 }
143}
144
/// Top-level configuration options for the metasrv.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The bind address of the gRPC server (deprecated).
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The advertised address of the gRPC server (deprecated).
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// Addresses of the backend metadata store (e.g. etcd endpoints).
    pub store_addrs: Vec<String>,
    /// TLS configuration for connections to the backend store.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// Client connection options (keep-alive, connect timeout) for the backend store.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// Selector type used when choosing datanodes.
    pub selector: SelectorType,
    /// Whether region failover is enabled.
    pub enable_region_failover: bool,
    /// Base heartbeat interval (humantime format).
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// Delay before region failure detectors are initialized (humantime format).
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether region failover is allowed when regions use local WAL.
    pub allow_region_failover_on_local_wal: bool,
    /// gRPC server options.
    pub grpc: GrpcOptions,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Procedure framework options.
    pub procedure: ProcedureConfig,
    /// Phi-accrual failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// Client options for connecting to datanodes.
    pub datanode: DatanodeClientOptions,
    /// Whether GreptimeDB telemetry reporting is enabled.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// WAL configuration for the metasrv.
    pub wal: MetasrvWalConfig,
    /// Prefix applied to keys in the backend store.
    pub store_key_prefix: String,
    /// Maximum number of operations permitted in a single backend transaction.
    pub max_txn_ops: usize,
    /// Factor controlling stats flushing frequency
    /// — NOTE(review): exact semantics defined by consumers; confirm there.
    pub flush_stats_factor: usize,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Memory-related options.
    pub memory: MemoryOptions,
    /// Which backend store implementation to use.
    pub backend: BackendImpl,
    /// Table name for metadata storage in SQL backends.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// Lock id used for election in the PostgreSQL backend.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// Schema name for metadata tables in the PostgreSQL backend.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// Whether to create the schema automatically in the PostgreSQL backend.
    #[cfg(feature = "pg_kvbackend")]
    pub auto_create_schema: bool,
    /// Maximum idle time before a node is considered expired (humantime format).
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// Event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// Stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// GC scheduler options.
    pub gc: GcSchedulerOptions,
}
246
247impl fmt::Debug for MetasrvOptions {
248 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
249 let mut debug_struct = f.debug_struct("MetasrvOptions");
250 debug_struct
251 .field("store_addrs", &self.sanitize_store_addrs())
252 .field("backend_tls", &self.backend_tls)
253 .field("selector", &self.selector)
254 .field("enable_region_failover", &self.enable_region_failover)
255 .field(
256 "allow_region_failover_on_local_wal",
257 &self.allow_region_failover_on_local_wal,
258 )
259 .field("grpc", &self.grpc)
260 .field("http", &self.http)
261 .field("logging", &self.logging)
262 .field("procedure", &self.procedure)
263 .field("failure_detector", &self.failure_detector)
264 .field("datanode", &self.datanode)
265 .field("enable_telemetry", &self.enable_telemetry)
266 .field("data_home", &self.data_home)
267 .field("wal", &self.wal)
268 .field("store_key_prefix", &self.store_key_prefix)
269 .field("max_txn_ops", &self.max_txn_ops)
270 .field("flush_stats_factor", &self.flush_stats_factor)
271 .field("tracing", &self.tracing)
272 .field("backend", &self.backend)
273 .field("event_recorder", &self.event_recorder)
274 .field("stats_persistence", &self.stats_persistence)
275 .field("heartbeat_interval", &self.heartbeat_interval)
276 .field("backend_client", &self.backend_client);
277
278 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
279 debug_struct.field("meta_table_name", &self.meta_table_name);
280
281 #[cfg(feature = "pg_kvbackend")]
282 debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
283 #[cfg(feature = "pg_kvbackend")]
284 debug_struct.field("meta_schema_name", &self.meta_schema_name);
285
286 debug_struct
287 .field("node_max_idle_time", &self.node_max_idle_time)
288 .finish()
289 }
290}
291
/// Default port for the metasrv gRPC server.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
293
impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            // Deprecated; superseded by `grpc.bind_addr` / `grpc.server_addr`.
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            // Default local etcd endpoint.
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            // `prefer()` — NOTE(review): presumably tries TLS first and falls
            // back to plaintext; confirm against `TlsOption`.
            backend_tls: Some(TlsOption::prefer()),
            selector: SelectorType::default(),
            enable_region_failover: false,
            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            // 10 minutes.
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // Cap on stored metadata value size — NOTE(review): presumably
                // sized to stay under backend request limits; confirm.
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            #[cfg(feature = "pg_kvbackend")]
            auto_create_schema: true,
            // 24 hours.
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
            backend_client: BackendClientOptions::default(),
        }
    }
}
349
350impl Configurable for MetasrvOptions {
351 fn env_list_keys() -> Option<&'static [&'static str]> {
352 Some(&["wal.broker_endpoints", "store_addrs"])
353 }
354}
355
356impl MetasrvOptions {
357 fn sanitize_store_addrs(&self) -> Vec<String> {
358 self.store_addrs
359 .iter()
360 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
361 .collect()
362 }
363}
364
/// Basic information about this metasrv instance.
pub struct MetasrvInfo {
    pub server_addr: String,
}
/// Shared context handed to metasrv services and handlers.
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    /// Volatile in-memory kv backend; reset on leadership change.
    pub in_memory: ResettableKvBackendRef,
    /// The persistent kv backend.
    pub kv_backend: KvBackendRef,
    /// Leader-local cache over the kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    /// `None` when running without an election service.
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
}
383
impl Context {
    /// Clears the in-memory kv backend and then the leader region registry,
    /// dropping any cached state held by this context.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}
390
/// The value stored by the election service to identify the current leader.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    /// Builds a `LeaderValue` from raw bytes; invalid UTF-8 sequences are
    /// replaced with U+FFFD (lossy conversion).
    fn from(value: T) -> Self {
        Self(String::from_utf8_lossy(value.as_ref()).into_owned())
    }
}
400
/// Node information reported by a metasrv instance.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The advertised address of this node.
    pub addr: String,
    /// The build version.
    pub version: String,
    /// The git commit of the build.
    pub git_commit: String,
    /// Node start time in milliseconds.
    pub start_time_ms: u64,
    /// Total CPU capacity in millicores (0 when absent in older payloads).
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory in bytes (0 when absent in older payloads).
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// Current CPU usage in millicores.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Current memory usage in bytes.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// Hostname of the node (empty when absent).
    #[serde(default)]
    pub hostname: String,
}
427
428#[allow(deprecated)]
430impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
431 fn from(node_info: MetasrvNodeInfo) -> Self {
432 Self {
433 peer: Some(api::v1::meta::Peer {
434 addr: node_info.addr,
435 ..Default::default()
436 }),
437 version: node_info.version.clone(),
440 git_commit: node_info.git_commit.clone(),
441 start_time_ms: node_info.start_time_ms,
442 cpus: node_info.total_cpu_millicores as u32,
443 memory_bytes: node_info.total_memory_bytes as u64,
444 info: Some(api::v1::meta::NodeInfo {
446 version: node_info.version,
447 git_commit: node_info.git_commit,
448 start_time_ms: node_info.start_time_ms,
449 total_cpu_millicores: node_info.total_cpu_millicores,
450 total_memory_bytes: node_info.total_memory_bytes,
451 cpu_usage_millicores: node_info.cpu_usage_millicores,
452 memory_usage_bytes: node_info.memory_usage_bytes,
453 cpus: node_info.total_cpu_millicores as u32,
454 memory_bytes: node_info.total_memory_bytes as u64,
455 hostname: node_info.hostname,
456 }),
457 }
458 }
459}
460
/// The kind of node a selector picks.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            SelectTarget::Datanode => "datanode",
            SelectTarget::Flownode => "flownode",
        };
        f.write_str(name)
    }
}
475
/// Context passed to selectors when choosing peers.
#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

/// Selector that yields a list of candidate peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
/// Selector aware of region stats, yielding a peer per region.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
/// Election service handle whose leader value is a [`LeaderValue`].
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
485
/// Applies leadership transitions to metasrv components.
pub struct MetaStateHandler {
    // Present only when a subscription manager plugin is installed.
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    // Leader-local cache; warmed on leader start.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    mailbox: MailboxRef,
    state: StateRef,
}
494
impl MetaStateHandler {
    /// Transitions this node into the leader state.
    ///
    /// Enters the leader state first with the cache marked not-ready, then
    /// tries to warm the leader cache; only on success is the state advanced
    /// to a ready leader. Listeners are then notified and telemetry reporting
    /// is enabled.
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    /// Transitions this node into the follower state.
    ///
    /// Becomes a follower, resets the mailbox, notifies listeners, disables
    /// telemetry reporting, and — when a subscription manager is installed —
    /// drops all subscriptions.
    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.mailbox.reset().await;
        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, un_subscribe all");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to un_subscribe all");
            }
        }
    }
}
533
/// The metasrv server: holds its configuration and all runtime components.
pub struct Metasrv {
    // Lifecycle state.
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    // Storage: volatile in-memory store, persistent kv backend, and a
    // leader-local cache over the latter.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    // Peer selection for datanodes and flownodes.
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    // Heartbeat handling: the builder is consumed on start to build the group.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    // `None` when running without an election service (single-instance mode).
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    // Optional tickers; registered as leadership-change listeners on start.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_sequence: SequenceRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}
575
576impl Metasrv {
    /// Starts the metasrv services.
    ///
    /// Idempotent: a second call while already started only logs a warning.
    /// Installs the heartbeat handler group, initializes table metadata, then
    /// either spawns the election-driven tasks (leader watcher, candidate
    /// registration, campaign loop) or — without an election service — starts
    /// the leader-only services directly.
    pub async fn try_start(&self) -> Result<()> {
        // Flip `started` from false to true exactly once.
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        // The builder is consumed on first start; its absence is a bug.
        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Components reacting to leadership changes are registered as
            // listeners; the state handler notifies them on transitions.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            // Plugins may customize the listener set.
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
                mailbox: self.mailbox.clone(),
            };
            // Watcher task: applies leadership transitions broadcast by the
            // election service.
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            // Drop cached state from the previous term before
                            // handling the transition.
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has bean cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader :{:?} step down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Not expected, is leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            // NOTE(review): a lagged receiver exits the watcher
                            // loop (falling through to `on_leader_stop`) rather
                            // than resubscribing — confirm this is intended.
                            break;
                        }
                    }
                }

                // Ensure this node ends up as a follower when the watcher exits.
                state_handler.on_leader_stop().await;
            });

            // Candidate registration loop: keeps this node registered while the
            // server is running.
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            // Campaign loop: repeatedly campaigns for leadership, resetting the
            // campaign after each attempt, until the server is stopped.
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiate election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            // Without an election service this node is always the leader, so
            // start the leader-only services directly.
            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }
732
733 pub async fn shutdown(&self) -> Result<()> {
734 if self
735 .started
736 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
737 .is_err()
738 {
739 warn!("Metasrv already stopped");
740 return Ok(());
741 }
742
743 self.procedure_manager
744 .stop()
745 .await
746 .context(StopProcedureManagerSnafu)?;
747
748 info!("Metasrv stopped");
749
750 Ok(())
751 }
752
    /// Returns this metasrv's start time in milliseconds.
    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    /// Returns the resource statistics provider.
    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }
760
761 pub fn node_info(&self) -> MetasrvNodeInfo {
762 let build_info = common_version::build_info();
763 MetasrvNodeInfo {
764 addr: self.options().grpc.server_addr.clone(),
765 version: build_info.version.to_string(),
766 git_commit: build_info.commit_short.to_string(),
767 start_time_ms: self.start_time_ms(),
768 total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
769 total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
770 cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
771 memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
772 hostname: hostname::get()
773 .unwrap_or_default()
774 .to_string_lossy()
775 .to_string(),
776 }
777 }
778
    /// Looks up a datanode peer by id; per `alive_datanode`, the peer is only
    /// returned while it is considered alive within the datanode lease.
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            default_distributed_time_constants().datanode_lease,
        )
        .await
    }
790
    /// Returns the metasrv options.
    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    /// Returns the volatile in-memory kv backend.
    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    /// Returns the persistent kv backend.
    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    /// Returns the meta peer client.
    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    /// Returns the datanode selector.
    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    /// Returns the selector context.
    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    /// Returns the flownode selector.
    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    /// Returns the heartbeat handler group; `None` until `try_start` builds it.
    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    /// Returns the election service, if one is configured.
    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    /// Returns the mailbox.
    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    /// Returns the DDL manager.
    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    /// Returns the procedure manager.
    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    /// Returns the table metadata manager.
    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    /// Returns the runtime switch manager.
    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    /// Returns the in-memory region keeper.
    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    /// Returns the region migration manager.
    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    /// Returns the publisher plugin, if installed.
    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    /// Returns the subscription manager plugin, if installed.
    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    /// Returns the table id sequence.
    pub fn table_id_sequence(&self) -> &SequenceRef {
        &self.table_id_sequence
    }

    /// Returns the reconciliation manager.
    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    /// Returns the plugins registry.
    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    /// Returns a shared handle to the started flag.
    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }
878
879 #[inline]
880 pub fn new_ctx(&self) -> Context {
881 let server_addr = self.options().grpc.server_addr.clone();
882 let in_memory = self.in_memory.clone();
883 let kv_backend = self.kv_backend.clone();
884 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
885 let meta_peer_client = self.meta_peer_client.clone();
886 let mailbox = self.mailbox.clone();
887 let election = self.election.clone();
888 let table_metadata_manager = self.table_metadata_manager.clone();
889 let cache_invalidator = self.cache_invalidator.clone();
890 let leader_region_registry = self.leader_region_registry.clone();
891 let topic_stats_registry = self.topic_stats_registry.clone();
892
893 Context {
894 server_addr,
895 in_memory,
896 kv_backend,
897 leader_cached_kv_backend,
898 meta_peer_client,
899 mailbox,
900 election,
901 is_infancy: false,
902 table_metadata_manager,
903 cache_invalidator,
904 leader_region_registry,
905 topic_stats_registry,
906 }
907 }
908}
909
#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    /// The JSON payload here omits the `#[serde(default)]` resource-usage and
    /// hostname fields, verifying deserialization stays backward-compatible
    /// with older payloads.
    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
}