Skip to main content

meta_srv/metasrv/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::default_distributed_time_constants;
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_provider::{build_kafka_client, build_wal_provider};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67    ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68    RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
74use crate::procedure::wal_prune::Context as WalPruneContext;
75use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
76use crate::region::flush_trigger::RegionFlushTrigger;
77use crate::region::supervisor::{
78    DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
79    RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
80    RegionSupervisorTicker,
81};
82use crate::selector::lease_based::LeaseBasedSelector;
83use crate::selector::round_robin::RoundRobinSelector;
84use crate::service::mailbox::MailboxRef;
85use crate::service::store::cached_kv::LeaderCachedKvBackend;
86use crate::state::State;
87use crate::utils::database::DatabaseOperator;
88use crate::utils::insert_forwarder::InsertForwarder;
89
90/// The time window for twcs compaction of the region stats table.
91const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_days(1);
92
93// TODO(fys): try use derive_builder macro
94pub struct MetasrvBuilder {
95    options: Option<MetasrvOptions>,
96    kv_backend: Option<KvBackendRef>,
97    in_memory: Option<ResettableKvBackendRef>,
98    selector: Option<SelectorRef>,
99    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
100    election: Option<ElectionRef>,
101    meta_peer_client: Option<MetaPeerClientRef>,
102    node_manager: Option<NodeManagerRef>,
103    plugins: Option<Plugins>,
104    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
105}
106
107impl MetasrvBuilder {
108    pub fn new() -> Self {
109        Self {
110            kv_backend: None,
111            in_memory: None,
112            selector: None,
113            handler_group_builder: None,
114            meta_peer_client: None,
115            election: None,
116            options: None,
117            node_manager: None,
118            plugins: None,
119            table_metadata_allocator: None,
120        }
121    }
122
123    pub fn options(mut self, options: MetasrvOptions) -> Self {
124        self.options = Some(options);
125        self
126    }
127
128    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
129        self.kv_backend = Some(kv_backend);
130        self
131    }
132
133    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
134        self.in_memory = Some(in_memory);
135        self
136    }
137
138    pub fn selector(mut self, selector: SelectorRef) -> Self {
139        self.selector = Some(selector);
140        self
141    }
142
143    pub fn heartbeat_handler(
144        mut self,
145        handler_group_builder: HeartbeatHandlerGroupBuilder,
146    ) -> Self {
147        self.handler_group_builder = Some(handler_group_builder);
148        self
149    }
150
151    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
152        self.meta_peer_client = Some(meta_peer_client);
153        self
154    }
155
156    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
157        self.election = election;
158        self
159    }
160
161    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
162        self.node_manager = Some(node_manager);
163        self
164    }
165
166    pub fn plugins(mut self, plugins: Plugins) -> Self {
167        self.plugins = Some(plugins);
168        self
169    }
170
171    pub fn table_metadata_allocator(
172        mut self,
173        table_metadata_allocator: TableMetadataAllocatorRef,
174    ) -> Self {
175        self.table_metadata_allocator = Some(table_metadata_allocator);
176        self
177    }
178
179    pub fn options_ref(&self) -> Option<&MetasrvOptions> {
180        self.options.as_ref()
181    }
182
183    pub fn kv_backend_ref(&self) -> Option<&KvBackendRef> {
184        self.kv_backend.as_ref()
185    }
186
187    pub fn in_memory_ref(&self) -> Option<&ResettableKvBackendRef> {
188        self.in_memory.as_ref()
189    }
190
191    pub fn election_ref(&self) -> Option<&ElectionRef> {
192        self.election.as_ref()
193    }
194
195    pub fn meta_peer_client_ref(&self) -> Option<&MetaPeerClientRef> {
196        self.meta_peer_client.as_ref()
197    }
198
199    pub fn node_manager_ref(&self) -> Option<&NodeManagerRef> {
200        self.node_manager.as_ref()
201    }
202
203    pub async fn build(self) -> Result<Metasrv> {
204        let MetasrvBuilder {
205            election,
206            meta_peer_client,
207            options,
208            kv_backend,
209            in_memory,
210            selector,
211            handler_group_builder,
212            node_manager,
213            plugins,
214            table_metadata_allocator,
215        } = self;
216
217        let options = options.unwrap_or_default();
218
219        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
220        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
221
222        let state = Arc::new(RwLock::new(match election {
223            None => State::leader(options.grpc.server_addr.clone(), true),
224            Some(_) => State::follower(options.grpc.server_addr.clone()),
225        }));
226
227        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
228            state.clone(),
229            kv_backend.clone(),
230        ));
231
232        let meta_peer_client = meta_peer_client
233            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
234        let database_operator = Arc::new(DatabaseOperator::new(meta_peer_client.clone()));
235
236        let event_inserter = Box::new(InsertForwarder::new(
237            database_operator.clone(),
238            Some(InsertOptions {
239                ttl: options.event_recorder.ttl,
240                append_mode: true,
241                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
242            }),
243        ));
244        // Builds the event recorder to record important events and persist them as the system table.
245        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
246            event_inserter,
247        ))));
248
249        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
250        let pushers = Pushers::default();
251        let mailbox = build_mailbox(&kv_backend, &pushers);
252        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
253        let procedure_manager = build_procedure_manager(
254            &options,
255            &kv_backend,
256            &runtime_switch_manager,
257            event_recorder,
258        );
259
260        let table_metadata_manager = Arc::new(TableMetadataManager::new(
261            leader_cached_kv_backend.clone() as _,
262        ));
263        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
264            leader_cached_kv_backend.clone() as _,
265        ));
266
267        let selector_ctx = SelectorContext {
268            peer_discovery: meta_peer_client.clone(),
269        };
270
271        let wal_provider = build_wal_provider(&options.wal, kv_backend.clone())
272            .await
273            .context(BuildWalProviderSnafu)?;
274        let wal_provider = Arc::new(wal_provider);
275        let is_remote_wal = wal_provider.is_remote_wal();
276        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
277            let sequence = Arc::new(
278                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
279                    .initial(MIN_USER_TABLE_ID as u64)
280                    .step(10)
281                    .build(),
282            );
283            let peer_allocator = Arc::new(
284                MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
285                    .with_max_items(MAX_REGION_SEQ),
286            );
287            Arc::new(TableMetadataAllocator::with_peer_allocator(
288                sequence,
289                wal_provider.clone(),
290                peer_allocator,
291            ))
292        });
293        let table_id_allocator = table_metadata_allocator.table_id_allocator();
294
295        let flow_selector =
296            Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
297
298        let flow_metadata_allocator = {
299            // for now flownode just use round-robin selector
300            let flow_selector_ctx = selector_ctx.clone();
301            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
302                flow_selector_ctx,
303                flow_selector.clone(),
304            ));
305            let seq = Arc::new(
306                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
307                    .initial(MIN_USER_FLOW_ID as u64)
308                    .step(10)
309                    .build(),
310            );
311
312            Arc::new(FlowMetadataAllocator::with_peer_allocator(
313                seq,
314                peer_allocator,
315            ))
316        };
317        let flow_state_handler =
318            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
319
320        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
321        let node_manager = node_manager.unwrap_or_else(|| {
322            let datanode_client_channel_config = ChannelConfig::new()
323                .timeout(Some(options.datanode.client.timeout))
324                .connect_timeout(options.datanode.client.connect_timeout)
325                .tcp_nodelay(options.datanode.client.tcp_nodelay);
326            Arc::new(NodeClients::new(datanode_client_channel_config))
327        });
328        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
329            mailbox.clone(),
330            MetasrvInfo {
331                server_addr: options.grpc.server_addr.clone(),
332            },
333        ));
334
335        if !is_remote_wal && options.enable_region_failover {
336            ensure!(
337                options.allow_region_failover_on_local_wal,
338                error::UnexpectedSnafu {
339                    violated: "Region failover is not supported in the local WAL implementation!
340                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
341                }
342            );
343            if options.allow_region_failover_on_local_wal {
344                warn!(
345                    "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
346                );
347            }
348        }
349
350        let (tx, rx) = RegionSupervisor::channel();
351        let (region_failure_detector_controller, region_supervisor_ticker): (
352            RegionFailureDetectorControllerRef,
353            Option<std::sync::Arc<RegionSupervisorTicker>>,
354        ) = if options.enable_region_failover {
355            (
356                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
357                Some(Arc::new(RegionSupervisorTicker::new(
358                    DEFAULT_TICK_INTERVAL,
359                    options.region_failure_detector_initialization_delay,
360                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
361                    tx.clone(),
362                ))),
363            )
364        } else {
365            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
366        };
367
368        // region migration manager
369        let region_migration_manager = Arc::new(RegionMigrationManager::new(
370            procedure_manager.clone(),
371            DefaultContextFactory::new(
372                in_memory.clone(),
373                table_metadata_manager.clone(),
374                memory_region_keeper.clone(),
375                region_failure_detector_controller.clone(),
376                mailbox.clone(),
377                options.grpc.server_addr.clone(),
378                cache_invalidator.clone(),
379            ),
380        ));
381        region_migration_manager.try_start()?;
382        let region_supervisor_selector = plugins
383            .as_ref()
384            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
385
386        let supervisor_selector = match region_supervisor_selector {
387            Some(selector) => {
388                info!("Using region stat aware selector");
389                RegionSupervisorSelector::RegionStatAwareSelector(selector)
390            }
391            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
392        };
393
394        let region_failover_handler = if options.enable_region_failover {
395            let region_supervisor = RegionSupervisor::new(
396                rx,
397                options.failure_detector,
398                selector_ctx.clone(),
399                supervisor_selector,
400                region_migration_manager.clone(),
401                runtime_switch_manager.clone(),
402                meta_peer_client.clone(),
403                leader_cached_kv_backend.clone(),
404            )
405            .with_state(state.clone());
406
407            Some(RegionFailureHandler::new(
408                region_supervisor,
409                HeartbeatAcceptor::new(tx),
410            ))
411        } else {
412            None
413        };
414
415        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
416        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
417
418        let ddl_context = DdlContext {
419            node_manager: node_manager.clone(),
420            cache_invalidator: cache_invalidator.clone(),
421            memory_region_keeper: memory_region_keeper.clone(),
422            leader_region_registry: leader_region_registry.clone(),
423            table_metadata_manager: table_metadata_manager.clone(),
424            table_metadata_allocator: table_metadata_allocator.clone(),
425            flow_metadata_manager: flow_metadata_manager.clone(),
426            flow_metadata_allocator: flow_metadata_allocator.clone(),
427            region_failure_detector_controller,
428        };
429        let procedure_manager_c = procedure_manager.clone();
430        let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
431            mailbox.clone(),
432            options.grpc.server_addr.clone(),
433        ));
434        let ddl_manager = DdlManager::try_new(
435            ddl_context,
436            procedure_manager_c,
437            repartition_procedure_factory,
438            true,
439        )
440        .context(error::InitDdlManagerSnafu)?;
441
442        let ddl_manager = if let Some(configurator) = plugins
443            .as_ref()
444            .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
445        {
446            let ctx = DdlManagerConfigureContext {
447                kv_backend: kv_backend.clone(),
448                meta_peer_client: meta_peer_client.clone(),
449            };
450            configurator
451                .configure(ddl_manager, ctx)
452                .await
453                .context(OtherSnafu)?
454        } else {
455            ddl_manager
456        };
457
458        let ddl_manager = Arc::new(ddl_manager);
459
460        let region_flush_ticker = if is_remote_wal {
461            let remote_wal_options = options.wal.remote_wal_options().unwrap();
462            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
463                table_metadata_manager.clone(),
464                leader_region_registry.clone(),
465                topic_stats_registry.clone(),
466                mailbox.clone(),
467                options.grpc.server_addr.clone(),
468                remote_wal_options.flush_trigger_size,
469                remote_wal_options.checkpoint_trigger_size,
470                remote_wal_options.region_flush_trigger_interval,
471                remote_wal_options.periodic_checkpoint_persist_interval,
472            );
473            region_flush_trigger.try_start()?;
474
475            Some(Arc::new(region_flush_ticker))
476        } else {
477            None
478        };
479
480        // remote WAL prune ticker and manager
481        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
482            let (tx, rx) = WalPruneManager::channel();
483            // Safety: Must be remote WAL.
484            let remote_wal_options = options.wal.remote_wal_options().unwrap();
485            let kafka_client = build_kafka_client(&remote_wal_options.connection)
486                .await
487                .context(error::BuildKafkaClientSnafu)?;
488            let wal_prune_context = WalPruneContext {
489                client: Arc::new(kafka_client),
490                table_metadata_manager: table_metadata_manager.clone(),
491                leader_region_registry: leader_region_registry.clone(),
492            };
493            let wal_prune_manager = WalPruneManager::new(
494                remote_wal_options.auto_prune_parallelism,
495                remote_wal_options.auto_prune_logical_delete,
496                rx,
497                procedure_manager.clone(),
498                wal_prune_context,
499            );
500            // Start manager in background. Ticker will be started in the main thread to send ticks.
501            wal_prune_manager.try_start().await?;
502            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
503                remote_wal_options.auto_prune_interval,
504                tx.clone(),
505            ));
506            Some(wal_prune_ticker)
507        } else {
508            None
509        };
510
511        let gc_ticker = if options.gc.enable {
512            let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
513                table_metadata_manager.clone(),
514                procedure_manager.clone(),
515                meta_peer_client.clone(),
516                mailbox.clone(),
517                options.grpc.server_addr.clone(),
518                options.gc.clone(),
519            )?;
520            gc_scheduler.try_start()?;
521
522            Some(Arc::new(gc_ticker))
523        } else {
524            None
525        };
526
527        let customized_region_lease_renewer = plugins
528            .as_ref()
529            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
530
531        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
532            let inserter = Box::new(InsertForwarder::new(
533                database_operator.clone(),
534                Some(InsertOptions {
535                    ttl: options.stats_persistence.ttl,
536                    append_mode: true,
537                    twcs_compaction_time_window: Some(
538                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
539                    ),
540                }),
541            ));
542
543            Some(PersistStatsHandler::new(
544                inserter,
545                options.stats_persistence.interval,
546            ))
547        } else {
548            None
549        };
550
551        let handler_group_builder = match handler_group_builder {
552            Some(handler_group_builder) => handler_group_builder,
553            None => {
554                let region_lease_handler = RegionLeaseHandler::new(
555                    default_distributed_time_constants().region_lease.as_secs(),
556                    table_metadata_manager.clone(),
557                    memory_region_keeper.clone(),
558                    customized_region_lease_renewer,
559                );
560
561                HeartbeatHandlerGroupBuilder::new(pushers)
562                    .with_plugins(plugins.clone())
563                    .with_region_failure_handler(region_failover_handler)
564                    .with_region_lease_handler(Some(region_lease_handler))
565                    .with_flush_stats_factor(Some(options.flush_stats_factor))
566                    .with_flow_state_handler(Some(flow_state_handler))
567                    .with_persist_stats_handler(persist_region_stats_handler)
568                    .add_default_handlers()
569            }
570        };
571
572        let enable_telemetry = options.enable_telemetry;
573        let metasrv_home = Path::new(&options.data_home)
574            .join(METASRV_DATA_DIR)
575            .to_string_lossy()
576            .to_string();
577
578        let reconciliation_manager = Arc::new(ReconciliationManager::new(
579            node_manager.clone(),
580            table_metadata_manager.clone(),
581            cache_invalidator.clone(),
582            procedure_manager.clone(),
583        ));
584        reconciliation_manager
585            .try_start()
586            .context(error::InitReconciliationManagerSnafu)?;
587
588        let mut resource_stat = ResourceStatImpl::default();
589        resource_stat.start_collect_cpu_usage();
590
591        Ok(Metasrv {
592            state,
593            started: Arc::new(AtomicBool::new(false)),
594            start_time_ms: common_time::util::current_time_millis() as u64,
595            options,
596            in_memory,
597            kv_backend,
598            leader_cached_kv_backend,
599            meta_peer_client: meta_peer_client.clone(),
600            selector,
601            selector_ctx,
602            // TODO(jeremy): We do not allow configuring the flow selector.
603            flow_selector,
604            handler_group: RwLock::new(None),
605            handler_group_builder: Mutex::new(Some(handler_group_builder)),
606            election,
607            procedure_manager,
608            mailbox,
609            ddl_manager,
610            wal_provider,
611            table_metadata_manager,
612            runtime_switch_manager,
613            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
614                Some(metasrv_home),
615                meta_peer_client,
616                enable_telemetry,
617            )
618            .await,
619            plugins: plugins.unwrap_or_else(Plugins::default),
620            memory_region_keeper,
621            region_migration_manager,
622            region_supervisor_ticker,
623            cache_invalidator,
624            leader_region_registry,
625            wal_prune_ticker,
626            region_flush_ticker,
627            table_id_allocator,
628            reconciliation_manager,
629            topic_stats_registry,
630            resource_stat: Arc::new(resource_stat),
631            gc_ticker,
632            database_operator,
633        })
634    }
635}
636
637fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
638    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
639        .initial(1)
640        .step(100)
641        .build();
642
643    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
644}
645
646fn build_procedure_manager(
647    options: &MetasrvOptions,
648    kv_backend: &KvBackendRef,
649    runtime_switch_manager: &RuntimeSwitchManagerRef,
650    event_recorder: EventRecorderRef,
651) -> ProcedureManagerRef {
652    let manager_config = ManagerConfig {
653        max_retry_times: options.procedure.max_retry_times,
654        retry_delay: options.procedure.retry_delay,
655        max_running_procedures: options.procedure.max_running_procedures,
656        ..Default::default()
657    };
658    let kv_state_store = Arc::new(
659        KvStateStore::new(kv_backend.clone()).with_max_value_size(
660            options
661                .procedure
662                .max_metadata_value_size
663                .map(|v| v.as_bytes() as usize),
664        ),
665    );
666
667    Arc::new(LocalManager::new(
668        manager_config,
669        kv_state_store.clone(),
670        kv_state_store,
671        Some(runtime_switch_manager.clone()),
672        Some(event_recorder),
673    ))
674}
675
676impl Default for MetasrvBuilder {
677    fn default() -> Self {
678        Self::new()
679    }
680}
681
682/// The context for [`DdlManagerConfiguratorRef`].
683pub struct DdlManagerConfigureContext {
684    pub kv_backend: KvBackendRef,
685    pub meta_peer_client: MetaPeerClientRef,
686}