1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
60use crate::handler::failure_handler::RegionFailureHandler;
61use crate::handler::flow_state_handler::FlowStateHandler;
62use crate::handler::persist_stats_handler::PersistStatsHandler;
63use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
64use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
65use crate::metasrv::{
66 ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
67 RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
68};
69use crate::peer::MetasrvPeerAllocator;
70use crate::procedure::region_migration::DefaultContextFactory;
71use crate::procedure::region_migration::manager::RegionMigrationManager;
72use crate::procedure::wal_prune::Context as WalPruneContext;
73use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
74use crate::region::flush_trigger::RegionFlushTrigger;
75use crate::region::supervisor::{
76 DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
77 RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
78 RegionSupervisorTicker,
79};
80use crate::selector::lease_based::LeaseBasedSelector;
81use crate::selector::round_robin::RoundRobinSelector;
82use crate::service::mailbox::MailboxRef;
83use crate::service::store::cached_kv::LeaderCachedKvBackend;
84use crate::state::State;
85use crate::utils::insert_forwarder::InsertForwarder;
86
/// TWCS compaction time window (one day) attached to the insert options of the
/// persisted region stats.
// Written with `from_secs` so the constant does not depend on the feature-gated
// `Duration::from_days` constructor.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
89
/// Builder that collects optional components and assembles a [`Metasrv`].
///
/// Every field starts out as `None`; `MetasrvBuilder::build` substitutes a
/// default for each component that was never supplied.
pub struct MetasrvBuilder {
    /// Server options; `build` falls back to the `Default` value when unset.
    options: Option<MetasrvOptions>,
    /// Key-value backend; `build` falls back to a fresh `MemoryKvBackend`.
    kv_backend: Option<KvBackendRef>,
    /// Resettable in-memory backend; `build` falls back to a fresh `MemoryKvBackend`.
    in_memory: Option<ResettableKvBackendRef>,
    /// Peer selector; `build` falls back to `LeaseBasedSelector`.
    selector: Option<SelectorRef>,
    /// Heartbeat handler group builder; when unset `build` assembles one and
    /// adds the default handlers.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Election handle. `build` creates the initial state with `State::leader`
    /// when this is `None` and with `State::follower` otherwise.
    election: Option<ElectionRef>,
    /// Meta peer client; `build` falls back to `build_default_meta_peer_client`.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Node manager; `build` falls back to `NodeClients` configured from the
    /// datanode client options.
    node_manager: Option<NodeManagerRef>,
    /// Plugins queried by `build` for optional extension points.
    plugins: Option<Plugins>,
    /// Table metadata allocator; `build` constructs one when unset.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
103
104impl MetasrvBuilder {
105 pub fn new() -> Self {
106 Self {
107 kv_backend: None,
108 in_memory: None,
109 selector: None,
110 handler_group_builder: None,
111 meta_peer_client: None,
112 election: None,
113 options: None,
114 node_manager: None,
115 plugins: None,
116 table_metadata_allocator: None,
117 }
118 }
119
120 pub fn options(mut self, options: MetasrvOptions) -> Self {
121 self.options = Some(options);
122 self
123 }
124
125 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
126 self.kv_backend = Some(kv_backend);
127 self
128 }
129
130 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
131 self.in_memory = Some(in_memory);
132 self
133 }
134
135 pub fn selector(mut self, selector: SelectorRef) -> Self {
136 self.selector = Some(selector);
137 self
138 }
139
140 pub fn heartbeat_handler(
141 mut self,
142 handler_group_builder: HeartbeatHandlerGroupBuilder,
143 ) -> Self {
144 self.handler_group_builder = Some(handler_group_builder);
145 self
146 }
147
148 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
149 self.meta_peer_client = Some(meta_peer_client);
150 self
151 }
152
153 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
154 self.election = election;
155 self
156 }
157
158 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
159 self.node_manager = Some(node_manager);
160 self
161 }
162
163 pub fn plugins(mut self, plugins: Plugins) -> Self {
164 self.plugins = Some(plugins);
165 self
166 }
167
168 pub fn table_metadata_allocator(
169 mut self,
170 table_metadata_allocator: TableMetadataAllocatorRef,
171 ) -> Self {
172 self.table_metadata_allocator = Some(table_metadata_allocator);
173 self
174 }
175
176 pub async fn build(self) -> Result<Metasrv> {
177 let MetasrvBuilder {
178 election,
179 meta_peer_client,
180 options,
181 kv_backend,
182 in_memory,
183 selector,
184 handler_group_builder,
185 node_manager,
186 plugins,
187 table_metadata_allocator,
188 } = self;
189
190 let options = options.unwrap_or_default();
191
192 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
193 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
194
195 let state = Arc::new(RwLock::new(match election {
196 None => State::leader(options.grpc.server_addr.clone(), true),
197 Some(_) => State::follower(options.grpc.server_addr.clone()),
198 }));
199
200 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
201 state.clone(),
202 kv_backend.clone(),
203 ));
204
205 let meta_peer_client = meta_peer_client
206 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
207
208 let event_inserter = Box::new(InsertForwarder::new(
209 meta_peer_client.clone(),
210 Some(InsertOptions {
211 ttl: options.event_recorder.ttl,
212 append_mode: true,
213 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
214 }),
215 ));
216 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
218 event_inserter,
219 ))));
220
221 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
222 let pushers = Pushers::default();
223 let mailbox = build_mailbox(&kv_backend, &pushers);
224 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
225 let procedure_manager = build_procedure_manager(
226 &options,
227 &kv_backend,
228 &runtime_switch_manager,
229 event_recorder,
230 );
231
232 let table_metadata_manager = Arc::new(TableMetadataManager::new(
233 leader_cached_kv_backend.clone() as _,
234 ));
235 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
236 leader_cached_kv_backend.clone() as _,
237 ));
238
239 let selector_ctx = SelectorContext {
240 peer_discovery: meta_peer_client.clone(),
241 };
242
243 let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
244 .await
245 .context(BuildWalOptionsAllocatorSnafu)?;
246 let wal_options_allocator = Arc::new(wal_options_allocator);
247 let is_remote_wal = wal_options_allocator.is_remote_wal();
248 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
249 let sequence = Arc::new(
250 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
251 .initial(MIN_USER_TABLE_ID as u64)
252 .step(10)
253 .build(),
254 );
255 let peer_allocator = Arc::new(
256 MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
257 .with_max_items(MAX_REGION_SEQ),
258 );
259 Arc::new(TableMetadataAllocator::with_peer_allocator(
260 sequence,
261 wal_options_allocator.clone(),
262 peer_allocator,
263 ))
264 });
265 let table_id_sequence = table_metadata_allocator.table_id_sequence();
266
267 let flow_selector =
268 Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
269
270 let flow_metadata_allocator = {
271 let flow_selector_ctx = selector_ctx.clone();
273 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
274 flow_selector_ctx,
275 flow_selector.clone(),
276 ));
277 let seq = Arc::new(
278 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
279 .initial(MIN_USER_FLOW_ID as u64)
280 .step(10)
281 .build(),
282 );
283
284 Arc::new(FlowMetadataAllocator::with_peer_allocator(
285 seq,
286 peer_allocator,
287 ))
288 };
289 let flow_state_handler =
290 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
291
292 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
293 let node_manager = node_manager.unwrap_or_else(|| {
294 let datanode_client_channel_config = ChannelConfig::new()
295 .timeout(options.datanode.client.timeout)
296 .connect_timeout(options.datanode.client.connect_timeout)
297 .tcp_nodelay(options.datanode.client.tcp_nodelay);
298 Arc::new(NodeClients::new(datanode_client_channel_config))
299 });
300 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
301 mailbox.clone(),
302 MetasrvInfo {
303 server_addr: options.grpc.server_addr.clone(),
304 },
305 ));
306
307 if !is_remote_wal && options.enable_region_failover {
308 ensure!(
309 options.allow_region_failover_on_local_wal,
310 error::UnexpectedSnafu {
311 violated: "Region failover is not supported in the local WAL implementation!
312 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
313 }
314 );
315 if options.allow_region_failover_on_local_wal {
316 warn!(
317 "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
318 );
319 }
320 }
321
322 let (tx, rx) = RegionSupervisor::channel();
323 let (region_failure_detector_controller, region_supervisor_ticker): (
324 RegionFailureDetectorControllerRef,
325 Option<std::sync::Arc<RegionSupervisorTicker>>,
326 ) = if options.enable_region_failover {
327 (
328 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
329 Some(Arc::new(RegionSupervisorTicker::new(
330 DEFAULT_TICK_INTERVAL,
331 options.region_failure_detector_initialization_delay,
332 DEFAULT_INITIALIZATION_RETRY_PERIOD,
333 tx.clone(),
334 ))),
335 )
336 } else {
337 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
338 };
339
340 let region_migration_manager = Arc::new(RegionMigrationManager::new(
342 procedure_manager.clone(),
343 DefaultContextFactory::new(
344 in_memory.clone(),
345 table_metadata_manager.clone(),
346 memory_region_keeper.clone(),
347 region_failure_detector_controller.clone(),
348 mailbox.clone(),
349 options.grpc.server_addr.clone(),
350 cache_invalidator.clone(),
351 ),
352 ));
353 region_migration_manager.try_start()?;
354 let region_supervisor_selector = plugins
355 .as_ref()
356 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
357
358 let supervisor_selector = match region_supervisor_selector {
359 Some(selector) => {
360 info!("Using region stat aware selector");
361 RegionSupervisorSelector::RegionStatAwareSelector(selector)
362 }
363 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
364 };
365
366 let region_failover_handler = if options.enable_region_failover {
367 let region_supervisor = RegionSupervisor::new(
368 rx,
369 options.failure_detector,
370 selector_ctx.clone(),
371 supervisor_selector,
372 region_migration_manager.clone(),
373 runtime_switch_manager.clone(),
374 meta_peer_client.clone(),
375 leader_cached_kv_backend.clone(),
376 );
377
378 Some(RegionFailureHandler::new(
379 region_supervisor,
380 HeartbeatAcceptor::new(tx),
381 ))
382 } else {
383 None
384 };
385
386 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
387 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
388
389 let ddl_context = DdlContext {
390 node_manager: node_manager.clone(),
391 cache_invalidator: cache_invalidator.clone(),
392 memory_region_keeper: memory_region_keeper.clone(),
393 leader_region_registry: leader_region_registry.clone(),
394 table_metadata_manager: table_metadata_manager.clone(),
395 table_metadata_allocator: table_metadata_allocator.clone(),
396 flow_metadata_manager: flow_metadata_manager.clone(),
397 flow_metadata_allocator: flow_metadata_allocator.clone(),
398 region_failure_detector_controller,
399 };
400 let procedure_manager_c = procedure_manager.clone();
401 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
402 .context(error::InitDdlManagerSnafu)?;
403 #[cfg(feature = "enterprise")]
404 let ddl_manager = {
405 let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
406 plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
407 });
408 ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
409 };
410 let ddl_manager = Arc::new(ddl_manager);
411
412 let region_flush_ticker = if is_remote_wal {
413 let remote_wal_options = options.wal.remote_wal_options().unwrap();
414 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
415 table_metadata_manager.clone(),
416 leader_region_registry.clone(),
417 topic_stats_registry.clone(),
418 mailbox.clone(),
419 options.grpc.server_addr.clone(),
420 remote_wal_options.flush_trigger_size,
421 remote_wal_options.checkpoint_trigger_size,
422 );
423 region_flush_trigger.try_start()?;
424
425 Some(Arc::new(region_flush_ticker))
426 } else {
427 None
428 };
429
430 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
432 let (tx, rx) = WalPruneManager::channel();
433 let remote_wal_options = options.wal.remote_wal_options().unwrap();
435 let kafka_client = build_kafka_client(&remote_wal_options.connection)
436 .await
437 .context(error::BuildKafkaClientSnafu)?;
438 let wal_prune_context = WalPruneContext {
439 client: Arc::new(kafka_client),
440 table_metadata_manager: table_metadata_manager.clone(),
441 leader_region_registry: leader_region_registry.clone(),
442 };
443 let wal_prune_manager = WalPruneManager::new(
444 remote_wal_options.auto_prune_parallelism,
445 rx,
446 procedure_manager.clone(),
447 wal_prune_context,
448 );
449 wal_prune_manager.try_start().await?;
451 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
452 remote_wal_options.auto_prune_interval,
453 tx.clone(),
454 ));
455 Some(wal_prune_ticker)
456 } else {
457 None
458 };
459
460 let customized_region_lease_renewer = plugins
461 .as_ref()
462 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
463
464 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
465 let inserter = Box::new(InsertForwarder::new(
466 meta_peer_client.clone(),
467 Some(InsertOptions {
468 ttl: options.stats_persistence.ttl,
469 append_mode: true,
470 twcs_compaction_time_window: Some(
471 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
472 ),
473 }),
474 ));
475
476 Some(PersistStatsHandler::new(
477 inserter,
478 options.stats_persistence.interval,
479 ))
480 } else {
481 None
482 };
483
484 let handler_group_builder = match handler_group_builder {
485 Some(handler_group_builder) => handler_group_builder,
486 None => {
487 let region_lease_handler = RegionLeaseHandler::new(
488 distributed_time_constants::REGION_LEASE_SECS,
489 table_metadata_manager.clone(),
490 memory_region_keeper.clone(),
491 customized_region_lease_renewer,
492 );
493
494 HeartbeatHandlerGroupBuilder::new(pushers)
495 .with_plugins(plugins.clone())
496 .with_region_failure_handler(region_failover_handler)
497 .with_region_lease_handler(Some(region_lease_handler))
498 .with_flush_stats_factor(Some(options.flush_stats_factor))
499 .with_flow_state_handler(Some(flow_state_handler))
500 .with_persist_stats_handler(persist_region_stats_handler)
501 .add_default_handlers()
502 }
503 };
504
505 let enable_telemetry = options.enable_telemetry;
506 let metasrv_home = Path::new(&options.data_home)
507 .join(METASRV_DATA_DIR)
508 .to_string_lossy()
509 .to_string();
510
511 let reconciliation_manager = Arc::new(ReconciliationManager::new(
512 node_manager.clone(),
513 table_metadata_manager.clone(),
514 cache_invalidator.clone(),
515 procedure_manager.clone(),
516 ));
517 reconciliation_manager
518 .try_start()
519 .context(error::InitReconciliationManagerSnafu)?;
520
521 let mut resource_stat = ResourceStatImpl::default();
522 resource_stat.start_collect_cpu_usage();
523
524 Ok(Metasrv {
525 state,
526 started: Arc::new(AtomicBool::new(false)),
527 start_time_ms: common_time::util::current_time_millis() as u64,
528 options,
529 in_memory,
530 kv_backend,
531 leader_cached_kv_backend,
532 meta_peer_client: meta_peer_client.clone(),
533 selector,
534 selector_ctx,
535 flow_selector,
537 handler_group: RwLock::new(None),
538 handler_group_builder: Mutex::new(Some(handler_group_builder)),
539 election,
540 procedure_manager,
541 mailbox,
542 ddl_manager,
543 wal_options_allocator,
544 table_metadata_manager,
545 runtime_switch_manager,
546 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
547 Some(metasrv_home),
548 meta_peer_client,
549 enable_telemetry,
550 )
551 .await,
552 plugins: plugins.unwrap_or_else(Plugins::default),
553 memory_region_keeper,
554 region_migration_manager,
555 region_supervisor_ticker,
556 cache_invalidator,
557 leader_region_registry,
558 wal_prune_ticker,
559 region_flush_ticker,
560 table_id_sequence,
561 reconciliation_manager,
562 topic_stats_registry,
563 resource_stat: Arc::new(resource_stat),
564 })
565 }
566}
567
568fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
569 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
570 .initial(1)
571 .step(100)
572 .build();
573
574 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
575}
576
577fn build_procedure_manager(
578 options: &MetasrvOptions,
579 kv_backend: &KvBackendRef,
580 runtime_switch_manager: &RuntimeSwitchManagerRef,
581 event_recorder: EventRecorderRef,
582) -> ProcedureManagerRef {
583 let manager_config = ManagerConfig {
584 max_retry_times: options.procedure.max_retry_times,
585 retry_delay: options.procedure.retry_delay,
586 max_running_procedures: options.procedure.max_running_procedures,
587 ..Default::default()
588 };
589 let kv_state_store = Arc::new(
590 KvStateStore::new(kv_backend.clone()).with_max_value_size(
591 options
592 .procedure
593 .max_metadata_value_size
594 .map(|v| v.as_bytes() as usize),
595 ),
596 );
597
598 Arc::new(LocalManager::new(
599 manager_config,
600 kv_state_store.clone(),
601 kv_state_store,
602 Some(runtime_switch_manager.clone()),
603 Some(event_recorder),
604 ))
605}
606
607impl Default for MetasrvBuilder {
608 fn default() -> Self {
609 Self::new()
610 }
611}