Skip to main content

meta_srv/procedure/
wal_prune.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::lock_key::RemoteWalLock;
25use common_meta::region_registry::LeaderRegionRegistryRef;
26use common_procedure::error::ToJsonSnafu;
27use common_procedure::{
28    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29    Result as ProcedureResult, Status, StringKey,
30};
31use common_telemetry::{info, warn};
32use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
33use rskafka::client::Client;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt;
36use store_api::logstore::EntryId;
37
38use crate::Result;
39use crate::error::{self};
40use crate::procedure::wal_prune::utils::{
41    delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
42};
43
/// Shared, reference-counted handle to the rskafka [`Client`].
pub type KafkaClientRef = Arc<Client>;
45
/// Shared dependencies of a [`WalPruneProcedure`].
///
/// The context is cloned into the procedure when it is rebuilt from its persisted JSON.
#[derive(Clone)]
pub struct Context {
    /// The Kafka client, used to look up topic offsets and to delete records.
    pub client: KafkaClientRef,
    /// The table metadata manager, where the pruned entry id of a topic is persisted.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    // NOTE(review): not read by the non-test code visible in this file — confirm its users.
    pub leader_region_registry: LeaderRegionRegistryRef,
}
55
56/// The data of WAL pruning.
57#[derive(Serialize, Deserialize)]
58pub struct WalPruneData {
59    /// The topic name to prune.
60    pub topic: String,
61    /// The minimum flush entry id for topic, which is used to prune the WAL.
62    pub prunable_entry_id: EntryId,
63    /// Whether pruning only updates metadata and skips Kafka DeleteRecords.
64    #[serde(default)]
65    pub logical_delete: bool,
66}
67
/// The procedure to prune WAL.
///
/// Only `data` is persisted (see `dump`); `context` and `_guard` are supplied again when the
/// procedure is rebuilt with `from_json`.
pub struct WalPruneProcedure {
    /// The serializable state: topic, prunable entry id and delete mode.
    pub data: WalPruneData,
    /// The Kafka client and metadata handles used while pruning.
    pub context: Context,
    /// Guard handed out by the `WalPruneProcedureTracker`, kept for as long as the procedure lives.
    // NOTE(review): presumably releases the tracker entry for the topic on drop — confirm in `manager`.
    pub _guard: Option<WalPruneProcedureGuard>,
}
74
75impl WalPruneProcedure {
76    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
77
78    pub fn new(
79        context: Context,
80        guard: Option<WalPruneProcedureGuard>,
81        topic: String,
82        prunable_entry_id: u64,
83        logical_delete: bool,
84    ) -> Self {
85        Self {
86            data: WalPruneData {
87                topic,
88                prunable_entry_id,
89                logical_delete,
90            },
91            context,
92            _guard: guard,
93        }
94    }
95
96    pub fn from_json(
97        json: &str,
98        context: &Context,
99        tracker: WalPruneProcedureTracker,
100    ) -> ProcedureResult<Self> {
101        let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
102        let guard = tracker.insert_running_procedure(data.topic.clone());
103        Ok(Self {
104            data,
105            context: context.clone(),
106            _guard: guard,
107        })
108    }
109
110    /// Prune the WAL and persist the minimum prunable entry id.
111    ///
112    /// Retry:
113    /// - Kafka client errors that have exhausted rskafka's internal retry.
114    /// - Failed to update the pruned entry id in the table metadata manager.
115    pub async fn on_prune(&mut self) -> Result<Status> {
116        let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
117        let (earliest_offset, latest_offset) =
118            get_offsets_for_topic(&partition_client, &self.data.topic).await?;
119        if self.data.prunable_entry_id <= earliest_offset {
120            warn!(
121                "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
122                self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
123            );
124            return Ok(Status::done());
125        }
126
127        if self.data.logical_delete {
128            info!(
129                "Skipping physical deletion of records for logical WAL pruning, topic: {}, prunable entry id: {}",
130                self.data.topic, self.data.prunable_entry_id
131            );
132        } else {
133            // Delete records.
134            delete_records(
135                &partition_client,
136                &self.data.topic,
137                self.data.prunable_entry_id,
138            )
139            .await?;
140        }
141
142        // Update the pruned entry id for the topic.
143        update_pruned_entry_id(
144            &self.context.table_metadata_manager,
145            &self.data.topic,
146            self.data.prunable_entry_id,
147        )
148        .await
149        .map_err(BoxedError::new)
150        .with_context(|_| error::RetryLaterWithSourceSnafu {
151            reason: format!(
152                "Failed to update pruned entry id for topic: {}",
153                self.data.topic
154            ),
155        })?;
156
157        info!(
158            "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
159            self.data.topic, self.data.prunable_entry_id, latest_offset
160        );
161        Ok(Status::done())
162    }
163}
164
165#[async_trait::async_trait]
166impl Procedure for WalPruneProcedure {
167    fn type_name(&self) -> &str {
168        Self::TYPE_NAME
169    }
170
171    fn rollback_supported(&self) -> bool {
172        false
173    }
174
175    async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
176        let _guard = ctx
177            .provider
178            .acquire_lock(&(RemoteWalLock::Write(self.data.topic.clone()).into()))
179            .await;
180
181        self.on_prune().await.map_err(|e| {
182            if e.is_retryable() {
183                ProcedureError::retry_later(e)
184            } else {
185                ProcedureError::external(e)
186            }
187        })
188    }
189
190    fn dump(&self) -> ProcedureResult<String> {
191        serde_json::to_string(&self.data).context(ToJsonSnafu)
192    }
193
194    /// WAL prune procedure will read the topic-region map from the table metadata manager,
195    /// which are modified by `DROP [TABLE|DATABASE]` and `CREATE [TABLE]` operations.
196    /// But the modifications are atomic, so it does not conflict with the procedure.
197    /// It only abort the procedure sometimes since the `check_heartbeat_collected_region_ids` fails.
198    fn lock_key(&self) -> LockKey {
199        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
200        LockKey::new(vec![lock_key])
201    }
202}
203
204#[cfg(test)]
205mod tests {
206    use std::assert_matches;
207
208    use common_wal::maybe_skip_kafka_integration_test;
209    use common_wal::test_util::get_kafka_endpoints;
210    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
211    use rskafka::record::Record;
212
213    use super::*;
214    use crate::procedure::test_util::new_wal_prune_metadata;
215    // Fix this import to correctly point to the test_util module
216    use crate::procedure::wal_prune::test_util::TestEnv;
217
218    /// Mock a test env for testing.
219    /// Including:
220    /// 1. Prepare some data in the table metadata manager and in-memory kv backend.
221    /// 2. Return the procedure, the minimum last entry id to prune and the regions to flush.
222    async fn mock_test_data(context: Context, topic: &str) -> u64 {
223        let n_region = 10;
224        let n_table = 5;
225        // 5 entries per region.
226        let offsets = mock_wal_entries(
227            context.client.clone(),
228            topic,
229            (n_region * n_table * 5) as usize,
230        )
231        .await;
232
233        new_wal_prune_metadata(
234            context.table_metadata_manager.clone(),
235            context.leader_region_registry.clone(),
236            n_region,
237            n_table,
238            &offsets[1..],
239            topic.to_string(),
240        )
241        .await
242    }
243
244    fn record(i: usize) -> Record {
245        let key = format!("key_{i}");
246        let value = format!("value_{i}");
247        Record {
248            key: Some(key.into()),
249            value: Some(value.into()),
250            timestamp: chrono::Utc::now(),
251            headers: Default::default(),
252        }
253    }
254
255    async fn mock_wal_entries(
256        client: KafkaClientRef,
257        topic_name: &str,
258        n_entries: usize,
259    ) -> Vec<i64> {
260        let controller_client = client.controller_client().unwrap();
261        let _ = controller_client
262            .create_topic(topic_name, 1, 1, 5_000)
263            .await;
264        let partition_client = client
265            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
266            .await
267            .unwrap();
268        let mut offsets = Vec::with_capacity(n_entries);
269        for i in 0..n_entries {
270            let record = vec![record(i)];
271            let offset = partition_client
272                .produce(
273                    record,
274                    rskafka::client::partition::Compression::NoCompression,
275                )
276                .await
277                .unwrap()
278                .offsets;
279            offsets.extend(offset);
280        }
281        offsets
282    }
283
284    async fn check_entry_id_existence(
285        client: KafkaClientRef,
286        topic_name: &str,
287        entry_id: i64,
288        expect_success: bool,
289    ) {
290        let partition_client = client
291            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
292            .await
293            .unwrap();
294        let res = partition_client
295            .fetch_records(entry_id, 0..10001, 5_000)
296            .await;
297        if expect_success {
298            assert!(res.is_ok());
299            let FetchResult { records, .. } = res.unwrap();
300            assert!(!records.is_empty());
301        } else {
302            let err = res.unwrap_err();
303            // The error is in a private module so we check it through `to_string()`.
304            assert!(err.to_string().contains("OffsetOutOfRange"));
305        }
306    }
307
308    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
309        let controller_client = client.controller_client().unwrap();
310        controller_client
311            .delete_topic(topic_name, 5_000)
312            .await
313            .unwrap();
314    }
315
316    #[test]
317    fn test_wal_prune_data_backward_compatibility() {
318        let data: WalPruneData =
319            serde_json::from_str(r#"{"topic":"test_topic","prunable_entry_id":42}"#).unwrap();
320
321        assert_eq!(data.topic, "test_topic");
322        assert_eq!(data.prunable_entry_id, 42);
323        assert!(!data.logical_delete);
324    }
325
326    #[tokio::test]
327    async fn test_procedure_execution() {
328        maybe_skip_kafka_integration_test!();
329        let broker_endpoints = get_kafka_endpoints();
330
331        common_telemetry::init_default_ut_logging();
332        let mut topic_name = uuid::Uuid::new_v4().to_string();
333        // Topic should start with a letter.
334        topic_name = format!("test_procedure_execution-{}", topic_name);
335        let env = TestEnv::new();
336        let context = env.build_wal_prune_context(broker_endpoints).await;
337        // Prepare the topic.
338        TestEnv::prepare_topic(&context.client, &topic_name).await;
339
340        // Mock the test data.
341        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
342        let mut procedure = WalPruneProcedure::new(
343            context.clone(),
344            None,
345            topic_name.clone(),
346            prunable_entry_id,
347            false,
348        );
349        let status = procedure.on_prune().await.unwrap();
350        assert_matches!(status, Status::Done { output: None });
351        // Check if the entry ids after(include) `prunable_entry_id` still exist.
352        check_entry_id_existence(
353            procedure.context.client.clone(),
354            &topic_name,
355            procedure.data.prunable_entry_id as i64,
356            true,
357        )
358        .await;
359        // Check if the entry ids before `prunable_entry_id` are deleted.
360        check_entry_id_existence(
361            procedure.context.client.clone(),
362            &topic_name,
363            procedure.data.prunable_entry_id as i64 - 1,
364            false,
365        )
366        .await;
367
368        let value = env
369            .table_metadata_manager
370            .topic_name_manager()
371            .get(&topic_name)
372            .await
373            .unwrap()
374            .unwrap();
375        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
376        // Clean up the topic.
377        delete_topic(procedure.context.client, &topic_name).await;
378    }
379
380    #[tokio::test]
381    async fn test_procedure_execution_with_logical_delete() {
382        maybe_skip_kafka_integration_test!();
383        let broker_endpoints = get_kafka_endpoints();
384
385        common_telemetry::init_default_ut_logging();
386        let mut topic_name = uuid::Uuid::new_v4().to_string();
387        // Topic should start with a letter.
388        topic_name = format!("test_procedure_execution_with_logical_delete-{topic_name}");
389        let env = TestEnv::new();
390        let context = env.build_wal_prune_context(broker_endpoints).await;
391        // Prepare the topic.
392        TestEnv::prepare_topic(&context.client, &topic_name).await;
393
394        // Mock the test data.
395        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
396        let mut procedure = WalPruneProcedure::new(
397            context.clone(),
398            None,
399            topic_name.clone(),
400            prunable_entry_id,
401            true,
402        );
403        let status = procedure.on_prune().await.unwrap();
404        assert_matches!(status, Status::Done { output: None });
405        // Logical delete should keep the entry ids before `prunable_entry_id`.
406        check_entry_id_existence(
407            procedure.context.client.clone(),
408            &topic_name,
409            procedure.data.prunable_entry_id as i64 - 1,
410            true,
411        )
412        .await;
413
414        let value = env
415            .table_metadata_manager
416            .topic_name_manager()
417            .get(&topic_name)
418            .await
419            .unwrap()
420            .unwrap();
421        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
422        // Clean up the topic.
423        delete_topic(procedure.context.client, &topic_name).await;
424    }
425}