1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::default_distributed_time_constants;
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_provider::{build_kafka_client, build_wal_provider};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67 ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68 RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
74use crate::procedure::wal_prune::Context as WalPruneContext;
75use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
76use crate::region::flush_trigger::RegionFlushTrigger;
77use crate::region::supervisor::{
78 DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
79 RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
80 RegionSupervisorTicker,
81};
82use crate::selector::lease_based::LeaseBasedSelector;
83use crate::selector::round_robin::RoundRobinSelector;
84use crate::service::mailbox::MailboxRef;
85use crate::service::store::cached_kv::LeaderCachedKvBackend;
86use crate::state::State;
87use crate::utils::database::DatabaseOperator;
88use crate::utils::insert_forwarder::InsertForwarder;
89
90const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
92
93pub struct MetasrvBuilder {
95 options: Option<MetasrvOptions>,
96 kv_backend: Option<KvBackendRef>,
97 in_memory: Option<ResettableKvBackendRef>,
98 selector: Option<SelectorRef>,
99 handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
100 election: Option<ElectionRef>,
101 meta_peer_client: Option<MetaPeerClientRef>,
102 node_manager: Option<NodeManagerRef>,
103 plugins: Option<Plugins>,
104 table_metadata_allocator: Option<TableMetadataAllocatorRef>,
105}
106
107impl MetasrvBuilder {
108 pub fn new() -> Self {
109 Self {
110 kv_backend: None,
111 in_memory: None,
112 selector: None,
113 handler_group_builder: None,
114 meta_peer_client: None,
115 election: None,
116 options: None,
117 node_manager: None,
118 plugins: None,
119 table_metadata_allocator: None,
120 }
121 }
122
123 pub fn options(mut self, options: MetasrvOptions) -> Self {
124 self.options = Some(options);
125 self
126 }
127
128 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
129 self.kv_backend = Some(kv_backend);
130 self
131 }
132
133 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
134 self.in_memory = Some(in_memory);
135 self
136 }
137
138 pub fn selector(mut self, selector: SelectorRef) -> Self {
139 self.selector = Some(selector);
140 self
141 }
142
143 pub fn heartbeat_handler(
144 mut self,
145 handler_group_builder: HeartbeatHandlerGroupBuilder,
146 ) -> Self {
147 self.handler_group_builder = Some(handler_group_builder);
148 self
149 }
150
151 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
152 self.meta_peer_client = Some(meta_peer_client);
153 self
154 }
155
156 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
157 self.election = election;
158 self
159 }
160
161 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
162 self.node_manager = Some(node_manager);
163 self
164 }
165
166 pub fn plugins(mut self, plugins: Plugins) -> Self {
167 self.plugins = Some(plugins);
168 self
169 }
170
171 pub fn table_metadata_allocator(
172 mut self,
173 table_metadata_allocator: TableMetadataAllocatorRef,
174 ) -> Self {
175 self.table_metadata_allocator = Some(table_metadata_allocator);
176 self
177 }
178
179 pub fn options_ref(&self) -> Option<&MetasrvOptions> {
180 self.options.as_ref()
181 }
182
183 pub fn kv_backend_ref(&self) -> Option<&KvBackendRef> {
184 self.kv_backend.as_ref()
185 }
186
187 pub fn in_memory_ref(&self) -> Option<&ResettableKvBackendRef> {
188 self.in_memory.as_ref()
189 }
190
191 pub fn election_ref(&self) -> Option<&ElectionRef> {
192 self.election.as_ref()
193 }
194
195 pub fn meta_peer_client_ref(&self) -> Option<&MetaPeerClientRef> {
196 self.meta_peer_client.as_ref()
197 }
198
199 pub fn node_manager_ref(&self) -> Option<&NodeManagerRef> {
200 self.node_manager.as_ref()
201 }
202
203 pub async fn build(self) -> Result<Metasrv> {
204 let MetasrvBuilder {
205 election,
206 meta_peer_client,
207 options,
208 kv_backend,
209 in_memory,
210 selector,
211 handler_group_builder,
212 node_manager,
213 plugins,
214 table_metadata_allocator,
215 } = self;
216
217 let options = options.unwrap_or_default();
218
219 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
220 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
221
222 let state = Arc::new(RwLock::new(match election {
223 None => State::leader(options.grpc.server_addr.clone(), true),
224 Some(_) => State::follower(options.grpc.server_addr.clone()),
225 }));
226
227 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
228 state.clone(),
229 kv_backend.clone(),
230 ));
231
232 let meta_peer_client = meta_peer_client
233 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
234 let database_operator = Arc::new(DatabaseOperator::new(meta_peer_client.clone()));
235
236 let event_inserter = Box::new(InsertForwarder::new(
237 database_operator.clone(),
238 Some(InsertOptions {
239 ttl: options.event_recorder.ttl,
240 append_mode: true,
241 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
242 }),
243 ));
244 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
246 event_inserter,
247 ))));
248
249 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
250 let pushers = Pushers::default();
251 let mailbox = build_mailbox(&kv_backend, &pushers);
252 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
253 let procedure_manager = build_procedure_manager(
254 &options,
255 &kv_backend,
256 &runtime_switch_manager,
257 event_recorder,
258 );
259
260 let table_metadata_manager = Arc::new(TableMetadataManager::new(
261 leader_cached_kv_backend.clone() as _,
262 ));
263 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
264 leader_cached_kv_backend.clone() as _,
265 ));
266
267 let selector_ctx = SelectorContext {
268 peer_discovery: meta_peer_client.clone(),
269 };
270
271 let wal_provider = build_wal_provider(&options.wal, kv_backend.clone())
272 .await
273 .context(BuildWalProviderSnafu)?;
274 let wal_provider = Arc::new(wal_provider);
275 let is_remote_wal = wal_provider.is_remote_wal();
276 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
277 let sequence = Arc::new(
278 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
279 .initial(MIN_USER_TABLE_ID as u64)
280 .step(10)
281 .build(),
282 );
283 let peer_allocator = Arc::new(
284 MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
285 .with_max_items(MAX_REGION_SEQ),
286 );
287 Arc::new(TableMetadataAllocator::with_peer_allocator(
288 sequence,
289 wal_provider.clone(),
290 peer_allocator,
291 ))
292 });
293 let table_id_allocator = table_metadata_allocator.table_id_allocator();
294
295 let flow_selector =
296 Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
297
298 let flow_metadata_allocator = {
299 let flow_selector_ctx = selector_ctx.clone();
301 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
302 flow_selector_ctx,
303 flow_selector.clone(),
304 ));
305 let seq = Arc::new(
306 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
307 .initial(MIN_USER_FLOW_ID as u64)
308 .step(10)
309 .build(),
310 );
311
312 Arc::new(FlowMetadataAllocator::with_peer_allocator(
313 seq,
314 peer_allocator,
315 ))
316 };
317 let flow_state_handler =
318 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
319
320 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
321 let node_manager = node_manager.unwrap_or_else(|| {
322 let datanode_client_channel_config = ChannelConfig::new()
323 .timeout(Some(options.datanode.client.timeout))
324 .connect_timeout(options.datanode.client.connect_timeout)
325 .tcp_nodelay(options.datanode.client.tcp_nodelay);
326 Arc::new(NodeClients::new(datanode_client_channel_config))
327 });
328 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
329 mailbox.clone(),
330 MetasrvInfo {
331 server_addr: options.grpc.server_addr.clone(),
332 },
333 ));
334
335 if !is_remote_wal && options.enable_region_failover {
336 ensure!(
337 options.allow_region_failover_on_local_wal,
338 error::UnexpectedSnafu {
339 violated: "Region failover is not supported in the local WAL implementation!
340 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
341 }
342 );
343 if options.allow_region_failover_on_local_wal {
344 warn!(
345 "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
346 );
347 }
348 }
349
350 let (tx, rx) = RegionSupervisor::channel();
351 let (region_failure_detector_controller, region_supervisor_ticker): (
352 RegionFailureDetectorControllerRef,
353 Option<std::sync::Arc<RegionSupervisorTicker>>,
354 ) = if options.enable_region_failover {
355 (
356 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
357 Some(Arc::new(RegionSupervisorTicker::new(
358 DEFAULT_TICK_INTERVAL,
359 options.region_failure_detector_initialization_delay,
360 DEFAULT_INITIALIZATION_RETRY_PERIOD,
361 tx.clone(),
362 ))),
363 )
364 } else {
365 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
366 };
367
368 let region_migration_manager = Arc::new(RegionMigrationManager::new(
370 procedure_manager.clone(),
371 DefaultContextFactory::new(
372 in_memory.clone(),
373 table_metadata_manager.clone(),
374 memory_region_keeper.clone(),
375 region_failure_detector_controller.clone(),
376 mailbox.clone(),
377 options.grpc.server_addr.clone(),
378 cache_invalidator.clone(),
379 ),
380 ));
381 region_migration_manager.try_start()?;
382 let region_supervisor_selector = plugins
383 .as_ref()
384 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
385
386 let supervisor_selector = match region_supervisor_selector {
387 Some(selector) => {
388 info!("Using region stat aware selector");
389 RegionSupervisorSelector::RegionStatAwareSelector(selector)
390 }
391 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
392 };
393
394 let region_failover_handler = if options.enable_region_failover {
395 let region_supervisor = RegionSupervisor::new(
396 rx,
397 options.failure_detector,
398 selector_ctx.clone(),
399 supervisor_selector,
400 region_migration_manager.clone(),
401 runtime_switch_manager.clone(),
402 meta_peer_client.clone(),
403 leader_cached_kv_backend.clone(),
404 )
405 .with_state(state.clone());
406
407 Some(RegionFailureHandler::new(
408 region_supervisor,
409 HeartbeatAcceptor::new(tx),
410 ))
411 } else {
412 None
413 };
414
415 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
416 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
417
418 let ddl_context = DdlContext {
419 node_manager: node_manager.clone(),
420 cache_invalidator: cache_invalidator.clone(),
421 memory_region_keeper: memory_region_keeper.clone(),
422 leader_region_registry: leader_region_registry.clone(),
423 table_metadata_manager: table_metadata_manager.clone(),
424 table_metadata_allocator: table_metadata_allocator.clone(),
425 flow_metadata_manager: flow_metadata_manager.clone(),
426 flow_metadata_allocator: flow_metadata_allocator.clone(),
427 region_failure_detector_controller,
428 };
429 let procedure_manager_c = procedure_manager.clone();
430 let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
431 mailbox.clone(),
432 options.grpc.server_addr.clone(),
433 ));
434 let ddl_manager = DdlManager::try_new(
435 ddl_context,
436 procedure_manager_c,
437 repartition_procedure_factory,
438 true,
439 )
440 .context(error::InitDdlManagerSnafu)?;
441
442 let ddl_manager = if let Some(configurator) = plugins
443 .as_ref()
444 .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
445 {
446 let ctx = DdlManagerConfigureContext {
447 kv_backend: kv_backend.clone(),
448 meta_peer_client: meta_peer_client.clone(),
449 };
450 configurator
451 .configure(ddl_manager, ctx)
452 .await
453 .context(OtherSnafu)?
454 } else {
455 ddl_manager
456 };
457
458 let ddl_manager = Arc::new(ddl_manager);
459
460 let region_flush_ticker = if is_remote_wal {
461 let remote_wal_options = options.wal.remote_wal_options().unwrap();
462 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
463 table_metadata_manager.clone(),
464 leader_region_registry.clone(),
465 topic_stats_registry.clone(),
466 mailbox.clone(),
467 options.grpc.server_addr.clone(),
468 remote_wal_options.flush_trigger_size,
469 remote_wal_options.checkpoint_trigger_size,
470 remote_wal_options.region_flush_trigger_interval,
471 remote_wal_options.periodic_checkpoint_persist_interval,
472 );
473 region_flush_trigger.try_start()?;
474
475 Some(Arc::new(region_flush_ticker))
476 } else {
477 None
478 };
479
480 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
482 let (tx, rx) = WalPruneManager::channel();
483 let remote_wal_options = options.wal.remote_wal_options().unwrap();
485 let kafka_client = build_kafka_client(&remote_wal_options.connection)
486 .await
487 .context(error::BuildKafkaClientSnafu)?;
488 let wal_prune_context = WalPruneContext {
489 client: Arc::new(kafka_client),
490 table_metadata_manager: table_metadata_manager.clone(),
491 leader_region_registry: leader_region_registry.clone(),
492 };
493 let wal_prune_manager = WalPruneManager::new(
494 remote_wal_options.auto_prune_parallelism,
495 remote_wal_options.auto_prune_logical_delete,
496 rx,
497 procedure_manager.clone(),
498 wal_prune_context,
499 );
500 wal_prune_manager.try_start().await?;
502 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
503 remote_wal_options.auto_prune_interval,
504 tx.clone(),
505 ));
506 Some(wal_prune_ticker)
507 } else {
508 None
509 };
510
511 let gc_ticker = if options.gc.enable {
512 let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
513 table_metadata_manager.clone(),
514 procedure_manager.clone(),
515 meta_peer_client.clone(),
516 mailbox.clone(),
517 options.grpc.server_addr.clone(),
518 options.gc.clone(),
519 )?;
520 gc_scheduler.try_start()?;
521
522 Some(Arc::new(gc_ticker))
523 } else {
524 None
525 };
526
527 let customized_region_lease_renewer = plugins
528 .as_ref()
529 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
530
531 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
532 let inserter = Box::new(InsertForwarder::new(
533 database_operator.clone(),
534 Some(InsertOptions {
535 ttl: options.stats_persistence.ttl,
536 append_mode: true,
537 twcs_compaction_time_window: Some(
538 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
539 ),
540 }),
541 ));
542
543 Some(PersistStatsHandler::new(
544 inserter,
545 options.stats_persistence.interval,
546 ))
547 } else {
548 None
549 };
550
551 let handler_group_builder = match handler_group_builder {
552 Some(handler_group_builder) => handler_group_builder,
553 None => {
554 let region_lease_handler = RegionLeaseHandler::new(
555 default_distributed_time_constants().region_lease.as_secs(),
556 table_metadata_manager.clone(),
557 memory_region_keeper.clone(),
558 customized_region_lease_renewer,
559 );
560
561 HeartbeatHandlerGroupBuilder::new(pushers)
562 .with_plugins(plugins.clone())
563 .with_region_failure_handler(region_failover_handler)
564 .with_region_lease_handler(Some(region_lease_handler))
565 .with_flush_stats_factor(Some(options.flush_stats_factor))
566 .with_flow_state_handler(Some(flow_state_handler))
567 .with_persist_stats_handler(persist_region_stats_handler)
568 .add_default_handlers()
569 }
570 };
571
572 let enable_telemetry = options.enable_telemetry;
573 let metasrv_home = Path::new(&options.data_home)
574 .join(METASRV_DATA_DIR)
575 .to_string_lossy()
576 .to_string();
577
578 let reconciliation_manager = Arc::new(ReconciliationManager::new(
579 node_manager.clone(),
580 table_metadata_manager.clone(),
581 cache_invalidator.clone(),
582 procedure_manager.clone(),
583 ));
584 reconciliation_manager
585 .try_start()
586 .context(error::InitReconciliationManagerSnafu)?;
587
588 let mut resource_stat = ResourceStatImpl::default();
589 resource_stat.start_collect_cpu_usage();
590
591 Ok(Metasrv {
592 state,
593 started: Arc::new(AtomicBool::new(false)),
594 start_time_ms: common_time::util::current_time_millis() as u64,
595 options,
596 in_memory,
597 kv_backend,
598 leader_cached_kv_backend,
599 meta_peer_client: meta_peer_client.clone(),
600 selector,
601 selector_ctx,
602 flow_selector,
604 handler_group: RwLock::new(None),
605 handler_group_builder: Mutex::new(Some(handler_group_builder)),
606 election,
607 procedure_manager,
608 mailbox,
609 ddl_manager,
610 wal_provider,
611 table_metadata_manager,
612 runtime_switch_manager,
613 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
614 Some(metasrv_home),
615 meta_peer_client,
616 enable_telemetry,
617 )
618 .await,
619 plugins: plugins.unwrap_or_else(Plugins::default),
620 memory_region_keeper,
621 region_migration_manager,
622 region_supervisor_ticker,
623 cache_invalidator,
624 leader_region_registry,
625 wal_prune_ticker,
626 region_flush_ticker,
627 table_id_allocator,
628 reconciliation_manager,
629 topic_stats_registry,
630 resource_stat: Arc::new(resource_stat),
631 gc_ticker,
632 database_operator,
633 })
634 }
635}
636
637fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
638 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
639 .initial(1)
640 .step(100)
641 .build();
642
643 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
644}
645
646fn build_procedure_manager(
647 options: &MetasrvOptions,
648 kv_backend: &KvBackendRef,
649 runtime_switch_manager: &RuntimeSwitchManagerRef,
650 event_recorder: EventRecorderRef,
651) -> ProcedureManagerRef {
652 let manager_config = ManagerConfig {
653 max_retry_times: options.procedure.max_retry_times,
654 retry_delay: options.procedure.retry_delay,
655 max_running_procedures: options.procedure.max_running_procedures,
656 ..Default::default()
657 };
658 let kv_state_store = Arc::new(
659 KvStateStore::new(kv_backend.clone()).with_max_value_size(
660 options
661 .procedure
662 .max_metadata_value_size
663 .map(|v| v.as_bytes() as usize),
664 ),
665 );
666
667 Arc::new(LocalManager::new(
668 manager_config,
669 kv_state_store.clone(),
670 kv_state_store,
671 Some(runtime_switch_manager.clone()),
672 Some(event_recorder),
673 ))
674}
675
676impl Default for MetasrvBuilder {
677 fn default() -> Self {
678 Self::new()
679 }
680}
681
682pub struct DdlManagerConfigureContext {
684 pub kv_backend: KvBackendRef,
685 pub meta_peer_client: MetaPeerClientRef,
686}