// meta_srv/metasrv/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{EventRecorderImpl, EventRecorderRef, DEFAULT_COMPACTION_TIME_WINDOW};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::flow::flow_state::FlowStateManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
36use common_meta::key::TableMetadataManager;
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::local::{LocalManager, ManagerConfig};
48use common_procedure::ProcedureManagerRef;
49use common_telemetry::{info, warn};
50use snafu::{ensure, ResultExt};
51
52use crate::cache_invalidator::MetasrvCacheInvalidator;
53use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
54use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
55use crate::events::EventHandlerImpl;
56use crate::flow_meta_alloc::FlowPeerAllocator;
57use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
58use crate::handler::failure_handler::RegionFailureHandler;
59use crate::handler::flow_state_handler::FlowStateHandler;
60use crate::handler::persist_stats_handler::PersistStatsHandler;
61use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
62use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
63use crate::lease::MetaPeerLookupService;
64use crate::metasrv::{
65    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
66    SelectorContext, SelectorRef, FLOW_ID_SEQ, METASRV_DATA_DIR, TABLE_ID_SEQ,
67};
68use crate::procedure::region_migration::manager::RegionMigrationManager;
69use crate::procedure::region_migration::DefaultContextFactory;
70use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
71use crate::procedure::wal_prune::Context as WalPruneContext;
72use crate::region::flush_trigger::RegionFlushTrigger;
73use crate::region::supervisor::{
74    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
75    RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
76};
77use crate::selector::lease_based::LeaseBasedSelector;
78use crate::selector::round_robin::RoundRobinSelector;
79use crate::service::mailbox::MailboxRef;
80use crate::service::store::cached_kv::LeaderCachedKvBackend;
81use crate::state::State;
82use crate::table_meta_alloc::MetasrvPeerAllocator;
83use crate::utils::insert_forwarder::InsertForwarder;
84
/// The time window for TWCS compaction of the region stats table (one day).
///
/// Written as `Duration::from_secs(24 * 60 * 60)` instead of
/// `Duration::from_days(1)` because `from_days` is gated behind the unstable
/// `duration_constructors` feature on many toolchains; the value is identical.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration = Duration::from_secs(24 * 60 * 60);
87
88// TODO(fys): try use derive_builder macro
pub struct MetasrvBuilder {
    // Metasrv options; `build` falls back to `MetasrvOptions::default()` when unset.
    options: Option<MetasrvOptions>,
    // Persistent KV backend; `build` defaults to an in-process `MemoryKvBackend`.
    kv_backend: Option<KvBackendRef>,
    // Resettable in-memory KV backend; also defaults to `MemoryKvBackend` in `build`.
    in_memory: Option<ResettableKvBackendRef>,
    // Datanode selector; `build` defaults to `LeaseBasedSelector`.
    selector: Option<SelectorRef>,
    // Pre-configured heartbeat handler group; `build` assembles a default group when unset.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    // Election implementation; `None` makes this node an immediate leader (see `build`).
    election: Option<ElectionRef>,
    // Client for talking to meta peers; `build` constructs a default from election + in_memory.
    meta_peer_client: Option<MetaPeerClientRef>,
    // Manager of datanode/flownode clients; `build` defaults to `NodeClients`
    // configured from `options.datanode.client`.
    node_manager: Option<NodeManagerRef>,
    // Optional plugins; consulted in `build` for selector/lease-renewer/trigger extensions.
    plugins: Option<Plugins>,
    // Table metadata allocator; `build` creates one backed by the TABLE_ID_SEQ sequence.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
101
impl MetasrvBuilder {
    /// Creates a builder with no components configured; every `None` field is
    /// replaced with a default inside [`MetasrvBuilder::build`].
    pub fn new() -> Self {
        Self {
            kv_backend: None,
            in_memory: None,
            selector: None,
            handler_group_builder: None,
            meta_peer_client: None,
            election: None,
            options: None,
            node_manager: None,
            plugins: None,
            table_metadata_allocator: None,
        }
    }

    /// Sets the Metasrv options.
    pub fn options(mut self, options: MetasrvOptions) -> Self {
        self.options = Some(options);
        self
    }

