Skip to main content

datanode/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode implementation.
16
17use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::cache_invalidator::CacheInvalidatorRef;
26use common_meta::datanode::TopicStatsReporter;
27use common_meta::key::runtime_switch::RuntimeSwitchManager;
28use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
29use common_meta::kv_backend::KvBackendRef;
30pub use common_procedure::options::ProcedureConfig;
31use common_query::prelude::set_default_prefix;
32use common_stat::ResourceStatImpl;
33use common_telemetry::{error, info, warn};
34use common_wal::config::DatanodeWalConfig;
35use common_wal::config::kafka::DatanodeKafkaConfig;
36use common_wal::config::raft_engine::RaftEngineConfig;
37use file_engine::engine::FileRegionEngine;
38use log_store::kafka::log_store::KafkaLogStore;
39use log_store::kafka::{GlobalIndexCollector, default_index_file};
40use log_store::noop::log_store::NoopLogStore;
41use log_store::raft_engine::log_store::RaftEngineLogStore;
42use meta_client::MetaClientRef;
43use metric_engine::engine::MetricEngine;
44use mito2::config::MitoConfig;
45use mito2::engine::{MitoEngine, MitoEngineBuilder};
46use mito2::region::opener::PartitionExprFetcherRef;
47use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
48use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
49use object_store::util::normalize_dir;
50use query::QueryEngineFactory;
51use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
52use servers::server::ServerHandlers;
53use snafu::{OptionExt, ResultExt, ensure};
54use store_api::path_utils::WAL_DIR;
55use store_api::region_engine::{
56    RegionEngineRef, RegionRole, SetRegionRoleStateResponse, SettableRegionRoleState,
57};
58use tokio::fs;
59use tokio::sync::Notify;
60
61use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
62use crate::error::{
63    self, BuildDatanodeSnafu, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu,
64    GetMetadataSnafu, MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
65    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
66};
67use crate::event_listener::{
68    NoopRegionServerEventListener, RegionServerEventListenerRef, RegionServerEventReceiver,
69    new_region_server_event_channel,
70};
71use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
72use crate::heartbeat::HeartbeatTask;
73use crate::partition_expr_fetcher::MetaPartitionExprFetcher;
74use crate::region_server::{DummyTableProviderFactory, RegionServer};
75use crate::store::{self, new_object_store_without_cache};
76use crate::utils::{RegionOpenRequests, build_region_open_requests};
77
/// Datanode service.
///
/// Produced by [`DatanodeBuilder::build`]. Owns the region server together with the
/// optional metasrv heartbeat task and the servers that [`Datanode::start`] brings up.
pub struct Datanode {
    /// Servers started by [`Datanode::start`]; the builder initializes this with
    /// `ServerHandlers::default()` and callers replace it via `setup_services`.
    services: ServerHandlers,
    /// Heartbeat task to metasrv; `None` when the datanode is not controlled by metasrv.
    heartbeat_task: Option<HeartbeatTask>,
    /// Region server event receiver, handed to the heartbeat task when it starts.
    region_event_receiver: Option<RegionServerEventReceiver>,
    /// The region server hosting this datanode's engines and regions.
    region_server: RegionServer,
    /// Telemetry task, started by `start_telemetry`.
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    /// When present, `wait_coordinated` blocks until leases are obtained in all regions.
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
}
88
89impl Datanode {
90    pub async fn start(&mut self) -> Result<()> {
91        info!("Starting datanode instance...");
92
93        self.start_heartbeat().await?;
94        self.wait_coordinated().await;
95
96        self.start_telemetry();
97
98        self.services.start_all().await.context(StartServerSnafu)
99    }
100
101    pub fn server_handlers(&self) -> &ServerHandlers {
102        &self.services
103    }
104
105    pub fn start_telemetry(&self) {
106        if let Err(e) = self.greptimedb_telemetry_task.start() {
107            warn!(e; "Failed to start telemetry task!");
108        }
109    }
110
111    pub async fn start_heartbeat(&mut self) -> Result<()> {
112        if let Some(task) = &self.heartbeat_task {
113            // Safety: The event_receiver must exist.
114            let receiver = self.region_event_receiver.take().unwrap();
115
116            task.start(receiver, self.leases_notifier.clone()).await?;
117        }
118        Ok(())
119    }
120
121    /// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
122    pub async fn wait_coordinated(&mut self) {
123        if let Some(notifier) = self.leases_notifier.take() {
124            notifier.notified().await;
125        }
126    }
127
128    pub fn setup_services(&mut self, services: ServerHandlers) {
129        self.services = services;
130    }
131
132    pub async fn shutdown(&mut self) -> Result<()> {
133        self.services
134            .shutdown_all()
135            .await
136            .context(ShutdownServerSnafu)?;
137
138        let _ = self.greptimedb_telemetry_task.stop().await;
139        if let Some(heartbeat_task) = &self.heartbeat_task {
140            heartbeat_task
141                .close()
142                .map_err(BoxedError::new)
143                .context(ShutdownInstanceSnafu)?;
144        }
145        self.region_server.stop().await?;
146        Ok(())
147    }
148
149    pub fn region_server(&self) -> RegionServer {
150        self.region_server.clone()
151    }
152
153    pub fn plugins(&self) -> Plugins {
154        self.plugins.clone()
155    }
156}
157
/// Builder for [`Datanode`].
///
/// `build` requires `node_id` in the options and a cache registry (see
/// `with_cache_registry`). Setting a metasrv client is optional and decides whether the
/// built datanode is controlled by metasrv.
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    /// Table provider factory for the region server; a dummy factory is used when unset.
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    /// Metasrv client; when set, `build` creates a heartbeat task.
    meta_client: Option<MetaClientRef>,
    /// Metadata backend used to read region open requests, recovery mode and partition exprs.
    kv_backend: KvBackendRef,
    /// Cache registry; `build` fails when it is not set.
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
    /// Filled by the Kafka WAL branch of engine construction and moved into the region server.
    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
    /// Explicit writable policy for regions reopened at startup; `None` uses the default policy.
    open_regions_writable_override: Option<bool>,
    #[cfg(feature = "enterprise")]
    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
}
170
171impl DatanodeBuilder {
172    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
173        Self {
174            opts,
175            table_provider_factory: None,
176            plugins,
177            meta_client: None,
178            kv_backend,
179            cache_registry: None,
180            open_regions_writable_override: None,
181            #[cfg(feature = "enterprise")]
182            extension_range_provider_factory: None,
183            topic_stats_reporter: None,
184        }
185    }
186
187    pub fn options(&self) -> &DatanodeOptions {
188        &self.opts
189    }
190
191    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
192        self.meta_client = Some(client);
193        self
194    }
195
196    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
197        self.cache_registry = Some(registry);
198        self
199    }
200
201    pub fn kv_backend(&self) -> &KvBackendRef {
202        &self.kv_backend
203    }
204
205    pub fn meta_client(&self) -> Option<&MetaClientRef> {
206        self.meta_client.as_ref()
207    }
208
209    pub fn cache_registry(&self) -> Option<&Arc<LayeredCacheRegistry>> {
210        self.cache_registry.as_ref()
211    }
212
213    pub fn set_plugins(&mut self, plugins: Plugins) {
214        self.plugins = plugins;
215    }
216
217    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
218        self.table_provider_factory = Some(factory);
219        self
220    }
221
222    /// Overrides whether regions opened during datanode startup should become writable.
223    ///
224    /// When unset, the builder uses its default writable policy for reopened regions
225    /// (writable only when no metasrv client is configured).
226    ///
227    /// Warning: setting this to `true` on a metasrv-controlled datanode (one built
228    /// with `with_meta_client`) will promote regions to Leader before heartbeat and
229    /// lease coordination begin, bypassing the metasrv safety contract and creating a
230    /// potential split-brain window during startup.
231    pub fn with_open_regions_writable_override(&mut self, writable: bool) -> &mut Self {
232        self.open_regions_writable_override = Some(writable);
233        self
234    }
235
236    #[cfg(feature = "enterprise")]
237    pub fn with_extension_range_provider(
238        &mut self,
239        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
240    ) -> &mut Self {
241        self.extension_range_provider_factory = Some(extension_range_provider_factory);
242        self
243    }
244
245    pub async fn build(mut self) -> Result<Datanode> {
246        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
247        set_default_prefix(self.opts.default_column_prefix.as_deref())
248            .map_err(BoxedError::new)
249            .context(BuildDatanodeSnafu)?;
250
251        let meta_client = self.meta_client.take();
252
253        // If metasrv client is provided, we will use it to control the region server.
254        // Otherwise the region server is self-controlled, meaning no heartbeat and immediately
255        // writable upon open.
256        let controlled_by_metasrv = meta_client.is_some();
257
258        // build and initialize region server
259        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
260            let (tx, rx) = new_region_server_event_channel();
261            (Box::new(tx) as _, Some(rx))
262        } else {
263            (Box::new(NoopRegionServerEventListener) as _, None)
264        };
265
266        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
267        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
268        let table_id_schema_cache: TableSchemaCacheRef =
269            cache_registry.get().context(MissingCacheSnafu)?;
270
271        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
272            table_id_schema_cache,
273            schema_cache,
274        ));
275
276        let gc_enabled = self.opts.region_engine.iter().any(|engine| {
277            if let RegionEngineConfig::Mito(config) = engine {
278                config.gc.enable
279            } else {
280                false
281            }
282        });
283
284        let file_ref_manager = Arc::new(FileReferenceManager::with_gc_enabled(
285            Some(node_id),
286            gc_enabled,
287        ));
288        let region_server = self
289            .new_region_server(
290                schema_metadata_manager,
291                region_event_listener,
292                file_ref_manager,
293            )
294            .await?;
295
296        // TODO(weny): Considering introducing a readonly kv_backend trait.
297        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
298        let is_recovery_mode = runtime_switch_manager
299            .recovery_mode()
300            .await
301            .context(GetMetadataSnafu)?;
302
303        let region_open_requests =
304            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
305        let open_with_writable = self
306            .open_regions_writable_override
307            .unwrap_or(!controlled_by_metasrv);
308        let open_all_regions = open_all_regions(
309            region_server.clone(),
310            region_open_requests,
311            open_with_writable,
312            self.opts.init_regions_parallelism,
313            // Ignore nonexistent regions in recovery mode.
314            is_recovery_mode,
315        );
316
317        if self.opts.init_regions_in_background {
318            // Opens regions in background.
319            common_runtime::spawn_global(async move {
320                if let Err(err) = open_all_regions.await {
321                    error!(err; "Failed to open regions during the startup.");
322                }
323            });
324        } else {
325            open_all_regions.await?;
326        }
327
328        let heartbeat_task = if let Some(meta_client) = meta_client {
329            let task = self
330                .create_heartbeat_task(&region_server, meta_client, cache_registry)
331                .await?;
332            Some(task)
333        } else {
334            None
335        };
336
337        let is_standalone = heartbeat_task.is_none();
338        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
339            Some(self.opts.storage.data_home.clone()),
340            is_standalone && self.opts.enable_telemetry,
341        )
342        .await;
343
344        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
345            Some(Arc::new(Notify::new()))
346        } else {
347            None
348        };
349
350        Ok(Datanode {
351            services: ServerHandlers::default(),
352            heartbeat_task,
353            region_server,
354            greptimedb_telemetry_task,
355            region_event_receiver,
356            leases_notifier,
357            plugins: self.plugins.clone(),
358        })
359    }
360
361    async fn create_heartbeat_task(
362        &self,
363        region_server: &RegionServer,
364        meta_client: MetaClientRef,
365        cache_invalidator: CacheInvalidatorRef,
366    ) -> Result<HeartbeatTask> {
367        let stat = {
368            let mut stat = ResourceStatImpl::default();
369            stat.start_collect_cpu_usage();
370            Arc::new(stat)
371        };
372
373        HeartbeatTask::try_new(
374            &self.opts,
375            region_server.clone(),
376            meta_client,
377            self.kv_backend.clone(),
378            cache_invalidator,
379            self.plugins.clone(),
380            stat,
381        )
382        .await
383    }
384
385    /// Builds [ObjectStoreManager] from [StorageConfig].
386    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
387        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
388        let default_name = cfg.store.config_name();
389        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
390        for store in &cfg.providers {
391            object_store_manager.add(
392                store.config_name(),
393                store::new_object_store(store.clone(), &cfg.data_home).await?,
394            );
395        }
396        Ok(Arc::new(object_store_manager))
397    }
398
399    #[cfg(test)]
400    /// Open all regions belong to this datanode.
401    async fn initialize_region_server(
402        &self,
403        region_server: &RegionServer,
404        open_with_writable: bool,
405    ) -> Result<()> {
406        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
407
408        // TODO(weny): Considering introducing a readonly kv_backend trait.
409        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
410        let is_recovery_mode = runtime_switch_manager
411            .recovery_mode()
412            .await
413            .context(GetMetadataSnafu)?;
414        let region_open_requests =
415            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
416
417        open_all_regions(
418            region_server.clone(),
419            region_open_requests,
420            open_with_writable,
421            self.opts.init_regions_parallelism,
422            is_recovery_mode,
423        )
424        .await
425    }
426
427    async fn new_region_server(
428        &mut self,
429        schema_metadata_manager: SchemaMetadataManagerRef,
430        event_listener: RegionServerEventListenerRef,
431        file_ref_manager: FileReferenceManagerRef,
432    ) -> Result<RegionServer> {
433        let opts: &DatanodeOptions = &self.opts;
434
435        let query_engine_factory = QueryEngineFactory::new_with_plugins(
436            // query engine in datanode only executes plan with resolved table source.
437            DummyCatalogManager::arc(),
438            None,
439            None,
440            None,
441            None,
442            None,
443            false,
444            self.plugins.clone(),
445            opts.query.clone(),
446        );
447        let query_engine = query_engine_factory.query_engine();
448
449        let table_provider_factory = self
450            .table_provider_factory
451            .clone()
452            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
453        let mut region_server = RegionServer::with_table_provider(
454            query_engine,
455            common_runtime::global_runtime(),
456            event_listener,
457            table_provider_factory,
458            opts.max_concurrent_queries,
459            opts.concurrent_query_limiter_timeout,
460            opts.grpc.flight_compression,
461        );
462        region_server.install_remote_dyn_filter_receiver_injector(&self.plugins);
463
464        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
465        let engines = self
466            .build_store_engines(
467                object_store_manager,
468                schema_metadata_manager,
469                file_ref_manager,
470                self.plugins.clone(),
471            )
472            .await?;
473        for engine in engines {
474            region_server.register_engine(engine);
475        }
476        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
477            region_server.set_topic_stats_reporter(topic_stats_reporter);
478        }
479
480        Ok(region_server)
481    }
482
483    // internal utils
484
485    /// Builds [RegionEngineRef] from `store_engine` section in `opts`
486    async fn build_store_engines(
487        &mut self,
488        object_store_manager: ObjectStoreManagerRef,
489        schema_metadata_manager: SchemaMetadataManagerRef,
490        file_ref_manager: FileReferenceManagerRef,
491        plugins: Plugins,
492    ) -> Result<Vec<RegionEngineRef>> {
493        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
494        let mut mito_engine_config = MitoConfig::default();
495        let mut file_engine_config = file_engine::config::EngineConfig::default();
496
497        for engine in &self.opts.region_engine {
498            match engine {
499                RegionEngineConfig::Mito(config) => {
500                    mito_engine_config = config.clone();
501                }
502                RegionEngineConfig::File(config) => {
503                    file_engine_config = config.clone();
504                }
505                RegionEngineConfig::Metric(metric_config) => {
506                    metric_engine_config = metric_config.clone();
507                }
508            }
509        }
510
511        // Build a fetcher to backfill partition_expr on open.
512        let fetcher = Arc::new(MetaPartitionExprFetcher::new(self.kv_backend.clone()));
513        let mito_engine = self
514            .build_mito_engine(
515                object_store_manager.clone(),
516                mito_engine_config,
517                schema_metadata_manager.clone(),
518                file_ref_manager.clone(),
519                fetcher.clone(),
520                plugins.clone(),
521            )
522            .await?;
523
524        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
525            .context(BuildMetricEngineSnafu)?;
526
527        let file_engine = FileRegionEngine::new(
528            file_engine_config,
529            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
530        );
531
532        Ok(vec![
533            Arc::new(mito_engine) as _,
534            Arc::new(metric_engine) as _,
535            Arc::new(file_engine) as _,
536        ])
537    }
538
539    /// Builds [MitoEngine] according to options.
540    async fn build_mito_engine(
541        &mut self,
542        object_store_manager: ObjectStoreManagerRef,
543        mut config: MitoConfig,
544        schema_metadata_manager: SchemaMetadataManagerRef,
545        file_ref_manager: FileReferenceManagerRef,
546        partition_expr_fetcher: PartitionExprFetcherRef,
547        plugins: Plugins,
548    ) -> Result<MitoEngine> {
549        let opts = &self.opts;
550        if opts.storage.is_object_storage() {
551            // Enable the write cache when setting object storage
552            config.enable_write_cache = true;
553            info!("Configured 'enable_write_cache=true' for mito engine.");
554        }
555
556        let mito_engine = match &opts.wal {
557            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
558                let log_store =
559                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
560                        .await?;
561
562                let builder = MitoEngineBuilder::new(
563                    &opts.storage.data_home,
564                    config,
565                    log_store,
566                    object_store_manager,
567                    schema_metadata_manager,
568                    file_ref_manager,
569                    partition_expr_fetcher.clone(),
570                    plugins,
571                );
572
573                #[cfg(feature = "enterprise")]
574                let builder = builder.with_extension_range_provider_factory(
575                    self.extension_range_provider_factory.take(),
576                );
577
578                builder.try_build().await.context(BuildMitoEngineSnafu)?
579            }
580            DatanodeWalConfig::Kafka(kafka_config) => {
581                if kafka_config.create_index && opts.node_id.is_none() {
582                    warn!("The WAL index creation only available in distributed mode.")
583                }
584                let global_index_collector = if kafka_config.create_index
585                    && let Some(node_id) = opts.node_id
586                {
587                    let operator = new_object_store_without_cache(
588                        &opts.storage.store,
589                        &opts.storage.data_home,
590                    )
591                    .await?;
592                    let path = default_index_file(node_id);
593                    Some(Self::build_global_index_collector(
594                        kafka_config.dump_index_interval,
595                        operator,
596                        path,
597                    ))
598                } else {
599                    None
600                };
601
602                let log_store =
603                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
604                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
605                let builder = MitoEngineBuilder::new(
606                    &opts.storage.data_home,
607                    config,
608                    log_store,
609                    object_store_manager,
610                    schema_metadata_manager,
611                    file_ref_manager,
612                    partition_expr_fetcher,
613                    plugins,
614                );
615
616                #[cfg(feature = "enterprise")]
617                let builder = builder.with_extension_range_provider_factory(
618                    self.extension_range_provider_factory.take(),
619                );
620
621                builder.try_build().await.context(BuildMitoEngineSnafu)?
622            }
623            DatanodeWalConfig::Noop => {
624                let log_store = Arc::new(NoopLogStore);
625
626                let builder = MitoEngineBuilder::new(
627                    &opts.storage.data_home,
628                    config,
629                    log_store,
630                    object_store_manager,
631                    schema_metadata_manager,
632                    file_ref_manager,
633                    partition_expr_fetcher.clone(),
634                    plugins,
635                );
636
637                #[cfg(feature = "enterprise")]
638                let builder = builder.with_extension_range_provider_factory(
639                    self.extension_range_provider_factory.take(),
640                );
641
642                builder.try_build().await.context(BuildMitoEngineSnafu)?
643            }
644        };
645        Ok(mito_engine)
646    }
647
648    /// Builds [RaftEngineLogStore].
649    async fn build_raft_engine_log_store(
650        data_home: &str,
651        config: &RaftEngineConfig,
652    ) -> Result<Arc<RaftEngineLogStore>> {
653        let data_home = normalize_dir(data_home);
654        let wal_dir = match &config.dir {
655            Some(dir) => dir.clone(),
656            None => format!("{}{WAL_DIR}", data_home),
657        };
658
659        // create WAL directory
660        fs::create_dir_all(Path::new(&wal_dir))
661            .await
662            .context(CreateDirSnafu { dir: &wal_dir })?;
663        info!(
664            "Creating raft-engine logstore with config: {:?} and storage path: {}",
665            config, &wal_dir
666        );
667        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
668            .await
669            .map_err(Box::new)
670            .context(OpenLogStoreSnafu)?;
671
672        Ok(Arc::new(logstore))
673    }
674
675    /// Builds [`KafkaLogStore`].
676    async fn build_kafka_log_store(
677        config: &DatanodeKafkaConfig,
678        global_index_collector: Option<GlobalIndexCollector>,
679    ) -> Result<Arc<KafkaLogStore>> {
680        KafkaLogStore::try_new(config, global_index_collector)
681            .await
682            .map_err(Box::new)
683            .context(OpenLogStoreSnafu)
684            .map(Arc::new)
685    }
686
687    /// Builds [`GlobalIndexCollector`]
688    fn build_global_index_collector(
689        dump_index_interval: Duration,
690        operator: object_store::ObjectStore,
691        path: String,
692    ) -> GlobalIndexCollector {
693        GlobalIndexCollector::new(dump_index_interval, operator, path)
694    }
695}
696
697/// Open all regions belong to this datanode.
698async fn open_all_regions(
699    region_server: RegionServer,
700    region_open_requests: RegionOpenRequests,
701    open_with_writable: bool,
702    init_regions_parallelism: usize,
703    ignore_nonexistent_region: bool,
704) -> Result<()> {
705    let RegionOpenRequests {
706        leader_regions,
707        #[cfg(feature = "enterprise")]
708        follower_regions,
709    } = region_open_requests;
710
711    let leader_region_num = leader_regions.len();
712    info!("going to open {} region(s)", leader_region_num);
713    let now = Instant::now();
714    let open_regions = region_server
715        .handle_batch_open_requests(
716            init_regions_parallelism,
717            leader_regions,
718            ignore_nonexistent_region,
719        )
720        .await?;
721    info!(
722        "Opened {} regions in {:?}",
723        open_regions.len(),
724        now.elapsed()
725    );
726    if !ignore_nonexistent_region {
727        ensure!(
728            open_regions.len() == leader_region_num,
729            error::UnexpectedSnafu {
730                violated: format!(
731                    "Expected to open {} of regions, only {} of regions has opened",
732                    leader_region_num,
733                    open_regions.len()
734                )
735            }
736        );
737    } else if open_regions.len() != leader_region_num {
738        warn!(
739            "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
740            leader_region_num,
741            open_regions.len()
742        );
743    }
744
745    for region_id in open_regions {
746        if open_with_writable {
747            let res = region_server.set_region_role(region_id, RegionRole::Leader);
748            match res {
749                Ok(_) => {
750                    // Finalize leadership: persist backfilled metadata.
751                    if let SetRegionRoleStateResponse::InvalidTransition(err) = region_server
752                        .set_region_role_state_gracefully(
753                            region_id,
754                            SettableRegionRoleState::Leader,
755                        )
756                        .await?
757                    {
758                        error!(err; "failed to convert region {region_id} to leader");
759                    }
760                }
761                Err(e) => {
762                    error!(e; "failed to convert region {region_id} to leader");
763                }
764            }
765        }
766    }
767
768    #[cfg(feature = "enterprise")]
769    if !follower_regions.is_empty() {
770        use tokio::time::Instant;
771
772        let follower_region_num = follower_regions.len();
773        info!("going to open {} follower region(s)", follower_region_num);
774
775        let now = Instant::now();
776        let open_regions = region_server
777            .handle_batch_open_requests(
778                init_regions_parallelism,
779                follower_regions,
780                ignore_nonexistent_region,
781            )
782            .await?;
783        info!(
784            "Opened {} follower regions in {:?}",
785            open_regions.len(),
786            now.elapsed()
787        );
788
789        if !ignore_nonexistent_region {
790            ensure!(
791                open_regions.len() == follower_region_num,
792                error::UnexpectedSnafu {
793                    violated: format!(
794                        "Expected to open {} of follower regions, only {} of regions has opened",
795                        follower_region_num,
796                        open_regions.len()
797                    )
798                }
799            );
800        } else if open_regions.len() != follower_region_num {
801            warn!(
802                "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
803                follower_region_num,
804                open_regions.len()
805            );
806        }
807    }
808
809    info!("all regions are opened");
810
811    Ok(())
812}
813
#[cfg(test)]
mod tests {
    use std::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::RegionRoleSet;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::KvBackendRef;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{MockRegionEngine, mock_region_server};

