metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46    RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47    SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
#[cfg_attr(doc, aquamarine::aquamarine)]
/// # Metric Engine
///
/// ## Regions
///
/// Regions in this metric engine have several roles. There is the `PhysicalRegion`,
/// which refers to the region that actually stores the data, and the `LogicalRegion`,
/// which is "simulated" over physical regions. Each logical region is associated
/// with one physical region group, which is a group of two physical regions.
/// Their relationship is illustrated below:
///
/// ```mermaid
/// erDiagram
///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
///     PhysicalRegionGroup ||--|| DataRegion : contains
///     PhysicalRegionGroup ||--|| MetadataRegion : contains
/// ```
///
/// The metric engine uses two region groups. One is for the data region
/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
/// other is for the metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
/// From the definition of [`RegionId`], we can convert between these two physical
/// region ids easily. Thus in the code base we usually refer to one "physical
/// region id", and convert it to the other one when necessary.
///
/// The logical region, in contrast, is a virtual region. It doesn't have dedicated
/// storage or a region group, only a region id that is allocated by the meta server.
/// Everything else is shared with the other logical regions that are associated
/// with the same physical region group.
///
/// For more documentation about physical regions, please refer to [`MetadataRegion`]
/// and [`DataRegion`].
///
/// ## Operations
///
/// Both physical and logical regions are accessible to the user, but the operations
/// they support are different, as listed below:
///
/// | Operations | Logical Region | Physical Region |
/// | ---------- | -------------- | --------------- |
/// |   Create   |       ✅        |        ✅        |
/// |    Drop    |       ✅        |        ❓*       |
/// |   Write    |       ✅        |        ❌        |
/// |    Read    |       ✅        |        ✅        |
/// |   Close    |       ✅        |        ✅        |
/// |    Open    |       ✅        |        ✅        |
/// |   Alter    |       ✅        |        ❓*       |
///
/// *: Drop: A physical region can be dropped only when all related logical regions are dropped.
/// *: Alter: Physical regions only support altering region options.
///
/// ## Internal Columns
///
/// The physical data region contains two internal columns. Note that
/// "internal" here is from the point of view of the metric engine itself; the Mito
/// engine will add its own internal columns to the region as well.
///
/// Their column ids are registered in [`ReservedColumnId`], and their column names are
/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
///
/// The tsid is generated by hashing all tags, and the table id is retrieved from the
/// logical region id to distinguish data from different logical tables.
#[derive(Clone)]
pub struct MetricEngine {
    /// Shared engine internals; cloning the engine only clones this `Arc`.
    inner: Arc<MetricEngineInner>,
}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
    /// Returns the name of this engine, i.e. [`METRIC_ENGINE_NAME`].
    fn name(&self) -> &str {
        METRIC_ENGINE_NAME
    }
