Skip to main content

meta_srv/procedure/
wal_prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
use std::sync::Arc;

use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;

use crate::Result;
use crate::error::{self};
use crate::procedure::wal_prune::utils::{
    delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
};
43
44pub type KafkaClientRef = Arc<Client>;
45
/// Dependencies shared by WAL prune procedures.
#[derive(Clone)]
pub struct Context {
    /// The Kafka client, used to look up the topic's offsets and delete its records.
    pub client: KafkaClientRef,
    /// The table metadata manager, where the pruned entry id of the topic is persisted.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    /// NOTE(review): not read by `WalPruneProcedure` itself in this file; presumably consumed
    /// by the prune manager — confirm.
    pub leader_region_registry: LeaderRegionRegistryRef,
}
55
56/// The data of WAL pruning.
57#[derive(Serialize, Deserialize)]
58pub struct WalPruneData {
59    /// The topic name to prune.
60    pub topic: String,
61    /// The minimum flush entry id for topic, which is used to prune the WAL.
62    pub prunable_entry_id: EntryId,
63}
64
/// The procedure to prune WAL.
///
/// Deletes the records of a single Kafka topic that come before `data.prunable_entry_id`,
/// then persists that id as the topic's pruned entry id.
pub struct WalPruneProcedure {
    /// The persisted state: the topic and the prunable entry id.
    pub data: WalPruneData,
    /// Shared dependencies (Kafka client and metadata handles).
    pub context: Context,
    /// Guard obtained from `WalPruneProcedureTracker`. It is never read here; it is held so
    /// that it lives exactly as long as the procedure.
    /// NOTE(review): presumably dropping it removes the topic from the tracker — confirm.
    pub _guard: Option<WalPruneProcedureGuard>,
}
71
72impl WalPruneProcedure {
73    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
74
75    pub fn new(
76        context: Context,
77        guard: Option<WalPruneProcedureGuard>,
78        topic: String,
79        prunable_entry_id: u64,
80    ) -> Self {
81        Self {
82            data: WalPruneData {
83                topic,
84                prunable_entry_id,
85            },
86            context,
87            _guard: guard,
88        }
89    }
90
91    pub fn from_json(
92        json: &str,
93        context: &Context,
94        tracker: WalPruneProcedureTracker,
95    ) -> ProcedureResult<Self> {
96        let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
97        let guard = tracker.insert_running_procedure(data.topic.clone());
98        Ok(Self {
99            data,
100            context: context.clone(),
101            _guard: guard,
102        })
103    }
104
105    /// Prune the WAL and persist the minimum prunable entry id.
106    ///
107    /// Retry:
108    /// - Kafka client errors that have exhausted rskafka's internal retry.
109    /// - Failed to update the pruned entry id in the table metadata manager.
110    pub async fn on_prune(&mut self) -> Result<Status> {
111        let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
112        let (earliest_offset, latest_offset) =
113            get_offsets_for_topic(&partition_client, &self.data.topic).await?;
114        if self.data.prunable_entry_id <= earliest_offset {
115            warn!(
116                "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
117                self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
118            );
119            return Ok(Status::done());
120        }
121
122        // Delete records.
123        delete_records(
124            &partition_client,
125            &self.data.topic,
126            self.data.prunable_entry_id,
127        )
128        .await?;
129
130        // Update the pruned entry id for the topic.
131        update_pruned_entry_id(
132            &self.context.table_metadata_manager,
133            &self.data.topic,
134            self.data.prunable_entry_id,
135        )
136        .await
137        .map_err(BoxedError::new)
138        .with_context(|_| error::RetryLaterWithSourceSnafu {
139            reason: format!(
140                "Failed to update pruned entry id for topic: {}",
141                self.data.topic
142            ),
143        })?;
144
145        info!(
146            "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
147            self.data.topic, self.data.prunable_entry_id, latest_offset
148        );
149        Ok(Status::done())
150    }
151}
152
153#[async_trait::async_trait]
154impl Procedure for WalPruneProcedure {
155    fn type_name(&self) -> &str {
156        Self::TYPE_NAME
157    }
158
159    fn rollback_supported(&self) -> bool {
160        false
161    }
162
163    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
164        self.on_prune().await.map_err(|e| {
165            if e.is_retryable() {
166                ProcedureError::retry_later(e)
167            } else {
168                ProcedureError::external(e)
169            }
170        })
171    }
172
173    fn dump(&self) -> ProcedureResult<String> {
174        serde_json::to_string(&self.data).context(ToJsonSnafu)
175    }
176
177    /// WAL prune procedure will read the topic-region map from the table metadata manager,
178    /// which are modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations.
179    /// But the modifications are atomic, so it does not conflict with the procedure.
180    /// It only abort the procedure sometimes since the `check_heartbeat_collected_region_ids` fails.
181    fn lock_key(&self) -> LockKey {
182        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
183        LockKey::new(vec![lock_key])
184    }
185}
186
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Mocks the data a WAL prune test needs:
    /// 1. Produces `n_region * n_table * 5` records (5 per region) to `topic`.
    /// 2. Passes the produced offsets, together with the table metadata manager and the
    ///    leader region registry from `context`, to `new_wal_prune_metadata`.
    ///
    /// Returns the entry id computed by `new_wal_prune_metadata`, which the caller uses as
    /// the prunable entry id.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        // 5 entries per region.
        let offsets = mock_wal_entries(
            context.client.clone(),
            topic,
            (n_region * n_table * 5) as usize,
        )
        .await;

        new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await
    }

    /// Builds a record with key `key_{i}`, value `value_{i}` and the current timestamp.
    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

    /// Creates `topic_name` (ignoring the result, presumably because the topic may already
    /// exist) and produces `n_entries` records to partition 0 one at a time.
    ///
    /// Returns the offsets of the produced records, in production order.
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap()
                .offsets;
            offsets.extend(offset);
        }
        offsets
    }

    /// Fetches records from partition 0 of `topic_name`, starting at `entry_id`.
    ///
    /// When `expect_success` is true, asserts that the fetch succeeds and returns at least one
    /// record; otherwise asserts that it fails with `OffsetOutOfRange` (the entry was pruned).
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            let FetchResult { records, .. } =
                res.expect("fetching records at a retained entry id should succeed");
            assert!(!records.is_empty());
        } else {
            let err = res.unwrap_err();
            // The error is in a private module so we check it through `to_string()`.
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

    /// Deletes `topic_name` through the controller client.
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        // Topic should start with a letter, so prefix the random UUID.
        let topic_name = format!("test_procedure_execution-{}", uuid::Uuid::new_v4());
        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        // Prepare the topic.
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        // Mock the test data.
        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });
        // Check if the entry ids after(include) `prunable_entry_id` still exist.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64,
            true,
        )
        .await;
        // Check if the entry ids before `prunable_entry_id` are deleted.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64 - 1,
            false,
        )
        .await;

        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
        // Clean up the topic.
        delete_topic(procedure.context.client, &topic_name).await;
    }
}