1pub(crate) mod manager;
16#[cfg(test)]
17mod test_util;
18pub(crate) mod utils;
19
20use std::sync::Arc;
21
22use common_error::ext::BoxedError;
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::lock_key::RemoteWalLock;
25use common_meta::region_registry::LeaderRegionRegistryRef;
26use common_procedure::error::ToJsonSnafu;
27use common_procedure::{
28 Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
29 Result as ProcedureResult, Status, StringKey,
30};
31use common_telemetry::{info, warn};
32use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
33use rskafka::client::Client;
34use serde::{Deserialize, Serialize};
35use snafu::ResultExt;
36use store_api::logstore::EntryId;
37
38use crate::Result;
39use crate::error::{self};
40use crate::procedure::wal_prune::utils::{
41 delete_records, get_offsets_for_topic, get_partition_client, update_pruned_entry_id,
42};
43
/// Shared, reference-counted handle to the rskafka [`Client`] used by WAL prune procedures.
pub type KafkaClientRef = Arc<Client>;
45
/// Dependencies shared by WAL prune procedures.
#[derive(Clone)]
pub struct Context {
    /// Kafka client; `on_prune` uses it to obtain the topic's partition client, read the
    /// topic offsets and delete records.
    pub client: KafkaClientRef,
    /// Table metadata manager; `on_prune` uses it to persist the topic's pruned entry id.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// Registry of leader regions. Not read by the procedure logic in this file (the tests
    /// pass it to `new_wal_prune_metadata`).
    /// NOTE(review): presumably used by callers to compute the prunable entry id — confirm.
    pub leader_region_registry: LeaderRegionRegistryRef,
}
55
/// Persisted state of a [`WalPruneProcedure`], serialized by `dump` and restored by
/// `WalPruneProcedure::from_json`.
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
    /// Name of the Kafka topic whose WAL is pruned.
    pub topic: String,
    /// Records before this entry id are deleted; the record at this id is kept
    /// (as exercised by `test_procedure_execution`).
    pub prunable_entry_id: EntryId,
}
64
/// Procedure that deletes the WAL records of one Kafka topic before `prunable_entry_id`
/// and then records that id as the topic's pruned entry id.
pub struct WalPruneProcedure {
    /// Serializable state of the procedure (topic and prunable entry id).
    pub data: WalPruneData,
    /// Shared dependencies: Kafka client and metadata managers.
    pub context: Context,
    /// Guard obtained from `WalPruneProcedureTracker::insert_running_procedure` (or passed to
    /// `new`), held for the lifetime of the procedure and never read.
    /// NOTE(review): presumably dropping it unregisters the topic from the tracker — confirm
    /// in `manager`.
    pub _guard: Option<WalPruneProcedureGuard>,
}
71
72impl WalPruneProcedure {
73 const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";
74
75 pub fn new(
76 context: Context,
77 guard: Option<WalPruneProcedureGuard>,
78 topic: String,
79 prunable_entry_id: u64,
80 ) -> Self {
81 Self {
82 data: WalPruneData {
83 topic,
84 prunable_entry_id,
85 },
86 context,
87 _guard: guard,
88 }
89 }
90
91 pub fn from_json(
92 json: &str,
93 context: &Context,
94 tracker: WalPruneProcedureTracker,
95 ) -> ProcedureResult<Self> {
96 let data: WalPruneData = serde_json::from_str(json).context(ToJsonSnafu)?;
97 let guard = tracker.insert_running_procedure(data.topic.clone());
98 Ok(Self {
99 data,
100 context: context.clone(),
101 _guard: guard,
102 })
103 }
104
105 pub async fn on_prune(&mut self) -> Result<Status> {
111 let partition_client = get_partition_client(&self.context.client, &self.data.topic).await?;
112 let (earliest_offset, latest_offset) =
113 get_offsets_for_topic(&partition_client, &self.data.topic).await?;
114 if self.data.prunable_entry_id <= earliest_offset {
115 warn!(
116 "The prunable entry id is less or equal to the earliest offset, topic: {}, prunable entry id: {}, earliest offset: {}, latest offset: {}",
117 self.data.topic, self.data.prunable_entry_id, earliest_offset, latest_offset
118 );
119 return Ok(Status::done());
120 }
121
122 delete_records(
124 &partition_client,
125 &self.data.topic,
126 self.data.prunable_entry_id,
127 )
128 .await?;
129
130 update_pruned_entry_id(
132 &self.context.table_metadata_manager,
133 &self.data.topic,
134 self.data.prunable_entry_id,
135 )
136 .await
137 .map_err(BoxedError::new)
138 .with_context(|_| error::RetryLaterWithSourceSnafu {
139 reason: format!(
140 "Failed to update pruned entry id for topic: {}",
141 self.data.topic
142 ),
143 })?;
144
145 info!(
146 "Successfully pruned WAL for topic: {}, prunable entry id: {}, latest offset: {}",
147 self.data.topic, self.data.prunable_entry_id, latest_offset
148 );
149 Ok(Status::done())
150 }
151}
152
153#[async_trait::async_trait]
154impl Procedure for WalPruneProcedure {
155 fn type_name(&self) -> &str {
156 Self::TYPE_NAME
157 }
158
159 fn rollback_supported(&self) -> bool {
160 false
161 }
162
163 async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
164 self.on_prune().await.map_err(|e| {
165 if e.is_retryable() {
166 ProcedureError::retry_later(e)
167 } else {
168 ProcedureError::external(e)
169 }
170 })
171 }
172
173 fn dump(&self) -> ProcedureResult<String> {
174 serde_json::to_string(&self.data).context(ToJsonSnafu)
175 }
176
177 fn lock_key(&self) -> LockKey {
182 let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
183 LockKey::new(vec![lock_key])
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::assert_matches;

    use common_wal::maybe_skip_kafka_integration_test;
    use common_wal::test_util::get_kafka_endpoints;
    use rskafka::client::partition::{FetchResult, UnknownTopicHandling};
    use rskafka::record::Record;

    use super::*;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Writes mock WAL entries to `topic` and builds matching region metadata.
    ///
    /// Produces `n_region * n_table * 5` records, then hands their offsets to
    /// `new_wal_prune_metadata`. Its result is returned unchanged; the caller uses it as the
    /// prunable entry id.
    async fn mock_test_data(context: Context, topic: &str) -> u64 {
        let n_region = 10;
        let n_table = 5;
        let offsets = mock_wal_entries(
            context.client.clone(),
            topic,
            (n_region * n_table * 5) as usize,
        )
        .await;

        new_wal_prune_metadata(
            context.table_metadata_manager.clone(),
            context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            topic.to_string(),
        )
        .await
    }

    /// Builds a record with key `key_{i}`, value `value_{i}` and the current timestamp.
    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

    /// Produces `n_entries` single-record batches to partition 0 of `topic_name` and returns
    /// the offsets Kafka assigned to them, in production order.
    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        // Result ignored: the topic may already exist (the test creates it beforehand via
        // `TestEnv::prepare_topic`).
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap()
                .offsets;
            offsets.extend(offset);
        }
        offsets
    }

    /// Fetches from `entry_id` on partition 0 of `topic_name` and asserts the outcome.
    ///
    /// When `expect_success` is true, the fetch must succeed and return at least one record.
    /// Otherwise it must fail with an error whose message contains `OffsetOutOfRange`, i.e. the
    /// entry has been pruned.
    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        // Byte range 0..10001 and a 5_000 ms max wait.
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            assert!(res.is_ok());
            let FetchResult { records, .. } = res.unwrap();
            assert!(!records.is_empty());
        } else {
            let err = res.unwrap_err();
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

    /// Deletes `topic_name` so the test leaves no topic behind.
    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    /// End-to-end check of `on_prune` against a real Kafka broker (skipped when no broker
    /// is configured).
    ///
    /// The test verifies three things:
    /// - the record at the prunable entry id is still readable;
    /// - the record just before it is gone;
    /// - the topic's pruned entry id metadata equals the prunable entry id.
    #[tokio::test]
    async fn test_procedure_execution() {
        maybe_skip_kafka_integration_test!();
        let broker_endpoints = get_kafka_endpoints();

        common_telemetry::init_default_ut_logging();
        // Unique topic name so concurrent/repeated runs do not interfere.
        let mut topic_name = uuid::Uuid::new_v4().to_string();
        topic_name = format!("test_procedure_execution-{}", topic_name);
        let env = TestEnv::new();
        let context = env.build_wal_prune_context(broker_endpoints).await;
        TestEnv::prepare_topic(&context.client, &topic_name).await;

        let prunable_entry_id = mock_test_data(context.clone(), &topic_name).await;
        let mut procedure =
            WalPruneProcedure::new(context.clone(), None, topic_name.clone(), prunable_entry_id);
        let status = procedure.on_prune().await.unwrap();
        assert_matches!(status, Status::Done { output: None });
        // The entry at the prunable id must survive pruning.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64,
            true,
        )
        .await;
        // The entry right before it must have been deleted.
        check_entry_id_existence(
            procedure.context.client.clone(),
            &topic_name,
            procedure.data.prunable_entry_id as i64 - 1,
            false,
        )
        .await;

        // The pruned entry id must have been persisted in the topic metadata.
        let value = env
            .table_metadata_manager
            .topic_name_manager()
            .get(&topic_name)
            .await
            .unwrap()
            .unwrap();
        assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);
        // NOTE(review): not reached if an assertion above fails, so the topic can leak.
        delete_topic(procedure.context.client, &topic_name).await;
    }
}