1pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use clap::ValueEnum;
23use common_base::Plugins;
24use common_base::readable_size::ReadableSize;
25use common_config::{Configurable, DEFAULT_DATA_HOME};
26use common_event_recorder::EventRecorderOptions;
27use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
28use common_meta::cache_invalidator::CacheInvalidatorRef;
29use common_meta::ddl_manager::DdlManagerRef;
30use common_meta::distributed_time_constants;
31use common_meta::key::TableMetadataManagerRef;
32use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
33use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
34use common_meta::leadership_notifier::{
35 LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
36};
37use common_meta::node_expiry_listener::NodeExpiryListener;
38use common_meta::peer::{Peer, PeerDiscoveryRef};
39use common_meta::reconciliation::manager::ReconciliationManagerRef;
40use common_meta::region_keeper::MemoryRegionKeeperRef;
41use common_meta::region_registry::LeaderRegionRegistryRef;
42use common_meta::sequence::SequenceRef;
43use common_meta::stats::topic::TopicStatsRegistryRef;
44use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
45use common_options::datanode::DatanodeClientOptions;
46use common_options::memory::MemoryOptions;
47use common_procedure::ProcedureManagerRef;
48use common_procedure::options::ProcedureConfig;
49use common_stat::ResourceStatRef;
50use common_telemetry::logging::{LoggingOptions, TracingOptions};
51use common_telemetry::{error, info, warn};
52use common_time::util::DefaultSystemTimer;
53use common_wal::config::MetasrvWalConfig;
54use serde::{Deserialize, Serialize};
55use servers::grpc::GrpcOptions;
56use servers::http::HttpOptions;
57use servers::tls::TlsOption;
58use snafu::{OptionExt, ResultExt};
59use store_api::storage::RegionId;
60use tokio::sync::broadcast::error::RecvError;
61
62use crate::cluster::MetaPeerClientRef;
63use crate::discovery;
64use crate::election::{Election, LeaderChangeMessage};
65use crate::error::{
66 self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
67 StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
68};
69use crate::failure_detector::PhiAccrualFailureDetectorOptions;
70use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
71use crate::procedure::ProcedureManagerListenerAdapter;
72use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
73use crate::procedure::wal_prune::manager::WalPruneTickerRef;
74use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
75use crate::region::flush_trigger::RegionFlushTickerRef;
76use crate::region::supervisor::RegionSupervisorTickerRef;
77use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
78use crate::service::mailbox::MailboxRef;
79use crate::service::store::cached_kv::LeaderCachedKvBackend;
80use crate::state::{StateRef, become_follower, become_leader};
81
/// Sequence name `"table_id"`.
/// NOTE(review): presumably names the sequence that allocates table ids; confirm at use sites.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Sequence name `"flow_id"`.
/// NOTE(review): presumably names the sequence that allocates flow ids; confirm at use sites.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Directory name `"metasrv"`.
/// NOTE(review): presumably a sub-directory under the data home; confirm at use sites.
pub const METASRV_DATA_DIR: &str = "metasrv";
85
/// The kind of store backing the metasrv (see [`MetasrvOptions::backend`]).
///
/// Serialized in `snake_case` (e.g. `etcd_store`) and usable as a `clap` value enum.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd-backed store; this is the default variant.
    #[default]
    EtcdStore,
    /// In-memory store.
    MemoryStore,
    /// Postgres-backed store; only compiled with the `pg_kvbackend` feature.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// MySQL-backed store; only compiled with the `mysql_kvbackend` feature.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
102
/// Options controlling stats persistence.
///
/// Both durations are (de)serialized in human-readable form through `humantime_serde`.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// Time-to-live of the persisted stats. Defaults to zero.
    /// NOTE(review): a zero TTL presumably disables persistence; confirm against the consumer.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// Interval between persistence rounds. Defaults to 10 minutes.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}
