1pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use api::v1::meta::{HeartbeatConfig, Role};
23use clap::ValueEnum;
24use common_base::Plugins;
25use common_base::readable_size::ReadableSize;
26use common_config::{Configurable, DEFAULT_DATA_HOME};
27use common_event_recorder::EventRecorderOptions;
28use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
29use common_meta::cache_invalidator::CacheInvalidatorRef;
30use common_meta::ddl::allocator::resource_id::ResourceIdAllocatorRef;
31use common_meta::ddl_manager::DdlManagerRef;
32use common_meta::distributed_time_constants::{
33 self, BASE_HEARTBEAT_INTERVAL, default_distributed_time_constants, frontend_heartbeat_interval,
34};
35use common_meta::election::LeaderChangeMessage;
36pub use common_meta::election::{ElectionRef, MetasrvNodeInfo};
37use common_meta::key::TableMetadataManagerRef;
38use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
39use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
40use common_meta::leadership_notifier::{
41 LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
42};
43use common_meta::node_expiry_listener::NodeExpiryListener;
44use common_meta::peer::{Peer, PeerDiscoveryRef};
45use common_meta::reconciliation::manager::ReconciliationManagerRef;
46use common_meta::region_keeper::MemoryRegionKeeperRef;
47use common_meta::region_registry::LeaderRegionRegistryRef;
48use common_meta::stats::topic::TopicStatsRegistryRef;
49use common_meta::wal_provider::WalProviderRef;
50use common_options::datanode::DatanodeClientOptions;
51use common_options::memory::MemoryOptions;
52use common_procedure::ProcedureManagerRef;
53use common_procedure::options::ProcedureConfig;
54use common_stat::ResourceStatRef;
55use common_telemetry::logging::{LoggingOptions, TracingOptions};
56use common_telemetry::{error, info, warn};
57use common_time::util::DefaultSystemTimer;
58use common_wal::config::MetasrvWalConfig;
59use serde::{Deserialize, Serialize};
60use servers::grpc::GrpcOptions;
61use servers::http::HttpOptions;
62use servers::tls::TlsOption;
63use snafu::{OptionExt, ResultExt};
64use store_api::storage::RegionId;
65use tokio::sync::broadcast::error::RecvError;
66
67use crate::cluster::MetaPeerClientRef;
68use crate::discovery;
69use crate::error::{
70 self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
71 StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
72};
73use crate::failure_detector::PhiAccrualFailureDetectorOptions;
74use crate::gc::{GcSchedulerOptions, GcTickerRef};
75use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
76use crate::procedure::ProcedureManagerListenerAdapter;
77use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
78use crate::procedure::wal_prune::manager::WalPruneTickerRef;
79use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
80use crate::region::flush_trigger::RegionFlushTickerRef;
81use crate::region::supervisor::RegionSupervisorTickerRef;
82use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
83use crate::service::mailbox::MailboxRef;
84use crate::service::store::cached_kv::LeaderCachedKvBackend;
85use crate::state::{StateRef, become_follower, become_leader};
86use crate::utils::database::DatabaseOperatorRef;
87
/// Name of the id sequence for tables (value: `"table_id"`).
pub const TABLE_ID_SEQ: &str = "table_id";
/// Name of the id sequence for flows (value: `"flow_id"`).
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Directory name for metasrv data (value: `"metasrv"`).
// NOTE(review): presumably joined under `data_home` by callers — confirm.
pub const METASRV_DATA_DIR: &str = "metasrv";
91
/// Key-value backend implementation selected by `MetasrvOptions::backend`.
///
/// Serialized in `snake_case` (e.g. `etcd_store`). The SQL-backed variants only
/// exist when their cargo feature is enabled.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// etcd-backed store; the default variant.
    #[default]
    EtcdStore,
    /// In-memory store.
    MemoryStore,
    /// PostgreSQL-backed store (requires the `pg_kvbackend` feature).
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// MySQL-backed store (requires the `mysql_kvbackend` feature).
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
108
109#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
111pub struct StatsPersistenceOptions {
112 #[serde(with = "humantime_serde")]
114 pub ttl: Duration,
115 #[serde(with = "humantime_serde")]
117 pub interval: Duration,
118}
119
120impl Default for StatsPersistenceOptions {
121 fn default() -> Self {
122 Self {
123 ttl: Duration::ZERO,
124 interval: Duration::from_mins(10),
125 }
126 }
127}
128
129#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
131#[serde(default)]
132pub struct HeartbeatOptions {
133 #[serde(with = "humantime_serde")]
135 pub interval: Duration,
136 #[serde(with = "humantime_serde")]
138 pub retry_interval: Duration,
139}
140
141impl Default for HeartbeatOptions {
142 fn default() -> Self {
143 Self {
144 interval: BASE_HEARTBEAT_INTERVAL,
145 retry_interval: BASE_HEARTBEAT_INTERVAL,
146 }
147 }
148}
149
150impl HeartbeatOptions {
151 pub fn datanode_from(base_interval: Duration) -> Self {
152 Self {
153 interval: base_interval,
154 retry_interval: base_interval,
155 }
156 }
157
158 pub fn frontend_from(base_interval: Duration) -> Self {
159 Self {
160 interval: frontend_heartbeat_interval(base_interval),
161 retry_interval: base_interval,
162 }
163 }
164
165 pub fn flownode_from(base_interval: Duration) -> Self {
166 Self {
167 interval: base_interval,
168 retry_interval: base_interval,
169 }
170 }
171}
172
173impl From<HeartbeatOptions> for HeartbeatConfig {
174 fn from(opts: HeartbeatOptions) -> Self {
175 Self {
176 heartbeat_interval_ms: opts.interval.as_millis() as u64,
177 retry_interval_ms: opts.retry_interval.as_millis() as u64,
178 }
179 }
180}
181
182#[derive(Clone, PartialEq, Serialize, Deserialize, Debug)]
183#[serde(default)]
184pub struct BackendClientOptions {
185 #[serde(with = "humantime_serde")]
186 pub keep_alive_timeout: Duration,
187 #[serde(with = "humantime_serde")]
188 pub keep_alive_interval: Duration,
189 #[serde(with = "humantime_serde")]
190 pub connect_timeout: Duration,
191}
192
193impl Default for BackendClientOptions {
194 fn default() -> Self {
195 Self {
196 keep_alive_interval: Duration::from_secs(10),
197 keep_alive_timeout: Duration::from_secs(3),
198 connect_timeout: Duration::from_secs(3),
199 }
200 }
201}
202
/// Configuration of a metasrv instance.
///
/// The container is `#[serde(default)]`, so any field missing from the
/// deserialized configuration takes its value from `MetasrvOptions::default`.
/// `Debug` is implemented by hand so that `store_addrs` are sanitized before
/// being printed.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// Deprecated; superseded by `grpc.bind_addr`.
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    /// Deprecated; superseded by `grpc.server_addr`.
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// Addresses of the metadata store backend; defaults to `["127.0.0.1:2379"]`.
    // NOTE(review): printed through `sanitize_connection_string`, presumably because
    // they can embed credentials — confirm.
    pub store_addrs: Vec<String>,
    /// TLS settings for the backend connection; defaults to `Some(TlsOption::prefer())`.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// Client connection options for the backend.
    #[serde(default)]
    pub backend_client: BackendClientOptions,
    /// Selector type.
    pub selector: SelectorType,
    /// Whether region failover is enabled; defaults to `false`.
    pub enable_region_failover: bool,
    /// Base heartbeat interval.
    ///
    /// It is copied into every `Context`, from which `heartbeat_options_for`
    /// derives the per-role heartbeat options.
    #[serde(with = "humantime_serde")]
    pub heartbeat_interval: Duration,
    /// Initialization delay of the region failure detector; defaults to 10 minutes.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether region failover is allowed with a local WAL; defaults to `false`.
    pub allow_region_failover_on_local_wal: bool,
    /// gRPC server options; the default binds to `127.0.0.1:3002`.
    pub grpc: GrpcOptions,
    /// HTTP server options.
    pub http: HttpOptions,
    /// Logging options.
    pub logging: LoggingOptions,
    /// Procedure framework options.
    pub procedure: ProcedureConfig,
    /// Phi-accrual failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// Datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Whether telemetry is enabled; defaults to `true`.
    pub enable_telemetry: bool,
    /// Data home directory; defaults to `DEFAULT_DATA_HOME`.
    pub data_home: String,
    /// WAL configuration.
    pub wal: MetasrvWalConfig,
    /// Prefix of the keys in the store; defaults to an empty string.
    pub store_key_prefix: String,
    /// Maximum number of operations in one transaction; defaults to 128.
    pub max_txn_ops: usize,
    /// Stats flush factor; defaults to 3.
    // NOTE(review): how the factor is applied is not visible in this file — confirm.
    pub flush_stats_factor: usize,
    /// Tracing options.
    pub tracing: TracingOptions,
    /// Memory options.
    pub memory: MemoryOptions,
    /// Key-value backend implementation; defaults to `BackendImpl::EtcdStore`.
    pub backend: BackendImpl,
    /// Name of the metadata table used by the SQL backends.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// Lock id used for election on the PostgreSQL backend.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// Optional schema name on the PostgreSQL backend; defaults to `None`.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// Whether to create the schema automatically; defaults to `true`.
    #[cfg(feature = "pg_kvbackend")]
    pub auto_create_schema: bool,
    /// Maximum node idle time handed to the `NodeExpiryListener`; defaults to 24 hours.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    /// Event recorder options.
    pub event_recorder: EventRecorderOptions,
    /// Stats persistence options.
    pub stats_persistence: StatsPersistenceOptions,
    /// GC scheduler options.
    pub gc: GcSchedulerOptions,
}
304
305impl fmt::Debug for MetasrvOptions {
306 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
307 let mut debug_struct = f.debug_struct("MetasrvOptions");
308 debug_struct
309 .field("store_addrs", &self.sanitize_store_addrs())
310 .field("backend_tls", &self.backend_tls)
311 .field("selector", &self.selector)
312 .field("enable_region_failover", &self.enable_region_failover)
313 .field(
314 "allow_region_failover_on_local_wal",
315 &self.allow_region_failover_on_local_wal,
316 )
317 .field("grpc", &self.grpc)
318 .field("http", &self.http)
319 .field("logging", &self.logging)
320 .field("procedure", &self.procedure)
321 .field("failure_detector", &self.failure_detector)
322 .field("datanode", &self.datanode)
323 .field("enable_telemetry", &self.enable_telemetry)
324 .field("data_home", &self.data_home)
325 .field("wal", &self.wal)
326 .field("store_key_prefix", &self.store_key_prefix)
327 .field("max_txn_ops", &self.max_txn_ops)
328 .field("flush_stats_factor", &self.flush_stats_factor)
329 .field("tracing", &self.tracing)
330 .field("backend", &self.backend)
331 .field("event_recorder", &self.event_recorder)
332 .field("stats_persistence", &self.stats_persistence)
333 .field("heartbeat_interval", &self.heartbeat_interval)
334 .field("backend_client", &self.backend_client);
335
336 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
337 debug_struct.field("meta_table_name", &self.meta_table_name);
338
339 #[cfg(feature = "pg_kvbackend")]
340 debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
341 #[cfg(feature = "pg_kvbackend")]
342 debug_struct.field("meta_schema_name", &self.meta_schema_name);
343
344 debug_struct
345 .field("node_max_idle_time", &self.node_max_idle_time)
346 .finish()
347 }
348}
349
350const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
351
352impl Default for MetasrvOptions {
353 fn default() -> Self {
354 Self {
355 #[allow(deprecated)]
356 bind_addr: String::new(),
357 #[allow(deprecated)]
358 server_addr: String::new(),
359 store_addrs: vec!["127.0.0.1:2379".to_string()],
360 backend_tls: Some(TlsOption::prefer()),
361 selector: SelectorType::default(),
362 enable_region_failover: false,
363 heartbeat_interval: distributed_time_constants::BASE_HEARTBEAT_INTERVAL,
364 region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
365 allow_region_failover_on_local_wal: false,
366 grpc: GrpcOptions {
367 bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
368 ..Default::default()
369 },
370 http: HttpOptions::default(),
371 logging: LoggingOptions::default(),
372 procedure: ProcedureConfig {
373 max_retry_times: 12,
374 retry_delay: Duration::from_millis(500),
375 max_metadata_value_size: Some(ReadableSize::kb(1500)),
378 max_running_procedures: 128,
379 },
380 failure_detector: PhiAccrualFailureDetectorOptions::default(),
381 datanode: DatanodeClientOptions::default(),
382 enable_telemetry: true,
383 data_home: DEFAULT_DATA_HOME.to_string(),
384 wal: MetasrvWalConfig::default(),
385 store_key_prefix: String::new(),
386 max_txn_ops: 128,
387 flush_stats_factor: 3,
388 tracing: TracingOptions::default(),
389 memory: MemoryOptions::default(),
390 backend: BackendImpl::EtcdStore,
391 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
392 meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
393 #[cfg(feature = "pg_kvbackend")]
394 meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
395 #[cfg(feature = "pg_kvbackend")]
396 meta_schema_name: None,
397 #[cfg(feature = "pg_kvbackend")]
398 auto_create_schema: true,
399 node_max_idle_time: Duration::from_secs(24 * 60 * 60),
400 event_recorder: EventRecorderOptions::default(),
401 stats_persistence: StatsPersistenceOptions::default(),
402 gc: GcSchedulerOptions::default(),
403 backend_client: BackendClientOptions::default(),
404 }
405 }
406}
407
408impl Configurable for MetasrvOptions {
409 fn env_list_keys() -> Option<&'static [&'static str]> {
410 Some(&["wal.broker_endpoints", "store_addrs"])
411 }
412}
413
414impl MetasrvOptions {
415 fn sanitize_store_addrs(&self) -> Vec<String> {
416 self.store_addrs
417 .iter()
418 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
419 .collect()
420 }
421}
422
/// Minimal information about a metasrv instance.
pub struct MetasrvInfo {
    /// Server address of the metasrv.
    // NOTE(review): presumably the advertised gRPC address — confirm with callers.
    pub server_addr: String,
}
/// Context built by `Metasrv::new_ctx`, bundling clones of the metasrv's shared handles.
#[derive(Clone)]
pub struct Context {
    /// Copy of `grpc.server_addr` from the metasrv options.
    pub server_addr: String,
    /// In-memory kv backend; cleared by `reset_in_memory`.
    pub in_memory: ResettableKvBackendRef,
    /// The metasrv kv backend.
    pub kv_backend: KvBackendRef,
    /// The leader-side cached kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    /// Client for meta peer information.
    pub meta_peer_client: MetaPeerClientRef,
    /// Mailbox handle.
    pub mailbox: MailboxRef,
    /// Election handle, when an election service is configured.
    pub election: Option<ElectionRef>,
    /// Set to `false` by `Metasrv::new_ctx`.
    // NOTE(review): the meaning of "infancy" is defined by the code that sets/reads it — confirm.
    pub is_infancy: bool,
    /// Table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Cache invalidator.
    pub cache_invalidator: CacheInvalidatorRef,
    /// Leader region registry; cleared by `reset_in_memory`.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// Topic stats registry.
    pub topic_stats_registry: TopicStatsRegistryRef,
    /// Base heartbeat interval, copied from `MetasrvOptions::heartbeat_interval`.
    pub heartbeat_interval: Duration,
    /// `false` when built by `new_ctx`; set through `with_handshake`.
    pub is_handshake: bool,
}
443
444impl Context {
445 pub fn reset_in_memory(&self) {
446 self.in_memory.reset();
447 self.leader_region_registry.reset();
448 }
449
450 pub fn with_handshake(mut self, is_handshake: bool) -> Self {
451 self.is_handshake = is_handshake;
452 self
453 }
454
455 pub fn heartbeat_options_for(&self, role: Role) -> HeartbeatOptions {
456 match role {
457 Role::Datanode => HeartbeatOptions::datanode_from(self.heartbeat_interval),
458 Role::Frontend => HeartbeatOptions::frontend_from(self.heartbeat_interval),
459 Role::Flownode => HeartbeatOptions::flownode_from(self.heartbeat_interval),
460 }
461 }
462}
463
/// Node kind targeted by a selection.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    /// Writes the lowercase node kind: `datanode` or `flownode`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let name = match self {
            Self::Datanode => "datanode",
            Self::Flownode => "flownode",
        };
        f.write_str(name)
    }
}
478
/// Context passed to a [`Selector`] when it picks peers.
#[derive(Clone)]
pub struct SelectorContext {
    /// Peer discovery handle.
    pub peer_discovery: PeerDiscoveryRef,
}

