1pub mod builder;
16
17use std::fmt::{self, Display};
18use std::sync::atomic::{AtomicBool, Ordering};
19use std::sync::{Arc, Mutex, RwLock};
20use std::time::Duration;
21
22use clap::ValueEnum;
23use common_base::readable_size::ReadableSize;
24use common_base::Plugins;
25use common_config::Configurable;
26use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
27use common_meta::cache_invalidator::CacheInvalidatorRef;
28use common_meta::ddl::ProcedureExecutorRef;
29use common_meta::distributed_time_constants;
30use common_meta::key::maintenance::MaintenanceModeManagerRef;
31use common_meta::key::TableMetadataManagerRef;
32use common_meta::kv_backend::{KvBackendRef, ResettableKvBackend, ResettableKvBackendRef};
33use common_meta::leadership_notifier::{
34 LeadershipChangeNotifier, LeadershipChangeNotifierCustomizerRef,
35};
36use common_meta::node_expiry_listener::NodeExpiryListener;
37use common_meta::peer::Peer;
38use common_meta::region_keeper::MemoryRegionKeeperRef;
39use common_meta::region_registry::LeaderRegionRegistryRef;
40use common_meta::wal_options_allocator::WalOptionsAllocatorRef;
41use common_options::datanode::DatanodeClientOptions;
42use common_procedure::options::ProcedureConfig;
43use common_procedure::ProcedureManagerRef;
44use common_telemetry::logging::{LoggingOptions, TracingOptions};
45use common_telemetry::{error, info, warn};
46use common_wal::config::MetasrvWalConfig;
47use serde::{Deserialize, Serialize};
48use servers::export_metrics::ExportMetricsOption;
49use servers::http::HttpOptions;
50use snafu::{OptionExt, ResultExt};
51use store_api::storage::RegionId;
52use table::metadata::TableId;
53use tokio::sync::broadcast::error::RecvError;
54
55use crate::cluster::MetaPeerClientRef;
56use crate::election::{Election, LeaderChangeMessage};
57use crate::error::{
58 self, InitMetadataSnafu, KvBackendSnafu, Result, StartProcedureManagerSnafu,
59 StartTelemetryTaskSnafu, StopProcedureManagerSnafu,
60};
61use crate::failure_detector::PhiAccrualFailureDetectorOptions;
62use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatHandlerGroupRef};
63use crate::lease::lookup_datanode_peer;
64use crate::procedure::region_migration::manager::RegionMigrationManagerRef;
65use crate::procedure::wal_prune::manager::WalPruneTickerRef;
66use crate::procedure::ProcedureManagerListenerAdapter;
67use crate::pubsub::{PublisherRef, SubscriptionManagerRef};
68use crate::region::supervisor::RegionSupervisorTickerRef;
69use crate::selector::{RegionStatAwareSelector, Selector, SelectorType};
70use crate::service::mailbox::MailboxRef;
71use crate::service::store::cached_kv::LeaderCachedKvBackend;
72use crate::state::{become_follower, become_leader, StateRef};
73
/// The string `"table_id"`.
// NOTE(review): presumably the name of the sequence that allocates table ids — confirm at use sites.
pub const TABLE_ID_SEQ: &str = "table_id";
/// The string `"flow_id"`.
// NOTE(review): presumably the name of the sequence that allocates flow ids — confirm at use sites.
pub const FLOW_ID_SEQ: &str = "flow_id";
/// Default home directory of metasrv; used as the default `data_home` and as the
/// parent of the default logging directory in [`MetasrvOptions::default`].
pub const METASRV_HOME: &str = "./greptimedb_data/metasrv";

