// meta_srv/metasrv/builder.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::sync::atomic::AtomicBool;
16use std::sync::{Arc, Mutex, RwLock};
17
18use client::client_manager::NodeClients;
19use common_base::Plugins;
20use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
21use common_grpc::channel_manager::ChannelConfig;
22use common_meta::ddl::flow_meta::FlowMetadataAllocator;
23use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
24use common_meta::ddl::{
25    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
26};
27use common_meta::ddl_manager::DdlManager;
28use common_meta::distributed_time_constants;
29use common_meta::key::flow::flow_state::FlowStateManager;
30use common_meta::key::flow::FlowMetadataManager;
31use common_meta::key::maintenance::MaintenanceModeManager;
32use common_meta::key::TableMetadataManager;
33use common_meta::kv_backend::memory::MemoryKvBackend;
34use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
35use common_meta::node_manager::NodeManagerRef;
36use common_meta::region_keeper::MemoryRegionKeeper;
37use common_meta::region_registry::LeaderRegionRegistry;
38use common_meta::sequence::SequenceBuilder;
39use common_meta::state_store::KvStateStore;
40use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
41use common_procedure::local::{LocalManager, ManagerConfig};
42use common_procedure::ProcedureManagerRef;
43use common_telemetry::{info, warn};
44use snafu::{ensure, ResultExt};
45
46use crate::cache_invalidator::MetasrvCacheInvalidator;
47use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
48use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
49use crate::flow_meta_alloc::FlowPeerAllocator;
50use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
51use crate::handler::failure_handler::RegionFailureHandler;
52use crate::handler::flow_state_handler::FlowStateHandler;
53use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
54use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
55use crate::lease::MetaPeerLookupService;
56use crate::metasrv::{
57    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
58    SelectorContext, SelectorRef, FLOW_ID_SEQ, TABLE_ID_SEQ,
59};
60use crate::procedure::region_migration::manager::RegionMigrationManager;
61use crate::procedure::region_migration::DefaultContextFactory;
62use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
63use crate::procedure::wal_prune::Context as WalPruneContext;
64use crate::region::supervisor::{
65    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
66    RegionSupervisorTicker, DEFAULT_TICK_INTERVAL,
67};
68use crate::selector::lease_based::LeaseBasedSelector;
69use crate::selector::round_robin::RoundRobinSelector;
70use crate::service::mailbox::MailboxRef;
71use crate::service::store::cached_kv::LeaderCachedKvBackend;
72use crate::state::State;
73use crate::table_meta_alloc::MetasrvPeerAllocator;
74
75// TODO(fys): try use derive_builder macro
// TODO(fys): try use derive_builder macro
/// Builder that assembles the components of a `Metasrv` instance.
///
/// Every field is optional; [`MetasrvBuilder::build`] substitutes a default
/// for anything left unset (e.g. in-memory KV backends, a lease-based
/// datanode selector).
pub struct MetasrvBuilder {
    /// Metasrv options; `build` falls back to `MetasrvOptions::default()`.
    options: Option<MetasrvOptions>,
    /// Persistent KV backend; `build` defaults to a `MemoryKvBackend`.
    kv_backend: Option<KvBackendRef>,
    /// Resettable in-memory KV backend; `build` defaults to a `MemoryKvBackend`.
    in_memory: Option<ResettableKvBackendRef>,
    /// Datanode selector; `build` defaults to a `LeaseBasedSelector`.
    selector: Option<SelectorRef>,
    /// Pre-configured heartbeat handler group builder; a default group
    /// (region lease, flow state, failover handlers, ...) is built when unset.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Leader election mechanism; when `None` the node starts directly as leader.
    election: Option<ElectionRef>,
    /// Client for talking to meta peers; a default one is built from
    /// `election` + `in_memory` when unset.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Manager of datanode/flownode clients; `build` defaults to `NodeClients`.
    node_manager: Option<NodeManagerRef>,
    /// Optional plugins consulted for custom selectors and lease renewers.
    plugins: Option<Plugins>,
    /// Table metadata allocator; a default sequence-based one is created in `build`.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
88
89impl MetasrvBuilder {
90    pub fn new() -> Self {
91        Self {
92            kv_backend: None,
93            in_memory: None,
94            selector: None,
95            handler_group_builder: None,
96            meta_peer_client: None,
97            election: None,
98            options: None,
99            node_manager: None,
100            plugins: None,
101            table_metadata_allocator: None,
102        }
103    }
104
105    pub fn options(mut self, options: MetasrvOptions) -> Self {
106        self.options = Some(options);
107        self
108    }
109
110    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
111        self.kv_backend = Some(kv_backend);
112        self
113    }
114
115    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
116        self.in_memory = Some(in_memory);
117        self
118    }
119
120    pub fn selector(mut self, selector: SelectorRef) -> Self {
121        self.selector = Some(selector);
122        self
123    }
124
125    pub fn heartbeat_handler(
126        mut self,
127        handler_group_builder: HeartbeatHandlerGroupBuilder,
128    ) -> Self {
129        self.handler_group_builder = Some(handler_group_builder);
130        self
131    }
132
133    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
134        self.meta_peer_client = Some(meta_peer_client);
135        self
136    }
137
138    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
139        self.election = election;
140        self
141    }
142
143    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
144        self.node_manager = Some(node_manager);
145        self
146    }
147
148    pub fn plugins(mut self, plugins: Plugins) -> Self {
149        self.plugins = Some(plugins);
150        self
151    }
152
153    pub fn table_metadata_allocator(
154        mut self,
155        table_metadata_allocator: TableMetadataAllocatorRef,
156    ) -> Self {
157        self.table_metadata_allocator = Some(table_metadata_allocator);
158        self
159    }
160
161    pub async fn build(self) -> Result<Metasrv> {
162        let started = Arc::new(AtomicBool::new(false));
163
164        let MetasrvBuilder {
165            election,
166            meta_peer_client,
167            options,
168            kv_backend,
169            in_memory,
170            selector,
171            handler_group_builder,
172            node_manager,
173            plugins,
174            table_metadata_allocator,
175        } = self;
176
177        let options = options.unwrap_or_default();
178
179        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
180        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
181
182        let state = Arc::new(RwLock::new(match election {
183            None => State::leader(options.server_addr.to_string(), true),
184            Some(_) => State::follower(options.server_addr.to_string()),
185        }));
186
187        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
188            state.clone(),
189            kv_backend.clone(),
190        ));
191
192        let meta_peer_client = meta_peer_client
193            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
194        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
195        let pushers = Pushers::default();
196        let mailbox = build_mailbox(&kv_backend, &pushers);
197        let procedure_manager = build_procedure_manager(&options, &kv_backend);
198
199        let table_metadata_manager = Arc::new(TableMetadataManager::new(
200            leader_cached_kv_backend.clone() as _,
201        ));
202        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
203            leader_cached_kv_backend.clone() as _,
204        ));
205        let maintenance_mode_manager = Arc::new(MaintenanceModeManager::new(kv_backend.clone()));
206        let selector_ctx = SelectorContext {
207            server_addr: options.server_addr.clone(),
208            datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
209            flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
210            kv_backend: kv_backend.clone(),
211            meta_peer_client: meta_peer_client.clone(),
212            table_id: None,
213        };
214
215        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
216            .await
217            .context(BuildWalOptionsAllocatorSnafu)?;
218        let wal_options_allocator = Arc::new(wal_options_allocator);
219        let is_remote_wal = wal_options_allocator.is_remote_wal();
220        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
221            let sequence = Arc::new(
222                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
223                    .initial(MIN_USER_TABLE_ID as u64)
224                    .step(10)
225                    .build(),
226            );
227            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
228                selector_ctx.clone(),
229                selector.clone(),
230            ));
231            Arc::new(TableMetadataAllocator::with_peer_allocator(
232                sequence,
233                wal_options_allocator.clone(),
234                peer_allocator,
235            ))
236        });
237
238        let flow_selector = Arc::new(RoundRobinSelector::new(
239            SelectTarget::Flownode,
240            Arc::new(Vec::new()),
241        )) as SelectorRef;
242
243        let flow_metadata_allocator = {
244            // for now flownode just use round-robin selector
245            let flow_selector_ctx = selector_ctx.clone();
246            let peer_allocator = Arc::new(FlowPeerAllocator::new(
247                flow_selector_ctx,
248                flow_selector.clone(),
249            ));
250            let seq = Arc::new(
251                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
252                    .initial(MIN_USER_FLOW_ID as u64)
253                    .step(10)
254                    .build(),
255            );
256
257            Arc::new(FlowMetadataAllocator::with_peer_allocator(
258                seq,
259                peer_allocator,
260            ))
261        };
262        let flow_state_handler =
263            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
264
265        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
266        let node_manager = node_manager.unwrap_or_else(|| {
267            let datanode_client_channel_config = ChannelConfig::new()
268                .timeout(options.datanode.client.timeout)
269                .connect_timeout(options.datanode.client.connect_timeout)
270                .tcp_nodelay(options.datanode.client.tcp_nodelay);
271            Arc::new(NodeClients::new(datanode_client_channel_config))
272        });
273        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
274            mailbox.clone(),
275            MetasrvInfo {
276                server_addr: options.server_addr.clone(),
277            },
278        ));
279        let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
280
281        if !is_remote_wal && options.enable_region_failover {
282            ensure!(
283                options.allow_region_failover_on_local_wal,
284                error::UnexpectedSnafu {
285                    violated: "Region failover is not supported in the local WAL implementation! 
286                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
287                }
288            );
289            if options.allow_region_failover_on_local_wal {
290                warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
291            }
292        }
293
294        let (tx, rx) = RegionSupervisor::channel();
295        let (region_failure_detector_controller, region_supervisor_ticker): (
296            RegionFailureDetectorControllerRef,
297            Option<std::sync::Arc<RegionSupervisorTicker>>,
298        ) = if options.enable_region_failover {
299            (
300                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
301                Some(Arc::new(RegionSupervisorTicker::new(
302                    DEFAULT_TICK_INTERVAL,
303                    tx.clone(),
304                ))),
305            )
306        } else {
307            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
308        };
309
310        // region migration manager
311        let region_migration_manager = Arc::new(RegionMigrationManager::new(
312            procedure_manager.clone(),
313            DefaultContextFactory::new(
314                in_memory.clone(),
315                table_metadata_manager.clone(),
316                memory_region_keeper.clone(),
317                region_failure_detector_controller.clone(),
318                mailbox.clone(),
319                options.server_addr.clone(),
320                cache_invalidator.clone(),
321            ),
322        ));
323        region_migration_manager.try_start()?;
324        let region_supervisor_selector = plugins
325            .as_ref()
326            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
327
328        let supervisor_selector = match region_supervisor_selector {
329            Some(selector) => {
330                info!("Using region stat aware selector");
331                RegionSupervisorSelector::RegionStatAwareSelector(selector)
332            }
333            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
334        };
335
336        let region_failover_handler = if options.enable_region_failover {
337            let region_supervisor = RegionSupervisor::new(
338                rx,
339                options.failure_detector,
340                selector_ctx.clone(),
341                supervisor_selector,
342                region_migration_manager.clone(),
343                maintenance_mode_manager.clone(),
344                peer_lookup_service.clone(),
345            );
346
347            Some(RegionFailureHandler::new(
348                region_supervisor,
349                HeartbeatAcceptor::new(tx),
350            ))
351        } else {
352            None
353        };
354
355        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
356        let ddl_manager = Arc::new(
357            DdlManager::try_new(
358                DdlContext {
359                    node_manager,
360                    cache_invalidator: cache_invalidator.clone(),
361                    memory_region_keeper: memory_region_keeper.clone(),
362                    leader_region_registry: leader_region_registry.clone(),
363                    table_metadata_manager: table_metadata_manager.clone(),
364                    table_metadata_allocator: table_metadata_allocator.clone(),
365                    flow_metadata_manager: flow_metadata_manager.clone(),
366                    flow_metadata_allocator: flow_metadata_allocator.clone(),
367                    region_failure_detector_controller,
368                },
369                procedure_manager.clone(),
370                true,
371            )
372            .context(error::InitDdlManagerSnafu)?,
373        );
374
375        // remote WAL prune ticker and manager
376        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
377            let (tx, rx) = WalPruneManager::channel();
378            // Safety: Must be remote WAL.
379            let remote_wal_options = options.wal.remote_wal_options().unwrap();
380            let kafka_client = build_kafka_client(&remote_wal_options.connection)
381                .await
382                .context(error::BuildKafkaClientSnafu)?;
383            let wal_prune_context = WalPruneContext {
384                client: Arc::new(kafka_client),
385                table_metadata_manager: table_metadata_manager.clone(),
386                leader_region_registry: leader_region_registry.clone(),
387                server_addr: options.server_addr.clone(),
388                mailbox: mailbox.clone(),
389            };
390            let wal_prune_manager = WalPruneManager::new(
391                table_metadata_manager.clone(),
392                remote_wal_options.auto_prune_parallelism,
393                rx,
394                procedure_manager.clone(),
395                wal_prune_context,
396                remote_wal_options.trigger_flush_threshold,
397            );
398            // Start manager in background. Ticker will be started in the main thread to send ticks.
399            wal_prune_manager.try_start().await?;
400            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
401                remote_wal_options.auto_prune_interval,
402                tx.clone(),
403            ));
404            Some(wal_prune_ticker)
405        } else {
406            None
407        };
408
409        let customized_region_lease_renewer = plugins
410            .as_ref()
411            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
412
413        let handler_group_builder = match handler_group_builder {
414            Some(handler_group_builder) => handler_group_builder,
415            None => {
416                let region_lease_handler = RegionLeaseHandler::new(
417                    distributed_time_constants::REGION_LEASE_SECS,
418                    table_metadata_manager.clone(),
419                    memory_region_keeper.clone(),
420                    customized_region_lease_renewer,
421                );
422
423                HeartbeatHandlerGroupBuilder::new(pushers)
424                    .with_plugins(plugins.clone())
425                    .with_region_failure_handler(region_failover_handler)
426                    .with_region_lease_handler(Some(region_lease_handler))
427                    .with_flush_stats_factor(Some(options.flush_stats_factor))
428                    .with_flow_state_handler(Some(flow_state_handler))
429                    .add_default_handlers()
430            }
431        };
432
433        let enable_telemetry = options.enable_telemetry;
434        let metasrv_home = options.data_home.to_string();
435
436        Ok(Metasrv {
437            state,
438            started,
439            start_time_ms: common_time::util::current_time_millis() as u64,
440            options,
441            in_memory,
442            kv_backend,
443            leader_cached_kv_backend,
444            meta_peer_client: meta_peer_client.clone(),
445            selector,
446            // TODO(jeremy): We do not allow configuring the flow selector.
447            flow_selector,
448            handler_group: RwLock::new(None),
449            handler_group_builder: Mutex::new(Some(handler_group_builder)),
450            election,
451            procedure_manager,
452            mailbox,
453            procedure_executor: ddl_manager,
454            wal_options_allocator,
455            table_metadata_manager,
456            maintenance_mode_manager,
457            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
458                Some(metasrv_home),
459                meta_peer_client,
460                enable_telemetry,
461            )
462            .await,
463            plugins: plugins.unwrap_or_else(Plugins::default),
464            memory_region_keeper,
465            region_migration_manager,
466            region_supervisor_ticker,
467            cache_invalidator,
468            leader_region_registry,
469            wal_prune_ticker,
470        })
471    }
472}
473
474fn build_default_meta_peer_client(
475    election: &Option<ElectionRef>,
476    in_memory: &ResettableKvBackendRef,
477) -> MetaPeerClientRef {
478    MetaPeerClientBuilder::default()
479        .election(election.clone())
480        .in_memory(in_memory.clone())
481        .build()
482        .map(Arc::new)
483        // Safety: all required fields set at initialization
484        .unwrap()
485}
486
487fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
488    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
489        .initial(1)
490        .step(100)
491        .build();
492
493    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
494}
495
496fn build_procedure_manager(
497    options: &MetasrvOptions,
498    kv_backend: &KvBackendRef,
499) -> ProcedureManagerRef {
500    let manager_config = ManagerConfig {
501        max_retry_times: options.procedure.max_retry_times,
502        retry_delay: options.procedure.retry_delay,
503        max_running_procedures: options.procedure.max_running_procedures,
504        ..Default::default()
505    };
506    let kv_state_store = Arc::new(
507        KvStateStore::new(kv_backend.clone()).with_max_value_size(
508            options
509                .procedure
510                .max_metadata_value_size
511                .map(|v| v.as_bytes() as usize),
512        ),
513    );
514
515    Arc::new(LocalManager::new(
516        manager_config,
517        kv_state_store.clone(),
518        kv_state_store,
519    ))
520}
521
impl Default for MetasrvBuilder {
    /// Equivalent to [`MetasrvBuilder::new`].
    fn default() -> Self {
        Self::new()
    }
}