meta_srv/metasrv/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_stat::ResourceStatImpl;
50use common_telemetry::{info, warn};
51use snafu::{ResultExt, ensure};
52use store_api::storage::MAX_REGION_SEQ;
53
54use crate::bootstrap::build_default_meta_peer_client;
55use crate::cache_invalidator::MetasrvCacheInvalidator;
56use crate::cluster::MetaPeerClientRef;
57use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
58use crate::events::EventHandlerImpl;
59use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
60use crate::handler::failure_handler::RegionFailureHandler;
61use crate::handler::flow_state_handler::FlowStateHandler;
62use crate::handler::persist_stats_handler::PersistStatsHandler;
63use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
64use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
65use crate::metasrv::{
66    ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
67    RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
68};
69use crate::peer::MetasrvPeerAllocator;
70use crate::procedure::region_migration::DefaultContextFactory;
71use crate::procedure::region_migration::manager::RegionMigrationManager;
72use crate::procedure::wal_prune::Context as WalPruneContext;
73use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
74use crate::region::flush_trigger::RegionFlushTrigger;
75use crate::region::supervisor::{
76    DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
77    RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
78    RegionSupervisorTicker,
79};
80use crate::selector::lease_based::LeaseBasedSelector;
81use crate::selector::round_robin::RoundRobinSelector;
82use crate::service::mailbox::MailboxRef;
83use crate::service::store::cached_kv::LeaderCachedKvBackend;
84use crate::state::State;
85use crate::utils::insert_forwarder::InsertForwarder;
86
/// TWCS compaction time window applied to the region stats table: one day.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration =
    Duration::from_secs(24 * 60 * 60);
