pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatRef;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_time::util::DefaultSystemTimer;
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::gc::{GcSchedulerOptions, GcTickerRef};
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

pub const TABLE_ID_SEQ: &str = "table_id";
pub const FLOW_ID_SEQ: &str = "flow_id";
pub const METASRV_DATA_DIR: &str = "metasrv";

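/// The backend implementation of the metadata store.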
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    // Etcd as the metadata store.
    #[default]
    EtcdStore,
    // In-memory metadata store (mostly for testing).
    MemoryStore,
    // Postgres as the metadata store.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    // MySQL as the metadata store.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}

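/// The options for persisting stats.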
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// How long persisted stats are retained.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// How often stats are persisted.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            // 10 minutes.
            interval: Duration::from_secs(10 * 60),
        }
    }
}

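/// The configuration options of the metasrv.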
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The bind address of the metasrv.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// The address the metasrv advertises to clients.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The addresses of the metadata store.
    pub store_addrs: Vec<String>,
    /// The TLS options for connecting to the metadata store.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to use the memory store.
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The delay before initializing region failure detectors.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover on local WAL, which may cause data loss.
    pub allow_region_failover_on_local_wal: bool,
    /// The gRPC server options.
    pub grpc: GrpcOptions,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable greptimedb telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The prefix applied to all keys in the metadata store.
    pub store_key_prefix: String,
    /// The maximum number of operations permitted in a transaction.
    pub max_txn_ops: usize,
    /// The factor that determines how often stats are flushed to the store.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The memory options.
    pub memory: MemoryOptions,
    /// The backend implementation of the metadata store.
    pub backend: BackendImpl,
    /// The table name used to store metadata in an RDS backend.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// The advisory lock id used for election in PostgreSQL.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// The schema name of the metadata table in PostgreSQL.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// The maximum idle time before a node is removed from the in-memory store.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// The event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// The stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// The GC scheduler options.
    pub gc: GcSchedulerOptions,
}

impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("use_memory_store", &self.use_memory_store)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}

const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: None,
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // Keep the metadata value size under etcd's default request
                // size limit of 1.5 MiB.
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            // 24 hours.
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
            gc: GcSchedulerOptions::default(),
        }
    }
}

impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}

impl MetasrvOptions {
    /// Returns sanitized versions of `store_addrs` that are safe to log.
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}

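/// The basic information of the metasrv instance.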
pub struct MetasrvInfo {
    pub server_addr: String,
}

/// The shared context passed to metasrv handlers and services.
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
}

impl Context {
    /// Resets the in-memory backend and the leader region registry.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

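/// The value stored in the election backend that identifies the current leader.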
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}

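/// The information of a metasrv node, exchanged during candidate registration
/// and cluster discovery.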
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    // The node address
    pub addr: String,
    // The node build version
    pub version: String,
    // The node build git commit hash
    pub git_commit: String,
    // The node start timestamp in milliseconds
    pub start_time_ms: u64,
    // The node total CPU in millicores
    #[serde(default)]
    pub total_cpu_millicores: i64,
    // The node total memory in bytes
    #[serde(default)]
    pub total_memory_bytes: i64,
    // The node CPU usage in millicores
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    // The node memory usage in bytes
    #[serde(default)]
    pub memory_usage_bytes: i64,
    // The node hostname
    #[serde(default)]
    pub hostname: String,
}

#[allow(deprecated)]
impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            // The deprecated top-level fields are still populated for
            // backward compatibility with older clients.
            version: node_info.version.clone(),
            git_commit: node_info.git_commit.clone(),
            start_time_ms: node_info.start_time_ms,
            cpus: node_info.total_cpu_millicores as u32,
            memory_bytes: node_info.total_memory_bytes as u64,
            info: Some(api::v1::meta::NodeInfo {
                version: node_info.version,
                git_commit: node_info.git_commit,
                start_time_ms: node_info.start_time_ms,
                total_cpu_millicores: node_info.total_cpu_millicores,
                total_memory_bytes: node_info.total_memory_bytes,
                cpu_usage_millicores: node_info.cpu_usage_millicores,
                memory_usage_bytes: node_info.memory_usage_bytes,
                cpus: node_info.total_cpu_millicores as u32,
                memory_bytes: node_info.total_memory_bytes as u64,
                hostname: node_info.hostname,
            }),
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

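/// Handles leadership state transitions: it warms up the leader cache,
/// notifies leadership change listeners, and toggles telemetry reporting.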
pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

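/// The metasrv server, which coordinates cluster metadata, leader election,
/// and background maintenance tasks.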
pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_sequence: SequenceRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_stat: ResourceStatRef,
    gc_ticker: Option<GcTickerRef>,

    plugins: Plugins,
}

impl Metasrv {
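    /// Starts the metasrv.
    ///
    /// This is idempotent: calling it on an already started instance returns
    /// `Ok(())` immediately. With an election service configured, candidate
    /// registration and leadership campaigns are spawned as background tasks;
    /// without one, this node acts as the sole leader.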
    pub async fn try_start(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Register the built-in leadership change listeners.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(gc_ticker) = &self.gc_ticker {
                leadership_change_notifier.add_listener(gc_ticker.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
            };
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly; is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            // The receiver lagged too far behind to track
                            // leadership reliably; exit and step down below.
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

            {
                // Register this metasrv as an election candidate in the background.
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

            {
                // Campaign for leadership in the background.
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiates election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            // Without an election service this node is always the leader, so
            // load the leader cache and start the procedure manager directly.
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

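    /// Shuts down the metasrv, stopping the procedure manager. Idempotent:
    /// calling it on an already stopped instance returns `Ok(())` immediately.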
    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_stat(&self) -> &ResourceStatRef {
        &self.resource_stat
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
            total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
            cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
            memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
            hostname: hostname::get()
                .unwrap_or_default()
                .to_string_lossy()
                .to_string(),
        }
    }

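    /// Looks up a datanode peer by `peer_id`, returning it only while the
    /// datanode's lease is still alive.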
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            &DefaultSystemTimer,
            self.meta_peer_client.as_ref(),
            peer_id,
            Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_sequence(&self) -> &SequenceRef {
        &self.table_id_sequence
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

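    /// Creates a new [`Context`] snapshot from the current state of the metasrv.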
    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
        }
    }
}

#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    #[test]
    fn test_deserialize_metasrv_node_info() {
        let str = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(str).unwrap();
        assert_eq!(node_info.addr, "127.0.0.1:4002");
        assert_eq!(node_info.version, "0.1.0");
        assert_eq!(node_info.git_commit, "1234567890");
        assert_eq!(node_info.start_time_ms, 1715145600);
    }
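
    // A few extra sanity checks, written as a sketch against the items defined
    // in this module only; they assume nothing beyond the code above.
    #[test]
    fn test_backend_impl_serde_snake_case() {
        use crate::metasrv::BackendImpl;

        // `rename_all = "snake_case"` maps `EtcdStore` to "etcd_store".
        let json = serde_json::to_string(&BackendImpl::EtcdStore).unwrap();
        assert_eq!(json, "\"etcd_store\"");
        let backend: BackendImpl = serde_json::from_str("\"memory_store\"").unwrap();
        assert_eq!(backend, BackendImpl::MemoryStore);
    }

    #[test]
    fn test_leader_value_from_bytes() {
        use crate::metasrv::LeaderValue;

        // Any `AsRef<[u8]>` value converts via lossy UTF-8.
        let leader = LeaderValue::from("127.0.0.1:3002");
        assert_eq!(leader.0, "127.0.0.1:3002");
        let lossy = LeaderValue::from([0xff_u8]);
        assert_eq!(lossy.0, "\u{FFFD}");
    }

    #[test]
    fn test_select_target_display() {
        use crate::metasrv::SelectTarget;

        assert_eq!(SelectTarget::Datanode.to_string(), "datanode");
        assert_eq!(SelectTarget::Flownode.to_string(), "flownode");
    }

    #[test]
    fn test_stats_persistence_defaults() {
        use std::time::Duration;

        use crate::metasrv::StatsPersistenceOptions;

        let options = StatsPersistenceOptions::default();
        assert_eq!(options.ttl, Duration::ZERO);
        assert_eq!(options.interval, Duration::from_secs(10 * 60));
    }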
}