meta_srv/procedure/wal_prune/
utils.rs1use std::collections::HashMap;
16use std::sync::Arc;
17use std::time::Duration;
18
19use common_meta::key::TableMetadataManagerRef;
20use common_meta::region_registry::LeaderRegionRegistryRef;
21use common_telemetry::warn;
22use itertools::{Itertools, MinMaxResult};
23use rskafka::client::partition::{OffsetAt, PartitionClient, UnknownTopicHandling};
24use rskafka::client::Client;
25use snafu::ResultExt;
26use store_api::storage::RegionId;
27
28use crate::error::{
29 BuildPartitionClientSnafu, DeleteRecordsSnafu, GetOffsetSnafu, Result,
30 TableMetadataManagerSnafu, UpdateTopicNameValueSnafu,
31};
32
/// Timeout handed to `PartitionClient::delete_records` by [`delete_records`].
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);

/// Partition index used for every WAL topic handled in this module (partition
/// client construction and delete-records error context).
// NOTE(review): assumes each WAL topic has exactly one partition (index 0) — confirm.
const DEFAULT_PARTITION: i32 = 0;
37
38fn missing_region_ids(
39 all_region_ids: &[RegionId],
40 result_set: &HashMap<RegionId, u64>,
41) -> Vec<RegionId> {
42 let mut missing_region_ids = Vec::new();
43 for region_id in all_region_ids {
44 if !result_set.contains_key(region_id) {
45 missing_region_ids.push(*region_id);
46 }
47 }
48 missing_region_ids
49}
50
51pub(crate) async fn find_pruneable_entry_id_for_topic(
57 table_metadata_manager: &TableMetadataManagerRef,
58 leader_region_registry: &LeaderRegionRegistryRef,
59 topic: &str,
60) -> Result<Option<u64>> {
61 let region_ids = table_metadata_manager
62 .topic_region_manager()
63 .regions(topic)
64 .await
65 .context(TableMetadataManagerSnafu)?
66 .into_keys()
67 .collect::<Vec<_>>();
68 if region_ids.is_empty() {
69 return Ok(None);
70 }
71
72 let prunable_entry_ids_map = leader_region_registry
74 .batch_get(region_ids.iter().cloned())
75 .into_iter()
76 .map(|(region_id, region)| {
77 let prunable_entry_id = region.manifest.prunable_entry_id();
78 (region_id, prunable_entry_id)
79 })
80 .collect();
81 let missing_region_ids = missing_region_ids(®ion_ids, &prunable_entry_ids_map);
82 if !missing_region_ids.is_empty() {
83 warn!(
84 "Cannot determine prunable entry id: missing region info from heartbeat. Topic: {}, missing region ids: {:?}",
85 topic, missing_region_ids
86 );
87 return Ok(None);
88 }
89
90 let min_max_result = prunable_entry_ids_map.values().minmax();
91 match min_max_result {
92 MinMaxResult::NoElements => Ok(None),
93 MinMaxResult::OneElement(prunable_entry_id) => Ok(Some(*prunable_entry_id)),
94 MinMaxResult::MinMax(min_prunable_entry_id, _) => Ok(Some(*min_prunable_entry_id)),
95 }
96}
97
/// Decides whether a prune should be triggered.
///
/// - `current` is presumably the previously recorded pruned entry id (confirm with callers).
/// - `prunable_entry_id` is the newly computed candidate.
///
/// Returns `true` when nothing is recorded yet (`current` is `None`), or when the
/// candidate is strictly greater than the recorded value.
pub(crate) fn should_trigger_prune(current: Option<u64>, prunable_entry_id: u64) -> bool {
    let Some(recorded) = current else {
        return true;
    };
    prunable_entry_id > recorded
}
108
109pub(crate) async fn get_partition_client(
111 client: &Arc<Client>,
112 topic: &str,
113) -> Result<PartitionClient> {
114 client
115 .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
116 .await
117 .context(BuildPartitionClientSnafu {
118 topic,
119 partition: DEFAULT_PARTITION,
120 })
121}
122
123pub(crate) async fn get_offsets_for_topic(
125 partition_client: &PartitionClient,
126 topic: &str,
127) -> Result<(u64, u64)> {
128 let earliest_offset = partition_client
129 .get_offset(OffsetAt::Earliest)
130 .await
131 .context(GetOffsetSnafu { topic })?;
132 let latest_offset = partition_client
133 .get_offset(OffsetAt::Latest)
134 .await
135 .context(GetOffsetSnafu { topic })?;
136
137 Ok((earliest_offset as u64, latest_offset as u64))
138}
139
140pub(crate) async fn update_pruned_entry_id(
142 table_metadata_manager: &TableMetadataManagerRef,
143 topic: &str,
144 pruned_entry_id: u64,
145) -> Result<()> {
146 let prev = table_metadata_manager
147 .topic_name_manager()
148 .get(topic)
149 .await
150 .context(TableMetadataManagerSnafu)?;
151
152 table_metadata_manager
153 .topic_name_manager()
154 .update(topic, pruned_entry_id, prev)
155 .await
156 .context(UpdateTopicNameValueSnafu { topic })?;
157
158 Ok(())
159}
160
161pub(crate) async fn delete_records(
163 partition_client: &PartitionClient,
164 topic: &str,
165 pruned_entry_id: u64,
166) -> Result<()> {
167 partition_client
168 .delete_records(
169 pruned_entry_id as i64,
174 DELETE_RECORDS_TIMEOUT.as_millis() as i32,
175 )
176 .await
177 .context(DeleteRecordsSnafu {
178 topic,
179 partition: DEFAULT_PARTITION,
180 offset: pruned_entry_id,
181 })
182}
183
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `region id -> entry id` map from literal pairs.
    fn entry_map(pairs: &[(RegionId, u64)]) -> HashMap<RegionId, u64> {
        pairs.iter().copied().collect()
    }

    #[test]
    fn test_should_trigger_prune_none_current() {
        // With nothing recorded yet, any candidate triggers a prune.
        for prunable in [10, 0] {
            assert!(should_trigger_prune(None, prunable));
        }
    }

    #[test]
    fn test_should_trigger_prune_prunable_greater_than_current() {
        for (current, prunable) in [(5, 6), (0, 1), (99, 100)] {
            assert!(should_trigger_prune(Some(current), prunable));
        }
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_equal_to_current() {
        for value in [10, 0] {
            assert!(!should_trigger_prune(Some(value), value));
        }
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_less_than_current() {
        for (current, prunable) in [(10, 9), (100, 99)] {
            assert!(!should_trigger_prune(Some(current), prunable));
        }
    }

    #[test]
    fn test_missing_region_ids_none_missing() {
        let all_region_ids = [RegionId::new(1, 1), RegionId::new(2, 2)];
        let result_set = entry_map(&[(RegionId::new(1, 1), 10), (RegionId::new(2, 2), 20)]);
        assert!(missing_region_ids(&all_region_ids, &result_set).is_empty());
    }

    #[test]
    fn test_missing_region_ids_some_missing() {
        let all_region_ids = [RegionId::new(1, 1), RegionId::new(2, 2), RegionId::new(3, 3)];
        let result_set = entry_map(&[(RegionId::new(1, 1), 10)]);
        assert_eq!(
            missing_region_ids(&all_region_ids, &result_set),
            vec![RegionId::new(2, 2), RegionId::new(3, 3)]
        );
    }

    #[test]
    fn test_missing_region_ids_all_missing() {
        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
        let result_set = entry_map(&[]);
        assert_eq!(missing_region_ids(&all_region_ids, &result_set), all_region_ids);
    }

    #[test]
    fn test_missing_region_ids_empty_all() {
        let all_region_ids: [RegionId; 0] = [];
        let result_set = entry_map(&[(RegionId::new(1, 1), 10)]);
        assert!(missing_region_ids(&all_region_ids, &result_set).is_empty());
    }
}