1pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::lock_key::RemoteWalLock;
25use common_meta::region_registry::LeaderRegionRegistryRef;
26use common_procedure::error::ToJsonSnafu;
27use common_procedure::{
28 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29 Result as ProcedureResult, Status, StringKey,
30};
31use common_telemetry::{info, warn};
32use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
33use rskafka::client::Client;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt;
36use store_api::logstore::EntryId;
37
38use crate::Result;
39use crate::error::{self};
40use crate::procedure::wal_prune::utils::{
41 delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
42};
43
44pub type KafkaClientRef = Arc<Client>;
45
46#[derive(Clone)]
47pub struct Context {
48 pub client: KafkaClientRef,
50 pub table_metadata_manager: TableMetadataManagerRef,
52 pub leader_region_registry: LeaderRegionRegistryRef,
54}
55
56#[derive(Serialize, Deserialize)]
58pub struct WalPruneData {
59 pub topic: String,
61 pub prunable_entry_id: EntryId,
63 #[serde(default)]
65 pub logical_delete: bool,
66}
67
68pub struct WalPruneProcedure {
70 pub data: WalPruneData,
71 pub context: Context,
72 pub _guard: Option<WalPruneProcedureGuard>,
73}
74
75impl WalPruneProcedure {
76 const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
77
78 pub fn new(
79 context: Context,
80 guard: Option<WalPruneProcedureGuard>,
81 topic: String,
82 prunable_entry_id: u64,
83 logical_delete: bool,
84 ) -> Self {
85 Self {
86 data: WalPruneData {
87 topic,
88 prunable_entry_id,
89 logical_delete,
90 },
91 context,
92 _guard: guard,
93 }
94 }
95
96 pub fn from_json(
97 json: &str,
98 context: &Context,
99 tracker: WalPruneProcedureTracker,
100 ) -> ProcedureResult<Self> {
101 let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
102 let guard = tracker.insert_running_procedure(data.topic.clone());
103 Ok(Self {
104 data,
105 context: context.clone(),
106 _guard: guard,
107 })
108 }
109
110 pub async fn on_prune(&mut self) -> Result<Status> {
116 let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
117 let (earliest_offset, latest_offset) =
118 get_offsets_for_topic(&partition_client, &self.data.topic).await?;
119 if self.data.prunable_entry_id <= earliest_offset {
120 warn!(
121 "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
122 self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
123 );
124 return Ok(Status::done());
125 }
126
127 if self.data.logical_delete {
128 info!(
129 "Skipping physical deletion of records for logical WAL pruning, topic: {}, prunable entry id: {}",
130 self.data.topic, self.data.prunable_entry_id
131 );
132 } else {
133 delete_records(
135 &partition_client,
136 &self.data.topic,
137 self.data.prunable_entry_id,
138 )
139 .await?;
140 }
141
142 update_pruned_entry_id(
144 &self.context.table_metadata_manager,
145 &self.data.topic,
146 self.data.prunable_entry_id,
147 )
148 .await
149 .map_err(BoxedError::new)
150 .with_context(|_| error::RetryLaterWithSourceSnafu {
151 reason: format!(
152 "Failed to update pruned entry id for topic: {}",
153 self.data.topic
154 ),
155 })?;
156
157 info!(
158 "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
159 self.data.topic, self.data.prunable_entry_id, latest_offset
160 );
161 Ok(Status::done())
162 }
163}
164
165#[async_trait::async_trait]
166impl Procedure for WalPruneProcedure {
167 fn type_name(&self) -> &str {
168 Self::TYPE_NAME
169 }
170
171 fn rollback_supported(&self) -> bool {
172 false
173 }
174
175 async fn execute(&mut self, ctx: &ProcedureContext) -> ProcedureResult<Status> {
176 let _guard = ctx
177 .provider
178 .acquire_lock(&(RemoteWalLock::Write(self.data.topic.clone()).into()))
179 .await;
180
181 self.on_prune().await.map_err(|e| {
182 if e.is_retryable() {
183 ProcedureError::retry_later(e)
184 } else {
185 ProcedureError::external(e)
186 }
187 })
188 }
189
190 fn dump(&self) -> ProcedureResult<String> {
191 serde_json::to_string(&self.data).context(ToJsonSnafu)
192 }
193
194 fn lock_key(&self) -> LockKey {
199 let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
200 LockKey::new(vec![lock_key])
201 }
202}
203
204#[cfg(test)]
205mod tests {
206 use std::assert_matches;
207
208 use common_wal::maybe_skip_kafka_integration_test;
209 use common_wal::test_util::get_kafka_endpoints;
210 use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
211 use rskafka::record::Record;
212
213 use super::*;
214 use crate::procedure::test_util::new_wal_prune_metadata;
215 use crate::procedure::wal_prune::test_util::TestEnv;
217
218 async fn mock_test_data(context: Context, topic: &str) -> u64 {
223 let n_region = 10;
224 let n_table = 5;
225 let offsets = mock_wal_entries(
227 context.client.clone(),
228 topic,
229 (n_region * n_table * 5) as usize,
230 )
231 .await;
232
233 new_wal_prune_metadata(
234 context.table_metadata_manager.clone(),
235 context.leader_region_registry.clone(),
236 n_region,
237 n_table,
238 &offsets[1..],
239 topic.to_string(),
240 )
241 .await
242 }
243
244 fn record(i: usize) -> Record {
245 let key = format!("key_{i}");
246 let value = format!("value_{i}");
247 Record {
248 key: Some(key.into()),
249 value: Some(value.into()),
250 timestamp: chrono::Utc::now(),
251 headers: Default::default(),
252 }
253 }
254
255 async fn mock_wal_entries(
256 client: KafkaClientRef,
257 topic_name: &str,
258 n_entries: usize,
259 ) -> Vec<i64> {
260 let controller_client = client.controller_client().unwrap();
261 let _ = controller_client
262 .create_topic(topic_name, 1, 1, 5_000)
263 .await;
264 let partition_client = client
265 .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
266 .await
267 .unwrap();
268 let mut offsets = Vec::with_capacity(n_entries);
269 for i in 0..n_entries {
270 let record = vec![record(i)];
271 let offset = partition_client
272 .produce(
273 record,
274 rskafka::client::partition::Compression::NoCompression,
275 )
276 .await
277 .unwrap()
278 .offsets;
279 offsets.extend(offset);
280 }
281 offsets
282 }
283
284 async fn check_entry_id_existence(
285 client: KafkaClientRef,
286 topic_name: &str,
287 entry_id: i64,
288 expect_success: bool,
289 ) {
290 let partition_client = client
291 .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
292 .await
293 .unwrap();
294 let res = partition_client
295 .fetch_records(entry_id, 0..10001, 5_000)
296 .await;
297 if expect_success {
298 assert!(res.is_ok());
299 let FetchResult { records, .. } = res.unwrap();
300 assert!(!records.is_empty());
301 } else {
302 let err = res.unwrap_err();
303 assert!(err.to_string().contains("OffsetOutOfRange"));
305 }
306 }
307
308 async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
309 let controller_client = client.controller_client().unwrap();
310 controller_client
311 .delete_topic(topic_name, 5_000)
312 .await
313 .unwrap();
314 }
315
316 #[test]
317 fn test_wal_prune_data_backward_compatibility() {
318 let data: WalPruneData =
319 serde_json::from_str(r#"{"topic":"test_topic","prunable_entry_id":42}"#).unwrap();
320
321 assert_eq!(data.topic, "test_topic");
322 assert_eq!(data.prunable_entry_id, 42);
323 assert!(!data.logical_delete);
324 }
325
326 #[tokio::test]
327 async fn test_procedure_execution() {
328 maybe_skip_kafka_integration_test!();
329 let broker_endpoints = get_kafka_endpoints();
330
331 common_telemetry::init_default_ut_logging();
332 let mut topic_name = uuid::Uuid::new_v4().to_string();
333 topic_name = format!("test_procedure_execution-{}", topic_name);
335 let env = TestEnv::new();
336 let context = env.build_wal_prune_context(broker_endpoints).await;
337 TestEnv::prepare_topic(&context.client, &topic_name).await;
339
340 let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
342 let mut procedure = WalPruneProcedure::new(
343 context.clone(),
344 None,
345 topic_name.clone(),
346 prunable_entry_id,
347 false,
348 );
349 let status = procedure.on_prune().await.unwrap();
350 assert_matches!(status, Status::Done { output: None });
351 check_entry_id_existence(
353 procedure.context.client.clone(),
354 &topic_name,
355 procedure.data.prunable_entry_id as i64,
356 true,
357 )
358 .await;
359 check_entry_id_existence(
361 procedure.context.client.clone(),
362 &topic_name,
363 procedure.data.prunable_entry_id as i64 - 1,
364 false,
365 )
366 .await;
367
368 let value = env
369 .table_metadata_manager
370 .topic_name_manager()
371 .get(&topic_name)
372 .await
373 .unwrap()
374 .unwrap();
375 assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
376 delete_topic(procedure.context.client, &topic_name).await;
378 }
379
380 #[tokio::test]
381 async fn test_procedure_execution_with_logical_delete() {
382 maybe_skip_kafka_integration_test!();
383 let broker_endpoints = get_kafka_endpoints();
384
385 common_telemetry::init_default_ut_logging();
386 let mut topic_name = uuid::Uuid::new_v4().to_string();
387 topic_name = format!("test_procedure_execution_with_logical_delete-{topic_name}");
389 let env = TestEnv::new();
390 let context = env.build_wal_prune_context(broker_endpoints).await;
391 TestEnv::prepare_topic(&context.client, &topic_name).await;
393
394 let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
396 let mut procedure = WalPruneProcedure::new(
397 context.clone(),
398 None,
399 topic_name.clone(),
400 prunable_entry_id,
401 true,
402 );
403 let status = procedure.on_prune().await.unwrap();
404 assert_matches!(status, Status::Done { output: None });
405 check_entry_id_existence(
407 procedure.context.client.clone(),
408 &topic_name,
409 procedure.data.prunable_entry_id as i64 - 1,
410 true,
411 )
412 .await;
413
414 let value = env
415 .table_metadata_manager
416 .topic_name_manager()
417 .get(&topic_name)
418 .await
419 .unwrap()
420 .unwrap();
421 assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
422 delete_topic(procedure.context.client, &topic_name).await;
424 }
425}