pub(crate) mod manager;
#[cfg(test)]
mod test_util;

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;

use api::v1::meta::MailboxMessage;
use common_error::ext::BoxedError;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::lock_key::RemoteWalLock;
use common_meta::peer::Peer;
use common_meta::region_registry::LeaderRegionRegistryRef;
use common_procedure::error::{FromJsonSnafu, ToJsonSnafu};
use common_procedure::{
    Context as ProcedureContext, Error as ProcedureError, LockKey, Procedure,
    Result as ProcedureResult, Status, StringKey,
};
use common_telemetry::{info, warn};
use itertools::{Itertools, MinMaxResult};
use log_store::kafka::DEFAULT_PARTITION;
use manager::{WalPruneProcedureGuard, WalPruneProcedureTracker};
use rskafka::client::partition::UnknownTopicHandling;
use rskafka::client::Client;
use serde::{Deserialize, Serialize};
use snafu::ResultExt;
use store_api::logstore::EntryId;
use store_api::storage::RegionId;

use crate::error::{
    self, BuildPartitionClientSnafu, DeleteRecordsSnafu, TableMetadataManagerSnafu,
    UpdateTopicNameValueSnafu,
};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::Result;

pub type KafkaClientRef = Arc<Client>;

const DELETE_RECORDS_TIMEOUT: Duration = Duration::from_secs(5);

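/// The states of the WAL prune procedure.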
#[derive(Debug, Serialize, Deserialize)]
pub enum WalPruneState {
    Prepare,
    FlushRegion,
    Prune,
}

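/// The context of the WAL prune procedure, holding the handles the procedure needs
/// to reach Kafka, the table metadata store, and the datanodes.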
#[derive(Clone)]
pub struct Context {
    /// The Kafka client used to delete records from the topic.
    pub client: KafkaClientRef,
    /// The table metadata manager, used to look up topic-region mappings, table routes,
    /// and the topic name value.
    pub table_metadata_manager: TableMetadataManagerRef,
    /// The registry of leader regions, which reports each region's prunable entry id.
    pub leader_region_registry: LeaderRegionRegistryRef,
    /// The address of this metasrv, used as the sender of mailbox messages.
    pub server_addr: String,
    /// The mailbox used to send flush instructions to datanodes.
    pub mailbox: MailboxRef,
}

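/// The serializable data of the WAL prune procedure, persisted between procedure steps.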
#[derive(Serialize, Deserialize)]
pub struct WalPruneData {
    /// The name of the topic to prune.
    pub topic: String,
    /// The entry id up to which the WAL can be pruned.
    pub prunable_entry_id: EntryId,
    /// Regions whose prunable entry id lags more than `trigger_flush_threshold` entries
    /// behind the most up-to-date region; they are asked to flush.
    pub regions_to_flush: Vec<RegionId>,
    /// The lag threshold beyond which a region is asked to flush. `0` disables the flush step.
    pub trigger_flush_threshold: u64,
    /// The current state of the procedure.
    pub state: WalPruneState,
}

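/// The procedure to prune WAL entries of a Kafka topic. It prunes the topic up to the
/// minimum prunable entry id reported by the regions that use the topic.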
pub struct WalPruneProcedure {
    pub data: WalPruneData,
    pub context: Context,
    pub _guard: Option<WalPruneProcedureGuard>,
}

impl WalPruneProcedure {
    const TYPE_NAME: &'static str = "metasrv-procedure::WalPrune";

    pub fn new(
        topic: String,
        context: Context,
        trigger_flush_threshold: u64,
        guard: Option<WalPruneProcedureGuard>,
    ) -> Self {
        Self {
            data: WalPruneData {
                topic,
                prunable_entry_id: 0,
                trigger_flush_threshold,
                regions_to_flush: vec![],
                state: WalPruneState::Prepare,
            },
            context,
            _guard: guard,
        }
    }

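    /// Recovers a procedure from its serialized JSON data and re-registers it in the
    /// running-procedure tracker.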
    pub fn from_json(
        json: &str,
        context: &Context,
        tracker: WalPruneProcedureTracker,
    ) -> ProcedureResult<Self> {
        let data: WalPruneData = serde_json::from_str(json).context(FromJsonSnafu)?;
        let guard = tracker.insert_running_procedure(data.topic.clone());
        Ok(Self {
            data,
            context: context.clone(),
            _guard: guard,
        })
    }

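    /// Groups the given region ids by the datanode peer that currently leads them,
    /// based on the physical table routes.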
    async fn build_peer_to_region_ids_map(
        &self,
        ctx: &Context,
        region_ids: &[RegionId],
    ) -> Result<HashMap<Peer, Vec<RegionId>>> {
        let table_ids = region_ids
            .iter()
            .map(|region_id| region_id.table_id())
            .collect::<HashSet<_>>()
            .into_iter()
            .collect::<Vec<_>>();
        let table_ids_table_routes_map = ctx
            .table_metadata_manager
            .table_route_manager()
            .batch_get_physical_table_routes(&table_ids)
            .await
            .context(TableMetadataManagerSnafu)?;

        let mut peer_region_ids_map = HashMap::new();
        for region_id in region_ids {
            let table_id = region_id.table_id();
            let table_route = match table_ids_table_routes_map.get(&table_id) {
                Some(route) => route,
                None => return error::TableRouteNotFoundSnafu { table_id }.fail(),
            };
            for region_route in &table_route.region_routes {
                if region_route.region.id != *region_id {
                    continue;
                }
                if let Some(peer) = &region_route.leader_peer {
                    peer_region_ids_map
                        .entry(peer.clone())
                        .or_insert_with(Vec::new)
                        .push(*region_id);
                }
            }
        }
        Ok(peer_region_ids_map)
    }

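    /// Builds one `FlushRegions` instruction per peer, covering the regions it leads.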
    fn build_flush_region_instruction(
        &self,
        peer_region_ids_map: HashMap<Peer, Vec<RegionId>>,
    ) -> Result<Vec<(Peer, Instruction)>> {
        let peer_and_instructions = peer_region_ids_map
            .into_iter()
            .map(|(peer, region_ids)| {
                let flush_instruction = Instruction::FlushRegions(FlushRegions { region_ids });
                (peer.clone(), flush_instruction)
            })
            .collect();
        Ok(peer_and_instructions)
    }

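    /// Prepares the procedure:
    /// - Fetches all regions of the topic and the prunable entry id each region reported
    ///   via heartbeat.
    /// - Aborts if any region has not reported a prunable entry id yet.
    /// - Sets `prunable_entry_id` to the minimum reported value and, if a flush threshold
    ///   is configured, collects the regions that lag too far behind.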
    pub async fn on_prepare(&mut self) -> Result<Status> {
        let region_ids = self
            .context
            .table_metadata_manager
            .topic_region_manager()
            .regions(&self.data.topic)
            .await
            .context(TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: "Failed to get topic-region map",
            })?;
        let prunable_entry_ids_map: HashMap<_, _> = self
            .context
            .leader_region_registry
            .batch_get(region_ids.iter().cloned())
            .into_iter()
            .map(|(region_id, region)| {
                let prunable_entry_id = region.manifest.prunable_entry_id();
                (region_id, prunable_entry_id)
            })
            .collect();

        // Safety check: every region in the topic must have reported a prunable entry id
        // via heartbeat before pruning can proceed.
        let non_collected_region_ids =
            check_heartbeat_collected_region_ids(&region_ids, &prunable_entry_ids_map);
        if !non_collected_region_ids.is_empty() {
            warn!(
                "The heartbeat collected region ids do not contain all region ids in the topic-region map. Aborting the WAL prune procedure. topic: {}, non-collected region ids: {:?}",
                self.data.topic, non_collected_region_ids
            );
            return Ok(Status::done());
        }

        let min_max_result = prunable_entry_ids_map.values().minmax();
        let max_prunable_entry_id = match min_max_result {
            MinMaxResult::NoElements => {
                return Ok(Status::done());
            }
            MinMaxResult::OneElement(prunable_entry_id) => {
                self.data.prunable_entry_id = *prunable_entry_id;
                *prunable_entry_id
            }
            MinMaxResult::MinMax(min_prunable_entry_id, max_prunable_entry_id) => {
                self.data.prunable_entry_id = *min_prunable_entry_id;
                *max_prunable_entry_id
            }
        };
        if self.data.trigger_flush_threshold != 0 {
            // Ask regions that lag too far behind the most up-to-date region to flush.
            for (region_id, prunable_entry_id) in prunable_entry_ids_map {
                if prunable_entry_id + self.data.trigger_flush_threshold < max_prunable_entry_id {
                    self.data.regions_to_flush.push(region_id);
                }
            }
            self.data.state = WalPruneState::FlushRegion;
        } else {
            self.data.state = WalPruneState::Prune;
        }
        Ok(Status::executing(true))
    }

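    /// Sends flush instructions to the datanodes leading the lagging regions, then
    /// transitions to the `Prune` state.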
    pub async fn on_sending_flush_request(&mut self) -> Result<Status> {
        let peer_to_region_ids_map = self
            .build_peer_to_region_ids_map(&self.context, &self.data.regions_to_flush)
            .await
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: "Failed to build peer to region ids map",
            })?;
        let flush_instructions = self.build_flush_region_instruction(peer_to_region_ids_map)?;
        for (peer, flush_instruction) in flush_instructions.into_iter() {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.context.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            let ch = Channel::Datanode(peer.id);
            self.context.mailbox.send_oneway(&ch, msg).await?;
        }
        self.data.state = WalPruneState::Prune;
        Ok(Status::executing(true))
    }

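    /// Prunes the WAL: persists the new pruned entry id in the topic metadata, then
    /// deletes the Kafka records before `prunable_entry_id`.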
    pub async fn on_prune(&mut self) -> Result<Status> {
        let partition_client = self
            .context
            .client
            .partition_client(
                self.data.topic.clone(),
                DEFAULT_PARTITION,
                UnknownTopicHandling::Retry,
            )
            .await
            .context(BuildPartitionClientSnafu {
                topic: self.data.topic.clone(),
                partition: DEFAULT_PARTITION,
            })?;

        // Record the new pruned entry id in the topic metadata before deleting the
        // Kafka records.
        let prev = self
            .context
            .table_metadata_manager
            .topic_name_manager()
            .get(&self.data.topic)
            .await
            .context(TableMetadataManagerSnafu)
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!("Failed to get TopicNameValue, topic: {}", self.data.topic),
            })?;
        self.context
            .table_metadata_manager
            .topic_name_manager()
            .update(&self.data.topic, self.data.prunable_entry_id, prev)
            .await
            .context(UpdateTopicNameValueSnafu {
                topic: &self.data.topic,
            })
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to update pruned entry id for topic: {}",
                    self.data.topic
                ),
            })?;
        partition_client
            .delete_records(
                self.data.prunable_entry_id as i64,
                DELETE_RECORDS_TIMEOUT.as_millis() as i32,
            )
            .await
            .context(DeleteRecordsSnafu {
                topic: &self.data.topic,
                partition: DEFAULT_PARTITION,
                offset: self.data.prunable_entry_id,
            })
            .map_err(BoxedError::new)
            .with_context(|_| error::RetryLaterWithSourceSnafu {
                reason: format!(
                    "Failed to delete records for topic: {}, partition: {}, offset: {}",
                    self.data.topic, DEFAULT_PARTITION, self.data.prunable_entry_id
                ),
            })?;
        info!(
            "Successfully pruned WAL for topic: {}, entry id: {}",
            self.data.topic, self.data.prunable_entry_id
        );
        Ok(Status::done())
    }
}

#[async_trait::async_trait]
impl Procedure for WalPruneProcedure {
    fn type_name(&self) -> &str {
        Self::TYPE_NAME
    }

    fn rollback_supported(&self) -> bool {
        false
    }

    async fn execute(&mut self, _ctx: &ProcedureContext) -> ProcedureResult<Status> {
        let state = &self.data.state;

        match state {
            WalPruneState::Prepare => self.on_prepare().await,
            WalPruneState::FlushRegion => self.on_sending_flush_request().await,
            WalPruneState::Prune => self.on_prune().await,
        }
        .map_err(|e| {
            if e.is_retryable() {
                ProcedureError::retry_later(e)
            } else {
                ProcedureError::external(e)
            }
        })
    }

    fn dump(&self) -> ProcedureResult<String> {
        serde_json::to_string(&self.data).context(ToJsonSnafu)
    }

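    /// Takes a write lock on the remote WAL topic, serializing this procedure with
    /// other procedures that lock the same topic.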
    fn lock_key(&self) -> LockKey {
        let lock_key: StringKey = RemoteWalLock::Write(self.data.topic.clone()).into();
        LockKey::new(vec![lock_key])
    }
}

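/// Returns the region ids that appear in the topic-region map but are missing from the
/// heartbeat-collected prunable entry id map.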
fn check_heartbeat_collected_region_ids(
    region_ids: &[RegionId],
    heartbeat_collected_region_ids: &HashMap<RegionId, u64>,
) -> Vec<RegionId> {
    let mut non_collected_region_ids = Vec::new();
    for region_id in region_ids {
        if !heartbeat_collected_region_ids.contains_key(region_id) {
            non_collected_region_ids.push(*region_id);
        }
    }
    non_collected_region_ids
}

#[cfg(test)]
mod tests {
    use std::assert_matches::assert_matches;

    use api::v1::meta::HeartbeatResponse;
    use common_wal::test_util::run_test_with_kafka_wal;
    use rskafka::record::Record;
    use tokio::sync::mpsc::Receiver;

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::test_util::new_wal_prune_metadata;
    use crate::procedure::wal_prune::test_util::TestEnv;

    /// Mocks WAL entries for the topic and builds the region/table metadata, returning
    /// the expected prunable entry id and the regions that should be flushed.
    async fn mock_test_data(procedure: &WalPruneProcedure) -> (u64, Vec<RegionId>) {
        let n_region = 10;
        let n_table = 5;
        let offsets = mock_wal_entries(
            procedure.context.client.clone(),
            &procedure.data.topic,
            (n_region * n_table * 5) as usize,
        )
        .await;
        let (prunable_entry_id, regions_to_flush) = new_wal_prune_metadata(
            procedure.context.table_metadata_manager.clone(),
            procedure.context.leader_region_registry.clone(),
            n_region,
            n_table,
            &offsets,
            procedure.data.trigger_flush_threshold,
            procedure.data.topic.clone(),
        )
        .await;
        (prunable_entry_id, regions_to_flush)
    }

    fn record(i: usize) -> Record {
        let key = format!("key_{i}");
        let value = format!("value_{i}");
        Record {
            key: Some(key.into()),
            value: Some(value.into()),
            timestamp: chrono::Utc::now(),
            headers: Default::default(),
        }
    }

    async fn mock_wal_entries(
        client: KafkaClientRef,
        topic_name: &str,
        n_entries: usize,
    ) -> Vec<i64> {
        let controller_client = client.controller_client().unwrap();
        let _ = controller_client
            .create_topic(topic_name, 1, 1, 5_000)
            .await;
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let mut offsets = Vec::with_capacity(n_entries);
        for i in 0..n_entries {
            let record = vec![record(i)];
            let offset = partition_client
                .produce(
                    record,
                    rskafka::client::partition::Compression::NoCompression,
                )
                .await
                .unwrap();
            offsets.extend(offset);
        }
        offsets
    }

    async fn check_entry_id_existence(
        client: KafkaClientRef,
        topic_name: &str,
        entry_id: i64,
        expect_success: bool,
    ) {
        let partition_client = client
            .partition_client(topic_name, 0, UnknownTopicHandling::Retry)
            .await
            .unwrap();
        let res = partition_client
            .fetch_records(entry_id, 0..10001, 5_000)
            .await;
        if expect_success {
            assert!(res.is_ok());
            let (record, _high_watermark) = res.unwrap();
            assert!(!record.is_empty());
        } else {
            let err = res.unwrap_err();
            assert!(err.to_string().contains("OffsetOutOfRange"));
        }
    }

    async fn delete_topic(client: KafkaClientRef, topic_name: &str) {
        let controller_client = client.controller_client().unwrap();
        controller_client
            .delete_topic(topic_name, 5_000)
            .await
            .unwrap();
    }

    async fn check_flush_request(
        rx: &mut Receiver<std::result::Result<HeartbeatResponse, tonic::Status>>,
        region_ids: &[RegionId],
    ) {
        let resp = rx.recv().await.unwrap().unwrap();
        let msg = resp.mailbox_message.unwrap();
        let flush_instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        let mut flush_requested_region_ids = match flush_instruction {
            Instruction::FlushRegions(FlushRegions { region_ids, .. }) => region_ids,
            _ => unreachable!(),
        };
        let sorted_region_ids = region_ids
            .iter()
            .cloned()
            .sorted_by_key(|a| a.as_u64())
            .collect::<Vec<_>>();
        flush_requested_region_ids.sort_by_key(|a| a.as_u64());
        assert_eq!(flush_requested_region_ids, sorted_region_ids);
    }

    #[tokio::test]
    async fn test_procedure_execution() {
        run_test_with_kafka_wal(|broker_endpoints| {
            Box::pin(async {
                common_telemetry::init_default_ut_logging();
                let mut topic_name = uuid::Uuid::new_v4().to_string();
                topic_name = format!("test_procedure_execution-{}", topic_name);
                let mut env = TestEnv::new();
                let context = env.build_wal_prune_context(broker_endpoints).await;
                TestEnv::prepare_topic(&context.client, &topic_name).await;
                let mut procedure = WalPruneProcedure::new(topic_name.clone(), context, 10, None);

                // Pruning before any metadata exists should fail with a retryable error.
                let result = procedure.on_prune().await;
                assert_matches!(result, Err(e) if e.is_retryable());

                let (prunable_entry_id, regions_to_flush) = mock_test_data(&procedure).await;

                // Step 1: prepare. The procedure computes the prunable entry id and the
                // regions to flush, then moves to the `FlushRegion` state.
                let status = procedure.on_prepare().await.unwrap();
                assert_matches!(
                    status,
                    Status::Executing {
                        persist: true,
                        clean_poisons: false
                    }
                );
                assert_matches!(procedure.data.state, WalPruneState::FlushRegion);
                assert_eq!(procedure.data.prunable_entry_id, prunable_entry_id);
                assert_eq!(
                    procedure.data.regions_to_flush.len(),
                    regions_to_flush.len()
                );
                for region_id in &regions_to_flush {
                    assert!(procedure.data.regions_to_flush.contains(region_id));
                }

                // Step 2: send flush requests to the datanode and move to the `Prune` state.
                let (tx, mut rx) = tokio::sync::mpsc::channel(1);
                env.mailbox
                    .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
                    .await;
                let status = procedure.on_sending_flush_request().await.unwrap();
                check_flush_request(&mut rx, &regions_to_flush).await;
                assert_matches!(
                    status,
                    Status::Executing {
                        persist: true,
                        clean_poisons: false
                    }
                );
                assert_matches!(procedure.data.state, WalPruneState::Prune);

                // Step 3: prune. Entries before `prunable_entry_id` are deleted from Kafka.
                let status = procedure.on_prune().await.unwrap();
                assert_matches!(status, Status::Done { output: None });
                check_entry_id_existence(
                    procedure.context.client.clone(),
                    &topic_name,
                    procedure.data.prunable_entry_id as i64,
                    true,
                )
                .await;
                check_entry_id_existence(
                    procedure.context.client.clone(),
                    &topic_name,
                    procedure.data.prunable_entry_id as i64 - 1,
                    false,
                )
                .await;

                // The pruned entry id is persisted in the topic metadata.
                let value = env
                    .table_metadata_manager
                    .topic_name_manager()
                    .get(&topic_name)
                    .await
                    .unwrap()
                    .unwrap();
                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

                // Reset the leader region registry and run prepare again; without
                // heartbeat data the procedure finishes early.
                procedure.context.leader_region_registry.reset();
                let status = procedure.on_prepare().await.unwrap();
                assert_matches!(status, Status::Done { output: None });

                // Re-run prepare with the flush threshold disabled.
                procedure.data.trigger_flush_threshold = 0;
                procedure.on_prepare().await.unwrap();
                assert_matches!(procedure.data.state, WalPruneState::Prune);
                assert_eq!(value.pruned_entry_id, procedure.data.prunable_entry_id);

                delete_topic(procedure.context.client, &topic_name).await;
            })
        })
        .await;
    }
}