pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use api::v1::meta::{HeartbeatConfig, Role};
use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants::{
    self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
};
use common_meta::election::LeaderChangeMessage;
pub use common_meta::election::{ElectionRef, MetasrvNodeInfo};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_provider::WalProviderRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";

#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    #[default]
    EtcdStore,
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
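
// Note: with `#[serde(rename_all = "snake_case")]`, configuration files spell
// these variants as `"etcd_store"`, `"memory_store"`, and so on (see the unit
// test at the bottom of this module).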

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            interval: Duration::from_secs(10 * 60),
        }
    }
}

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct HeartbeatOptions {
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
    #[serde(with = "humantime_serde")]
    pub retry_interval: Duration,
}

impl Default for HeartbeatOptions {
    fn default() -> Self {
        Self {
            interval: BASE_HEARTBEAT_INTERVAL,
            retry_interval: BASE_HEARTBEAT_INTERVAL,
        }
    }
}

impl HeartbeatOptions {
    pub fn datanode_from(base_interval: Duration) -> Self {
        Self {
            interval: base_interval,
            retry_interval: base_interval,
        }
    }

    pub fn frontend_from(base_interval: Duration) -> Self {
        Self {
            interval: frontend_heartbeat_interval(base_interval),
            retry_interval: base_interval,
        }
    }

    pub fn flownode_from(base_interval: Duration) -> Self {
        Self {
            interval: base_interval,
            retry_interval: base_interval,
        }
    }
}
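
// Datanodes and flownodes heartbeat at the base interval, while frontends
// derive their interval via `frontend_heartbeat_interval`; retries always
// fall back to the base interval.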

impl From<HeartbeatOptions> for HeartbeatConfig {
    fn from(opts: HeartbeatOptions) -> Self {
        Self {
            heartbeat_interval_ms: opts.interval.as_millis() as u64,
            retry_interval_ms: opts.retry_interval.as_millis() as u64,
        }
    }
}

#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
#[serde(default)]
pub struct BackendClientOptions {
    #[serde(with = "humantime_serde")]
    pub keep_alive_timeout: Duration,
    #[serde(with = "humantime_serde")]
    pub keep_alive_interval: Duration,
    #[serde(with = "humantime_serde")]
    pub connect_timeout: Duration,
}

impl Default for BackendClientOptions {
    fn default() -> Self {
        Self {
            keep_alive_interval: Duration::from_secs(10),
            keep_alive_timeout: Duration::from_secs(3),
            connect_timeout: Duration::from_secs(3),
        }
    }
}

#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    pub store_addrs: Vec<String>,
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    pub selector: SelectorType,
    pub enable_region_failover: bool,
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    pub allow_region_failover_on_local_wal: bool,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub logging: LoggingOptions,
    pub procedure: ProcedureConfig,
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    pub datanode: DatanodeClientOptions,
    pub enable_telemetry: bool,
    pub data_home: String,
    pub wal: MetasrvWalConfig,
    pub store_key_prefix: String,
    pub max_txn_ops: usize,
    pub flush_stats_factor: usize,
    pub tracing: TracingOptions,
    pub memory: MemoryOptions,
    pub backend: BackendImpl,
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    #[cfg(feature = "pg_kvbackend")]
    pub auto_create_schema: bool,
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    pub event_recorder: EventRecorderOptions,
    pub stats_persistence: StatsPersistenceOptions,
    pub gc: GcSchedulerOptions,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence)
            .field("heartbeat_interval", &self.heartbeat_interval)
            .field("backend_client", &self.backend_client);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: Some(TlsOption::prefer()),
            selector: SelectorType::default(),
            enable_region_failover: false,
            heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            #[cfg(feature = "pg_kvbackend")]
            auto_create_schema: true,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
            backend_client: BackendClientOptions::default(),
        }
    }
}

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}
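
// Values for the keys listed above may be supplied through environment
// variables as comma-separated strings and are parsed into lists (behavior
// provided by the `Configurable` trait).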

impl MetasrvOptions {
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}
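
// Used by the `Debug` impl above so that logged options do not leak
// credentials embedded in backend connection strings; the masking itself is
// delegated to `sanitize_connection_string`.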

pub struct MetasrvInfo {
    pub server_addr: String,
}

#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
    pub heartbeat_interval: Duration,
    pub is_handshake: bool,
}

impl Context {
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }

    pub fn with_handshake(mut self, is_handshake: bool) -> Self {
        self.is_handshake = is_handshake;
        self
    }

    pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
        match role {
            Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
            Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
            Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;

pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    mailbox: MailboxRef,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.mailbox.reset().await;
        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_provider: WalProviderRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_allocator: ResourceIdAllocatorRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_provider.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
                mailbox: self.mailbox.clone(),
            };
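            // Watch leader-change broadcasts: on every message, clear the
            // leader-local caches before switching the state machine to the
            // new role.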
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel is closed unexpectedly; is the election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

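            // Keep this node registered as an election candidate for as long
            // as the server is running.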
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Failed to register Metasrv as an election candidate");
                        }
                    }
                });
            }

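            // Drive the election campaign loop: when a campaign returns
            // (successfully or with an error), reset it and campaign again
            // until the server stops.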
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiating election");
                    }
                    info!("Metasrv election task stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_provider.start().await {
                error!(e; "Failed to start wal provider");
            }
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
            hostname: hostname::get()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
        }
    }

    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            default_distributed_time_constants().datanode_lease,
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
        &self.table_id_allocator
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

    pub fn gc_ticker(&self) -> Option<GcTickerRef> {
        self.gc_ticker.as_ref().cloned()
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
            heartbeat_interval: self.options().heartbeat_interval,
            is_handshake: false,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
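
    // Minimal sanity checks for the option types defined in this module;
    // these sketches rely only on definitions visible in this file.
    #[test]
    fn test_heartbeat_options_into_config() {
        use std::time::Duration;

        use api::v1::meta::HeartbeatConfig;

        use crate::metasrv::HeartbeatOptions;

        // `From<HeartbeatOptions>` converts both durations into milliseconds.
        let opts = HeartbeatOptions {
            interval: Duration::from_secs(3),
            retry_interval: Duration::from_millis(1500),
        };
        let config: HeartbeatConfig = opts.into();
        assert_eq!(config.heartbeat_interval_ms, 3000);
        assert_eq!(config.retry_interval_ms, 1500);
    }

    #[test]
    fn test_backend_impl_uses_snake_case_names() {
        use crate::metasrv::BackendImpl;

        // `#[serde(rename_all = "snake_case")]` maps variants to snake_case
        // strings in configuration files.
        let backend: BackendImpl = serde_json::from_str("\"memory_store\"").unwrap();
        assert_eq!(backend, BackendImpl::MemoryStore);
    }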
}