1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{debug, error, info, warn};
29use common_time::util::current_time_millis;
30use common_wal::config::kafka::common::{
31 DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
32 DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
33};
34use snafu::{OptionExt, ResultExt};
35use store_api::region_request::RegionFlushReason;
36use store_api::storage::RegionId;
37use tokio::sync::mpsc::{Receiver, Sender};
38
39use crate::error::{self, Result};
40use crate::service::mailbox::{Channel, MailboxRef};
41use crate::{define_ticker, metrics};
42
/// Topic statistics older than this window are treated as stale and ignored.
const RECENT_DURATION: Duration = Duration::from_secs(5 * 60);
45
/// Events consumed by the [`RegionFlushTrigger`] event loop.
pub enum Event {
    /// Emitted by the ticker; each tick runs one flush/checkpoint pass over all topics.
    Tick,
}
53
/// Shared, reference-counted handle to a [`RegionFlushTicker`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;
55
// Declares `RegionFlushTicker`. `RegionFlushTrigger::new` constructs it with the tick interval
// and the sending half of the trigger's event channel; `event_value` is presumably the event it
// emits on every tick (see the `define_ticker!` macro to confirm).
define_ticker!(
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
62
63pub struct RegionFlushTrigger {
71 table_metadata_manager: TableMetadataManagerRef,
73 leader_region_registry: LeaderRegionRegistryRef,
75 topic_stats_registry: TopicStatsRegistryRef,
77 mailbox: MailboxRef,
79 server_addr: String,
81 flush_trigger_size: ReadableSize,
83 checkpoint_trigger_size: ReadableSize,
85 region_flush_trigger_interval: Duration,
87 periodic_checkpoint_persist_interval: Duration,
89 last_checkpoint_persist_millis_by_region: HashMap<RegionId, i64>,
91 receiver: Receiver<Event>,
93}
94
95impl RegionFlushTrigger {
96 #[allow(clippy::too_many_arguments)]
98 pub(crate) fn new(
99 table_metadata_manager: TableMetadataManagerRef,
100 leader_region_registry: LeaderRegionRegistryRef,
101 topic_stats_registry: TopicStatsRegistryRef,
102 mailbox: MailboxRef,
103 server_addr: String,
104 mut flush_trigger_size: ReadableSize,
105 mut checkpoint_trigger_size: ReadableSize,
106 mut region_flush_trigger_interval: Duration,
107 mut periodic_checkpoint_persist_interval: Duration,
108 ) -> (Self, RegionFlushTicker) {
109 if flush_trigger_size.as_bytes() == 0 {
110 flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
111 warn!(
112 "flush_trigger_size is not set, using default value: {}",
113 flush_trigger_size
114 );
115 }
116 if checkpoint_trigger_size.as_bytes() == 0 {
117 checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
118 warn!(
119 "checkpoint_trigger_size is not set, using default value: {}",
120 checkpoint_trigger_size
121 );
122 }
123 if region_flush_trigger_interval.is_zero() {
124 region_flush_trigger_interval = DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL;
125 warn!(
126 "region_flush_trigger_interval is not set, using default value: {:?}",
127 region_flush_trigger_interval
128 );
129 }
130 if periodic_checkpoint_persist_interval.is_zero() {
131 periodic_checkpoint_persist_interval = DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL;
132 warn!(
133 "periodic_checkpoint_persist_interval is not set, using default value: {:?}",
134 periodic_checkpoint_persist_interval
135 );
136 }
137 let (tx, rx) = Self::channel();
138 let region_flush_ticker = RegionFlushTicker::new(region_flush_trigger_interval, tx);
139 let region_flush_trigger = Self {
140 table_metadata_manager,
141 leader_region_registry,
142 topic_stats_registry,
143 mailbox,
144 server_addr,
145 flush_trigger_size,
146 checkpoint_trigger_size,
147 region_flush_trigger_interval,
148 periodic_checkpoint_persist_interval,
149 last_checkpoint_persist_millis_by_region: HashMap::new(),
150 receiver: rx,
151 };
152 (region_flush_trigger, region_flush_ticker)
153 }
154
155 fn channel() -> (Sender<Event>, Receiver<Event>) {
156 tokio::sync::mpsc::channel(8)
157 }
158
159 pub fn try_start(mut self) -> Result<()> {
161 common_runtime::spawn_global(async move { self.run().await });
162 info!("Region flush trigger started");
163 Ok(())
164 }
165
166 async fn run(&mut self) {
167 while let Some(event) = self.receiver.recv().await {
168 match event {
169 Event::Tick => self.handle_tick().await,
170 }
171 }
172 }
173
174 async fn handle_tick(&mut self) {
175 if let Err(e) = self.trigger_flush().await {
176 error!(e; "Failed to trigger flush");
177 }
178 }
179
180 async fn trigger_flush(&mut self) -> Result<()> {
181 let now = Instant::now();
182 let now_millis = current_time_millis();
183 let topics = self
184 .table_metadata_manager
185 .topic_name_manager()
186 .range()
187 .await
188 .context(error::TableMetadataManagerSnafu)?;
189
190 let mut active_region_ids = HashSet::new();
191 for topic in &topics {
192 if let Err(e) = self
193 .handle_topic(topic, now_millis, &mut active_region_ids)
194 .await
195 {
196 error!(e; "Failed to handle regions in topic: {}", topic);
197 }
198 }
199 retain_checkpoint_persist_records(
200 &mut self.last_checkpoint_persist_millis_by_region,
201 &active_region_ids,
202 );
203
204 debug!(
205 "Triggered flush for {} topics in {:?}",
206 topics.len(),
207 now.elapsed()
208 );
209 Ok(())
210 }
211
212 async fn handle_topic(
213 &mut self,
214 topic: &str,
215 now_millis: i64,
216 active_region_ids: &mut HashSet<RegionId>,
217 ) -> Result<()> {
218 let topic_regions = self
219 .table_metadata_manager
220 .topic_region_manager()
221 .regions(topic)
222 .await
223 .context(error::TableMetadataManagerSnafu)?;
224
225 if topic_regions.is_empty() {
226 debug!("No regions found for topic: {}", topic);
227 return Ok(());
228 }
229 active_region_ids.extend(topic_regions.keys().copied());
230
231 let topic_stat = self.retrieve_topic_stat(topic);
232 let size_based_regions = topic_stat
233 .map(|(latest_entry_id, avg_record_size)| {
234 filter_regions_by_replay_size(
235 topic,
236 topic_regions.iter().map(|(region_id, value)| {
237 (*region_id, value.min_entry_id().unwrap_or_default())
238 }),
239 avg_record_size as u64,
240 latest_entry_id,
241 self.checkpoint_trigger_size,
242 )
243 })
244 .unwrap_or_default();
245 let periodic_regions = filter_regions_for_periodic_checkpoint(
248 topic_regions.keys().copied(),
249 &self.last_checkpoint_persist_millis_by_region,
250 now_millis,
251 self.periodic_checkpoint_persist_interval,
252 );
253 let regions_to_persist = merge_region_ids(size_based_regions, periodic_regions);
254 let region_manifests = self
255 .leader_region_registry
256 .batch_get(topic_regions.keys().cloned());
257
258 let pruned_entry_id = self
259 .table_metadata_manager
260 .topic_name_manager()
261 .get(topic)
262 .await
263 .context(error::TableMetadataManagerSnafu)?
264 .map(|v| v.into_inner().pruned_entry_id);
265 debug!("Topic: {}, pruned entry id: {:?}", topic, pruned_entry_id);
266
267 match self
268 .persist_region_checkpoints(
269 topic,
270 ®ions_to_persist,
271 &topic_regions,
272 ®ion_manifests,
273 )
274 .await
275 {
276 Ok(region_ids) => mark_checkpoint_persisted(
279 &mut self.last_checkpoint_persist_millis_by_region,
280 ®ion_ids,
281 now_millis,
282 ),
283 Err(err) => error!(err; "Failed to persist region checkpoints for topic: {}", topic),
284 }
285
286 self.flush_regions_in_topic(topic, topic_stat, pruned_entry_id, region_manifests)
287 .await?;
288
289 Ok(())
290 }
291
292 fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
296 let Some((latest_entry_id, timestamp)) =
297 self.topic_stats_registry.get_latest_entry_id(topic)
298 else {
299 debug!("No latest entry id found for topic: {}", topic);
300 return None;
301 };
302
303 let Some(stat) = self
304 .topic_stats_registry
305 .get_calculated_topic_stat(topic, self.region_flush_trigger_interval)
306 else {
307 debug!("No topic stat found for topic: {}", topic);
308 return None;
309 };
310
311 let now = current_time_millis();
312 if !is_recent(timestamp, now, RECENT_DURATION) {
313 debug!(
314 "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
315 topic, timestamp, now
316 );
317 return None;
318 }
319 if !is_recent(stat.end_ts, now, RECENT_DURATION) {
320 debug!(
321 "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
322 topic, stat.end_ts, now
323 );
324 return None;
325 }
326
327 Some((latest_entry_id, stat.avg_record_size))
328 }
329
330 async fn persist_region_checkpoints(
331 &self,
332 topic: &str,
333 region_ids: &[RegionId],
334 topic_regions: &HashMap<RegionId, TopicRegionValue>,
335 leader_regions: &HashMap<RegionId, LeaderRegion>,
336 ) -> Result<Vec<RegionId>> {
337 let regions = region_ids
338 .iter()
339 .flat_map(|region_id| match leader_regions.get(region_id) {
340 Some(leader_region) => should_persist_region_checkpoint(
341 leader_region,
342 topic_regions
343 .get(region_id)
344 .cloned()
345 .and_then(|value| value.checkpoint),
346 )
347 .map(|checkpoint| (*region_id, TopicRegionValue::new(Some(checkpoint)))),
348 None => None,
349 })
350 .collect::<Vec<_>>();
351
352 if regions.is_empty() {
354 return Ok(Vec::new());
355 }
356
357 let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
358 let batch_size = max_txn_ops.min(regions.len());
359 debug!(
360 "persisting {} region checkpoints for topic '{}', regions: {:?}",
361 regions.len(),
362 topic,
363 regions,
364 );
365 for batch in regions.chunks(batch_size) {
366 let batch = batch
367 .iter()
368 .map(|(region_id, value)| (TopicRegionKey::new(*region_id, topic), Some(*value)))
369 .collect::<Vec<_>>();
370 self.table_metadata_manager
371 .topic_region_manager()
372 .batch_put(&batch)
373 .await
374 .context(error::TableMetadataManagerSnafu)?;
375 }
376
377 metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
378 .with_label_values(&[topic])
379 .inc_by(regions.len() as u64);
380
381 Ok(regions
382 .into_iter()
383 .map(|(region_id, _)| region_id)
384 .collect())
385 }
386
387 async fn flush_regions_in_topic(
388 &self,
389 topic: &str,
390 topic_stat: Option<(u64, usize)>,
391 topic_pruned_entry_id: Option<u64>,
392 region_manifests: HashMap<RegionId, LeaderRegion>,
393 ) -> Result<()> {
394 let regions = region_manifests
395 .into_iter()
396 .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
397 .collect::<Vec<_>>();
398
399 let regions_to_flush = if let Some((latest_entry_id, avg_record_size)) = topic_stat {
400 let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
401 if let Some((_, min_entry_id)) = min_entry_id {
402 let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
403 .saturating_mul(avg_record_size as u64);
404 metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
405 .with_label_values(&[topic])
406 .set(replay_size as i64);
407 }
408
409 filter_regions_by_replay_size(
411 topic,
412 regions.iter().copied(),
413 avg_record_size as u64,
414 latest_entry_id,
415 self.flush_trigger_size,
416 )
417 } else {
418 Vec::new()
419 };
420 let pruned_regions_to_flush = filter_regions_below_topic_pruned_entry_id(
421 topic,
422 regions.into_iter(),
423 topic_pruned_entry_id,
424 );
425 let regions_to_flush = merge_region_ids(regions_to_flush, pruned_regions_to_flush);
426
427 if !regions_to_flush.is_empty() {
429 self.send_flush_instructions(®ions_to_flush).await?;
430 debug!(
431 "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
432 regions_to_flush.len(),
433 topic,
434 regions_to_flush,
435 );
436 }
437
438 metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
439 .with_label_values(&[topic])
440 .inc_by(regions_to_flush.len() as u64);
441
442 Ok(())
443 }
444
445 async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
446 let leader_to_region_ids =
447 group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
448 let flush_instructions = leader_to_region_ids
449 .into_iter()
450 .map(|(leader, region_ids)| {
451 let flush_instruction = Instruction::FlushRegions(
452 FlushRegions::async_batch(region_ids)
453 .with_reason(RegionFlushReason::RemoteWalPrune),
454 );
455 (leader, flush_instruction)
456 });
457
458 let tracing_ctx = TracingContext::from_current_span();
459 let tracing_header = tracing_ctx.to_w3c();
460 for (peer, flush_instruction) in flush_instructions {
461 let msg = MailboxMessage::json_message(
462 &format!("Flush regions: {}", flush_instruction),
463 &format!("Metasrv@{}", self.server_addr),
464 &format!("Datanode-{}@{}", peer.id, peer.addr),
465 common_time::util::current_time_millis(),
466 &flush_instruction,
467 Some(tracing_header.clone()),
468 )
469 .with_context(|_| error::SerializeToJsonSnafu {
470 input: flush_instruction.to_string(),
471 })?;
472 if let Err(e) = self
473 .mailbox
474 .send_oneway(&Channel::Datanode(peer.id), msg)
475 .await
476 {
477 error!(e; "Failed to send flush instruction to datanode {}", peer);
478 }
479 }
480
481 Ok(())
482 }
483}
484
485fn should_persist_region_checkpoint(
487 current: &LeaderRegion,
488 persisted: Option<ReplayCheckpoint>,
489) -> Option<ReplayCheckpoint> {
490 let new_checkpoint = ReplayCheckpoint::new(
491 current.manifest.replay_entry_id(),
492 current.manifest.metadata_replay_entry_id(),
493 );
494
495 let Some(persisted) = persisted else {
496 return Some(new_checkpoint);
497 };
498
499 if new_checkpoint > persisted {
500 return Some(new_checkpoint);
501 }
502 None
503}
504
505fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
511 topic: &str,
512 regions: I,
513 avg_record_size: u64,
514 latest_entry_id: u64,
515 threshold: ReadableSize,
516) -> Vec<RegionId> {
517 let mut regions_to_flush = Vec::new();
518 for (region_id, entry_id) in regions {
519 if entry_id < latest_entry_id {
520 let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
521 if replay_size > threshold.as_bytes() {
522 debug!(
523 "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
524 region_id,
525 ReadableSize(replay_size),
526 threshold,
527 entry_id,
528 latest_entry_id,
529 topic
530 );
531 regions_to_flush.push(region_id);
532 }
533 }
534 }
535
536 regions_to_flush
537}
538
539fn filter_regions_below_topic_pruned_entry_id<I: Iterator<Item = (RegionId, u64)>>(
541 topic: &str,
542 regions: I,
543 topic_pruned_entry_id: Option<u64>,
544) -> Vec<RegionId> {
545 let Some(topic_pruned_entry_id) = topic_pruned_entry_id else {
546 return Vec::new();
547 };
548
549 let mut regions_to_flush = Vec::new();
550 for (region_id, prunable_entry_id) in regions {
551 if prunable_entry_id < topic_pruned_entry_id {
552 debug!(
553 "Region {}: prunable entry id {} is below topic pruned entry id {}, topic: '{}'",
554 region_id, prunable_entry_id, topic_pruned_entry_id, topic,
555 );
556 regions_to_flush.push(region_id);
557 }
558 }
559
560 regions_to_flush
561}
562
563fn filter_regions_for_periodic_checkpoint<I>(
565 regions: I,
566 last_persisted: &HashMap<RegionId, i64>,
567 now_millis: i64,
568 interval: Duration,
569) -> Vec<RegionId>
570where
571 I: Iterator<Item = RegionId>,
572{
573 let interval_millis = interval.as_millis() as i64;
574 regions
575 .filter(|region_id| {
576 last_persisted
577 .get(region_id)
578 .is_none_or(|last_persist_millis| {
579 now_millis.saturating_sub(*last_persist_millis) >= interval_millis
580 })
581 })
582 .collect()
583}
584
/// Concatenates `left` and `right`, dropping duplicates.
///
/// Unlike a plain `HashSet` round-trip, the result has a deterministic order: each id appears
/// once, at the position of its first occurrence in `left` followed by `right`. This keeps
/// checkpoint batch order and debug logs stable across runs.
fn merge_region_ids<T>(left: Vec<T>, right: Vec<T>) -> Vec<T>
where
    T: Copy + Eq + std::hash::Hash,
{
    let mut seen = HashSet::with_capacity(left.len() + right.len());
    left.into_iter()
        .chain(right)
        .filter(|id| seen.insert(*id))
        .collect()
}
593
594fn mark_checkpoint_persisted(
596 last_persisted: &mut HashMap<RegionId, i64>,
597 region_ids: &[RegionId],
598 now_millis: i64,
599) {
600 for region_id in region_ids {
601 last_persisted.insert(*region_id, now_millis);
602 }
603}
604
605fn retain_checkpoint_persist_records(
607 last_persisted: &mut HashMap<RegionId, i64>,
608 active_region_ids: &HashSet<RegionId>,
609) {
610 last_persisted.retain(|region_id, _| active_region_ids.contains(region_id));
611}
612
613async fn group_regions_by_leader(
617 table_metadata_manager: &TableMetadataManagerRef,
618 regions_to_flush: &[RegionId],
619) -> Result<HashMap<Peer, Vec<RegionId>>> {
620 let table_ids = regions_to_flush
621 .iter()
622 .map(|region_id| region_id.table_id())
623 .collect::<HashSet<_>>()
624 .into_iter()
625 .collect::<Vec<_>>();
626 let table_ids_table_routes = table_metadata_manager
627 .table_route_manager()
628 .batch_get_physical_table_routes(&table_ids)
629 .await
630 .context(error::TableMetadataManagerSnafu)?;
631
632 let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
633 for region_id in regions_to_flush {
634 let table_id = region_id.table_id();
635 let table_route = table_ids_table_routes
636 .get(&table_id)
637 .context(error::TableRouteNotFoundSnafu { table_id })?;
638 let Some(region_route) = table_route
639 .region_routes
640 .iter()
641 .find(|r| r.region.id == *region_id)
642 else {
643 continue;
644 };
645 let Some(peer) = ®ion_route.leader_peer else {
646 continue;
647 };
648
649 match peer_region_ids_map.get_mut(peer) {
650 Some(region_ids) => {
651 region_ids.push(*region_id);
652 }
653 None => {
654 peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
655 }
656 }
657 }
658 Ok(peer_region_ids_map)
659}
660
/// Returns `true` when `timestamp` lies less than `duration` before `now`.
///
/// Both timestamps are in milliseconds. Timestamps in the future count as recent, and the
/// subtraction saturates instead of overflowing.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let window_millis = duration.as_millis() as i64;
    let elapsed_millis = now.saturating_sub(timestamp);
    elapsed_millis < window_millis
}
668
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_meta::instruction::FlushStrategy;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
    use common_meta::sequence::SequenceBuilder;
    use common_meta::stats::topic::TopicStatsRegistry;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata};

    const TEST_TOPIC: &str = "test_topic";

    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    /// Runs `filter_regions_by_replay_size` on [`TEST_TOPIC`] with a threshold given in bytes.
    fn regions_over_replay_threshold(
        regions: Vec<(RegionId, u64)>,
        avg_record_size: u64,
        latest_entry_id: u64,
        threshold_bytes: u64,
    ) -> Vec<RegionId> {
        filter_regions_by_replay_size(
            TEST_TOPIC,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            ReadableSize(threshold_bytes),
        )
    }

    /// Builds a trigger with 1-byte size thresholds and the default intervals.
    ///
    /// The ticker is returned alongside so it is not dropped inside this helper.
    fn new_test_trigger(
        table_metadata_manager: TableMetadataManagerRef,
        leader_region_registry: LeaderRegionRegistryRef,
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
    ) -> (RegionFlushTrigger, RegionFlushTicker) {
        RegionFlushTrigger::new(
            table_metadata_manager,
            leader_region_registry,
            topic_stats_registry,
            mailbox,
            "127.0.0.1:3002".to_string(),
            ReadableSize(1),
            ReadableSize(1),
            DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
            DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
        )
    }

    /// A metric-engine leader region with the given data / metadata flushed entry ids.
    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    /// A mito-engine leader region with the given flushed entry id.
    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        let one_second = Duration::from_secs(1);
        assert!(is_recent(now - 999, now, one_second));
        assert!(!is_recent(now - 1001, now, one_second));
    }

    #[test]
    fn test_filter_regions_for_periodic_checkpoint() {
        let now_millis = 10_000;
        // Region (1, 1) was persisted 4s ago (not due), (1, 2) exactly 5s ago (due),
        // and (1, 3) has never been persisted (due).
        let last_persisted = HashMap::from([
            (region_id(1, 1), now_millis - 4_000),
            (region_id(1, 2), now_millis - 5_000),
        ]);

        let due = filter_regions_for_periodic_checkpoint(
            [region_id(1, 1), region_id(1, 2), region_id(1, 3)].into_iter(),
            &last_persisted,
            now_millis,
            Duration::from_secs(5),
        );

        assert_eq!(due, vec![region_id(1, 2), region_id(1, 3)]);
    }

    #[test]
    fn test_merge_region_ids() {
        let merged: HashSet<_> = merge_region_ids(
            vec![region_id(1, 1), region_id(1, 2)],
            vec![region_id(1, 2), region_id(1, 3)],
        )
        .into_iter()
        .collect();

        assert_eq!(
            merged,
            HashSet::from([region_id(1, 1), region_id(1, 2), region_id(1, 3)])
        );
    }

    #[test]
    fn test_mark_checkpoint_persisted() {
        let now_millis = 10_000;
        let mut last_persisted = HashMap::from([(region_id(1, 1), 1_000)]);

        mark_checkpoint_persisted(
            &mut last_persisted,
            &[region_id(1, 1), region_id(1, 2)],
            now_millis,
        );

        for region in [region_id(1, 1), region_id(1, 2)] {
            assert_eq!(last_persisted.get(&region), Some(&now_millis));
        }
    }

    #[test]
    fn test_retain_checkpoint_persist_records() {
        let mut last_persisted = HashMap::from([
            (region_id(1, 1), 1_000),
            (region_id(1, 2), 2_000),
            (region_id(1, 3), 3_000),
        ]);
        let active_regions = HashSet::from([region_id(1, 1), region_id(1, 3)]);

        retain_checkpoint_persist_records(&mut last_persisted, &active_regions);

        assert_eq!(last_persisted.len(), 2);
        for region in [region_id(1, 1), region_id(1, 3)] {
            assert!(last_persisted.contains_key(&region));
        }
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        // Replay sizes: 10, 20 and 50 bytes, all within the 1000-byte threshold.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 95),
        ];
        assert!(regions_over_replay_threshold(regions, 10, 100, 1000).is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        // Replay sizes: 10, 20 and 100 bytes; only the last exceeds 50.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 90),
        ];
        assert_eq!(
            regions_over_replay_threshold(regions, 10, 100, 50),
            vec![region_id(1, 3)]
        );
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];
        assert!(regions_over_replay_threshold(regions, 0, 100, 1).is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        // (1, 1) is caught up; (1, 2) replays exactly the threshold, which is not "exceeding".
        let regions = vec![(region_id(1, 1), 100), (region_id(1, 2), 99)];
        assert!(regions_over_replay_threshold(regions, 10, 100, 10).is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        // Replay sizes: 50, 100, 5 and 0 bytes against a 20-byte threshold.
        let regions = vec![
            (region_id(1, 1), 190),
            (region_id(1, 2), 180),
            (region_id(1, 3), 199),
            (region_id(1, 4), 200),
        ];
        assert_eq!(
            regions_over_replay_threshold(regions, 5, 200, 20),
            vec![region_id(1, 1), region_id(1, 2)]
        );
    }

    #[test]
    fn test_filter_regions_below_topic_pruned_entry_id_none() {
        let regions = vec![(region_id(1, 1), 90), (region_id(1, 2), 100)];

        let result =
            filter_regions_below_topic_pruned_entry_id(TEST_TOPIC, regions.into_iter(), None);

        assert!(result.is_empty());
    }

    #[test]
    fn test_filter_regions_below_topic_pruned_entry_id() {
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 100),
            (region_id(1, 3), 101),
            (region_id(1, 4), 80),
        ];

        let result =
            filter_regions_below_topic_pruned_entry_id(TEST_TOPIC, regions.into_iter(), Some(100));

        assert_eq!(result, vec![region_id(1, 1), region_id(1, 4)]);
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        let cases = [
            // Nothing persisted yet: always persist the current checkpoint.
            (
                metric_leader_region(100, 10),
                None,
                Some(ReplayCheckpoint::new(100, Some(10))),
            ),
            // The data replay entry id advanced.
            (
                mito_leader_region(100),
                Some(ReplayCheckpoint::new(90, None)),
                Some(ReplayCheckpoint::new(100, None)),
            ),
            (
                metric_leader_region(100, 10),
                Some(ReplayCheckpoint::new(90, Some(10))),
                Some(ReplayCheckpoint::new(100, Some(10))),
            ),
            // Only the metadata replay entry id advanced (or appeared).
            (
                metric_leader_region(100, 10),
                Some(ReplayCheckpoint::new(100, Some(8))),
                Some(ReplayCheckpoint::new(100, Some(10))),
            ),
            (
                metric_leader_region(100, 10),
                Some(ReplayCheckpoint::new(100, None)),
                Some(ReplayCheckpoint::new(100, Some(10))),
            ),
            // Losing the metadata replay entry id is not an advance.
            (
                mito_leader_region(100),
                Some(ReplayCheckpoint::new(100, Some(8))),
                None,
            ),
            // Unchanged checkpoints.
            (
                metric_leader_region(100, 10),
                Some(ReplayCheckpoint::new(100, Some(10))),
                None,
            ),
            (
                mito_leader_region(100),
                Some(ReplayCheckpoint::new(100, None)),
                None,
            ),
            // The current data replay entry id is behind the persisted one.
            (
                metric_leader_region(80, 11),
                Some(ReplayCheckpoint::new(90, Some(10))),
                None,
            ),
            (
                mito_leader_region(80),
                Some(ReplayCheckpoint::new(90, Some(10))),
                None,
            ),
        ];

        for (case, (current, persisted, expected)) in cases.into_iter().enumerate() {
            assert_eq!(
                should_persist_region_checkpoint(&current, persisted),
                expected,
                "case {case}"
            );
        }
    }

    #[tokio::test]
    async fn test_persist_region_checkpoints_returns_written_region_ids() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let mailbox_ctx = MailboxContext::new(
            SequenceBuilder::new(
                "test_persist_region_checkpoints_returns_written_region_ids",
                kv_backend,
            )
            .build(),
        );
        let (trigger, _ticker) = new_test_trigger(
            table_metadata_manager.clone(),
            Arc::new(LeaderRegionRegistry::new()),
            Arc::new(TopicStatsRegistry::default()),
            mailbox_ctx.mailbox().clone(),
        );

        let region_to_write = region_id(1, 1);
        let region_already_persisted = region_id(1, 2);
        let region_without_leader = region_id(1, 3);
        let topic_regions = HashMap::from([
            (region_to_write, TopicRegionValue::new(None)),
            (
                region_already_persisted,
                TopicRegionValue::new(Some(ReplayCheckpoint::new(100, None))),
            ),
            (region_without_leader, TopicRegionValue::new(None)),
        ]);
        let leader_regions = HashMap::from([
            (region_to_write, mito_leader_region(100)),
            (region_already_persisted, mito_leader_region(100)),
        ]);

        let written_region_ids = trigger
            .persist_region_checkpoints(
                TEST_TOPIC,
                &[
                    region_to_write,
                    region_already_persisted,
                    region_without_leader,
                ],
                &topic_regions,
                &leader_regions,
            )
            .await
            .unwrap();
        assert_eq!(written_region_ids, vec![region_to_write]);

        let topic_region_manager = table_metadata_manager.topic_region_manager();
        let persisted = topic_region_manager
            .get(TopicRegionKey::new(region_to_write, TEST_TOPIC))
            .await
            .unwrap()
            .unwrap();
        assert_eq!(persisted.checkpoint, Some(ReplayCheckpoint::new(100, None)));

        let skipped = topic_region_manager
            .get(TopicRegionKey::new(region_already_persisted, TEST_TOPIC))
            .await
            .unwrap();
        assert!(skipped.is_none());
    }

    #[tokio::test]
    async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
        let mut mailbox_ctx = MailboxContext::new(
            SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build(),
        );

        // Capture what the mailbox delivers on the channel of datanode 1.
        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;

        let (trigger, _ticker) = new_test_trigger(
            table_metadata_manager.clone(),
            leader_region_registry.clone(),
            Arc::new(TopicStatsRegistry::default()),
            mailbox_ctx.mailbox().clone(),
        );

        new_wal_prune_metadata(
            table_metadata_manager,
            leader_region_registry,
            1,
            1,
            &[0],
            TEST_TOPIC.to_string(),
        )
        .await;

        let target_region = RegionId::new(0, 0);
        trigger
            .send_flush_instructions(&[target_region])
            .await
            .unwrap();

        let response = rx.recv().await.unwrap().unwrap();
        let msg = response.mailbox_message.unwrap();
        let Instruction::FlushRegions(flush_regions) =
            HeartbeatMailbox::json_instruction(&msg).unwrap()
        else {
            panic!("Expected FlushRegions instruction");
        };

        assert_eq!(flush_regions.region_ids, vec![target_region]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert_eq!(
            flush_regions.reason,
            Some(RegionFlushReason::RemoteWalPrune)
        );
    }
}