1use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{EventRecorderImpl, EventRecorderRef, DEFAULT_COMPACTION_TIME_WINDOW};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::flow::flow_state::FlowStateManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
36use common_meta::key::TableMetadataManager;
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::local::{LocalManager, ManagerConfig};
48use common_procedure::ProcedureManagerRef;
49use common_telemetry::{info, warn};
50use snafu::{ensure, ResultExt};
51
52use crate::cache_invalidator::MetasrvCacheInvalidator;
53use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
54use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
55use crate::events::EventHandlerImpl;
56use crate::flow_meta_alloc::FlowPeerAllocator;
57use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
58use crate::handler::failure_handler::RegionFailureHandler;
59use crate::handler::flow_state_handler::FlowStateHandler;
60use crate::handler::persist_stats_handler::PersistStatsHandler;
61use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
62use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
63use crate::lease::MetaPeerLookupService;
64use crate::metasrv::{
65 ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
66 SelectorContext, SelectorRef, FLOW_ID_SEQ, METASRV_DATA_DIR, TABLE_ID_SEQ,
67};
68use crate::procedure::region_migration::manager::RegionMigrationManager;
69use crate::procedure::region_migration::DefaultContextFactory;
70use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
71use crate::procedure::wal_prune::Context as WalPruneContext;
72use crate::region::flush_trigger::RegionFlushTrigger;
73use crate::region::supervisor::{
74 HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
75 RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
76};
77use crate::selector::lease_based::LeaseBasedSelector;
78use crate::selector::round_robin::RoundRobinSelector;
79use crate::service::mailbox::MailboxRef;
80use crate::service::store::cached_kv::LeaderCachedKvBackend;
81use crate::state::State;
82use crate::table_meta_alloc::MetasrvPeerAllocator;
83use crate::utils::insert_forwarder::InsertForwarder;
84
/// TWCS compaction time window (one day) passed through `InsertOptions` by the
/// inserter that persists region stats.
///
/// Spelled with `Duration::from_secs` because it is a stable `const fn`, whereas
/// `Duration::from_days` still requires an unstable feature gate.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
87
/// Builder that collects the optional components of a [`Metasrv`].
///
/// Every field starts as `None`; `MetasrvBuilder::build` substitutes a default
/// for each component that was not supplied.
pub struct MetasrvBuilder {
    /// Metasrv options; `build` uses `MetasrvOptions::default()` when unset.
    options: Option<MetasrvOptions>,
    /// Main kv backend; `build` uses a fresh `MemoryKvBackend` when unset.
    kv_backend: Option<KvBackendRef>,
    /// Resettable in-memory kv backend; `build` uses a fresh `MemoryKvBackend` when unset.
    in_memory: Option<ResettableKvBackendRef>,
    /// Selector; `build` uses `LeaseBasedSelector::default()` when unset.
    selector: Option<SelectorRef>,
    /// Heartbeat handler group builder; `build` assembles a default one when unset.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Election handle; when `None`, `build` creates the initial state as leader,
    /// otherwise as follower.
    election: Option<ElectionRef>,
    /// Meta peer client; `build` creates one from `election` and `in_memory` when unset.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Node manager; `build` uses `NodeClients` configured from
    /// `options.datanode.client` when unset.
    node_manager: Option<NodeManagerRef>,
    /// Plugins; `build` stores `Plugins::default()` in the `Metasrv` when unset.
    plugins: Option<Plugins>,
    /// Table metadata allocator; `build` creates one backed by the `TABLE_ID_SEQ`
    /// sequence when unset.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
101
102impl MetasrvBuilder {
103 pub fn new() -> Self {
104 Self {
105 kv_backend: None,
106 in_memory: None,
107 selector: None,
108 handler_group_builder: None,
109 meta_peer_client: None,
110 election: None,
111 options: None,
112 node_manager: None,
113 plugins: None,
114 table_metadata_allocator: None,
115 }
116 }
117
118 pub fn options(mut self, options: MetasrvOptions) -> Self {
119 self.options = Some(options);
120 self
121 }
122
123 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
124 self.kv_backend = Some(kv_backend);
125 self
126 }
127
128 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
129 self.in_memory = Some(in_memory);
130 self
131 }
132
133 pub fn selector(mut self, selector: SelectorRef) -> Self {
134 self.selector = Some(selector);
135 self
136 }
137
138 pub fn heartbeat_handler(
139 mut self,
140 handler_group_builder: HeartbeatHandlerGroupBuilder,
141 ) -> Self {
142 self.handler_group_builder = Some(handler_group_builder);
143 self
144 }
145
146 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
147 self.meta_peer_client = Some(meta_peer_client);
148 self
149 }
150
151 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
152 self.election = election;
153 self
154 }
155
156 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
157 self.node_manager = Some(node_manager);
158 self
159 }
160
161 pub fn plugins(mut self, plugins: Plugins) -> Self {
162 self.plugins = Some(plugins);
163 self
164 }
165
166 pub fn table_metadata_allocator(
167 mut self,
168 table_metadata_allocator: TableMetadataAllocatorRef,
169 ) -> Self {
170 self.table_metadata_allocator = Some(table_metadata_allocator);
171 self
172 }
173
174 pub async fn build(self) -> Result<Metasrv> {
175 let MetasrvBuilder {
176 election,
177 meta_peer_client,
178 options,
179 kv_backend,
180 in_memory,
181 selector,
182 handler_group_builder,
183 node_manager,
184 plugins,
185 table_metadata_allocator,
186 } = self;
187
188 let options = options.unwrap_or_default();
189
190 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
191 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
192
193 let state = Arc::new(RwLock::new(match election {
194 None => State::leader(options.grpc.server_addr.to_string(), true),
195 Some(_) => State::follower(options.grpc.server_addr.to_string()),
196 }));
197
198 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
199 state.clone(),
200 kv_backend.clone(),
201 ));
202
203 let meta_peer_client = meta_peer_client
204 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
205 let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
206
207 let event_inserter = Box::new(InsertForwarder::new(
208 peer_lookup_service.clone(),
209 Some(InsertOptions {
210 ttl: options.event_recorder.ttl,
211 append_mode: true,
212 twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
213 }),
214 ));
215 let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
217 event_inserter,
218 ))));
219
220 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
221 let pushers = Pushers::default();
222 let mailbox = build_mailbox(&kv_backend, &pushers);
223 let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
224 let procedure_manager = build_procedure_manager(
225 &options,
226 &kv_backend,
227 &runtime_switch_manager,
228 event_recorder,
229 );
230
231 let table_metadata_manager = Arc::new(TableMetadataManager::new(
232 leader_cached_kv_backend.clone() as _,
233 ));
234 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
235 leader_cached_kv_backend.clone() as _,
236 ));
237
238 let selector_ctx = SelectorContext {
239 server_addr: options.grpc.server_addr.clone(),
240 datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
241 flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
242 kv_backend: kv_backend.clone(),
243 meta_peer_client: meta_peer_client.clone(),
244 table_id: None,
245 };
246
247 let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
248 .await
249 .context(BuildWalOptionsAllocatorSnafu)?;
250 let wal_options_allocator = Arc::new(wal_options_allocator);
251 let is_remote_wal = wal_options_allocator.is_remote_wal();
252 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
253 let sequence = Arc::new(
254 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
255 .initial(MIN_USER_TABLE_ID as u64)
256 .step(10)
257 .build(),
258 );
259 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
260 selector_ctx.clone(),
261 selector.clone(),
262 ));
263 Arc::new(TableMetadataAllocator::with_peer_allocator(
264 sequence,
265 wal_options_allocator.clone(),
266 peer_allocator,
267 ))
268 });
269 let table_id_sequence = table_metadata_allocator.table_id_sequence();
270
271 let flow_selector = Arc::new(RoundRobinSelector::new(
272 SelectTarget::Flownode,
273 Arc::new(Vec::new()),
274 )) as SelectorRef;
275
276 let flow_metadata_allocator = {
277 let flow_selector_ctx = selector_ctx.clone();
279 let peer_allocator = Arc::new(FlowPeerAllocator::new(
280 flow_selector_ctx,
281 flow_selector.clone(),
282 ));
283 let seq = Arc::new(
284 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
285 .initial(MIN_USER_FLOW_ID as u64)
286 .step(10)
287 .build(),
288 );
289
290 Arc::new(FlowMetadataAllocator::with_peer_allocator(
291 seq,
292 peer_allocator,
293 ))
294 };
295 let flow_state_handler =
296 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
297
298 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
299 let node_manager = node_manager.unwrap_or_else(|| {
300 let datanode_client_channel_config = ChannelConfig::new()
301 .timeout(options.datanode.client.timeout)
302 .connect_timeout(options.datanode.client.connect_timeout)
303 .tcp_nodelay(options.datanode.client.tcp_nodelay);
304 Arc::new(NodeClients::new(datanode_client_channel_config))
305 });
306 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
307 mailbox.clone(),
308 MetasrvInfo {
309 server_addr: options.grpc.server_addr.clone(),
310 },
311 ));
312
313 if !is_remote_wal && options.enable_region_failover {
314 ensure!(
315 options.allow_region_failover_on_local_wal,
316 error::UnexpectedSnafu {
317 violated: "Region failover is not supported in the local WAL implementation!
318 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
319 }
320 );
321 if options.allow_region_failover_on_local_wal {
322 warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
323 }
324 }
325
326 let (tx, rx) = RegionSupervisor::channel();
327 let (region_failure_detector_controller, region_supervisor_ticker): (
328 RegionFailureDetectorControllerRef,
329 Option<std::sync::Arc<RegionSupervisorTicker>>,
330 ) = if options.enable_region_failover {
331 (
332 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
333 Some(Arc::new(RegionSupervisorTicker::new(
334 DEFAULT_TICK_INTERVAL,
335 options.region_failure_detector_initialization_delay,
336 DEFAULT_INITIALIZATION_RETRY_PERIOD,
337 tx.clone(),
338 ))),
339 )
340 } else {
341 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
342 };
343
344 let region_migration_manager = Arc::new(RegionMigrationManager::new(
346 procedure_manager.clone(),
347 DefaultContextFactory::new(
348 in_memory.clone(),
349 table_metadata_manager.clone(),
350 memory_region_keeper.clone(),
351 region_failure_detector_controller.clone(),
352 mailbox.clone(),
353 options.grpc.server_addr.clone(),
354 cache_invalidator.clone(),
355 ),
356 ));
357 region_migration_manager.try_start()?;
358 let region_supervisor_selector = plugins
359 .as_ref()
360 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
361
362 let supervisor_selector = match region_supervisor_selector {
363 Some(selector) => {
364 info!("Using region stat aware selector");
365 RegionSupervisorSelector::RegionStatAwareSelector(selector)
366 }
367 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
368 };
369
370 let region_failover_handler = if options.enable_region_failover {
371 let region_supervisor = RegionSupervisor::new(
372 rx,
373 options.failure_detector,
374 selector_ctx.clone(),
375 supervisor_selector,
376 region_migration_manager.clone(),
377 runtime_switch_manager.clone(),
378 peer_lookup_service.clone(),
379 leader_cached_kv_backend.clone(),
380 );
381
382 Some(RegionFailureHandler::new(
383 region_supervisor,
384 HeartbeatAcceptor::new(tx),
385 ))
386 } else {
387 None
388 };
389
390 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
391 let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
392
393 let ddl_context = DdlContext {
394 node_manager: node_manager.clone(),
395 cache_invalidator: cache_invalidator.clone(),
396 memory_region_keeper: memory_region_keeper.clone(),
397 leader_region_registry: leader_region_registry.clone(),
398 table_metadata_manager: table_metadata_manager.clone(),
399 table_metadata_allocator: table_metadata_allocator.clone(),
400 flow_metadata_manager: flow_metadata_manager.clone(),
401 flow_metadata_allocator: flow_metadata_allocator.clone(),
402 region_failure_detector_controller,
403 };
404 let procedure_manager_c = procedure_manager.clone();
405 let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
406 .context(error::InitDdlManagerSnafu)?;
407 #[cfg(feature = "enterprise")]
408 let ddl_manager = {
409 let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
410 plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
411 });
412 ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
413 };
414 let ddl_manager = Arc::new(ddl_manager);
415
416 let region_flush_ticker = if is_remote_wal {
417 let remote_wal_options = options.wal.remote_wal_options().unwrap();
418 let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
419 table_metadata_manager.clone(),
420 leader_region_registry.clone(),
421 topic_stats_registry.clone(),
422 mailbox.clone(),
423 options.grpc.server_addr.clone(),
424 remote_wal_options.flush_trigger_size,
425 remote_wal_options.checkpoint_trigger_size,
426 );
427 region_flush_trigger.try_start()?;
428
429 Some(Arc::new(region_flush_ticker))
430 } else {
431 None
432 };
433
434 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
436 let (tx, rx) = WalPruneManager::channel();
437 let remote_wal_options = options.wal.remote_wal_options().unwrap();
439 let kafka_client = build_kafka_client(&remote_wal_options.connection)
440 .await
441 .context(error::BuildKafkaClientSnafu)?;
442 let wal_prune_context = WalPruneContext {
443 client: Arc::new(kafka_client),
444 table_metadata_manager: table_metadata_manager.clone(),
445 leader_region_registry: leader_region_registry.clone(),
446 };
447 let wal_prune_manager = WalPruneManager::new(
448 remote_wal_options.auto_prune_parallelism,
449 rx,
450 procedure_manager.clone(),
451 wal_prune_context,
452 );
453 wal_prune_manager.try_start().await?;
455 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
456 remote_wal_options.auto_prune_interval,
457 tx.clone(),
458 ));
459 Some(wal_prune_ticker)
460 } else {
461 None
462 };
463
464 let customized_region_lease_renewer = plugins
465 .as_ref()
466 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
467
468 let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
469 let inserter = Box::new(InsertForwarder::new(
470 peer_lookup_service.clone(),
471 Some(InsertOptions {
472 ttl: options.stats_persistence.ttl,
473 append_mode: true,
474 twcs_compaction_time_window: Some(
475 REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
476 ),
477 }),
478 ));
479
480 Some(PersistStatsHandler::new(
481 inserter,
482 options.stats_persistence.interval,
483 ))
484 } else {
485 None
486 };
487
488 let handler_group_builder = match handler_group_builder {
489 Some(handler_group_builder) => handler_group_builder,
490 None => {
491 let region_lease_handler = RegionLeaseHandler::new(
492 distributed_time_constants::REGION_LEASE_SECS,
493 table_metadata_manager.clone(),
494 memory_region_keeper.clone(),
495 customized_region_lease_renewer,
496 );
497
498 HeartbeatHandlerGroupBuilder::new(pushers)
499 .with_plugins(plugins.clone())
500 .with_region_failure_handler(region_failover_handler)
501 .with_region_lease_handler(Some(region_lease_handler))
502 .with_flush_stats_factor(Some(options.flush_stats_factor))
503 .with_flow_state_handler(Some(flow_state_handler))
504 .with_persist_stats_handler(persist_region_stats_handler)
505 .add_default_handlers()
506 }
507 };
508
509 let enable_telemetry = options.enable_telemetry;
510 let metasrv_home = Path::new(&options.data_home)
511 .join(METASRV_DATA_DIR)
512 .to_string_lossy()
513 .to_string();
514
515 let reconciliation_manager = Arc::new(ReconciliationManager::new(
516 node_manager.clone(),
517 table_metadata_manager.clone(),
518 cache_invalidator.clone(),
519 procedure_manager.clone(),
520 ));
521 reconciliation_manager
522 .try_start()
523 .context(error::InitReconciliationManagerSnafu)?;
524
525 Ok(Metasrv {
526 state,
527 started: Arc::new(AtomicBool::new(false)),
528 start_time_ms: common_time::util::current_time_millis() as u64,
529 options,
530 in_memory,
531 kv_backend,
532 leader_cached_kv_backend,
533 meta_peer_client: meta_peer_client.clone(),
534 selector,
535 selector_ctx,
536 flow_selector,
538 handler_group: RwLock::new(None),
539 handler_group_builder: Mutex::new(Some(handler_group_builder)),
540 election,
541 procedure_manager,
542 mailbox,
543 ddl_manager,
544 wal_options_allocator,
545 table_metadata_manager,
546 runtime_switch_manager,
547 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
548 Some(metasrv_home),
549 meta_peer_client,
550 enable_telemetry,
551 )
552 .await,
553 plugins: plugins.unwrap_or_else(Plugins::default),
554 memory_region_keeper,
555 region_migration_manager,
556 region_supervisor_ticker,
557 cache_invalidator,
558 leader_region_registry,
559 wal_prune_ticker,
560 region_flush_ticker,
561 table_id_sequence,
562 reconciliation_manager,
563 topic_stats_registry,
564 })
565 }
566}
567
568fn build_default_meta_peer_client(
569 election: &Option<ElectionRef>,
570 in_memory: &ResettableKvBackendRef,
571) -> MetaPeerClientRef {
572 MetaPeerClientBuilder::default()
573 .election(election.clone())
574 .in_memory(in_memory.clone())
575 .build()
576 .map(Arc::new)
577 .unwrap()
579}
580
581fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
582 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
583 .initial(1)
584 .step(100)
585 .build();
586
587 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
588}
589
590fn build_procedure_manager(
591 options: &MetasrvOptions,
592 kv_backend: &KvBackendRef,
593 runtime_switch_manager: &RuntimeSwitchManagerRef,
594 event_recorder: EventRecorderRef,
595) -> ProcedureManagerRef {
596 let manager_config = ManagerConfig {
597 max_retry_times: options.procedure.max_retry_times,
598 retry_delay: options.procedure.retry_delay,
599 max_running_procedures: options.procedure.max_running_procedures,
600 ..Default::default()
601 };
602 let kv_state_store = Arc::new(
603 KvStateStore::new(kv_backend.clone()).with_max_value_size(
604 options
605 .procedure
606 .max_metadata_value_size
607 .map(|v| v.as_bytes() as usize),
608 ),
609 );
610
611 Arc::new(LocalManager::new(
612 manager_config,
613 kv_state_store.clone(),
614 kv_state_store,
615 Some(runtime_switch_manager.clone()),
616 Some(event_recorder),
617 ))
618}
619
620impl Default for MetasrvBuilder {
621 fn default() -> Self {
622 Self::new()
623 }
624}