1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::{DdlManager, DdlManagerConfiguratorRef};
32use common_meta::distributed_time_constants::default_distributed_time_constants;
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_provider::{build_kafka_client, build_wal_provider};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalProviderSnafu, OtherSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::gc::GcScheduler;
60use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
61use crate::handler::failure_handler::RegionFailureHandler;
62use crate::handler::flow_state_handler::FlowStateHandler;
63use crate::handler::persist_stats_handler::PersistStatsHandler;
64use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
65use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
66use crate::metasrv::{
67 ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
68 RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
69};
70use crate::peer::MetasrvPeerAllocator;
71use crate::procedure::region_migration::DefaultContextFactory;
72use crate::procedure::region_migration::manager::RegionMigrationManager;
73use crate::procedure::repartition::DefaultRepartitionProcedureFactory;
74use crate::procedure::wal_prune::Context as WalPruneContext;
75use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
76use crate::region::flush_trigger::RegionFlushTrigger;
77use crate::region::supervisor::{
78 DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
79 RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
80 RegionSupervisorTicker,
81};
82use crate::selector::lease_based::LeaseBasedSelector;
83use crate::selector::round_robin::RoundRobinSelector;
84use crate::service::mailbox::MailboxRef;
85use crate::service::store::cached_kv::LeaderCachedKvBackend;
86use crate::state::State;
87use crate::utils::insert_forwarder::InsertForwarder;
88
/// TWCS compaction time window used for the table that persisted region statistics are
/// inserted into: one day.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration =
    Duration::from_secs(24 * 60 * 60);
