datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.

use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
    ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

/// Datanode service.
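///
/// Holds the server handlers, the region server, and the background tasks
/// (heartbeat, telemetry, and metrics export) that make up a running datanode.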
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
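    /// Starts the heartbeat, telemetry, and metrics export tasks, then starts all servers.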
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

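    /// Returns a reference to the server handlers.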
    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

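    /// Starts the telemetry reporting task, logging a warning if it fails to start.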
    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

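    /// Starts the heartbeat task if this datanode is managed by metasrv.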
    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: The event_receiver must exist.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` is set, waits until leases have been obtained for all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

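    /// Replaces the server handlers with the given ones.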
    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

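    /// Shuts down all servers, then stops the telemetry task, the heartbeat task, and the region server.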
    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

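/// Builder for [Datanode].
///
/// A minimal usage sketch, assuming `opts`, `plugins`, `kv_backend`, and
/// `cache_registry` are prepared by the caller:
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, plugins, kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let datanode = builder.build().await?;
/// ```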
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

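    /// Sets the metasrv client; when present, the region server is controlled by metasrv.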
    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

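    /// Sets the layered cache registry, which is required by [DatanodeBuilder::build].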
    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

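    /// Builds the [Datanode].
    ///
    /// Requires `node_id` to be set in the options and a cache registry to be provided.
    /// Opens all regions assigned to this datanode (in the background if configured)
    /// and creates the heartbeat task when a metasrv client is present.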
    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, it is used to control the region server.
        // Otherwise the region server is self-controlled, meaning it sends no heartbeats
        // and regions become writable immediately upon open.
        let controlled_by_metasrv = meta_client.is_some();

        // Build and initialize the region server.
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            // Ignore nonexistent regions in recovery mode.
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let mut resource_stat = ResourceStatImpl::default();
        resource_stat.start_collect_cpu_usage();

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                    Arc::new(resource_stat),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

    /// Builds [ObjectStoreManager] from [StorageConfig].
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Open all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

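    /// Creates the [RegionServer] and registers the mito, metric, and file engines on it.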
    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        // Build a fetcher to backfill partition_expr on open.
        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when using object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

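        // Choose the log store backing the WAL: a local raft-engine store, a remote Kafka
        // store, or a no-op store.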
        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Noop => {
                let log_store = Arc::new(NoopLogStore);

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // create WAL directory
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Open all regions belonging to this datanode.
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} regions were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions; expected to open {} regions, but only {} regions were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    // Finalize leadership: persist backfilled metadata.
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} regions were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions; expected to open {} follower regions, but only {} regions were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

758    info!("all regions are opened");
759
760    Ok(())
761}
762
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

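    /// Registers a datanode-table mapping (table 1028, regions 0..=2) in the kv backend for tests.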
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}