1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_telemetry::{info, warn};
50use snafu::{ResultExt, ensure};
51use store_api::storage::MAX_REGION_SEQ;
52
53use crate::bootstrap::build_default_meta_peer_client;
54use crate::cache_invalidator::MetasrvCacheInvalidator;
55use crate::cluster::MetaPeerClientRef;
56use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
57use crate::events::EventHandlerImpl;
58use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
59use crate::handler::failure_handler::RegionFailureHandler;
60use crate::handler::flow_state_handler::FlowStateHandler;
61use crate::handler::persist_stats_handler::PersistStatsHandler;
62use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
63use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
64use crate::metasrv::{
65 ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
66 RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
67};
68use crate::peer::MetasrvPeerAllocator;
69use crate::procedure::region_migration::DefaultContextFactory;
70use crate::procedure::region_migration::manager::RegionMigrationManager;
71use crate::procedure::wal_prune::Context as WalPruneContext;
72use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
73use crate::region::flush_trigger::RegionFlushTrigger;
74use crate::region::supervisor::{
75 DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
76 RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
77 RegionSupervisorTicker,
78};
79use crate::selector::lease_based::LeaseBasedSelector;
80use crate::selector::round_robin::RoundRobinSelector;
81use crate::service::mailbox::MailboxRef;
82use crate::service::store::cached_kv::LeaderCachedKvBackend;
83use crate::state::State;
84use crate::utils::insert_forwarder::InsertForwarder;
85
/// TWCS compaction time window (one day) passed as an insert option by the inserter
/// that persists region stats.
///
/// Spelled with `Duration::from_secs` so the constant does not depend on the unstable
/// `Duration::from_days` constructor.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
88
89pub struct MetasrvBuilder {
91 options: Option<MetasrvOptions>,
92 kv_backend: Option<KvBackendRef>,
93 in_memory: Option<ResettableKvBackendRef>,
94 selector: Option<SelectorRef>,
95 handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
96 election: Option<ElectionRef>,
97 meta_peer_client: Option<MetaPeerClientRef>,
98 node_manager: Option<NodeManagerRef>,
99 plugins: Option<Plugins>,
100 table_metadata_allocator: Option<TableMetadataAllocatorRef>,
101}
102
103impl MetasrvBuilder {
104 pub fn new() -> Self {
105 Self {
106 kv_backend: None,
107 in_memory: None,
108 selector: None,
109 handler_group_builder: None,
110 meta_peer_client: None,
111 election: None,
112 options: None,
113 node_manager: None,
114 plugins: None,
115 table_metadata_allocator: None,
116 }
117 }
118
119 pub fn options(mut self, options: MetasrvOptions) -> Self {
120 self.options = Some(options);
121 self
122 }
123
124 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
125 self.kv_backend = Some(kv_backend);
126 self
127 }
128
129 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
130 self.in_memory = Some(in_memory);
131 self
132 }
133
134 pub fn selector(mut self, selector: SelectorRef) -> Self {
135 self.selector = Some(selector);
136 self
137 }
138
139 pub fn heartbeat_handler(
140 mut self,
141 handler_group_builder: HeartbeatHandlerGroupBuilder,
142 ) -> Self {
143 self.handler_group_builder = Some(handler_group_builder);
144 self
145 }
146
147 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
148 self.meta_peer_client = Some(meta_peer_client);
149 self
150 }
151
152 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
153 self.election = election;
154 self
155 }
156
157 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
158 self.node_manager = Some(node_manager);
159 self
160 }
161
162 pub fn plugins(mut self, plugins: Plugins) -> Self {
163 self.plugins = Some(plugins);
164 self
165 }
166
167 pub fn table_metadata_allocator(
168 mut self,
169 table_metadata_allocator: TableMetadataAllocatorRef,
170 ) -> Self {
171 self.table_metadata_allocator = Some(table_metadata_allocator);
172 self
173 }
174
175 pub async fn build(self) -> Result<Metasrv> {
176 let MetasrvBuilder {
177 election,
178 meta_peer_client,
179 options,
180 kv_backend,
181 in_memory,
182 selector,
183 handler_group_builder,
184 node_manager,
185 plugins,
186 table_metadata_allocator,
187 } = self;
188
189 let options = options.unwrap_or_default();
190
191 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
192 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
193
194 let state = Arc::new(RwLock::new(match election {
195 None => State::leader(options.grpc.server_addr.to_string(), true),
196 Some(_) => State::follower(options.grpc.server_addr.to_string()),
197 }));
198
199 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
200 state.clone(),
201 kv_backend.clone(),
202 ));
203
204 let meta_peer_client = meta_peer_client
205 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
206
207 let event_inserter = Box::new(InsertForwarder::new(
208 meta_peer_client.clone(),
209 Some(InsertOptions {
210 ttl: options.event_recorder.ttl,
211 append_mode: true,
212 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
213 }),
214 ));
215 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
217 event_inserter,
218 ))));
219
220 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
221 let pushers = Pushers::default();
222 let mailbox = build_mailbox(&kv_backend, &pushers);
223 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
224 let procedure_manager = build_procedure_manager(
225 &options,
226 &kv_backend,
227 &runtime_switch_manager,
228 event_recorder,
229 );
230
231 let table_metadata_manager = Arc::new(TableMetadataManager::new(
232 leader_cached_kv_backend.clone() as _,
233 ));
234 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
235 leader_cached_kv_backend.clone() as _,
236 ));
237
238 let selector_ctx = SelectorContext {
239 peer_discovery: meta_peer_client.clone(),
240 };
241
242 let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
243 .await
244 .context(BuildWalOptionsAllocatorSnafu)?;
245 let wal_options_allocator = Arc::new(wal_options_allocator);
246 let is_remote_wal = wal_options_allocator.is_remote_wal();
247 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
248 let sequence = Arc::new(
249 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
250 .initial(MIN_USER_TABLE_ID as u64)
251 .step(10)
252 .build(),
253 );
254 let peer_allocator = Arc::new(
255 MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
256 .with_max_items(MAX_REGION_SEQ),
257 );
258 Arc::new(TableMetadataAllocator::with_peer_allocator(
259 sequence,
260 wal_options_allocator.clone(),
261 peer_allocator,
262 ))
263 });
264 let table_id_sequence = table_metadata_allocator.table_id_sequence();
265
266 let flow_selector =
267 Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
268
269 let flow_metadata_allocator = {
270 let flow_selector_ctx = selector_ctx.clone();
272 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
273 flow_selector_ctx,
274 flow_selector.clone(),
275 ));
276 let seq = Arc::new(
277 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
278 .initial(MIN_USER_FLOW_ID as u64)
279 .step(10)
280 .build(),
281 );
282
283 Arc::new(FlowMetadataAllocator::with_peer_allocator(
284 seq,
285 peer_allocator,
286 ))
287 };
288 let flow_state_handler =
289 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
290
291 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
292 let node_manager = node_manager.unwrap_or_else(|| {
293 let datanode_client_channel_config = ChannelConfig::new()
294 .timeout(options.datanode.client.timeout)
295 .connect_timeout(options.datanode.client.connect_timeout)
296 .tcp_nodelay(options.datanode.client.tcp_nodelay);
297 Arc::new(NodeClients::new(datanode_client_channel_config))
298 });
299 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
300 mailbox.clone(),
301 MetasrvInfo {
302 server_addr: options.grpc.server_addr.clone(),
303 },
304 ));
305
306 if !is_remote_wal && options.enable_region_failover {
307 ensure!(
308 options.allow_region_failover_on_local_wal,
309 error::UnexpectedSnafu {
310 violated: "Region failover is not supported in the local WAL implementation!
311 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
312 }
313 );
314 if options.allow_region_failover_on_local_wal {
315 warn!(
316 "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
317 );
318 }
319 }
320
321 let (tx, rx) = RegionSupervisor::channel();
322 let (region_failure_detector_controller, region_supervisor_ticker): (
323 RegionFailureDetectorControllerRef,
324 Option<std::sync::Arc<RegionSupervisorTicker>>,
325 ) = if options.enable_region_failover {
326 (
327 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
328 Some(Arc::new(RegionSupervisorTicker::new(
329 DEFAULT_TICK_INTERVAL,
330 options.region_failure_detector_initialization_delay,
331 DEFAULT_INITIALIZATION_RETRY_PERIOD,
332 tx.clone(),
333 ))),
334 )
335 } else {
336 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
337 };
338
339 let region_migration_manager = Arc::new(RegionMigrationManager::new(
341 procedure_manager.clone(),
342 DefaultContextFactory::new(
343 in_memory.clone(),
344 table_metadata_manager.clone(),
345 memory_region_keeper.clone(),
346 region_failure_detector_controller.clone(),
347 mailbox.clone(),
348 options.grpc.server_addr.clone(),
349 cache_invalidator.clone(),
350 ),
351 ));
352 region_migration_manager.try_start()?;
353 let region_supervisor_selector = plugins
354 .as_ref()
355 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
356
357 let supervisor_selector = match region_supervisor_selector {
358 Some(selector) => {
359 info!("Using region stat aware selector");
360 RegionSupervisorSelector::RegionStatAwareSelector(selector)
361 }
362 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
363 };
364
365 let region_failover_handler = if options.enable_region_failover {
366 let region_supervisor = RegionSupervisor::new(
367 rx,
368 options.failure_detector,
369 selector_ctx.clone(),
370 supervisor_selector,
371 region_migration_manager.clone(),
372 runtime_switch_manager.clone(),
373 meta_peer_client.clone(),
374 leader_cached_kv_backend.clone(),
375 );
376
377 Some(RegionFailureHandler::new(
378 region_supervisor,
379 HeartbeatAcceptor::new(tx),
380 ))
381 } else {
382 None
383 };
384
385 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
386 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
387
388 let ddl_context = DdlContext {
389 node_manager: node_manager.clone(),
390 cache_invalidator: cache_invalidator.clone(),
391 memory_region_keeper: memory_region_keeper.clone(),
392 leader_region_registry: leader_region_registry.clone(),
393 table_metadata_manager: table_metadata_manager.clone(),
394 table_metadata_allocator: table_metadata_allocator.clone(),
395 flow_metadata_manager: flow_metadata_manager.clone(),
396 flow_metadata_allocator: flow_metadata_allocator.clone(),
397 region_failure_detector_controller,
398 };
399 let procedure_manager_c = procedure_manager.clone();
400 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
401 .context(error::InitDdlManagerSnafu)?;
402 #[cfg(feature = "enterprise")]
403 let ddl_manager = {
404 let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
405 plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
406 });
407 ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
408 };
409 let ddl_manager = Arc::new(ddl_manager);
410
411 let region_flush_ticker = if is_remote_wal {
412 let remote_wal_options = options.wal.remote_wal_options().unwrap();
413 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
414 table_metadata_manager.clone(),
415 leader_region_registry.clone(),
416 topic_stats_registry.clone(),
417 mailbox.clone(),
418 options.grpc.server_addr.clone(),
419 remote_wal_options.flush_trigger_size,
420 remote_wal_options.checkpoint_trigger_size,
421 );
422 region_flush_trigger.try_start()?;
423
424 Some(Arc::new(region_flush_ticker))
425 } else {
426 None
427 };
428
429 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
431 let (tx, rx) = WalPruneManager::channel();
432 let remote_wal_options = options.wal.remote_wal_options().unwrap();
434 let kafka_client = build_kafka_client(&remote_wal_options.connection)
435 .await
436 .context(error::BuildKafkaClientSnafu)?;
437 let wal_prune_context = WalPruneContext {
438 client: Arc::new(kafka_client),
439 table_metadata_manager: table_metadata_manager.clone(),
440 leader_region_registry: leader_region_registry.clone(),
441 };
442 let wal_prune_manager = WalPruneManager::new(
443 remote_wal_options.auto_prune_parallelism,
444 rx,
445 procedure_manager.clone(),
446 wal_prune_context,
447 );
448 wal_prune_manager.try_start().await?;
450 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
451 remote_wal_options.auto_prune_interval,
452 tx.clone(),
453 ));
454 Some(wal_prune_ticker)
455 } else {
456 None
457 };
458
459 let customized_region_lease_renewer = plugins
460 .as_ref()
461 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
462
463 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
464 let inserter = Box::new(InsertForwarder::new(
465 meta_peer_client.clone(),
466 Some(InsertOptions {
467 ttl: options.stats_persistence.ttl,
468 append_mode: true,
469 twcs_compaction_time_window: Some(
470 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
471 ),
472 }),
473 ));
474
475 Some(PersistStatsHandler::new(
476 inserter,
477 options.stats_persistence.interval,
478 ))
479 } else {
480 None
481 };
482
483 let handler_group_builder = match handler_group_builder {
484 Some(handler_group_builder) => handler_group_builder,
485 None => {
486 let region_lease_handler = RegionLeaseHandler::new(
487 distributed_time_constants::REGION_LEASE_SECS,
488 table_metadata_manager.clone(),
489 memory_region_keeper.clone(),
490 customized_region_lease_renewer,
491 );
492
493 HeartbeatHandlerGroupBuilder::new(pushers)
494 .with_plugins(plugins.clone())
495 .with_region_failure_handler(region_failover_handler)
496 .with_region_lease_handler(Some(region_lease_handler))
497 .with_flush_stats_factor(Some(options.flush_stats_factor))
498 .with_flow_state_handler(Some(flow_state_handler))
499 .with_persist_stats_handler(persist_region_stats_handler)
500 .add_default_handlers()
501 }
502 };
503
504 let enable_telemetry = options.enable_telemetry;
505 let metasrv_home = Path::new(&options.data_home)
506 .join(METASRV_DATA_DIR)
507 .to_string_lossy()
508 .to_string();
509
510 let reconciliation_manager = Arc::new(ReconciliationManager::new(
511 node_manager.clone(),
512 table_metadata_manager.clone(),
513 cache_invalidator.clone(),
514 procedure_manager.clone(),
515 ));
516 reconciliation_manager
517 .try_start()
518 .context(error::InitReconciliationManagerSnafu)?;
519
520 Ok(Metasrv {
521 state,
522 started: Arc::new(AtomicBool::new(false)),
523 start_time_ms: common_time::util::current_time_millis() as u64,
524 options,
525 in_memory,
526 kv_backend,
527 leader_cached_kv_backend,
528 meta_peer_client: meta_peer_client.clone(),
529 selector,
530 selector_ctx,
531 flow_selector,
533 handler_group: RwLock::new(None),
534 handler_group_builder: Mutex::new(Some(handler_group_builder)),
535 election,
536 procedure_manager,
537 mailbox,
538 ddl_manager,
539 wal_options_allocator,
540 table_metadata_manager,
541 runtime_switch_manager,
542 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
543 Some(metasrv_home),
544 meta_peer_client,
545 enable_telemetry,
546 )
547 .await,
548 plugins: plugins.unwrap_or_else(Plugins::default),
549 memory_region_keeper,
550 region_migration_manager,
551 region_supervisor_ticker,
552 cache_invalidator,
553 leader_region_registry,
554 wal_prune_ticker,
555 region_flush_ticker,
556 table_id_sequence,
557 reconciliation_manager,
558 topic_stats_registry,
559 resource_spec: Default::default(),
560 })
561 }
562}
563
564fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
565 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
566 .initial(1)
567 .step(100)
568 .build();
569
570 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
571}
572
573fn build_procedure_manager(
574 options: &MetasrvOptions,
575 kv_backend: &KvBackendRef,
576 runtime_switch_manager: &RuntimeSwitchManagerRef,
577 event_recorder: EventRecorderRef,
578) -> ProcedureManagerRef {
579 let manager_config = ManagerConfig {
580 max_retry_times: options.procedure.max_retry_times,
581 retry_delay: options.procedure.retry_delay,
582 max_running_procedures: options.procedure.max_running_procedures,
583 ..Default::default()
584 };
585 let kv_state_store = Arc::new(
586 KvStateStore::new(kv_backend.clone()).with_max_value_size(
587 options
588 .procedure
589 .max_metadata_value_size
590 .map(|v| v.as_bytes() as usize),
591 ),
592 );
593
594 Arc::new(LocalManager::new(
595 manager_config,
596 kv_state_store.clone(),
597 kv_state_store,
598 Some(runtime_switch_manager.clone()),
599 Some(event_recorder),
600 ))
601}
602
603impl Default for MetasrvBuilder {
604 fn default() -> Self {
605 Self::new()
606 }
607}