meta_srv/metasrv/
builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18use std::time::Duration;
19
20use client::client_manager::NodeClients;
21use client::inserter::InsertOptions;
22use common_base::Plugins;
23use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
24use common_event_recorder::{DEFAULT_COMPACTION_TIME_WINDOW, EventRecorderImpl, EventRecorderRef};
25use common_grpc::channel_manager::ChannelConfig;
26use common_meta::ddl::flow_meta::FlowMetadataAllocator;
27use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
28use common_meta::ddl::{
29    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
30};
31use common_meta::ddl_manager::DdlManager;
32use common_meta::distributed_time_constants::{self};
33use common_meta::key::TableMetadataManager;
34use common_meta::key::flow::FlowMetadataManager;
35use common_meta::key::flow::flow_state::FlowStateManager;
36use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
37use common_meta::kv_backend::memory::MemoryKvBackend;
38use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
39use common_meta::node_manager::NodeManagerRef;
40use common_meta::reconciliation::manager::ReconciliationManager;
41use common_meta::region_keeper::MemoryRegionKeeper;
42use common_meta::region_registry::LeaderRegionRegistry;
43use common_meta::sequence::SequenceBuilder;
44use common_meta::state_store::KvStateStore;
45use common_meta::stats::topic::TopicStatsRegistry;
46use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
47use common_procedure::ProcedureManagerRef;
48use common_procedure::local::{LocalManager, ManagerConfig};
49use common_telemetry::{info, warn};
50use snafu::{ResultExt, ensure};
51use store_api::storage::MAX_REGION_SEQ;
52
53use crate::bootstrap::build_default_meta_peer_client;
54use crate::cache_invalidator::MetasrvCacheInvalidator;
55use crate::cluster::MetaPeerClientRef;
56use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
57use crate::events::EventHandlerImpl;
58use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
59use crate::handler::failure_handler::RegionFailureHandler;
60use crate::handler::flow_state_handler::FlowStateHandler;
61use crate::handler::persist_stats_handler::PersistStatsHandler;
62use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
63use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
64use crate::metasrv::{
65    ElectionRef, FLOW_ID_SEQ, METASRV_DATA_DIR, Metasrv, MetasrvInfo, MetasrvOptions,
66    RegionStatAwareSelectorRef, SelectTarget, SelectorContext, SelectorRef, TABLE_ID_SEQ,
67};
68use crate::peer::MetasrvPeerAllocator;
69use crate::procedure::region_migration::DefaultContextFactory;
70use crate::procedure::region_migration::manager::RegionMigrationManager;
71use crate::procedure::wal_prune::Context as WalPruneContext;
72use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
73use crate::region::flush_trigger::RegionFlushTrigger;
74use crate::region::supervisor::{
75    DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL, HeartbeatAcceptor,
76    RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
77    RegionSupervisorTicker,
78};
79use crate::selector::lease_based::LeaseBasedSelector;
80use crate::selector::round_robin::RoundRobinSelector;
81use crate::service::mailbox::MailboxRef;
82use crate::service::store::cached_kv::LeaderCachedKvBackend;
83use crate::state::State;
84use crate::utils::insert_forwarder::InsertForwarder;
85
/// The time window for twcs compaction of the region stats table (one day).
///
/// Expressed with `Duration::from_secs` so the constant only depends on stable,
/// const-evaluable standard-library API.
const REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW: Duration =
    Duration::from_secs(24 * 60 * 60);