/// Default value of `MetasrvOptions::meta_table_name`.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_TABLE_NAME: &str = "greptime_metakv";
/// Default value of `MetasrvOptions::meta_election_lock_id`.
#[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
pub const DEFAULT_META_ELECTION_LOCK_ID: u64 = 1;
82
/// The kv backend implementation metasrv is configured with.
///
/// Variants are (de)serialized in `snake_case` (for example `etcd_store`) and the
/// enum can also be parsed as a command line value through `ValueEnum`.
#[derive(Clone, Debug, PartialEq, Serialize, Default, Deserialize, ValueEnum)]
#[serde(rename_all = "snake_case")]
pub enum BackendImpl {
    /// Etcd as the backend; this is the default variant.
    #[default]
    EtcdStore,
    /// An in-memory backend.
    // NOTE(review): presumably intended for tests/standalone use only — confirm.
    MemoryStore,
    /// Postgres as the backend; only available with the `pg_kvbackend` feature.
    #[cfg(feature = "pg_kvbackend")]
    PostgresStore,
    /// MySQL as the backend; only available with the `mysql_kvbackend` feature.
    #[cfg(feature = "mysql_kvbackend")]
    MysqlStore,
}
99
/// Configuration options of a metasrv instance.
///
/// Missing fields are filled from [`Default`] when deserializing (`#[serde(default)]`).
/// `Debug` is implemented by hand so that `store_addrs` is sanitized before printing.
#[derive(Clone, PartialEq, Serialize, Deserialize)]
#[serde(default)]
pub struct MetasrvOptions {
    /// The address the server binds to. Its port part is reused by
    /// `detect_server_addr` when building the detected server address.
    pub bind_addr: String,
    /// The address of this server as reported in `MetasrvNodeInfo::addr`.
    /// When left empty, `detect_server_addr` tries to fill it in.
    pub server_addr: String,
    /// The addresses of the backing store (defaults to `127.0.0.1:2379`).
    /// Listed in `Configurable::env_list_keys`.
    pub store_addrs: Vec<String>,
    /// The type of selector.
    pub selector: SelectorType,
    /// Whether to use the memory store.
    pub use_memory_store: bool,
    /// Whether to enable region failover.
    pub enable_region_failover: bool,
    /// Whether to allow region failover when the WAL is local.
    // NOTE(review): enabling this presumably risks data loss — confirm against the failover docs.
    pub allow_region_failover_on_local_wal: bool,
    /// The HTTP server options.
    pub http: HttpOptions,
    /// The logging options; the default log directory is `{METASRV_HOME}/logs`.
    pub logging: LoggingOptions,
    /// The procedure options.
    pub procedure: ProcedureConfig,
    /// The failure detector options.
    pub failure_detector: PhiAccrualFailureDetectorOptions,
    /// The datanode client options.
    pub datanode: DatanodeClientOptions,
    /// Whether to enable telemetry (enabled by default).
    pub enable_telemetry: bool,
    /// The data home directory (defaults to `METASRV_HOME`).
    pub data_home: String,
    /// The WAL options.
    pub wal: MetasrvWalConfig,
    /// The metrics export options.
    pub export_metrics: ExportMetricsOption,
    /// The store key prefix (empty by default).
    // NOTE(review): presumably prepended to every key written to the store — confirm.
    pub store_key_prefix: String,
    /// The maximum number of operations in a single transaction (defaults to 128).
    // NOTE(review): the applicable limit presumably depends on the backend — confirm.
    pub max_txn_ops: usize,
    /// The factor controlling how often statistics are flushed (defaults to 3).
    // NOTE(review): presumably counted in received heartbeats — confirm.
    pub flush_stats_factor: usize,
    /// The tracing options.
    pub tracing: TracingOptions,
    /// The kv backend implementation to use.
    pub backend: BackendImpl,
    /// The metadata table name; only present with an RDS kv backend feature.
    #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
    pub meta_table_name: String,
    /// The election lock id; only present with the `pg_kvbackend` feature.
    #[cfg(feature = "pg_kvbackend")]
    pub meta_election_lock_id: u64,
    /// The idle time handed to `NodeExpiryListener` when metasrv starts
    /// (defaults to 24 hours). (De)serialized in human-readable form.
    #[serde(with = "humantime_serde")]
    pub node_max_idle_time: Duration,
}
169
170impl fmt::Debug for MetasrvOptions {
171 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
172 let mut debug_struct = f.debug_struct("MetasrvOptions");
173 debug_struct
174 .field("bind_addr", &self.bind_addr)
175 .field("server_addr", &self.server_addr)
176 .field("store_addrs", &self.sanitize_store_addrs())
177 .field("selector", &self.selector)
178 .field("use_memory_store", &self.use_memory_store)
179 .field("enable_region_failover", &self.enable_region_failover)
180 .field(
181 "allow_region_failover_on_local_wal",
182 &self.allow_region_failover_on_local_wal,
183 )
184 .field("http", &self.http)
185 .field("logging", &self.logging)
186 .field("procedure", &self.procedure)
187 .field("failure_detector", &self.failure_detector)
188 .field("datanode", &self.datanode)
189 .field("enable_telemetry", &self.enable_telemetry)
190 .field("data_home", &self.data_home)
191 .field("wal", &self.wal)
192 .field("export_metrics", &self.export_metrics)
193 .field("store_key_prefix", &self.store_key_prefix)
194 .field("max_txn_ops", &self.max_txn_ops)
195 .field("flush_stats_factor", &self.flush_stats_factor)
196 .field("tracing", &self.tracing)
197 .field("backend", &self.backend);
198
199 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
200 debug_struct.field("meta_table_name", &self.meta_table_name);
201
202 #[cfg(feature = "pg_kvbackend")]
203 debug_struct.field("meta_election_lock_id", &self.meta_election_lock_id);
204
205 debug_struct
206 .field("node_max_idle_time", &self.node_max_idle_time)
207 .finish()
208 }
209}
210
/// Default port of metasrv: used in the default `bind_addr` and as the fallback
/// port when the server address is auto-detected.
const DEFAULT_METASRV_ADDR_PORT: &str = "3002";
212
213impl Default for MetasrvOptions {
214 fn default() -> Self {
215 Self {
216 bind_addr: format!("127.0.0.1:{}", DEFAULT_METASRV_ADDR_PORT),
217 server_addr: String::new(),
219 store_addrs: vec!["127.0.0.1:2379".to_string()],
220 selector: SelectorType::default(),
221 use_memory_store: false,
222 enable_region_failover: false,
223 allow_region_failover_on_local_wal: false,
224 http: HttpOptions::default(),
225 logging: LoggingOptions {
226 dir: format!("{METASRV_HOME}/logs"),
227 ..Default::default()
228 },
229 procedure: ProcedureConfig {
230 max_retry_times: 12,
231 retry_delay: Duration::from_millis(500),
232 max_metadata_value_size: Some(ReadableSize::kb(1500)),
235 max_running_procedures: 128,
236 },
237 failure_detector: PhiAccrualFailureDetectorOptions::default(),
238 datanode: DatanodeClientOptions::default(),
239 enable_telemetry: true,
240 data_home: METASRV_HOME.to_string(),
241 wal: MetasrvWalConfig::default(),
242 export_metrics: ExportMetricsOption::default(),
243 store_key_prefix: String::new(),
244 max_txn_ops: 128,
245 flush_stats_factor: 3,
246 tracing: TracingOptions::default(),
247 backend: BackendImpl::EtcdStore,
248 #[cfg(any(feature = "pg_kvbackend", feature = "mysql_kvbackend"))]
249 meta_table_name: DEFAULT_META_TABLE_NAME.to_string(),
250 #[cfg(feature = "pg_kvbackend")]
251 meta_election_lock_id: DEFAULT_META_ELECTION_LOCK_ID,
252 node_max_idle_time: Duration::from_secs(24 * 60 * 60),
253 }
254 }
255}
256
257impl Configurable for MetasrvOptions {
258 fn env_list_keys() -> Option<&'static [&'static str]> {
259 Some(&["wal.broker_endpoints", "store_addrs"])
260 }
261}
262
263impl MetasrvOptions {
264 #[cfg(not(target_os = "android"))]
266 pub fn detect_server_addr(&mut self) {
267 if self.server_addr.is_empty() {
268 match local_ip_address::local_ip() {
269 Ok(ip) => {
270 let detected_addr = format!(
271 "{}:{}",
272 ip,
273 self.bind_addr
274 .split(':')
275 .nth(1)
276 .unwrap_or(DEFAULT_METASRV_ADDR_PORT)
277 );
278 info!("Using detected: {} as server address", detected_addr);
279 self.server_addr = detected_addr;
280 }
281 Err(e) => {
282 error!("Failed to detect local ip address: {}", e);
283 }
284 }
285 }
286 }
287
288 #[cfg(target_os = "android")]
289 pub fn detect_server_addr(&mut self) {
290 if self.server_addr.is_empty() {
291 common_telemetry::debug!("detect local IP is not supported on Android");
292 }
293 }
294
295 fn sanitize_store_addrs(&self) -> Vec<String> {
296 self.store_addrs
297 .iter()
298 .map(|addr| common_meta::kv_backend::util::sanitize_connection_string(addr))
299 .collect()
300 }
301}
302
/// Basic information about a metasrv.
pub struct MetasrvInfo {
    /// The server address of the metasrv.
    pub server_addr: String,
}
/// A cloneable bundle of shared metasrv handles, created by `Metasrv::new_ctx`.
// NOTE(review): presumably handed to heartbeat handlers — confirm at use sites.
#[derive(Clone)]
pub struct Context {
    /// The server address of this metasrv.
    pub server_addr: String,
    /// The resettable in-memory kv backend; cleared by `reset_in_memory`.
    pub in_memory: ResettableKvBackendRef,
    /// The kv backend.
    pub kv_backend: KvBackendRef,
    /// The leader cached kv backend.
    pub leader_cached_kv_backend: ResettableKvBackendRef,
    /// The client used to access meta peers.
    pub meta_peer_client: MetaPeerClientRef,
    /// The mailbox.
    pub mailbox: MailboxRef,
    /// The election service, if one is configured.
    pub election: Option<ElectionRef>,
    /// Initialized to `false` by `Metasrv::new_ctx`.
    // NOTE(review): presumably flags a node that has only just started — confirm.
    pub is_infancy: bool,
    /// The table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The cache invalidator.
    pub cache_invalidator: CacheInvalidatorRef,
    /// The leader region registry; cleared by `reset_in_memory`.
    pub leader_region_registry: LeaderRegionRegistryRef,
}
320
321impl Context {
322 pub fn reset_in_memory(&self) {
323 self.in_memory.reset();
324 self.leader_region_registry.reset();
325 }
326}
327
/// A leader value held as a string; it is the `Leader` type of `ElectionRef`.
// NOTE(review): presumably the leader's address — confirm against the election implementations.
pub struct LeaderValue(pub String);

