1pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::lock_key::RemoteWalLock;
25use common_meta::region_registry::LeaderRegionRegistryRef;
26use common_procedure::error::ToJsonSnafu;
27use common_procedure::{
28 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29 Result as ProcedureResult, Status, StringKey,
30};
31use common_telemetry::{info, warn};
32use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
33use rskafka::client::Client;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt;
36use store_api::logstore::EntryId;
37
38use crate::error::{self};
39use crate::procedure::wal_prune::utils::{
40 delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
41};
42use crate::Result;
43
44pub type KafkaClientRef = Arc<Client>;
45
46#[derive(Clone)]
47pub struct Context {
48 pub client: KafkaClientRef,
50 pub table_metadata_manager: TableMetadataManagerRef,
52 pub leader_region_registry: LeaderRegionRegistryRef,
54}
55
56#[derive(Serialize, Deserialize)]
58pub struct WalPruneData {
59 pub topic: String,
61 pub prunable_entry_id: EntryId,
63}
64
65pub struct WalPruneProcedure {
67 pub data: WalPruneData,
68 pub context: Context,
69 pub _guard: Option<WalPruneProcedureGuard>,
70}
71
72impl WalPruneProcedure {
73 const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
74
75 pub fn new(
76 context: Context,
77 guard: Option<WalPruneProcedureGuard>,
78 topic: String,
79 prunable_entry_id: u64,
80 ) -> Self {
81 Self {
82 data: WalPruneData {
83 topic,
84 prunable_entry_id,
85 },
86 context,
87 _guard: guard,
88 }
89 }
90
91 pub fn from_json(
92 json: &str,
93 context: &Context,
94 tracker: WalPruneProcedureTracker,
95 ) -> ProcedureResult<Self> {
96 let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
97 let guard = tracker.insert_running_procedure(data.topic.clone());
98 Ok(Self {
99 data,
100 context: context.clone(),
101 _guard: guard,
102 })
103 }
104
105 pub async fn on_prune(&mut self) -> Result<Status> {
111 let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
112 let (earliest_offset, latest_offset) =
113 get_offsets_for_topic(&partition_client, &self.data.topic).await?;
114 if self.data.prunable_entry_id <= earliest_offset {
115 warn!(
116 "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
117 self.data.topic,
118 self.data.prunable_entry_id,
119 earliest_offset,
120 latest_offset
121 );
122 return Ok(Status::done());
123 }
124
125 delete_records(
127 &partition_client,
128 &self.data.topic,
129 self.data.prunable_entry_id,
130 )
131 .await
132 .map_err(BoxedError::new)
133 .with_context(|_| error::RetryLaterWithSourceSnafu {
134 reason: format!(
135 "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
136 self.data.topic, self.data.prunable_entry_id, latest_offset
137 ),
138 })?;
139
140 update_pruned_entry_id(
142 &self.context.table_metadata_manager,
143 &self.data.topic,
144 self.data.prunable_entry_id,
145 )
146 .await
147 .map_err(BoxedError::new)
148 .with_context(|_| error::RetryLaterWithSourceSnafu {
149 reason: format!(
150 "Failed to update pruned entry id for topic: {}",
151 self.data.topic
152 ),
153 })?;
154
155 info!(
156 "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
157 self.data.topic, self.data.prunable_entry_id, latest_offset
158 );
159 Ok(Status::done())
160 }
161}
162
163#[async_trait::async_trait]
164impl Procedure for WalPruneProcedure {
165 fn type_name(&self) -> &str {
166 Self::TYPE_NAME
167 }
168
169 fn rollback_supported(&self) -> bool {
170 false
171 }
172
173 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
174 self.on_prune().await.map_err(|e| {
175 if e.is_retryable() {
176 ProcedureError::retry_later(e)
177 } else {
178 ProcedureError::external(e)
179 }
180 })
181 }
182
183 fn dump(&self) -> ProcedureResult<String> {
184 serde_json::to_string(&self.data).context(ToJsonSnafu)
185 }
186
187 fn lock_key(&self) -> LockKey {
192 let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
193 LockKey::new(vec![lock_key])
194 }
195}
196
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Produces WAL entries into `topic`, builds the matching prune metadata and
    /// returns the prunable entry id reported by `new_wal_prune_metadata`.
    ///
    /// `n_region * n_table * 5` entries are produced for 10 regions and 5 tables.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        let total_entries = (n_region * n_table * 5) as usize;
        let offsets = mock_wal_entries(context.client.clone(), topic, total_entries).await;

        new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await
    }

    /// Builds the `i`-th test record with key `key_{i}` and value `value_{i}`.
    fn record(i: usize) -> Record {
        Record {
            key: Some(format!("key_{i}").into()),
            value: Some(format!("value_{i}").into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

    /// Produces `n_entries` single-record batches into partition 0 of `topic_name`
    /// and returns every offset reported by the broker, in production order.
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller = client.controller_client().unwrap();
        // The creation result is deliberately discarded.
        // NOTE(review): presumably because the topic may already exist — confirm.
        let _ = controller.create_topic(topic_name, 1, 1, 5_000).await;

        let producer = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();

        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let produced = producer
                .produce(
                    vec![record(i)],
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap();
            offsets.extend(produced.offsets);
        }
        offsets
    }

    /// Fetches from `entry_id` on partition 0 of `topic_name`.
    ///
    /// With `expect_success` the fetch must succeed and return at least one record;
    /// otherwise it must fail with an error mentioning `OffsetOutOfRange`.
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let reader = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = reader.fetch_records(entry_id, 0..10001, 5_000).await;

        if !expect_success {
            let err = res.unwrap_err();
            assert!(err.to_string().contains("OffsetOutOfRange"));
            return;
        }

        assert!(res.is_ok());
        let FetchResult { records, .. } = res.unwrap();
        assert!(!records.is_empty());
    }

    /// Deletes `topic_name` through the controller client, panicking on failure.
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        client
            .controller_client()
            .unwrap()
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    /// End-to-end check of `on_prune` against a real Kafka broker: after pruning,
    /// the prunable entry id is still readable, the id right before it is not, and
    /// the stored `pruned_entry_id` matches.
    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        let topic_name = format!("test_procedure_execution-{}", uuid::Uuid::new_v4());

        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;

        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });

        // The prunable entry itself must survive; the one just before it must be gone.
        let boundary = procedure.data.prunable_entry_id as i64;
        for (entry_id, should_exist) in [(boundary, true), (boundary - 1, false)] {
            check_entry_id_existence(
                procedure.context.client.clone(),
                &topic_name,
                entry_id,
                should_exist,
            )
            .await;
        }

        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

        delete_topic(procedure.context.client, &topic_name).await;
    }
}