// datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.
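//!
//! This module defines the [`Datanode`] service and the [`DatanodeBuilder`] that
//! assembles it: the region server with its region engines (mito, metric and file),
//! the WAL-backed log stores, object storage, and the optional heartbeat task that
//! reports to the metasrv.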

use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
    GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

/// Datanode service.
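///
/// Owns the server handlers, the region server, the optional heartbeat task, the
/// telemetry task, and the plugins shared across components.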
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
}

impl Datanode {
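    /// Starts the datanode: the heartbeat task first, then waits for region lease
    /// coordination, starts the telemetry task, and finally starts all servers.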
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: The event_receiver must exist.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

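    /// Shuts down the datanode: servers first, then the telemetry task, the heartbeat
    /// task, and finally the region server.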
    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

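/// Builder for [`Datanode`].
///
/// A minimal usage sketch (assuming a `DatanodeOptions`, a `KvBackendRef`, a
/// `MetaClientRef` and a cache registry are already available):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, Plugins::default(), kv_backend);
/// builder
///     .with_meta_client(meta_client)
///     .with_cache_registry(cache_registry);
/// let datanode = builder.build().await?;
/// ```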
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

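    /// Builds the [`Datanode`]: creates the region server, opens all regions assigned
    /// to this node (optionally in the background), and sets up the heartbeat and
    /// telemetry tasks.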
    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
        set_default_prefix(self.opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(BuildDatanodeSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, it is used to control the region server.
        // Otherwise the region server is self-controlled, meaning it sends no heartbeat
        // and regions are immediately writable upon open.
        let controlled_by_metasrv = meta_client.is_some();

        // Build and initialize the region server.
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

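        // Resolve the caches needed to build the schema metadata manager that is
        // shared with the region engines.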
        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        // TODO(weny): Consider introducing a read-only kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            // Ignore nonexistent regions in recovery mode.
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in the background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let mut resource_stat = ResourceStatImpl::default();
        resource_stat.start_collect_cpu_usage();

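        // The heartbeat task is created only when the datanode is controlled by a metasrv.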
        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                    Arc::new(resource_stat),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
        })
    }

    /// Builds [ObjectStoreManager] from [StorageConfig].
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Opens all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        // TODO(weny): Consider introducing a read-only kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

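    /// Creates the [RegionServer] with a datanode-local query engine and registers the
    /// configured region engines on it.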
    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        // Build a fetcher to backfill partition_expr on open.
        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

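        // The metric engine is layered on top of the mito engine, while the file engine
        // uses the default object store directly.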
        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when object storage is used.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

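        // The WAL backend decides which log store backs the mito engine: raft-engine
        // (local files), Kafka (remote), or a no-op store.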
        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("The WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Noop => {
                let log_store = Arc::new(NoopLogStore);

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // Create the WAL directory.
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Opens all regions belonging to this datanode.
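///
/// Leader regions are opened first and, when `open_with_writable` is set, promoted to
/// writable leaders; follower regions (enterprise feature only) are opened afterwards.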
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("Going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} regions have been opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions; expected to open {} regions, but only {} regions have been opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    // Finalize leadership: persist backfilled metadata.
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "Failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "Failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("Going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} regions have been opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions; expected to open {} follower regions, but only {} regions have been opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("All regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

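    // Registers a datanode-table mapping (table 1028 with regions 0, 1 and 2 on
    // datanode 0) that the test below opens through the builder.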
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}