datanode/datanode.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.
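//!
//! Provides the [`Datanode`] service and the [`DatanodeBuilder`] that
//! assembles its region server, storage engines, WAL, and background tasks.
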
use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use servers::Mode;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingKvBackendSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result,
    ShutdownInstanceSnafu, ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
    RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};

/// Datanode service.
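/// Owns the region server and the background tasks (heartbeat, telemetry,
/// metrics export) created by [`DatanodeBuilder`]; drive it with
/// [`Datanode::start`] and [`Datanode::shutdown`].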
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
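    /// Starts the datanode: spawns the heartbeat task, waits for region lease
    /// coordination when required, then starts telemetry, metrics export, and
    /// all registered servers.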
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: the receiver is always created together with the heartbeat task.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If `leases_notifier` is set, waits until leases have been obtained for all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    mode: Mode,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: Option<KvBackendRef>,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
}

impl DatanodeBuilder {
    /// Creates a new builder. A `kv_backend` must be supplied via
    /// [`Self::with_kv_backend`] before calling [`Self::build`], which fails
    /// otherwise.
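    ///
    /// # Example
    ///
    /// A minimal sketch mirroring the test setup below; assumes an in-memory
    /// kv backend and a prepared cache registry:
    ///
    /// ```ignore
    /// let datanode = DatanodeBuilder::new(
    ///     DatanodeOptions { node_id: Some(0), ..Default::default() },
    ///     Plugins::default(),
    ///     Mode::Standalone,
    /// )
    /// .with_kv_backend(kv_backend)
    /// .with_cache_registry(cache_registry)
    /// .build()
    /// .await?;
    /// ```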
    pub fn new(opts: DatanodeOptions, plugins: Plugins, mode: Mode) -> Self {
        Self {
            opts,
            mode,
            plugins,
            meta_client: None,
            kv_backend: None,
            cache_registry: None,
        }
    }

    pub fn with_meta_client(self, meta_client: MetaClientRef) -> Self {
        Self {
            meta_client: Some(meta_client),
            ..self
        }
    }

    pub fn with_cache_registry(self, cache_registry: Arc<LayeredCacheRegistry>) -> Self {
        Self {
            cache_registry: Some(cache_registry),
            ..self
        }
    }

    pub fn with_kv_backend(self, kv_backend: KvBackendRef) -> Self {
        Self {
            kv_backend: Some(kv_backend),
            ..self
        }
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let mode = &self.mode;
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, we will use it to control the region server.
        // Otherwise the region server is self-controlled, meaning no heartbeat and immediately
        // writable upon open.
        let controlled_by_metasrv = meta_client.is_some();

        let kv_backend = self.kv_backend.take().context(MissingKvBackendSnafu)?;

        // build and initialize region server
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let region_server = self
            .new_region_server(schema_metadata_manager, region_event_listener)
            .await?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        let open_all_regions = open_all_regions(
            region_server.clone(),
            table_values,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
        );

        if self.opts.init_regions_in_background {
            // Opens regions in background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            mode,
            self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier =
            if self.opts.require_lease_before_startup && matches!(mode, Mode::Distributed) {
                Some(Arc::new(Notify::new()))
            } else {
                None
            };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

    /// Builds [ObjectStoreManager] from [StorageConfig].
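    ///
    /// The default store is built from `cfg.store`; each entry in
    /// `cfg.providers` is registered as an additional store under its own
    /// config name.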
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Opens all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        kv_backend: KvBackendRef,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        open_all_regions(
            region_server.clone(),
            table_values,
            open_with_writable,
            self.opts.init_regions_parallelism,
        )
        .await
    }

    async fn new_region_server(
        &self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            MemoryCatalogManager::with_default_setup(),
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = Arc::new(DummyTableProviderFactory);
        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = Self::build_store_engines(
            opts,
            object_store_manager,
            schema_metadata_manager,
            self.plugins.clone(),
        )
        .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
    async fn build_store_engines(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

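        // If multiple configs of the same engine kind appear in the list, the
        // last one wins.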
        for engine in &opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let mito_engine = Self::build_mito_engine(
            opts,
            object_store_manager.clone(),
            mito_engine_config,
            schema_metadata_manager.clone(),
            plugins.clone(),
        )
        .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
    async fn build_mito_engine(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        if opts.storage.is_object_storage() {
            // Enable the write cache when the default storage is object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
                &opts.storage.data_home,
                config,
                Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                    .await?,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await
            .context(BuildMitoEngineSnafu)?,
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("WAL index creation is only available in distributed mode.");
                }
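                // The index file path is derived from the node id (see
                // `default_index_file`), so the collector is only built when a
                // node id is present.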
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                MitoEngine::new(
                    &opts.storage.data_home,
                    config,
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await
                .context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // create WAL directory
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Opens all regions belonging to this datanode.
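/// When `open_with_writable` is true, each opened region is additionally
/// converted to a leader (writable) region.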
async fn open_all_regions(
    region_server: RegionServer,
    table_values: Vec<DatanodeTableValue>,
    open_with_writable: bool,
    init_regions_parallelism: usize,
) -> Result<()> {
    let mut regions = vec![];
    for table_value in table_values {
        for region_number in table_value.regions {
            // Augments the region options with WAL options, if any are provided.
            let mut region_options = table_value.region_info.region_options.clone();
            prepare_wal_options(
                &mut region_options,
                RegionId::new(table_value.table_id, region_number),
                &table_value.region_info.region_wal_options,
            );

            regions.push((
                RegionId::new(table_value.table_id, region_number),
                table_value.region_info.engine.clone(),
                table_value.region_info.region_storage_path.clone(),
                region_options,
            ));
        }
    }
    let num_regions = regions.len();
    info!("going to open {} region(s)", num_regions);

    let mut region_requests = Vec::with_capacity(regions.len());
    for (region_id, engine, store_path, options) in regions {
        let region_dir = region_dir(&store_path, region_id);
        region_requests.push((
            region_id,
            RegionOpenRequest {
                engine,
                region_dir,
                options,
                skip_wal_replay: false,
            },
        ));
    }

    let open_regions = region_server
        .handle_batch_open_requests(init_regions_parallelism, region_requests)
        .await?;
    ensure!(
        open_regions.len() == num_regions,
        error::UnexpectedSnafu {
            violated: format!(
                "Expected to open {} regions, but only {} were opened",
                num_regions,
                open_regions.len()
            )
        }
    );

    for region_id in open_regions {
        if open_with_writable {
            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
                error!(
                    e; "failed to convert region {region_id} to leader"
                );
            }
        }
    }
    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use mito2::engine::MITO_ENGINE_NAME;
    use servers::Mode;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{mock_region_server, MockRegionEngine};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, vec![0, 1, 2])]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend))
                .build(),
        );

        let builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            Mode::Standalone,
        )
        .with_cache_registry(layered_cache_registry);

        let kv = Arc::new(MemoryKvBackend::default()) as _;
        setup_table_datanode(&kv).await;

        builder
            .initialize_region_server(&mock_region_server, kv.clone(), false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}