// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Datanode implementation.

use std::path::Path;
use std::sync::Arc;
use std::time::Duration;

use catalog::memory::MemoryCatalogManager;
use common_base::Plugins;
use common_error::ext::BoxedError;
use common_greptimedb_telemetry::GreptimeDBTelemetryTask;
use common_meta::cache::{LayeredCacheRegistry, SchemaCacheRef, TableSchemaCacheRef};
use common_meta::key::datanode_table::{DatanodeTableManager, DatanodeTableValue};
use common_meta::key::{SchemaMetadataManager, SchemaMetadataManagerRef};
use common_meta::kv_backend::KvBackendRef;
use common_meta::wal_options_allocator::prepare_wal_options;
pub use common_procedure::options::ProcedureConfig;
use common_telemetry::{error, info, warn};
use common_wal::config::kafka::DatanodeKafkaConfig;
use common_wal::config::raft_engine::RaftEngineConfig;
use common_wal::config::DatanodeWalConfig;
use file_engine::engine::FileRegionEngine;
use futures_util::TryStreamExt;
use log_store::kafka::log_store::KafkaLogStore;
use log_store::kafka::{default_index_file, GlobalIndexCollector};
use log_store::raft_engine::log_store::RaftEngineLogStore;
use meta_client::MetaClientRef;
use metric_engine::engine::MetricEngine;
use mito2::config::MitoConfig;
use mito2::engine::MitoEngine;
use object_store::manager::{ObjectStoreManager, ObjectStoreManagerRef};
use object_store::util::normalize_dir;
use query::dummy_catalog::TableProviderFactoryRef;
use query::QueryEngineFactory;
use servers::export_metrics::ExportMetricsTask;
use servers::server::ServerHandlers;
use snafu::{ensure, OptionExt, ResultExt};
use store_api::path_utils::{region_dir, WAL_DIR};
use store_api::region_engine::{RegionEngineRef, RegionRole};
use store_api::region_request::RegionOpenRequest;
use store_api::storage::RegionId;
use tokio::fs;
use tokio::sync::Notify;

use crate::config::{DatanodeOptions, RegionEngineConfig, StorageConfig};
use crate::error::{
    self, BuildMetricEngineSnafu, BuildMitoEngineSnafu, CreateDirSnafu, GetMetadataSnafu,
    MissingCacheSnafu, MissingNodeIdSnafu, OpenLogStoreSnafu, Result, ShutdownInstanceSnafu,
    ShutdownServerSnafu, StartServerSnafu,
};
use crate::event_listener::{
    new_region_server_event_channel, NoopRegionServerEventListener, RegionServerEventListenerRef,
    RegionServerEventReceiver,
};
use crate::greptimedb_telemetry::get_greptimedb_telemetry_task;
use crate::heartbeat::HeartbeatTask;
use crate::region_server::{DummyTableProviderFactory, RegionServer};
use crate::store::{self, new_object_store_without_cache};

/// Datanode service.
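///
/// Bundles the server handlers, the optional heartbeat task (present only when the
/// datanode is controlled by metasrv), the region server hosting the region engines,
/// and the telemetry and metrics-export background tasks.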
pub struct Datanode {
    services: ServerHandlers,
    heartbeat_task: Option<HeartbeatTask>,
    region_event_receiver: Option<RegionServerEventReceiver>,
    region_server: RegionServer,
    greptimedb_telemetry_task: Arc<GreptimeDBTelemetryTask>,
    leases_notifier: Option<Arc<Notify>>,
    plugins: Plugins,
    export_metrics_task: Option<ExportMetricsTask>,
}

impl Datanode {
    pub async fn start(&mut self) -> Result<()> {
        info!("Starting datanode instance...");

        self.start_heartbeat().await?;
        self.wait_coordinated().await;

        self.start_telemetry();

        if let Some(t) = self.export_metrics_task.as_ref() {
            t.start(None).context(StartServerSnafu)?
        }

        self.services.start_all().await.context(StartServerSnafu)
    }

    pub fn server_handlers(&self) -> &ServerHandlers {
        &self.services
    }

    pub fn start_telemetry(&self) {
        if let Err(e) = self.greptimedb_telemetry_task.start() {
            warn!(e; "Failed to start telemetry task!");
        }
    }

    pub async fn start_heartbeat(&mut self) -> Result<()> {
        if let Some(task) = &self.heartbeat_task {
            // Safety: the receiver is created together with the heartbeat task in
            // `DatanodeBuilder::build`, so it must exist here.
            let receiver = self.region_event_receiver.take().unwrap();

            task.start(receiver, self.leases_notifier.clone()).await?;
        }
        Ok(())
    }

    /// If the `leases_notifier` is present, waits until leases have been obtained for all regions.
    pub async fn wait_coordinated(&mut self) {
        if let Some(notifier) = self.leases_notifier.take() {
            notifier.notified().await;
        }
    }

    pub fn setup_services(&mut self, services: ServerHandlers) {
        self.services = services;
    }

    pub async fn shutdown(&mut self) -> Result<()> {
        self.services
            .shutdown_all()
            .await
            .context(ShutdownServerSnafu)?;

        let _ = self.greptimedb_telemetry_task.stop().await;
        if let Some(heartbeat_task) = &self.heartbeat_task {
            heartbeat_task
                .close()
                .map_err(BoxedError::new)
                .context(ShutdownInstanceSnafu)?;
        }
        self.region_server.stop().await?;
        Ok(())
    }

    pub fn region_server(&self) -> RegionServer {
        self.region_server.clone()
    }

    pub fn plugins(&self) -> Plugins {
        self.plugins.clone()
    }
}

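/// Builder for [Datanode] instances.
///
/// A minimal usage sketch (not from the original sources; assumes `opts`, `kv_backend`
/// and `cache_registry` are constructed elsewhere):
///
/// ```ignore
/// let mut builder = DatanodeBuilder::new(opts, Plugins::default(), kv_backend);
/// builder.with_cache_registry(cache_registry);
/// let mut datanode = builder.build().await?;
/// datanode.start().await?;
/// ```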
pub struct DatanodeBuilder {
    opts: DatanodeOptions,
    table_provider_factory: Option<TableProviderFactoryRef>,
    plugins: Plugins,
    meta_client: Option<MetaClientRef>,
    kv_backend: KvBackendRef,
    cache_registry: Option<Arc<LayeredCacheRegistry>>,
}

impl DatanodeBuilder {
    pub fn new(opts: DatanodeOptions, plugins: Plugins, kv_backend: KvBackendRef) -> Self {
        Self {
            opts,
            table_provider_factory: None,
            plugins,
            meta_client: None,
            kv_backend,
            cache_registry: None,
        }
    }

    pub fn options(&self) -> &DatanodeOptions {
        &self.opts
    }

    pub fn with_meta_client(&mut self, client: MetaClientRef) -> &mut Self {
        self.meta_client = Some(client);
        self
    }

    pub fn with_cache_registry(&mut self, registry: Arc<LayeredCacheRegistry>) -> &mut Self {
        self.cache_registry = Some(registry);
        self
    }

    pub fn kv_backend(&self) -> &KvBackendRef {
        &self.kv_backend
    }

    pub fn with_table_provider_factory(&mut self, factory: TableProviderFactoryRef) -> &mut Self {
        self.table_provider_factory = Some(factory);
        self
    }

    pub async fn build(mut self) -> Result<Datanode> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let meta_client = self.meta_client.take();

        // If a metasrv client is provided, it is used to control the region server.
        // Otherwise the region server is self-controlled, meaning there is no heartbeat
        // and regions become writable immediately upon opening.
        let controlled_by_metasrv = meta_client.is_some();

        // Build and initialize the region server.
        let (region_event_listener, region_event_receiver) = if controlled_by_metasrv {
            let (tx, rx) = new_region_server_event_channel();
            (Box::new(tx) as _, Some(rx))
        } else {
            (Box::new(NoopRegionServerEventListener) as _, None)
        };

        let cache_registry = self.cache_registry.take().context(MissingCacheSnafu)?;
        let schema_cache: SchemaCacheRef = cache_registry.get().context(MissingCacheSnafu)?;
        let table_id_schema_cache: TableSchemaCacheRef =
            cache_registry.get().context(MissingCacheSnafu)?;

        let schema_metadata_manager = Arc::new(SchemaMetadataManager::new(
            table_id_schema_cache,
            schema_cache,
        ));
        let region_server = self
            .new_region_server(schema_metadata_manager, region_event_listener)
            .await?;

        let datanode_table_manager = DatanodeTableManager::new(self.kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        let open_all_regions = open_all_regions(
            region_server.clone(),
            table_values,
            !controlled_by_metasrv,
            self.opts.init_regions_parallelism,
        );

        if self.opts.init_regions_in_background {
            // Open regions in the background.
            common_runtime::spawn_global(async move {
                if let Err(err) = open_all_regions.await {
                    error!(err; "Failed to open regions during the startup.");
                }
            });
        } else {
            open_all_regions.await?;
        }

        let heartbeat_task = if let Some(meta_client) = meta_client {
            Some(
                HeartbeatTask::try_new(
                    &self.opts,
                    region_server.clone(),
                    meta_client,
                    cache_registry,
                    self.plugins.clone(),
                )
                .await?,
            )
        } else {
            None
        };

        let is_standalone = heartbeat_task.is_none();
        let greptimedb_telemetry_task = get_greptimedb_telemetry_task(
            Some(self.opts.storage.data_home.clone()),
            is_standalone && self.opts.enable_telemetry,
        )
        .await;

        let leases_notifier = if self.opts.require_lease_before_startup && !is_standalone {
            Some(Arc::new(Notify::new()))
        } else {
            None
        };

        let export_metrics_task =
            ExportMetricsTask::try_new(&self.opts.export_metrics, Some(&self.plugins))
                .context(StartServerSnafu)?;

        Ok(Datanode {
            services: ServerHandlers::default(),
            heartbeat_task,
            region_server,
            greptimedb_telemetry_task,
            region_event_receiver,
            leases_notifier,
            plugins: self.plugins.clone(),
            export_metrics_task,
        })
    }

    /// Builds an [ObjectStoreManager] from [StorageConfig].
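    ///
    /// The store named in `cfg.store` becomes the default object store; every entry in
    /// `cfg.providers` is additionally registered under its config name.
    ///
    /// A minimal sketch of the call (not from the original sources; assumes `opts` is a
    /// [DatanodeOptions] built elsewhere):
    ///
    /// ```ignore
    /// let manager = DatanodeBuilder::build_object_store_manager(&opts.storage).await?;
    /// let default_store = manager.default_object_store();
    /// ```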
    pub async fn build_object_store_manager(cfg: &StorageConfig) -> Result<ObjectStoreManagerRef> {
        let object_store = store::new_object_store(cfg.store.clone(), &cfg.data_home).await?;
        let default_name = cfg.store.config_name();
        let mut object_store_manager = ObjectStoreManager::new(default_name, object_store);
        for store in &cfg.providers {
            object_store_manager.add(
                store.config_name(),
                store::new_object_store(store.clone(), &cfg.data_home).await?,
            );
        }
        Ok(Arc::new(object_store_manager))
    }

    #[cfg(test)]
    /// Opens all regions belonging to this datanode.
    async fn initialize_region_server(
        &self,
        region_server: &RegionServer,
        kv_backend: KvBackendRef,
        open_with_writable: bool,
    ) -> Result<()> {
        let node_id = self.opts.node_id.context(MissingNodeIdSnafu)?;

        let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
        let table_values = datanode_table_manager
            .tables(node_id)
            .try_collect::<Vec<_>>()
            .await
            .context(GetMetadataSnafu)?;

        open_all_regions(
            region_server.clone(),
            table_values,
            open_with_writable,
            self.opts.init_regions_parallelism,
        )
        .await
    }

    async fn new_region_server(
        &self,
        schema_metadata_manager: SchemaMetadataManagerRef,
        event_listener: RegionServerEventListenerRef,
    ) -> Result<RegionServer> {
        let opts: &DatanodeOptions = &self.opts;

        let query_engine_factory = QueryEngineFactory::new_with_plugins(
            // The query engine in the datanode only executes plans with resolved table sources.
            MemoryCatalogManager::with_default_setup(),
            None,
            None,
            None,
            None,
            false,
            self.plugins.clone(),
            opts.query.clone(),
        );
        let query_engine = query_engine_factory.query_engine();

        let table_provider_factory = self
            .table_provider_factory
            .clone()
            .unwrap_or_else(|| Arc::new(DummyTableProviderFactory));

        let mut region_server = RegionServer::with_table_provider(
            query_engine,
            common_runtime::global_runtime(),
            event_listener,
            table_provider_factory,
            opts.max_concurrent_queries,
            // TODO: re-evaluate the hardcoded timeout in the next version of the datanode concurrency limiter.
            Duration::from_millis(100),
        );

        let object_store_manager = Self::build_object_store_manager(&opts.storage).await?;
        let engines = Self::build_store_engines(
            opts,
            object_store_manager,
            schema_metadata_manager,
            self.plugins.clone(),
        )
        .await?;
        for engine in engines {
            region_server.register_engine(engine);
        }

        Ok(region_server)
    }

    // internal utils

    /// Builds [RegionEngineRef]s from the `region_engine` section in `opts`.
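    ///
    /// Three engines are built: the mito engine, the metric engine (layered on top of the
    /// mito engine), and the file engine.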
    async fn build_store_engines(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<Vec<RegionEngineRef>> {
        let mut metric_engine_config = metric_engine::config::EngineConfig::default();
        let mut mito_engine_config = MitoConfig::default();
        let mut file_engine_config = file_engine::config::EngineConfig::default();

        for engine in &opts.region_engine {
            match engine {
                RegionEngineConfig::Mito(config) => {
                    mito_engine_config = config.clone();
                }
                RegionEngineConfig::File(config) => {
                    file_engine_config = config.clone();
                }
                RegionEngineConfig::Metric(metric_config) => {
                    metric_engine_config = metric_config.clone();
                }
            }
        }

        let mito_engine = Self::build_mito_engine(
            opts,
            object_store_manager.clone(),
            mito_engine_config,
            schema_metadata_manager.clone(),
            plugins.clone(),
        )
        .await?;

        let metric_engine = MetricEngine::try_new(mito_engine.clone(), metric_engine_config)
            .context(BuildMetricEngineSnafu)?;

        let file_engine = FileRegionEngine::new(
            file_engine_config,
            object_store_manager.default_object_store().clone(), // TODO: implement custom storage for file engine
        );

        Ok(vec![
            Arc::new(mito_engine) as _,
            Arc::new(metric_engine) as _,
            Arc::new(file_engine) as _,
        ])
    }

    /// Builds [MitoEngine] according to options.
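    ///
    /// The log store follows `opts.wal`: a local raft-engine log store or a remote Kafka
    /// log store. When object storage is configured, the write cache is enabled.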
    async fn build_mito_engine(
        opts: &DatanodeOptions,
        object_store_manager: ObjectStoreManagerRef,
        mut config: MitoConfig,
        schema_metadata_manager: SchemaMetadataManagerRef,
        plugins: Plugins,
    ) -> Result<MitoEngine> {
        if opts.storage.is_object_storage() {
            // Enable the write cache when using object storage.
            config.enable_write_cache = true;
            info!("Configured 'enable_write_cache=true' for mito engine.");
        }

        let mito_engine = match &opts.wal {
            DatanodeWalConfig::RaftEngine(raft_engine_config) => MitoEngine::new(
                &opts.storage.data_home,
                config,
                Self::build_raft_engine_log_store(&opts.storage.data_home, raft_engine_config)
                    .await?,
                object_store_manager,
                schema_metadata_manager,
                plugins,
            )
            .await
            .context(BuildMitoEngineSnafu)?,
            DatanodeWalConfig::Kafka(kafka_config) => {
                if kafka_config.create_index && opts.node_id.is_none() {
                    warn!("The WAL index creation is only available in distributed mode.")
                }
                let global_index_collector = if kafka_config.create_index && opts.node_id.is_some()
                {
                    let operator = new_object_store_without_cache(
                        &opts.storage.store,
                        &opts.storage.data_home,
                    )
                    .await?;
                    let path = default_index_file(opts.node_id.unwrap());
                    Some(Self::build_global_index_collector(
                        kafka_config.dump_index_interval,
                        operator,
                        path,
                    ))
                } else {
                    None
                };

                MitoEngine::new(
                    &opts.storage.data_home,
                    config,
                    Self::build_kafka_log_store(kafka_config, global_index_collector).await?,
                    object_store_manager,
                    schema_metadata_manager,
                    plugins,
                )
                .await
                .context(BuildMitoEngineSnafu)?
            }
        };
        Ok(mito_engine)
    }

    /// Builds [RaftEngineLogStore].
    async fn build_raft_engine_log_store(
        data_home: &str,
        config: &RaftEngineConfig,
    ) -> Result<Arc<RaftEngineLogStore>> {
        let data_home = normalize_dir(data_home);
        let wal_dir = match &config.dir {
            Some(dir) => dir.clone(),
            None => format!("{}{WAL_DIR}", data_home),
        };

        // Create the WAL directory.
        fs::create_dir_all(Path::new(&wal_dir))
            .await
            .context(CreateDirSnafu { dir: &wal_dir })?;
        info!(
            "Creating raft-engine logstore with config: {:?} and storage path: {}",
            config, &wal_dir
        );
        let logstore = RaftEngineLogStore::try_new(wal_dir, config)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)?;

        Ok(Arc::new(logstore))
    }

    /// Builds [`KafkaLogStore`].
    async fn build_kafka_log_store(
        config: &DatanodeKafkaConfig,
        global_index_collector: Option<GlobalIndexCollector>,
    ) -> Result<Arc<KafkaLogStore>> {
        KafkaLogStore::try_new(config, global_index_collector)
            .await
            .map_err(Box::new)
            .context(OpenLogStoreSnafu)
            .map(Arc::new)
    }

    /// Builds a [`GlobalIndexCollector`].
    fn build_global_index_collector(
        dump_index_interval: Duration,
        operator: object_store::ObjectStore,
        path: String,
    ) -> GlobalIndexCollector {
        GlobalIndexCollector::new(dump_index_interval, operator, path)
    }
}

/// Opens all regions belonging to this datanode.
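///
/// Collects each region's options (augmented with its WAL options), builds a
/// [RegionOpenRequest] per region, opens them in batches with the configured parallelism,
/// and, when `open_with_writable` is set, converts every opened region to a leader.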
async fn open_all_regions(
    region_server: RegionServer,
    table_values: Vec<DatanodeTableValue>,
    open_with_writable: bool,
    init_regions_parallelism: usize,
) -> Result<()> {
    let mut regions = vec![];
    for table_value in table_values {
        for region_number in table_value.regions {
            // Augment region options with WAL options if WAL options are provided.
            let mut region_options = table_value.region_info.region_options.clone();
            prepare_wal_options(
                &mut region_options,
                RegionId::new(table_value.table_id, region_number),
                &table_value.region_info.region_wal_options,
            );

            regions.push((
                RegionId::new(table_value.table_id, region_number),
                table_value.region_info.engine.clone(),
                table_value.region_info.region_storage_path.clone(),
                region_options,
            ));
        }
    }
    let num_regions = regions.len();
    info!("going to open {} region(s)", num_regions);

    let mut region_requests = Vec::with_capacity(regions.len());
    for (region_id, engine, store_path, options) in regions {
        let region_dir = region_dir(&store_path, region_id);
        region_requests.push((
            region_id,
            RegionOpenRequest {
                engine,
                region_dir,
                options,
                skip_wal_replay: false,
            },
        ));
    }

    let open_regions = region_server
        .handle_batch_open_requests(init_regions_parallelism, region_requests)
        .await?;
    ensure!(
        open_regions.len() == num_regions,
        error::UnexpectedSnafu {
            violated: format!(
                "Expected to open {} regions, but only {} regions were opened",
                num_regions,
                open_regions.len()
            )
        }
    );

    for region_id in open_regions {
        if open_with_writable {
            if let Err(e) = region_server.set_region_role(region_id, RegionRole::Leader) {
                error!(
                    e; "failed to convert region {region_id} to leader"
                );
            }
        }
    }
    info!("all regions are opened");

    Ok(())
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;
    use std::collections::{BTreeMap, HashMap};
    use std::sync::Arc;

    use cache::build_datanode_cache_registry;
    use common_base::Plugins;
    use common_meta::cache::LayeredCacheRegistryBuilder;
    use common_meta::key::datanode_table::DatanodeTableManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::kv_backend::KvBackendRef;
    use mito2::engine::MITO_ENGINE_NAME;
    use store_api::region_request::RegionRequest;
    use store_api::storage::RegionId;

    use crate::config::DatanodeOptions;
    use crate::datanode::DatanodeBuilder;
    use crate::tests::{mock_region_server, MockRegionEngine};

    async fn setup_table_datanode(kv: &KvBackendRef) {
        let mgr = DatanodeTableManager::new(kv.clone());
        let txn = mgr
            .build_create_txn(
                1028,
                MITO_ENGINE_NAME,
                "foo/bar/weny",
                HashMap::from([("foo".to_string(), "bar".to_string())]),
                HashMap::default(),
                BTreeMap::from([(0, vec![0, 1, 2])]),
            )
            .unwrap();

        let r = kv.txn(txn).await.unwrap();
        assert!(r.succeeded);
    }

    #[tokio::test]
    async fn test_initialize_region_server() {
        common_telemetry::init_default_ut_logging();
        let mut mock_region_server = mock_region_server();
        let (mock_region, mut mock_region_handler) = MockRegionEngine::new(MITO_ENGINE_NAME);

        mock_region_server.register_engine(mock_region.clone());

        let kv_backend = Arc::new(MemoryKvBackend::new());
        let layered_cache_registry = Arc::new(
            LayeredCacheRegistryBuilder::default()
                .add_cache_registry(build_datanode_cache_registry(kv_backend.clone()))
                .build(),
        );

        let mut builder = DatanodeBuilder::new(
            DatanodeOptions {
                node_id: Some(0),
                ..Default::default()
            },
            Plugins::default(),
            kv_backend,
        );
        builder.with_cache_registry(layered_cache_registry);

        let kv = Arc::new(MemoryKvBackend::default()) as _;
        setup_table_datanode(&kv).await;

        builder
            .initialize_region_server(&mock_region_server, kv.clone(), false)
            .await
            .unwrap();

        for i in 0..3 {
            let (region_id, req) = mock_region_handler.recv().await.unwrap();
            assert_eq!(region_id, RegionId::new(1028, i));
            if let RegionRequest::Open(req) = req {
                assert_eq!(
                    req.options,
                    HashMap::from([("foo".to_string(), "bar".to_string())])
                )
            } else {
                unreachable!()
            }
        }

        assert_matches!(
            mock_region_handler.try_recv(),
            Err(tokio::sync::mpsc::error::TryRecvError::Empty)
        );
    }
}