1pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::lock_key::RemoteWalLock;
25use common_meta::region_registry::LeaderRegionRegistryRef;
26use common_procedure::error::ToJsonSnafu;
27use common_procedure::{
28 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29 Result as ProcedureResult, Status, StringKey,
30};
31use common_telemetry::{info, warn};
32use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
33use rskafka::client::Client;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt;
36use store_api::logstore::EntryId;
37
38use crate::Result;
39use crate::error::{self};
40use crate::procedure::wal_prune::utils::{
41 delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
42};
43
44pub type KafkaClientRef = Arc<Client>;
45
46#[derive(Clone)]
47pub struct Context {
48 pub client: KafkaClientRef,
50 pub table_metadata_manager: TableMetadataManagerRef,
52 pub leader_region_registry: LeaderRegionRegistryRef,
54}
55
56#[derive(Serialize, Deserialize)]
58pub struct WalPruneData {
59 pub topic: String,
61 pub prunable_entry_id: EntryId,
63}
64
65pub struct WalPruneProcedure {
67 pub data: WalPruneData,
68 pub context: Context,
69 pub _guard: Option<WalPruneProcedureGuard>,
70}
71
72impl WalPruneProcedure {
73 const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
74
75 pub fn new(
76 context: Context,
77 guard: Option<WalPruneProcedureGuard>,
78 topic: String,
79 prunable_entry_id: u64,
80 ) -> Self {
81 Self {
82 data: WalPruneData {
83 topic,
84 prunable_entry_id,
85 },
86 context,
87 _guard: guard,
88 }
89 }
90
91 pub fn from_json(
92 json: &str,
93 context: &Context,
94 tracker: WalPruneProcedureTracker,
95 ) -> ProcedureResult<Self> {
96 let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
97 let guard = tracker.insert_running_procedure(data.topic.clone());
98 Ok(Self {
99 data,
100 context: context.clone(),
101 _guard: guard,
102 })
103 }
104
105 pub async fn on_prune(&mut self) -> Result<Status> {
111 let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
112 let (earliest_offset, latest_offset) =
113 get_offsets_for_topic(&partition_client, &self.data.topic).await?;
114 if self.data.prunable_entry_id <= earliest_offset {
115 warn!(
116 "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
117 self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
118 );
119 return Ok(Status::done());
120 }
121
122 delete_records(
124 &partition_client,
125 &self.data.topic,
126 self.data.prunable_entry_id,
127 )
128 .await
129 .map_err(BoxedError::new)
130 .with_context(|_| error::RetryLaterWithSourceSnafu {
131 reason: format!(
132 "Failed to delete records for topic: {}, prunable entry id: {}, latest offset: {}",
133 self.data.topic, self.data.prunable_entry_id, latest_offset
134 ),
135 })?;
136
137 update_pruned_entry_id(
139 &self.context.table_metadata_manager,
140 &self.data.topic,
141 self.data.prunable_entry_id,
142 )
143 .await
144 .map_err(BoxedError::new)
145 .with_context(|_| error::RetryLaterWithSourceSnafu {
146 reason: format!(
147 "Failed to update pruned entry id for topic: {}",
148 self.data.topic
149 ),
150 })?;
151
152 info!(
153 "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
154 self.data.topic, self.data.prunable_entry_id, latest_offset
155 );
156 Ok(Status::done())
157 }
158}
159
160#[async_trait::async_trait]
161impl Procedure for WalPruneProcedure {
162 fn type_name(&self) -> &str {
163 Self::TYPE_NAME
164 }
165
166 fn rollback_supported(&self) -> bool {
167 false
168 }
169
170 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
171 self.on_prune().await.map_err(|e| {
172 if e.is_retryable() {
173 ProcedureError::retry_later(e)
174 } else {
175 ProcedureError::external(e)
176 }
177 })
178 }
179
180 fn dump(&self) -> ProcedureResult<String> {
181 serde_json::to_string(&self.data).context(ToJsonSnafu)
182 }
183
184 fn lock_key(&self) -> LockKey {
189 let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
190 LockKey::new(vec![lock_key])
191 }
192}
193
#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Produces `n_region * n_table * 5` mock WAL entries into `topic`, then
    /// builds the prune metadata for them.
    ///
    /// Returns the value yielded by `new_wal_prune_metadata`, which the test
    /// uses as the prunable entry id.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        let n_entries = (n_region * n_table * 5) as usize;
        let offsets = mock_wal_entries(context.client.clone(), topic, n_entries).await;

        new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await
    }

    /// Builds the `i`-th mock record: `key_{i}` / `value_{i}`, stamped with
    /// the current time and carrying default headers.
    fn record(i: usize) -> Record {
        Record {
            key: Some(format!("key_{i}").into()),
            value: Some(format!("value_{i}").into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

    /// Produces `n_entries` single-record batches into partition 0 of
    /// `topic_name` and returns the offsets reported for them, in order.
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        // The outcome of topic creation is deliberately ignored.
        // NOTE(review): presumably because the topic may already exist — confirm.
        let controller = client.controller_client().unwrap();
        let _ = controller.create_topic(topic_name, 1, 1, 5_000).await;

        let producer = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();

        let mut produced_offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let batch = vec![record(i)];
            let produced = producer
                .produce(
                    batch,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap();
            produced_offsets.extend(produced.offsets);
        }
        produced_offsets
    }

    /// Fetches from `entry_id` on partition 0 of `topic_name` and asserts the
    /// outcome.
    ///
    /// With `expect_success` the fetch must succeed and return at least one
    /// record; otherwise it must fail with an error mentioning
    /// `OffsetOutOfRange`.
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let fetcher = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = fetcher.fetch_records(entry_id, 0..10001, 5_000).await;

        if !expect_success {
            let err = res.unwrap_err();
            assert!(err.to_string().contains("OffsetOutOfRange"));
            return;
        }

        assert!(res.is_ok());
        let FetchResult { records, .. } = res.unwrap();
        assert!(!records.is_empty());
    }

    /// Deletes `topic_name` through the controller client, panicking on failure.
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller = client.controller_client().unwrap();
        controller.delete_topic(topic_name, 5_000).await.unwrap();
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        // A random suffix keeps the topic of each run distinct.
        let topic_name = format!("test_procedure_execution-{}", uuid::Uuid::new_v4());

        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;

        // Run the prune step directly.
        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });

        // The entry at the prunable id must still be fetchable, the one right
        // before it must be gone.
        let pruned_up_to = procedure.data.prunable_entry_id as i64;
        for (entry_id, expect_success) in [(pruned_up_to, true), (pruned_up_to - 1, false)] {
            check_entry_id_existence(
                procedure.context.client.clone(),
                &topic_name,
                entry_id,
                expect_success,
            )
            .await;
        }

        // The pruned entry id must have been persisted for the topic.
        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

        // Clean up the topic created for this run.
        delete_topic(procedure.context.client, &topic_name).await;
    }
}