    /// Sets the persistent KV backend.
    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
        self.kv_backend = Some(kv_backend);
        self
    }

    /// Sets the resettable in-memory KV backend.
    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
        self.in_memory = Some(in_memory);
        self
    }

    /// Sets the datanode selector.
    pub fn selector(mut self, selector: SelectorRef) -> Self {
        self.selector = Some(selector);
        self
    }

    /// Sets a pre-built heartbeat handler group builder.
    // NOTE(review): the name says "heartbeat_handler" but it stores the whole
    // handler *group builder*; consider renaming for clarity.
    pub fn heartbeat_handler(
        mut self,
        handler_group_builder: HeartbeatHandlerGroupBuilder,
    ) -> Self {
        self.handler_group_builder = Some(handler_group_builder);
        self
    }

    /// Sets the meta peer client.
    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
        self.meta_peer_client = Some(meta_peer_client);
        self
    }

    /// Sets the election implementation. Unlike the other setters this takes an
    /// `Option`, so passing `None` explicitly clears any previous value.
    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
        self.election = election;
        self
    }

    /// Sets the node manager used to reach datanodes/flownodes.
    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
        self.node_manager = Some(node_manager);
        self
    }

    /// Sets the plugin registry.
    pub fn plugins(mut self, plugins: Plugins) -> Self {
        self.plugins = Some(plugins);
        self
    }

    /// Sets the table metadata allocator.
    pub fn table_metadata_allocator(
        mut self,
        table_metadata_allocator: TableMetadataAllocatorRef,
    ) -> Self {
        self.table_metadata_allocator = Some(table_metadata_allocator);
        self
    }

    /// Consumes the builder and assembles a fully wired [`Metasrv`].
    ///
    /// Components left unset fall back to defaults (in-memory KV backends, a
    /// lease-based selector, a default meta peer client, ...). This method also
    /// eagerly starts several managers (region migration, WAL prune,
    /// reconciliation, and — for remote WAL — the region flush trigger) before
    /// returning the assembled server.
    ///
    /// # Errors
    /// Fails when WAL-options allocation or Kafka client construction fails,
    /// when the DDL/reconciliation managers cannot be initialized/started, or
    /// when region failover is requested over local WAL without the explicit
    /// `allow_region_failover_on_local_wal` override.
    pub async fn build(self) -> Result<Metasrv> {
        let MetasrvBuilder {
            election,
            meta_peer_client,
            options,
            kv_backend,
            in_memory,
            selector,
            handler_group_builder,
            node_manager,
            plugins,
            table_metadata_allocator,
        } = self;

        let options = options.unwrap_or_default();

        // Default both backends to in-process memory stores (useful for tests
        // and standalone mode).
        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));

        // Without an election implementation this node is immediately the leader;
        // with one, it starts as a follower until elected.
        let state = Arc::new(RwLock::new(match election {
            None => State::leader(options.grpc.server_addr.to_string(), true),
            Some(_) => State::follower(options.grpc.server_addr.to_string()),
        }));

        // KV backend that serves reads from cache while this node is leader.
        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
            state.clone(),
            kv_backend.clone(),
        ));

        let meta_peer_client = meta_peer_client
            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
        let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));

        // Inserter that forwards event rows to a frontend, with TTL/append-mode
        // options for the events system table.
        let event_inserter = Box::new(InsertForwarder::new(
            peer_lookup_service.clone(),
            Some(InsertOptions {
                ttl: options.event_recorder.ttl,
                append_mode: true,
                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
            }),
        ));
        // Builds the event recorder to record important events and persist them as the system table.
        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
            event_inserter,
        ))));

        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
        let pushers = Pushers::default();
        let mailbox = build_mailbox(&kv_backend, &pushers);
        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
        let procedure_manager = build_procedure_manager(
            &options,
            &kv_backend,
            &runtime_switch_manager,
            event_recorder,
        );

        // Metadata managers read through the leader-cached backend.
        let table_metadata_manager = Arc::new(TableMetadataManager::new(
            leader_cached_kv_backend.clone() as _,
        ));
        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
            leader_cached_kv_backend.clone() as _,
        ));

        let selector_ctx = SelectorContext {
            server_addr: options.grpc.server_addr.clone(),
            datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
            flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
            kv_backend: kv_backend.clone(),
            meta_peer_client: meta_peer_client.clone(),
            table_id: None,
        };

        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
            .await
            .context(BuildWalOptionsAllocatorSnafu)?;
        let wal_options_allocator = Arc::new(wal_options_allocator);
        let is_remote_wal = wal_options_allocator.is_remote_wal();
        // Default table metadata allocator: table ids from TABLE_ID_SEQ, peers
        // chosen by the configured selector.
        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
            let sequence = Arc::new(
                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
                    .initial(MIN_USER_TABLE_ID as u64)
                    .step(10)
                    .build(),
            );
            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
                selector_ctx.clone(),
                selector.clone(),
            ));
            Arc::new(TableMetadataAllocator::with_peer_allocator(
                sequence,
                wal_options_allocator.clone(),
                peer_allocator,
            ))
        });
        let table_id_sequence = table_metadata_allocator.table_id_sequence();

        let flow_selector = Arc::new(RoundRobinSelector::new(
            SelectTarget::Flownode,
            Arc::new(Vec::new()),
        )) as SelectorRef;

        // Flow ids come from FLOW_ID_SEQ; flownode peers use round-robin.
        let flow_metadata_allocator = {
            // for now flownode just use round-robin selector
            let flow_selector_ctx = selector_ctx.clone();
            let peer_allocator = Arc::new(FlowPeerAllocator::new(
                flow_selector_ctx,
                flow_selector.clone(),
            ));
            let seq = Arc::new(
                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
                    .initial(MIN_USER_FLOW_ID as u64)
                    .step(10)
                    .build(),
            );

            Arc::new(FlowMetadataAllocator::with_peer_allocator(
                seq,
                peer_allocator,
            ))
        };
        // Flow state lives in the resettable in-memory backend.
        let flow_state_handler =
            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));

        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
        // Default node manager: gRPC clients configured from datanode client options.
        let node_manager = node_manager.unwrap_or_else(|| {
            let datanode_client_channel_config = ChannelConfig::new()
                .timeout(options.datanode.client.timeout)
                .connect_timeout(options.datanode.client.connect_timeout)
                .tcp_nodelay(options.datanode.client.tcp_nodelay);
            Arc::new(NodeClients::new(datanode_client_channel_config))
        });
        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
            mailbox.clone(),
            MetasrvInfo {
                server_addr: options.grpc.server_addr.clone(),
            },
        ));

        // Region failover over local WAL risks data loss, so it must be opted
        // into explicitly via `allow_region_failover_on_local_wal`.
        if !is_remote_wal && options.enable_region_failover {
            ensure!(
                options.allow_region_failover_on_local_wal,
                error::UnexpectedSnafu {
                    violated: "Region failover is not supported in the local WAL implementation!
                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
                }
            );
            // NOTE(review): after the `ensure!` above this condition is always
            // true, so the warning is effectively unconditional here.
            if options.allow_region_failover_on_local_wal {
                warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
            }
        }

        // Failure-detector control + ticker only exist when failover is enabled;
        // otherwise a no-op control is used.
        let (tx, rx) = RegionSupervisor::channel();
        let (region_failure_detector_controller, region_supervisor_ticker): (
            RegionFailureDetectorControllerRef,
            Option<std::sync::Arc<RegionSupervisorTicker>>,
        ) = if options.enable_region_failover {
            (
                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
                Some(Arc::new(RegionSupervisorTicker::new(
                    DEFAULT_TICK_INTERVAL,
                    options.region_failure_detector_initialization_delay,
                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
                    tx.clone(),
                ))),
            )
        } else {
            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
        };

        // region migration manager
        let region_migration_manager = Arc::new(RegionMigrationManager::new(
            procedure_manager.clone(),
            DefaultContextFactory::new(
                in_memory.clone(),
                table_metadata_manager.clone(),
                memory_region_keeper.clone(),
                region_failure_detector_controller.clone(),
                mailbox.clone(),
                options.grpc.server_addr.clone(),
                cache_invalidator.clone(),
            ),
        ));
        region_migration_manager.try_start()?;
        // Plugins may supply a region-stat-aware selector for the supervisor.
        let region_supervisor_selector = plugins
            .as_ref()
            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());

        let supervisor_selector = match region_supervisor_selector {
            Some(selector) => {
                info!("Using region stat aware selector");
                RegionSupervisorSelector::RegionStatAwareSelector(selector)
            }
            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
        };

        // Supervisor consumes failure events (rx); heartbeats are fed via tx.
        let region_failover_handler = if options.enable_region_failover {
            let region_supervisor = RegionSupervisor::new(
                rx,
                options.failure_detector,
                selector_ctx.clone(),
                supervisor_selector,
                region_migration_manager.clone(),
                runtime_switch_manager.clone(),
                peer_lookup_service.clone(),
                leader_cached_kv_backend.clone(),
            );

            Some(RegionFailureHandler::new(
                region_supervisor,
                HeartbeatAcceptor::new(tx),
            ))
        } else {
            None
        };

        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());

        let ddl_context = DdlContext {
            node_manager: node_manager.clone(),
            cache_invalidator: cache_invalidator.clone(),
            memory_region_keeper: memory_region_keeper.clone(),
            leader_region_registry: leader_region_registry.clone(),
            table_metadata_manager: table_metadata_manager.clone(),
            table_metadata_allocator: table_metadata_allocator.clone(),
            flow_metadata_manager: flow_metadata_manager.clone(),
            flow_metadata_allocator: flow_metadata_allocator.clone(),
            region_failure_detector_controller,
        };
        let procedure_manager_c = procedure_manager.clone();
        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
            .context(error::InitDdlManagerSnafu)?;
        // Enterprise builds may extend the DDL manager with trigger DDL support.
        #[cfg(feature = "enterprise")]
        let ddl_manager = {
            let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
                plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
            });
            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
        };
        let ddl_manager = Arc::new(ddl_manager);

        // Remote WAL only: trigger region flushes/checkpoints based on topic sizes.
        let region_flush_ticker = if is_remote_wal {
            let remote_wal_options = options.wal.remote_wal_options().unwrap();
            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
                table_metadata_manager.clone(),
                leader_region_registry.clone(),
                topic_stats_registry.clone(),
                mailbox.clone(),
                options.grpc.server_addr.clone(),
                remote_wal_options.flush_trigger_size,
                remote_wal_options.checkpoint_trigger_size,
            );
            region_flush_trigger.try_start()?;

            Some(Arc::new(region_flush_ticker))
        } else {
            None
        };

        // remote WAL prune ticker and manager
        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
            let (tx, rx) = WalPruneManager::channel();
            // Safety: Must be remote WAL.
            let remote_wal_options = options.wal.remote_wal_options().unwrap();
            let kafka_client = build_kafka_client(&remote_wal_options.connection)
                .await
                .context(error::BuildKafkaClientSnafu)?;
            let wal_prune_context = WalPruneContext {
                client: Arc::new(kafka_client),
                table_metadata_manager: table_metadata_manager.clone(),
                leader_region_registry: leader_region_registry.clone(),
            };
            let wal_prune_manager = WalPruneManager::new(
                remote_wal_options.auto_prune_parallelism,
                rx,
                procedure_manager.clone(),
                wal_prune_context,
            );
            // Start manager in background. Ticker will be started in the main thread to send ticks.
            wal_prune_manager.try_start().await?;
            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
                remote_wal_options.auto_prune_interval,
                tx.clone(),
            ));
            Some(wal_prune_ticker)
        } else {
            None
        };

        let customized_region_lease_renewer = plugins
            .as_ref()
            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());

        // Stats persistence is disabled by setting its TTL to zero.
        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
            let inserter = Box::new(InsertForwarder::new(
                peer_lookup_service.clone(),
                Some(InsertOptions {
                    ttl: options.stats_persistence.ttl,
                    append_mode: true,
                    twcs_compaction_time_window: Some(
                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
                    ),
                }),
            ));

            Some(PersistStatsHandler::new(
                inserter,
                options.stats_persistence.interval,
            ))
        } else {
            None
        };

        // Use the caller-supplied handler group builder if present; otherwise
        // assemble the default heartbeat handler pipeline.
        let handler_group_builder = match handler_group_builder {
            Some(handler_group_builder) => handler_group_builder,
            None => {
                let region_lease_handler = RegionLeaseHandler::new(
                    distributed_time_constants::REGION_LEASE_SECS,
                    table_metadata_manager.clone(),
                    memory_region_keeper.clone(),
                    customized_region_lease_renewer,
                );

                HeartbeatHandlerGroupBuilder::new(pushers)
                    .with_plugins(plugins.clone())
                    .with_region_failure_handler(region_failover_handler)
                    .with_region_lease_handler(Some(region_lease_handler))
                    .with_flush_stats_factor(Some(options.flush_stats_factor))
                    .with_flow_state_handler(Some(flow_state_handler))
                    .with_persist_stats_handler(persist_region_stats_handler)
                    .add_default_handlers()
            }
        };

        let enable_telemetry = options.enable_telemetry;
        let metasrv_home = Path::new(&options.data_home)
            .join(METASRV_DATA_DIR)
            .to_string_lossy()
            .to_string();

        let reconciliation_manager = Arc::new(ReconciliationManager::new(
            node_manager.clone(),
            table_metadata_manager.clone(),
            cache_invalidator.clone(),
            procedure_manager.clone(),
        ));
        reconciliation_manager
            .try_start()
            .context(error::InitReconciliationManagerSnafu)?;

        Ok(Metasrv {
            state,
            started: Arc::new(AtomicBool::new(false)),
            start_time_ms: common_time::util::current_time_millis() as u64,
            options,
            in_memory,
            kv_backend,
            leader_cached_kv_backend,
            meta_peer_client: meta_peer_client.clone(),
            selector,
            selector_ctx,
            // TODO(jeremy): We do not allow configuring the flow selector.
            flow_selector,
            handler_group: RwLock::new(None),
            handler_group_builder: Mutex::new(Some(handler_group_builder)),
            election,
            procedure_manager,
            mailbox,
            ddl_manager,
            wal_options_allocator,
            table_metadata_manager,
            runtime_switch_manager,
            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
                Some(metasrv_home),
                meta_peer_client,
                enable_telemetry,
            )
            .await,
            plugins: plugins.unwrap_or_else(Plugins::default),
            memory_region_keeper,
            region_migration_manager,
            region_supervisor_ticker,
            cache_invalidator,
            leader_region_registry,
            wal_prune_ticker,
            region_flush_ticker,
            table_id_sequence,
            reconciliation_manager,
            topic_stats_registry,
        })
    }
}
567
568fn build_default_meta_peer_client(
569    election: &Option<ElectionRef>,
570    in_memory: &ResettableKvBackendRef,
571) -> MetaPeerClientRef {
572    MetaPeerClientBuilder::default()
573        .election(election.clone())
574        .in_memory(in_memory.clone())
575        .build()
576        .map(Arc::new)
577        // Safety: all required fields set at initialization
578        .unwrap()
579}
580
581fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
582    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
583        .initial(1)
584        .step(100)
585        .build();
586
587    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
588}
589
590fn build_procedure_manager(
591    options: &MetasrvOptions,
592    kv_backend: &KvBackendRef,
593    runtime_switch_manager: &RuntimeSwitchManagerRef,
594    event_recorder: EventRecorderRef,
595) -> ProcedureManagerRef {
596    let manager_config = ManagerConfig {
597        max_retry_times: options.procedure.max_retry_times,
598        retry_delay: options.procedure.retry_delay,
599        max_running_procedures: options.procedure.max_running_procedures,
600        ..Default::default()
601    };
602    let kv_state_store = Arc::new(
603        KvStateStore::new(kv_backend.clone()).with_max_value_size(
604            options
605                .procedure
606                .max_metadata_value_size
607                .map(|v| v.as_bytes() as usize),
608        ),
609    );
610
611    Arc::new(LocalManager::new(
612        manager_config,
613        kv_state_store.clone(),
614        kv_state_store,
615        Some(runtime_switch_manager.clone()),
616        Some(event_recorder),
617    ))
618}
619
620impl Default for MetasrvBuilder {
621    fn default() -> Self {
622        Self::new()
623    }
624}