datanode/
datanode.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Datanode implementation.
16
17use std::path::Path;
18use std::sync::Arc;
19use std::time::{Duration, Instant};
20
21use common_base::Plugins;
22use common_error::ext::BoxedError;
23use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
24use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
25use common_meta::datanode::TopicStatsReporter;
26use common_meta::key::runtime_switch::RuntimeSwitchManager;
27use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
28use common_meta::kv_backend::KvBackendRef;
29pub use common_procedure::options::ProcedureConfig;
30use common_telemetry::{error, info, warn};
31use common_wal::config::kafka::DatanodeKafkaConfig;
32use common_wal::config::raft_engine::RaftEngineConfig;
33use common_wal::config::DatanodeWalConfig;
34use file_engine::engine::FileRegionEngine;
35use log_store::kafka::log_store::KafkaLogStore;
36use log_store::kafka::{default_index_file, GlobalIndexCollector};
37use log_store::raft_engine::log_store::RaftEngineLogStore;
38use meta_client::MetaClientRef;
39use metric_engine::engine::MetricEngine;
40use mito2::config::MitoConfig;
41use mito2::engine::{MitoEngine, MitoEngineBuilder};
42use mito2::sst::file_ref::{FileReferenceManager, FileReferenceManagerRef};
43use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
44use object_store::util::normalize_dir;
45use query::dummy_catalog::{DummyCatalogManager, TableProviderFactoryRef};
46use query::QueryEngineFactory;
47use servers::export_metrics::ExportMetricsTask;
48use servers::server::ServerHandlers;
49use snafu::{ensure, OptionExt, ResultExt};
50use store_api::path_utils::WAL_DIR;
51use store_api::region_engine::{RegionEngineRef, RegionRole};
52use tokio::fs;
53use tokio::sync::Notify;
54
55use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
56use crate::error::{
57    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
58    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
59    ShutdownServerSnafu, StartServerSnafu,
60};
61use crate::event_listener::{
62    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
63    RegionServerEventReceiver,
64};
65use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
66use crate::heartbeat::HeartbeatTask;
67use crate::region_server::{DummyTableProviderFactory, RegionServer};
68use crate::store::{self, new_object_store_without_cache};
69use crate::utils::{build_region_open_requests, RegionOpenRequests};
70
71/// Datanode service.
72pub struct Datanode {
73    services: ServerHandlers,
74    heartbeat_task: Option<HeartbeatTask>,
75    region_event_receiver: Option<RegionServerEventReceiver>,
76    region_server: RegionServer,
77    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
78    leases_notifier: Option<Arc<Notify>>,
79    plugins: Plugins,
80    export_metrics_task: Option<ExportMetricsTask>,
81}
82
83impl Datanode {
84    pub async fn start(&mut self) -> Result<()> {
85        info!("Starting datanode instance...");
86
87        self.start_heartbeat().await?;
88        self.wait_coordinated().await;
89
90        self.start_telemetry();
91
92        if let Some(t) = self.export_metrics_task.as_ref() {
93            t.start(None).context(StartServerSnafu)?
94        }
95
96        self.services.start_all().await.context(StartServerSnafu)
97    }
98
99    pub fn server_handlers(&self) -> &ServerHandlers {
100        &self.services
101    }
102
103    pub fn start_telemetry(&self) {
104        if let Err(e) = self.greptimedb_telemetry_task.start() {
105            warn!(e; "Failed to start telemetry task!");
106        }
107    }
108
109    pub async fn start_heartbeat(&mut self) -> Result<()> {
110        if let Some(task) = &self.heartbeat_task {
111            // Safety: The event_receiver must exist.
112            let receiver = self.region_event_receiver.take().unwrap();
113
114            task.start(receiver, self.leases_notifier.clone()).await?;
115        }
116        Ok(())
117    }
118
119    /// If `leases_notifier` exists, it waits until leases have been obtained in all regions.
120    pub async fn wait_coordinated(&mut self) {
121        if let Some(notifier) = self.leases_notifier.take() {
122            notifier.notified().await;
123        }
124    }
125
126    pub fn setup_services(&mut self, services: ServerHandlers) {
127        self.services = services;
128    }
129
130    pub async fn shutdown(&mut self) -> Result<()> {
131        self.services
132            .shutdown_all()
133            .await
134            .context(ShutdownServerSnafu)?;
135
136        let _ = self.greptimedb_telemetry_task.stop().await;
137        if let Some(heartbeat_task) = &self.heartbeat_task {
138            heartbeat_task
139                .close()
140                .map_err(BoxedError::new)
141                .context(ShutdownInstanceSnafu)?;
142        }
143        self.region_server.stop().await?;
144        Ok(())
145    }
146
147    pub fn region_server(&self) -> RegionServer {
148        self.region_server.clone()
149    }
150
151    pub fn plugins(&self) -> Plugins {
152        self.plugins.clone()
153    }
154}
155
156pub struct DatanodeBuilder {
157    opts: DatanodeOptions,
158    table_provider_factory: Option<TableProviderFactoryRef>,
159    plugins: Plugins,
160    meta_client: Option<MetaClientRef>,
161    kv_backend: KvBackendRef,
162    cache_registry: Option<Arc<LayeredCacheRegistry>>,
163    topic_stats_reporter: Option<Box<dyn TopicStatsReporter>>,
164    #[cfg(feature = "enterprise")]
165    extension_range_provider_factory: Option<mito2::extension::BoxedExtensionRangeProviderFactory>,
166}
167
168impl DatanodeBuilder {
169    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
170        Self {
171            opts,
172            table_provider_factory: None,
173            plugins,
174            meta_client: None,
175            kv_backend,
176            cache_registry: None,
177            #[cfg(feature = "enterprise")]
178            extension_range_provider_factory: None,
179            topic_stats_reporter: None,
180        }
181    }
182
183    pub fn options(&self) -> &DatanodeOptions {
184        &self.opts
185    }
186
187    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
188        self.meta_client = Some(client);
189        self
190    }
191
192    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
193        self.cache_registry = Some(registry);
194        self
195    }
196
197    pub fn kv_backend(&self) -> &KvBackendRef {
198        &self.kv_backend
199    }
200
201    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
202        self.table_provider_factory = Some(factory);
203        self
204    }
205
206    #[cfg(feature = "enterprise")]
207    pub fn with_extension_range_provider(
208        &mut self,
209        extension_range_provider_factory: mito2::extension::BoxedExtensionRangeProviderFactory,
210    ) -> &mut Self {
211        self.extension_range_provider_factory = Some(extension_range_provider_factory);
212        self
213    }
214
215    pub async fn build(mut self) -> Result<Datanode> {
216        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
217
218        let meta_client = self.meta_client.take();
219
220        // If metasrv client is provided, we will use it to control the region server.
221        // Otherwise the region server is self-controlled, meaning no heartbeat and immediately
222        // writable upon open.
223        let controlled_by_metasrv = meta_client.is_some();
224
225        // build and initialize region server
226        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
227            let (tx, rx) = new_region_server_event_channel();
228            (Box::new(tx) as _, Some(rx))
229        } else {
230            (Box::new(NoopRegionServerEventListener) as _, None)
231        };
232
233        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
234        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
235        let table_id_schema_cache: TableSchemaCacheRef =
236            cache_registry.get().context(MissingCacheSnafu)?;
237
238        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
239            table_id_schema_cache,
240            schema_cache,
241        ));
242        let file_ref_manager = Arc::new(FileReferenceManager::new(Some(node_id)));
243        let region_server = self
244            .new_region_server(
245                schema_metadata_manager,
246                region_event_listener,
247                file_ref_manager,
248            )
249            .await?;
250
251        // TODO(weny): Considering introducing a readonly kv_backend trait.
252        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
253        let is_recovery_mode = runtime_switch_manager
254            .recovery_mode()
255            .await
256            .context(GetMetadataSnafu)?;
257
258        let region_open_requests =
259            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
260        let open_all_regions = open_all_regions(
261            region_server.clone(),
262            region_open_requests,
263            !controlled_by_metasrv,
264            self.opts.init_regions_parallelism,
265            // Ignore nonexistent regions in recovery mode.
266            is_recovery_mode,
267        );
268
269        if self.opts.init_regions_in_background {
270            // Opens regions in background.
271            common_runtime::spawn_global(async move {
272                if let Err(err) = open_all_regions.await {
273                    error!(err; "Failed to open regions during the startup.");
274                }
275            });
276        } else {
277            open_all_regions.await?;
278        }
279
280        let heartbeat_task = if let Some(meta_client) = meta_client {
281            Some(
282                HeartbeatTask::try_new(
283                    &self.opts,
284                    region_server.clone(),
285                    meta_client,
286                    cache_registry,
287                    self.plugins.clone(),
288                )
289                .await?,
290            )
291        } else {
292            None
293        };
294
295        let is_standalone = heartbeat_task.is_none();
296        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
297            Some(self.opts.storage.data_home.clone()),
298            is_standalone && self.opts.enable_telemetry,
299        )
300        .await;
301
302        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
303            Some(Arc::new(Notify::new()))
304        } else {
305            None
306        };
307
308        let export_metrics_task =
309            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
310                .context(StartServerSnafu)?;
311
312        Ok(Datanode {
313            services: ServerHandlers::default(),
314            heartbeat_task,
315            region_server,
316            greptimedb_telemetry_task,
317            region_event_receiver,
318            leases_notifier,
319            plugins: self.plugins.clone(),
320            export_metrics_task,
321        })
322    }
323
324    /// Builds [ObjectStoreManager] from [StorageConfig].
325    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
326        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
327        let default_name = cfg.store.config_name();
328        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
329        for store in &cfg.providers {
330            object_store_manager.add(
331                store.config_name(),
332                store::new_object_store(store.clone(), &cfg.data_home).await?,
333            );
334        }
335        Ok(Arc::new(object_store_manager))
336    }
337
338    #[cfg(test)]
339    /// Open all regions belong to this datanode.
340    async fn initialize_region_server(
341        &self,
342        region_server: &RegionServer,
343        open_with_writable: bool,
344    ) -> Result<()> {
345        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;
346
347        // TODO(weny): Considering introducing a readonly kv_backend trait.
348        let runtime_switch_manager = RuntimeSwitchManager::new(self.kv_backend.clone());
349        let is_recovery_mode = runtime_switch_manager
350            .recovery_mode()
351            .await
352            .context(GetMetadataSnafu)?;
353        let region_open_requests =
354            build_region_open_requests(node_id, self.kv_backend.clone()).await?;
355
356        open_all_regions(
357            region_server.clone(),
358            region_open_requests,
359            open_with_writable,
360            self.opts.init_regions_parallelism,
361            is_recovery_mode,
362        )
363        .await
364    }
365
366    async fn new_region_server(
367        &mut self,
368        schema_metadata_manager: SchemaMetadataManagerRef,
369        event_listener: RegionServerEventListenerRef,
370        file_ref_manager: FileReferenceManagerRef,
371    ) -> Result<RegionServer> {
372        let opts: &DatanodeOptions = &self.opts;
373
374        let query_engine_factory = QueryEngineFactory::new_with_plugins(
375            // query engine in datanode only executes plan with resolved table source.
376            DummyCatalogManager::arc(),
377            None,
378            None,
379            None,
380            None,
381            None,
382            false,
383            self.plugins.clone(),
384            opts.query.clone(),
385        );
386        let query_engine = query_engine_factory.query_engine();
387
388        let table_provider_factory = self
389            .table_provider_factory
390            .clone()
391            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));
392
393        let mut region_server = RegionServer::with_table_provider(
394            query_engine,
395            common_runtime::global_runtime(),
396            event_listener,
397            table_provider_factory,
398            opts.max_concurrent_queries,
399            //TODO: revaluate the hardcoded timeout on the next version of datanode concurrency limiter.
400            Duration::from_millis(100),
401            opts.grpc.flight_compression,
402        );
403
404        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
405        let engines = self
406            .build_store_engines(
407                object_store_manager,
408                schema_metadata_manager,
409                file_ref_manager,
410                self.plugins.clone(),
411            )
412            .await?;
413        for engine in engines {
414            region_server.register_engine(engine);
415        }
416        if let Some(topic_stats_reporter) = self.topic_stats_reporter.take() {
417            region_server.set_topic_stats_reporter(topic_stats_reporter);
418        }
419
420        Ok(region_server)
421    }
422
423    // internal utils
424
425    /// Builds [RegionEngineRef] from `store_engine` section in `opts`
426    async fn build_store_engines(
427        &mut self,
428        object_store_manager: ObjectStoreManagerRef,
429        schema_metadata_manager: SchemaMetadataManagerRef,
430        file_ref_manager: FileReferenceManagerRef,
431        plugins: Plugins,
432    ) -> Result<Vec<RegionEngineRef>> {
433        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
434        let mut mito_engine_config = MitoConfig::default();
435        let mut file_engine_config = file_engine::config::EngineConfig::default();
436
437        for engine in &self.opts.region_engine {
438            match engine {
439                RegionEngineConfig::Mito(config) => {
440                    mito_engine_config = config.clone();
441                }
442                RegionEngineConfig::File(config) => {
443                    file_engine_config = config.clone();
444                }
445                RegionEngineConfig::Metric(metric_config) => {
446                    metric_engine_config = metric_config.clone();
447                }
448            }
449        }
450
451        let mito_engine = self
452            .build_mito_engine(
453                object_store_manager.clone(),
454                mito_engine_config,
455                schema_metadata_manager.clone(),
456                file_ref_manager.clone(),
457                plugins.clone(),
458            )
459            .await?;
460
461        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
462            .context(BuildMetricEngineSnafu)?;
463
464        let file_engine = FileRegionEngine::new(
465            file_engine_config,
466            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
467        );
468
469        Ok(vec![
470            Arc::new(mito_engine) as _,
471            Arc::new(metric_engine) as _,
472            Arc::new(file_engine) as _,
473        ])
474    }
475
476    /// Builds [MitoEngine] according to options.
477    async fn build_mito_engine(
478        &mut self,
479        object_store_manager: ObjectStoreManagerRef,
480        mut config: MitoConfig,
481        schema_metadata_manager: SchemaMetadataManagerRef,
482        file_ref_manager: FileReferenceManagerRef,
483        plugins: Plugins,
484    ) -> Result<MitoEngine> {
485        let opts = &self.opts;
486        if opts.storage.is_object_storage() {
487            // Enable the write cache when setting object storage
488            config.enable_write_cache = true;
489            info!("Configured 'enable_write_cache=true' for mito engine.");
490        }
491
492        let mito_engine = match &opts.wal {
493            DatanodeWalConfig::RaftEngine(raft_engine_config) => {
494                let log_store =
495                    Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
496                        .await?;
497
498                let builder = MitoEngineBuilder::new(
499                    &opts.storage.data_home,
500                    config,
501                    log_store,
502                    object_store_manager,
503                    schema_metadata_manager,
504                    file_ref_manager,
505                    plugins,
506                );
507
508                #[cfg(feature = "enterprise")]
509                let builder = builder.with_extension_range_provider_factory(
510                    self.extension_range_provider_factory.take(),
511                );
512
513                builder.try_build().await.context(BuildMitoEngineSnafu)?
514            }
515            DatanodeWalConfig::Kafka(kafka_config) => {
516                if kafka_config.create_index && opts.node_id.is_none() {
517                    warn!("The WAL index creation only available in distributed mode.")
518                }
519                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
520                {
521                    let operator = new_object_store_without_cache(
522                        &opts.storage.store,
523                        &opts.storage.data_home,
524                    )
525                    .await?;
526                    let path = default_index_file(opts.node_id.unwrap());
527                    Some(Self::build_global_index_collector(
528                        kafka_config.dump_index_interval,
529                        operator,
530                        path,
531                    ))
532                } else {
533                    None
534                };
535
536                let log_store =
537                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?;
538                self.topic_stats_reporter = Some(log_store.topic_stats_reporter());
539                let builder = MitoEngineBuilder::new(
540                    &opts.storage.data_home,
541                    config,
542                    log_store,
543                    object_store_manager,
544                    schema_metadata_manager,
545                    file_ref_manager,
546                    plugins,
547                );
548
549                #[cfg(feature = "enterprise")]
550                let builder = builder.with_extension_range_provider_factory(
551                    self.extension_range_provider_factory.take(),
552                );
553
554                builder.try_build().await.context(BuildMitoEngineSnafu)?
555            }
556        };
557        Ok(mito_engine)
558    }
559
560    /// Builds [RaftEngineLogStore].
561    async fn build_raft_engine_log_store(
562        data_home: &str,
563        config: &RaftEngineConfig,
564    ) -> Result<Arc<RaftEngineLogStore>> {
565        let data_home = normalize_dir(data_home);
566        let wal_dir = match &config.dir {
567            Some(dir) => dir.clone(),
568            None => format!("{}{WAL_DIR}", data_home),
569        };
570
571        // create WAL directory
572        fs::create_dir_all(Path::new(&wal_dir))
573            .await
574            .context(CreateDirSnafu { dir: &wal_dir })?;
575        info!(
576            "Creating raft-engine logstore with config: {:?} and storage path: {}",
577            config, &wal_dir
578        );
579        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
580            .await
581            .map_err(Box::new)
582            .context(OpenLogStoreSnafu)?;
583
584        Ok(Arc::new(logstore))
585    }
586
587    /// Builds [`KafkaLogStore`].
588    async fn build_kafka_log_store(
589        config: &DatanodeKafkaConfig,
590        global_index_collector: Option<GlobalIndexCollector>,
591    ) -> Result<Arc<KafkaLogStore>> {
592        KafkaLogStore::try_new(config, global_index_collector)
593            .await
594            .map_err(Box::new)
595            .context(OpenLogStoreSnafu)
596            .map(Arc::new)
597    }
598
599    /// Builds [`GlobalIndexCollector`]
600    fn build_global_index_collector(
601        dump_index_interval: Duration,
602        operator: object_store::ObjectStore,
603        path: String,
604    ) -> GlobalIndexCollector {
605        GlobalIndexCollector::new(dump_index_interval, operator, path)
606    }
607}
608
609/// Open all regions belong to this datanode.
610async fn open_all_regions(
611    region_server: RegionServer,
612    region_open_requests: RegionOpenRequests,
613    open_with_writable: bool,
614    init_regions_parallelism: usize,
615    ignore_nonexistent_region: bool,
616) -> Result<()> {
617    let RegionOpenRequests {
618        leader_regions,
619        #[cfg(feature = "enterprise")]
620        follower_regions,
621    } = region_open_requests;
622
623    let leader_region_num = leader_regions.len();
624    info!("going to open {} region(s)", leader_region_num);
625    let now = Instant::now();
626    let open_regions = region_server
627        .handle_batch_open_requests(
628            init_regions_parallelism,
629            leader_regions,
630            ignore_nonexistent_region,
631        )
632        .await?;
633    info!(
634        "Opened {} regions in {:?}",
635        open_regions.len(),
636        now.elapsed()
637    );
638    if !ignore_nonexistent_region {
639        ensure!(
640            open_regions.len() == leader_region_num,
641            error::UnexpectedSnafu {
642                violated: format!(
643                    "Expected to open {} of regions, only {} of regions has opened",
644                    leader_region_num,
645                    open_regions.len()
646                )
647            }
648        );
649    } else if open_regions.len() != leader_region_num {
650        warn!(
651            "ignore nonexistent region, expected to open {} of regions, only {} of regions has opened",
652            leader_region_num,
653            open_regions.len()
654        );
655    }
656
657    for region_id in open_regions {
658        if open_with_writable {
659            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
660                error!(
661                    e; "failed to convert region {region_id} to leader"
662                );
663            }
664        }
665    }
666
667    #[cfg(feature = "enterprise")]
668    if !follower_regions.is_empty() {
669        use tokio::time::Instant;
670
671        let follower_region_num = follower_regions.len();
672        info!("going to open {} follower region(s)", follower_region_num);
673
674        let now = Instant::now();
675        let open_regions = region_server
676            .handle_batch_open_requests(
677                init_regions_parallelism,
678                follower_regions,
679                ignore_nonexistent_region,
680            )
681            .await?;
682        info!(
683            "Opened {} follower regions in {:?}",
684            open_regions.len(),
685            now.elapsed()
686        );
687
688        if !ignore_nonexistent_region {
689            ensure!(
690                open_regions.len() == follower_region_num,
691                error::UnexpectedSnafu {
692                    violated: format!(
693                        "Expected to open {} of follower regions, only {} of regions has opened",
694                        follower_region_num,
695                        open_regions.len()
696                    )
697                }
698            );
699        } else if open_regions.len() != follower_region_num {
700            warn!(
701                "ignore nonexistent region, expected to open {} of follower regions, only {} of regions has opened",
702                follower_region_num,
703                open_regions.len()
704            );
705        }
706    }
707
708    info!("all regions are opened");
709
710    Ok(())
711}
712
713#[cfg(test)]
714mod tests {
715    use std::assert_matches::assert_matches;
716    use std::collections::{BTreeMap, HashMap};
717    use std::sync::Arc;
718
719    use cache::build_datanode_cache_registry;
720    use common_base::Plugins;
721    use common_meta::cache::LayeredCacheRegistryBuilder;
722    use common_meta::key::datanode_table::DatanodeTableManager;
723    use common_meta::key::RegionRoleSet;
724    use common_meta::kv_backend::memory::MemoryKvBackend;
725    use common_meta::kv_backend::KvBackendRef;
726    use mito2::engine::MITO_ENGINE_NAME;
727    use store_api::region_request::RegionRequest;
728    use store_api::storage::RegionId;
729
730    use crate::config::DatanodeOptions;
731    use crate::datanode::DatanodeBuilder;
732    use crate::tests::{mock_region_server, MockRegionEngine};
733
734    async fn setup_table_datanode(kv: &KvBackendRef) {
735        let mgr = DatanodeTableManager::new(kv.clone());
736        let txn = mgr
737            .build_create_txn(
738                1028,
739                MITO_ENGINE_NAME,
740                "foo/bar/weny",
741                HashMap::from([("foo".to_string(), "bar".to_string())]),
742                HashMap::default(),
743                BTreeMap::from([(0, RegionRoleSet::new(vec![0, 1, 2], vec![]))]),
744            )
745            .unwrap();
746
747        let r = kv.txn(txn).await.unwrap();
748        assert!(r.succeeded);
749    }
750
751    #[tokio::test]
752    async fn test_initialize_region_server() {
753        common_telemetry::init_default_ut_logging();
754        let mut mock_region_server = mock_region_server();
755        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);
756
757        mock_region_server.register_engine(mock_region.clone());
758
759        let kv_backend = Arc::new(MemoryKvBackend::new());
760        let layered_cache_registry = Arc::new(
761            LayeredCacheRegistryBuilder::default()
762                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
763                .build(),
764        );
765
766        let mut builder = DatanodeBuilder::new(
767            DatanodeOptions {
768                node_id: Some(0),
769                ..Default::default()
770            },
771            Plugins::default(),
772            kv_backend.clone(),
773        );
774        builder.with_cache_registry(layered_cache_registry);
775        setup_table_datanode(&(kv_backend as _)).await;
776
777        builder
778            .initialize_region_server(&mock_region_server, false)
779            .await
780            .unwrap();
781
782        for i in 0..3 {
783            let (region_id, req) = mock_region_handler.recv().await.unwrap();
784            assert_eq!(region_id, RegionId::new(1028, i));
785            if let RegionRequest::Open(req) = req {
786                assert_eq!(
787                    req.options,
788                    HashMap::from([("foo".to_string(), "bar".to_string())])
789                )
790            } else {
791                unreachable!()
792            }
793        }
794
795        assert_matches!(
796            mock_region_handler.try_recv(),
797            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
798        );
799    }
800}