meta_srv/metasrv/builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::path::Path;
16use std::sync::atomic::AtomicBool;
17use std::sync::{Arc, Mutex, RwLock};
18
19use client::client_manager::NodeClients;
20use common_base::Plugins;
21use common_catalog::consts::{MIN_USER_FLOW_ID, MIN_USER_TABLE_ID};
22use common_grpc::channel_manager::ChannelConfig;
23use common_meta::ddl::flow_meta::FlowMetadataAllocator;
24use common_meta::ddl::table_meta::{TableMetadataAllocator, TableMetadataAllocatorRef};
25use common_meta::ddl::{
26    DdlContext, NoopRegionFailureDetectorControl, RegionFailureDetectorControllerRef,
27};
28use common_meta::ddl_manager::DdlManager;
29use common_meta::distributed_time_constants;
30use common_meta::key::flow::flow_state::FlowStateManager;
31use common_meta::key::flow::FlowMetadataManager;
32use common_meta::key::runtime_switch::{RuntimeSwitchManager, RuntimeSwitchManagerRef};
33use common_meta::key::TableMetadataManager;
34use common_meta::kv_backend::memory::MemoryKvBackend;
35use common_meta::kv_backend::{KvBackendRef, ResettableKvBackendRef};
36use common_meta::node_manager::NodeManagerRef;
37use common_meta::region_keeper::MemoryRegionKeeper;
38use common_meta::region_registry::LeaderRegionRegistry;
39use common_meta::sequence::SequenceBuilder;
40use common_meta::state_store::KvStateStore;
41use common_meta::wal_options_allocator::{build_kafka_client, build_wal_options_allocator};
42use common_procedure::local::{LocalManager, ManagerConfig};
43use common_procedure::ProcedureManagerRef;
44use common_telemetry::{info, warn};
45use snafu::{ensure, ResultExt};
46
47use crate::cache_invalidator::MetasrvCacheInvalidator;
48use crate::cluster::{MetaPeerClientBuilder, MetaPeerClientRef};
49use crate::error::{self, BuildWalOptionsAllocatorSnafu, Result};
50use crate::flow_meta_alloc::FlowPeerAllocator;
51use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
52use crate::handler::failure_handler::RegionFailureHandler;
53use crate::handler::flow_state_handler::FlowStateHandler;
54use crate::handler::region_lease_handler::{CustomizedRegionLeaseRenewerRef, RegionLeaseHandler};
55use crate::handler::{HeartbeatHandlerGroupBuilder, HeartbeatMailbox, Pushers};
56use crate::lease::MetaPeerLookupService;
57use crate::metasrv::{
58    ElectionRef, Metasrv, MetasrvInfo, MetasrvOptions, RegionStatAwareSelectorRef, SelectTarget,
59    SelectorContext, SelectorRef, FLOW_ID_SEQ, METASRV_DATA_DIR, TABLE_ID_SEQ,
60};
61use crate::procedure::region_migration::manager::RegionMigrationManager;
62use crate::procedure::region_migration::DefaultContextFactory;
63use crate::procedure::wal_prune::manager::{WalPruneManager, WalPruneTicker};
64use crate::procedure::wal_prune::Context as WalPruneContext;
65use crate::region::supervisor::{
66    HeartbeatAcceptor, RegionFailureDetectorControl, RegionSupervisor, RegionSupervisorSelector,
67    RegionSupervisorTicker, DEFAULT_INITIALIZATION_RETRY_PERIOD, DEFAULT_TICK_INTERVAL,
68};
69use crate::selector::lease_based::LeaseBasedSelector;
70use crate::selector::round_robin::RoundRobinSelector;
71use crate::service::mailbox::MailboxRef;
72use crate::service::store::cached_kv::LeaderCachedKvBackend;
73use crate::state::State;
74use crate::table_meta_alloc::MetasrvPeerAllocator;
75
// TODO(fys): try use derive_builder macro
/// Builder for [`Metasrv`].
///
/// Every component is optional; `build` substitutes a default for any field left as `None`.
pub struct MetasrvBuilder {
    /// Metasrv options; `build` falls back to `MetasrvOptions::default()`.
    options: Option<MetasrvOptions>,
    /// Main KV backend; `build` falls back to a `MemoryKvBackend`.
    kv_backend: Option<KvBackendRef>,
    /// Resettable in-memory KV backend; `build` falls back to a `MemoryKvBackend`.
    in_memory: Option<ResettableKvBackendRef>,
    /// Peer selector; `build` falls back to `LeaseBasedSelector::default()`.
    selector: Option<SelectorRef>,
    /// Heartbeat handler group builder; when `None`, `build` assembles one with the default handlers.
    handler_group_builder: Option<HeartbeatHandlerGroupBuilder>,
    /// Leader election; when `None`, the built metasrv starts in the leader state.
    election: Option<ElectionRef>,
    /// Meta peer client; `build` falls back to one built from `election` and `in_memory`.
    meta_peer_client: Option<MetaPeerClientRef>,
    /// Node manager for DDL; `build` falls back to `NodeClients` configured from the datanode client options.
    node_manager: Option<NodeManagerRef>,
    /// Plugins; `build` falls back to `Plugins::default()` and also reads optional extension points from them.
    plugins: Option<Plugins>,
    /// Table metadata allocator; `build` falls back to one backed by the `TABLE_ID_SEQ` sequence.
    table_metadata_allocator: Option<TableMetadataAllocatorRef>,
}
89
90impl MetasrvBuilder {
91    pub fn new() -> Self {
92        Self {
93            kv_backend: None,
94            in_memory: None,
95            selector: None,
96            handler_group_builder: None,
97            meta_peer_client: None,
98            election: None,
99            options: None,
100            node_manager: None,
101            plugins: None,
102            table_metadata_allocator: None,
103        }
104    }
105
106    pub fn options(mut self, options: MetasrvOptions) -> Self {
107        self.options = Some(options);
108        self
109    }
110
111    pub fn kv_backend(mut self, kv_backend: KvBackendRef) -> Self {
112        self.kv_backend = Some(kv_backend);
113        self
114    }
115
116    pub fn in_memory(mut self, in_memory: ResettableKvBackendRef) -> Self {
117        self.in_memory = Some(in_memory);
118        self
119    }
120
121    pub fn selector(mut self, selector: SelectorRef) -> Self {
122        self.selector = Some(selector);
123        self
124    }
125
126    pub fn heartbeat_handler(
127        mut self,
128        handler_group_builder: HeartbeatHandlerGroupBuilder,
129    ) -> Self {
130        self.handler_group_builder = Some(handler_group_builder);
131        self
132    }
133
134    pub fn meta_peer_client(mut self, meta_peer_client: MetaPeerClientRef) -> Self {
135        self.meta_peer_client = Some(meta_peer_client);
136        self
137    }
138
139    pub fn election(mut self, election: Option<ElectionRef>) -> Self {
140        self.election = election;
141        self
142    }
143
144    pub fn node_manager(mut self, node_manager: NodeManagerRef) -> Self {
145        self.node_manager = Some(node_manager);
146        self
147    }
148
149    pub fn plugins(mut self, plugins: Plugins) -> Self {
150        self.plugins = Some(plugins);
151        self
152    }
153
154    pub fn table_metadata_allocator(
155        mut self,
156        table_metadata_allocator: TableMetadataAllocatorRef,
157    ) -> Self {
158        self.table_metadata_allocator = Some(table_metadata_allocator);
159        self
160    }
161
162    pub async fn build(self) -> Result<Metasrv> {
163        let MetasrvBuilder {
164            election,
165            meta_peer_client,
166            options,
167            kv_backend,
168            in_memory,
169            selector,
170            handler_group_builder,
171            node_manager,
172            plugins,
173            table_metadata_allocator,
174        } = self;
175
176        let options = options.unwrap_or_default();
177
178        let kv_backend = kv_backend.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
179        let in_memory = in_memory.unwrap_or_else(|| Arc::new(MemoryKvBackend::new()));
180
181        let state = Arc::new(RwLock::new(match election {
182            None => State::leader(options.grpc.server_addr.to_string(), true),
183            Some(_) => State::follower(options.grpc.server_addr.to_string()),
184        }));
185
186        let leader_cached_kv_backend = Arc::new(LeaderCachedKvBackend::new(
187            state.clone(),
188            kv_backend.clone(),
189        ));
190
191        let meta_peer_client = meta_peer_client
192            .unwrap_or_else(|| build_default_meta_peer_client(&election, &in_memory));
193        let selector = selector.unwrap_or_else(|| Arc::new(LeaseBasedSelector::default()));
194        let pushers = Pushers::default();
195        let mailbox = build_mailbox(&kv_backend, &pushers);
196        let runtime_switch_manager = Arc::new(RuntimeSwitchManager::new(kv_backend.clone()));
197        let procedure_manager =
198            build_procedure_manager(&options, &kv_backend, &runtime_switch_manager);
199
200        let table_metadata_manager = Arc::new(TableMetadataManager::new(
201            leader_cached_kv_backend.clone() as _,
202        ));
203        let flow_metadata_manager = Arc::new(FlowMetadataManager::new(
204            leader_cached_kv_backend.clone() as _,
205        ));
206
207        let selector_ctx = SelectorContext {
208            server_addr: options.grpc.server_addr.clone(),
209            datanode_lease_secs: distributed_time_constants::DATANODE_LEASE_SECS,
210            flownode_lease_secs: distributed_time_constants::FLOWNODE_LEASE_SECS,
211            kv_backend: kv_backend.clone(),
212            meta_peer_client: meta_peer_client.clone(),
213            table_id: None,
214        };
215
216        let wal_options_allocator = build_wal_options_allocator(&options.wal, kv_backend.clone())
217            .await
218            .context(BuildWalOptionsAllocatorSnafu)?;
219        let wal_options_allocator = Arc::new(wal_options_allocator);
220        let is_remote_wal = wal_options_allocator.is_remote_wal();
221        let table_metadata_allocator = table_metadata_allocator.unwrap_or_else(|| {
222            let sequence = Arc::new(
223                SequenceBuilder::new(TABLE_ID_SEQ, kv_backend.clone())
224                    .initial(MIN_USER_TABLE_ID as u64)
225                    .step(10)
226                    .build(),
227            );
228            let peer_allocator = Arc::new(MetasrvPeerAllocator::new(
229                selector_ctx.clone(),
230                selector.clone(),
231            ));
232            Arc::new(TableMetadataAllocator::with_peer_allocator(
233                sequence,
234                wal_options_allocator.clone(),
235                peer_allocator,
236            ))
237        });
238
239        let flow_selector = Arc::new(RoundRobinSelector::new(
240            SelectTarget::Flownode,
241            Arc::new(Vec::new()),
242        )) as SelectorRef;
243
244        let flow_metadata_allocator = {
245            // for now flownode just use round-robin selector
246            let flow_selector_ctx = selector_ctx.clone();
247            let peer_allocator = Arc::new(FlowPeerAllocator::new(
248                flow_selector_ctx,
249                flow_selector.clone(),
250            ));
251            let seq = Arc::new(
252                SequenceBuilder::new(FLOW_ID_SEQ, kv_backend.clone())
253                    .initial(MIN_USER_FLOW_ID as u64)
254                    .step(10)
255                    .build(),
256            );
257
258            Arc::new(FlowMetadataAllocator::with_peer_allocator(
259                seq,
260                peer_allocator,
261            ))
262        };
263        let flow_state_handler =
264            FlowStateHandler::new(FlowStateManager::new(in_memory.clone().as_kv_backend_ref()));
265
266        let memory_region_keeper = Arc::new(MemoryRegionKeeper::default());
267        let node_manager = node_manager.unwrap_or_else(|| {
268            let datanode_client_channel_config = ChannelConfig::new()
269                .timeout(options.datanode.client.timeout)
270                .connect_timeout(options.datanode.client.connect_timeout)
271                .tcp_nodelay(options.datanode.client.tcp_nodelay);
272            Arc::new(NodeClients::new(datanode_client_channel_config))
273        });
274        let cache_invalidator = Arc::new(MetasrvCacheInvalidator::new(
275            mailbox.clone(),
276            MetasrvInfo {
277                server_addr: options.grpc.server_addr.clone(),
278            },
279        ));
280        let peer_lookup_service = Arc::new(MetaPeerLookupService::new(meta_peer_client.clone()));
281
282        if !is_remote_wal && options.enable_region_failover {
283            ensure!(
284                options.allow_region_failover_on_local_wal,
285                error::UnexpectedSnafu {
286                    violated: "Region failover is not supported in the local WAL implementation!
287                    If you want to enable region failover for local WAL, please set `allow_region_failover_on_local_wal` to true.",
288                }
289            );
290            if options.allow_region_failover_on_local_wal {
291                warn!("Region failover is force enabled in the local WAL implementation! This may lead to data loss during failover!");
292            }
293        }
294
295        let (tx, rx) = RegionSupervisor::channel();
296        let (region_failure_detector_controller, region_supervisor_ticker): (
297            RegionFailureDetectorControllerRef,
298            Option<std::sync::Arc<RegionSupervisorTicker>>,
299        ) = if options.enable_region_failover {
300            (
301                Arc::new(RegionFailureDetectorControl::new(tx.clone())) as _,
302                Some(Arc::new(RegionSupervisorTicker::new(
303                    DEFAULT_TICK_INTERVAL,
304                    options.region_failure_detector_initialization_delay,
305                    DEFAULT_INITIALIZATION_RETRY_PERIOD,
306                    tx.clone(),
307                ))),
308            )
309        } else {
310            (Arc::new(NoopRegionFailureDetectorControl) as _, None as _)
311        };
312
313        // region migration manager
314        let region_migration_manager = Arc::new(RegionMigrationManager::new(
315            procedure_manager.clone(),
316            DefaultContextFactory::new(
317                in_memory.clone(),
318                table_metadata_manager.clone(),
319                memory_region_keeper.clone(),
320                region_failure_detector_controller.clone(),
321                mailbox.clone(),
322                options.grpc.server_addr.clone(),
323                cache_invalidator.clone(),
324            ),
325        ));
326        region_migration_manager.try_start()?;
327        let region_supervisor_selector = plugins
328            .as_ref()
329            .and_then(|plugins| plugins.get::<RegionStatAwareSelectorRef>());
330
331        let supervisor_selector = match region_supervisor_selector {
332            Some(selector) => {
333                info!("Using region stat aware selector");
334                RegionSupervisorSelector::RegionStatAwareSelector(selector)
335            }
336            None => RegionSupervisorSelector::NaiveSelector(selector.clone()),
337        };
338
339        let region_failover_handler = if options.enable_region_failover {
340            let region_supervisor = RegionSupervisor::new(
341                rx,
342                options.failure_detector,
343                selector_ctx.clone(),
344                supervisor_selector,
345                region_migration_manager.clone(),
346                runtime_switch_manager.clone(),
347                peer_lookup_service.clone(),
348                leader_cached_kv_backend.clone(),
349            );
350
351            Some(RegionFailureHandler::new(
352                region_supervisor,
353                HeartbeatAcceptor::new(tx),
354            ))
355        } else {
356            None
357        };
358
359        let leader_region_registry = Arc::new(LeaderRegionRegistry::default());
360
361        let ddl_context = DdlContext {
362            node_manager,
363            cache_invalidator: cache_invalidator.clone(),
364            memory_region_keeper: memory_region_keeper.clone(),
365            leader_region_registry: leader_region_registry.clone(),
366            table_metadata_manager: table_metadata_manager.clone(),
367            table_metadata_allocator: table_metadata_allocator.clone(),
368            flow_metadata_manager: flow_metadata_manager.clone(),
369            flow_metadata_allocator: flow_metadata_allocator.clone(),
370            region_failure_detector_controller,
371        };
372        let procedure_manager_c = procedure_manager.clone();
373        let ddl_manager = DdlManager::try_new(ddl_context, procedure_manager_c, true)
374            .context(error::InitDdlManagerSnafu)?;
375        #[cfg(feature = "enterprise")]
376        let ddl_manager = {
377            let trigger_ddl_manager = plugins.as_ref().and_then(|plugins| {
378                plugins.get::<common_meta::ddl_manager::TriggerDdlManagerRef>()
379            });
380            ddl_manager.with_trigger_ddl_manager(trigger_ddl_manager)
381        };
382        let ddl_manager = Arc::new(ddl_manager);
383
384        // remote WAL prune ticker and manager
385        let wal_prune_ticker = if is_remote_wal && options.wal.enable_active_wal_pruning() {
386            let (tx, rx) = WalPruneManager::channel();
387            // Safety: Must be remote WAL.
388            let remote_wal_options = options.wal.remote_wal_options().unwrap();
389            let kafka_client = build_kafka_client(&remote_wal_options.connection)
390                .await
391                .context(error::BuildKafkaClientSnafu)?;
392            let wal_prune_context = WalPruneContext {
393                client: Arc::new(kafka_client),
394                table_metadata_manager: table_metadata_manager.clone(),
395                leader_region_registry: leader_region_registry.clone(),
396                server_addr: options.grpc.server_addr.clone(),
397                mailbox: mailbox.clone(),
398            };
399            let wal_prune_manager = WalPruneManager::new(
400                table_metadata_manager.clone(),
401                remote_wal_options.auto_prune_parallelism,
402                rx,
403                procedure_manager.clone(),
404                wal_prune_context,
405                remote_wal_options.trigger_flush_threshold,
406            );
407            // Start manager in background. Ticker will be started in the main thread to send ticks.
408            wal_prune_manager.try_start().await?;
409            let wal_prune_ticker = Arc::new(WalPruneTicker::new(
410                remote_wal_options.auto_prune_interval,
411                tx.clone(),
412            ));
413            Some(wal_prune_ticker)
414        } else {
415            None
416        };
417
418        let customized_region_lease_renewer = plugins
419            .as_ref()
420            .and_then(|plugins| plugins.get::<CustomizedRegionLeaseRenewerRef>());
421
422        let handler_group_builder = match handler_group_builder {
423            Some(handler_group_builder) => handler_group_builder,
424            None => {
425                let region_lease_handler = RegionLeaseHandler::new(
426                    distributed_time_constants::REGION_LEASE_SECS,
427                    table_metadata_manager.clone(),
428                    memory_region_keeper.clone(),
429                    customized_region_lease_renewer,
430                );
431
432                HeartbeatHandlerGroupBuilder::new(pushers)
433                    .with_plugins(plugins.clone())
434                    .with_region_failure_handler(region_failover_handler)
435                    .with_region_lease_handler(Some(region_lease_handler))
436                    .with_flush_stats_factor(Some(options.flush_stats_factor))
437                    .with_flow_state_handler(Some(flow_state_handler))
438                    .add_default_handlers()
439            }
440        };
441
442        let enable_telemetry = options.enable_telemetry;
443        let metasrv_home = Path::new(&options.data_home)
444            .join(METASRV_DATA_DIR)
445            .to_string_lossy()
446            .to_string();
447
448        Ok(Metasrv {
449            state,
450            started: Arc::new(AtomicBool::new(false)),
451            start_time_ms: common_time::util::current_time_millis() as u64,
452            options,
453            in_memory,
454            kv_backend,
455            leader_cached_kv_backend,
456            meta_peer_client: meta_peer_client.clone(),
457            selector,
458            selector_ctx,
459            // TODO(jeremy): We do not allow configuring the flow selector.
460            flow_selector,
461            handler_group: RwLock::new(None),
462            handler_group_builder: Mutex::new(Some(handler_group_builder)),
463            election,
464            procedure_manager,
465            mailbox,
466            procedure_executor: ddl_manager,
467            wal_options_allocator,
468            table_metadata_manager,
469            runtime_switch_manager,
470            greptimedb_telemetry_task: get_greptimedb_telemetry_task(
471                Some(metasrv_home),
472                meta_peer_client,
473                enable_telemetry,
474            )
475            .await,
476            plugins: plugins.unwrap_or_else(Plugins::default),
477            memory_region_keeper,
478            region_migration_manager,
479            region_supervisor_ticker,
480            cache_invalidator,
481            leader_region_registry,
482            wal_prune_ticker,
483        })
484    }
485}
486
487fn build_default_meta_peer_client(
488    election: &Option<ElectionRef>,
489    in_memory: &ResettableKvBackendRef,
490) -> MetaPeerClientRef {
491    MetaPeerClientBuilder::default()
492        .election(election.clone())
493        .in_memory(in_memory.clone())
494        .build()
495        .map(Arc::new)
496        // Safety: all required fields set at initialization
497        .unwrap()
498}
499
500fn build_mailbox(kv_backend: &KvBackendRef, pushers: &Pushers) -> MailboxRef {
501    let mailbox_sequence = SequenceBuilder::new("heartbeat_mailbox", kv_backend.clone())
502        .initial(1)
503        .step(100)
504        .build();
505
506    HeartbeatMailbox::create(pushers.clone(), mailbox_sequence)
507}
508
509fn build_procedure_manager(
510    options: &MetasrvOptions,
511    kv_backend: &KvBackendRef,
512    runtime_switch_manager: &RuntimeSwitchManagerRef,
513) -> ProcedureManagerRef {
514    let manager_config = ManagerConfig {
515        max_retry_times: options.procedure.max_retry_times,
516        retry_delay: options.procedure.retry_delay,
517        max_running_procedures: options.procedure.max_running_procedures,
518        ..Default::default()
519    };
520    let kv_state_store = Arc::new(
521        KvStateStore::new(kv_backend.clone()).with_max_value_size(
522            options
523                .procedure
524                .max_metadata_value_size
525                .map(|v| v.as_bytes() as usize),
526        ),
527    );
528
529    Arc::new(LocalManager::new(
530        manager_config,
531        kv_state_store.clone(),
532        kv_state_store,
533        Some(runtime_switch_manager.clone()),
534    ))
535}
536
537impl Default for MetasrvBuilder {
538    fn default() -> Self {
539        Self::new()
540    }
541}