impl<T: AsRef<[u8]>> From<T> for LeaderValue {
    /// Builds a `LeaderValue` from raw bytes, replacing invalid UTF-8
    /// sequences with U+FFFD instead of failing.
    fn from(value: T) -> Self {
        let bytes = value.as_ref();
        Self(String::from_utf8_lossy(bytes).into_owned())
    }
}
337
/// Information about a metasrv node; built by `Metasrv::node_info` and
/// convertible into `api::v1::meta::MetasrvNodeInfo`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetasrvNodeInfo {
    /// The address of the metasrv node.
    pub addr: String,
    /// The version of the node.
    pub version: String,
    /// The git commit the node was built from.
    pub git_commit: String,
    /// The start time of the node, in milliseconds.
    pub start_time_ms: u64,
}
349
350impl From<MetasrvNodeInfo> for api::v1::meta::MetasrvNodeInfo {
351 fn from(node_info: MetasrvNodeInfo) -> Self {
352 Self {
353 peer: Some(api::v1::meta::Peer {
354 addr: node_info.addr,
355 ..Default::default()
356 }),
357 version: node_info.version,
358 git_commit: node_info.git_commit,
359 start_time_ms: node_info.start_time_ms,
360 }
361 }
362}
363
/// The kind of node a selection is made for.
#[derive(Clone, Copy)]
pub enum SelectTarget {
    /// Select among datanodes.
    Datanode,
    /// Select among flownodes.
    Flownode,
}