133
134    async fn handle_batch_open_requests(
135        &self,
136        parallelism: usize,
137        requests: Vec<(RegionId, RegionOpenRequest)>,
138    ) -> Result<BatchResponses, BoxedError> {
139        self.inner
140            .handle_batch_open_requests(parallelism, requests)
141            .await
142            .map_err(BoxedError::new)
143    }
144
145    async fn handle_batch_ddl_requests(
146        &self,
147        batch_request: BatchRegionDdlRequest,
148    ) -> Result<RegionResponse, BoxedError> {
149        match batch_request {
150            BatchRegionDdlRequest::Create(requests) => {
151                let mut extension_return_value = HashMap::new();
152                let rows = self
153                    .inner
154                    .create_regions(requests, &mut extension_return_value)
155                    .await
156                    .map_err(BoxedError::new)?;
157
158                Ok(RegionResponse {
159                    affected_rows: rows,
160                    extensions: extension_return_value,
161                    metadata: Vec::new(),
162                })
163            }
164            BatchRegionDdlRequest::Alter(requests) => {
165                let mut extension_return_value = HashMap::new();
166                let rows = self
167                    .inner
168                    .alter_regions(requests, &mut extension_return_value)
169                    .await
170                    .map_err(BoxedError::new)?;
171
172                Ok(RegionResponse {
173                    affected_rows: rows,
174                    extensions: extension_return_value,
175                    metadata: Vec::new(),
176                })
177            }
178            BatchRegionDdlRequest::Drop(requests) => {
179                self.handle_requests(
180                    requests
181                        .into_iter()
182                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183                )
184                .await
185            }
186        }
187    }
188
189    /// Handles non-query request to the region. Returns the count of affected rows.
190    async fn handle_request(
191        &self,
192        region_id: RegionId,
193        request: RegionRequest,
194    ) -> Result<RegionResponse, BoxedError> {
195        let mut extension_return_value = HashMap::new();
196
197        let result = match request {
198            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199            RegionRequest::Create(create) => {
200                self.inner
201                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
202                    .await
203            }
204            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207            RegionRequest::Alter(alter) => {
208                self.inner
209                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210                    .await
211            }
212            RegionRequest::Compact(_) => {
213                if self.inner.is_physical_region(region_id) {
214                    self.inner
215                        .mito
216                        .handle_request(region_id, request)
217                        .await
218                        .context(error::MitoFlushOperationSnafu)
219                        .map(|response| response.affected_rows)
220                } else {
221                    UnsupportedRegionRequestSnafu { request }.fail()
222                }
223            }
224            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
226            RegionRequest::Delete(_) => {
227                if self.inner.is_physical_region(region_id) {
228                    self.inner
229                        .mito
230                        .handle_request(region_id, request)
231                        .await
232                        .context(error::MitoDeleteOperationSnafu)
233                        .map(|response| response.affected_rows)
234                } else {
235                    UnsupportedRegionRequestSnafu { request }.fail()
236                }
237            }
238            RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
239            RegionRequest::BulkInserts(_) => {
240                // todo(hl): find a way to support bulk inserts in metric engine.
241                UnsupportedRegionRequestSnafu { request }.fail()
242            }
243        };
244
245        result.map_err(BoxedError::new).map(|rows| RegionResponse {
246            affected_rows: rows,
247            extensions: extension_return_value,
248            metadata: Vec::new(),
249        })
250    }
251
252    async fn handle_query(
253        &self,
254        region_id: RegionId,
255        request: ScanRequest,
256    ) -> Result<RegionScannerRef, BoxedError> {
257        self.handle_query(region_id, request).await
258    }
259
260    async fn get_last_seq_num(
261        &self,
262        region_id: RegionId,
263    ) -> Result<Option<SequenceNumber>, BoxedError> {
264        self.inner
265            .get_last_seq_num(region_id)
266            .await
267            .map_err(BoxedError::new)
268    }
269
270    /// Retrieves region's metadata.
271    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
272        self.inner
273            .load_region_metadata(region_id)
274            .await
275            .map_err(BoxedError::new)
276    }
277
278    /// Retrieves region's disk usage.
279    ///
280    /// Note: Returns `None` if it's a logical region.
281    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
282        if self.inner.is_physical_region(region_id) {
283            get_region_statistic(&self.inner.mito, region_id)
284        } else {
285            None
286        }
287    }
288
    /// Stops the engine.
    ///
    /// This is a no-op that always returns `Ok(())`.
    async fn stop(&self) -> Result<(), BoxedError> {
        // don't need to stop the underlying mito engine
        Ok(())
    }
294
295    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
296        // ignore the region not found error
297        for x in [
298            utils::to_metadata_region_id(region_id),
299            utils::to_data_region_id(region_id),
300        ] {
301            if let Err(e) = self.inner.mito.set_region_role(x, role)
302                && e.status_code() != StatusCode::RegionNotFound
303            {
304                return Err(e);
305            }
306        }
307        Ok(())
308    }
309
310    async fn sync_region(
311        &self,
312        region_id: RegionId,
313        manifest_info: RegionManifestInfo,
314    ) -> Result<SyncManifestResponse, BoxedError> {
315        self.inner
316            .sync_region(region_id, manifest_info)
317            .await
318            .map_err(BoxedError::new)
319    }
320
321    async fn set_region_role_state_gracefully(
322        &self,
323        region_id: RegionId,
324        region_role_state: SettableRegionRoleState,
325    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
326        let metadata_result = match self
327            .inner
328            .mito
329            .set_region_role_state_gracefully(
330                utils::to_metadata_region_id(region_id),
331                region_role_state,
332            )
333            .await?
334        {
335            SetRegionRoleStateResponse::Success(success) => success,
336            SetRegionRoleStateResponse::NotFound => {
337                return Ok(SetRegionRoleStateResponse::NotFound)
338            }
339            SetRegionRoleStateResponse::InvalidTransition(error) => {
340                return Ok(SetRegionRoleStateResponse::InvalidTransition(error))
341            }
342        };
343
344        let data_result = match self
345            .inner
346            .mito
347            .set_region_role_state_gracefully(region_id, region_role_state)
348            .await?
349        {
350            SetRegionRoleStateResponse::Success(success) => success,
351            SetRegionRoleStateResponse::NotFound => {
352                return Ok(SetRegionRoleStateResponse::NotFound)
353            }
354            SetRegionRoleStateResponse::InvalidTransition(error) => {
355                return Ok(SetRegionRoleStateResponse::InvalidTransition(error))
356            }
357        };
358
359        Ok(SetRegionRoleStateResponse::success(
360            SetRegionRoleStateSuccess::metric(
361                data_result.last_entry_id().unwrap_or_default(),
362                metadata_result.last_entry_id().unwrap_or_default(),
363            ),
364        ))
365    }
366
367    /// Returns the physical region role.
368    ///
369    /// Note: Returns `None` if it's a logical region.
370    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
371        if self.inner.is_physical_region(region_id) {
372            self.inner.mito.role(region_id)
373        } else {
374            None
375        }
376    }
377
    /// Returns this engine as a `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self
    }