88
// TODO(fys): try to use the `derive_builder` macro
/// Builder that collects the optional components of a [`Metasrv`].
///
/// Every field is optional; `build` substitutes a default for each component
/// that was left unset.
pub struct MetasrvBuilder {
    /// Metasrv options; `build` uses `MetasrvOptions::default()` when unset.
    options: Option<MetasrvOptions>,
    /// Key-value backend; `build` falls back to a fresh `MemoryKvBackend`.
    kv_backend: Option<KvBackendRef>,
    /// Resettable key-value backend; `build` falls back to a fresh `MemoryKvBackend`.
    in_memory: Option<ResettableKvBackendRef>,
    /// Selector; `build` falls back to `LeaseBasedSelector`.
    selector: Option<SelectorRef>,
    /// Heartbeat handler group builder; when unset, `build` assembles one with the
    /// default handlers.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Election; when `None`, `build` creates the initial state as leader, otherwise
    /// as follower.
    election: Option<ElectionRef>,
    /// Meta peer client; `build` falls back to `build_default_meta_peer_client`.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Node manager; `build` falls back to `NodeClients` configured from the
    /// datanode client options.
    node_manager: Option<NodeManagerRef>,
    /// Plugins; `build` looks up optional extensions here and stores
    /// `Plugins::default()` in the result when unset.
    plugins: Option<Plugins>,
    /// Table metadata allocator; when unset, `build` creates one backed by the
    /// `TABLE_ID_SEQ` sequence.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
102
103impl MetasrvBuilder {
104    pub fn new() -> Self {
105        Self {
106            kv_backend: None,
107            in_memory: None,
108            selector: None,
109            handler_group_builder: None,
110            meta_peer_client: None,
111            election: None,
112            options: None,
113            node_manager: None,
114            plugins: None,
115            table_metadata_allocator: None,
116        }
117    }
118
119    pub fn options(mut self, options: MetasrvOptions) -> Self {
120        self.options = Some(options);
121        self
122    }
123
124    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
125        self.kv_backend = Some(kv_backend);
126        self
127    }
128
129    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
130        self.in_memory = Some(in_memory);
131        self
132    }
133
134    pub fn selector(mut self, selector: SelectorRef) -> Self {
135        self.selector = Some(selector);
136        self
137    }
138
139    pub fn heartbeat_handler(
140        mut self,
141        handler_group_builder: HeartbeatHandlerGroupBuilder,
142    ) -> Self {
143        self.handler_group_builder = Some(handler_group_builder);
144        self
145    }
146
147    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
148        self.meta_peer_client = Some(meta_peer_client);
149        self
150    }
151
152    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
153        self.election = election;
154        self
155    }
156
157    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
158        self.node_manager = Some(node_manager);
159        self
160    }
161
162    pub fn plugins(mut self, plugins: Plugins) -> Self {
163        self.plugins = Some(plugins);
164        self
165    }
166
167    pub fn table_metadata_allocator(
168        mut self,
169        table_metadata_allocator: TableMetadataAllocatorRef,
170    ) -> Self {
171        self.table_metadata_allocator = Some(table_metadata_allocator);
172        self
173    }
174
175    pub async fn build(self) -> Result<Metasrv> {
176        let MetasrvBuilder {
177            election,
178            meta_peer_client,
179            options,
180            kv_backend,
181            in_memory,
182            selector,
183            handler_group_builder,
184            node_manager,
185            plugins,
186            table_metadata_allocator,
187        } = self;
188
189        let options = options.unwrap_or_default();
190
191        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
192        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
193
194        let state = Arc::new(RwLock::new(match election {
195            None => State::leader(options.grpc.server_addr.to_string(), true),
196            Some(_) => State::follower(options.grpc.server_addr.to_string()),
197        }));
198
199        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
200            state.clone(),
201            kv_backend.clone(),
202        ));
203
204        let meta_peer_client = meta_peer_client
205            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
206
207        let event_inserter = Box::new(InsertForwarder::new(
208            meta_peer_client.clone(),
209            Some(InsertOptions {
210                ttl: options.event_recorder.ttl,
211                append_mode: true,
212                twcs_compaction_time_window: Some(DEFAULT_COMPACTION_TIME_WINDOW),
213            }),
214        ));
215        // Builds the event recorder to record important events and persist them as the system table.
216        let event_recorder = Arc::new(EventRecorderImpl::new(Box::new(EventHandlerImpl::new(
217            event_inserter,
218        ))));
219
220        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector));
221        let pushers = Pushers::default();
222        let mailbox = build_mailbox(&kv_backend, &pushers);
223        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
224        let procedure_manager = build_procedure_manager(
225            &options,
226            &kv_backend,
227            &runtime_switch_manager,
228            event_recorder,
229        );
230
231        let table_metadata_manager = Arc::new(TableMetadataManager::new(
232            leader_cached_kv_backend.clone() as _,
233        ));
234        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
235            leader_cached_kv_backend.clone() as _,
236        ));
237
238        let selector_ctx = SelectorContext {
239            peer_discovery: meta_peer_client.clone(),
240        };
241
242        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
243            .await
244            .context(BuildWalOptionsAllocatorSnafu)?;
245        let wal_options_allocator = Arc::new(wal_options_allocator);
246        let is_remote_wal = wal_options_allocator.is_remote_wal();
247        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
248            let sequence = Arc::new(
249                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
250                    .initial(MIN_USER_TABLE_ID as u64)
251                    .step(10)
252                    .build(),
253            );
254            let peer_allocator = Arc::new(
255                MetasrvPeerAllocator::new(selector_ctx.clone(), selector.clone())
256                    .with_max_items(MAX_REGION_SEQ),
257            );
258            Arc::new(TableMetadataAllocator::with_peer_allocator(
259                sequence,
260                wal_options_allocator.clone(),
261                peer_allocator,
262            ))
263        });
264        let table_id_sequence = table_metadata_allocator.table_id_sequence();
265
266        let flow_selector =
267            Arc::new(RoundRobinSelector::new(SelectTarget::Flownode)) as SelectorRef;
268
269        let flow_metadata_allocator = {
270            // for now flownode just use round-robin selector
271            let flow_selector_ctx = selector_ctx.clone();
272            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
273                flow_selector_ctx,
274                flow_selector.clone(),
275            ));
276            let seq = Arc::new(
277                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
278                    .initial(MIN_USER_FLOW_ID as u64)
279                    .step(10)
280                    .build(),
281            );
282
283            Arc::new(FlowMetadataAllocator::with_peer_allocator(
284                seq,
285                peer_allocator,
286            ))
287        };
288        let flow_state_handler =
289            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
290
291        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
292        let node_manager = node_manager.unwrap_or_else(|| {
293            let datanode_client_channel_config = ChannelConfig::new()
294                .timeout(options.datanode.client.timeout)
295                .connect_timeout(options.datanode.client.connect_timeout)
296                .tcp_nodelay(options.datanode.client.tcp_nodelay);
297            Arc::new(NodeClients::new(datanode_client_channel_config))
298        });
299        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
300            mailbox.clone(),
301            MetasrvInfo {
302                server_addr: options.grpc.server_addr.clone(),
303            },
304        ));
305
306        if !is_remote_wal && options.enable_region_failover {
307            ensure!(
308                options.allow_region_failover_on_local_wal,
309                error::UnexpectedSnafu {
310                    violated: "Region failover is not supported in the local WAL implementation!
311                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
312                }
313            );
314            if options.allow_region_failover_on_local_wal {
315                warn!(
316                    "Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!"
317                );
318            }
319        }
320
321        let (tx, rx) = RegionSupervisor::channel();
322        let (region_failure_detector_controller, region_supervisor_ticker): (
323            RegionFailureDetectorControllerRef,
324            Option<std::sync::Arc<RegionSupervisorTicker>>,
325        ) = if options.enable_region_failover {
326            (
327                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
328                Some(Arc::new(RegionSupervisorTicker::new(
329                    DEFAULT_TICK_INTERVAL,
330                    options.region_failure_detector_initialization_delay,
331                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
332                    tx.clone(),
333                ))),
334            )
335        } else {
336            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
337        };
338
339        // region migration manager
340        let region_migration_manager = Arc::new(RegionMigrationManager::new(
341            procedure_manager.clone(),
342            DefaultContextFactory::new(
343                in_memory.clone(),
344                table_metadata_manager.clone(),
345                memory_region_keeper.clone(),
346                region_failure_detector_controller.clone(),
347                mailbox.clone(),
348                options.grpc.server_addr.clone(),
349                cache_invalidator.clone(),
350            ),
351        ));
352        region_migration_manager.try_start()?;
353        let region_supervisor_selector = plugins
354            .as_ref()
355            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
356
357        let supervisor_selector = match region_supervisor_selector {
358            Some(selector) => {
359                info!("Using region stat aware selector");
360                RegionSupervisorSelector::RegionStatAwareSelector(selector)
361            }
362            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
363        };
364
365        let region_failover_handler = if options.enable_region_failover {
366            let region_supervisor = RegionSupervisor::new(
367                rx,
368                options.failure_detector,
369                selector_ctx.clone(),
370                supervisor_selector,
371                region_migration_manager.clone(),
372                runtime_switch_manager.clone(),
373                meta_peer_client.clone(),
374                leader_cached_kv_backend.clone(),
375            );
376
377            Some(RegionFailureHandler::new(
378                region_supervisor,
379                HeartbeatAcceptor::new(tx),
380            ))
381        } else {
382            None
383        };
384
385        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
386        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
387
388        let ddl_context = DdlContext {
389            node_manager: node_manager.clone(),
390            cache_invalidator: cache_invalidator.clone(),
391            memory_region_keeper: memory_region_keeper.clone(),
392            leader_region_registry: leader_region_registry.clone(),
393            table_metadata_manager: table_metadata_manager.clone(),
394            table_metadata_allocator: table_metadata_allocator.clone(),
395            flow_metadata_manager: flow_metadata_manager.clone(),
396            flow_metadata_allocator: flow_metadata_allocator.clone(),
397            region_failure_detector_controller,
398        };
399        let procedure_manager_c = procedure_manager.clone();
400        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
401            .context(error::InitDdlManagerSnafu)?;
402        #[cfg(feature = "enterprise")]
403        let ddl_manager = {
404            let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
405                plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
406            });
407            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
408        };
409        let ddl_manager = Arc::new(ddl_manager);
410
411        let region_flush_ticker = if is_remote_wal {
412            let remote_wal_options = options.wal.remote_wal_options().unwrap();
413            let (region_flush_trigger, region_flush_ticker) = RegionFlushTrigger::new(
414                table_metadata_manager.clone(),
415                leader_region_registry.clone(),
416                topic_stats_registry.clone(),
417                mailbox.clone(),
418                options.grpc.server_addr.clone(),
419                remote_wal_options.flush_trigger_size,
420                remote_wal_options.checkpoint_trigger_size,
421            );
422            region_flush_trigger.try_start()?;
423
424            Some(Arc::new(region_flush_ticker))
425        } else {
426            None
427        };
428
429        // remote WAL prune ticker and manager
430        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
431            let (tx, rx) = WalPruneManager::channel();
432            // Safety: Must be remote WAL.
433            let remote_wal_options = options.wal.remote_wal_options().unwrap();
434            let kafka_client = build_kafka_client(&remote_wal_options.connection)
435                .await
436                .context(error::BuildKafkaClientSnafu)?;
437            let wal_prune_context = WalPruneContext {
438                client: Arc::new(kafka_client),
439                table_metadata_manager: table_metadata_manager.clone(),
440                leader_region_registry: leader_region_registry.clone(),
441            };
442            let wal_prune_manager = WalPruneManager::new(
443                remote_wal_options.auto_prune_parallelism,
444                rx,
445                procedure_manager.clone(),
446                wal_prune_context,
447            );
448            // Start manager in background. Ticker will be started in the main thread to send ticks.
449            wal_prune_manager.try_start().await?;
450            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
451                remote_wal_options.auto_prune_interval,
452                tx.clone(),
453            ));
454            Some(wal_prune_ticker)
455        } else {
456            None
457        };
458
459        let customized_region_lease_renewer = plugins
460            .as_ref()
461            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
462
463        let persist_region_stats_handler = if !options.stats_persistence.ttl.is_zero() {
464            let inserter = Box::new(InsertForwarder::new(
465                meta_peer_client.clone(),
466                Some(InsertOptions {
467                    ttl: options.stats_persistence.ttl,
468                    append_mode: true,
469                    twcs_compaction_time_window: Some(
470                        REGION_STATS_TABLE_TWCS_COMPACTION_TIME_WINDOW,
471                    ),
472                }),
473            ));
474
475            Some(PersistStatsHandler::new(
476                inserter,
477                options.stats_persistence.interval,
478            ))
479        } else {
480            None
481        };
482
483        let handler_group_builder = match handler_group_builder {
484            Some(handler_group_builder) => handler_group_builder,
485            None => {
486                let region_lease_handler = RegionLeaseHandler::new(
487                    distributed_time_constants::REGION_LEASE_SECS,
488                    table_metadata_manager.clone(),
489                    memory_region_keeper.clone(),
490                    customized_region_lease_renewer,
491                );
492
493                HeartbeatHandlerGroupBuilder::new(pushers)
494                    .with_plugins(plugins.clone())
495                    .with_region_failure_handler(region_failover_handler)
496                    .with_region_lease_handler(Some(region_lease_handler))
497                    .with_flush_stats_factor(Some(options.flush_stats_factor))
498                    .with_flow_state_handler(Some(flow_state_handler))
499                    .with_persist_stats_handler(persist_region_stats_handler)
500                    .add_default_handlers()
501            }
502        };
503
504        let enable_telemetry = options.enable_telemetry;
505        let metasrv_home = Path::new(&options.data_home)
506            .join(METASRV_DATA_DIR)
507            .to_string_lossy()
508            .to_string();
509
510        let reconciliation_manager = Arc::new(ReconciliationManager::new(
511            node_manager.clone(),
512            table_metadata_manager.clone(),
513            cache_invalidator.clone(),
514            procedure_manager.clone(),
515        ));
516        reconciliation_manager
517            .try_start()
518            .context(error::InitReconciliationManagerSnafu)?;
519
520        Ok(Metasrv {
521            state,
522            started: Arc::new(AtomicBool::new(false)),
523            start_time_ms: common_time::util::current_time_millis() as u64,
524            options,
525            in_memory,
526            kv_backend,
527            leader_cached_kv_backend,
528            meta_peer_client: meta_peer_client.clone(),
529            selector,
530            selector_ctx,
531            // TODO(jeremy): We do not allow configuring the flow selector.
532            flow_selector,
533            handler_group: RwLock::new(None),
534            handler_group_builder: Mutex::new(Some(handler_group_builder)),
535            election,
536            procedure_manager,
537            mailbox,
538            ddl_manager,
539            wal_options_allocator,
540            table_metadata_manager,
541            runtime_switch_manager,
542            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
543                Some(metasrv_home),
544                meta_peer_client,
545                enable_telemetry,
546            )
547            .await,
548            plugins: plugins.unwrap_or_else(Plugins::default),
549            memory_region_keeper,
550            region_migration_manager,
551            region_supervisor_ticker,
552            cache_invalidator,
553            leader_region_registry,
554            wal_prune_ticker,
555            region_flush_ticker,
556            table_id_sequence,
557            reconciliation_manager,
558            topic_stats_registry,
559            resource_spec: Default::default(),
560        })
561    }
562}
563
564fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
565    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
566        .initial(1)
567        .step(100)
568        .build();
569
570    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
571}
572
573fn build_procedure_manager(
574    options: &MetasrvOptions,
575    kv_backend: &KvBackendRef,
576    runtime_switch_manager: &RuntimeSwitchManagerRef,
577    event_recorder: EventRecorderRef,
578) -> ProcedureManagerRef {
579    let manager_config = ManagerConfig {
580        max_retry_times: options.procedure.max_retry_times,
581        retry_delay: options.procedure.retry_delay,
582        max_running_procedures: options.procedure.max_running_procedures,
583        ..Default::default()
584    };
585    let kv_state_store = Arc::new(
586        KvStateStore::new(kv_backend.clone()).with_max_value_size(
587            options
588                .procedure
589                .max_metadata_value_size
590                .map(|v| v.as_bytes() as usize),
591        ),
592    );
593
594    Arc::new(LocalManager::new(
595        manager_config,
596        kv_state_store.clone(),
597        kv_state_store,
598        Some(runtime_switch_manager.clone()),
599        Some(event_recorder),
600    ))
601}
602
603impl Default for MetasrvBuilder {
604    fn default() -> Self {
605        Self::new()
606    }
607}