impl Display for SelectTarget {
    /// Writes the lowercase name of the target: `datanode` or `flownode`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let name = match self {
            SelectTarget::Datanode => "datanode",
            SelectTarget::Flownode => "flownode",
        };
        f.write_str(name)
    }
}
378
/// The context type used by `SelectorRef` and `RegionStatAwareSelectorRef`.
#[derive(Clone)]
pub struct SelectorContext {
    /// The server address of the metasrv.
    pub server_addr: String,
    /// The datanode lease, in seconds.
    pub datanode_lease_secs: u64,
    /// The flownode lease, in seconds.
    pub flownode_lease_secs: u64,
    /// The kv backend.
    pub kv_backend: KvBackendRef,
    /// The client used to access meta peers.
    pub meta_peer_client: MetaPeerClientRef,
    /// The table the selection is made for, if any.
    pub table_id: Option<TableId>,
}
388
/// A shared selector that takes a [`SelectorContext`] and outputs a list of peers.
pub type SelectorRef = Arc<dyn Selector<Context = SelectorContext, Output = Vec<Peer>>>;
/// A shared region-stat-aware selector that takes a [`SelectorContext`] and
/// outputs `(RegionId, Peer)` pairs.
pub type RegionStatAwareSelectorRef =
    Arc<dyn RegionStatAwareSelector<Context = SelectorContext, Output = Vec<(RegionId, Peer)>>>;