    /// Region options written by `setup_table_datanode` and expected on every open request.
    fn region_options() -> HashMap<String, String> {
        HashMap::from([("foo".to_string(), "bar".to_string())])
    }

    /// Writes datanode-table metadata for table 1028 (mito engine, regions 0, 1 and 2
    /// placed under key 0 of the region distribution) into `kv`.
    async fn setup_table_datanode(kv: &KvBackendRef) {
        let table_manager = DatanodeTableManager::new(kv.clone());
        let distribution = BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]);
        let create_txn = table_manager
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                region_options(),
                HashMap::default(),
                distribution,
            )
            .unwrap();

        let txn_response = kv.txn(create_txn).await.unwrap();
        assert!(txn_response.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();

        let mut region_server = mock_region_server();
        let (engine, mut engine_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
        region_server.register_engine(engine.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let options = DatanodeOptions {
            node_id: Some(0),
            ..Default::default()
        };
        let mut builder = DatanodeBuilder::new(options, Plugins::default(), kv_backend.clone());
        builder.with_cache_registry(cache_registry);
        setup_table_datanode(&(kv_backend as _)).await;

        builder
            .initialize_region_server(&region_server, false)
            .await
            .unwrap();

        // Every region of table 1028 must be opened exactly once, with its options.
        for region_number in 0..3 {
            let (region_id, request) = engine_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, region_number));
            let RegionRequest::Open(open_request) = request else {
                unreachable!()
            };
            assert_eq!(open_request.options, region_options());
        }

        assert_matches!(
            engine_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}