metric_engine/
engine.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod alter;
16mod catchup;
17mod close;
18mod create;
19mod drop;
20mod flush;
21mod open;
22mod options;
23mod put;
24mod read;
25mod region_metadata;
26mod state;
27mod sync;
28
29use std::any::Any;
30use std::collections::HashMap;
31use std::sync::{Arc, RwLock};
32
33use api::region::RegionResponse;
34use async_trait::async_trait;
35use common_error::ext::{BoxedError, ErrorExt};
36use common_error::status_code::StatusCode;
37use common_runtime::RepeatedTask;
38use mito2::engine::MitoEngine;
39pub(crate) use options::IndexOptions;
40use snafu::ResultExt;
41pub(crate) use state::MetricEngineState;
42use store_api::metadata::RegionMetadataRef;
43use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
44use store_api::region_engine::{
45    BatchResponses, RegionEngine, RegionManifestInfo, RegionRole, RegionScannerRef,
46    RegionStatistic, SetRegionRoleStateResponse, SetRegionRoleStateSuccess,
47    SettableRegionRoleState, SyncManifestResponse,
48};
49use store_api::region_request::{BatchRegionDdlRequest, RegionOpenRequest, RegionRequest};
50use store_api::storage::{RegionId, ScanRequest, SequenceNumber};
51
52use crate::config::EngineConfig;
53use crate::data_region::DataRegion;
54use crate::error::{self, Error, Result, StartRepeatedTaskSnafu, UnsupportedRegionRequestSnafu};
55use crate::metadata_region::MetadataRegion;
56use crate::repeated_task::FlushMetadataRegionTask;
57use crate::row_modifier::RowModifier;
58use crate::utils::{self, get_region_statistic};
59
60#[cfg_attr(doc, aquamarine::aquamarine)]
61/// # Metric Engine
62///
63/// ## Regions
64///
65/// Regions in this metric engine has several roles. There is `PhysicalRegion`,
66/// which refer to the region that actually stores the data. And `LogicalRegion`
67/// that is "simulated" over physical regions. Each logical region is associated
68/// with one physical region group, which is a group of two physical regions.
69/// Their relationship is illustrated below:
70///
71/// ```mermaid
72/// erDiagram
73///     LogicalRegion ||--o{ PhysicalRegionGroup : corresponds
74///     PhysicalRegionGroup ||--|| DataRegion : contains
75///     PhysicalRegionGroup ||--|| MetadataRegion : contains
76/// ```
77///
78/// Metric engine uses two region groups. One is for data region
79/// ([METRIC_DATA_REGION_GROUP](crate::consts::METRIC_DATA_REGION_GROUP)), and the
80/// other is for metadata region ([METRIC_METADATA_REGION_GROUP](crate::consts::METRIC_METADATA_REGION_GROUP)).
81/// From the definition of [`RegionId`], we can convert between these two physical
82/// region ids easily. Thus in the code base we usually refer to one "physical
83/// region id", and convert it to the other one when necessary.
84///
85/// The logical region, in contrast, is a virtual region. It doesn't has dedicated
86/// storage or region group. Only a region id that is allocated by meta server.
87/// And all other things is shared with other logical region that are associated
88/// with the same physical region group.
89///
90/// For more document about physical regions, please refer to [`MetadataRegion`]
91/// and [`DataRegion`].
92///
93/// ## Operations
94///
95/// Both physical and logical region are accessible to user. But the operation
96/// they support are different. List below:
97///
98/// | Operations | Logical Region | Physical Region |
99/// | ---------- | -------------- | --------------- |
100/// |   Create   |       ✅        |        ✅        |
101/// |    Drop    |       ✅        |        ❓*       |
102/// |   Write    |       ✅        |        ❌        |
103/// |    Read    |       ✅        |        ✅        |
104/// |   Close    |       ✅        |        ✅        |
105/// |    Open    |       ✅        |        ✅        |
106/// |   Alter    |       ✅        |        ❓*       |
107///
108/// *: Physical region can be dropped only when all related logical regions are dropped.
109/// *: Alter: Physical regions only support altering region options.
110///
111/// ## Internal Columns
112///
113/// The physical data region contains two internal columns. Should
114/// mention that "internal" here is for metric engine itself. Mito
115/// engine will add it's internal columns to the region as well.
116///
117/// Their column id is registered in [`ReservedColumnId`]. And column name is
118/// defined in [`DATA_SCHEMA_TSID_COLUMN_NAME`] and [`DATA_SCHEMA_TABLE_ID_COLUMN_NAME`].
119///
120/// Tsid is generated by hashing all tags. And table id is retrieved from logical region
121/// id to distinguish data from different logical tables.
122#[derive(Clone)]
123pub struct MetricEngine {
124    inner: Arc<MetricEngineInner>,
125}
126
127#[async_trait]
128impl RegionEngine for MetricEngine {
129    /// Name of this engine
130    fn name(&self) -> &str {
131        METRIC_ENGINE_NAME
132    }
133
134    async fn handle_batch_open_requests(
135        &self,
136        parallelism: usize,
137        requests: Vec<(RegionId, RegionOpenRequest)>,
138    ) -> Result<BatchResponses, BoxedError> {
139        self.inner
140            .handle_batch_open_requests(parallelism, requests)
141            .await
142            .map_err(BoxedError::new)
143    }
144
145    async fn handle_batch_ddl_requests(
146        &self,
147        batch_request: BatchRegionDdlRequest,
148    ) -> Result<RegionResponse, BoxedError> {
149        match batch_request {
150            BatchRegionDdlRequest::Create(requests) => {
151                let mut extension_return_value = HashMap::new();
152                let rows = self
153                    .inner
154                    .create_regions(requests, &mut extension_return_value)
155                    .await
156                    .map_err(BoxedError::new)?;
157
158                Ok(RegionResponse {
159                    affected_rows: rows,
160                    extensions: extension_return_value,
161                    metadata: Vec::new(),
162                })
163            }
164            BatchRegionDdlRequest::Alter(requests) => {
165                let mut extension_return_value = HashMap::new();
166                let rows = self
167                    .inner
168                    .alter_regions(requests, &mut extension_return_value)
169                    .await
170                    .map_err(BoxedError::new)?;
171
172                Ok(RegionResponse {
173                    affected_rows: rows,
174                    extensions: extension_return_value,
175                    metadata: Vec::new(),
176                })
177            }
178            BatchRegionDdlRequest::Drop(requests) => {
179                self.handle_requests(
180                    requests
181                        .into_iter()
182                        .map(|(region_id, req)| (region_id, RegionRequest::Drop(req))),
183                )
184                .await
185            }
186        }
187    }
188
189    /// Handles non-query request to the region. Returns the count of affected rows.
190    async fn handle_request(
191        &self,
192        region_id: RegionId,
193        request: RegionRequest,
194    ) -> Result<RegionResponse, BoxedError> {
195        let mut extension_return_value = HashMap::new();
196
197        let result = match request {
198            RegionRequest::Put(put) => self.inner.put_region(region_id, put).await,
199            RegionRequest::Create(create) => {
200                self.inner
201                    .create_regions(vec![(region_id, create)], &mut extension_return_value)
202                    .await
203            }
204            RegionRequest::Drop(drop) => self.inner.drop_region(region_id, drop).await,
205            RegionRequest::Open(open) => self.inner.open_region(region_id, open).await,
206            RegionRequest::Close(close) => self.inner.close_region(region_id, close).await,
207            RegionRequest::Alter(alter) => {
208                self.inner
209                    .alter_regions(vec![(region_id, alter)], &mut extension_return_value)
210                    .await
211            }
212            RegionRequest::Compact(_) => {
213                if self.inner.is_physical_region(region_id) {
214                    self.inner
215                        .mito
216                        .handle_request(region_id, request)
217                        .await
218                        .context(error::MitoFlushOperationSnafu)
219                        .map(|response| response.affected_rows)
220                } else {
221                    UnsupportedRegionRequestSnafu { request }.fail()
222                }
223            }
224            RegionRequest::Flush(req) => self.inner.flush_region(region_id, req).await,
225            RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(),
226            RegionRequest::Delete(_) => {
227                if self.inner.is_physical_region(region_id) {
228                    self.inner
229                        .mito
230                        .handle_request(region_id, request)
231                        .await
232                        .context(error::MitoDeleteOperationSnafu)
233                        .map(|response| response.affected_rows)
234                } else {
235                    UnsupportedRegionRequestSnafu { request }.fail()
236                }
237            }
238            RegionRequest::Catchup(req) => self.inner.catchup_region(region_id, req).await,
239            RegionRequest::BulkInserts(_) => {
240                // todo(hl): find a way to support bulk inserts in metric engine.
241                UnsupportedRegionRequestSnafu { request }.fail()
242            }
243        };
244
245        result.map_err(BoxedError::new).map(|rows| RegionResponse {
246            affected_rows: rows,
247            extensions: extension_return_value,
248            metadata: Vec::new(),
249        })
250    }
251
252    async fn handle_query(
253        &self,
254        region_id: RegionId,
255        request: ScanRequest,
256    ) -> Result<RegionScannerRef, BoxedError> {
257        self.handle_query(region_id, request).await
258    }
259
260    async fn get_last_seq_num(
261        &self,
262        region_id: RegionId,
263    ) -> Result<Option<SequenceNumber>, BoxedError> {
264        self.inner
265            .get_last_seq_num(region_id)
266            .await
267            .map_err(BoxedError::new)
268    }
269
270    /// Retrieves region's metadata.
271    async fn get_metadata(&self, region_id: RegionId) -> Result<RegionMetadataRef, BoxedError> {
272        self.inner
273            .load_region_metadata(region_id)
274            .await
275            .map_err(BoxedError::new)
276    }
277
278    /// Retrieves region's disk usage.
279    ///
280    /// Note: Returns `None` if it's a logical region.
281    fn region_statistic(&self, region_id: RegionId) -> Option<RegionStatistic> {
282        if self.inner.is_physical_region(region_id) {
283            get_region_statistic(&self.inner.mito, region_id)
284        } else {
285            None
286        }
287    }
288
289    /// Stops the engine
290    async fn stop(&self) -> Result<(), BoxedError> {
291        // don't need to stop the underlying mito engine
292        Ok(())
293    }
294
295    fn set_region_role(&self, region_id: RegionId, role: RegionRole) -> Result<(), BoxedError> {
296        // ignore the region not found error
297        for x in [
298            utils::to_metadata_region_id(region_id),
299            utils::to_data_region_id(region_id),
300        ] {
301            if let Err(e) = self.inner.mito.set_region_role(x, role)
302                && e.status_code() != StatusCode::RegionNotFound
303            {
304                return Err(e);
305            }
306        }
307        Ok(())
308    }
309
310    async fn sync_region(
311        &self,
312        region_id: RegionId,
313        manifest_info: RegionManifestInfo,
314    ) -> Result<SyncManifestResponse, BoxedError> {
315        self.inner
316            .sync_region(region_id, manifest_info)
317            .await
318            .map_err(BoxedError::new)
319    }
320
321    async fn set_region_role_state_gracefully(
322        &self,
323        region_id: RegionId,
324        region_role_state: SettableRegionRoleState,
325    ) -> std::result::Result<SetRegionRoleStateResponse, BoxedError> {
326        let metadata_result = match self
327            .inner
328            .mito
329            .set_region_role_state_gracefully(
330                utils::to_metadata_region_id(region_id),
331                region_role_state,
332            )
333            .await?
334        {
335            SetRegionRoleStateResponse::Success(success) => success,
336            SetRegionRoleStateResponse::NotFound => {
337                return Ok(SetRegionRoleStateResponse::NotFound)
338            }
339        };
340
341        let data_result = match self
342            .inner
343            .mito
344            .set_region_role_state_gracefully(region_id, region_role_state)
345            .await?
346        {
347            SetRegionRoleStateResponse::Success(success) => success,
348            SetRegionRoleStateResponse::NotFound => {
349                return Ok(SetRegionRoleStateResponse::NotFound)
350            }
351        };
352
353        Ok(SetRegionRoleStateResponse::success(
354            SetRegionRoleStateSuccess::metric(
355                data_result.last_entry_id().unwrap_or_default(),
356                metadata_result.last_entry_id().unwrap_or_default(),
357            ),
358        ))
359    }
360
361    /// Returns the physical region role.
362    ///
363    /// Note: Returns `None` if it's a logical region.
364    fn role(&self, region_id: RegionId) -> Option<RegionRole> {
365        if self.inner.is_physical_region(region_id) {
366            self.inner.mito.role(region_id)
367        } else {
368            None
369        }
370    }
371
372    fn as_any(&self) -> &dyn Any {
373        self
374    }
375}
376
377impl MetricEngine {
378    pub fn try_new(mito: MitoEngine, mut config: EngineConfig) -> Result<Self> {
379        let metadata_region = MetadataRegion::new(mito.clone());
380        let data_region = DataRegion::new(mito.clone());
381        let state = Arc::new(RwLock::default());
382        config.sanitize();
383        let flush_interval = config.flush_metadata_region_interval;
384        let inner = Arc::new(MetricEngineInner {
385            mito: mito.clone(),
386            metadata_region,
387            data_region,
388            state: state.clone(),
389            config,
390            row_modifier: RowModifier::new(),
391            flush_task: RepeatedTask::new(
392                flush_interval,
393                Box::new(FlushMetadataRegionTask {
394                    state: state.clone(),
395                    mito: mito.clone(),
396                }),
397            ),
398        });
399        inner
400            .flush_task
401            .start(common_runtime::global_runtime())
402            .context(StartRepeatedTaskSnafu { name: "flush_task" })?;
403        Ok(Self { inner })
404    }
405
406    pub fn mito(&self) -> MitoEngine {
407        self.inner.mito.clone()
408    }
409
410    /// Returns all logical regions associated with the physical region.
411    pub async fn logical_regions(&self, physical_region_id: RegionId) -> Result<Vec<RegionId>> {
412        self.inner
413            .metadata_region
414            .logical_regions(physical_region_id)
415            .await
416    }
417
418    /// Handles substrait query and return a stream of record batches
419    async fn handle_query(
420        &self,
421        region_id: RegionId,
422        request: ScanRequest,
423    ) -> Result<RegionScannerRef, BoxedError> {
424        self.inner
425            .read_region(region_id, request)
426            .await
427            .map_err(BoxedError::new)
428    }
429
430    async fn handle_requests(
431        &self,
432        requests: impl IntoIterator<Item = (RegionId, RegionRequest)>,
433    ) -> Result<RegionResponse, BoxedError> {
434        let mut affected_rows = 0;
435        let mut extensions = HashMap::new();
436        for (region_id, request) in requests {
437            let response = self.handle_request(region_id, request).await?;
438            affected_rows += response.affected_rows;
439            extensions.extend(response.extensions);
440        }
441
442        Ok(RegionResponse {
443            affected_rows,
444            extensions,
445            metadata: Vec::new(),
446        })
447    }
448}
449
450#[cfg(test)]
451impl MetricEngine {
452    pub async fn scan_to_stream(
453        &self,
454        region_id: RegionId,
455        request: ScanRequest,
456    ) -> Result<common_recordbatch::SendableRecordBatchStream, BoxedError> {
457        self.inner.scan_to_stream(region_id, request).await
458    }
459
460    /// Returns the configuration of the engine.
461    pub fn config(&self) -> &EngineConfig {
462        &self.inner.config
463    }
464}
465
466struct MetricEngineInner {
467    mito: MitoEngine,
468    metadata_region: MetadataRegion,
469    data_region: DataRegion,
470    state: Arc<RwLock<MetricEngineState>>,
471    config: EngineConfig,
472    row_modifier: RowModifier,
473    flush_task: RepeatedTask<Error>,
474}
475
476#[cfg(test)]
477mod test {
478    use std::collections::HashMap;
479
480    use store_api::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
481    use store_api::region_request::{RegionCloseRequest, RegionOpenRequest};
482
483    use super::*;
484    use crate::test_util::TestEnv;
485
486    #[tokio::test]
487    async fn close_open_regions() {
488        let env = TestEnv::new().await;
489        env.init_metric_region().await;
490        let engine = env.metric();
491
492        // close physical region
493        let physical_region_id = env.default_physical_region_id();
494        engine
495            .handle_request(
496                physical_region_id,
497                RegionRequest::Close(RegionCloseRequest {}),
498            )
499            .await
500            .unwrap();
501
502        // reopen physical region
503        let physical_region_option = [(PHYSICAL_TABLE_METADATA_KEY.to_string(), String::new())]
504            .into_iter()
505            .collect();
506        let open_request = RegionOpenRequest {
507            engine: METRIC_ENGINE_NAME.to_string(),
508            region_dir: env.default_region_dir(),
509            options: physical_region_option,
510            skip_wal_replay: false,
511        };
512        engine
513            .handle_request(physical_region_id, RegionRequest::Open(open_request))
514            .await
515            .unwrap();
516
517        // close nonexistent region won't report error
518        let nonexistent_region_id = RegionId::new(12313, 12);
519        engine
520            .handle_request(
521                nonexistent_region_id,
522                RegionRequest::Close(RegionCloseRequest {}),
523            )
524            .await
525            .unwrap();
526
527        // open nonexistent region won't report error
528        let invalid_open_request = RegionOpenRequest {
529            engine: METRIC_ENGINE_NAME.to_string(),
530            region_dir: env.default_region_dir(),
531            options: HashMap::new(),
532            skip_wal_replay: false,
533        };
534        engine
535            .handle_request(
536                nonexistent_region_id,
537                RegionRequest::Open(invalid_open_request),
538            )
539            .await
540            .unwrap();
541    }
542
543    #[tokio::test]
544    async fn test_role() {
545        let env = TestEnv::new().await;
546        env.init_metric_region().await;
547
548        let logical_region_id = env.default_logical_region_id();
549        let physical_region_id = env.default_physical_region_id();
550
551        assert!(env.metric().role(logical_region_id).is_none());
552        assert!(env.metric().role(physical_region_id).is_some());
553    }
554
555    #[tokio::test]
556    async fn test_region_disk_usage() {
557        let env = TestEnv::new().await;
558        env.init_metric_region().await;
559
560        let logical_region_id = env.default_logical_region_id();
561        let physical_region_id = env.default_physical_region_id();
562
563        assert!(env.metric().region_statistic(logical_region_id).is_none());
564        assert!(env.metric().region_statistic(physical_region_id).is_some());
565    }
566}