metric_engine/engine/
flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use snafu::ResultExt;
16use store_api::region_engine::RegionEngine;
17use store_api::region_request::{AffectedRows, RegionFlushRequest, RegionRequest};
18use store_api::storage::RegionId;
19
20use crate::engine::MetricEngineInner;
21use crate::error::{MitoFlushOperationSnafu, Result, UnsupportedRegionRequestSnafu};
22use crate::utils;
23
24impl MetricEngineInner {
25    pub async fn flush_region(
26        &self,
27        region_id: RegionId,
28        req: RegionFlushRequest,
29    ) -> Result<AffectedRows> {
30        if !self.is_physical_region(region_id) {
31            return UnsupportedRegionRequestSnafu {
32                request: RegionRequest::Flush(req),
33            }
34            .fail();
35        }
36
37        let metadata_region_id = utils::to_metadata_region_id(region_id);
38        // Flushes the metadata region as well
39        self.mito
40            .handle_request(metadata_region_id, RegionRequest::Flush(req.clone()))
41            .await
42            .context(MitoFlushOperationSnafu)
43            .map(|response| response.affected_rows)?;
44
45        let data_region_id = utils::to_data_region_id(region_id);
46        self.mito
47            .handle_request(data_region_id, RegionRequest::Flush(req.clone()))
48            .await
49            .context(MitoFlushOperationSnafu)
50            .map(|response| response.affected_rows)
51    }
52}
53
#[cfg(test)]
mod tests {
    use api::v1::Rows;
    use futures_util::TryStreamExt;
    use itertools::Itertools as _;
    use store_api::region_request::RegionPutRequest;

    use super::*;
    use crate::test_util::{build_rows, row_schema_with_tags, TestEnv};

    /// Creates several physical regions with their logical regions, writes the
    /// same generated rows into every logical region, flushes each physical
    /// region through the metric engine, and then compares the SST listings
    /// obtained from the manifest and from storage against fixed snapshots.
    #[tokio::test]
    async fn test_list_ssts_after_write_and_flush_metric() {
        // Stand-in for the generated file ids so the snapshots stay stable.
        const FILE_ID_PLACEHOLDER: &str = "<file_id>";

        let env = TestEnv::new().await;
        let engine = env.metric();

        // Each entry pairs a physical region with the logical regions on top of it.
        let region_layout = vec![
            (
                RegionId::new(11, 1),
                vec![RegionId::new(1111, 11), RegionId::new(1111, 12)],
            ),
            (RegionId::new(11, 2), vec![RegionId::new(1111, 2)]),
            (RegionId::new(22, 42), vec![RegionId::new(2222, 42)]),
        ];

        // Create every physical region, immediately followed by its logical regions.
        for (physical, logicals) in region_layout.iter() {
            env.create_physical_region(*physical, &TestEnv::default_table_dir())
                .await;
            for logical in logicals.iter() {
                env.create_logical_region(*physical, *logical).await;
            }
        }

        // Write to the logical regions, in declaration order.
        for logical in region_layout.iter().flat_map(|(_, logicals)| logicals) {
            let put = RegionPutRequest {
                rows: Rows {
                    schema: row_schema_with_tags(&["job"]),
                    rows: build_rows(1, 10),
                },
                hint: None,
            };
            engine
                .handle_request(*logical, RegionRequest::Put(put))
                .await
                .unwrap();
        }

        // Flush each physical region through the metric engine.
        for physical in region_layout.iter().map(|(physical, _)| *physical) {
            let flush = RegionRequest::Flush(RegionFlushRequest::default());
            engine.handle_request(physical, flush).await.unwrap();
        }

        let mito = env.mito();

        // List from manifest, masking the file id wherever it appears.
        let manifest_listing = mito
            .all_ssts_from_manifest()
            .map(|mut entry| {
                entry.file_path = entry.file_path.replace(&entry.file_id, FILE_ID_PLACEHOLDER);
                if let Some(index_path) = entry.index_file_path.as_mut() {
                    *index_path = index_path.replace(&entry.file_id, FILE_ID_PLACEHOLDER);
                }
                entry.file_id = FILE_ID_PLACEHOLDER.to_string();
                format!("\n{:?}", entry)
            })
            .sorted()
            .collect::<String>();
        let expected_manifest_listing = r#"
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"#;
        assert_eq!(manifest_listing, expected_manifest_listing);

        // List from storage.
        let storage_entries: Vec<_> = mito.all_ssts_from_storage().try_collect().await.unwrap();
        let storage_listing = storage_entries
            .into_iter()
            .map(|mut entry| {
                // Mask the final '/' plus the 36 bytes after it (presumably the
                // generated file id) with the placeholder.
                let last_slash = entry.file_path.rfind('/').unwrap();
                entry
                    .file_path
                    .replace_range(last_slash..(last_slash + 37), "/<file_id>");
                format!("\n{:?}", entry)
            })
            .sorted()
            .collect::<String>();
        let expected_storage_listing = r#"
StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
StorageSstEntry { file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }"#;
        assert_eq!(storage_listing, expected_storage_listing);
    }
}