datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.

use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
    GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

/// Datanode service.
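///
/// A minimal lifecycle sketch (assumes a `Datanode` obtained from
/// [`DatanodeBuilder::build`] and a [`ServerHandlers`] value `services`
/// prepared elsewhere; not a complete setup):
///
/// ```ignore
/// datanode.setup_services(services);
/// datanode.start().await?;
/// // ... serve traffic ...
/// datanode.shutdown().await?;
/// ```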
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
}

impl Datanode {
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: The event_receiver must exist.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` is set, waits until leases have been obtained for all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

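/// Builder for [`Datanode`].
///
/// A usage sketch (assumes `opts`, `plugins`, `kv_backend`, and `cache_registry`
/// are constructed elsewhere; not a complete, runnable setup):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, plugins, kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let datanode = builder.build().await?;
/// ```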
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
        set_default_prefix(self.opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(BuildDatanodeSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, we will use it to control the region server.
        // Otherwise the region server is self-controlled: it sends no heartbeats and its
        // regions are immediately writable upon open.
        let controlled_by_metasrv = meta_client.is_some();

        // build and initialize region server
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));

        let gc_enabled = self.opts.region_engine.iter().any(|engine| {
            if let RegionEngineConfig::Mito(config) = engine {
                config.gc.enable
            } else {
                false
            }
        });

        let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
            Some(node_id),
            gc_enabled,
        ));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            // Ignore nonexistent regions in recovery mode.
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            let task = self
                .create_heartbeat_task(&region_server, meta_client, cache_registry)
                .await?;
            Some(task)
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
        })
    }

    async fn create_heartbeat_task(
        &self,
        region_server: &RegionServer,
        meta_client: MetaClientRef,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Result<HeartbeatTask> {
        let stat = {
            let mut stat = ResourceStatImpl::default();
            stat.start_collect_cpu_usage();
            Arc::new(stat)
        };

        HeartbeatTask::try_new(
            &self.opts,
            region_server.clone(),
            meta_client,
            self.kv_backend.clone(),
            cache_invalidator,
            self.plugins.clone(),
            stat,
        )
        .await
    }

    /// Builds [ObjectStoreManager] from [StorageConfig].
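    ///
    /// The default store comes from `cfg.store` and is registered under its config name;
    /// each entry in `cfg.providers` is then added under its own config name.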
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Open all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        // TODO(weny): Consider introducing a readonly kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef] from the `region_engine` section in `opts`.
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        // Build a fetcher to backfill partition_expr on open.
        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
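    ///
    /// The WAL backend is chosen from `opts.wal` (raft-engine, Kafka, or no-op); when the
    /// configured storage is an object store, the write cache is enabled.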
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when using object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("The WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Noop => {
                let log_store = Arc::new(NoopLogStore);

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
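    ///
    /// The WAL directory is `config.dir` when set; otherwise it defaults to `WAL_DIR`
    /// under `data_home`. The directory is created if it does not exist.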
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // create WAL directory
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds [`GlobalIndexCollector`]
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Open all regions belonging to this datanode.
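///
/// When `ignore_nonexistent_region` is true, missing regions are skipped with a warning;
/// otherwise the number of opened regions must match the number of open requests.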
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} regions were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions, expected to open {} regions, but only {} regions were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    // Finalize leadership: persist backfilled metadata.
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} regions were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions, expected to open {} follower regions, but only {} regions were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}