113
114impl Default for StatsPersistenceOptions {
115 fn default() -> Self {
116 Self {
117 ttl: Duration::ZERO,
118 interval: Duration::from_mins(10),
119 }
120 }
121}
122
/// Configuration of a metasrv instance.
///
/// Every field falls back to the value from the `Default` impl when it is
/// missing from the deserialized input (`#[serde(default)]` on the struct).
/// `Debug` is implemented by hand so that `store_addrs` can be sanitized
/// before being printed.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// Deprecated: use `grpc.bind_addr` instead.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// Deprecated: use `grpc.server_addr` instead.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// Addresses of the backing store; defaults to `["127.0.0.1:2379"]`.
    /// The values are sanitized before they are shown in `Debug` output.
    pub store_addrs: Vec<String>,
    /// Optional TLS settings.
    /// NOTE(review): presumably used for the connection to the backing store; confirm.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The selector type to use.
    pub selector: SelectorType,
    /// NOTE(review): presumably switches the backing store to memory; confirm against the builder.
    pub use_memory_store: bool,
    /// Whether region failover is enabled; defaults to `false`.
    pub enable_region_failover: bool,
    /// Initialization delay of the region failure detector, in human-readable
    /// form; defaults to 10 minutes.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether region failover is allowed on local WAL; defaults to `false`.
    pub allow_region_failover_on_local_wal: bool,
    /// gRPC server options; the default bind address is `127.0.0.1:3002`.
    pub grpc: GrpcOptions,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Procedure framework options.
    pub procedure: ProcedureConfig,
    /// Options of the phi-accrual failure detector.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// Datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Whether telemetry is enabled; defaults to `true`.
    pub enable_telemetry: bool,
    /// Data home directory; defaults to `DEFAULT_DATA_HOME`.
    pub data_home: String,
    /// WAL configuration of the metasrv.
    pub wal: MetasrvWalConfig,
    /// Key prefix; defaults to an empty string.
    /// NOTE(review): presumably prepended to every key written to the store; confirm.
    pub store_key_prefix: String,
    /// Defaults to 128.
    /// NOTE(review): presumably the maximum number of operations in a single store transaction; confirm.
    pub max_txn_ops: usize,
    /// Defaults to 3.
    /// NOTE(review): presumably controls how often stats are flushed; confirm against the heartbeat handlers.
    pub flush_stats_factor: usize,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Memory options.
    pub memory: MemoryOptions,
    /// The backing store implementation; defaults to `BackendImpl::EtcdStore`.
    pub backend: BackendImpl,
    /// Name of the metadata table; only present with an RDS backend feature.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// Lock id used for election; only present with the `pg_kvbackend` feature.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// Optional schema name; only present with the `pg_kvbackend` feature.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// Maximum idle time of a node, in human-readable form; defaults to 24 hours.
    /// It is handed to the `NodeExpiryListener` when the metasrv starts.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// Event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// Stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
}
211
212impl fmt::Debug for MetasrvOptions {
213 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
214 let mut debug_struct = f.debug_struct("MetasrvOptions");
215 debug_struct
216 .field("store_addrs", &self.sanitize_store_addrs())
217 .field("backend_tls", &self.backend_tls)
218 .field("selector", &self.selector)
219 .field("use_memory_store", &self.use_memory_store)
220 .field("enable_region_failover", &self.enable_region_failover)
221 .field(
222 "allow_region_failover_on_local_wal",
223 &self.allow_region_failover_on_local_wal,
224 )
225 .field("grpc", &self.grpc)
226 .field("http", &self.http)
227 .field("logging", &self.logging)
228 .field("procedure", &self.procedure)
229 .field("failure_detector", &self.failure_detector)
230 .field("datanode", &self.datanode)
231 .field("enable_telemetry", &self.enable_telemetry)
232 .field("data_home", &self.data_home)
233 .field("wal", &self.wal)
234 .field("store_key_prefix", &self.store_key_prefix)
235 .field("max_txn_ops", &self.max_txn_ops)
236 .field("flush_stats_factor", &self.flush_stats_factor)
237 .field("tracing", &self.tracing)
238 .field("backend", &self.backend)
239 .field("event_recorder", &self.event_recorder)
240 .field("stats_persistence", &self.stats_persistence);
241
242 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
243 debug_struct.field("meta_table_name", &self.meta_table_name);
244
245 #[cfg(feature = "pg_kvbackend")]
246 debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
247 #[cfg(feature = "pg_kvbackend")]
248 debug_struct.field("meta_schema_name", &self.meta_schema_name);
249
250 debug_struct
251 .field("node_max_idle_time", &self.node_max_idle_time)
252 .finish()
253 }
254}
255
/// Port used to build the default gRPC bind address (`127.0.0.1:3002`) in
/// `MetasrvOptions::default`.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
257
258impl Default for MetasrvOptions {
259 fn default() -> Self {
260 Self {
261 #[allow(deprecated)]
262 bind_addr: String::new(),
263 #[allow(deprecated)]
264 server_addr: String::new(),
265 store_addrs: vec!["127.0.0.1:2379".to_string()],
266 backend_tls: None,
267 selector: SelectorType::default(),
268 use_memory_store: false,
269 enable_region_failover: false,
270 region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
271 allow_region_failover_on_local_wal: false,
272 grpc: GrpcOptions {
273 bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
274 ..Default::default()
275 },
276 http: HttpOptions::default(),
277 logging: LoggingOptions::default(),
278 procedure: ProcedureConfig {
279 max_retry_times: 12,
280 retry_delay: Duration::from_millis(500),
281 max_metadata_value_size: Some(ReadableSize::kb(1500)),
284 max_running_procedures: 128,
285 },
286 failure_detector: PhiAccrualFailureDetectorOptions::default(),
287 datanode: DatanodeClientOptions::default(),
288 enable_telemetry: true,
289 data_home: DEFAULT_DATA_HOME.to_string(),
290 wal: MetasrvWalConfig::default(),
291 store_key_prefix: String::new(),
292 max_txn_ops: 128,
293 flush_stats_factor: 3,
294 tracing: TracingOptions::default(),
295 memory: MemoryOptions::default(),
296 backend: BackendImpl::EtcdStore,
297 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
298 meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
299 #[cfg(feature = "pg_kvbackend")]
300 meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
301 #[cfg(feature = "pg_kvbackend")]
302 meta_schema_name: None,
303 node_max_idle_time: Duration::from_secs(24 * 60 * 60),
304 event_recorder: EventRecorderOptions::default(),
305 stats_persistence: StatsPersistenceOptions::default(),
306 }
307 }
308}
309
310impl Configurable for MetasrvOptions {
311 fn env_list_keys() -> Option<&'static [&'static str]> {
312 Some(&["wal.broker_endpoints", "store_addrs"])
313 }
314}
315
316impl MetasrvOptions {
317 fn sanitize_store_addrs(&self) -> Vec<String> {
318 self.store_addrs
319 .iter()
320 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
321 .collect()
322 }
323}
324
/// Basic information about a metasrv instance.
pub struct MetasrvInfo {
    /// Server address of the metasrv.
    pub server_addr: String,
}
/// A cloneable bundle of shared metasrv handles, created by `Metasrv::new_ctx`.
#[derive(Clone)]
pub struct Context {
    /// gRPC server address of this metasrv (`options.grpc.server_addr`).
    pub server_addr: String,
    /// Resettable in-memory kv backend.
    pub in_memory: ResettableKvBackendRef,
    /// The kv backend.
    pub kv_backend: KvBackendRef,
    /// Leader-cached kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    /// Client for talking to meta peers.
    pub meta_peer_client: MetaPeerClientRef,
    /// Mailbox handle.
    pub mailbox: MailboxRef,
    /// Election handle; `None` when the metasrv runs without an election service.
    pub election: Option<ElectionRef>,
    /// Always initialized to `false` by `Metasrv::new_ctx`.
    /// NOTE(review): presumably set by heartbeat handling for freshly started nodes; confirm.
    pub is_infancy: bool,
    /// Table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator.
    pub cache_invalidator: CacheInvalidatorRef,
    /// Registry of leader regions; cleared by `reset_in_memory`.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Registry of topic stats.
    pub topic_stats_registry: TopicStatsRegistryRef,
}
343
344impl Context {
345 pub fn reset_in_memory(&self) {
346 self.in_memory.reset();
347 self.leader_region_registry.reset();
348 }
349}
350
/// The value identifying an elected leader, stored as a string.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    /// Builds a [`LeaderValue`] from raw bytes, replacing invalid UTF-8
    /// sequences with U+FFFD instead of failing.
    fn from(value: T) -> Self {
        let bytes = value.as_ref();
        Self(String::from_utf8_lossy(bytes).into_owned())
    }
}
360
/// Information about a metasrv node.
///
/// The resource and hostname fields carry `#[serde(default)]`, so payloads
/// that lack them still deserialize.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// Address of the node.
    pub addr: String,
    /// Version of the node.
    pub version: String,
    /// Git commit of the node's build.
    pub git_commit: String,
    /// Start time of the node, in milliseconds.
    pub start_time_ms: u64,
    /// Total CPU, in millicores; defaults to 0 when absent.
    #[serde(default)]
    pub total_cpu_millicores: i64,
    /// Total memory, in bytes; defaults to 0 when absent.
    #[serde(default)]
    pub total_memory_bytes: i64,
    /// CPU usage, in millicores; defaults to 0 when absent.
    #[serde(default)]
    pub cpu_usage_millicores: i64,
    /// Memory usage, in bytes; defaults to 0 when absent.
    #[serde(default)]
    pub memory_usage_bytes: i64,
    /// Hostname of the node; defaults to an empty string when absent.
    #[serde(default)]
    pub hostname: String,
}
387
388#[allow(deprecated)]
390impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
391 fn from(node_info: MetasrvNodeInfo) -> Self {
392 Self {
393 peer: Some(api::v1::meta::Peer {
394 addr: node_info.addr,
395 ..Default::default()
396 }),
397 version: node_info.version.clone(),
400 git_commit: node_info.git_commit.clone(),
401 start_time_ms: node_info.start_time_ms,
402 cpus: node_info.total_cpu_millicores as u32,
403 memory_bytes: node_info.total_memory_bytes as u64,
404 info: Some(api::v1::meta::NodeInfo {
406 version: node_info.version,
407 git_commit: node_info.git_commit,
408 start_time_ms: node_info.start_time_ms,
409 total_cpu_millicores: node_info.total_cpu_millicores,
410 total_memory_bytes: node_info.total_memory_bytes,
411 cpu_usage_millicores: node_info.cpu_usage_millicores,
412 memory_usage_bytes: node_info.memory_usage_bytes,
413 cpus: node_info.total_cpu_millicores as u32,
414 memory_bytes: node_info.total_memory_bytes as u64,
415 hostname: node_info.hostname,
416 }),
417 }
418 }
419}
420
/// The kind of node a selector picks.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    /// Select datanodes.
    Datanode,
    /// Select flownodes.
    Flownode,
}

