metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46    RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47    SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
60#[cfg_attr(doc, aquamarine::aquamarine)]
61/// # Metric Engine
62///
63/// ## Regions
64///
65/// Regions in this metric engine has several roles. There is `PhysicalRegion`,
66/// which refer to the region that actually stores the data. And `LogicalRegion`
67/// that is "simulated" over physical regions. Each logical region is associated
68/// with one physical region group, which is a group of two physical regions.
69/// Their relationship is illustrated below:
70///
71/// ```mermaid
72/// erDiagram
73///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
74///     PhysicalRegionGroup ||--|| DataRegion : contains
75///     PhysicalRegionGroup ||--|| MetadataRegion : contains
76/// ```
77///
78/// Metric engine uses two region groups. One is for data region
79/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
80/// other is for metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
81/// From the definition of [`RegionId`], we can convert between these two physical
82/// region ids easily. Thus in the code base we usually refer to one "physical
83/// region id", and convert it to the other one when necessary.
84///
85/// The logical region, in contrast, is a virtual region. It doesn't has dedicated
86/// storage or region group. Only a region id that is allocated by meta server.
87/// And all other things is shared with other logical region that are associated
88/// with the same physical region group.
89///
90/// For more document about physical regions, please refer to [`MetadataRegion`]
91/// and [`DataRegion`].
92///
93/// ## Operations
94///
95/// Both physical and logical region are accessible to user. But the operation
96/// they support are different. List below:
97///
98/// | Operations | Logical Region | Physical Region |
99/// | ---------- | -------------- | --------------- |
100/// |   Create   |       ✅        |        ✅        |
101/// |    Drop    |       ✅        |        ❓*       |
102/// |   Write    |       ✅        |        ❌        |
103/// |    Read    |       ✅        |        ✅        |
104/// |   Close    |       ✅        |        ✅        |
105/// |    Open    |       ✅        |        ✅        |
106/// |   Alter    |       ✅        |        ❓*       |
107///
108/// *: Physical region can be dropped only when all related logical regions are dropped.
109/// *: Alter: Physical regions only support altering region options.
110///
111/// ## Internal Columns
112///
113/// The physical data region contains two internal columns. Should
114/// mention that "internal" here is for metric engine itself. Mito
115/// engine will add it's internal columns to the region as well.
116///
117/// Their column id is registered in [`ReservedColumnId`]. And column name is
118/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
119///
120/// Tsid is generated by hashing all tags. And table id is retrieved from logical region
121/// id to distinguish data from different logical tables.
122#[derive(Clone)]
123pub struct MetricEngine {
124    inner: Arc<MetricEngineInner>,
125}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129    /// Name of this engine
130    fn name(&self) -> &str {
131        METRIC_ENGINE_NAME
132    }
133
134    async fn handle_batch_open_requests(
135        &self,
136        parallelism: usize,
137        requests: Vec<(RegionId, RegionOpenRequest)>,
138    ) -> Result<BatchResponses, BoxedError> {
139        self.inner
140            .handle_batch_open_requests(parallelism, requests)
141            .await
142            .map_err(BoxedError::new)
143    }
144
145    async fn handle_batch_ddl_requests(
146        &self,
147        batch_request: BatchRegionDdlRequest,
148    ) -> Result<RegionResponse, BoxedError> {
149        match batch_request {
150            BatchRegionDdlRequest::Create(requests) => {
151                let mut extension_return_value = HashMap::new();
152                let rows = self
153                    .inner
154                    .create_regions(requests, &mut extension_return_value)
155                    .await
156                    .map_err(BoxedError::new)?;
157
158                Ok(RegionResponse {
159                    affected_rows: rows,
160                    extensions: extension_return_value,
161                })
162            }
163            BatchRegionDdlRequest::Alter(requests) => {
164                let mut extension_return_value = HashMap::new();
165                let rows = self
166                    .inner
167                    .alter_regions(requests, &mut extension_return_value)
168                    .await
169                    .map_err(BoxedError::new)?;
170
171                Ok(RegionResponse {
172                    affected_rows: rows,
173                    extensions: extension_return_value,
174                })
175            }
176            BatchRegionDdlRequest::Drop(requests) => {
177                self.handle_requests(
178                    requests
179                        .into_iter()
180                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
181                )
182                .await
183            }
184        }
185    }
186
187    /// Handles non-query request to the region. Returns the count of affected rows.
188    async fn handle_request(
189        &self,
190        region_id: RegionId,
191        request: RegionRequest,
192    ) -> Result<RegionResponse, BoxedError> {
193        let mut extension_return_value = HashMap::new();
194
195        let result = match request {
196            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
197            RegionRequest::Create(create) => {
198                self.inner
199                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
200                    .await
201            }
202            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
203            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
204            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
205            RegionRequest::Alter(alter) => {
206                self.inner
207                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
208                    .await
209            }
210            RegionRequest::Compact(_) => {
211                if self.inner.is_physical_region(region_id) {
212                    self.inner
213                        .mito
214                        .handle_request(region_id, request)
215                        .await
216                        .context(error::MitoFlushOperationSnafu)
217                        .map(|response| response.affected_rows)
218                } else {
219                    UnsupportedRegionRequestSnafu { request }.fail()
220                }
221            }
222            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
223            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
224            RegionRequest::Delete(_) => {
225                if self.inner.is_physical_region(region_id) {
226                    self.inner
227                        .mito
228                        .handle_request(region_id, request)
229                        .await
230                        .context(error::MitoDeleteOperationSnafu)
231                        .map(|response| response.affected_rows)
232                } else {
233                    UnsupportedRegionRequestSnafu { request }.fail()
234                }
235            }
236            RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
237            RegionRequest::BulkInserts(_) => {
238                // todo(hl): find a way to support bulk inserts in metric engine.
239                UnsupportedRegionRequestSnafu { request }.fail()
240            }
241        };
242
243        result.map_err(BoxedError::new).map(|rows| RegionResponse {
244            affected_rows: rows,
245            extensions: extension_return_value,
246        })
247    }
248
249    async fn handle_query(
250        &self,
251        region_id: RegionId,
252        request: ScanRequest,
253    ) -> Result<RegionScannerRef, BoxedError> {
254        self.handle_query(region_id, request).await
255    }
256
257    async fn get_last_seq_num(
258        &self,
259        region_id: RegionId,
260    ) -> Result<Option<SequenceNumber>, BoxedError> {
261        self.inner
262            .get_last_seq_num(region_id)
263            .await
264            .map_err(BoxedError::new)
265    }
266
267    /// Retrieves region's metadata.
268    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
269        self.inner
270            .load_region_metadata(region_id)
271            .await
272            .map_err(BoxedError::new)
273    }
274
275    /// Retrieves region's disk usage.
276    ///
277    /// Note: Returns `None` if it's a logical region.
278    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
279        if self.inner.is_physical_region(region_id) {
280            get_region_statistic(&self.inner.mito, region_id)
281        } else {
282            None
283        }
284    }
285
286    /// Stops the engine
287    async fn stop(&self) -> Result<(), BoxedError> {
288        // don't need to stop the underlying mito engine
289        Ok(())
290    }
291
292    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
293        // ignore the region not found error
294        for x in [
295            utils::to_metadata_region_id(region_id),
296            utils::to_data_region_id(region_id),
297        ] {
298            if let Err(e) = self.inner.mito.set_region_role(x, role)
299                && e.status_code() != StatusCode::RegionNotFound
300            {
301                return Err(e);
302            }
303        }
304        Ok(())
305    }
306
307    async fn sync_region(
308        &self,
309        region_id: RegionId,
310        manifest_info: RegionManifestInfo,
311    ) -> Result<SyncManifestResponse, BoxedError> {
312        self.inner
313            .sync_region(region_id, manifest_info)
314            .await
315            .map_err(BoxedError::new)
316    }
317
318    async fn set_region_role_state_gracefully(
319        &self,
320        region_id: RegionId,
321        region_role_state: SettableRegionRoleState,
322    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
323        let metadata_result = match self
324            .inner
325            .mito
326            .set_region_role_state_gracefully(
327                utils::to_metadata_region_id(region_id),
328                region_role_state,
329            )
330            .await?
331        {
332            SetRegionRoleStateResponse::Success(success) => success,
333            SetRegionRoleStateResponse::NotFound => {
334                return Ok(SetRegionRoleStateResponse::NotFound)
335            }
336        };
337
338        let data_result = match self
339            .inner
340            .mito
341            .set_region_role_state_gracefully(region_id, region_role_state)
342            .await?
343        {
344            SetRegionRoleStateResponse::Success(success) => success,
345            SetRegionRoleStateResponse::NotFound => {
346                return Ok(SetRegionRoleStateResponse::NotFound)
347            }
348        };
349
350        Ok(SetRegionRoleStateResponse::success(
351            SetRegionRoleStateSuccess::metric(
352                data_result.last_entry_id().unwrap_or_default(),
353                metadata_result.last_entry_id().unwrap_or_default(),
354            ),
355        ))
356    }
357
358    /// Returns the physical region role.
359    ///
360    /// Note: Returns `None` if it's a logical region.
361    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
362        if self.inner.is_physical_region(region_id) {
363            self.inner.mito.role(region_id)
364        } else {
365            None
366        }
367    }
368
369    fn as_any(&self) -> &dyn Any {
370        self
371    }
372}
373
374impl MetricEngine {
375    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
376        let metadata_region = MetadataRegion::new(mito.clone());
377        let data_region = DataRegion::new(mito.clone());
378        let state = Arc::new(RwLock::default());
379        config.sanitize();
380        let flush_interval = config.flush_metadata_region_interval;
381        let inner = Arc::new(MetricEngineInner {
382            mito: mito.clone(),
383            metadata_region,
384            data_region,
385            state: state.clone(),
386            config,
387            row_modifier: RowModifier::new(),
388            flush_task: RepeatedTask::new(
389                flush_interval,
390                Box::new(FlushMetadataRegionTask {
391                    state: state.clone(),
392                    mito: mito.clone(),
393                }),
394            ),
395        });
396        inner
397            .flush_task
398            .start(common_runtime::global_runtime())
399            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
400        Ok(Self { inner })
401    }
402
403    pub fn mito(&self) -> MitoEngine {
404        self.inner.mito.clone()
405    }
406
407    /// Returns all logical regions associated with the physical region.
408    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
409        self.inner
410            .metadata_region
411            .logical_regions(physical_region_id)
412            .await
413    }
414
415    /// Handles substrait query and return a stream of record batches
416    async fn handle_query(
417        &self,
418        region_id: RegionId,
419        request: ScanRequest,
420    ) -> Result<RegionScannerRef, BoxedError> {
421        self.inner
422            .read_region(region_id, request)
423            .await
424            .map_err(BoxedError::new)
425    }
426
427    async fn handle_requests(
428        &self,
429        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
430    ) -> Result<RegionResponse, BoxedError> {
431        let mut affected_rows = 0;
432        let mut extensions = HashMap::new();
433        for (region_id, request) in requests {
434            let response = self.handle_request(region_id, request).await?;
435            affected_rows += response.affected_rows;
436            extensions.extend(response.extensions);
437        }
438
439        Ok(RegionResponse {
440            affected_rows,
441            extensions,
442        })
443    }
444}
445
446#[cfg(test)]
447impl MetricEngine {
448    pub async fn scan_to_stream(
449        &self,
450        region_id: RegionId,
451        request: ScanRequest,
452    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
453        self.inner.scan_to_stream(region_id, request).await
454    }
455
456    /// Returns the configuration of the engine.
457    pub fn config(&self) -> &EngineConfig {
458        &self.inner.config
459    }
460}
461
462struct MetricEngineInner {
463    mito: MitoEngine,
464    metadata_region: MetadataRegion,
465    data_region: DataRegion,
466    state: Arc<RwLock<MetricEngineState>>,
467    config: EngineConfig,
468    row_modifier: RowModifier,
469    flush_task: RepeatedTask<Error>,
470}
471
472#[cfg(test)]
473mod test {
474    use std::collections::HashMap;
475
476    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
477    use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
478
479    use super::*;
480    use crate::test_util::TestEnv;
481
482    #[tokio::test]
483    async fn close_open_regions() {
484        let env = TestEnv::new().await;
485        env.init_metric_region().await;
486        let engine = env.metric();
487
488        // close physical region
489        let physical_region_id = env.default_physical_region_id();
490        engine
491            .handle_request(
492                physical_region_id,
493                RegionRequest::Close(RegionCloseRequest {}),
494            )
495            .await
496            .unwrap();
497
498        // reopen physical region
499        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
500            .into_iter()
501            .collect();
502        let open_request = RegionOpenRequest {
503            engine: METRIC_ENGINE_NAME.to_string(),
504            region_dir: env.default_region_dir(),
505            options: physical_region_option,
506            skip_wal_replay: false,
507        };
508        engine
509            .handle_request(physical_region_id, RegionRequest::Open(open_request))
510            .await
511            .unwrap();
512
513        // close nonexistent region won't report error
514        let nonexistent_region_id = RegionId::new(12313, 12);
515        engine
516            .handle_request(
517                nonexistent_region_id,
518                RegionRequest::Close(RegionCloseRequest {}),
519            )
520            .await
521            .unwrap();
522
523        // open nonexistent region won't report error
524        let invalid_open_request = RegionOpenRequest {
525            engine: METRIC_ENGINE_NAME.to_string(),
526            region_dir: env.default_region_dir(),
527            options: HashMap::new(),
528            skip_wal_replay: false,
529        };
530        engine
531            .handle_request(
532                nonexistent_region_id,
533                RegionRequest::Open(invalid_open_request),
534            )
535            .await
536            .unwrap();
537    }
538
539    #[tokio::test]
540    async fn test_role() {
541        let env = TestEnv::new().await;
542        env.init_metric_region().await;
543
544        let logical_region_id = env.default_logical_region_id();
545        let physical_region_id = env.default_physical_region_id();
546
547        assert!(env.metric().role(logical_region_id).is_none());
548        assert!(env.metric().role(physical_region_id).is_some());
549    }
550
551    #[tokio::test]
552    async fn test_region_disk_usage() {
553        let env = TestEnv::new().await;
554        env.init_metric_region().await;
555
556        let logical_region_id = env.default_logical_region_id();
557        let physical_region_id = env.default_physical_region_id();
558
559        assert!(env.metric().region_statistic(logical_region_id).is_none());
560        assert!(env.metric().region_statistic(physical_region_id).is_some());
561    }
562}