/// A shared election service whose leader type is [`LeaderValue`].
pub type ElectionRef = Arc<dyn Election<Leader = LeaderValue>>;
393
/// Reacts to leadership changes of this metasrv: updates the state, loads the
/// leader cache, notifies leadership listeners and toggles telemetry reporting.
pub struct MetaStateHandler {
    /// The subscription manager whose subscriptions are all dropped when
    /// leadership is lost, if one is configured.
    subscribe_manager: Option<SubscriptionManagerRef>,
    /// The telemetry task; told to report only while this node is the leader.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// The leader cached kv backend; loaded when this node becomes the leader.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// The notifier that fans leadership changes out to its listeners.
    leadership_change_notifier: LeadershipChangeNotifier,
    /// The shared metasrv state.
    state: StateRef,
}
401
402impl MetaStateHandler {
403 pub async fn on_leader_start(&self) {
404 self.state.write().unwrap().next_state(become_leader(false));
405
406 if let Err(e) = self.leader_cached_kv_backend.load().await {
407 error!(e; "Failed to load kv into leader cache kv store");
408 } else {
409 self.state.write().unwrap().next_state(become_leader(true));
410 }
411
412 self.leadership_change_notifier
413 .notify_on_leader_start()
414 .await;
415
416 self.greptimedb_telemetry_task.should_report(true);
417 }
418
419 pub async fn on_leader_stop(&self) {
420 self.state.write().unwrap().next_state(become_follower());
421
422 self.leadership_change_notifier
423 .notify_on_leader_stop()
424 .await;
425
426 self.greptimedb_telemetry_task.should_report(false);
428
429 if let Some(sub_manager) = self.subscribe_manager.clone() {
430 info!("Leader changed, un_subscribe all");
431 if let Err(e) = sub_manager.unsubscribe_all() {
432 error!(e; "Failed to un_subscribe all");
433 }
434 }
435 }
436}
437
/// The metasrv instance: owns the configuration and all shared components, and
/// drives start-up (`try_start`) and shutdown.
pub struct Metasrv {
    /// The shared metasrv state, updated on leadership changes.
    state: StateRef,
    /// Whether `try_start` has been called; cleared by `shutdown`. The election
    /// background loops keep running only while it is `true`.
    started: Arc<AtomicBool>,
    /// The start time of this metasrv, in milliseconds.
    start_time_ms: u64,
    /// The options this metasrv was built with.
    options: MetasrvOptions,
    /// The resettable in-memory kv backend; reset on every leader change message.
    in_memory: ResettableKvBackendRef,
    /// The kv backend.
    kv_backend: KvBackendRef,
    /// The leader cached kv backend; reset on every leader change message and
    /// loaded when this node becomes the leader.
    leader_cached_kv_backend: Arc<LeaderCachedKvBackend>,
    /// The client used to access meta peers.
    meta_peer_client: MetaPeerClientRef,
    /// The selector returned by `selector()`.
    // NOTE(review): presumably used to select datanodes — confirm.
    selector: SelectorRef,
    /// The selector returned by `flow_selector()`.
    // NOTE(review): presumably used to select flownodes — confirm.
    flow_selector: SelectorRef,
    /// The heartbeat handler group; `None` until `try_start` builds it.
    handler_group: RwLock<Option<HeartbeatHandlerGroupRef>>,
    /// The builder of the heartbeat handler group; consumed by `try_start`.
    handler_group_builder: Mutex<Option<HeartbeatHandlerGroupBuilder>>,
    /// The election service; `None` when metasrv runs without election.
    election: Option<ElectionRef>,
    /// The procedure manager.
    procedure_manager: ProcedureManagerRef,
    /// The mailbox.
    mailbox: MailboxRef,
    /// The procedure executor.
    procedure_executor: ProcedureExecutorRef,
    /// The WAL options allocator.
    wal_options_allocator: WalOptionsAllocatorRef,
    /// The table metadata manager; initialized in `try_start`.
    table_metadata_manager: TableMetadataManagerRef,
    /// The maintenance mode manager.
    maintenance_mode_manager: MaintenanceModeManagerRef,
    /// The memory region keeper.
    memory_region_keeper: MemoryRegionKeeperRef,
    /// The telemetry task.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// The region migration manager.
    region_migration_manager: RegionMigrationManagerRef,
    /// The region supervisor ticker, registered as a leadership listener if present.
    region_supervisor_ticker: Option<RegionSupervisorTickerRef>,
    /// The cache invalidator.
    cache_invalidator: CacheInvalidatorRef,
    /// The leader region registry.
    leader_region_registry: LeaderRegionRegistryRef,
    /// The WAL prune ticker, registered as a leadership listener if present.
    wal_prune_ticker: Option<WalPruneTickerRef>,

