pub mod builder;

use std::fmt::{self, Display};
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex, RwLock};
use std::time::Duration;

use clap::ValueEnum;
use common_base::Plugins;
use common_base::readable_size::ReadableSize;
use common_config::utils::ResourceSpec;
use common_config::{Configurable, DEFAULT_DATA_HOME};
use common_event_recorder::EventRecorderOptions;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::ddl_manager::DdlManagerRef;
use common_meta::distributed_time_constants;
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::runtime_switch::RuntimeSwitchManagerRef;
use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
use common_meta::leadership_notifier::{
    LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
};
use common_meta::node_expiry_listener::NodeExpiryListener;
use common_meta::peer::{Peer, PeerDiscoveryRef};
use common_meta::reconciliation::manager::ReconciliationManagerRef;
use common_meta::region_keeper::MemoryRegionKeeperRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_meta::sequence::SequenceRef;
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
use common_options::datanode::DatanodeClientOptions;
use common_options::memory::MemoryOptions;
use common_procedure::ProcedureManagerRef;
use common_procedure::options::ProcedureConfig;
use common_telemetry::logging::{LoggingOptions, TracingOptions};
use common_telemetry::{error, info, warn};
use common_wal::config::MetasrvWalConfig;
use serde::{Deserialize, Serialize};
use servers::export_metrics::ExportMetricsOption;
use servers::grpc::GrpcOptions;
use servers::http::HttpOptions;
use servers::tls::TlsOption;
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::broadcast::error::RecvError;

use crate::cluster::MetaPeerClientRef;
use crate::discovery;
use crate::election::{Election, LeaderChangeMessage};
use crate::error::{
    self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
    StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
};
use crate::failure_detector::PhiAccrualFailureDetectorOptions;
use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
use crate::procedure::ProcedureManagerListenerAdapter;
use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
use crate::procedure::wal_prune::manager::WalPruneTickerRef;
use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
use crate::region::flush_trigger::RegionFlushTickerRef;
use crate::region::supervisor::RegionSupervisorTickerRef;
use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
use crate::service::mailbox::MailboxRef;
use crate::service::store::cached_kv::LeaderCachedKvBackend;
use crate::state::{StateRef, become_follower, become_leader};

/// Name of the sequence used to allocate table ids.
pub const TABLE_ID_SEQ: &str = "table_id";
/// Name of the sequence used to allocate flow ids.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Name of the metasrv data directory.
pub const METASRV_DATA_DIR: &str = "metasrv";

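/// The metadata storage backend.
///
/// `EtcdStore` is the default. `MemoryStore` keeps all metadata in process
/// memory and is mostly useful for testing, since nothing is persisted. The
/// PostgreSQL and MySQL backends are only available when the corresponding
/// `pg_kvbackend`/`mysql_kvbackend` features are enabled.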
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    #[default]
    EtcdStore,
    MemoryStore,
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}

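/// Options controlling how collected statistics are persisted.
///
/// `ttl` bounds how long persisted stats are retained and `interval` how
/// often they are written; see the `Default` impl below for the defaults.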
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct StatsPersistenceOptions {
    /// How long persisted stats are retained.
    #[serde(with = "humantime_serde")]
    pub ttl: Duration,
    /// How often stats are persisted.
    #[serde(with = "humantime_serde")]
    pub interval: Duration,
}

impl Default for StatsPersistenceOptions {
    fn default() -> Self {
        Self {
            ttl: Duration::ZERO,
            interval: Duration::from_secs(10 * 60), // 10 minutes
        }
    }
}

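/// Configuration options of the metasrv.
///
/// Every field has a default (`#[serde(default)]`), so a config file only
/// needs to override what differs. A minimal sketch, assuming the standard
/// TOML deserialization path via [`Configurable`] (field names and humantime
/// durations mirror the definitions below):
///
/// ```toml
/// store_addrs = ["127.0.0.1:2379"]
/// backend = "etcd_store"
///
/// [stats_persistence]
/// ttl = "0s"
/// interval = "10m"
/// ```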
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    #[deprecated(note = "Use grpc.bind_addr instead")]
    pub bind_addr: String,
    #[deprecated(note = "Use grpc.server_addr instead")]
    pub server_addr: String,
    /// The addresses of the metadata store.
    pub store_addrs: Vec<String>,
    /// TLS configuration for connecting to the metadata store backend.
    #[serde(default)]
    pub backend_tls: Option<TlsOption>,
    /// The datanode selector type.
    pub selector: SelectorType,
    /// Whether to keep metadata only in memory (for testing; nothing is persisted).
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// The delay before initializing region failure detectors.
    #[serde(with = "humantime_serde")]
    pub region_failure_detector_initialization_delay: Duration,
    /// Whether to allow region failover when regions use the local WAL.
    pub allow_region_failover_on_local_wal: bool,
    pub grpc: GrpcOptions,
    pub http: HttpOptions,
    pub logging: LoggingOptions,
    pub procedure: ProcedureConfig,
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    pub datanode: DatanodeClientOptions,
    /// Whether to enable greptimedb telemetry.
    pub enable_telemetry: bool,
    /// The data home directory.
    pub data_home: String,
    pub wal: MetasrvWalConfig,
    pub export_metrics: ExportMetricsOption,
    /// A prefix prepended to every key written to the metadata store.
    pub store_key_prefix: String,
    /// The maximum number of operations permitted in a single transaction
    /// against the metadata store.
    pub max_txn_ops: usize,
    /// Controls how often datanode stats are flushed to the store: stats are
    /// flushed every `flush_stats_factor` received heartbeats.
    pub flush_stats_factor: usize,
    pub tracing: TracingOptions,
    pub memory: MemoryOptions,
    /// The metadata store backend (see [`BackendImpl`]).
    pub backend: BackendImpl,
    /// The table used to store metadata in the RDS backends.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// The PostgreSQL advisory lock id used for leader election.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// The PostgreSQL schema that holds the metadata table, if any.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_schema_name: Option<String>,
    /// The maximum idle time before a node's metadata is removed.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
    pub event_recorder: EventRecorderOptions,
    pub stats_persistence: StatsPersistenceOptions,
}

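// Manual `Debug` implementation: `store_addrs` is sanitized via
// `sanitize_store_addrs` so that credentials embedded in connection strings
// do not leak into logs, and the deprecated `bind_addr`/`server_addr` fields
// are omitted.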
impl fmt::Debug for MetasrvOptions {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut debug_struct = f.debug_struct("MetasrvOptions");
        debug_struct
            .field("store_addrs", &self.sanitize_store_addrs())
            .field("backend_tls", &self.backend_tls)
            .field("selector", &self.selector)
            .field("use_memory_store", &self.use_memory_store)
            .field("enable_region_failover", &self.enable_region_failover)
            .field(
                "allow_region_failover_on_local_wal",
                &self.allow_region_failover_on_local_wal,
            )
            .field("grpc", &self.grpc)
            .field("http", &self.http)
            .field("logging", &self.logging)
            .field("procedure", &self.procedure)
            .field("failure_detector", &self.failure_detector)
            .field("datanode", &self.datanode)
            .field("enable_telemetry", &self.enable_telemetry)
            .field("data_home", &self.data_home)
            .field("wal", &self.wal)
            .field("export_metrics", &self.export_metrics)
            .field("store_key_prefix", &self.store_key_prefix)
            .field("max_txn_ops", &self.max_txn_ops)
            .field("flush_stats_factor", &self.flush_stats_factor)
            .field("tracing", &self.tracing)
            .field("backend", &self.backend)
            .field("event_recorder", &self.event_recorder)
            .field("stats_persistence", &self.stats_persistence);

        #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
        debug_struct.field("meta_table_name", &self.meta_table_name);

        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
        #[cfg(feature = "pg_kvbackend")]
        debug_struct.field("meta_schema_name", &self.meta_schema_name);

        debug_struct
            .field("node_max_idle_time", &self.node_max_idle_time)
            .finish()
    }
}


/// The default port of the metasrv gRPC server.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";

impl Default for MetasrvOptions {
    fn default() -> Self {
        Self {
            #[allow(deprecated)]
            bind_addr: String::new(),
            #[allow(deprecated)]
            server_addr: String::new(),
            store_addrs: vec!["127.0.0.1:2379".to_string()],
            backend_tls: None,
            selector: SelectorType::default(),
            use_memory_store: false,
            enable_region_failover: false,
            region_failure_detector_initialization_delay: Duration::from_secs(10 * 60),
            allow_region_failover_on_local_wal: false,
            grpc: GrpcOptions {
                bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
                ..Default::default()
            },
            http: HttpOptions::default(),
            logging: LoggingOptions::default(),
            procedure: ProcedureConfig {
                max_retry_times: 12,
                retry_delay: Duration::from_millis(500),
                // Stay below the etcd default request size limit of 1.5 MiB.
                max_metadata_value_size: Some(ReadableSize::kb(1500)),
                max_running_procedures: 128,
            },
            failure_detector: PhiAccrualFailureDetectorOptions::default(),
            datanode: DatanodeClientOptions::default(),
            enable_telemetry: true,
            data_home: DEFAULT_DATA_HOME.to_string(),
            wal: MetasrvWalConfig::default(),
            export_metrics: ExportMetricsOption::default(),
            store_key_prefix: String::new(),
            max_txn_ops: 128,
            flush_stats_factor: 3,
            tracing: TracingOptions::default(),
            memory: MemoryOptions::default(),
            backend: BackendImpl::EtcdStore,
            #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
            meta_table_name: common_meta::kv_backend::DEFAULT_META_TABLE_NAME.to_string(),
            #[cfg(feature = "pg_kvbackend")]
            meta_election_lock_id: common_meta::kv_backend::DEFAULT_META_ELECTION_LOCK_ID,
            #[cfg(feature = "pg_kvbackend")]
            meta_schema_name: None,
            node_max_idle_time: Duration::from_secs(24 * 60 * 60),
            event_recorder: EventRecorderOptions::default(),
            stats_persistence: StatsPersistenceOptions::default(),
        }
    }
}

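// Keys registered here may hold multiple values when supplied through
// environment variables; a comma-separated value (a sketch, assuming the
// usual `Configurable` env mapping) such as "127.0.0.1:2379,127.0.0.1:2380"
// for `store_addrs` is split into a list of endpoints.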
impl Configurable for MetasrvOptions {
    fn env_list_keys() -> Option<&'static [&'static str]> {
        Some(&["wal.broker_endpoints", "store_addrs"])
    }
}

impl MetasrvOptions {
    /// Returns `store_addrs` with each connection string sanitized, so the
    /// addresses can be logged without leaking credentials.
    fn sanitize_store_addrs(&self) -> Vec<String> {
        self.store_addrs
            .iter()
            .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
            .collect()
    }
}

pub struct MetasrvInfo {
    pub server_addr: String,
}
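
/// Shared context passed to metasrv components such as the heartbeat
/// handlers, bundling the kv backends, mailbox, election handle, and
/// metadata/stats registries.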
#[derive(Clone)]
pub struct Context {
    pub server_addr: String,
    pub in_memory: ResettableKvBackendRef,
    pub kv_backend: KvBackendRef,
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    pub meta_peer_client: MetaPeerClientRef,
    pub mailbox: MailboxRef,
    pub election: Option<ElectionRef>,
    pub is_infancy: bool,
    pub table_metadata_manager: TableMetadataManagerRef,
    pub cache_invalidator: CacheInvalidatorRef,
    pub leader_region_registry: LeaderRegionRegistryRef,
    pub topic_stats_registry: TopicStatsRegistryRef,
}

impl Context {
    /// Resets the in-memory kv backend and the leader region registry.
    pub fn reset_in_memory(&self) {
        self.in_memory.reset();
        self.leader_region_registry.reset();
    }
}

/// The value stored under the election key for the current leader, typically
/// the leader's advertised server address.
pub struct LeaderValue(pub String);

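/// Any byte sequence (e.g. the raw value read back from the election backend)
/// converts into a [`LeaderValue`], decoding lossily as UTF-8. A minimal
/// usage sketch:
///
/// ```ignore
/// let leader: LeaderValue = b"127.0.0.1:3002".into();
/// assert_eq!(leader.0, "127.0.0.1:3002");
/// ```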
impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    fn from(value: T) -> Self {
        let string = String::from_utf8_lossy(value.as_ref());
        Self(string.to_string())
    }
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The node address.
    pub addr: String,
    /// The node build version.
    pub version: String,
    /// The node build git commit hash.
    pub git_commit: String,
    /// The node start timestamp, in milliseconds.
    pub start_time_ms: u64,
    /// The number of CPU cores on the node.
    #[serde(default)]
    pub cpus: u32,
    /// The total memory of the node, in bytes.
    #[serde(default)]
    pub memory_bytes: u64,
}

impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
    fn from(node_info: MetasrvNodeInfo) -> Self {
        Self {
            peer: Some(api::v1::meta::Peer {
                addr: node_info.addr,
                ..Default::default()
            }),
            version: node_info.version,
            git_commit: node_info.git_commit,
            start_time_ms: node_info.start_time_ms,
            cpus: node_info.cpus,
            memory_bytes: node_info.memory_bytes,
        }
    }
}

#[derive(Clone, Copy)]
pub enum SelectTarget {
    Datanode,
    Flownode,
}

impl Display for SelectTarget {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            SelectTarget::Datanode => write!(f, "datanode"),
            SelectTarget::Flownode => write!(f, "flownode"),
        }
    }
}

#[derive(Clone)]
pub struct SelectorContext {
    pub peer_discovery: PeerDiscoveryRef,
}

pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;

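/// Bridges leader-change messages to metasrv state transitions: it loads the
/// leader cache, notifies the leadership-change listeners, toggles telemetry
/// reporting, and drops all subscribers when leadership is lost.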
pub struct MetaStateHandler {
    subscribe_manager: Option<SubscriptionManagerRef>,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    leadership_change_notifier: LeadershipChangeNotifier,
    state: StateRef,
}

impl MetaStateHandler {
    pub async fn on_leader_start(&self) {
        // Become leader with the leader cache disabled first; enable it only
        // after the cache has been fully loaded.
        self.state.write().unwrap().next_state(become_leader(false));

        if let Err(e) = self.leader_cached_kv_backend.load().await {
            error!(e; "Failed to load kv into leader cache kv store");
        } else {
            self.state.write().unwrap().next_state(become_leader(true));
        }

        self.leadership_change_notifier
            .notify_on_leader_start()
            .await;

        self.greptimedb_telemetry_task.should_report(true);
    }

    pub async fn on_leader_stop(&self) {
        self.state.write().unwrap().next_state(become_follower());

        self.leadership_change_notifier
            .notify_on_leader_stop()
            .await;

        self.greptimedb_telemetry_task.should_report(false);

        if let Some(sub_manager) = self.subscribe_manager.clone() {
            info!("Leader changed, unsubscribing all subscribers");
            if let Err(e) = sub_manager.unsubscribe_all() {
                error!(e; "Failed to unsubscribe all subscribers");
            }
        }
    }
}

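/// The metasrv instance, wiring together the metadata store, leader election,
/// heartbeat handling, procedures, and the various background tickers. It is
/// assembled by the builder in the [`builder`] module.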
pub struct Metasrv {
    state: StateRef,
    started: Arc<AtomicBool>,
    start_time_ms: u64,
    options: MetasrvOptions,
    in_memory: ResettableKvBackendRef,
    kv_backend: KvBackendRef,
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    meta_peer_client: MetaPeerClientRef,
    selector: SelectorRef,
    selector_ctx: SelectorContext,
    flow_selector: SelectorRef,
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    election: Option<ElectionRef>,
    procedure_manager: ProcedureManagerRef,
    mailbox: MailboxRef,
    ddl_manager: DdlManagerRef,
    wal_options_allocator: WalOptionsAllocatorRef,
    table_metadata_manager: TableMetadataManagerRef,
    runtime_switch_manager: RuntimeSwitchManagerRef,
    memory_region_keeper: MemoryRegionKeeperRef,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    region_migration_manager: RegionMigrationManagerRef,
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    cache_invalidator: CacheInvalidatorRef,
    leader_region_registry: LeaderRegionRegistryRef,
    topic_stats_registry: TopicStatsRegistryRef,
    wal_prune_ticker: Option<WalPruneTickerRef>,
    region_flush_ticker: Option<RegionFlushTickerRef>,
    table_id_sequence: SequenceRef,
    reconciliation_manager: ReconciliationManagerRef,
    resource_spec: ResourceSpec,

    plugins: Plugins,
}

impl Metasrv {
    pub async fn try_start(&self) -> Result<()> {
        // Only the first caller actually starts the metasrv.
        if self
            .started
            .compare_exchange(false, true, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already started");
            return Ok(());
        }

        let handler_group_builder =
            self.handler_group_builder
                .lock()
                .unwrap()
                .take()
                .context(error::UnexpectedSnafu {
                    violated: "expected heartbeat handler group builder",
                })?;
        *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));

        self.table_metadata_manager
            .init()
            .await
            .context(InitMetadataSnafu)?;

        if let Some(election) = self.election() {
            let procedure_manager = self.procedure_manager.clone();
            let in_memory = self.in_memory.clone();
            let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
            let subscribe_manager = self.subscription_manager();
            let mut rx = election.subscribe_leader_change();
            let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
            greptimedb_telemetry_task
                .start()
                .context(StartTelemetryTaskSnafu)?;

            // Register every component that needs to react to leadership changes.
            let mut leadership_change_notifier = LeadershipChangeNotifier::default();
            leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
            leadership_change_notifier
                .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
            leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
                self.options.node_max_idle_time,
                self.in_memory.clone(),
            )));
            if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
                leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
            }
            if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
                leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
            }
            if let Some(region_flush_trigger) = &self.region_flush_ticker {
                leadership_change_notifier.add_listener(region_flush_trigger.clone() as _);
            }
            if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
                customizer.customize(&mut leadership_change_notifier);
            }

            let state_handler = MetaStateHandler {
                greptimedb_telemetry_task,
                subscribe_manager,
                state: self.state.clone(),
                leader_cached_kv_backend: leader_cached_kv_backend.clone(),
                leadership_change_notifier,
            };
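            // React to leader-change broadcasts: on every change, clear the
            // in-memory and leader-cached backends, then run the matching
            // state transition.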
            let _handle = common_runtime::spawn_global(async move {
                loop {
                    match rx.recv().await {
                        Ok(msg) => {
                            in_memory.reset();
                            leader_cached_kv_backend.reset();
                            info!("Leader's cache has been cleared on leader change: {msg}");
                            match msg {
                                LeaderChangeMessage::Elected(_) => {
                                    state_handler.on_leader_start().await;
                                }
                                LeaderChangeMessage::StepDown(leader) => {
                                    error!("Leader {:?} stepped down", leader);

                                    state_handler.on_leader_stop().await;
                                }
                            }
                        }
                        Err(RecvError::Closed) => {
                            error!("Leader change channel closed unexpectedly, is the leader election loop still running?");
                            break;
                        }
                        Err(RecvError::Lagged(_)) => {
                            // Fell too far behind the broadcast channel; exit
                            // the loop and step down via `on_leader_stop` below.
                            break;
                        }
                    }
                }

                state_handler.on_leader_stop().await;
            });

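            // Keep this node registered as an election candidate for as long
            // as the metasrv is running.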
            {
                let election = election.clone();
                let started = self.started.clone();
                let node_info = self.node_info();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.register_candidate(&node_info).await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv register candidate error");
                        }
                    }
                });
            }

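            // Campaign for leadership in a loop, resetting the campaign state
            // between rounds, until the metasrv is stopped.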
            {
                let election = election.clone();
                let started = self.started.clone();
                let _handle = common_runtime::spawn_global(async move {
                    while started.load(Ordering::Acquire) {
                        let res = election.campaign().await;
                        if let Err(e) = res {
                            warn!(e; "Metasrv election error");
                        }
                        election.reset_campaign().await;
                        info!("Metasrv re-initiating election");
                    }
                    info!("Metasrv stopped");
                });
            }
        } else {
            warn!(
                "Ensure only one instance of Metasrv is running, as there is no election service."
            );

            if let Err(e) = self.wal_options_allocator.start().await {
                error!(e; "Failed to start wal options allocator");
            }
            self.leader_cached_kv_backend
                .load()
                .await
                .context(KvBackendSnafu)?;
            self.procedure_manager
                .start()
                .await
                .context(StartProcedureManagerSnafu)?;
        }

        info!("Metasrv started");

        Ok(())
    }

    pub async fn shutdown(&self) -> Result<()> {
        if self
            .started
            .compare_exchange(true, false, Ordering::AcqRel, Ordering::Acquire)
            .is_err()
        {
            warn!("Metasrv already stopped");
            return Ok(());
        }

        self.procedure_manager
            .stop()
            .await
            .context(StopProcedureManagerSnafu)?;

        info!("Metasrv stopped");

        Ok(())
    }

    pub fn start_time_ms(&self) -> u64 {
        self.start_time_ms
    }

    pub fn resource_spec(&self) -> &ResourceSpec {
        &self.resource_spec
    }

    pub fn node_info(&self) -> MetasrvNodeInfo {
        let build_info = common_version::build_info();
        MetasrvNodeInfo {
            addr: self.options().grpc.server_addr.clone(),
            version: build_info.version.to_string(),
            git_commit: build_info.commit_short.to_string(),
            start_time_ms: self.start_time_ms(),
            cpus: self.resource_spec().cpus as u32,
            memory_bytes: self.resource_spec().memory.unwrap_or_default().as_bytes(),
        }
    }

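    /// Looks up the datanode peer with the given id, returning it only while
    /// the datanode's lease is still considered alive.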
    pub(crate) async fn lookup_datanode_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
        discovery::utils::alive_datanode(
            self.meta_peer_client.as_ref(),
            peer_id,
            Duration::from_secs(distributed_time_constants::DATANODE_LEASE_SECS),
        )
        .await
    }

    pub fn options(&self) -> &MetasrvOptions {
        &self.options
    }

    pub fn in_memory(&self) -> &ResettableKvBackendRef {
        &self.in_memory
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
        &self.meta_peer_client
    }

    pub fn selector(&self) -> &SelectorRef {
        &self.selector
    }

    pub fn selector_ctx(&self) -> &SelectorContext {
        &self.selector_ctx
    }

    pub fn flow_selector(&self) -> &SelectorRef {
        &self.flow_selector
    }

    pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
        self.handler_group.read().unwrap().clone()
    }

    pub fn election(&self) -> Option<&ElectionRef> {
        self.election.as_ref()
    }

    pub fn mailbox(&self) -> &MailboxRef {
        &self.mailbox
    }

    pub fn ddl_manager(&self) -> &DdlManagerRef {
        &self.ddl_manager
    }

    pub fn procedure_manager(&self) -> &ProcedureManagerRef {
        &self.procedure_manager
    }

    pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
        &self.table_metadata_manager
    }

    pub fn runtime_switch_manager(&self) -> &RuntimeSwitchManagerRef {
        &self.runtime_switch_manager
    }

    pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
        &self.memory_region_keeper
    }

    pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
        &self.region_migration_manager
    }

    pub fn publish(&self) -> Option<PublisherRef> {
        self.plugins.get::<PublisherRef>()
    }

    pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
        self.plugins.get::<SubscriptionManagerRef>()
    }

    pub fn table_id_sequence(&self) -> &SequenceRef {
        &self.table_id_sequence
    }

    pub fn reconciliation_manager(&self) -> &ReconciliationManagerRef {
        &self.reconciliation_manager
    }

    pub fn plugins(&self) -> &Plugins {
        &self.plugins
    }

    pub fn started(&self) -> Arc<AtomicBool> {
        self.started.clone()
    }

    #[inline]
    pub fn new_ctx(&self) -> Context {
        let server_addr = self.options().grpc.server_addr.clone();
        let in_memory = self.in_memory.clone();
        let kv_backend = self.kv_backend.clone();
        let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
        let meta_peer_client = self.meta_peer_client.clone();
        let mailbox = self.mailbox.clone();
        let election = self.election.clone();
        let table_metadata_manager = self.table_metadata_manager.clone();
        let cache_invalidator = self.cache_invalidator.clone();
        let leader_region_registry = self.leader_region_registry.clone();
        let topic_stats_registry = self.topic_stats_registry.clone();

        Context {
            server_addr,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client,
            mailbox,
            election,
            is_infancy: false,
            table_metadata_manager,
            cache_invalidator,
            leader_region_registry,
            topic_stats_registry,
        }
    }
}
830}