/// Shared, type-erased selector that outputs a list of peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
485
/// Inputs handed to a [`SelectorFactory`] when it builds a selector.
pub struct SelectorFactoryContext {
    /// Options of the metasrv.
    pub metasrv_options: MetasrvOptions,
    /// Client for meta peer information.
    pub meta_peer_client: MetaPeerClientRef,
    /// In-memory kv backend.
    pub in_memory: ResettableKvBackendRef,
    /// Election handle, when an election service is configured.
    pub election: Option<ElectionRef>,
    /// Base selector.
    // NOTE(review): presumably the default selector a factory may wrap or replace — confirm.
    pub base_selector: SelectorRef,
}

/// Builds a [`SelectorRef`] from a [`SelectorFactoryContext`].
pub trait SelectorFactory: Send + Sync {
    /// Creates the selector from `ctx`.
    fn build(&self, ctx: SelectorFactoryContext) -> SelectorRef;
}

/// Shared, type-erased selector factory.
pub type SelectorFactoryRef = Arc<dyn SelectorFactory>;
/// Shared, type-erased region-stat-aware selector.
///
/// It outputs `(RegionId, Peer)` pairs.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
509
510pub struct MetaStateHandler {
511 subscribe_manager: Option<SubscriptionManagerRef>,
512 greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
513 leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
514 leadership_change_notifier: LeadershipChangeNotifier,
515 state: StateRef,
516}
517
518impl MetaStateHandler {
519 pub async fn on_leader_start(&self) {
520 self.state.write().unwrap().next_state(become_leader(false));
521
522 if let Err(e) = self.leader_cached_kv_backend.load().await {
523 error!(e; "Failed to load kv into leader cache kv store");
524 } else {
525 self.state.write().unwrap().next_state(become_leader(true));
526 }
527
528 self.leadership_change_notifier
529 .notify_on_leader_start()
530 .await;
531
532 self.greptimedb_telemetry_task.should_report(true);
533 }
534
535 pub async fn on_leader_stop(&self) {
536 self.state.write().unwrap().next_state(become_follower());
537
538 self.leadership_change_notifier
539 .notify_on_leader_stop()
540 .await;
541
542 self.greptimedb_telemetry_task.should_report(false);
544
545 if let Some(sub_manager) = self.subscribe_manager.clone() {
546 info!("Leader changed, un_subscribe all");
547 if let Err(e) = sub_manager.unsubscribe_all() {
548 error!(e; "Failed to un_subscribe all");
549 }
550 }
551 }
552}
553
/// A metasrv instance.
///
/// It holds the options, metadata stores, managers, optional background
/// tickers and plugins, and drives the start/stop lifecycle through
/// `try_start` and `shutdown`.
pub struct Metasrv {
    /// Node state, moved between leader and follower on leadership changes.
    state: StateRef,
    /// Start flag; the candidate-registration and campaign loops run while it is `true`.
    started: Arc<AtomicBool>,
    /// Start time in milliseconds, reported by `node_info`.
    start_time_ms: u64,
    options: MetasrvOptions,
    /// In-memory kv backend; reset on every leadership change message.
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    /// Leader cache over the kv backend.
    ///
    /// It is reset on every leadership change message and loaded when this
    /// node becomes leader (or at start when there is no election).
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    /// Heartbeat handler group, filled in by `try_start`.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    /// Builder of the handler group; taken (and consumed) by `try_start`.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    /// Election service; `None` means only one metasrv instance must run.
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_provider: WalProviderRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    /// Optional ticker, registered as a leadership-change listener when present.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    /// Optional ticker, registered as a leadership-change listener when present.
    wal_prune_ticker: Option<WalPruneTickerRef>,
    /// Optional ticker, registered as a leadership-change listener when present.
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_allocator: ResourceIdAllocatorRef,
    reconciliation_manager: ReconciliationManagerRef,
    /// Source of the CPU/memory figures reported by `node_info`.
    resource_stat: ResourceStatRef,
    /// Optional ticker, registered as a leadership-change listener when present.
    gc_ticker: Option<GcTickerRef>,
    database_operator: DatabaseOperatorRef,

