meta_srv/metasrv/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67    ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68    RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::wal_prune::Context as WalPruneContext;
74use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
75use crate::region::flush_trigger::RegionFlushTrigger;
76use crate::region::supervisor::{
77    DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
78    RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
79    RegionSupervisorTicker,
80};
81use crate::selector::lease_based::LeaseBasedSelector;
82use crate::selector::round_robin::RoundRobinSelector;
83use crate::service::mailbox::MailboxRef;
84use crate::service::store::cached_kv::LeaderCachedKvBackend;
85use crate::state::State;
86use crate::utils::insert_forwarder::InsertForwarder;
87
/// TWCS compaction time window applied to the region stats table: one day,
/// expressed in seconds.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration =
    Duration::from_secs(24 * 60 * 60);
90
91// TODO(fys): try use derive_builder macro
92pub struct MetasrvBuilder {
93    options: Option<MetasrvOptions>,
94    kv_backend: Option<KvBackendRef>,
95    in_memory: Option<ResettableKvBackendRef>,
96    selector: Option<SelectorRef>,
97    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
98    election: Option<ElectionRef>,
99    meta_peer_client: Option<MetaPeerClientRef>,
100    node_manager: Option<NodeManagerRef>,
101    plugins: Option<Plugins>,
102    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
103}
104
105impl MetasrvBuilder {
106    pub fn new() -> Self {
107        Self {
108            kv_backend: None,
109            in_memory: None,
110            selector: None,
111            handler_group_builder: None,
112            meta_peer_client: None,
113            election: None,
114            options: None,
115            node_manager: None,
116            plugins: None,
117            table_metadata_allocator: None,
118        }
119    }
120
121    pub fn options(mut self, options: MetasrvOptions) -> Self {
122        self.options = Some(options);
123        self
124    }
125
126    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
127        self.kv_backend = Some(kv_backend);
128        self
129    }
130
131    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
132        self.in_memory = Some(in_memory);
133        self
134    }
135
136    pub fn selector(mut self, selector: SelectorRef) -> Self {
137        self.selector = Some(selector);
138        self
139    }
140
141    pub fn heartbeat_handler(
142        mut self,
143        handler_group_builder: HeartbeatHandlerGroupBuilder,
144    ) -> Self {
145        self.handler_group_builder = Some(handler_group_builder);
146        self
147    }
148
149    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
150        self.meta_peer_client = Some(meta_peer_client);
151        self
152    }
153
154    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
155        self.election = election;
156        self
157    }
158
159    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
160        self.node_manager = Some(node_manager);
161        self
162    }
163
164    pub fn plugins(mut self, plugins: Plugins) -> Self {
165        self.plugins = Some(plugins);
166        self
167    }
168
169    pub fn table_metadata_allocator(
170        mut self,
171        table_metadata_allocator: TableMetadataAllocatorRef,
172    ) -> Self {
173        self.table_metadata_allocator = Some(table_metadata_allocator);
174        self
175    }
176
177    pub async fn build(self) -> Result<Metasrv> {
178        let MetasrvBuilder {
179            election,
180            meta_peer_client,
181            options,
182            kv_backend,
183            in_memory,
184            selector,
185            handler_group_builder,
186            node_manager,
187            plugins,
188            table_metadata_allocator,
189        } = self;
190
191        let options = options.unwrap_or_default();
192
193        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
194        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
195
196        let state = Arc::new(RwLock::new(match election {
197            None => State::leader(options.grpc.server_addr.clone(), true),
198            Some(_) => State::follower(options.grpc.server_addr.clone()),
199        }));
200
201        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
202            state.clone(),
203            kv_backend.clone(),
204        ));
205
206        let meta_peer_client = meta_peer_client
207            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
208
209        let event_inserter = Box::new(InsertForwarder::new(
210            meta_peer_client.clone(),
211            Some(InsertOptions {
212                ttl: options.event_recorder.ttl,
213                append_mode: true,
214                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
215            }),
216        ));
217        // Builds the event recorder to record important events and persist them as the system table.
218        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
219            event_inserter,
220        ))));
221
222        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
223        let pushers = Pushers::default();
224        let mailbox = build_mailbox(&kv_backend, &pushers);
225        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
226        let procedure_manager = build_procedure_manager(
227            &options,
228            &kv_backend,
229            &runtime_switch_manager,
230            event_recorder,
231        );
232
233        let table_metadata_manager = Arc::new(TableMetadataManager::new(
234            leader_cached_kv_backend.clone() as _,
235        ));
236        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
237            leader_cached_kv_backend.clone() as _,
238        ));
239
240        let selector_ctx = SelectorContext {
241            peer_discovery: meta_peer_client.clone(),
242        };
243
244        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
245            .await
246            .context(BuildWalOptionsAllocatorSnafu)?;
247        let wal_options_allocator = Arc::new(wal_options_allocator);
248        let is_remote_wal = wal_options_allocator.is_remote_wal();
249        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
250            let sequence = Arc::new(
251                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
252                    .initial(MIN_USER_TABLE_ID as u64)
253                    .step(10)
254                    .build(),
255            );
256            let peer_allocator = Arc::new(
257                MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
258                    .with_max_items(MAX_REGION_SEQ),
259            );
260            Arc::new(TableMetadataAllocator::with_peer_allocator(
261                sequence,
262                wal_options_allocator.clone(),
263                peer_allocator,
264            ))
265        });
266        let table_id_sequence = table_metadata_allocator.table_id_sequence();
267
268        let flow_selector =
269            Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
270
271        let flow_metadata_allocator = {
272            // for now flownode just use round-robin selector
273            let flow_selector_ctx = selector_ctx.clone();
274            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
275                flow_selector_ctx,
276                flow_selector.clone(),
277            ));
278            let seq = Arc::new(
279                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
280                    .initial(MIN_USER_FLOW_ID as u64)
281                    .step(10)
282                    .build(),
283            );
284
285            Arc::new(FlowMetadataAllocator::with_peer_allocator(
286                seq,
287                peer_allocator,
288            ))
289        };
290        let flow_state_handler =
291            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
292
293        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
294        let node_manager = node_manager.unwrap_or_else(|| {
295            let datanode_client_channel_config = ChannelConfig::new()
296                .timeout(options.datanode.client.timeout)
297                .connect_timeout(options.datanode.client.connect_timeout)
298                .tcp_nodelay(options.datanode.client.tcp_nodelay);
299            Arc::new(NodeClients::new(datanode_client_channel_config))
300        });
301        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
302            mailbox.clone(),
303            MetasrvInfo {
304                server_addr: options.grpc.server_addr.clone(),
305            },
306        ));
307
308        if !is_remote_wal && options.enable_region_failover {
309            ensure!(
310                options.allow_region_failover_on_local_wal,
311                error::UnexpectedSnafu {
312                    violated: "Region failover is not supported in the local WAL implementation!
313                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
314                }
315            );
316            if options.allow_region_failover_on_local_wal {
317                warn!(
318                    "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
319                );
320            }
321        }
322
323        let (tx, rx) = RegionSupervisor::channel();
324        let (region_failure_detector_controller, region_supervisor_ticker): (
325            RegionFailureDetectorControllerRef,
326            Option<std::sync::Arc<RegionSupervisorTicker>>,
327        ) = if options.enable_region_failover {
328            (
329                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
330                Some(Arc::new(RegionSupervisorTicker::new(
331                    DEFAULT_TICK_INTERVAL,
332                    options.region_failure_detector_initialization_delay,
333                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
334                    tx.clone(),
335                ))),
336            )
337        } else {
338            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
339        };
340
341        // region migration manager
342        let region_migration_manager = Arc::new(RegionMigrationManager::new(
343            procedure_manager.clone(),
344            DefaultContextFactory::new(
345                in_memory.clone(),
346                table_metadata_manager.clone(),
347                memory_region_keeper.clone(),
348                region_failure_detector_controller.clone(),
349                mailbox.clone(),
350                options.grpc.server_addr.clone(),
351                cache_invalidator.clone(),
352            ),
353        ));
354        region_migration_manager.try_start()?;
355        let region_supervisor_selector = plugins
356            .as_ref()
357            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
358
359        let supervisor_selector = match region_supervisor_selector {
360            Some(selector) => {
361                info!("Using region stat aware selector");
362                RegionSupervisorSelector::RegionStatAwareSelector(selector)
363            }
364            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
365        };
366
367        let region_failover_handler = if options.enable_region_failover {
368            let region_supervisor = RegionSupervisor::new(
369                rx,
370                options.failure_detector,
371                selector_ctx.clone(),
372                supervisor_selector,
373                region_migration_manager.clone(),
374                runtime_switch_manager.clone(),
375                meta_peer_client.clone(),
376                leader_cached_kv_backend.clone(),
377            )
378            .with_state(state.clone());
379
380            Some(RegionFailureHandler::new(
381                region_supervisor,
382                HeartbeatAcceptor::new(tx),
383            ))
384        } else {
385            None
386        };
387
388        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
389        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
390
391        let ddl_context = DdlContext {
392            node_manager: node_manager.clone(),
393            cache_invalidator: cache_invalidator.clone(),
394            memory_region_keeper: memory_region_keeper.clone(),
395            leader_region_registry: leader_region_registry.clone(),
396            table_metadata_manager: table_metadata_manager.clone(),
397            table_metadata_allocator: table_metadata_allocator.clone(),
398            flow_metadata_manager: flow_metadata_manager.clone(),
399            flow_metadata_allocator: flow_metadata_allocator.clone(),
400            region_failure_detector_controller,
401        };
402        let procedure_manager_c = procedure_manager.clone();
403        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
404            .context(error::InitDdlManagerSnafu)?;
405
406        let ddl_manager = if let Some(configurator) = plugins
407            .as_ref()
408            .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
409        {
410            let ctx = DdlManagerConfigureContext {
411                kv_backend: kv_backend.clone(),
412                meta_peer_client: meta_peer_client.clone(),
413            };
414            configurator
415                .configure(ddl_manager, ctx)
416                .await
417                .context(OtherSnafu)?
418        } else {
419            ddl_manager
420        };
421
422        let ddl_manager = Arc::new(ddl_manager);
423
424        let region_flush_ticker = if is_remote_wal {
425            let remote_wal_options = options.wal.remote_wal_options().unwrap();
426            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
427                table_metadata_manager.clone(),
428                leader_region_registry.clone(),
429                topic_stats_registry.clone(),
430                mailbox.clone(),
431                options.grpc.server_addr.clone(),
432                remote_wal_options.flush_trigger_size,
433                remote_wal_options.checkpoint_trigger_size,
434            );
435            region_flush_trigger.try_start()?;
436
437            Some(Arc::new(region_flush_ticker))
438        } else {
439            None
440        };
441
442        // remote WAL prune ticker and manager
443        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
444            let (tx, rx) = WalPruneManager::channel();
445            // Safety: Must be remote WAL.
446            let remote_wal_options = options.wal.remote_wal_options().unwrap();
447            let kafka_client = build_kafka_client(&remote_wal_options.connection)
448                .await
449                .context(error::BuildKafkaClientSnafu)?;
450            let wal_prune_context = WalPruneContext {
451                client: Arc::new(kafka_client),
452                table_metadata_manager: table_metadata_manager.clone(),
453                leader_region_registry: leader_region_registry.clone(),
454            };
455            let wal_prune_manager = WalPruneManager::new(
456                remote_wal_options.auto_prune_parallelism,
457                rx,
458                procedure_manager.clone(),
459                wal_prune_context,
460            );
461            // Start manager in background. Ticker will be started in the main thread to send ticks.
462            wal_prune_manager.try_start().await?;
463            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
464                remote_wal_options.auto_prune_interval,
465                tx.clone(),
466            ));
467            Some(wal_prune_ticker)
468        } else {
469            None
470        };
471
472        let gc_ticker = if options.gc.enable {
473            let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
474                table_metadata_manager.clone(),
475                procedure_manager.clone(),
476                meta_peer_client.clone(),
477                mailbox.clone(),
478                options.grpc.server_addr.clone(),
479                options.gc.clone(),
480            )?;
481            gc_scheduler.try_start()?;
482
483            Some(Arc::new(gc_ticker))
484        } else {
485            None
486        };
487
488        let customized_region_lease_renewer = plugins
489            .as_ref()
490            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
491
492        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
493            let inserter = Box::new(InsertForwarder::new(
494                meta_peer_client.clone(),
495                Some(InsertOptions {
496                    ttl: options.stats_persistence.ttl,
497                    append_mode: true,
498                    twcs_compaction_time_window: Some(
499                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
500                    ),
501                }),
502            ));
503
504            Some(PersistStatsHandler::new(
505                inserter,
506                options.stats_persistence.interval,
507            ))
508        } else {
509            None
510        };
511
512        let handler_group_builder = match handler_group_builder {
513            Some(handler_group_builder) => handler_group_builder,
514            None => {
515                let region_lease_handler = RegionLeaseHandler::new(
516                    distributed_time_constants::REGION_LEASE_SECS,
517                    table_metadata_manager.clone(),
518                    memory_region_keeper.clone(),
519                    customized_region_lease_renewer,
520                );
521
522                HeartbeatHandlerGroupBuilder::new(pushers)
523                    .with_plugins(plugins.clone())
524                    .with_region_failure_handler(region_failover_handler)
525                    .with_region_lease_handler(Some(region_lease_handler))
526                    .with_flush_stats_factor(Some(options.flush_stats_factor))
527                    .with_flow_state_handler(Some(flow_state_handler))
528                    .with_persist_stats_handler(persist_region_stats_handler)
529                    .add_default_handlers()
530            }
531        };
532
533        let enable_telemetry = options.enable_telemetry;
534        let metasrv_home = Path::new(&options.data_home)
535            .join(METASRV_DATA_DIR)
536            .to_string_lossy()
537            .to_string();
538
539        let reconciliation_manager = Arc::new(ReconciliationManager::new(
540            node_manager.clone(),
541            table_metadata_manager.clone(),
542            cache_invalidator.clone(),
543            procedure_manager.clone(),
544        ));
545        reconciliation_manager
546            .try_start()
547            .context(error::InitReconciliationManagerSnafu)?;
548
549        let mut resource_stat = ResourceStatImpl::default();
550        resource_stat.start_collect_cpu_usage();
551
552        Ok(Metasrv {
553            state,
554            started: Arc::new(AtomicBool::new(false)),
555            start_time_ms: common_time::util::current_time_millis() as u64,
556            options,
557            in_memory,
558            kv_backend,
559            leader_cached_kv_backend,
560            meta_peer_client: meta_peer_client.clone(),
561            selector,
562            selector_ctx,
563            // TODO(jeremy): We do not allow configuring the flow selector.
564            flow_selector,
565            handler_group: RwLock::new(None),
566            handler_group_builder: Mutex::new(Some(handler_group_builder)),
567            election,
568            procedure_manager,
569            mailbox,
570            ddl_manager,
571            wal_options_allocator,
572            table_metadata_manager,
573            runtime_switch_manager,
574            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
575                Some(metasrv_home),
576                meta_peer_client,
577                enable_telemetry,
578            )
579            .await,
580            plugins: plugins.unwrap_or_else(Plugins::default),
581            memory_region_keeper,
582            region_migration_manager,
583            region_supervisor_ticker,
584            cache_invalidator,
585            leader_region_registry,
586            wal_prune_ticker,
587            region_flush_ticker,
588            table_id_sequence,
589            reconciliation_manager,
590            topic_stats_registry,
591            resource_stat: Arc::new(resource_stat),
592            gc_ticker,
593        })
594    }
595}
596
597fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
598    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
599        .initial(1)
600        .step(100)
601        .build();
602
603    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
604}
605
606fn build_procedure_manager(
607    options: &MetasrvOptions,
608    kv_backend: &KvBackendRef,
609    runtime_switch_manager: &RuntimeSwitchManagerRef,
610    event_recorder: EventRecorderRef,
611) -> ProcedureManagerRef {
612    let manager_config = ManagerConfig {
613        max_retry_times: options.procedure.max_retry_times,
614        retry_delay: options.procedure.retry_delay,
615        max_running_procedures: options.procedure.max_running_procedures,
616        ..Default::default()
617    };
618    let kv_state_store = Arc::new(
619        KvStateStore::new(kv_backend.clone()).with_max_value_size(
620            options
621                .procedure
622                .max_metadata_value_size
623                .map(|v| v.as_bytes() as usize),
624        ),
625    );
626
627    Arc::new(LocalManager::new(
628        manager_config,
629        kv_state_store.clone(),
630        kv_state_store,
631        Some(runtime_switch_manager.clone()),
632        Some(event_recorder),
633    ))
634}
635
636impl Default for MetasrvBuilder {
637    fn default() -> Self {
638        Self::new()
639    }
640}
641
642/// The context for [`DdlManagerConfiguratorRef`].
643pub struct DdlManagerConfigureContext {
644    pub kv_backend: KvBackendRef,
645    pub meta_peer_client: MetaPeerClientRef,
646}