meta_srv/procedure/wal_prune.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

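//! Procedure that prunes the remote WAL (Kafka) entries of a topic up to its prunable
//! entry id and persists the pruned entry id in the table metadata.
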
pub(crate) mod manager;
#[cfg(test)]
mod test_util;
pub(crate) mod utils;

use std::sync::Arc;

use common_error::ext::BoxedError;
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;

use crate::error;
use crate::procedure::wal_prune::utils::{
    delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
};
use crate::Result;

pub type KafkaClientRef = Arc<Client>;

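/// Shared dependencies used by the WAL prune procedure.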
#[derive(Clone)]
pub struct Context {
    /// The Kafka client.
    pub client: KafkaClientRef,
    /// The table metadata manager.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    pub leader_region_registry: LeaderRegionRegistryRef,
}

/// The data of the WAL pruning procedure.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
    /// The topic name to prune.
    pub topic: String,
    /// The minimum flushed entry id for the topic, which is used to prune the WAL.
    pub prunable_entry_id: EntryId,
}

/// The procedure to prune WAL.
pub struct WalPruneProcedure {
    pub data: WalPruneData,
    pub context: Context,
    pub _guard: Option<WalPruneProcedureGuard>,
}

impl WalPruneProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";

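    /// Creates a new [`WalPruneProcedure`] for the given topic.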
    pub fn new(
        context: Context,
        guard: Option<WalPruneProcedureGuard>,
        topic: String,
        prunable_entry_id: u64,
    ) -> Self {
        Self {
            data: WalPruneData {
                topic,
                prunable_entry_id,
            },
            context,
            _guard: guard,
        }
    }

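    /// Recovers a [`WalPruneProcedure`] from its JSON-serialized [`WalPruneData`] and
    /// registers it as running in the tracker.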
    pub fn from_json(
        json: &str,
        context: &Context,
        tracker: WalPruneProcedureTracker,
    ) -> ProcedureResult<Self> {
        let data: WalPruneData = serde_json::from_str(json).context(FromJsonSnafu)?;
        let guard = tracker.insert_running_procedure(data.topic.clone());
        Ok(Self {
            data,
            context: context.clone(),
            _guard: guard,
        })
    }

    /// Prunes the WAL and persists the pruned entry id.
    ///
    /// Retries later when:
    /// - updating the pruned entry id in the kv backend fails;
    /// - deleting records fails.
    pub async fn on_prune(&mut self) -> Result<Status> {
        let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
        let (earliest_offset, latest_offset) =
            get_offsets_for_topic(&partition_client, &self.data.topic).await?;
        if self.data.prunable_entry_id <= earliest_offset {
            warn!(
                "The prunable entry id is less than or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
                self.data.topic,
                self.data.prunable_entry_id,
                earliest_offset,
                latest_offset
            );
            return Ok(Status::done());
        }

        // Delete records.
        delete_records(
            &partition_client,
            &self.data.topic,
            self.data.prunable_entry_id,
        )
        .await
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!(
                "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
                self.data.topic, self.data.prunable_entry_id, latest_offset
            ),
        })?;

        // Update the pruned entry id for the topic.
        update_pruned_entry_id(
            &self.context.table_metadata_manager,
            &self.data.topic,
            self.data.prunable_entry_id,
        )
        .await
        .map_err(BoxedError::new)
        .with_context(|_| error::RetryLaterWithSourceSnafu {
            reason: format!(
                "Failed to update pruned entry id for topic: {}",
                self.data.topic
            ),
        })?;

        info!(
            "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
            self.data.topic, self.data.prunable_entry_id, latest_offset
        );
        Ok(Status::done())
    }
}

#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    fn rollback_supported(&self) -> bool {
        false
    }

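    /// Runs the prune step, mapping retryable errors to `retry_later` and the rest
    /// to external procedure errors.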
    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        self.on_prune().await.map_err(|e| {
            if e.is_retryable() {
                ProcedureError::retry_later(e)
            } else {
                ProcedureError::external(e)
            }
        })
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

    /// The WAL prune procedure reads the topic-region map from the table metadata manager,
    /// which is modified by `DROP [TABLE|DATABASE]` and `CREATE TABLE` operations.
    /// Since those modifications are atomic, they do not conflict with this procedure;
    /// they may only abort it occasionally when `check_heartbeat_collected_region_ids` fails.
    fn lock_key(&self) -> LockKey {
        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
        LockKey::new(vec![lock_key])
    }
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Mocks test data for the procedure:
    /// 1. Prepares some table metadata in the in-memory kv backend and produces WAL entries.
    /// 2. Returns the prunable entry id of the topic.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        // 5 entries per region.
        let offsets = mock_wal_entries(
            context.client.clone(),
            topic,
            (n_region * n_table * 5) as usize,
        )
        .await;
        let prunable_entry_id = new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await;
        prunable_entry_id
    }

    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

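    /// Creates the topic (any creation error is ignored) and produces `n_entries`
    /// records one by one, returning their offsets.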
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap()
                .offsets;
            offsets.extend(offset);
        }
        offsets
    }

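    /// Asserts whether records can be fetched starting from `entry_id`: expects a
    /// non-empty fetch on success, or an `OffsetOutOfRange` error otherwise.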
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            assert!(res.is_ok());
            let FetchResult { records, .. } = res.unwrap();
            assert!(!records.is_empty());
        } else {
            let err = res.unwrap_err();
            // The error is in a private module so we check it through `to_string()`.
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        let mut topic_name = uuid::Uuid::new_v4().to_string();
        // Topic should start with a letter.
        topic_name = format!("test_procedure_execution-{}", topic_name);
        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        // Prepare the topic.
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        // Mock the test data.
        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });
        // Check that the entry ids at and after `prunable_entry_id` still exist.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64,
            true,
        )
        .await;
        // Check if the entry ids before `prunable_entry_id` are deleted.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64 - 1,
            false,
        )
        .await;

        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
        // Clean up the topic.
        delete_topic(procedure.context.client, &topic_name).await;
    }
}