1use snafu::ResultExt;
16use store_api::region_engine::RegionEngine;
17use store_api::region_request::{AffectedRows, RegionFlushRequest, RegionRequest};
18use store_api::storage::RegionId;
19
20use crate::engine::MetricEngineInner;
21use crate::error::{MitoFlushOperationSnafu, Result, UnsupportedRegionRequestSnafu};
22use crate::utils;
23
24impl MetricEngineInner {
25 pub async fn flush_region(
26 &self,
27 region_id: RegionId,
28 req: RegionFlushRequest,
29 ) -> Result<AffectedRows> {
30 if !self.is_physical_region(region_id) {
31 return UnsupportedRegionRequestSnafu {
32 request: RegionRequest::Flush(req),
33 }
34 .fail();
35 }
36
37 let metadata_region_id = utils::to_metadata_region_id(region_id);
38 self.mito
40 .handle_request(metadata_region_id, RegionRequest::Flush(req.clone()))
41 .await
42 .context(MitoFlushOperationSnafu)
43 .map(|response| response.affected_rows)?;
44
45 let data_region_id = utils::to_data_region_id(region_id);
46 self.mito
47 .handle_request(data_region_id, RegionRequest::Flush(req.clone()))
48 .await
49 .context(MitoFlushOperationSnafu)
50 .map(|response| response.affected_rows)
51 }
52}
53
54#[cfg(test)]
55mod tests {
56 use api::v1::Rows;
57 use futures_util::TryStreamExt;
58 use itertools::Itertools as _;
59 use store_api::region_request::RegionPutRequest;
60
61 use super::*;
62 use crate::test_util::{build_rows, row_schema_with_tags, TestEnv};
63
64 #[tokio::test]
65 async fn test_list_ssts_after_write_and_flush_metric() {
66 let env = TestEnv::new().await;
67 let engine = env.metric();
68
69 let phy_to_logi = vec![
70 (
71 RegionId::new(11, 1),
72 vec![RegionId::new(1111, 11), RegionId::new(1111, 12)],
73 ),
74 (RegionId::new(11, 2), vec![RegionId::new(1111, 2)]),
75 (RegionId::new(22, 42), vec![RegionId::new(2222, 42)]),
76 ];
77
78 for (phy_region_id, logi_region_ids) in &phy_to_logi {
79 env.create_physical_region(*phy_region_id, &TestEnv::default_table_dir())
80 .await;
81 for logi_region_id in logi_region_ids {
82 env.create_logical_region(*phy_region_id, *logi_region_id)
83 .await;
84 }
85 }
86
87 for (_, logi_region_ids) in &phy_to_logi {
89 for logi_region_id in logi_region_ids {
90 let schema = row_schema_with_tags(&["job"]);
91 let rows = build_rows(1, 10);
92 let request = RegionRequest::Put(RegionPutRequest {
93 rows: Rows { schema, rows },
94 hint: None,
95 });
96 engine
97 .handle_request(*logi_region_id, request)
98 .await
99 .unwrap();
100 }
101 }
102
103 for (phy_region_id, _) in &phy_to_logi {
104 engine
105 .handle_request(*phy_region_id, RegionRequest::Flush(Default::default()))
106 .await
107 .unwrap();
108 }
109
110 let mito = env.mito();
112 let debug_format = mito
113 .all_ssts_from_manifest()
114 .map(|mut e| {
115 e.file_path = e.file_path.replace(&e.file_id, "<file_id>");
116 e.index_file_path = e
117 .index_file_path
118 .map(|path| path.replace(&e.file_id, "<file_id>"));
119 e.file_id = "<file_id>".to_string();
120 format!("\n{:?}", e)
121 })
122 .sorted()
123 .collect::<Vec<_>>()
124 .join("");
125 assert_eq!(
126 debug_format,
127 r#"
128ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640257(11, 1), table_id: 11, region_number: 1, region_group: 0, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000001/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(20) }
129ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47244640258(11, 2), table_id: 11, region_number: 2, region_group: 0, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/11_0000000002/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
130ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417473(11, 16777217), table_id: 11, region_number: 16777217, region_group: 1, region_sequence: 1, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: 3201, index_file_path: None, index_file_size: None, num_rows: 8, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(8) }
131ManifestSstEntry { table_dir: "test_metric_region/", region_id: 47261417474(11, 16777218), table_id: 11, region_number: 16777218, region_group: 1, region_sequence: 2, file_id: "<file_id>", level: 0, file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }
132ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94489280554(22, 42), table_id: 22, region_number: 42, region_group: 0, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: 3157, index_file_path: Some("test_metric_region/22_0000000042/data/index/<file_id>.puffin"), index_file_size: Some(235), num_rows: 10, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 9::Millisecond, sequence: Some(10) }
133ManifestSstEntry { table_dir: "test_metric_region/", region_id: 94506057770(22, 16777258), table_id: 22, region_number: 16777258, region_group: 1, region_sequence: 42, file_id: "<file_id>", level: 0, file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: 3185, index_file_path: None, index_file_size: None, num_rows: 4, num_row_groups: 1, min_ts: 0::Millisecond, max_ts: 0::Millisecond, sequence: Some(4) }"#
134 );
135
136 let storage_entries = mito
138 .all_ssts_from_storage()
139 .try_collect::<Vec<_>>()
140 .await
141 .unwrap();
142 let debug_format = storage_entries
143 .into_iter()
144 .map(|mut e| {
145 let i = e.file_path.rfind('/').unwrap();
146 e.file_path.replace_range(i..(i + 37), "/<file_id>");
147 format!("\n{:?}", e)
148 })
149 .sorted()
150 .collect::<Vec<_>>()
151 .join("");
152 assert_eq!(
153 debug_format,
154 r#"
155StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
156StorageSstEntry { file_path: "test_metric_region/11_0000000001/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
157StorageSstEntry { file_path: "test_metric_region/11_0000000001/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }
158StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
159StorageSstEntry { file_path: "test_metric_region/11_0000000002/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
160StorageSstEntry { file_path: "test_metric_region/11_0000000002/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }
161StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/<file_id>.parquet", file_size: None, last_modified_ms: None }
162StorageSstEntry { file_path: "test_metric_region/22_0000000042/data/index/<file_id>.puffin", file_size: None, last_modified_ms: None }
163StorageSstEntry { file_path: "test_metric_region/22_0000000042/metadata/<file_id>.parquet", file_size: None, last_modified_ms: None }"#
164 );
165 }
166}