meta_srv/procedure/wal_prune/
utils.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;

use common_meta::key::TableMetadataManagerRef;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_telemetry::warn;
use itertools::{Itertools, MinMaxResult};
use rskafka::client::partition::{OffsetAt, PartitionClient, UnknownTopicHandling};
use rskafka::client::Client;
use snafu::ResultExt;
use store_api::storage::RegionId;

use crate::error::{
    BuildPartitionClientSnafu, DeleteRecordsSnafu, GetOffsetSnafu, Result,
    TableMetadataManagerSnafu, UpdateTopicNameValueSnafu,
};

/// The default timeout for deleting records.
const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);
/// The default partition.
const DEFAULT_PARTITION: i32 = 0;

/// Returns the region ids in `all_region_ids` that are absent from `result_set`.
fn missing_region_ids(
    all_region_ids: &[RegionId],
    result_set: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
    let mut missing_region_ids = Vec::new();
    for region_id in all_region_ids {
        if !result_set.contains_key(region_id) {
            missing_region_ids.push(*region_id);
        }
    }
    missing_region_ids
}

/// Finds the prunable entry id for the topic, i.e. the minimum prunable entry id
/// across all regions that use the topic.
///
/// Returns `None` if:
/// - The topic has no regions.
/// - Some regions' info is missing from the heartbeat.
pub(crate) async fn find_pruneable_entry_id_for_topic(
    table_metadata_manager: &TableMetadataManagerRef,
    leader_region_registry: &LeaderRegionRegistryRef,
    topic: &str,
) -> Result<Option<u64>> {
    let region_ids = table_metadata_manager
        .topic_region_manager()
        .regions(topic)
        .await
        .context(TableMetadataManagerSnafu)?
        .into_keys()
        .collect::<Vec<_>>();
    if region_ids.is_empty() {
        return Ok(None);
    }

    // Get the prunable entry id for each region.
    let prunable_entry_ids_map = leader_region_registry
        .batch_get(region_ids.iter().cloned())
        .into_iter()
        .map(|(region_id, region)| {
            let prunable_entry_id = region.manifest.prunable_entry_id();
            (region_id, prunable_entry_id)
        })
        .collect();
    let missing_region_ids = missing_region_ids(&region_ids, &prunable_entry_ids_map);
    if !missing_region_ids.is_empty() {
        warn!(
            "Cannot determine prunable entry id: missing region info from heartbeat. Topic: {}, missing region ids: {:?}",
            topic, missing_region_ids
        );
        return Ok(None);
    }

    // Pruning up to the minimum prunable entry id is safe for every region in the topic.
    let min_max_result = prunable_entry_ids_map.values().minmax();
    match min_max_result {
        MinMaxResult::NoElements => Ok(None),
        MinMaxResult::OneElement(prunable_entry_id) => Ok(Some(*prunable_entry_id)),
        MinMaxResult::MinMax(min_prunable_entry_id, _) => Ok(Some(*min_prunable_entry_id)),
    }
}

/// Determines whether pruning should be triggered, based on the currently pruned
/// entry id and the prunable entry id.
///
/// Returns `true` if:
/// - There is no current pruned entry id (i.e., pruning has never occurred).
/// - The prunable entry id is greater than the current pruned entry id (i.e., there is something to prune).
pub(crate) fn should_trigger_prune(current: Option<u64>, prunable_entry_id: u64) -> bool {
    match current {
        None => true, // No pruning has occurred yet, should trigger immediately.
        Some(current) => prunable_entry_id > current,
    }
}
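
// A minimal usage sketch tying the two helpers above together (illustrative
// only; the `table_metadata_manager`, `leader_region_registry`,
// `current_pruned_entry_id`, and `topic` bindings are assumed to exist in the
// calling procedure and are not part of this module):
//
// if let Some(prunable_entry_id) = find_pruneable_entry_id_for_topic(
//     &table_metadata_manager,
//     &leader_region_registry,
//     &topic,
// )
// .await?
// {
//     if should_trigger_prune(current_pruned_entry_id, prunable_entry_id) {
//         // Proceed with pruning up to `prunable_entry_id`.
//     }
// }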

/// Returns a partition client for the given topic.
pub(crate) async fn get_partition_client(
    client: &Arc<Client>,
    topic: &str,
) -> Result<PartitionClient> {
    client
        .partition_client(topic, DEFAULT_PARTITION, UnknownTopicHandling::Retry)
        .await
        .context(BuildPartitionClientSnafu {
            topic,
            partition: DEFAULT_PARTITION,
        })
}

/// Returns the earliest and latest offsets for the given topic.
pub(crate) async fn get_offsets_for_topic(
    partition_client: &PartitionClient,
    topic: &str,
) -> Result<(u64, u64)> {
    let earliest_offset = partition_client
        .get_offset(OffsetAt::Earliest)
        .await
        .context(GetOffsetSnafu { topic })?;
    let latest_offset = partition_client
        .get_offset(OffsetAt::Latest)
        .await
        .context(GetOffsetSnafu { topic })?;

    Ok((earliest_offset as u64, latest_offset as u64))
}

/// Updates the pruned entry id for the given topic.
pub(crate) async fn update_pruned_entry_id(
    table_metadata_manager: &TableMetadataManagerRef,
    topic: &str,
    pruned_entry_id: u64,
) -> Result<()> {
    let prev = table_metadata_manager
        .topic_name_manager()
        .get(topic)
        .await
        .context(TableMetadataManagerSnafu)?;

    table_metadata_manager
        .topic_name_manager()
        .update(topic, pruned_entry_id, prev)
        .await
        .context(UpdateTopicNameValueSnafu { topic })?;

    Ok(())
}

/// Deletes the records with offsets less than `pruned_entry_id` for the given topic.
pub(crate) async fn delete_records(
    partition_client: &PartitionClient,
    topic: &str,
    pruned_entry_id: u64,
) -> Result<()> {
    partition_client
        .delete_records(
            // Note: no "+1" is needed here because the offset argument is exclusive.
            // This is also defensive programming in case an off-by-one error slips in elsewhere; see
            // https://kafka.apache.org/36/javadoc/org/apache/kafka/clients/consumer/KafkaConsumer.html#endOffsets(java.util.Collection)
            // which we use to get the end offset from the high watermark.
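            //
            // Illustrative example (added for clarity): if `pruned_entry_id` is 10,
            // the log start offset advances to 10, so records at offsets 0..=9 become
            // eligible for deletion and offset 10 stays readable as the new earliest offset.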
            pruned_entry_id as i64,
            DELETE_RECORDS_TIMEOUT.as_millis() as i32,
        )
        .await
        .context(DeleteRecordsSnafu {
            topic,
            partition: DEFAULT_PARTITION,
            offset: pruned_entry_id,
        })
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_should_trigger_prune_none_current() {
        // No pruning has occurred yet, should trigger
        assert!(should_trigger_prune(None, 10));
        assert!(should_trigger_prune(None, 0));
    }

    #[test]
    fn test_should_trigger_prune_prunable_greater_than_current() {
        // Prunable entry id is greater than current, should trigger
        assert!(should_trigger_prune(Some(5), 6));
        assert!(should_trigger_prune(Some(0), 1));
        assert!(should_trigger_prune(Some(99), 100));
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_equal_to_current() {
        // Prunable entry id is equal to current, should not trigger
        assert!(!should_trigger_prune(Some(10), 10));
        assert!(!should_trigger_prune(Some(0), 0));
    }

    #[test]
    fn test_should_not_trigger_prune_prunable_less_than_current() {
        // Prunable entry id is less than current, should not trigger
        assert!(!should_trigger_prune(Some(10), 9));
        assert!(!should_trigger_prune(Some(100), 99));
    }

    #[test]
    fn test_missing_region_ids_none_missing() {
        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
        let mut result_set = HashMap::new();
        result_set.insert(RegionId::new(1, 1), 10);
        result_set.insert(RegionId::new(2, 2), 20);
        let missing = missing_region_ids(&all_region_ids, &result_set);
        assert!(missing.is_empty());
    }

    #[test]
    fn test_missing_region_ids_some_missing() {
        let all_region_ids = vec![
            RegionId::new(1, 1),
            RegionId::new(2, 2),
            RegionId::new(3, 3),
        ];
        let mut result_set = HashMap::new();
        result_set.insert(RegionId::new(1, 1), 10);
        let missing = missing_region_ids(&all_region_ids, &result_set);
        assert_eq!(missing, vec![RegionId::new(2, 2), RegionId::new(3, 3)]);
    }

    #[test]
    fn test_missing_region_ids_all_missing() {
        let all_region_ids = vec![RegionId::new(1, 1), RegionId::new(2, 2)];
        let result_set = HashMap::new();
        let missing = missing_region_ids(&all_region_ids, &result_set);
        assert_eq!(missing, all_region_ids);
    }

    #[test]
    fn test_missing_region_ids_empty_all() {
        let all_region_ids: Vec<RegionId> = vec![];
        let mut result_set = HashMap::new();
        result_set.insert(RegionId::new(1, 1), 10);
        let missing = missing_region_ids(&all_region_ids, &result_set);
        assert!(missing.is_empty());
    }
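
    // Additional edge-case coverage for the helpers above: extra keys in the
    // result set are ignored, and `should_trigger_prune` treats boundary values
    // like any other entry id. These only exercise functions defined in this file.
    #[test]
    fn test_missing_region_ids_extra_keys_ignored() {
        // Keys present only in `result_set` do not affect the result.
        let all_region_ids = vec![RegionId::new(1, 1)];
        let mut result_set = HashMap::new();
        result_set.insert(RegionId::new(1, 1), 10);
        result_set.insert(RegionId::new(9, 9), 90);
        let missing = missing_region_ids(&all_region_ids, &result_set);
        assert!(missing.is_empty());
    }

    #[test]
    fn test_should_trigger_prune_boundary_values() {
        // `u64::MAX` behaves like any other entry id: equal values do not trigger.
        assert!(should_trigger_prune(None, u64::MAX));
        assert!(!should_trigger_prune(Some(u64::MAX), u64::MAX));
        assert!(should_trigger_prune(Some(u64::MAX - 1), u64::MAX));
    }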
}