91
92pub struct MetasrvBuilder {
94 options: Option<MetasrvOptions>,
95 kv_backend: Option<KvBackendRef>,
96 in_memory: Option<ResettableKvBackendRef>,
97 selector: Option<SelectorRef>,
98 handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
99 election: Option<ElectionRef>,
100 meta_peer_client: Option<MetaPeerClientRef>,
101 node_manager: Option<NodeManagerRef>,
102 plugins: Option<Plugins>,
103 table_metadata_allocator: Option<TableMetadataAllocatorRef>,
104}
105
106impl MetasrvBuilder {
107 pub fn new() -> Self {
108 Self {
109 kv_backend: None,
110 in_memory: None,
111 selector: None,
112 handler_group_builder: None,
113 meta_peer_client: None,
114 election: None,
115 options: None,
116 node_manager: None,
117 plugins: None,
118 table_metadata_allocator: None,
119 }
120 }
121
122 pub fn options(mut self, options: MetasrvOptions) -> Self {
123 self.options = Some(options);
124 self
125 }
126
127 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
128 self.kv_backend = Some(kv_backend);
129 self
130 }
131
132 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
133 self.in_memory = Some(in_memory);
134 self
135 }
136
137 pub fn selector(mut self, selector: SelectorRef) -> Self {
138 self.selector = Some(selector);
139 self
140 }
141
142 pub fn heartbeat_handler(
143 mut self,
144 handler_group_builder: HeartbeatHandlerGroupBuilder,
145 ) -> Self {
146 self.handler_group_builder = Some(handler_group_builder);
147 self
148 }
149
150 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
151 self.meta_peer_client = Some(meta_peer_client);
152 self
153 }
154
155 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
156 self.election = election;
157 self
158 }
159
160 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
161 self.node_manager = Some(node_manager);
162 self
163 }
164
165 pub fn plugins(mut self, plugins: Plugins) -> Self {
166 self.plugins = Some(plugins);
167 self
168 }
169
170 pub fn table_metadata_allocator(
171 mut self,
172 table_metadata_allocator: TableMetadataAllocatorRef,
173 ) -> Self {
174 self.table_metadata_allocator = Some(table_metadata_allocator);
175 self
176 }
177
178 pub async fn build(self) -> Result<Metasrv> {
179 let MetasrvBuilder {
180 election,
181 meta_peer_client,
182 options,
183 kv_backend,
184 in_memory,
185 selector,
186 handler_group_builder,
187 node_manager,
188 plugins,
189 table_metadata_allocator,
190 } = self;
191
192 let options = options.unwrap_or_default();
193
194 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
195 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
196
197 let state = Arc::new(RwLock::new(match election {
198 None => State::leader(options.grpc.server_addr.clone(), true),
199 Some(_) => State::follower(options.grpc.server_addr.clone()),
200 }));
201
202 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
203 state.clone(),
204 kv_backend.clone(),
205 ));
206
207 let meta_peer_client = meta_peer_client
208 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
209
210 let event_inserter = Box::new(InsertForwarder::new(
211 meta_peer_client.clone(),
212 Some(InsertOptions {
213 ttl: options.event_recorder.ttl,
214 append_mode: true,
215 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
216 }),
217 ));
218 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
220 event_inserter,
221 ))));
222
223 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
224 let pushers = Pushers::default();
225 let mailbox = build_mailbox(&kv_backend, &pushers);
226 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
227 let procedure_manager = build_procedure_manager(
228 &options,
229 &kv_backend,
230 &runtime_switch_manager,
231 event_recorder,
232 );
233
234 let table_metadata_manager = Arc::new(TableMetadataManager::new(
235 leader_cached_kv_backend.clone() as _,
236 ));
237 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
238 leader_cached_kv_backend.clone() as _,
239 ));
240
241 let selector_ctx = SelectorContext {
242 peer_discovery: meta_peer_client.clone(),
243 };
244
245 let wal_provider = build_wal_provider(&options.wal, kv_backend.clone())
246 .await
247 .context(BuildWalProviderSnafu)?;
248 let wal_provider = Arc::new(wal_provider);
249 let is_remote_wal = wal_provider.is_remote_wal();
250 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
251 let sequence = Arc::new(
252 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
253 .initial(MIN_USER_TABLE_ID as u64)
254 .step(10)
255 .build(),
256 );
257 let peer_allocator = Arc::new(
258 MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
259 .with_max_items(MAX_REGION_SEQ),
260 );
261 Arc::new(TableMetadataAllocator::with_peer_allocator(
262 sequence,
263 wal_provider.clone(),
264 peer_allocator,
265 ))
266 });
267 let table_id_allocator = table_metadata_allocator.table_id_allocator();
268
269 let flow_selector =
270 Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
271
272 let flow_metadata_allocator = {
273 let flow_selector_ctx = selector_ctx.clone();
275 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
276 flow_selector_ctx,
277 flow_selector.clone(),
278 ));
279 let seq = Arc::new(
280 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
281 .initial(MIN_USER_FLOW_ID as u64)
282 .step(10)
283 .build(),
284 );
285
286 Arc::new(FlowMetadataAllocator::with_peer_allocator(
287 seq,
288 peer_allocator,
289 ))
290 };
291 let flow_state_handler =
292 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
293
294 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
295 let node_manager = node_manager.unwrap_or_else(|| {
296 let datanode_client_channel_config = ChannelConfig::new()
297 .timeout(Some(options.datanode.client.timeout))
298 .connect_timeout(options.datanode.client.connect_timeout)
299 .tcp_nodelay(options.datanode.client.tcp_nodelay);
300 Arc::new(NodeClients::new(datanode_client_channel_config))
301 });
302 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
303 mailbox.clone(),
304 MetasrvInfo {
305 server_addr: options.grpc.server_addr.clone(),
306 },
307 ));
308
309 if !is_remote_wal && options.enable_region_failover {
310 ensure!(
311 options.allow_region_failover_on_local_wal,
312 error::UnexpectedSnafu {
313 violated: "Region failover is not supported in the local WAL implementation!
314 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
315 }
316 );
317 if options.allow_region_failover_on_local_wal {
318 warn!(
319 "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
320 );
321 }
322 }
323
324 let (tx, rx) = RegionSupervisor::channel();
325 let (region_failure_detector_controller, region_supervisor_ticker): (
326 RegionFailureDetectorControllerRef,
327 Option<std::sync::Arc<RegionSupervisorTicker>>,
328 ) = if options.enable_region_failover {
329 (
330 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
331 Some(Arc::new(RegionSupervisorTicker::new(
332 DEFAULT_TICK_INTERVAL,
333 options.region_failure_detector_initialization_delay,
334 DEFAULT_INITIALIZATION_RETRY_PERIOD,
335 tx.clone(),
336 ))),
337 )
338 } else {
339 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
340 };
341
342 let region_migration_manager = Arc::new(RegionMigrationManager::new(
344 procedure_manager.clone(),
345 DefaultContextFactory::new(
346 in_memory.clone(),
347 table_metadata_manager.clone(),
348 memory_region_keeper.clone(),
349 region_failure_detector_controller.clone(),
350 mailbox.clone(),
351 options.grpc.server_addr.clone(),
352 cache_invalidator.clone(),
353 ),
354 ));
355 region_migration_manager.try_start()?;
356 let region_supervisor_selector = plugins
357 .as_ref()
358 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
359
360 let supervisor_selector = match region_supervisor_selector {
361 Some(selector) => {
362 info!("Using region stat aware selector");
363 RegionSupervisorSelector::RegionStatAwareSelector(selector)
364 }
365 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
366 };
367
368 let region_failover_handler = if options.enable_region_failover {
369 let region_supervisor = RegionSupervisor::new(
370 rx,
371 options.failure_detector,
372 selector_ctx.clone(),
373 supervisor_selector,
374 region_migration_manager.clone(),
375 runtime_switch_manager.clone(),
376 meta_peer_client.clone(),
377 leader_cached_kv_backend.clone(),
378 )
379 .with_state(state.clone());
380
381 Some(RegionFailureHandler::new(
382 region_supervisor,
383 HeartbeatAcceptor::new(tx),
384 ))
385 } else {
386 None
387 };
388
389 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
390 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
391
392 let ddl_context = DdlContext {
393 node_manager: node_manager.clone(),
394 cache_invalidator: cache_invalidator.clone(),
395 memory_region_keeper: memory_region_keeper.clone(),
396 leader_region_registry: leader_region_registry.clone(),
397 table_metadata_manager: table_metadata_manager.clone(),
398 table_metadata_allocator: table_metadata_allocator.clone(),
399 flow_metadata_manager: flow_metadata_manager.clone(),
400 flow_metadata_allocator: flow_metadata_allocator.clone(),
401 region_failure_detector_controller,
402 };
403 let procedure_manager_c = procedure_manager.clone();
404 let repartition_procedure_factory = Arc::new(DefaultRepartitionProcedureFactory::new(
405 mailbox.clone(),
406 options.grpc.server_addr.clone(),
407 ));
408 let ddl_manager = DdlManager::try_new(
409 ddl_context,
410 procedure_manager_c,
411 repartition_procedure_factory,
412 true,
413 )
414 .context(error::InitDdlManagerSnafu)?;
415
416 let ddl_manager = if let Some(configurator) = plugins
417 .as_ref()
418 .and_then(|p| p.get::<DdlManagerConfiguratorRef<DdlManagerConfigureContext>>())
419 {
420 let ctx = DdlManagerConfigureContext {
421 kv_backend: kv_backend.clone(),
422 meta_peer_client: meta_peer_client.clone(),
423 };
424 configurator
425 .configure(ddl_manager, ctx)
426 .await
427 .context(OtherSnafu)?
428 } else {
429 ddl_manager
430 };
431
432 let ddl_manager = Arc::new(ddl_manager);
433
434 let region_flush_ticker = if is_remote_wal {
435 let remote_wal_options = options.wal.remote_wal_options().unwrap();
436 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
437 table_metadata_manager.clone(),
438 leader_region_registry.clone(),
439 topic_stats_registry.clone(),
440 mailbox.clone(),
441 options.grpc.server_addr.clone(),
442 remote_wal_options.flush_trigger_size,
443 remote_wal_options.checkpoint_trigger_size,
444 );
445 region_flush_trigger.try_start()?;
446
447 Some(Arc::new(region_flush_ticker))
448 } else {
449 None
450 };
451
452 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
454 let (tx, rx) = WalPruneManager::channel();
455 let remote_wal_options = options.wal.remote_wal_options().unwrap();
457 let kafka_client = build_kafka_client(&remote_wal_options.connection)
458 .await
459 .context(error::BuildKafkaClientSnafu)?;
460 let wal_prune_context = WalPruneContext {
461 client: Arc::new(kafka_client),
462 table_metadata_manager: table_metadata_manager.clone(),
463 leader_region_registry: leader_region_registry.clone(),
464 };
465 let wal_prune_manager = WalPruneManager::new(
466 remote_wal_options.auto_prune_parallelism,
467 rx,
468 procedure_manager.clone(),
469 wal_prune_context,
470 );
471 wal_prune_manager.try_start().await?;
473 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
474 remote_wal_options.auto_prune_interval,
475 tx.clone(),
476 ));
477 Some(wal_prune_ticker)
478 } else {
479 None
480 };
481
482 let gc_ticker = if options.gc.enable {
483 let (gc_scheduler, gc_ticker) = GcScheduler::new_with_config(
484 table_metadata_manager.clone(),
485 procedure_manager.clone(),
486 meta_peer_client.clone(),
487 mailbox.clone(),
488 options.grpc.server_addr.clone(),
489 options.gc.clone(),
490 )?;
491 gc_scheduler.try_start()?;
492
493 Some(Arc::new(gc_ticker))
494 } else {
495 None
496 };
497
498 let customized_region_lease_renewer = plugins
499 .as_ref()
500 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
501
502 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
503 let inserter = Box::new(InsertForwarder::new(
504 meta_peer_client.clone(),
505 Some(InsertOptions {
506 ttl: options.stats_persistence.ttl,
507 append_mode: true,
508 twcs_compaction_time_window: Some(
509 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
510 ),
511 }),
512 ));
513
514 Some(PersistStatsHandler::new(
515 inserter,
516 options.stats_persistence.interval,
517 ))
518 } else {
519 None
520 };
521
522 let handler_group_builder = match handler_group_builder {
523 Some(handler_group_builder) => handler_group_builder,
524 None => {
525 let region_lease_handler = RegionLeaseHandler::new(
526 default_distributed_time_constants().region_lease.as_secs(),
527 table_metadata_manager.clone(),
528 memory_region_keeper.clone(),
529 customized_region_lease_renewer,
530 );
531
532 HeartbeatHandlerGroupBuilder::new(pushers)
533 .with_plugins(plugins.clone())
534 .with_region_failure_handler(region_failover_handler)
535 .with_region_lease_handler(Some(region_lease_handler))
536 .with_flush_stats_factor(Some(options.flush_stats_factor))
537 .with_flow_state_handler(Some(flow_state_handler))
538 .with_persist_stats_handler(persist_region_stats_handler)
539 .add_default_handlers()
540 }
541 };
542
543 let enable_telemetry = options.enable_telemetry;
544 let metasrv_home = Path::new(&options.data_home)
545 .join(METASRV_DATA_DIR)
546 .to_string_lossy()
547 .to_string();
548
549 let reconciliation_manager = Arc::new(ReconciliationManager::new(
550 node_manager.clone(),
551 table_metadata_manager.clone(),
552 cache_invalidator.clone(),
553 procedure_manager.clone(),
554 ));
555 reconciliation_manager
556 .try_start()
557 .context(error::InitReconciliationManagerSnafu)?;
558
559 let mut resource_stat = ResourceStatImpl::default();
560 resource_stat.start_collect_cpu_usage();
561
562 Ok(Metasrv {
563 state,
564 started: Arc::new(AtomicBool::new(false)),
565 start_time_ms: common_time::util::current_time_millis() as u64,
566 options,
567 in_memory,
568 kv_backend,
569 leader_cached_kv_backend,
570 meta_peer_client: meta_peer_client.clone(),
571 selector,
572 selector_ctx,
573 flow_selector,
575 handler_group: RwLock::new(None),
576 handler_group_builder: Mutex::new(Some(handler_group_builder)),
577 election,
578 procedure_manager,
579 mailbox,
580 ddl_manager,
581 wal_provider,
582 table_metadata_manager,
583 runtime_switch_manager,
584 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
585 Some(metasrv_home),
586 meta_peer_client,
587 enable_telemetry,
588 )
589 .await,
590 plugins: plugins.unwrap_or_else(Plugins::default),
591 memory_region_keeper,
592 region_migration_manager,
593 region_supervisor_ticker,
594 cache_invalidator,
595 leader_region_registry,
596 wal_prune_ticker,
597 region_flush_ticker,
598 table_id_allocator,
599 reconciliation_manager,
600 topic_stats_registry,
601 resource_stat: Arc::new(resource_stat),
602 gc_ticker,
603 })
604 }
605}
606
607fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
608 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
609 .initial(1)
610 .step(100)
611 .build();
612
613 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
614}
615
616fn build_procedure_manager(
617 options: &MetasrvOptions,
618 kv_backend: &KvBackendRef,
619 runtime_switch_manager: &RuntimeSwitchManagerRef,
620 event_recorder: EventRecorderRef,
621) -> ProcedureManagerRef {
622 let manager_config = ManagerConfig {
623 max_retry_times: options.procedure.max_retry_times,
624 retry_delay: options.procedure.retry_delay,
625 max_running_procedures: options.procedure.max_running_procedures,
626 ..Default::default()
627 };
628 let kv_state_store = Arc::new(
629 KvStateStore::new(kv_backend.clone()).with_max_value_size(
630 options
631 .procedure
632 .max_metadata_value_size
633 .map(|v| v.as_bytes() as usize),
634 ),
635 );
636
637 Arc::new(LocalManager::new(
638 manager_config,
639 kv_state_store.clone(),
640 kv_state_store,
641 Some(runtime_switch_manager.clone()),
642 Some(event_recorder),
643 ))
644}
645
646impl Default for MetasrvBuilder {
647 fn default() -> Self {
648 Self::new()
649 }
650}
651
652pub struct DdlManagerConfigureContext {
654 pub kv_backend: KvBackendRef,
655 pub meta_peer_client: MetaPeerClientRef,
656}