metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46    RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47    SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
///
/// ## Regions
///
/// Regions in this metric engine have several roles. There is `PhysicalRegion`,
/// which refers to the region that actually stores the data, and `LogicalRegion`,
/// which is "simulated" over physical regions. Each logical region is associated
/// with one physical region group, which is a group of two physical regions.
/// Their relationship is illustrated below:
///
/// ```mermaid
/// erDiagram
///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
///     PhysicalRegionGroup ||--|| DataRegion : contains
///     PhysicalRegionGroup ||--|| MetadataRegion : contains
/// ```
///
/// Metric engine uses two region groups. One is for data region
/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
/// other is for metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
/// From the definition of [`RegionId`], we can convert between these two physical
/// region ids easily. Thus in the code base we usually refer to one "physical
/// region id", and convert it to the other one when necessary.
///
/// The logical region, in contrast, is a virtual region. It doesn't have dedicated
/// storage or a region group, only a region id that is allocated by the meta server.
/// Everything else is shared with the other logical regions that are associated
/// with the same physical region group.
///
/// For more documentation about physical regions, please refer to [`MetadataRegion`]
/// and [`DataRegion`].
///
/// ## Operations
///
/// Both physical and logical regions are accessible to users, but the operations
/// they support are different, as listed below:
///
/// | Operations | Logical Region | Physical Region |
/// | ---------- | -------------- | --------------- |
/// |   Create   |       ✅        |        ✅        |
/// |    Drop    |       ✅        |        ❓*       |
/// |   Write    |       ✅        |        ❌        |
/// |    Read    |       ✅        |        ✅        |
/// |   Close    |       ✅        |        ✅        |
/// |    Open    |       ✅        |        ✅        |
/// |   Alter    |       ✅        |        ❓*       |
///
/// *: Drop: A physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
/// The physical data region contains two internal columns. Note that
/// "internal" here is from the point of view of the metric engine itself; the
/// mito engine will add its own internal columns to the region as well.
///
/// Their column ids are registered in [`ReservedColumnId`], and the column names are
/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
///
/// Tsid is generated by hashing all tags, and table id is retrieved from the logical
/// region id to distinguish data from different logical tables.
#[derive(Clone)]
pub struct MetricEngine {
    /// Shared engine internals; cloning the engine only clones this `Arc`.
    inner: Arc<MetricEngineInner>,
}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129    /// Name of this engine
130    fn name(&self) -> &str {
131        METRIC_ENGINE_NAME
132    }
133
134    async fn handle_batch_open_requests(
135        &self,
136        parallelism: usize,
137        requests: Vec<(RegionId, RegionOpenRequest)>,
138    ) -> Result<BatchResponses, BoxedError> {
139        self.inner
140            .handle_batch_open_requests(parallelism, requests)
141            .await
142            .map_err(BoxedError::new)
143    }
144
145    async fn handle_batch_ddl_requests(
146        &self,
147        batch_request: BatchRegionDdlRequest,
148    ) -> Result<RegionResponse, BoxedError> {
149        match batch_request {
150            BatchRegionDdlRequest::Create(requests) => {
151                let mut extension_return_value = HashMap::new();
152                let rows = self
153                    .inner
154                    .create_regions(requests, &mut extension_return_value)
155                    .await
156                    .map_err(BoxedError::new)?;
157
158                Ok(RegionResponse {
159                    affected_rows: rows,
160                    extensions: extension_return_value,
161                    metadata: Vec::new(),
162                })
163            }
164            BatchRegionDdlRequest::Alter(requests) => {
165                let mut extension_return_value = HashMap::new();
166                let rows = self
167                    .inner
168                    .alter_regions(requests, &mut extension_return_value)
169                    .await
170                    .map_err(BoxedError::new)?;
171
172                Ok(RegionResponse {
173                    affected_rows: rows,
174                    extensions: extension_return_value,
175                    metadata: Vec::new(),
176                })
177            }
178            BatchRegionDdlRequest::Drop(requests) => {
179                self.handle_requests(
180                    requests
181                        .into_iter()
182                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183                )
184                .await
185            }
186        }
187    }
188
189    /// Handles non-query request to the region. Returns the count of affected rows.
190    async fn handle_request(
191        &self,
192        region_id: RegionId,
193        request: RegionRequest,
194    ) -> Result<RegionResponse, BoxedError> {
195        let mut extension_return_value = HashMap::new();
196
197        let result = match request {
198            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199            RegionRequest::Create(create) => {
200                self.inner
201                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
202                    .await
203            }
204            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207            RegionRequest::Alter(alter) => {
208                self.inner
209                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210                    .await
211            }
212            RegionRequest::Compact(_) => {
213                if self.inner.is_physical_region(region_id) {
214                    self.inner
215                        .mito
216                        .handle_request(region_id, request)
217                        .await
218                        .context(error::MitoFlushOperationSnafu)
219                        .map(|response| response.affected_rows)
220                } else {
221                    UnsupportedRegionRequestSnafu { request }.fail()
222                }
223            }
224            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225            RegionRequest::BuildIndex(_) => {
226                if self.inner.is_physical_region(region_id) {
227                    self.inner
228                        .mito
229                        .handle_request(region_id, request)
230                        .await
231                        .context(error::MitoFlushOperationSnafu)
232                        .map(|response| response.affected_rows)
233                } else {
234                    UnsupportedRegionRequestSnafu { request }.fail()
235                }
236            }
237            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
238            RegionRequest::Delete(_) => {
239                if self.inner.is_physical_region(region_id) {
240                    self.inner
241                        .mito
242                        .handle_request(region_id, request)
243                        .await
244                        .context(error::MitoDeleteOperationSnafu)
245                        .map(|response| response.affected_rows)
246                } else {
247                    UnsupportedRegionRequestSnafu { request }.fail()
248                }
249            }
250            RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
251            RegionRequest::BulkInserts(_) => {
252                // todo(hl): find a way to support bulk inserts in metric engine.
253                UnsupportedRegionRequestSnafu { request }.fail()
254            }
255        };
256
257        result.map_err(BoxedError::new).map(|rows| RegionResponse {
258            affected_rows: rows,
259            extensions: extension_return_value,
260            metadata: Vec::new(),
261        })
262    }
263
264    async fn handle_query(
265        &self,
266        region_id: RegionId,
267        request: ScanRequest,
268    ) -> Result<RegionScannerRef, BoxedError> {
269        self.handle_query(region_id, request).await
270    }
271
272    async fn get_committed_sequence(
273        &self,
274        region_id: RegionId,
275    ) -> Result<SequenceNumber, BoxedError> {
276        self.inner
277            .get_last_seq_num(region_id)
278            .await
279            .map_err(BoxedError::new)
280    }
281
282    /// Retrieves region's metadata.
283    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
284        self.inner
285            .load_region_metadata(region_id)
286            .await
287            .map_err(BoxedError::new)
288    }
289
290    /// Retrieves region's disk usage.
291    ///
292    /// Note: Returns `None` if it's a logical region.
293    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
294        if self.inner.is_physical_region(region_id) {
295            get_region_statistic(&self.inner.mito, region_id)
296        } else {
297            None
298        }
299    }
300
301    /// Stops the engine
302    async fn stop(&self) -> Result<(), BoxedError> {
303        // don't need to stop the underlying mito engine
304        Ok(())
305    }
306
307    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
308        // ignore the region not found error
309        for x in [
310            utils::to_metadata_region_id(region_id),
311            utils::to_data_region_id(region_id),
312        ] {
313            if let Err(e) = self.inner.mito.set_region_role(x, role)
314                && e.status_code() != StatusCode::RegionNotFound
315            {
316                return Err(e);
317            }
318        }
319        Ok(())
320    }
321
322    async fn sync_region(
323        &self,
324        region_id: RegionId,
325        manifest_info: RegionManifestInfo,
326    ) -> Result<SyncManifestResponse, BoxedError> {
327        self.inner
328            .sync_region(region_id, manifest_info)
329            .await
330            .map_err(BoxedError::new)
331    }
332
333    async fn set_region_role_state_gracefully(
334        &self,
335        region_id: RegionId,
336        region_role_state: SettableRegionRoleState,
337    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
338        let metadata_result = match self
339            .inner
340            .mito
341            .set_region_role_state_gracefully(
342                utils::to_metadata_region_id(region_id),
343                region_role_state,
344            )
345            .await?
346        {
347            SetRegionRoleStateResponse::Success(success) => success,
348            SetRegionRoleStateResponse::NotFound => {
349                return Ok(SetRegionRoleStateResponse::NotFound);
350            }
351            SetRegionRoleStateResponse::InvalidTransition(error) => {
352                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
353            }
354        };
355
356        let data_result = match self
357            .inner
358            .mito
359            .set_region_role_state_gracefully(region_id, region_role_state)
360            .await?
361        {
362            SetRegionRoleStateResponse::Success(success) => success,
363            SetRegionRoleStateResponse::NotFound => {
364                return Ok(SetRegionRoleStateResponse::NotFound);
365            }
366            SetRegionRoleStateResponse::InvalidTransition(error) => {
367                return Ok(SetRegionRoleStateResponse::InvalidTransition(error));
368            }
369        };
370
371        Ok(SetRegionRoleStateResponse::success(
372            SetRegionRoleStateSuccess::metric(
373                data_result.last_entry_id().unwrap_or_default(),
374                metadata_result.last_entry_id().unwrap_or_default(),
375            ),
376        ))
377    }
378
379    /// Returns the physical region role.
380    ///
381    /// Note: Returns `None` if it's a logical region.
382    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
383        if self.inner.is_physical_region(region_id) {
384            self.inner.mito.role(region_id)
385        } else {
386            None
387        }
388    }
389
390    fn as_any(&self) -> &dyn Any {
391        self
392    }
393}
394
395impl MetricEngine {
396    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
397        let metadata_region = MetadataRegion::new(mito.clone());
398        let data_region = DataRegion::new(mito.clone());
399        let state = Arc::new(RwLock::default());
400        config.sanitize();
401        let flush_interval = config.flush_metadata_region_interval;
402        let inner = Arc::new(MetricEngineInner {
403            mito: mito.clone(),
404            metadata_region,
405            data_region,
406            state: state.clone(),
407            config,
408            row_modifier: RowModifier::default(),
409            flush_task: RepeatedTask::new(
410                flush_interval,
411                Box::new(FlushMetadataRegionTask {
412                    state: state.clone(),
413                    mito: mito.clone(),
414                }),
415            ),
416        });
417        inner
418            .flush_task
419            .start(common_runtime::global_runtime())
420            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
421        Ok(Self { inner })
422    }
423
424    pub fn mito(&self) -> MitoEngine {
425        self.inner.mito.clone()
426    }
427
428    /// Returns all logical regions associated with the physical region.
429    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
430        self.inner
431            .metadata_region
432            .logical_regions(physical_region_id)
433            .await
434    }
435
436    /// Handles substrait query and return a stream of record batches
437    async fn handle_query(
438        &self,
439        region_id: RegionId,
440        request: ScanRequest,
441    ) -> Result<RegionScannerRef, BoxedError> {
442        self.inner
443            .read_region(region_id, request)
444            .await
445            .map_err(BoxedError::new)
446    }
447
448    async fn handle_requests(
449        &self,
450        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
451    ) -> Result<RegionResponse, BoxedError> {
452        let mut affected_rows = 0;
453        let mut extensions = HashMap::new();
454        for (region_id, request) in requests {
455            let response = self.handle_request(region_id, request).await?;
456            affected_rows += response.affected_rows;
457            extensions.extend(response.extensions);
458        }
459
460        Ok(RegionResponse {
461            affected_rows,
462            extensions,
463            metadata: Vec::new(),
464        })
465    }
466}
467
/// Test-only helpers.
#[cfg(test)]
impl MetricEngine {
    /// Scans `region_id` through the inner engine and returns the result as a
    /// record batch stream.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        let stream = self.inner.scan_to_stream(region_id, request).await?;
        Ok(stream)
    }

    /// Borrows the configuration the engine was built with.
    pub fn config(&self) -> &EngineConfig {
        let inner = &self.inner;
        &inner.config
    }
}
483
/// Shared internals of [`MetricEngine`], held behind an `Arc`.
struct MetricEngineInner {
    /// The underlying mito engine that requests are forwarded to.
    mito: MitoEngine,
    /// Metadata region accessor, built from a clone of `mito`.
    metadata_region: MetadataRegion,
    /// Data region accessor, built from a clone of `mito`.
    data_region: DataRegion,
    /// Engine state; also handed to the metadata-region flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration, sanitized in `MetricEngine::try_new`.
    config: EngineConfig,
    /// Row modifier, created with its default value.
    row_modifier: RowModifier,
    /// Repeated task that runs `FlushMetadataRegionTask` at the configured
    /// `flush_metadata_region_interval`; started in `MetricEngine::try_new`.
    flush_task: RepeatedTask<Error>,
}
493
494#[cfg(test)]
495mod test {
496    use std::collections::HashMap;
497
498    use common_telemetry::info;
499    use mito2::sst::location::region_dir_from_table_dir;
500    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
501    use store_api::region_request::{
502        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
503    };
504
505    use super::*;
506    use crate::test_util::TestEnv;
507
508    #[tokio::test]
509    async fn close_open_regions() {
510        let env = TestEnv::new().await;
511        env.init_metric_region().await;
512        let engine = env.metric();
513
514        // close physical region
515        let physical_region_id = env.default_physical_region_id();
516        engine
517            .handle_request(
518                physical_region_id,
519                RegionRequest::Close(RegionCloseRequest {}),
520            )
521            .await
522            .unwrap();
523
524        // reopen physical region
525        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
526            .into_iter()
527            .collect();
528        let open_request = RegionOpenRequest {
529            engine: METRIC_ENGINE_NAME.to_string(),
530            table_dir: TestEnv::default_table_dir(),
531            path_type: PathType::Bare, // Use Bare path type for engine regions
532            options: physical_region_option,
533            skip_wal_replay: false,
534            checkpoint: None,
535        };
536        engine
537            .handle_request(physical_region_id, RegionRequest::Open(open_request))
538            .await
539            .unwrap();
540
541        // close nonexistent region won't report error
542        let nonexistent_region_id = RegionId::new(12313, 12);
543        engine
544            .handle_request(
545                nonexistent_region_id,
546                RegionRequest::Close(RegionCloseRequest {}),
547            )
548            .await
549            .unwrap();
550
551        // open nonexistent region won't report error
552        let invalid_open_request = RegionOpenRequest {
553            engine: METRIC_ENGINE_NAME.to_string(),
554            table_dir: TestEnv::default_table_dir(),
555            path_type: PathType::Bare, // Use Bare path type for engine regions
556            options: HashMap::new(),
557            skip_wal_replay: false,
558            checkpoint: None,
559        };
560        engine
561            .handle_request(
562                nonexistent_region_id,
563                RegionRequest::Open(invalid_open_request),
564            )
565            .await
566            .unwrap();
567    }
568
569    #[tokio::test]
570    async fn test_role() {
571        let env = TestEnv::new().await;
572        env.init_metric_region().await;
573
574        let logical_region_id = env.default_logical_region_id();
575        let physical_region_id = env.default_physical_region_id();
576
577        assert!(env.metric().role(logical_region_id).is_none());
578        assert!(env.metric().role(physical_region_id).is_some());
579    }
580
581    #[tokio::test]
582    async fn test_region_disk_usage() {
583        let env = TestEnv::new().await;
584        env.init_metric_region().await;
585
586        let logical_region_id = env.default_logical_region_id();
587        let physical_region_id = env.default_physical_region_id();
588
589        assert!(env.metric().region_statistic(logical_region_id).is_none());
590        assert!(env.metric().region_statistic(physical_region_id).is_some());
591    }
592
593    #[tokio::test]
594    async fn test_open_region_failure() {
595        let env = TestEnv::new().await;
596        env.init_metric_region().await;
597        let physical_region_id = env.default_physical_region_id();
598
599        let metric_engine = env.metric();
600        metric_engine
601            .handle_request(
602                physical_region_id,
603                RegionRequest::Flush(RegionFlushRequest {
604                    row_group_size: None,
605                }),
606            )
607            .await
608            .unwrap();
609
610        let path = region_dir_from_table_dir(
611            &TestEnv::default_table_dir(),
612            physical_region_id,
613            PathType::Metadata,
614        );
615        let object_store = env.get_object_store().unwrap();
616        let list = object_store.list(&path).await.unwrap();
617        // Delete parquet files in metadata region
618        for entry in list {
619            if entry.metadata().is_dir() {
620                continue;
621            }
622            if entry.name().ends_with("parquet") {
623                info!("deleting {}", entry.path());
624                object_store.delete(entry.path()).await.unwrap();
625            }
626        }
627
628        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
629            .into_iter()
630            .collect();
631        let open_request = RegionOpenRequest {
632            engine: METRIC_ENGINE_NAME.to_string(),
633            table_dir: TestEnv::default_table_dir(),
634            path_type: PathType::Bare,
635            options: physical_region_option,
636            skip_wal_replay: false,
637            checkpoint: None,
638        };
639        // Opening an already opened region should succeed.
640        // Since the region is already open, no metadata recovery operations will be performed.
641        metric_engine
642            .handle_request(physical_region_id, RegionRequest::Open(open_request))
643            .await
644            .unwrap();
645
646        // Close the region
647        metric_engine
648            .handle_request(
649                physical_region_id,
650                RegionRequest::Close(RegionCloseRequest {}),
651            )
652            .await
653            .unwrap();
654
655        // Try to reopen region.
656        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
657            .into_iter()
658            .collect();
659        let open_request = RegionOpenRequest {
660            engine: METRIC_ENGINE_NAME.to_string(),
661            table_dir: TestEnv::default_table_dir(),
662            path_type: PathType::Bare,
663            options: physical_region_option,
664            skip_wal_replay: false,
665            checkpoint: None,
666        };
667        let err = metric_engine
668            .handle_request(physical_region_id, RegionRequest::Open(open_request))
669            .await
670            .unwrap_err();
671        // Failed to open region because of missing parquet files.
672        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
673
674        let mito_engine = metric_engine.mito();
675        let data_region_id = utils::to_data_region_id(physical_region_id);
676        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
677        // The metadata/data region should be closed.
678        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
679        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
680        let err = mito_engine
681            .get_metadata(metadata_region_id)
682            .await
683            .unwrap_err();
684        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
685    }
686}