impl Display for SelectTarget {
    /// Writes the lowercase target name (`datanode` / `flownode`).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Datanode => "datanode",
            Self::Flownode => "flownode",
        };
        f.write_str(label)
    }
}
435
/// Context handed to selectors (the `Context` associated type of
/// [`SelectorRef`] and [`RegionStatAwareSelectorRef`]).
#[derive(Clone)]
pub struct SelectorContext {
    /// Peer discovery handle.
    pub peer_discovery: PeerDiscoveryRef,
}
440
/// Shared selector that takes a [`SelectorContext`] and yields a list of peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
/// Shared region-stat-aware selector that yields `(RegionId, Peer)` pairs.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
/// Shared election handle whose leader value is a [`LeaderValue`].
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
445
/// Reacts to leadership changes of this metasrv: updates the shared state,
/// notifies leadership listeners, toggles telemetry reporting and clears
/// subscriptions on step-down.
pub struct MetaStateHandler {
    /// Optional subscription manager; all subscriptions are dropped on leader stop.
    subscribe_manager: Option<SubscriptionManagerRef>,
    /// Telemetry task whose reporting is switched on/off with leadership.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Leader-cached kv backend, loaded when this node becomes leader.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// Notifier fanning leadership changes out to registered listeners.
    leadership_change_notifier: LeadershipChangeNotifier,
    /// Shared metasrv state.
    state: StateRef,
}
453
454impl MetaStateHandler {
455 pub async fn on_leader_start(&self) {
456 self.state.write().unwrap().next_state(become_leader(false));
457
458 if let Err(e) = self.leader_cached_kv_backend.load().await {
459 error!(e; "Failed to load kv into leader cache kv store");
460 } else {
461 self.state.write().unwrap().next_state(become_leader(true));
462 }
463
464 self.leadership_change_notifier
465 .notify_on_leader_start()
466 .await;
467
468 self.greptimedb_telemetry_task.should_report(true);
469 }
470
471 pub async fn on_leader_stop(&self) {
472 self.state.write().unwrap().next_state(become_follower());
473
474 self.leadership_change_notifier
475 .notify_on_leader_stop()
476 .await;
477
478 self.greptimedb_telemetry_task.should_report(false);
480
481 if let Some(sub_manager) = self.subscribe_manager.clone() {
482 info!("Leader changed, un_subscribe all");
483 if let Err(e) = sub_manager.unsubscribe_all() {
484 error!(e; "Failed to un_subscribe all");
485 }
486 }
487 }
488}
489
/// The metasrv instance: owns the options, the kv backends, the managers and
/// the optional tickers that are wired together in `try_start`.
pub struct Metasrv {
    /// Shared metasrv state, updated on leadership changes.
    state: StateRef,
    /// Set to `true` by `try_start` and back to `false` by `shutdown`; the
    /// background election tasks keep looping while it is `true`.
    started: Arc<AtomicBool>,
    /// Start time of this instance, in milliseconds.
    start_time_ms: u64,
    /// The options this instance was built with.
    options: MetasrvOptions,
    /// Resettable in-memory kv backend; reset on every leader change message.
    in_memory: ResettableKvBackendRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// Leader-cached kv backend; reset on leader change and loaded on leader start.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// Client for talking to meta peers.
    meta_peer_client: MetaPeerClientRef,
    /// The selector.
    selector: SelectorRef,
    /// Context passed to selectors.
    selector_ctx: SelectorContext,
    /// The selector used for flows.
    flow_selector: SelectorRef,
    /// Heartbeat handler group; `None` until `try_start` builds it.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    /// Builder of the heartbeat handler group; consumed by `try_start`.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    /// Election handle; `None` when running without an election service.
    election: Option<ElectionRef>,
    /// Procedure manager.
    procedure_manager: ProcedureManagerRef,
    /// Mailbox handle.
    mailbox: MailboxRef,
    /// DDL manager.
    ddl_manager: DdlManagerRef,
    /// WAL options allocator.
    wal_options_allocator: WalOptionsAllocatorRef,
    /// Table metadata manager; initialized in `try_start`.
    table_metadata_manager: TableMetadataManagerRef,
    /// Runtime switch manager.
    runtime_switch_manager: RuntimeSwitchManagerRef,
    /// Memory region keeper.
    memory_region_keeper: MemoryRegionKeeperRef,
    /// Telemetry task; started in `try_start` when an election is configured.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// Region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// Optional region supervisor ticker, registered as a leadership listener.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    /// Cache invalidator.
    cache_invalidator: CacheInvalidatorRef,
    /// Registry of leader regions.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Registry of topic stats.
    topic_stats_registry: TopicStatsRegistryRef,
    /// Optional WAL prune ticker, registered as a leadership listener.
    wal_prune_ticker: Option<WalPruneTickerRef>,
    /// Optional region flush ticker, registered as a leadership listener.
    region_flush_ticker: Option<RegionFlushTickerRef>,
    /// Sequence for table ids.
    table_id_sequence: SequenceRef,
    /// Reconciliation manager.
    reconciliation_manager: ReconciliationManagerRef,
    /// Resource statistics provider used to fill `MetasrvNodeInfo`.
    resource_stat: ResourceStatRef,

