meta_srv/procedure/wal_prune.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

pub(crate) mod manager;
#[cfg(test)]
mod test_util;
pub(crate) mod utils;

use std::sync::Arc;

use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;

use crate::Result;
use crate::error::{self};
use crate::procedure::wal_prune::utils::{
    delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
};

pub type KafkaClientRef = Arc<Client>;

#[derive(Clone)]
pub struct Context {
    /// The Kafka client.
    pub client: KafkaClientRef,
    /// The table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    pub leader_region_registry: LeaderRegionRegistryRef,
}

/// The data of a WAL pruning procedure.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
    /// The topic name to prune.
    pub topic: String,
    /// The minimum flushed entry id for the topic, which is used to prune the WAL.
    pub prunable_entry_id: EntryId,
}

/// The procedure to prune the WAL of a topic.
pub struct WalPruneProcedure {
    pub data: WalPruneData,
    pub context: Context,
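    /// Tracker guard: while it is held, the topic is registered in the
    /// [`WalPruneProcedureTracker`] as having a running prune procedure;
    /// dropping it releases that registration.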
    pub _guard: Option<WalPruneProcedureGuard>,
}

impl WalPruneProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";

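    /// Creates a new [`WalPruneProcedure`].
    ///
    /// A minimal usage sketch; `context` is a prepared [`Context`], and the
    /// topic name and entry id below are placeholder values:
    ///
    /// ```ignore
    /// let procedure = WalPruneProcedure::new(context, None, "my_topic".to_string(), 42);
    /// ```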
    pub fn new(
        context: Context,
        guard: Option<WalPruneProcedureGuard>,
        topic: String,
        prunable_entry_id: u64,
    ) -> Self {
        Self {
            data: WalPruneData {
                topic,
                prunable_entry_id,
            },
            context,
            _guard: guard,
        }
    }

    pub fn from_json(
        json: &str,
        context: &Context,
        tracker: WalPruneProcedureTracker,
    ) -> ProcedureResult<Self> {
        let data: WalPruneData = serde_json::from_str(json).context(FromJsonSnafu)?;
        let guard = tracker.insert_running_procedure(data.topic.clone());
        Ok(Self {
            data,
            context: context.clone(),
            _guard: guard,
        })
    }

    /// Prune the WAL and persist the minimum prunable entry id.
    ///
    /// Retries later when:
    /// - updating the minimum prunable entry id in the kv backend fails;
    /// - deleting records fails.
    pub async fn on_prune(&mut self) -> Result<Status> {
        let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
        let (earliest_offset, latest_offset) =
            get_offsets_for_topic(&partition_client, &self.data.topic).await?;
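        // Nothing to prune: every record below `prunable_entry_id` is already
        // gone from the topic.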
        if self.data.prunable_entry_id <= earliest_offset {
            warn!(
                "The prunable entry id is less than or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
                self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
            );
            return Ok(Status::done());
        }

        // Delete records up to the prunable entry id.
        delete_records(
            &partition_client,
            &self.data.topic,
            self.data.prunable_entry_id,
        )
        .await
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!(
                "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
                self.data.topic, self.data.prunable_entry_id, latest_offset
            ),
        })?;

        // Update the pruned entry id for the topic.
        update_pruned_entry_id(
            &self.context.table_metadata_manager,
            &self.data.topic,
            self.data.prunable_entry_id,
        )
        .await
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!(
                "Failed to update pruned entry id for topic: {}",
                self.data.topic
            ),
        })?;

        info!(
            "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
            self.data.topic, self.data.prunable_entry_id, latest_offset
        );
        Ok(Status::done())
    }
}

#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    fn rollback_supported(&self) -> bool {
        false
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
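        // Map domain errors onto procedure errors so the framework retries
        // transient failures and fails fast on permanent ones.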
        self.on_prune().await.map_err(|e| {
            if e.is_retryable() {
                ProcedureError::retry_later(e)
            } else {
                ProcedureError::external(e)
            }
        })
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// The WAL prune procedure reads the topic-region map from the table metadata
    /// manager, which is modified by `DROP [TABLE|DATABASE]` and `CREATE TABLE`
    /// operations. Since those modifications are atomic, they do not conflict with
    /// this procedure; they can only occasionally abort it when
    /// `check_heartbeat_collected_region_ids` fails.
    fn lock_key(&self) -> LockKey {
        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
        LockKey::new(vec![lock_key])
    }
}
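
// A sketch of how this procedure is typically submitted to the procedure
// framework; `procedure_manager` is assumed to be a `ProcedureManagerRef`
// obtained elsewhere and is not part of this module:
//
//     use common_procedure::ProcedureWithId;
//
//     let procedure = WalPruneProcedure::new(context, guard, topic, prunable_entry_id);
//     let _watcher = procedure_manager
//         .submit(ProcedureWithId::with_random_id(Box::new(procedure)))
//         .await?;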

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Mocks a test environment:
    /// 1. Prepares some data in the table metadata manager and the in-memory kv backend.
    /// 2. Returns the minimum prunable entry id for the topic.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        // 5 entries per region.
        let offsets = mock_wal_entries(
            context.client.clone(),
            topic,
            (n_region * n_table * 5) as usize,
        )
        .await;

        new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await
    }

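    /// Builds a dummy record whose key and value encode the index `i`.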
    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

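    /// Creates the topic if needed (errors from `create_topic`, e.g. "topic
    /// already exists", are deliberately ignored) and produces `n_entries`
    /// records one by one, returning the offset of each produced record.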
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap()
                .offsets;
            offsets.extend(offset);
        }
        offsets
    }

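    /// Fetches records starting at `entry_id` and asserts the outcome: fetching
    /// a pruned offset is expected to fail with an `OffsetOutOfRange` error.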
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            assert!(res.is_ok());
            let FetchResult { records, .. } = res.unwrap();
            assert!(!records.is_empty());
        } else {
            let err = res.unwrap_err();
            // The error is in a private module so we check it through `to_string()`.
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

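    /// Deletes the test topic so that repeated runs start from a clean state.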
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        let mut topic_name = uuid::Uuid::new_v4().to_string();
        // The topic name should start with a letter.
        topic_name = format!("test_procedure_execution-{}", topic_name);
        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        // Prepare the topic.
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        // Mock the test data.
        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });
        // Check that entry ids at or after `prunable_entry_id` still exist.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64,
            true,
        )
        .await;
        // Check that entry ids before `prunable_entry_id` are deleted.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64 - 1,
            false,
        )
        .await;

        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
        // Clean up the topic.
        delete_topic(procedure.context.client, &topic_name).await;
    }
}