meta_srv/metasrv/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::default_distributed_time_constants;
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_provider::{build_kafka_client, build_wal_provider};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67    ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68    RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
74use crate::procedure::wal_prune::Context as WalPruneContext;
75use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
76use crate::region::flush_trigger::RegionFlushTrigger;
77use crate::region::supervisor::{
78    DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
79    RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
80    RegionSupervisorTicker,
81};
82use crate::selector::lease_based::LeaseBasedSelector;
83use crate::selector::round_robin::RoundRobinSelector;
84use crate::service::mailbox::MailboxRef;
85use crate::service::store::cached_kv::LeaderCachedKvBackend;
86use crate::state::State;
87use crate::utils::insert_forwarder::InsertForwarder;
88
89/// The time window for twcs compaction of the region stats table.
90const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
91
92// TODO(fys): try use derive_builder macro
93pub struct MetasrvBuilder {
94    options: Option<MetasrvOptions>,
95    kv_backend: Option<KvBackendRef>,
96    in_memory: Option<ResettableKvBackendRef>,
97    selector: Option<SelectorRef>,
98    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
99    election: Option<ElectionRef>,
100    meta_peer_client: Option<MetaPeerClientRef>,
101    node_manager: Option<NodeManagerRef>,
102    plugins: Option<Plugins>,
103    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
104}
105
106impl MetasrvBuilder {
107    pub fn new() -> Self {
108        Self {
109            kv_backend: None,
110            in_memory: None,
111            selector: None,
112            handler_group_builder: None,
113            meta_peer_client: None,
114            election: None,
115            options: None,
116            node_manager: None,
117            plugins: None,
118            table_metadata_allocator: None,
119        }
120    }
121
122    pub fn options(mut self, options: MetasrvOptions) -> Self {
123        self.options = Some(options);
124        self
125    }
126
127    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
128        self.kv_backend = Some(kv_backend);
129        self
130    }
131
132    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
133        self.in_memory = Some(in_memory);
134        self
135    }
136
137    pub fn selector(mut self, selector: SelectorRef) -> Self {
138        self.selector = Some(selector);
139        self
140    }
141
142    pub fn heartbeat_handler(
143        mut self,
144        handler_group_builder: HeartbeatHandlerGroupBuilder,
145    ) -> Self {
146        self.handler_group_builder = Some(handler_group_builder);
147        self
148    }
149
150    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
151        self.meta_peer_client = Some(meta_peer_client);
152        self
153    }
154
155    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
156        self.election = election;
157        self
158    }
159
160    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
161        self.node_manager = Some(node_manager);
162        self
163    }
164
165    pub fn plugins(mut self, plugins: Plugins) -> Self {
166        self.plugins = Some(plugins);
167        self
168    }
169
170    pub fn table_metadata_allocator(
171        mut self,
172        table_metadata_allocator: TableMetadataAllocatorRef,
173    ) -> Self {
174        self.table_metadata_allocator = Some(table_metadata_allocator);
175        self
176    }
177
178    pub async fn build(self) -> Result<Metasrv> {
179        let MetasrvBuilder {
180            election,
181            meta_peer_client,
182            options,
183            kv_backend,
184            in_memory,
185            selector,
186            handler_group_builder,
187            node_manager,
188            plugins,
189            table_metadata_allocator,
190        } = self;
191
192        let options = options.unwrap_or_default();
193
194        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
195        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
196
197        let state = Arc::new(RwLock::new(match election {
198            None => State::leader(options.grpc.server_addr.clone(), true),
199            Some(_) => State::follower(options.grpc.server_addr.clone()),
200        }));
201
202        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
203            state.clone(),
204            kv_backend.clone(),
205        ));
206
207        let meta_peer_client = meta_peer_client
208            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
209
210        let event_inserter = Box::new(InsertForwarder::new(
211            meta_peer_client.clone(),
212            Some(InsertOptions {
213                ttl: options.event_recorder.ttl,
214                append_mode: true,
215                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
216            }),
217        ));
218        // Builds the event recorder to record important events and persist them as the system table.
219        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
220            event_inserter,
221        ))));
222
223        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
224        let pushers = Pushers::default();
225        let mailbox = build_mailbox(&kv_backend, &pushers);
226        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
227        let procedure_manager = build_procedure_manager(
228            &options,
229            &kv_backend,
230            &runtime_switch_manager,
231            event_recorder,
232        );
233
234        let table_metadata_manager = Arc::new(TableMetadataManager::new(
235            leader_cached_kv_backend.clone() as _,
236        ));
237        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
238            leader_cached_kv_backend.clone() as _,
239        ));
240
241        let selector_ctx = SelectorContext {
242            peer_discovery: meta_peer_client.clone(),
243        };
244
245        let wal_provider = build_wal_provider(&options.wal, kv_backend.clone())
246            .await
247            .context(BuildWalProviderSnafu)?;
248        let wal_provider = Arc::new(wal_provider);
249        let is_remote_wal = wal_provider.is_remote_wal();
250        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
251            let sequence = Arc::new(
252                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
253                    .initial(MIN_USER_TABLE_ID as u64)
254                    .step(10)
255                    .build(),
256            );
257            let peer_allocator = Arc::new(
258                MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
259                    .with_max_items(MAX_REGION_SEQ),
260            );
261            Arc::new(TableMetadataAllocator::with_peer_allocator(
262                sequence,
263                wal_provider.clone(),
264                peer_allocator,
265            ))
266        });
267        let table_id_allocator = table_metadata_allocator.table_id_allocator();
268
269        let flow_selector =
270            Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
271
272        let flow_metadata_allocator = {
273            // for now flownode just use round-robin selector
274            let flow_selector_ctx = selector_ctx.clone();
275            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
276                flow_selector_ctx,
277                flow_selector.clone(),
278            ));
279            let seq = Arc::new(
280                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
281                    .initial(MIN_USER_FLOW_ID as u64)
282                    .step(10)
283                    .build(),
284            );
285
286            Arc::new(FlowMetadataAllocator::with_peer_allocator(
287                seq,
288                peer_allocator,
289            ))
290        };
291        let flow_state_handler =
292            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
293
294        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
295        let node_manager = node_manager.unwrap_or_else(|| {
296            let datanode_client_channel_config = ChannelConfig::new()
297                .timeout(Some(options.datanode.client.timeout))
298                .connect_timeout(options.datanode.client.connect_timeout)
299                .tcp_nodelay(options.datanode.client.tcp_nodelay);
300            Arc::new(NodeClients::new(datanode_client_channel_config))
301        });
302        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
303            mailbox.clone(),
304            MetasrvInfo {
305                server_addr: options.grpc.server_addr.clone(),
306            },
307        ));
308
309        if !is_remote_wal && options.enable_region_failover {
310            ensure!(
311                options.allow_region_failover_on_local_wal,
312                error::UnexpectedSnafu {
313                    violated: "Region failover is not supported in the local WAL implementation!
314                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
315                }
316            );
317            if options.allow_region_failover_on_local_wal {
318                warn!(
319                    "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
320                );
321            }
322        }
323
324        let (tx, rx) = RegionSupervisor::channel();
325        let (region_failure_detector_controller, region_supervisor_ticker): (
326            RegionFailureDetectorControllerRef,
327            Option<std::sync::Arc<RegionSupervisorTicker>>,
328        ) = if options.enable_region_failover {
329            (
330                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
331                Some(Arc::new(RegionSupervisorTicker::new(
332                    DEFAULT_TICK_INTERVAL,
333                    options.region_failure_detector_initialization_delay,
334                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
335                    tx.clone(),
336                ))),
337            )
338        } else {
339            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
340        };
341
342        // region migration manager
343        let region_migration_manager = Arc::new(RegionMigrationManager::new(
344            procedure_manager.clone(),
345            DefaultContextFactory::new(
346                in_memory.clone(),
347                table_metadata_manager.clone(),
348                memory_region_keeper.clone(),
349                region_failure_detector_controller.clone(),
350                mailbox.clone(),
351                options.grpc.server_addr.clone(),
352                cache_invalidator.clone(),
353            ),
354        ));
355        region_migration_manager.try_start()?;
356        let region_supervisor_selector = plugins
357            .as_ref()
358            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
359
360        let supervisor_selector = match region_supervisor_selector {
361            Some(selector) => {
362                info!("Using region stat aware selector");
363                RegionSupervisorSelector::RegionStatAwareSelector(selector)
364            }
365            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
366        };
367
368        let region_failover_handler = if options.enable_region_failover {
369            let region_supervisor = RegionSupervisor::new(
370                rx,
371                options.failure_detector,
372                selector_ctx.clone(),
373                supervisor_selector,
374                region_migration_manager.clone(),
375                runtime_switch_manager.clone(),
376                meta_peer_client.clone(),
377                leader_cached_kv_backend.clone(),
378            )
379            .with_state(state.clone());
380
381            Some(RegionFailureHandler::new(
382                region_supervisor,
383                HeartbeatAcceptor::new(tx),
384            ))
385        } else {
386            None
387        };
388
389        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
390        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
391
392        let ddl_context = DdlContext {
393            node_manager: node_manager.clone(),
394            cache_invalidator: cache_invalidator.clone(),
395            memory_region_keeper: memory_region_keeper.clone(),
396            leader_region_registry: leader_region_registry.clone(),
397            table_metadata_manager: table_metadata_manager.clone(),
398            table_metadata_allocator: table_metadata_allocator.clone(),
399            flow_metadata_manager: flow_metadata_manager.clone(),
400            flow_metadata_allocator: flow_metadata_allocator.clone(),
401            region_failure_detector_controller,
402        };
403        let procedure_manager_c = procedure_manager.clone();
404        let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
405            mailbox.clone(),
406            options.grpc.server_addr.clone(),
407        ));
408        let ddl_manager = DdlManager::try_new(
409            ddl_context,
410            procedure_manager_c,
411            repartition_procedure_factory,
412            true,
413        )
414        .context(error::InitDdlManagerSnafu)?;
415
416        let ddl_manager = if let Some(configurator) = plugins
417            .as_ref()
418            .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
419        {
420            let ctx = DdlManagerConfigureContext {
421                kv_backend: kv_backend.clone(),
422                meta_peer_client: meta_peer_client.clone(),
423            };
424            configurator
425                .configure(ddl_manager, ctx)
426                .await
427                .context(OtherSnafu)?
428        } else {
429            ddl_manager
430        };
431
432        let ddl_manager = Arc::new(ddl_manager);
433
434        let region_flush_ticker = if is_remote_wal {
435            let remote_wal_options = options.wal.remote_wal_options().unwrap();
436            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
437                table_metadata_manager.clone(),
438                leader_region_registry.clone(),
439                topic_stats_registry.clone(),
440                mailbox.clone(),
441                options.grpc.server_addr.clone(),
442                remote_wal_options.flush_trigger_size,
443                remote_wal_options.checkpoint_trigger_size,
444            );
445            region_flush_trigger.try_start()?;
446
447            Some(Arc::new(region_flush_ticker))
448        } else {
449            None
450        };
451
452        // remote WAL prune ticker and manager
453        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
454            let (tx, rx) = WalPruneManager::channel();
455            // Safety: Must be remote WAL.
456            let remote_wal_options = options.wal.remote_wal_options().unwrap();
457            let kafka_client = build_kafka_client(&remote_wal_options.connection)
458                .await
459                .context(error::BuildKafkaClientSnafu)?;
460            let wal_prune_context = WalPruneContext {
461                client: Arc::new(kafka_client),
462                table_metadata_manager: table_metadata_manager.clone(),
463                leader_region_registry: leader_region_registry.clone(),
464            };
465            let wal_prune_manager = WalPruneManager::new(
466                remote_wal_options.auto_prune_parallelism,
467                rx,
468                procedure_manager.clone(),
469                wal_prune_context,
470            );
471            // Start manager in background. Ticker will be started in the main thread to send ticks.
472            wal_prune_manager.try_start().await?;
473            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
474                remote_wal_options.auto_prune_interval,
475                tx.clone(),
476            ));
477            Some(wal_prune_ticker)
478        } else {
479            None
480        };
481
482        let gc_ticker = if options.gc.enable {
483            let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
484                table_metadata_manager.clone(),
485                procedure_manager.clone(),
486                meta_peer_client.clone(),
487                mailbox.clone(),
488                options.grpc.server_addr.clone(),
489                options.gc.clone(),
490            )?;
491            gc_scheduler.try_start()?;
492
493            Some(Arc::new(gc_ticker))
494        } else {
495            None
496        };
497
498        let customized_region_lease_renewer = plugins
499            .as_ref()
500            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
501
502        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
503            let inserter = Box::new(InsertForwarder::new(
504                meta_peer_client.clone(),
505                Some(InsertOptions {
506                    ttl: options.stats_persistence.ttl,
507                    append_mode: true,
508                    twcs_compaction_time_window: Some(
509                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
510                    ),
511                }),
512            ));
513
514            Some(PersistStatsHandler::new(
515                inserter,
516                options.stats_persistence.interval,
517            ))
518        } else {
519            None
520        };
521
522        let handler_group_builder = match handler_group_builder {
523            Some(handler_group_builder) => handler_group_builder,
524            None => {
525                let region_lease_handler = RegionLeaseHandler::new(
526                    default_distributed_time_constants().region_lease.as_secs(),
527                    table_metadata_manager.clone(),
528                    memory_region_keeper.clone(),
529                    customized_region_lease_renewer,
530                );
531
532                HeartbeatHandlerGroupBuilder::new(pushers)
533                    .with_plugins(plugins.clone())
534                    .with_region_failure_handler(region_failover_handler)
535                    .with_region_lease_handler(Some(region_lease_handler))
536                    .with_flush_stats_factor(Some(options.flush_stats_factor))
537                    .with_flow_state_handler(Some(flow_state_handler))
538                    .with_persist_stats_handler(persist_region_stats_handler)
539                    .add_default_handlers()
540            }
541        };
542
543        let enable_telemetry = options.enable_telemetry;
544        let metasrv_home = Path::new(&options.data_home)
545            .join(METASRV_DATA_DIR)
546            .to_string_lossy()
547            .to_string();
548
549        let reconciliation_manager = Arc::new(ReconciliationManager::new(
550            node_manager.clone(),
551            table_metadata_manager.clone(),
552            cache_invalidator.clone(),
553            procedure_manager.clone(),
554        ));
555        reconciliation_manager
556            .try_start()
557            .context(error::InitReconciliationManagerSnafu)?;
558
559        let mut resource_stat = ResourceStatImpl::default();
560        resource_stat.start_collect_cpu_usage();
561
562        Ok(Metasrv {
563            state,
564            started: Arc::new(AtomicBool::new(false)),
565            start_time_ms: common_time::util::current_time_millis() as u64,
566            options,
567            in_memory,
568            kv_backend,
569            leader_cached_kv_backend,
570            meta_peer_client: meta_peer_client.clone(),
571            selector,
572            selector_ctx,
573            // TODO(jeremy): We do not allow configuring the flow selector.
574            flow_selector,
575            handler_group: RwLock::new(None),
576            handler_group_builder: Mutex::new(Some(handler_group_builder)),
577            election,
578            procedure_manager,
579            mailbox,
580            ddl_manager,
581            wal_provider,
582            table_metadata_manager,
583            runtime_switch_manager,
584            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
585                Some(metasrv_home),
586                meta_peer_client,
587                enable_telemetry,
588            )
589            .await,
590            plugins: plugins.unwrap_or_else(Plugins::default),
591            memory_region_keeper,
592            region_migration_manager,
593            region_supervisor_ticker,
594            cache_invalidator,
595            leader_region_registry,
596            wal_prune_ticker,
597            region_flush_ticker,
598            table_id_allocator,
599            reconciliation_manager,
600            topic_stats_registry,
601            resource_stat: Arc::new(resource_stat),
602            gc_ticker,
603        })
604    }
605}
606
607fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
608    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
609        .initial(1)
610        .step(100)
611        .build();
612
613    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
614}
615
616fn build_procedure_manager(
617    options: &MetasrvOptions,
618    kv_backend: &KvBackendRef,
619    runtime_switch_manager: &RuntimeSwitchManagerRef,
620    event_recorder: EventRecorderRef,
621) -> ProcedureManagerRef {
622    let manager_config = ManagerConfig {
623        max_retry_times: options.procedure.max_retry_times,
624        retry_delay: options.procedure.retry_delay,
625        max_running_procedures: options.procedure.max_running_procedures,
626        ..Default::default()
627    };
628    let kv_state_store = Arc::new(
629        KvStateStore::new(kv_backend.clone()).with_max_value_size(
630            options
631                .procedure
632                .max_metadata_value_size
633                .map(|v| v.as_bytes() as usize),
634        ),
635    );
636
637    Arc::new(LocalManager::new(
638        manager_config,
639        kv_state_store.clone(),
640        kv_state_store,
641        Some(runtime_switch_manager.clone()),
642        Some(event_recorder),
643    ))
644}
645
646impl Default for MetasrvBuilder {
647    fn default() -> Self {
648        Self::new()
649    }
650}
651
652/// The context for [`DdlManagerConfiguratorRef`].
653pub struct DdlManagerConfigureContext {
654    pub kv_backend: KvBackendRef,
655    pub meta_peer_client: MetaPeerClientRef,
656}