    /// Plugins; looked up for publisher, subscription manager and notifier customizer.
    plugins: Plugins,
}
530
531impl Metasrv {
532 pub async fn try_start(&self) -> Result<()> {
533 if self
534 .started
535 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
536 .is_err()
537 {
538 warn!("Metasrv already started");
539 return Ok(());
540 }
541
542 let handler_group_builder =
543 self.handler_group_builder
544 .lock()
545 .unwrap()
546 .take()
547 .context(error::UnexpectedSnafu {
548 violated: "expected heartbeat handler group builder",
549 })?;
550 *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
551
552 self.table_metadata_manager
554 .init()
555 .await
556 .context(InitMetadataSnafu)?;
557
558 if let Some(election) = self.election() {
559 let procedure_manager = self.procedure_manager.clone();
560 let in_memory = self.in_memory.clone();
561 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
562 let subscribe_manager = self.subscription_manager();
563 let mut rx = election.subscribe_leader_change();
564 let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
565 greptimedb_telemetry_task
566 .start()
567 .context(StartTelemetryTaskSnafu)?;
568
569 let mut leadership_change_notifier = LeadershipChangeNotifier::default();
571 leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
572 leadership_change_notifier
573 .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
574 leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
575 self.options.node_max_idle_time,
576 self.in_memory.clone(),
577 )));
578 if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
579 leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
580 }
581 if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
582 leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
583 }
584 if let Some(region_flush_trigger) = &self.region_flush_ticker {
585 leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
586 }
587 if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
588 customizer.customize(&mut leadership_change_notifier);
589 }
590
591 let state_handler = MetaStateHandler {
592 greptimedb_telemetry_task,
593 subscribe_manager,
594 state: self.state.clone(),
595 leader_cached_kv_backend: leader_cached_kv_backend.clone(),
596 leadership_change_notifier,
597 };
598 let _handle = common_runtime::spawn_global(async move {
599 loop {
600 match rx.recv().await {
601 Ok(msg) => {
602 in_memory.reset();
603 leader_cached_kv_backend.reset();
604 info!("Leader's cache has bean cleared on leader change: {msg}");
605 match msg {
606 LeaderChangeMessage::Elected(_) => {
607 state_handler.on_leader_start().await;
608 }
609 LeaderChangeMessage::StepDown(leader) => {
610 error!("Leader :{:?} step down", leader);
611
612 state_handler.on_leader_stop().await;
613 }
614 }
615 }
616 Err(RecvError::Closed) => {
617 error!("Not expected, is leader election loop still running?");
618 break;
619 }
620 Err(RecvError::Lagged(_)) => {
621 break;
622 }
623 }
624 }
625
626 state_handler.on_leader_stop().await;
627 });
628
629 {
631 let election = election.clone();
632 let started = self.started.clone();
633 let node_info = self.node_info();
634 let _handle = common_runtime::spawn_global(async move {
635 while started.load(Ordering::Acquire) {
636 let res = election.register_candidate(&node_info).await;
637 if let Err(e) = res {
638 warn!(e; "Metasrv register candidate error");
639 }
640 }
641 });
642 }
643
644 {
646 let election = election.clone();
647 let started = self.started.clone();
648 let _handle = common_runtime::spawn_global(async move {
649 while started.load(Ordering::Acquire) {
650 let res = election.campaign().await;
651 if let Err(e) = res {
652 warn!(e; "Metasrv election error");
653 }
654 election.reset_campaign().await;
655 info!("Metasrv re-initiate election");
656 }
657 info!("Metasrv stopped");
658 });
659 }
660 } else {
661 warn!(
662 "Ensure only one instance of Metasrv is running, as there is no election service."
663 );
664
665 if let Err(e) = self.wal_options_allocator.start().await {
666 error!(e; "Failed to start wal options allocator");
667 }
668 self.leader_cached_kv_backend
670 .load()
671 .await
672 .context(KvBackendSnafu)?;
673 self.procedure_manager
674 .start()
675 .await
676 .context(StartProcedureManagerSnafu)?;
677 }
678
679 info!("Metasrv started");
680
681 Ok(())
682 }
683
684 pub async fn shutdown(&self) -> Result<()> {
685 if self
686 .started
687 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
688 .is_err()
689 {
690 warn!("Metasrv already stopped");
691 return Ok(());
692 }
693
694 self.procedure_manager
695 .stop()
696 .await
697 .context(StopProcedureManagerSnafu)?;
698
699 info!("Metasrv stopped");
700
701 Ok(())
702 }
703
704 pub fn start_time_ms(&self) -> u64 {
705 self.start_time_ms
706 }
707
708 pub fn resource_stat(&self) -> &ResourceStatRef {
709 &self.resource_stat
710 }
711
712 pub fn node_info(&self) -> MetasrvNodeInfo {
713 let build_info = common_version::build_info();
714 MetasrvNodeInfo {
715 addr: self.options().grpc.server_addr.clone(),
716 version: build_info.version.to_string(),
717 git_commit: build_info.commit_short.to_string(),
718 start_time_ms: self.start_time_ms(),
719 total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
720 total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
721 cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
722 memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
723 hostname: hostname::get()
724 .unwrap_or_default()
725 .to_string_lossy()
726 .to_string(),
727 }
728 }
729
730 pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
733 discovery::utils::alive_datanode(
734 &DefaultSystemTimer,
735 self.meta_peer_client.as_ref(),
736 peer_id,
737 Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
738 )
739 .await
740 }
741
742 pub fn options(&self) -> &MetasrvOptions {
743 &self.options
744 }
745
746 pub fn in_memory(&self) -> &ResettableKvBackendRef {
747 &self.in_memory
748 }
749
750 pub fn kv_backend(&self) -> &KvBackendRef {
751 &self.kv_backend
752 }
753
754 pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
755 &self.meta_peer_client
756 }
757
758 pub fn selector(&self) -> &SelectorRef {
759 &self.selector
760 }
761
762 pub fn selector_ctx(&self) -> &SelectorContext {
763 &self.selector_ctx
764 }
765
766 pub fn flow_selector(&self) -> &SelectorRef {
767 &self.flow_selector
768 }
769
770 pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
771 self.handler_group.read().unwrap().clone()
772 }
773
774 pub fn election(&self) -> Option<&ElectionRef> {
775 self.election.as_ref()
776 }
777
778 pub fn mailbox(&self) -> &MailboxRef {
779 &self.mailbox
780 }
781
782 pub fn ddl_manager(&self) -> &DdlManagerRef {
783 &self.ddl_manager
784 }
785
786 pub fn procedure_manager(&self) -> &ProcedureManagerRef {
787 &self.procedure_manager
788 }
789
790 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
791 &self.table_metadata_manager
792 }
793
794 pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
795 &self.runtime_switch_manager
796 }
797
798 pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
799 &self.memory_region_keeper
800 }
801
802 pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
803 &self.region_migration_manager
804 }
805
806 pub fn publish(&self) -> Option<PublisherRef> {
807 self.plugins.get::<PublisherRef>()
808 }
809
810 pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
811 self.plugins.get::<SubscriptionManagerRef>()
812 }
813
814 pub fn table_id_sequence(&self) -> &SequenceRef {
815 &self.table_id_sequence
816 }
817
818 pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
819 &self.reconciliation_manager
820 }
821
822 pub fn plugins(&self) -> &Plugins {
823 &self.plugins
824 }
825
826 pub fn started(&self) -> Arc<AtomicBool> {
827 self.started.clone()
828 }
829
830 #[inline]
831 pub fn new_ctx(&self) -> Context {
832 let server_addr = self.options().grpc.server_addr.clone();
833 let in_memory = self.in_memory.clone();
834 let kv_backend = self.kv_backend.clone();
835 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
836 let meta_peer_client = self.meta_peer_client.clone();
837 let mailbox = self.mailbox.clone();
838 let election = self.election.clone();
839 let table_metadata_manager = self.table_metadata_manager.clone();
840 let cache_invalidator = self.cache_invalidator.clone();
841 let leader_region_registry = self.leader_region_registry.clone();
842 let topic_stats_registry = self.topic_stats_registry.clone();
843
844 Context {
845 server_addr,
846 in_memory,
847 kv_backend,
848 leader_cached_kv_backend,
849 meta_peer_client,
850 mailbox,
851 election,
852 is_infancy: false,
853 table_metadata_manager,
854 cache_invalidator,
855 leader_region_registry,
856 topic_stats_registry,
857 }
858 }
859}
860
#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    /// A payload without the resource/hostname fields must still deserialize.
    #[test]
    fn test_deserialize_metasrv_node_info() {
        let json = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let parsed: MetasrvNodeInfo = serde_json::from_str(json).unwrap();

        let MetasrvNodeInfo {
            addr,
            version,
            git_commit,
            start_time_ms,
            ..
        } = parsed;
        assert_eq!(addr, "127.0.0.1:4002");
        assert_eq!(version, "0.1.0");
        assert_eq!(git_commit, "1234567890");
        assert_eq!(start_time_ms, 1715145600);
    }
}