    /// Plugins, e.g. the publisher, subscription manager and notifier customizer.
    plugins: Plugins,
}
596
597impl Metasrv {
598 pub async fn try_start(&self) -> Result<()> {
599 if self
600 .started
601 .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
602 .is_err()
603 {
604 warn!("Metasrv already started");
605 return Ok(());
606 }
607
608 let handler_group_builder =
609 self.handler_group_builder
610 .lock()
611 .unwrap()
612 .take()
613 .context(error::UnexpectedSnafu {
614 violated: "expected heartbeat handler group builder",
615 })?;
616 *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
617
618 self.table_metadata_manager
620 .init()
621 .await
622 .context(InitMetadataSnafu)?;
623
624 if let Some(election) = self.election() {
625 let procedure_manager = self.procedure_manager.clone();
626 let in_memory = self.in_memory.clone();
627 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
628 let subscribe_manager = self.subscription_manager();
629 let mut rx = election.subscribe_leader_change();
630 let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
631 greptimedb_telemetry_task
632 .start()
633 .context(StartTelemetryTaskSnafu)?;
634
635 let mut leadership_change_notifier = LeadershipChangeNotifier::default();
637 leadership_change_notifier.add_listener(self.wal_provider.clone());
638 leadership_change_notifier
639 .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
640 leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
641 self.options.node_max_idle_time,
642 self.in_memory.clone(),
643 )));
644 if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
645 leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
646 }
647 if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
648 leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
649 }
650 if let Some(region_flush_trigger) = &self.region_flush_ticker {
651 leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
652 }
653 if let Some(gc_ticker) = &self.gc_ticker {
654 leadership_change_notifier.add_listener(gc_ticker.clone() as _);
655 }
656 if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
657 customizer.customize(&mut leadership_change_notifier);
658 }
659
660 let state_handler = MetaStateHandler {
661 greptimedb_telemetry_task,
662 subscribe_manager,
663 state: self.state.clone(),
664 leader_cached_kv_backend: leader_cached_kv_backend.clone(),
665 leadership_change_notifier,
666 };
667 let _handle = common_runtime::spawn_global(async move {
668 loop {
669 match rx.recv().await {
670 Ok(msg) => {
671 in_memory.reset();
672 leader_cached_kv_backend.reset();
673 info!("Leader's cache has bean cleared on leader change: {msg}");
674 match msg {
675 LeaderChangeMessage::Elected(_) => {
676 state_handler.on_leader_start().await;
677 }
678 LeaderChangeMessage::StepDown(leader) => {
679 error!("Leader :{:?} step down", leader);
680
681 state_handler.on_leader_stop().await;
682 }
683 }
684 }
685 Err(RecvError::Closed) => {
686 error!("Not expected, is leader election loop still running?");
687 break;
688 }
689 Err(RecvError::Lagged(_)) => {
690 break;
691 }
692 }
693 }
694
695 state_handler.on_leader_stop().await;
696 });
697
698 {
700 let election = election.clone();
701 let started = self.started.clone();
702 let node_info = self.node_info();
703 let _handle = common_runtime::spawn_global(async move {
704 while started.load(Ordering::Acquire) {
705 let res = election.register_candidate(&node_info).await;
706 if let Err(e) = res {
707 warn!(e; "Metasrv register candidate error");
708 }
709 }
710 });
711 }
712
713 {
715 let election = election.clone();
716 let started = self.started.clone();
717 let _handle = common_runtime::spawn_global(async move {
718 while started.load(Ordering::Acquire) {
719 let res = election.campaign().await;
720 if let Err(e) = res {
721 warn!(e; "Metasrv election error");
722 }
723 election.reset_campaign().await;
724 info!("Metasrv re-initiate election");
725 }
726 info!("Metasrv stopped");
727 });
728 }
729 } else {
730 warn!(
731 "Ensure only one instance of Metasrv is running, as there is no election service."
732 );
733
734 if let Err(e) = self.wal_provider.start().await {
735 error!(e; "Failed to start wal provider");
736 }
737 self.leader_cached_kv_backend
739 .load()
740 .await
741 .context(KvBackendSnafu)?;
742 self.procedure_manager
743 .start()
744 .await
745 .context(StartProcedureManagerSnafu)?;
746 }
747
748 info!("Metasrv started");
749
750 Ok(())
751 }
752
753 pub async fn shutdown(&self) -> Result<()> {
754 if self
755 .started
756 .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
757 .is_err()
758 {
759 warn!("Metasrv already stopped");
760 return Ok(());
761 }
762
763 self.procedure_manager
764 .stop()
765 .await
766 .context(StopProcedureManagerSnafu)?;
767
768 info!("Metasrv stopped");
769
770 Ok(())
771 }
772
773 pub fn start_time_ms(&self) -> u64 {
774 self.start_time_ms
775 }
776
777 pub fn resource_stat(&self) -> &ResourceStatRef {
778 &self.resource_stat
779 }
780
781 pub fn node_info(&self) -> MetasrvNodeInfo {
782 let build_info = common_version::build_info();
783 MetasrvNodeInfo {
784 addr: self.options().grpc.server_addr.clone(),
785 version: build_info.version.to_string(),
786 git_commit: build_info.commit_short.to_string(),
787 start_time_ms: self.start_time_ms(),
788 total_cpu_millicores: self.resource_stat.get_total_cpu_millicores(),
789 total_memory_bytes: self.resource_stat.get_total_memory_bytes(),
790 cpu_usage_millicores: self.resource_stat.get_cpu_usage_millicores(),
791 memory_usage_bytes: self.resource_stat.get_memory_usage_bytes(),
792 hostname: hostname::get()
793 .unwrap_or_default()
794 .to_string_lossy()
795 .to_string(),
796 }
797 }
798
799 pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
802 discovery::utils::alive_datanode(
803 &DefaultSystemTimer,
804 self.meta_peer_client.as_ref(),
805 peer_id,
806 default_distributed_time_constants().datanode_lease,
807 )
808 .await
809 }
810
811 pub fn options(&self) -> &MetasrvOptions {
812 &self.options
813 }
814
815 pub fn in_memory(&self) -> &ResettableKvBackendRef {
816 &self.in_memory
817 }
818
819 pub fn kv_backend(&self) -> &KvBackendRef {
820 &self.kv_backend
821 }
822
823 pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
824 &self.meta_peer_client
825 }
826
827 pub fn selector(&self) -> &SelectorRef {
828 &self.selector
829 }
830
831 pub fn selector_ctx(&self) -> &SelectorContext {
832 &self.selector_ctx
833 }
834
835 pub fn flow_selector(&self) -> &SelectorRef {
836 &self.flow_selector
837 }
838
839 pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
840 self.handler_group.read().unwrap().clone()
841 }
842
843 pub fn election(&self) -> Option<&ElectionRef> {
844 self.election.as_ref()
845 }
846
847 pub fn mailbox(&self) -> &MailboxRef {
848 &self.mailbox
849 }
850
851 pub fn ddl_manager(&self) -> &DdlManagerRef {
852 &self.ddl_manager
853 }
854
855 pub fn procedure_manager(&self) -> &ProcedureManagerRef {
856 &self.procedure_manager
857 }
858
859 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
860 &self.table_metadata_manager
861 }
862
863 pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
864 &self.runtime_switch_manager
865 }
866
867 pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
868 &self.memory_region_keeper
869 }
870
871 pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
872 &self.region_migration_manager
873 }
874
875 pub fn publish(&self) -> Option<PublisherRef> {
876 self.plugins.get::<PublisherRef>()
877 }
878
879 pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
880 self.plugins.get::<SubscriptionManagerRef>()
881 }
882
883 pub fn table_id_allocator(&self) -> &ResourceIdAllocatorRef {
884 &self.table_id_allocator
885 }
886
887 pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
888 &self.reconciliation_manager
889 }
890
891 pub fn database_operator(&self) -> &DatabaseOperatorRef {
892 &self.database_operator
893 }
894
895 pub fn plugins(&self) -> &Plugins {
896 &self.plugins
897 }
898
899 pub fn started(&self) -> Arc<AtomicBool> {
900 self.started.clone()
901 }
902
903 pub fn gc_ticker(&self) -> Option<GcTickerRef> {
904 self.gc_ticker.as_ref().cloned()
905 }
906
907 #[inline]
908 pub fn new_ctx(&self) -> Context {
909 let server_addr = self.options().grpc.server_addr.clone();
910 let in_memory = self.in_memory.clone();
911 let kv_backend = self.kv_backend.clone();
912 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
913 let meta_peer_client = self.meta_peer_client.clone();
914 let mailbox = self.mailbox.clone();
915 let election = self.election.clone();
916 let table_metadata_manager = self.table_metadata_manager.clone();
917 let cache_invalidator = self.cache_invalidator.clone();
918 let leader_region_registry = self.leader_region_registry.clone();
919 let topic_stats_registry = self.topic_stats_registry.clone();
920
921 Context {
922 server_addr,
923 in_memory,
924 kv_backend,
925 leader_cached_kv_backend,
926 meta_peer_client,
927 mailbox,
928 election,
929 is_infancy: false,
930 table_metadata_manager,
931 cache_invalidator,
932 leader_region_registry,
933 topic_stats_registry,
934 heartbeat_interval: self.options().heartbeat_interval,
935 is_handshake: false,
936 }
937 }
938}
939
#[cfg(test)]
mod tests {
    use crate::metasrv::MetasrvNodeInfo;

    /// JSON carrying only the core node-info fields must still deserialize.
    #[test]
    fn test_deserialize_metasrv_node_info() {
        let json = r#"{"addr":"127.0.0.1:4002","version":"0.1.0","git_commit":"1234567890","start_time_ms":1715145600}"#;
        let node_info: MetasrvNodeInfo = serde_json::from_str(json).unwrap();

        let actual = (
            node_info.addr.as_str(),
            node_info.version.as_str(),
            node_info.git_commit.as_str(),
            node_info.start_time_ms,
        );
        assert_eq!(actual, ("127.0.0.1:4002", "0.1.0", "1234567890", 1715145600));
    }
}