89
90// TODO(fys): try use derive_builder macro
91pub struct MetasrvBuilder {
92    options: Option<MetasrvOptions>,
93    kv_backend: Option<KvBackendRef>,
94    in_memory: Option<ResettableKvBackendRef>,
95    selector: Option<SelectorRef>,
96    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
97    election: Option<ElectionRef>,
98    meta_peer_client: Option<MetaPeerClientRef>,
99    node_manager: Option<NodeManagerRef>,
100    plugins: Option<Plugins>,
101    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
102}
103
104impl MetasrvBuilder {
105    pub fn new() -> Self {
106        Self {
107            kv_backend: None,
108            in_memory: None,
109            selector: None,
110            handler_group_builder: None,
111            meta_peer_client: None,
112            election: None,
113            options: None,
114            node_manager: None,
115            plugins: None,
116            table_metadata_allocator: None,
117        }
118    }
119
120    pub fn options(mut self, options: MetasrvOptions) -> Self {
121        self.options = Some(options);
122        self
123    }
124
125    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
126        self.kv_backend = Some(kv_backend);
127        self
128    }
129
130    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
131        self.in_memory = Some(in_memory);
132        self
133    }
134
135    pub fn selector(mut self, selector: SelectorRef) -> Self {
136        self.selector = Some(selector);
137        self
138    }
139
140    pub fn heartbeat_handler(
141        mut self,
142        handler_group_builder: HeartbeatHandlerGroupBuilder,
143    ) -> Self {
144        self.handler_group_builder = Some(handler_group_builder);
145        self
146    }
147
148    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
149        self.meta_peer_client = Some(meta_peer_client);
150        self
151    }
152
153    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
154        self.election = election;
155        self
156    }
157
158    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
159        self.node_manager = Some(node_manager);
160        self
161    }
162
163    pub fn plugins(mut self, plugins: Plugins) -> Self {
164        self.plugins = Some(plugins);
165        self
166    }
167
168    pub fn table_metadata_allocator(
169        mut self,
170        table_metadata_allocator: TableMetadataAllocatorRef,
171    ) -> Self {
172        self.table_metadata_allocator = Some(table_metadata_allocator);
173        self
174    }
175
176    pub async fn build(self) -> Result<Metasrv> {
177        let MetasrvBuilder {
178            election,
179            meta_peer_client,
180            options,
181            kv_backend,
182            in_memory,
183            selector,
184            handler_group_builder,
185            node_manager,
186            plugins,
187            table_metadata_allocator,
188        } = self;
189
190        let options = options.unwrap_or_default();
191
192        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
193        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
194
195        let state = Arc::new(RwLock::new(match election {
196            None => State::leader(options.grpc.server_addr.clone(), true),
197            Some(_) => State::follower(options.grpc.server_addr.clone()),
198        }));
199
200        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
201            state.clone(),
202            kv_backend.clone(),
203        ));
204
205        let meta_peer_client = meta_peer_client
206            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
207
208        let event_inserter = Box::new(InsertForwarder::new(
209            meta_peer_client.clone(),
210            Some(InsertOptions {
211                ttl: options.event_recorder.ttl,
212                append_mode: true,
213                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
214            }),
215        ));
216        // Builds the event recorder to record important events and persist them as the system table.
217        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
218            event_inserter,
219        ))));
220
221        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
222        let pushers = Pushers::default();
223        let mailbox = build_mailbox(&kv_backend, &pushers);
224        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
225        let procedure_manager = build_procedure_manager(
226            &options,
227            &kv_backend,
228            &runtime_switch_manager,
229            event_recorder,
230        );
231
232        let table_metadata_manager = Arc::new(TableMetadataManager::new(
233            leader_cached_kv_backend.clone() as _,
234        ));
235        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
236            leader_cached_kv_backend.clone() as _,
237        ));
238
239        let selector_ctx = SelectorContext {
240            peer_discovery: meta_peer_client.clone(),
241        };
242
243        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
244            .await
245            .context(BuildWalOptionsAllocatorSnafu)?;
246        let wal_options_allocator = Arc::new(wal_options_allocator);
247        let is_remote_wal = wal_options_allocator.is_remote_wal();
248        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
249            let sequence = Arc::new(
250                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
251                    .initial(MIN_USER_TABLE_ID as u64)
252                    .step(10)
253                    .build(),
254            );
255            let peer_allocator = Arc::new(
256                MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
257                    .with_max_items(MAX_REGION_SEQ),
258            );
259            Arc::new(TableMetadataAllocator::with_peer_allocator(
260                sequence,
261                wal_options_allocator.clone(),
262                peer_allocator,
263            ))
264        });
265        let table_id_sequence = table_metadata_allocator.table_id_sequence();
266
267        let flow_selector =
268            Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
269
270        let flow_metadata_allocator = {
271            // for now flownode just use round-robin selector
272            let flow_selector_ctx = selector_ctx.clone();
273            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
274                flow_selector_ctx,
275                flow_selector.clone(),
276            ));
277            let seq = Arc::new(
278                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
279                    .initial(MIN_USER_FLOW_ID as u64)
280                    .step(10)
281                    .build(),
282            );
283
284            Arc::new(FlowMetadataAllocator::with_peer_allocator(
285                seq,
286                peer_allocator,
287            ))
288        };
289        let flow_state_handler =
290            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
291
292        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
293        let node_manager = node_manager.unwrap_or_else(|| {
294            let datanode_client_channel_config = ChannelConfig::new()
295                .timeout(options.datanode.client.timeout)
296                .connect_timeout(options.datanode.client.connect_timeout)
297                .tcp_nodelay(options.datanode.client.tcp_nodelay);
298            Arc::new(NodeClients::new(datanode_client_channel_config))
299        });
300        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
301            mailbox.clone(),
302            MetasrvInfo {
303                server_addr: options.grpc.server_addr.clone(),
304            },
305        ));
306
307        if !is_remote_wal && options.enable_region_failover {
308            ensure!(
309                options.allow_region_failover_on_local_wal,
310                error::UnexpectedSnafu {
311                    violated: "Region failover is not supported in the local WAL implementation!
312                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
313                }
314            );
315            if options.allow_region_failover_on_local_wal {
316                warn!(
317                    "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
318                );
319            }
320        }
321
322        let (tx, rx) = RegionSupervisor::channel();
323        let (region_failure_detector_controller, region_supervisor_ticker): (
324            RegionFailureDetectorControllerRef,
325            Option<std::sync::Arc<RegionSupervisorTicker>>,
326        ) = if options.enable_region_failover {
327            (
328                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
329                Some(Arc::new(RegionSupervisorTicker::new(
330                    DEFAULT_TICK_INTERVAL,
331                    options.region_failure_detector_initialization_delay,
332                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
333                    tx.clone(),
334                ))),
335            )
336        } else {
337            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
338        };
339
340        // region migration manager
341        let region_migration_manager = Arc::new(RegionMigrationManager::new(
342            procedure_manager.clone(),
343            DefaultContextFactory::new(
344                in_memory.clone(),
345                table_metadata_manager.clone(),
346                memory_region_keeper.clone(),
347                region_failure_detector_controller.clone(),
348                mailbox.clone(),
349                options.grpc.server_addr.clone(),
350                cache_invalidator.clone(),
351            ),
352        ));
353        region_migration_manager.try_start()?;
354        let region_supervisor_selector = plugins
355            .as_ref()
356            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
357
358        let supervisor_selector = match region_supervisor_selector {
359            Some(selector) => {
360                info!("Using region stat aware selector");
361                RegionSupervisorSelector::RegionStatAwareSelector(selector)
362            }
363            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
364        };
365
366        let region_failover_handler = if options.enable_region_failover {
367            let region_supervisor = RegionSupervisor::new(
368                rx,
369                options.failure_detector,
370                selector_ctx.clone(),
371                supervisor_selector,
372                region_migration_manager.clone(),
373                runtime_switch_manager.clone(),
374                meta_peer_client.clone(),
375                leader_cached_kv_backend.clone(),
376            );
377
378            Some(RegionFailureHandler::new(
379                region_supervisor,
380                HeartbeatAcceptor::new(tx),
381            ))
382        } else {
383            None
384        };
385
386        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
387        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
388
389        let ddl_context = DdlContext {
390            node_manager: node_manager.clone(),
391            cache_invalidator: cache_invalidator.clone(),
392            memory_region_keeper: memory_region_keeper.clone(),
393            leader_region_registry: leader_region_registry.clone(),
394            table_metadata_manager: table_metadata_manager.clone(),
395            table_metadata_allocator: table_metadata_allocator.clone(),
396            flow_metadata_manager: flow_metadata_manager.clone(),
397            flow_metadata_allocator: flow_metadata_allocator.clone(),
398            region_failure_detector_controller,
399        };
400        let procedure_manager_c = procedure_manager.clone();
401        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
402            .context(error::InitDdlManagerSnafu)?;
403        #[cfg(feature = "enterprise")]
404        let ddl_manager = {
405            let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
406                plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
407            });
408            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
409        };
410        let ddl_manager = Arc::new(ddl_manager);
411
412        let region_flush_ticker = if is_remote_wal {
413            let remote_wal_options = options.wal.remote_wal_options().unwrap();
414            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
415                table_metadata_manager.clone(),
416                leader_region_registry.clone(),
417                topic_stats_registry.clone(),
418                mailbox.clone(),
419                options.grpc.server_addr.clone(),
420                remote_wal_options.flush_trigger_size,
421                remote_wal_options.checkpoint_trigger_size,
422            );
423            region_flush_trigger.try_start()?;
424
425            Some(Arc::new(region_flush_ticker))
426        } else {
427            None
428        };
429
430        // remote WAL prune ticker and manager
431        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
432            let (tx, rx) = WalPruneManager::channel();
433            // Safety: Must be remote WAL.
434            let remote_wal_options = options.wal.remote_wal_options().unwrap();
435            let kafka_client = build_kafka_client(&remote_wal_options.connection)
436                .await
437                .context(error::BuildKafkaClientSnafu)?;
438            let wal_prune_context = WalPruneContext {
439                client: Arc::new(kafka_client),
440                table_metadata_manager: table_metadata_manager.clone(),
441                leader_region_registry: leader_region_registry.clone(),
442            };
443            let wal_prune_manager = WalPruneManager::new(
444                remote_wal_options.auto_prune_parallelism,
445                rx,
446                procedure_manager.clone(),
447                wal_prune_context,
448            );
449            // Start manager in background. Ticker will be started in the main thread to send ticks.
450            wal_prune_manager.try_start().await?;
451            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
452                remote_wal_options.auto_prune_interval,
453                tx.clone(),
454            ));
455            Some(wal_prune_ticker)
456        } else {
457            None
458        };
459
460        let customized_region_lease_renewer = plugins
461            .as_ref()
462            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
463
464        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
465            let inserter = Box::new(InsertForwarder::new(
466                meta_peer_client.clone(),
467                Some(InsertOptions {
468                    ttl: options.stats_persistence.ttl,
469                    append_mode: true,
470                    twcs_compaction_time_window: Some(
471                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
472                    ),
473                }),
474            ));
475
476            Some(PersistStatsHandler::new(
477                inserter,
478                options.stats_persistence.interval,
479            ))
480        } else {
481            None
482        };
483
484        let handler_group_builder = match handler_group_builder {
485            Some(handler_group_builder) => handler_group_builder,
486            None => {
487                let region_lease_handler = RegionLeaseHandler::new(
488                    distributed_time_constants::REGION_LEASE_SECS,
489                    table_metadata_manager.clone(),
490                    memory_region_keeper.clone(),
491                    customized_region_lease_renewer,
492                );
493
494                HeartbeatHandlerGroupBuilder::new(pushers)
495                    .with_plugins(plugins.clone())
496                    .with_region_failure_handler(region_failover_handler)
497                    .with_region_lease_handler(Some(region_lease_handler))
498                    .with_flush_stats_factor(Some(options.flush_stats_factor))
499                    .with_flow_state_handler(Some(flow_state_handler))
500                    .with_persist_stats_handler(persist_region_stats_handler)
501                    .add_default_handlers()
502            }
503        };
504
505        let enable_telemetry = options.enable_telemetry;
506        let metasrv_home = Path::new(&options.data_home)
507            .join(METASRV_DATA_DIR)
508            .to_string_lossy()
509            .to_string();
510
511        let reconciliation_manager = Arc::new(ReconciliationManager::new(
512            node_manager.clone(),
513            table_metadata_manager.clone(),
514            cache_invalidator.clone(),
515            procedure_manager.clone(),
516        ));
517        reconciliation_manager
518            .try_start()
519            .context(error::InitReconciliationManagerSnafu)?;
520
521        let mut resource_stat = ResourceStatImpl::default();
522        resource_stat.start_collect_cpu_usage();
523
524        Ok(Metasrv {
525            state,
526            started: Arc::new(AtomicBool::new(false)),
527            start_time_ms: common_time::util::current_time_millis() as u64,
528            options,
529            in_memory,
530            kv_backend,
531            leader_cached_kv_backend,
532            meta_peer_client: meta_peer_client.clone(),
533            selector,
534            selector_ctx,
535            // TODO(jeremy): We do not allow configuring the flow selector.
536            flow_selector,
537            handler_group: RwLock::new(None),
538            handler_group_builder: Mutex::new(Some(handler_group_builder)),
539            election,
540            procedure_manager,
541            mailbox,
542            ddl_manager,
543            wal_options_allocator,
544            table_metadata_manager,
545            runtime_switch_manager,
546            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
547                Some(metasrv_home),
548                meta_peer_client,
549                enable_telemetry,
550            )
551            .await,
552            plugins: plugins.unwrap_or_else(Plugins::default),
553            memory_region_keeper,
554            region_migration_manager,
555            region_supervisor_ticker,
556            cache_invalidator,
557            leader_region_registry,
558            wal_prune_ticker,
559            region_flush_ticker,
560            table_id_sequence,
561            reconciliation_manager,
562            topic_stats_registry,
563            resource_stat: Arc::new(resource_stat),
564        })
565    }
566}
567
568fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
569    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
570        .initial(1)
571        .step(100)
572        .build();
573
574    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
575}
576
577fn build_procedure_manager(
578    options: &MetasrvOptions,
579    kv_backend: &KvBackendRef,
580    runtime_switch_manager: &RuntimeSwitchManagerRef,
581    event_recorder: EventRecorderRef,
582) -> ProcedureManagerRef {
583    let manager_config = ManagerConfig {
584        max_retry_times: options.procedure.max_retry_times,
585        retry_delay: options.procedure.retry_delay,
586        max_running_procedures: options.procedure.max_running_procedures,
587        ..Default::default()
588    };
589    let kv_state_store = Arc::new(
590        KvStateStore::new(kv_backend.clone()).with_max_value_size(
591            options
592                .procedure
593                .max_metadata_value_size
594                .map(|v| v.as_bytes() as usize),
595        ),
596    );
597
598    Arc::new(LocalManager::new(
599        manager_config,
600        kv_state_store.clone(),
601        kv_state_store,
602        Some(runtime_switch_manager.clone()),
603        Some(event_recorder),
604    ))
605}
606
607impl Default for MetasrvBuilder {
608    fn default() -> Self {
609        Self::new()
610    }
611}