datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.

use std::path::Path;
use std::sync::Arc;
use std::time::{Duration, Instant};

use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::cache_invalidator::CacheInvalidatorRef;
use common_meta::datanode::TopicStatsReporter;
use common_meta::key::runtime_switch::RuntimeSwitchManager;
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
pub use common_procedure::options::ProcedureConfig;
use common_query::prelude::set_default_prefix;
use common_stat::ResourceStatImpl;
use common_telemetry::{error, info, warn};
use common_wal::config::DatanodeWalConfig;
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use file_engine::engine::FileRegionEngine;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{GlobalIndexCollector, default_index_file};
use log_store::noop::log_store::NoopLogStore;
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::{MitoEngine, MitoEngineBuilder};
use mito2::region::opener::PartitionExprFetcherRef;
use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
use servers::server::ServerHandlers;
use snafu::{OptionExt, ResultExt, ensure};
use store_api::path_utils::WAL_DIR;
use store_api::region_engine::{
    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
};
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
    GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
    new_region_server_event_channel,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};
use crate::utils::{RegionOpenRequests, build_region_open_requests};

/// Datanode service.
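///
/// A minimal lifecycle sketch (illustrative only, not compiled here; the
/// `builder` and `services` values are assumed to be prepared by the caller):
///
/// ```ignore
/// let mut datanode = builder.build().await?;
/// // Register the gRPC/HTTP handlers before starting.
/// datanode.setup_services(services);
/// datanode.start().await?;
/// // ... serve traffic ...
/// datanode.shutdown().await?;
/// ```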
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
}

impl Datanode {
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: The event_receiver must exist.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

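/// Builder for a [Datanode] instance.
///
/// A minimal construction sketch (illustrative only, not compiled here; the
/// options, plugins, kv backend and cache registry are assumed to be created
/// by the caller, as in the tests at the bottom of this file):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, Plugins::default(), kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let datanode = builder.build().await?;
/// ```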
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
            #[cfg(feature = "enterprise")]
            extension_range_provider_factory: None,
            topic_stats_reporter: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    #[cfg(feature = "enterprise")]
    pub fn with_extension_range_provider(
        &mut self,
        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
    ) -> &mut Self {
        self.extension_range_provider_factory = Some(extension_range_provider_factory);
        self
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
        set_default_prefix(self.opts.default_column_prefix.as_deref())
            .map_err(BoxedError::new)
            .context(BuildDatanodeSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, it is used to control the region server.
        // Otherwise the region server is self-controlled: no heartbeat, and regions
        // become writable immediately after they are opened.
        let controlled_by_metasrv = meta_client.is_some();

        // Build and initialize the region server.
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
        let region_server = self
            .new_region_server(
                schema_metadata_manager,
                region_event_listener,
                file_ref_manager,
            )
            .await?;

        // TODO(weny): Consider introducing a read-only kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;

        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
        let open_all_regions = open_all_regions(
            region_server.clone(),
            region_open_requests,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
            // Ignore nonexistent regions in recovery mode.
            is_recovery_mode,
        );

        if self.opts.init_regions_in_background {
            // Open regions in the background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            let task = self
                .create_heartbeat_task(&region_server, meta_client, cache_registry)
                .await?;
            Some(task)
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

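        // In distributed mode, startup may block (see `wait_coordinated`) until the
        // heartbeat task signals that region leases have been granted.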
        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
        })
    }

    async fn create_heartbeat_task(
        &self,
        region_server: &RegionServer,
        meta_client: MetaClientRef,
        cache_invalidator: CacheInvalidatorRef,
    ) -> Result<HeartbeatTask> {
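        // Start sampling CPU usage in the background so heartbeats can report
        // resource statistics.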
        let stat = {
            let mut stat = ResourceStatImpl::default();
            stat.start_collect_cpu_usage();
            Arc::new(stat)
        };

        HeartbeatTask::try_new(
            &self.opts,
            region_server.clone(),
            meta_client,
            cache_invalidator,
            self.plugins.clone(),
            stat,
        )
        .await
    }

    /// Builds [ObjectStoreManager] from [StorageConfig].
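    ///
    /// The default store is registered under its config name; every provider in
    /// `cfg.providers` is added on top of it.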
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Opens all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        // TODO(weny): Consider introducing a read-only kv_backend trait.
        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
        let is_recovery_mode = runtime_switch_manager
            .recovery_mode()
            .await
            .context(GetMetadataSnafu)?;
        let region_open_requests =
            build_region_open_requests(node_id, self.kv_backend.clone()).await?;

        open_all_regions(
            region_server.clone(),
            region_open_requests,
            open_with_writable,
            self.opts.init_regions_parallelism,
            is_recovery_mode,
        )
        .await
    }

    async fn new_region_server(
        &mut self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
        file_ref_manager: FileReferenceManagerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            DummyCatalogManager::arc(),
            None,
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
            opts.grpc.flight_compression,
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = self
            .build_store_engines(
                object_store_manager,
                schema_metadata_manager,
                file_ref_manager,
                self.plugins.clone(),
            )
            .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }
        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
            region_server.set_topic_stats_reporter(topic_stats_reporter);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
    async fn build_store_engines(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

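        // If several configs are given for the same engine kind, the last one wins.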
        for engine in &self.opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        // Build a fetcher to backfill partition_expr on open.
        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
        let mito_engine = self
            .build_mito_engine(
                object_store_manager.clone(),
                mito_engine_config,
                schema_metadata_manager.clone(),
                file_ref_manager.clone(),
                fetcher.clone(),
                plugins.clone(),
            )
            .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
    async fn build_mito_engine(
        &mut self,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        file_ref_manager: FileReferenceManagerRef,
        partition_expr_fetcher: PartitionExprFetcherRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        let opts = &self.opts;
        if opts.storage.is_object_storage() {
            // Enable the write cache when object storage is configured.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

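        // Pick the log store implementation according to the configured WAL backend:
        // local raft-engine, remote Kafka, or a no-op store.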
        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
                let log_store =
                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                        .await?;

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                let log_store =
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher,
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
            DatanodeWalConfig::Noop => {
                let log_store = Arc::new(NoopLogStore);

                let builder = MitoEngineBuilder::new(
                    &opts.storage.data_home,
                    config,
                    log_store,
                    object_store_manager,
                    schema_metadata_manager,
                    file_ref_manager,
                    partition_expr_fetcher.clone(),
                    plugins,
                    opts.max_concurrent_queries,
                );

                #[cfg(feature = "enterprise")]
                let builder = builder.with_extension_range_provider_factory(
                    self.extension_range_provider_factory.take(),
                );

                builder.try_build().await.context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // Create the WAL directory.
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds a [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Opens all regions belonging to this datanode.
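///
/// `open_with_writable` promotes every opened region to the leader role, and
/// `ignore_nonexistent_region` tolerates regions whose data no longer exists
/// (used in recovery mode).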
async fn open_all_regions(
    region_server: RegionServer,
    region_open_requests: RegionOpenRequests,
    open_with_writable: bool,
    init_regions_parallelism: usize,
    ignore_nonexistent_region: bool,
) -> Result<()> {
    let RegionOpenRequests {
        leader_regions,
        #[cfg(feature = "enterprise")]
        follower_regions,
    } = region_open_requests;

    let leader_region_num = leader_regions.len();
    info!("Going to open {} region(s)", leader_region_num);
    let now = Instant::now();
    let open_regions = region_server
        .handle_batch_open_requests(
            init_regions_parallelism,
            leader_regions,
            ignore_nonexistent_region,
        )
        .await?;
    info!(
        "Opened {} regions in {:?}",
        open_regions.len(),
        now.elapsed()
    );
    if !ignore_nonexistent_region {
        ensure!(
            open_regions.len() == leader_region_num,
            error::UnexpectedSnafu {
                violated: format!(
                    "Expected to open {} regions, but only {} were opened",
                    leader_region_num,
                    open_regions.len()
                )
            }
        );
    } else if open_regions.len() != leader_region_num {
        warn!(
            "Ignoring nonexistent regions, expected to open {} regions, but only {} were opened",
            leader_region_num,
            open_regions.len()
        );
    }

    for region_id in open_regions {
        if open_with_writable {
            let res = region_server.set_region_role(region_id, RegionRole::Leader);
            match res {
                Ok(_) => {
                    // Finalize leadership: persist backfilled metadata.
                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
                        .set_region_role_state_gracefully(
                            region_id,
                            SettableRegionRoleState::Leader,
                        )
                        .await?
                    {
                        error!(err; "Failed to convert region {region_id} to leader");
                    }
                }
                Err(e) => {
                    error!(e; "Failed to convert region {region_id} to leader");
                }
            }
        }
    }

    #[cfg(feature = "enterprise")]
    if !follower_regions.is_empty() {
        use tokio::time::Instant;

        let follower_region_num = follower_regions.len();
        info!("Going to open {} follower region(s)", follower_region_num);

        let now = Instant::now();
        let open_regions = region_server
            .handle_batch_open_requests(
                init_regions_parallelism,
                follower_regions,
                ignore_nonexistent_region,
            )
            .await?;
        info!(
            "Opened {} follower regions in {:?}",
            open_regions.len(),
            now.elapsed()
        );

        if !ignore_nonexistent_region {
            ensure!(
                open_regions.len() == follower_region_num,
                error::UnexpectedSnafu {
                    violated: format!(
                        "Expected to open {} follower regions, but only {} were opened",
                        follower_region_num,
                        open_regions.len()
                    )
                }
            );
        } else if open_regions.len() != follower_region_num {
            warn!(
                "Ignoring nonexistent regions, expected to open {} follower regions, but only {} were opened",
                follower_region_num,
                open_regions.len()
            );
        }
    }

    info!("All regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

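    // Registers table 1028 whose regions 0, 1 and 2 are assigned (as leaders) to datanode 0.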
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend.clone(),
        );
        builder.with_cache_registry(layered_cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&mock_region_server, false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}