    /// The plugins; looked up for the publisher, the subscription manager and
    /// the leadership change notifier customizer.
    plugins: Plugins,
}
472
473impl Metasrv {
474 pub async fn try_start(&self) -> Result<()> {
475 if self
476 .started
477 .compare_exchange(false, true, Ordering::Relaxed, Ordering::Relaxed)
478 .is_err()
479 {
480 warn!("Metasrv already started");
481 return Ok(());
482 }
483
484 let handler_group_builder =
485 self.handler_group_builder
486 .lock()
487 .unwrap()
488 .take()
489 .context(error::UnexpectedSnafu {
490 violated: "expected heartbeat handler group builder",
491 })?;
492 *self.handler_group.write().unwrap() = Some(Arc::new(handler_group_builder.build()?));
493
494 self.table_metadata_manager
496 .init()
497 .await
498 .context(InitMetadataSnafu)?;
499
500 if let Some(election) = self.election() {
501 let procedure_manager = self.procedure_manager.clone();
502 let in_memory = self.in_memory.clone();
503 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
504 let subscribe_manager = self.subscription_manager();
505 let mut rx = election.subscribe_leader_change();
506 let greptimedb_telemetry_task = self.greptimedb_telemetry_task.clone();
507 greptimedb_telemetry_task
508 .start()
509 .context(StartTelemetryTaskSnafu)?;
510
511 let mut leadership_change_notifier = LeadershipChangeNotifier::default();
513 leadership_change_notifier.add_listener(self.wal_options_allocator.clone());
514 leadership_change_notifier
515 .add_listener(Arc::new(ProcedureManagerListenerAdapter(procedure_manager)));
516 leadership_change_notifier.add_listener(Arc::new(NodeExpiryListener::new(
517 self.options.node_max_idle_time,
518 self.in_memory.clone(),
519 )));
520 if let Some(region_supervisor_ticker) = &self.region_supervisor_ticker {
521 leadership_change_notifier.add_listener(region_supervisor_ticker.clone() as _);
522 }
523 if let Some(wal_prune_ticker) = &self.wal_prune_ticker {
524 leadership_change_notifier.add_listener(wal_prune_ticker.clone() as _);
525 }
526 if let Some(customizer) = self.plugins.get::<LeadershipChangeNotifierCustomizerRef>() {
527 customizer.customize(&mut leadership_change_notifier);
528 }
529
530 let state_handler = MetaStateHandler {
531 greptimedb_telemetry_task,
532 subscribe_manager,
533 state: self.state.clone(),
534 leader_cached_kv_backend: leader_cached_kv_backend.clone(),
535 leadership_change_notifier,
536 };
537 let _handle = common_runtime::spawn_global(async move {
538 loop {
539 match rx.recv().await {
540 Ok(msg) => {
541 in_memory.reset();
542 leader_cached_kv_backend.reset();
543 info!("Leader's cache has bean cleared on leader change: {msg}");
544 match msg {
545 LeaderChangeMessage::Elected(_) => {
546 state_handler.on_leader_start().await;
547 }
548 LeaderChangeMessage::StepDown(leader) => {
549 error!("Leader :{:?} step down", leader);
550
551 state_handler.on_leader_stop().await;
552 }
553 }
554 }
555 Err(RecvError::Closed) => {
556 error!("Not expected, is leader election loop still running?");
557 break;
558 }
559 Err(RecvError::Lagged(_)) => {
560 break;
561 }
562 }
563 }
564
565 state_handler.on_leader_stop().await;
566 });
567
568 {
570 let election = election.clone();
571 let started = self.started.clone();
572 let node_info = self.node_info();
573 let _handle = common_runtime::spawn_global(async move {
574 while started.load(Ordering::Relaxed) {
575 let res = election.register_candidate(&node_info).await;
576 if let Err(e) = res {
577 warn!(e; "Metasrv register candidate error");
578 }
579 }
580 });
581 }
582
583 {
585 let election = election.clone();
586 let started = self.started.clone();
587 let _handle = common_runtime::spawn_global(async move {
588 while started.load(Ordering::Relaxed) {
589 let res = election.campaign().await;
590 if let Err(e) = res {
591 warn!(e; "Metasrv election error");
592 }
593 info!("Metasrv re-initiate election");
594 }
595 info!("Metasrv stopped");
596 });
597 }
598 } else {
599 warn!(
600 "Ensure only one instance of Metasrv is running, as there is no election service."
601 );
602
603 if let Err(e) = self.wal_options_allocator.start().await {
604 error!(e; "Failed to start wal options allocator");
605 }
606 self.leader_cached_kv_backend
608 .load()
609 .await
610 .context(KvBackendSnafu)?;
611 self.procedure_manager
612 .start()
613 .await
614 .context(StartProcedureManagerSnafu)?;
615 }
616
617 info!("Metasrv started");
618
619 Ok(())
620 }
621
622 pub async fn shutdown(&self) -> Result<()> {
623 self.started.store(false, Ordering::Relaxed);
624 self.procedure_manager
625 .stop()
626 .await
627 .context(StopProcedureManagerSnafu)
628 }
629
630 pub fn start_time_ms(&self) -> u64 {
631 self.start_time_ms
632 }
633
634 pub fn node_info(&self) -> MetasrvNodeInfo {
635 let build_info = common_version::build_info();
636 MetasrvNodeInfo {
637 addr: self.options().server_addr.clone(),
638 version: build_info.version.to_string(),
639 git_commit: build_info.commit_short.to_string(),
640 start_time_ms: self.start_time_ms(),
641 }
642 }
643
644 pub(crate) async fn lookup_peer(&self, peer_id: u64) -> Result<Option<Peer>> {
646 lookup_datanode_peer(
647 peer_id,
648 &self.meta_peer_client,
649 distributed_time_constants::DATANODE_LEASE_SECS,
650 )
651 .await
652 }
653
654 pub fn options(&self) -> &MetasrvOptions {
655 &self.options
656 }
657
658 pub fn in_memory(&self) -> &ResettableKvBackendRef {
659 &self.in_memory
660 }
661
662 pub fn kv_backend(&self) -> &KvBackendRef {
663 &self.kv_backend
664 }
665
666 pub fn meta_peer_client(&self) -> &MetaPeerClientRef {
667 &self.meta_peer_client
668 }
669
670 pub fn selector(&self) -> &SelectorRef {
671 &self.selector
672 }
673
674 pub fn flow_selector(&self) -> &SelectorRef {
675 &self.flow_selector
676 }
677
678 pub fn handler_group(&self) -> Option<HeartbeatHandlerGroupRef> {
679 self.handler_group.read().unwrap().clone()
680 }
681
682 pub fn election(&self) -> Option<&ElectionRef> {
683 self.election.as_ref()
684 }
685
686 pub fn mailbox(&self) -> &MailboxRef {
687 &self.mailbox
688 }
689
690 pub fn procedure_executor(&self) -> &ProcedureExecutorRef {
691 &self.procedure_executor
692 }
693
694 pub fn procedure_manager(&self) -> &ProcedureManagerRef {
695 &self.procedure_manager
696 }
697
698 pub fn table_metadata_manager(&self) -> &TableMetadataManagerRef {
699 &self.table_metadata_manager
700 }
701
702 pub fn maintenance_mode_manager(&self) -> &MaintenanceModeManagerRef {
703 &self.maintenance_mode_manager
704 }
705
706 pub fn memory_region_keeper(&self) -> &MemoryRegionKeeperRef {
707 &self.memory_region_keeper
708 }
709
710 pub fn region_migration_manager(&self) -> &RegionMigrationManagerRef {
711 &self.region_migration_manager
712 }
713
714 pub fn publish(&self) -> Option<PublisherRef> {
715 self.plugins.get::<PublisherRef>()
716 }
717
718 pub fn subscription_manager(&self) -> Option<SubscriptionManagerRef> {
719 self.plugins.get::<SubscriptionManagerRef>()
720 }
721
722 pub fn plugins(&self) -> &Plugins {
723 &self.plugins
724 }
725
726 #[inline]
727 pub fn new_ctx(&self) -> Context {
728 let server_addr = self.options().server_addr.clone();
729 let in_memory = self.in_memory.clone();
730 let kv_backend = self.kv_backend.clone();
731 let leader_cached_kv_backend = self.leader_cached_kv_backend.clone();
732 let meta_peer_client = self.meta_peer_client.clone();
733 let mailbox = self.mailbox.clone();
734 let election = self.election.clone();
735 let table_metadata_manager = self.table_metadata_manager.clone();
736 let cache_invalidator = self.cache_invalidator.clone();
737 let leader_region_registry = self.leader_region_registry.clone();
738
739 Context {
740 server_addr,
741 in_memory,
742 kv_backend,
743 leader_cached_kv_backend,
744 meta_peer_client,
745 mailbox,
746 election,
747 is_infancy: false,
748 table_metadata_manager,
749 cache_invalidator,
750 leader_region_registry,
751 }
752 }
753}