1use std::sync::atomic::AtomicBool;
16use std::sync::{Arc, Mutex, RwLock};
17
18use client::client_manager::NodeClients;
19use common_base::Plugins;
20use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
21use common_grpc::channel_manager::ChannelConfig;
22use common_meta::ddl::flow_meta::FlowMetadataAllocator;
23use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
24use common_meta::ddl::{
25 DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
26};
27use common_meta::ddl_manager::DdlManager;
28use common_meta::distributed_time_constants;
29use common_meta::key::flow::flow_state::FlowStateManager;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::maintenance::MaintenanceModeManager;
32use common_meta::key::TableMetadataManager;
33use common_meta::kv_backend::memory::MemoryKvBackend;
34use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
35use common_meta::node_manager::NodeManagerRef;
36use common_meta::region_keeper::MemoryRegionKeeper;
37use common_meta::region_registry::LeaderRegionRegistry;
38use common_meta::sequence::SequenceBuilder;
39use common_meta::state_store::KvStateStore;
40use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::ProcedureManagerRef;
43use common_telemetry::{info, warn};
44use snafu::{ensure, ResultExt};
45
46use crate::cache_invalidator::MetasrvCacheInvalidator;
47use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
48use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
49use crate::flow_meta_alloc::FlowPeerAllocator;
50use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
51use crate::handler::failure_handler::RegionFailureHandler;
52use crate::handler::flow_state_handler::FlowStateHandler;
53use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
54use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
55use crate::lease::MetaPeerLookupService;
56use crate::metasrv::{
57 ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
58 SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
59};
60use crate::procedure::region_migration::manager::RegionMigrationManager;
61use crate::procedure::region_migration::DefaultContextFactory;
62use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
63use crate::procedure::wal_prune::Context as WalPruneContext;
64use crate::region::supervisor::{
65 HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
66 RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
67};
68use crate::selector::lease_based::LeaseBasedSelector;
69use crate::selector::round_robin::RoundRobinSelector;
70use crate::service::mailbox::MailboxRef;
71use crate::service::store::cached_kv::LeaderCachedKvBackend;
72use crate::state::State;
73use crate::table_meta_alloc::MetasrvPeerAllocator;
74
75pub struct MetasrvBuilder {
77 options: Option<MetasrvOptions>,
78 kv_backend: Option<KvBackendRef>,
79 in_memory: Option<ResettableKvBackendRef>,
80 selector: Option<SelectorRef>,
81 handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
82 election: Option<ElectionRef>,
83 meta_peer_client: Option<MetaPeerClientRef>,
84 node_manager: Option<NodeManagerRef>,
85 plugins: Option<Plugins>,
86 table_metadata_allocator: Option<TableMetadataAllocatorRef>,
87}
88
89impl MetasrvBuilder {
90 pub fn new() -> Self {
91 Self {
92 kv_backend: None,
93 in_memory: None,
94 selector: None,
95 handler_group_builder: None,
96 meta_peer_client: None,
97 election: None,
98 options: None,
99 node_manager: None,
100 plugins: None,
101 table_metadata_allocator: None,
102 }
103 }
104
105 pub fn options(mut self, options: MetasrvOptions) -> Self {
106 self.options = Some(options);
107 self
108 }
109
110 pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
111 self.kv_backend = Some(kv_backend);
112 self
113 }
114
115 pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
116 self.in_memory = Some(in_memory);
117 self
118 }
119
120 pub fn selector(mut self, selector: SelectorRef) -> Self {
121 self.selector = Some(selector);
122 self
123 }
124
125 pub fn heartbeat_handler(
126 mut self,
127 handler_group_builder: HeartbeatHandlerGroupBuilder,
128 ) -> Self {
129 self.handler_group_builder = Some(handler_group_builder);
130 self
131 }
132
133 pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
134 self.meta_peer_client = Some(meta_peer_client);
135 self
136 }
137
138 pub fn election(mut self, election: Option<ElectionRef>) -> Self {
139 self.election = election;
140 self
141 }
142
143 pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
144 self.node_manager = Some(node_manager);
145 self
146 }
147
148 pub fn plugins(mut self, plugins: Plugins) -> Self {
149 self.plugins = Some(plugins);
150 self
151 }
152
153 pub fn table_metadata_allocator(
154 mut self,
155 table_metadata_allocator: TableMetadataAllocatorRef,
156 ) -> Self {
157 self.table_metadata_allocator = Some(table_metadata_allocator);
158 self
159 }
160
161 pub async fn build(self) -> Result<Metasrv> {
162 let started = Arc::new(AtomicBool::new(false));
163
164 let MetasrvBuilder {
165 election,
166 meta_peer_client,
167 options,
168 kv_backend,
169 in_memory,
170 selector,
171 handler_group_builder,
172 node_manager,
173 plugins,
174 table_metadata_allocator,
175 } = self;
176
177 let options = options.unwrap_or_default();
178
179 let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
180 let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
181
182 let state = Arc::new(RwLock::new(match election {
183 None => State::leader(options.server_addr.to_string(), true),
184 Some(_) => State::follower(options.server_addr.to_string()),
185 }));
186
187 let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
188 state.clone(),
189 kv_backend.clone(),
190 ));
191
192 let meta_peer_client = meta_peer_client
193 .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
194 let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
195 let pushers = Pushers::default();
196 let mailbox = build_mailbox(&kv_backend, &pushers);
197 let procedure_manager = build_procedure_manager(&options, &kv_backend);
198
199 let table_metadata_manager = Arc::new(TableMetadataManager::new(
200 leader_cached_kv_backend.clone() as _,
201 ));
202 let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
203 leader_cached_kv_backend.clone() as _,
204 ));
205 let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
206 let selector_ctx = SelectorContext {
207 server_addr: options.server_addr.clone(),
208 datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
209 flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
210 kv_backend: kv_backend.clone(),
211 meta_peer_client: meta_peer_client.clone(),
212 table_id: None,
213 };
214
215 let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
216 .await
217 .context(BuildWalOptionsAllocatorSnafu)?;
218 let wal_options_allocator = Arc::new(wal_options_allocator);
219 let is_remote_wal = wal_options_allocator.is_remote_wal();
220 let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
221 let sequence = Arc::new(
222 SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
223 .initial(MIN_USER_TABLE_ID as u64)
224 .step(10)
225 .build(),
226 );
227 let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
228 selector_ctx.clone(),
229 selector.clone(),
230 ));
231 Arc::new(TableMetadataAllocator::with_peer_allocator(
232 sequence,
233 wal_options_allocator.clone(),
234 peer_allocator,
235 ))
236 });
237
238 let flow_selector = Arc::new(RoundRobinSelector::new(
239 SelectTarget::Flownode,
240 Arc::new(Vec::new()),
241 )) as SelectorRef;
242
243 let flow_metadata_allocator = {
244 let flow_selector_ctx = selector_ctx.clone();
246 let peer_allocator = Arc::new(FlowPeerAllocator::new(
247 flow_selector_ctx,
248 flow_selector.clone(),
249 ));
250 let seq = Arc::new(
251 SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
252 .initial(MIN_USER_FLOW_ID as u64)
253 .step(10)
254 .build(),
255 );
256
257 Arc::new(FlowMetadataAllocator::with_peer_allocator(
258 seq,
259 peer_allocator,
260 ))
261 };
262 let flow_state_handler =
263 FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
264
265 let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
266 let node_manager = node_manager.unwrap_or_else(|| {
267 let datanode_client_channel_config = ChannelConfig::new()
268 .timeout(options.datanode.client.timeout)
269 .connect_timeout(options.datanode.client.connect_timeout)
270 .tcp_nodelay(options.datanode.client.tcp_nodelay);
271 Arc::new(NodeClients::new(datanode_client_channel_config))
272 });
273 let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
274 mailbox.clone(),
275 MetasrvInfo {
276 server_addr: options.server_addr.clone(),
277 },
278 ));
279 let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
280
281 if !is_remote_wal && options.enable_region_failover {
282 ensure!(
283 options.allow_region_failover_on_local_wal,
284 error::UnexpectedSnafu {
285 violated: "Region failover is not supported in the local WAL implementation!
286 If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
287 }
288 );
289 if options.allow_region_failover_on_local_wal {
290 warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
291 }
292 }
293
294 let (tx, rx) = RegionSupervisor::channel();
295 let (region_failure_detector_controller, region_supervisor_ticker): (
296 RegionFailureDetectorControllerRef,
297 Option<std::sync::Arc<RegionSupervisorTicker>>,
298 ) = if options.enable_region_failover {
299 (
300 Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
301 Some(Arc::new(RegionSupervisorTicker::new(
302 DEFAULT_TICK_INTERVAL,
303 tx.clone(),
304 ))),
305 )
306 } else {
307 (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
308 };
309
310 let region_migration_manager = Arc::new(RegionMigrationManager::new(
312 procedure_manager.clone(),
313 DefaultContextFactory::new(
314 in_memory.clone(),
315 table_metadata_manager.clone(),
316 memory_region_keeper.clone(),
317 region_failure_detector_controller.clone(),
318 mailbox.clone(),
319 options.server_addr.clone(),
320 cache_invalidator.clone(),
321 ),
322 ));
323 region_migration_manager.try_start()?;
324 let region_supervisor_selector = plugins
325 .as_ref()
326 .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
327
328 let supervisor_selector = match region_supervisor_selector {
329 Some(selector) => {
330 info!("Using region stat aware selector");
331 RegionSupervisorSelector::RegionStatAwareSelector(selector)
332 }
333 None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
334 };
335
336 let region_failover_handler = if options.enable_region_failover {
337 let region_supervisor = RegionSupervisor::new(
338 rx,
339 options.failure_detector,
340 selector_ctx.clone(),
341 supervisor_selector,
342 region_migration_manager.clone(),
343 maintenance_mode_manager.clone(),
344 peer_lookup_service.clone(),
345 );
346
347 Some(RegionFailureHandler::new(
348 region_supervisor,
349 HeartbeatAcceptor::new(tx),
350 ))
351 } else {
352 None
353 };
354
355 let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
356 let ddl_manager = Arc::new(
357 DdlManager::try_new(
358 DdlContext {
359 node_manager,
360 cache_invalidator: cache_invalidator.clone(),
361 memory_region_keeper: memory_region_keeper.clone(),
362 leader_region_registry: leader_region_registry.clone(),
363 table_metadata_manager: table_metadata_manager.clone(),
364 table_metadata_allocator: table_metadata_allocator.clone(),
365 flow_metadata_manager: flow_metadata_manager.clone(),
366 flow_metadata_allocator: flow_metadata_allocator.clone(),
367 region_failure_detector_controller,
368 },
369 procedure_manager.clone(),
370 true,
371 )
372 .context(error::InitDdlManagerSnafu)?,
373 );
374
375 let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
377 let (tx, rx) = WalPruneManager::channel();
378 let remote_wal_options = options.wal.remote_wal_options().unwrap();
380 let kafka_client = build_kafka_client(&remote_wal_options.connection)
381 .await
382 .context(error::BuildKafkaClientSnafu)?;
383 let wal_prune_context = WalPruneContext {
384 client: Arc::new(kafka_client),
385 table_metadata_manager: table_metadata_manager.clone(),
386 leader_region_registry: leader_region_registry.clone(),
387 server_addr: options.server_addr.clone(),
388 mailbox: mailbox.clone(),
389 };
390 let wal_prune_manager = WalPruneManager::new(
391 table_metadata_manager.clone(),
392 remote_wal_options.auto_prune_parallelism,
393 rx,
394 procedure_manager.clone(),
395 wal_prune_context,
396 remote_wal_options.trigger_flush_threshold,
397 );
398 wal_prune_manager.try_start().await?;
400 let wal_prune_ticker = Arc::new(WalPruneTicker::new(
401 remote_wal_options.auto_prune_interval,
402 tx.clone(),
403 ));
404 Some(wal_prune_ticker)
405 } else {
406 None
407 };
408
409 let customized_region_lease_renewer = plugins
410 .as_ref()
411 .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
412
413 let handler_group_builder = match handler_group_builder {
414 Some(handler_group_builder) => handler_group_builder,
415 None => {
416 let region_lease_handler = RegionLeaseHandler::new(
417 distributed_time_constants::REGION_LEASE_SECS,
418 table_metadata_manager.clone(),
419 memory_region_keeper.clone(),
420 customized_region_lease_renewer,
421 );
422
423 HeartbeatHandlerGroupBuilder::new(pushers)
424 .with_plugins(plugins.clone())
425 .with_region_failure_handler(region_failover_handler)
426 .with_region_lease_handler(Some(region_lease_handler))
427 .with_flush_stats_factor(Some(options.flush_stats_factor))
428 .with_flow_state_handler(Some(flow_state_handler))
429 .add_default_handlers()
430 }
431 };
432
433 let enable_telemetry = options.enable_telemetry;
434 let metasrv_home = options.data_home.to_string();
435
436 Ok(Metasrv {
437 state,
438 started,
439 start_time_ms: common_time::util::current_time_millis() as u64,
440 options,
441 in_memory,
442 kv_backend,
443 leader_cached_kv_backend,
444 meta_peer_client: meta_peer_client.clone(),
445 selector,
446 flow_selector,
448 handler_group: RwLock::new(None),
449 handler_group_builder: Mutex::new(Some(handler_group_builder)),
450 election,
451 procedure_manager,
452 mailbox,
453 procedure_executor: ddl_manager,
454 wal_options_allocator,
455 table_metadata_manager,
456 maintenance_mode_manager,
457 greptimedb_telemetry_task: get_greptimedb_telemetry_task(
458 Some(metasrv_home),
459 meta_peer_client,
460 enable_telemetry,
461 )
462 .await,
463 plugins: plugins.unwrap_or_else(Plugins::default),
464 memory_region_keeper,
465 region_migration_manager,
466 region_supervisor_ticker,
467 cache_invalidator,
468 leader_region_registry,
469 wal_prune_ticker,
470 })
471 }
472}
473
474fn build_default_meta_peer_client(
475 election: &Option<ElectionRef>,
476 in_memory: &ResettableKvBackendRef,
477) -> MetaPeerClientRef {
478 MetaPeerClientBuilder::default()
479 .election(election.clone())
480 .in_memory(in_memory.clone())
481 .build()
482 .map(Arc::new)
483 .unwrap()
485}
486
487fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
488 let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
489 .initial(1)
490 .step(100)
491 .build();
492
493 HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
494}
495
496fn build_procedure_manager(
497 options: &MetasrvOptions,
498 kv_backend: &KvBackendRef,
499) -> ProcedureManagerRef {
500 let manager_config = ManagerConfig {
501 max_retry_times: options.procedure.max_retry_times,
502 retry_delay: options.procedure.retry_delay,
503 max_running_procedures: options.procedure.max_running_procedures,
504 ..Default::default()
505 };
506 let kv_state_store = Arc::new(
507 KvStateStore::new(kv_backend.clone()).with_max_value_size(
508 options
509 .procedure
510 .max_metadata_value_size
511 .map(|v| v.as_bytes() as usize),
512 ),
513 );
514
515 Arc::new(LocalManager::new(
516 manager_config,
517 kv_state_store.clone(),
518 kv_state_store,
519 ))
520}
521
522impl Default for MetasrvBuilder {
523 fn default() -> Self {
524 Self::new()
525 }
526}