381}
382
383impl MetricEngine {
384    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
385        let metadata_region = MetadataRegion::new(mito.clone());
386        let data_region = DataRegion::new(mito.clone());
387        let state = Arc::new(RwLock::default());
388        config.sanitize();
389        let flush_interval = config.flush_metadata_region_interval;
390        let inner = Arc::new(MetricEngineInner {
391            mito: mito.clone(),
392            metadata_region,
393            data_region,
394            state: state.clone(),
395            config,
396            row_modifier: RowModifier::default(),
397            flush_task: RepeatedTask::new(
398                flush_interval,
399                Box::new(FlushMetadataRegionTask {
400                    state: state.clone(),
401                    mito: mito.clone(),
402                }),
403            ),
404        });
405        inner
406            .flush_task
407            .start(common_runtime::global_runtime())
408            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
409        Ok(Self { inner })
410    }
411
    /// Returns a clone of the underlying mito engine handle.
    pub fn mito(&self) -> MitoEngine {
        self.inner.mito.clone()
    }
415
416    /// Returns all logical regions associated with the physical region.
417    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
418        self.inner
419            .metadata_region
420            .logical_regions(physical_region_id)
421            .await
422    }
423
424    /// Handles substrait query and return a stream of record batches
425    async fn handle_query(
426        &self,
427        region_id: RegionId,
428        request: ScanRequest,
429    ) -> Result<RegionScannerRef, BoxedError> {
430        self.inner
431            .read_region(region_id, request)
432            .await
433            .map_err(BoxedError::new)
434    }
435
436    async fn handle_requests(
437        &self,
438        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
439    ) -> Result<RegionResponse, BoxedError> {
440        let mut affected_rows = 0;
441        let mut extensions = HashMap::new();
442        for (region_id, request) in requests {
443            let response = self.handle_request(region_id, request).await?;
444            affected_rows += response.affected_rows;
445            extensions.extend(response.extensions);
446        }
447
448        Ok(RegionResponse {
449            affected_rows,
450            extensions,
451            metadata: Vec::new(),
452        })
453    }
454}
455
// Test-only helpers.
#[cfg(test)]
impl MetricEngine {
    /// Scans the region with the given request and returns the result as a
    /// record batch stream, by delegating to the inner engine.
    pub async fn scan_to_stream(
        &self,
        region_id: RegionId,
        request: ScanRequest,
    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
        self.inner.scan_to_stream(region_id, request).await
    }

