1use std::collections::HashMap;
16
17use common_meta::DatanodeId;
18use common_meta::key::datanode_table::DatanodeTableManager;
19use common_meta::key::topic_name::{TopicNameKey, TopicNameManager, TopicNameValue};
20use common_meta::key::topic_region::{
21 ReplayCheckpoint as MetadataReplayCheckpoint, TopicRegionKey, TopicRegionManager,
22 TopicRegionValue,
23};
24use common_meta::kv_backend::KvBackendRef;
25use common_meta::wal_provider::{extract_topic_from_wal_options, prepare_wal_options};
26use futures::TryStreamExt;
27use snafu::ResultExt;
28use store_api::metric_engine_consts::METRIC_ENGINE_NAME;
29use store_api::path_utils::table_dir;
30use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
31use store_api::storage::{RegionId, RegionNumber};
32use tracing::info;
33
34use crate::error::{GetMetadataSnafu, Result};
35
36pub struct RegionOpenRequests {
38 pub(crate) leader_regions: Vec<(RegionId, RegionOpenRequest)>,
39 #[cfg(feature = "enterprise")]
40 pub(crate) follower_regions: Vec<(RegionId, RegionOpenRequest)>,
41}
42
43impl RegionOpenRequests {
44 #[allow(clippy::type_complexity)]
46 pub fn into_parts(
47 self,
48 ) -> (
49 Vec<(RegionId, RegionOpenRequest)>,
50 Vec<(RegionId, RegionOpenRequest)>,
51 ) {
52 let leader_regions = self.leader_regions;
53 #[cfg(feature = "enterprise")]
54 let follower_regions = self.follower_regions;
55 #[cfg(not(feature = "enterprise"))]
56 let follower_regions = Vec::new();
57 (leader_regions, follower_regions)
58 }
59}
60
61fn group_region_by_topic(
62 region_id: RegionId,
63 region_options: &HashMap<RegionNumber, String>,
64 topic_regions: &mut HashMap<String, Vec<RegionId>>,
65) {
66 if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
67 topic_regions.entry(topic).or_default().push(region_id);
68 }
69}
70
71fn region_pruned_entry_ids(
72 topic_regions: &HashMap<String, Vec<RegionId>>,
73 topic_name_values: &HashMap<String, TopicNameValue>,
74) -> HashMap<RegionId, u64> {
75 topic_regions
76 .iter()
77 .flat_map(|(topic, region_ids)| {
78 topic_name_values
79 .get(topic)
80 .into_iter()
81 .flat_map(move |value| {
82 region_ids
83 .iter()
84 .map(move |region_id| (*region_id, value.pruned_entry_id))
85 })
86 })
87 .collect()
88}
89
90fn get_replay_checkpoint(
91 region_id: RegionId,
92 topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
93 pruned_entry_id: Option<u64>,
94 is_metric_engine: bool,
95) -> Option<ReplayCheckpoint> {
96 let checkpoint = topic_region_values
97 .as_ref()
98 .and_then(|values| values.get(®ion_id))
99 .and_then(|value| value.checkpoint)
100 .map(|checkpoint| {
101 MetadataReplayCheckpoint::new(checkpoint.entry_id, checkpoint.metadata_entry_id)
102 });
103
104 MetadataReplayCheckpoint::merge_with_topic_pruned_entry_id(
105 checkpoint,
106 pruned_entry_id,
107 is_metric_engine,
108 )
109 .map(|checkpoint| ReplayCheckpoint {
110 entry_id: checkpoint.entry_id,
111 metadata_entry_id: checkpoint.metadata_entry_id,
112 })
113}
114
115pub async fn build_region_open_requests(
117 node_id: DatanodeId,
118 kv_backend: KvBackendRef,
119) -> Result<RegionOpenRequests> {
120 let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
121 let table_values = datanode_table_manager
122 .tables(node_id)
123 .try_collect::<Vec<_>>()
124 .await
125 .context(GetMetadataSnafu)?;
126
127 let topic_region_manager = TopicRegionManager::new(kv_backend.clone());
128 let topic_name_manager = TopicNameManager::new(kv_backend);
129 let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
130 let mut regions = vec![];
131 #[cfg(feature = "enterprise")]
132 let mut follower_regions = vec![];
133
134 for table_value in table_values {
135 for region_number in table_value.regions {
136 let region_id = RegionId::new(table_value.table_id, region_number);
137 let mut region_options = table_value.region_info.region_options.clone();
139 prepare_wal_options(
140 &mut region_options,
141 region_id,
142 &table_value.region_info.region_wal_options,
143 );
144 group_region_by_topic(
145 region_id,
146 &table_value.region_info.region_wal_options,
147 &mut topic_regions,
148 );
149
150 regions.push((
151 region_id,
152 table_value.region_info.engine.clone(),
153 table_value.region_info.region_storage_path.clone(),
154 region_options,
155 ));
156 }
157
158 #[cfg(feature = "enterprise")]
159 for region_number in table_value.follower_regions {
160 let region_id = RegionId::new(table_value.table_id, region_number);
161 let mut region_options = table_value.region_info.region_options.clone();
163 prepare_wal_options(
164 &mut region_options,
165 RegionId::new(table_value.table_id, region_number),
166 &table_value.region_info.region_wal_options,
167 );
168 group_region_by_topic(
169 region_id,
170 &table_value.region_info.region_wal_options,
171 &mut topic_regions,
172 );
173
174 follower_regions.push((
175 RegionId::new(table_value.table_id, region_number),
176 table_value.region_info.engine.clone(),
177 table_value.region_info.region_storage_path.clone(),
178 region_options,
179 ));
180 }
181 }
182
183 let topic_region_values = if !topic_regions.is_empty() {
184 let keys = topic_regions
185 .iter()
186 .flat_map(|(topic, regions)| {
187 regions
188 .iter()
189 .map(|region_id| TopicRegionKey::new(*region_id, topic))
190 })
191 .collect::<Vec<_>>();
192 let topic_region_manager = topic_region_manager
193 .batch_get(keys)
194 .await
195 .context(GetMetadataSnafu)?;
196 Some(topic_region_manager)
197 } else {
198 None
199 };
200
201 let topic_name_values = if !topic_regions.is_empty() {
202 let topics = topic_regions
203 .keys()
204 .map(|topic| TopicNameKey::new(topic))
205 .collect::<Vec<_>>();
206 Some(
207 topic_name_manager
208 .batch_get(topics)
209 .await
210 .context(GetMetadataSnafu)?,
211 )
212 } else {
213 None
214 };
215 let region_pruned_entry_ids = topic_name_values
216 .as_ref()
217 .map(|values| region_pruned_entry_ids(&topic_regions, values));
218
219 let mut leader_region_requests = Vec::with_capacity(regions.len());
220 for (region_id, engine, store_path, options) in regions {
221 let table_dir = table_dir(&store_path, region_id.table_id());
222 let pruned_entry_id = region_pruned_entry_ids
223 .as_ref()
224 .and_then(|values| values.get(®ion_id).copied());
225 let checkpoint = get_replay_checkpoint(
226 region_id,
227 &topic_region_values,
228 pruned_entry_id,
229 engine == METRIC_ENGINE_NAME,
230 );
231 info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
232 leader_region_requests.push((
233 region_id,
234 RegionOpenRequest {
235 engine,
236 table_dir,
237 path_type: PathType::Bare,
238 options,
239 skip_wal_replay: false,
240 checkpoint,
241 requirements: Default::default(),
242 },
243 ));
244 }
245
246 #[cfg(feature = "enterprise")]
247 let follower_region_requests = {
248 let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
249 for (region_id, engine, store_path, options) in follower_regions {
250 let table_dir = table_dir(&store_path, region_id.table_id());
251 follower_region_requests.push((
252 region_id,
253 RegionOpenRequest {
254 engine,
255 table_dir,
256 path_type: PathType::Bare,
257 options,
258 skip_wal_replay: true,
259 checkpoint: None,
260 requirements: Default::default(),
261 },
262 ));
263 }
264 follower_region_requests
265 };
266
267 Ok(RegionOpenRequests {
268 leader_regions: leader_region_requests,
269 #[cfg(feature = "enterprise")]
270 follower_regions: follower_region_requests,
271 })
272}