1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalOptionsAllocatorSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67 ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68 RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::wal_prune::Context as WalPruneContext;
74use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
75use crate::region::flush_trigger::RegionFlushTrigger;
76use crate::region::supervisor::{
77 DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
78 RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
79 RegionSupervisorTicker,
80};
81use crate::selector::lease_based::LeaseBasedSelector;
82use crate::selector::round_robin::RoundRobinSelector;
83use crate::service::mailbox::MailboxRef;
84use crate::service::store::cached_kv::LeaderCachedKvBackend;
85use crate::state::State;
86use crate::utils::insert_forwarder::InsertForwarder;
87
/// TWCS compaction time window (one day) passed as `twcs_compaction_time_window`
/// in the insert options of the region stats persistence inserter.
///
/// Spelled with `Duration::from_secs` because `Duration::from_days` is still an
/// unstable constructor; the value is identical.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration =
    Duration::from_secs(24 * 60 * 60);
90
/// Builder that collects the optional components used to assemble a [`Metasrv`].
///
/// Every field is optional; `MetasrvBuilder::build` substitutes a default for
/// each component that was not provided.
pub struct MetasrvBuilder {
    /// Metasrv options; `build` uses `MetasrvOptions::default()` when unset.
    options: Option<MetasrvOptions>,
    /// Main kv backend; `build` uses a fresh `MemoryKvBackend` when unset.
    kv_backend: Option<KvBackendRef>,
    /// Resettable in-memory kv backend; `build` uses a fresh `MemoryKvBackend` when unset.
    in_memory: Option<ResettableKvBackendRef>,
    /// Peer selector; `build` uses `LeaseBasedSelector` when unset.
    selector: Option<SelectorRef>,
    /// Heartbeat handler group builder; `build` assembles a default group when unset.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Election handle; when `None`, `build` creates the initial state with
    /// `State::leader`, otherwise with `State::follower`.
    election: Option<ElectionRef>,
    /// Meta peer client; `build` calls `build_default_meta_peer_client` when unset.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Node manager; `build` creates `NodeClients` from the datanode client options when unset.
    node_manager: Option<NodeManagerRef>,
    /// Plugins consulted by `build` for optional extensions; defaults to `Plugins::default()`.
    plugins: Option<Plugins>,
    /// Table metadata allocator; `build` creates one backed by a `TABLE_ID_SEQ`
    /// sequence and a `MetasrvPeerAllocator` when unset.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
104
105impl MetasrvBuilder {
106 pub fn new() -> Self {
107 Self {
108 kv_backend: None,
109 in_memory: None,
110 selector: None,
111 handler_group_builder: None,
112 meta_peer_client: None,
113 election: None,
114 options: None,
115 node_manager: None,
116 plugins: None,
117 table_metadata_allocator: None,
118 }
119 }
120
121 pub fn options(mut self, options: MetasrvOptions) -> Self {
122 self.options = Some(options);
123 self
124 }
125
126 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
127 self.kv_backend = Some(kv_backend);
128 self
129 }
130
131 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
132 self.in_memory = Some(in_memory);
133 self
134 }
135
136 pub fn selector(mut self, selector: SelectorRef) -> Self {
137 self.selector = Some(selector);
138 self
139 }
140
141 pub fn heartbeat_handler(
142 mut self,
143 handler_group_builder: HeartbeatHandlerGroupBuilder,
144 ) -> Self {
145 self.handler_group_builder = Some(handler_group_builder);
146 self
147 }
148
149 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
150 self.meta_peer_client = Some(meta_peer_client);
151 self
152 }
153
154 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
155 self.election = election;
156 self
157 }
158
159 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
160 self.node_manager = Some(node_manager);
161 self
162 }
163
164 pub fn plugins(mut self, plugins: Plugins) -> Self {
165 self.plugins = Some(plugins);
166 self
167 }
168
169 pub fn table_metadata_allocator(
170 mut self,
171 table_metadata_allocator: TableMetadataAllocatorRef,
172 ) -> Self {
173 self.table_metadata_allocator = Some(table_metadata_allocator);
174 self
175 }
176
177 pub async fn build(self) -> Result<Metasrv> {
178 let MetasrvBuilder {
179 election,
180 meta_peer_client,
181 options,
182 kv_backend,
183 in_memory,
184 selector,
185 handler_group_builder,
186 node_manager,
187 plugins,
188 table_metadata_allocator,
189 } = self;
190
191 let options = options.unwrap_or_default();
192
193 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
194 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
195
196 let state = Arc::new(RwLock::new(match election {
197 None => State::leader(options.grpc.server_addr.clone(), true),
198 Some(_) => State::follower(options.grpc.server_addr.clone()),
199 }));
200
201 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
202 state.clone(),
203 kv_backend.clone(),
204 ));
205
206 let meta_peer_client = meta_peer_client
207 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
208
209 let event_inserter = Box::new(InsertForwarder::new(
210 meta_peer_client.clone(),
211 Some(InsertOptions {
212 ttl: options.event_recorder.ttl,
213 append_mode: true,
214 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
215 }),
216 ));
217 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
219 event_inserter,
220 ))));
221
222 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
223 let pushers = Pushers::default();
224 let mailbox = build_mailbox(&kv_backend, &pushers);
225 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
226 let procedure_manager = build_procedure_manager(
227 &options,
228 &kv_backend,
229 &runtime_switch_manager,
230 event_recorder,
231 );
232
233 let table_metadata_manager = Arc::new(TableMetadataManager::new(
234 leader_cached_kv_backend.clone() as _,
235 ));
236 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
237 leader_cached_kv_backend.clone() as _,
238 ));
239
240 let selector_ctx = SelectorContext {
241 peer_discovery: meta_peer_client.clone(),
242 };
243
244 let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
245 .await
246 .context(BuildWalOptionsAllocatorSnafu)?;
247 let wal_options_allocator = Arc::new(wal_options_allocator);
248 let is_remote_wal = wal_options_allocator.is_remote_wal();
249 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
250 let sequence = Arc::new(
251 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
252 .initial(MIN_USER_TABLE_ID as u64)
253 .step(10)
254 .build(),
255 );
256 let peer_allocator = Arc::new(
257 MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
258 .with_max_items(MAX_REGION_SEQ),
259 );
260 Arc::new(TableMetadataAllocator::with_peer_allocator(
261 sequence,
262 wal_options_allocator.clone(),
263 peer_allocator,
264 ))
265 });
266 let table_id_sequence = table_metadata_allocator.table_id_sequence();
267
268 let flow_selector =
269 Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
270
271 let flow_metadata_allocator = {
272 let flow_selector_ctx = selector_ctx.clone();
274 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
275 flow_selector_ctx,
276 flow_selector.clone(),
277 ));
278 let seq = Arc::new(
279 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
280 .initial(MIN_USER_FLOW_ID as u64)
281 .step(10)
282 .build(),
283 );
284
285 Arc::new(FlowMetadataAllocator::with_peer_allocator(
286 seq,
287 peer_allocator,
288 ))
289 };
290 let flow_state_handler =
291 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
292
293 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
294 let node_manager = node_manager.unwrap_or_else(|| {
295 let datanode_client_channel_config = ChannelConfig::new()
296 .timeout(options.datanode.client.timeout)
297 .connect_timeout(options.datanode.client.connect_timeout)
298 .tcp_nodelay(options.datanode.client.tcp_nodelay);
299 Arc::new(NodeClients::new(datanode_client_channel_config))
300 });
301 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
302 mailbox.clone(),
303 MetasrvInfo {
304 server_addr: options.grpc.server_addr.clone(),
305 },
306 ));
307
308 if !is_remote_wal && options.enable_region_failover {
309 ensure!(
310 options.allow_region_failover_on_local_wal,
311 error::UnexpectedSnafu {
312 violated: "Region failover is not supported in the local WAL implementation!
313 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
314 }
315 );
316 if options.allow_region_failover_on_local_wal {
317 warn!(
318 "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
319 );
320 }
321 }
322
323 let (tx, rx) = RegionSupervisor::channel();
324 let (region_failure_detector_controller, region_supervisor_ticker): (
325 RegionFailureDetectorControllerRef,
326 Option<std::sync::Arc<RegionSupervisorTicker>>,
327 ) = if options.enable_region_failover {
328 (
329 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
330 Some(Arc::new(RegionSupervisorTicker::new(
331 DEFAULT_TICK_INTERVAL,
332 options.region_failure_detector_initialization_delay,
333 DEFAULT_INITIALIZATION_RETRY_PERIOD,
334 tx.clone(),
335 ))),
336 )
337 } else {
338 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
339 };
340
341 let region_migration_manager = Arc::new(RegionMigrationManager::new(
343 procedure_manager.clone(),
344 DefaultContextFactory::new(
345 in_memory.clone(),
346 table_metadata_manager.clone(),
347 memory_region_keeper.clone(),
348 region_failure_detector_controller.clone(),
349 mailbox.clone(),
350 options.grpc.server_addr.clone(),
351 cache_invalidator.clone(),
352 ),
353 ));
354 region_migration_manager.try_start()?;
355 let region_supervisor_selector = plugins
356 .as_ref()
357 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
358
359 let supervisor_selector = match region_supervisor_selector {
360 Some(selector) => {
361 info!("Using region stat aware selector");
362 RegionSupervisorSelector::RegionStatAwareSelector(selector)
363 }
364 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
365 };
366
367 let region_failover_handler = if options.enable_region_failover {
368 let region_supervisor = RegionSupervisor::new(
369 rx,
370 options.failure_detector,
371 selector_ctx.clone(),
372 supervisor_selector,
373 region_migration_manager.clone(),
374 runtime_switch_manager.clone(),
375 meta_peer_client.clone(),
376 leader_cached_kv_backend.clone(),
377 )
378 .with_state(state.clone());
379
380 Some(RegionFailureHandler::new(
381 region_supervisor,
382 HeartbeatAcceptor::new(tx),
383 ))
384 } else {
385 None
386 };
387
388 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
389 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
390
391 let ddl_context = DdlContext {
392 node_manager: node_manager.clone(),
393 cache_invalidator: cache_invalidator.clone(),
394 memory_region_keeper: memory_region_keeper.clone(),
395 leader_region_registry: leader_region_registry.clone(),
396 table_metadata_manager: table_metadata_manager.clone(),
397 table_metadata_allocator: table_metadata_allocator.clone(),
398 flow_metadata_manager: flow_metadata_manager.clone(),
399 flow_metadata_allocator: flow_metadata_allocator.clone(),
400 region_failure_detector_controller,
401 };
402 let procedure_manager_c = procedure_manager.clone();
403 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
404 .context(error::InitDdlManagerSnafu)?;
405
406 let ddl_manager = if let Some(configurator) = plugins
407 .as_ref()
408 .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
409 {
410 let ctx = DdlManagerConfigureContext {
411 kv_backend: kv_backend.clone(),
412 meta_peer_client: meta_peer_client.clone(),
413 };
414 configurator
415 .configure(ddl_manager, ctx)
416 .await
417 .context(OtherSnafu)?
418 } else {
419 ddl_manager
420 };
421
422 let ddl_manager = Arc::new(ddl_manager);
423
424 let region_flush_ticker = if is_remote_wal {
425 let remote_wal_options = options.wal.remote_wal_options().unwrap();
426 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
427 table_metadata_manager.clone(),
428 leader_region_registry.clone(),
429 topic_stats_registry.clone(),
430 mailbox.clone(),
431 options.grpc.server_addr.clone(),
432 remote_wal_options.flush_trigger_size,
433 remote_wal_options.checkpoint_trigger_size,
434 );
435 region_flush_trigger.try_start()?;
436
437 Some(Arc::new(region_flush_ticker))
438 } else {
439 None
440 };
441
442 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
444 let (tx, rx) = WalPruneManager::channel();
445 let remote_wal_options = options.wal.remote_wal_options().unwrap();
447 let kafka_client = build_kafka_client(&remote_wal_options.connection)
448 .await
449 .context(error::BuildKafkaClientSnafu)?;
450 let wal_prune_context = WalPruneContext {
451 client: Arc::new(kafka_client),
452 table_metadata_manager: table_metadata_manager.clone(),
453 leader_region_registry: leader_region_registry.clone(),
454 };
455 let wal_prune_manager = WalPruneManager::new(
456 remote_wal_options.auto_prune_parallelism,
457 rx,
458 procedure_manager.clone(),
459 wal_prune_context,
460 );
461 wal_prune_manager.try_start().await?;
463 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
464 remote_wal_options.auto_prune_interval,
465 tx.clone(),
466 ));
467 Some(wal_prune_ticker)
468 } else {
469 None
470 };
471
472 let gc_ticker = if options.gc.enable {
473 let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
474 table_metadata_manager.clone(),
475 procedure_manager.clone(),
476 meta_peer_client.clone(),
477 mailbox.clone(),
478 options.grpc.server_addr.clone(),
479 options.gc.clone(),
480 )?;
481 gc_scheduler.try_start()?;
482
483 Some(Arc::new(gc_ticker))
484 } else {
485 None
486 };
487
488 let customized_region_lease_renewer = plugins
489 .as_ref()
490 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
491
492 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
493 let inserter = Box::new(InsertForwarder::new(
494 meta_peer_client.clone(),
495 Some(InsertOptions {
496 ttl: options.stats_persistence.ttl,
497 append_mode: true,
498 twcs_compaction_time_window: Some(
499 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
500 ),
501 }),
502 ));
503
504 Some(PersistStatsHandler::new(
505 inserter,
506 options.stats_persistence.interval,
507 ))
508 } else {
509 None
510 };
511
512 let handler_group_builder = match handler_group_builder {
513 Some(handler_group_builder) => handler_group_builder,
514 None => {
515 let region_lease_handler = RegionLeaseHandler::new(
516 distributed_time_constants::REGION_LEASE_SECS,
517 table_metadata_manager.clone(),
518 memory_region_keeper.clone(),
519 customized_region_lease_renewer,
520 );
521
522 HeartbeatHandlerGroupBuilder::new(pushers)
523 .with_plugins(plugins.clone())
524 .with_region_failure_handler(region_failover_handler)
525 .with_region_lease_handler(Some(region_lease_handler))
526 .with_flush_stats_factor(Some(options.flush_stats_factor))
527 .with_flow_state_handler(Some(flow_state_handler))
528 .with_persist_stats_handler(persist_region_stats_handler)
529 .add_default_handlers()
530 }
531 };
532
533 let enable_telemetry = options.enable_telemetry;
534 let metasrv_home = Path::new(&options.data_home)
535 .join(METASRV_DATA_DIR)
536 .to_string_lossy()
537 .to_string();
538
539 let reconciliation_manager = Arc::new(ReconciliationManager::new(
540 node_manager.clone(),
541 table_metadata_manager.clone(),
542 cache_invalidator.clone(),
543 procedure_manager.clone(),
544 ));
545 reconciliation_manager
546 .try_start()
547 .context(error::InitReconciliationManagerSnafu)?;
548
549 let mut resource_stat = ResourceStatImpl::default();
550 resource_stat.start_collect_cpu_usage();
551
552 Ok(Metasrv {
553 state,
554 started: Arc::new(AtomicBool::new(false)),
555 start_time_ms: common_time::util::current_time_millis() as u64,
556 options,
557 in_memory,
558 kv_backend,
559 leader_cached_kv_backend,
560 meta_peer_client: meta_peer_client.clone(),
561 selector,
562 selector_ctx,
563 flow_selector,
565 handler_group: RwLock::new(None),
566 handler_group_builder: Mutex::new(Some(handler_group_builder)),
567 election,
568 procedure_manager,
569 mailbox,
570 ddl_manager,
571 wal_options_allocator,
572 table_metadata_manager,
573 runtime_switch_manager,
574 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
575 Some(metasrv_home),
576 meta_peer_client,
577 enable_telemetry,
578 )
579 .await,
580 plugins: plugins.unwrap_or_else(Plugins::default),
581 memory_region_keeper,
582 region_migration_manager,
583 region_supervisor_ticker,
584 cache_invalidator,
585 leader_region_registry,
586 wal_prune_ticker,
587 region_flush_ticker,
588 table_id_sequence,
589 reconciliation_manager,
590 topic_stats_registry,
591 resource_stat: Arc::new(resource_stat),
592 gc_ticker,
593 })
594 }
595}
596
597fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
598 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
599 .initial(1)
600 .step(100)
601 .build();
602
603 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
604}
605
606fn build_procedure_manager(
607 options: &MetasrvOptions,
608 kv_backend: &KvBackendRef,
609 runtime_switch_manager: &RuntimeSwitchManagerRef,
610 event_recorder: EventRecorderRef,
611) -> ProcedureManagerRef {
612 let manager_config = ManagerConfig {
613 max_retry_times: options.procedure.max_retry_times,
614 retry_delay: options.procedure.retry_delay,
615 max_running_procedures: options.procedure.max_running_procedures,
616 ..Default::default()
617 };
618 let kv_state_store = Arc::new(
619 KvStateStore::new(kv_backend.clone()).with_max_value_size(
620 options
621 .procedure
622 .max_metadata_value_size
623 .map(|v| v.as_bytes() as usize),
624 ),
625 );
626
627 Arc::new(LocalManager::new(
628 manager_config,
629 kv_state_store.clone(),
630 kv_state_store,
631 Some(runtime_switch_manager.clone()),
632 Some(event_recorder),
633 ))
634}
635
636impl Default for MetasrvBuilder {
637 fn default() -> Self {
638 Self::new()
639 }
640}
641
/// Context passed to a `DdlManagerConfiguratorRef` plugin's `configure` call
/// while `MetasrvBuilder::build` sets up the DDL manager.
pub struct DdlManagerConfigureContext {
    /// Clone of the kv backend the metasrv is built with.
    pub kv_backend: KvBackendRef,
    /// Clone of the meta peer client the metasrv is built with.
    pub meta_peer_client: MetaPeerClientRef,
}