    /// Returns the configuration of the engine.
    pub fn config(&self) -> &EngineConfig {
        &self.inner.config
    }
}
471
/// Shared internals of [`MetricEngine`], held behind an `Arc`.
struct MetricEngineInner {
    /// The underlying mito engine.
    mito: MitoEngine,
    /// Accessor for metadata regions, built on the same mito engine.
    metadata_region: MetadataRegion,
    /// Accessor for data regions, built on the same mito engine.
    data_region: DataRegion,
    /// Engine state, shared with the metadata region flush task.
    state: Arc<RwLock<MetricEngineState>>,
    /// Engine configuration (sanitized in `MetricEngine::try_new`).
    config: EngineConfig,
    row_modifier: RowModifier,
    /// Repeated task that runs `FlushMetadataRegionTask` at the configured interval.
    flush_task: RepeatedTask<Error>,
}
481
482#[cfg(test)]
483mod test {
484    use std::collections::HashMap;
485
486    use common_telemetry::info;
487    use mito2::sst::location::region_dir_from_table_dir;
488    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
489    use store_api::region_request::{
490        PathType, RegionCloseRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
491    };
492
493    use super::*;
494    use crate::test_util::TestEnv;
495
    /// Checks that closing and reopening a physical region succeeds, and that
    /// closing or opening a nonexistent region does not report an error.
    #[tokio::test]
    async fn close_open_regions() {
        let env = TestEnv::new().await;
        env.init_metric_region().await;
        let engine = env.metric();

        // close physical region
        let physical_region_id = env.default_physical_region_id();
        engine
            .handle_request(
                physical_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // reopen physical region; the `PHYSICAL_TABLE_METADATA_KEY` option is
        // set (with an empty value) on this open request
        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
            .into_iter()
            .collect();
        let open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: physical_region_option,
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(physical_region_id, RegionRequest::Open(open_request))
            .await
            .unwrap();

        // close nonexistent region won't report error
        let nonexistent_region_id = RegionId::new(12313, 12);
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Close(RegionCloseRequest {}),
            )
            .await
            .unwrap();

        // open nonexistent region won't report error
        // (this request carries no options at all)
        let invalid_open_request = RegionOpenRequest {
            engine: METRIC_ENGINE_NAME.to_string(),
            table_dir: TestEnv::default_table_dir(),
            path_type: PathType::Bare, // Use Bare path type for engine regions
            options: HashMap::new(),
            skip_wal_replay: false,
            checkpoint: None,
        };
        engine
            .handle_request(
                nonexistent_region_id,
                RegionRequest::Open(invalid_open_request),
            )
            .await
            .unwrap();
    }
556
557    #[tokio::test]
558    async fn test_role() {
559        let env = TestEnv::new().await;
560        env.init_metric_region().await;
561
562        let logical_region_id = env.default_logical_region_id();
563        let physical_region_id = env.default_physical_region_id();
564
565        assert!(env.metric().role(logical_region_id).is_none());
566        assert!(env.metric().role(physical_region_id).is_some());
567    }
568
569    #[tokio::test]
570    async fn test_region_disk_usage() {
571        let env = TestEnv::new().await;
572        env.init_metric_region().await;
573
574        let logical_region_id = env.default_logical_region_id();
575        let physical_region_id = env.default_physical_region_id();
576
577        assert!(env.metric().region_statistic(logical_region_id).is_none());
578        assert!(env.metric().region_statistic(physical_region_id).is_some());
579    }
580
581    #[tokio::test]
582    async fn test_open_region_failure() {
583        let env = TestEnv::new().await;
584        env.init_metric_region().await;
585        let physical_region_id = env.default_physical_region_id();
586
587        let metric_engine = env.metric();
588        metric_engine
589            .handle_request(
590                physical_region_id,
591                RegionRequest::Flush(RegionFlushRequest {
592                    row_group_size: None,
593                }),
594            )
595            .await
596            .unwrap();
597
598        let path = region_dir_from_table_dir(
599            &TestEnv::default_table_dir(),
600            physical_region_id,
601            PathType::Metadata,
602        );
603        let object_store = env.get_object_store().unwrap();
604        let list = object_store.list(&path).await.unwrap();
605        // Delete parquet files in metadata region
606        for entry in list {
607            if entry.metadata().is_dir() {
608                continue;
609            }
610            if entry.name().ends_with("parquet") {
611                info!("deleting {}", entry.path());
612                object_store.delete(entry.path()).await.unwrap();
613            }
614        }
615
616        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
617            .into_iter()
618            .collect();
619        let open_request = RegionOpenRequest {
620            engine: METRIC_ENGINE_NAME.to_string(),
621            table_dir: TestEnv::default_table_dir(),
622            path_type: PathType::Bare,
623            options: physical_region_option,
624            skip_wal_replay: false,
625            checkpoint: None,
626        };
627        // Opening an already opened region should succeed.
628        // Since the region is already open, no metadata recovery operations will be performed.
629        metric_engine
630            .handle_request(physical_region_id, RegionRequest::Open(open_request))
631            .await
632            .unwrap();
633
634        // Close the region
635        metric_engine
636            .handle_request(
637                physical_region_id,
638                RegionRequest::Close(RegionCloseRequest {}),
639            )
640            .await
641            .unwrap();
642
643        // Try to reopen region.
644        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
645            .into_iter()
646            .collect();
647        let open_request = RegionOpenRequest {
648            engine: METRIC_ENGINE_NAME.to_string(),
649            table_dir: TestEnv::default_table_dir(),
650            path_type: PathType::Bare,
651            options: physical_region_option,
652            skip_wal_replay: false,
653            checkpoint: None,
654        };
655        let err = metric_engine
656            .handle_request(physical_region_id, RegionRequest::Open(open_request))
657            .await
658            .unwrap_err();
659        // Failed to open region because of missing parquet files.
660        assert_eq!(err.status_code(), StatusCode::StorageUnavailable);
661
662        let mito_engine = metric_engine.mito();
663        let data_region_id = utils::to_data_region_id(physical_region_id);
664        let metadata_region_id = utils::to_metadata_region_id(physical_region_id);
665        // The metadata/data region should be closed.
666        let err = mito_engine.get_metadata(data_region_id).await.unwrap_err();
667        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
668        let err = mito_engine
669            .get_metadata(metadata_region_id)
670            .await
671            .unwrap_err();
672        assert_eq!(err.status_code(), StatusCode::RegionNotFound);
673    }
674}