1use std::collections::HashMap;
16
17use common_meta::key::datanode_table::DatanodeTableManager;
18use common_meta::key::topic_region::{TopicRegionKey, TopicRegionManager, TopicRegionValue};
19use common_meta::kv_backend::KvBackendRef;
20use common_meta::wal_options_allocator::{extract_topic_from_wal_options, prepare_wal_options};
21use common_meta::DatanodeId;
22use futures::TryStreamExt;
23use snafu::ResultExt;
24use store_api::path_utils::table_dir;
25use store_api::region_request::{PathType, RegionOpenRequest, ReplayCheckpoint};
26use store_api::storage::{RegionId, RegionNumber};
27use tracing::info;
28
29use crate::error::{GetMetadataSnafu, Result};
30
31pub(crate) struct RegionOpenRequests {
33 pub leader_regions: Vec<(RegionId, RegionOpenRequest)>,
34 #[cfg(feature = "enterprise")]
35 pub follower_regions: Vec<(RegionId, RegionOpenRequest)>,
36}
37
38fn group_region_by_topic(
39 region_id: RegionId,
40 region_options: &HashMap<RegionNumber, String>,
41 topic_regions: &mut HashMap<String, Vec<RegionId>>,
42) {
43 if let Some(topic) = extract_topic_from_wal_options(region_id, region_options) {
44 topic_regions.entry(topic).or_default().push(region_id);
45 }
46}
47
48fn get_replay_checkpoint(
49 region_id: RegionId,
50 topic_region_values: &Option<HashMap<RegionId, TopicRegionValue>>,
51) -> Option<ReplayCheckpoint> {
52 let topic_region_values = topic_region_values.as_ref()?;
53 let topic_region_value = topic_region_values.get(®ion_id);
54 let replay_checkpoint = topic_region_value.and_then(|value| value.checkpoint);
55 replay_checkpoint.map(|checkpoint| ReplayCheckpoint {
56 entry_id: checkpoint.entry_id,
57 metadata_entry_id: checkpoint.metadata_entry_id,
58 })
59}
60
61pub(crate) async fn build_region_open_requests(
62 node_id: DatanodeId,
63 kv_backend: KvBackendRef,
64) -> Result<RegionOpenRequests> {
65 let datanode_table_manager = DatanodeTableManager::new(kv_backend.clone());
66 let table_values = datanode_table_manager
67 .tables(node_id)
68 .try_collect::<Vec<_>>()
69 .await
70 .context(GetMetadataSnafu)?;
71
72 let topic_region_manager = TopicRegionManager::new(kv_backend);
73 let mut topic_regions = HashMap::<String, Vec<RegionId>>::new();
74 let mut regions = vec![];
75 #[cfg(feature = "enterprise")]
76 let mut follower_regions = vec![];
77
78 for table_value in table_values {
79 for region_number in table_value.regions {
80 let region_id = RegionId::new(table_value.table_id, region_number);
81 let mut region_options = table_value.region_info.region_options.clone();
83 prepare_wal_options(
84 &mut region_options,
85 region_id,
86 &table_value.region_info.region_wal_options,
87 );
88 group_region_by_topic(
89 region_id,
90 &table_value.region_info.region_wal_options,
91 &mut topic_regions,
92 );
93
94 regions.push((
95 region_id,
96 table_value.region_info.engine.clone(),
97 table_value.region_info.region_storage_path.clone(),
98 region_options,
99 ));
100 }
101
102 #[cfg(feature = "enterprise")]
103 for region_number in table_value.follower_regions {
104 let region_id = RegionId::new(table_value.table_id, region_number);
105 let mut region_options = table_value.region_info.region_options.clone();
107 prepare_wal_options(
108 &mut region_options,
109 RegionId::new(table_value.table_id, region_number),
110 &table_value.region_info.region_wal_options,
111 );
112 group_region_by_topic(
113 region_id,
114 &table_value.region_info.region_wal_options,
115 &mut topic_regions,
116 );
117
118 follower_regions.push((
119 RegionId::new(table_value.table_id, region_number),
120 table_value.region_info.engine.clone(),
121 table_value.region_info.region_storage_path.clone(),
122 region_options,
123 ));
124 }
125 }
126
127 let topic_region_values = if !topic_regions.is_empty() {
128 let keys = topic_regions
129 .iter()
130 .flat_map(|(topic, regions)| {
131 regions
132 .iter()
133 .map(|region_id| TopicRegionKey::new(*region_id, topic))
134 })
135 .collect::<Vec<_>>();
136 let topic_region_manager = topic_region_manager
137 .batch_get(keys)
138 .await
139 .context(GetMetadataSnafu)?;
140 Some(topic_region_manager)
141 } else {
142 None
143 };
144
145 let mut leader_region_requests = Vec::with_capacity(regions.len());
146 for (region_id, engine, store_path, options) in regions {
147 let table_dir = table_dir(&store_path, region_id.table_id());
148 let checkpoint = get_replay_checkpoint(region_id, &topic_region_values);
149 info!("region_id: {}, checkpoint: {:?}", region_id, checkpoint);
150 leader_region_requests.push((
151 region_id,
152 RegionOpenRequest {
153 engine,
154 table_dir,
155 path_type: PathType::Bare,
156 options,
157 skip_wal_replay: false,
158 checkpoint,
159 },
160 ));
161 }
162
163 #[cfg(feature = "enterprise")]
164 let follower_region_requests = {
165 let mut follower_region_requests = Vec::with_capacity(follower_regions.len());
166 for (region_id, engine, store_path, options) in follower_regions {
167 let table_dir = table_dir(&store_path, region_id.table_id());
168 follower_region_requests.push((
169 region_id,
170 RegionOpenRequest {
171 engine,
172 table_dir,
173 path_type: PathType::Bare,
174 options,
175 skip_wal_replay: true,
176 checkpoint: None,
177 },
178 ));
179 }
180 follower_region_requests
181 };
182
183 Ok(RegionOpenRequests {
184 leader_regions: leader_region_requests,
185 #[cfg(feature = "enterprise")]
186 follower_regions: follower_region_requests,
187 })
188}