datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.

use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
    ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

/// Datanode service.
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
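    /// Starts the datanode: heartbeat, telemetry, metrics export, and all
    /// registered servers.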
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

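    /// Starts the telemetry task; a failure is logged but is not fatal.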
    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

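    /// Starts the heartbeat task if a metasrv client was provided; a no-op in
    /// standalone mode.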
    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: The event_receiver must exist.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

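    /// Replaces the server handlers; typically called before [Datanode::start].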
    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

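    /// Shuts down the servers, the telemetry task, the heartbeat task, and the
    /// region server, in that order.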
    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

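/// Builder for [Datanode].
///
/// A minimal construction sketch (assuming an in-memory kv backend and a
/// datanode cache registry, mirroring the test setup at the bottom of this
/// file; not a complete deployment):
///
/// ```ignore
/// let kv_backend = Arc::new(MemoryKvBackend::new());
/// let registry = Arc::new(
///     LayeredCacheRegistryBuilder::default()
///         .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
///         .build(),
/// );
/// let mut builder = DatanodeBuilder::new(
///     DatanodeOptions { node_id: Some(0), ..Default::default() },
///     Plugins::default(),
///     kv_backend,
/// );
/// builder.with_cache_registry(registry);
/// let mut datanode = builder.build().await?;
/// datanode.start().await?;
/// ```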
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

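    /// Builds the [Datanode]: constructs the region server, opens all regions
    /// assigned to this node, and wires up the heartbeat, telemetry, and
    /// metrics-export tasks.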
    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, we use it to control the region server.
        // Otherwise the region server is self-controlled, meaning it has no heartbeat
        // and regions are immediately writable upon open.
        let controlled_by_metasrv = meta_client.is_some();

        // build and initialize region server
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            // Ignore nonexistent regions in recovery mode.
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }


    /// Builds [ObjectStoreManager] from [StorageConfig].
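    ///
    /// A minimal usage sketch (hypothetical values; assumes `StorageConfig`
    /// implements `Default` with file-based storage):
    ///
    /// ```ignore
    /// let cfg = StorageConfig {
    ///     data_home: "/tmp/greptimedb".to_string(),
    ///     ..Default::default()
    /// };
    /// let manager = DatanodeBuilder::build_object_store_manager(&cfg).await?;
    /// let default_store = manager.default_object_store();
    /// ```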
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    /// Opens all regions belonging to this datanode.
    #[cfg(test)]
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

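    /// Builds the [RegionServer], wiring together the query engine, the table
    /// provider factory, the concurrency limiter, and all region engines.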
    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        // Build a fetcher to backfill partition_expr on open.
        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when using object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // create WAL directory
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Opens all regions belonging to this datanode.
///
/// When `open_with_writable` is true, successfully opened regions are converted
/// to leaders; when `ignore_nonexistent_region` is true, missing regions are
/// logged instead of failing the startup.
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} regions were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions; expected to open {} regions, but only {} were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    // Finalize leadership: persist backfilled metadata.
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} regions were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions; expected to open {} follower regions, but only {} were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}
823}