Skip to main content

meta_srv/region/
flush_trigger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{debug, error, info, warn};
29use common_time::util::current_time_millis;
30use common_wal::config::kafka::common::{
31    DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
32    DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL, DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
33};
34use snafu::{OptionExt, ResultExt};
35use store_api::region_request::RegionFlushReason;
36use store_api::storage::RegionId;
37use tokio::sync::mpsc::{Receiver, Sender};
38
39use crate::error::{self, Result};
40use crate::service::mailbox::{Channel, MailboxRef};
41use crate::{define_ticker, metrics};
42
/// Maximum age (five minutes) a topic statistic may have and still count as recent.
const RECENT_DURATION: Duration = Duration::from_secs(5 * 60);
45
/// [`Event`] represents the kinds of events processed by the region flush trigger loop.
///
/// Variants:
/// - `Tick`: periodic signal that starts one flush/checkpoint evaluation pass.
///
/// The enum is a plain, field-less value, so it derives the usual value-type traits; this makes
/// events loggable (`Debug`) and comparable in tests (`PartialEq`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Event {
    /// Periodic signal that starts one flush/checkpoint evaluation pass.
    Tick,
}
53
/// Shared, reference-counted handle to a [`RegionFlushTicker`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;
55
// Declares the `RegionFlushTicker` type through the crate's `define_ticker!` macro.
// `RegionFlushTrigger::new` builds it from the tick interval and the sender half of the event
// channel. NOTE(review): presumably the generated ticker sends `event_value` (`Event::Tick`)
// on that sender once per interval; confirm against the macro definition.
define_ticker!(
    /// [RegionFlushTicker] is used to trigger region flush trigger periodically.
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
62
/// [`RegionFlushTrigger`] is used to ensure that the estimated WAL replay size
/// stays below a certain threshold by triggering a region flush when the estimated
/// WAL replay size exceeds that threshold. This helps improve datanode startup
/// speed and reduce the overall startup time.
///
/// The estimated WAL replay size is calculated as:
/// `(latest_entry_id - flushed_entry_id) * avg_record_size`
///
/// On every tick the trigger walks all topics, persists region replay checkpoints that are due
/// (by estimated replay size or by the periodic interval) and sends flush instructions to the
/// datanodes leading the selected regions.
pub struct RegionFlushTrigger {
    /// The metadata manager; used to list topics, topic regions and table routes, and to
    /// persist region checkpoints.
    table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry; source of the per-region manifest info read on each tick.
    leader_region_registry: LeaderRegionRegistryRef,
    /// The topic stats registry; source of the latest entry id and average record size of a topic.
    topic_stats_registry: TopicStatsRegistryRef,
    /// The mailbox used to send flush instructions to datanodes.
    mailbox: MailboxRef,
    /// The server address; used to build the `Metasrv@<addr>` sender name of mailbox messages.
    server_addr: String,
    /// The flush trigger size: regions whose estimated replay size exceeds it are flushed.
    /// A zero value is replaced by `DEFAULT_FLUSH_TRIGGER_SIZE` in [`RegionFlushTrigger::new`].
    flush_trigger_size: ReadableSize,
    /// The checkpoint trigger size: regions whose estimated replay size exceeds it are selected
    /// for checkpoint persistence. A zero value is replaced by `DEFAULT_CHECKPOINT_TRIGGER_SIZE`.
    checkpoint_trigger_size: ReadableSize,
    /// The interval of the region flush trigger; also passed to the topic stats registry when
    /// requesting the calculated topic stat.
    region_flush_trigger_interval: Duration,
    /// The interval to periodically persist region checkpoints regardless of replay size.
    periodic_checkpoint_persist_interval: Duration,
    /// The last timestamp in milliseconds when a region checkpoint was persisted.
    /// Only updated for regions whose checkpoint was actually written; entries for regions
    /// not seen during a tick are dropped at the end of that tick.
    last_checkpoint_persist_millis_by_region: HashMap<RegionId, i64>,
    /// The receiver of events produced by the ticker.
    receiver: Receiver<Event>,
}
94
95impl RegionFlushTrigger {
96    /// Creates a new [`RegionFlushTrigger`].
97    #[allow(clippy::too_many_arguments)]
98    pub(crate) fn new(
99        table_metadata_manager: TableMetadataManagerRef,
100        leader_region_registry: LeaderRegionRegistryRef,
101        topic_stats_registry: TopicStatsRegistryRef,
102        mailbox: MailboxRef,
103        server_addr: String,
104        mut flush_trigger_size: ReadableSize,
105        mut checkpoint_trigger_size: ReadableSize,
106        mut region_flush_trigger_interval: Duration,
107        mut periodic_checkpoint_persist_interval: Duration,
108    ) -> (Self, RegionFlushTicker) {
109        if flush_trigger_size.as_bytes() == 0 {
110            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
111            warn!(
112                "flush_trigger_size is not set, using default value: {}",
113                flush_trigger_size
114            );
115        }
116        if checkpoint_trigger_size.as_bytes() == 0 {
117            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
118            warn!(
119                "checkpoint_trigger_size is not set, using default value: {}",
120                checkpoint_trigger_size
121            );
122        }
123        if region_flush_trigger_interval.is_zero() {
124            region_flush_trigger_interval = DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL;
125            warn!(
126                "region_flush_trigger_interval is not set, using default value: {:?}",
127                region_flush_trigger_interval
128            );
129        }
130        if periodic_checkpoint_persist_interval.is_zero() {
131            periodic_checkpoint_persist_interval = DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL;
132            warn!(
133                "periodic_checkpoint_persist_interval is not set, using default value: {:?}",
134                periodic_checkpoint_persist_interval
135            );
136        }
137        let (tx, rx) = Self::channel();
138        let region_flush_ticker = RegionFlushTicker::new(region_flush_trigger_interval, tx);
139        let region_flush_trigger = Self {
140            table_metadata_manager,
141            leader_region_registry,
142            topic_stats_registry,
143            mailbox,
144            server_addr,
145            flush_trigger_size,
146            checkpoint_trigger_size,
147            region_flush_trigger_interval,
148            periodic_checkpoint_persist_interval,
149            last_checkpoint_persist_millis_by_region: HashMap::new(),
150            receiver: rx,
151        };
152        (region_flush_trigger, region_flush_ticker)
153    }
154
155    fn channel() -> (Sender<Event>, Receiver<Event>) {
156        tokio::sync::mpsc::channel(8)
157    }
158
159    /// Starts the region flush trigger.
160    pub fn try_start(mut self) -> Result<()> {
161        common_runtime::spawn_global(async move { self.run().await });
162        info!("Region flush trigger started");
163        Ok(())
164    }
165
166    async fn run(&mut self) {
167        while let Some(event) = self.receiver.recv().await {
168            match event {
169                Event::Tick => self.handle_tick().await,
170            }
171        }
172    }
173
174    async fn handle_tick(&mut self) {
175        if let Err(e) = self.trigger_flush().await {
176            error!(e; "Failed to trigger flush");
177        }
178    }
179
180    async fn trigger_flush(&mut self) -> Result<()> {
181        let now = Instant::now();
182        let now_millis = current_time_millis();
183        let topics = self
184            .table_metadata_manager
185            .topic_name_manager()
186            .range()
187            .await
188            .context(error::TableMetadataManagerSnafu)?;
189
190        let mut active_region_ids = HashSet::new();
191        for topic in &topics {
192            if let Err(e) = self
193                .handle_topic(topic, now_millis, &mut active_region_ids)
194                .await
195            {
196                error!(e; "Failed to handle regions in topic: {}", topic);
197            }
198        }
199        retain_checkpoint_persist_records(
200            &mut self.last_checkpoint_persist_millis_by_region,
201            &active_region_ids,
202        );
203
204        debug!(
205            "Triggered flush for {} topics in {:?}",
206            topics.len(),
207            now.elapsed()
208        );
209        Ok(())
210    }
211
212    async fn handle_topic(
213        &mut self,
214        topic: &str,
215        now_millis: i64,
216        active_region_ids: &mut HashSet<RegionId>,
217    ) -> Result<()> {
218        let topic_regions = self
219            .table_metadata_manager
220            .topic_region_manager()
221            .regions(topic)
222            .await
223            .context(error::TableMetadataManagerSnafu)?;
224
225        if topic_regions.is_empty() {
226            debug!("No regions found for topic: {}", topic);
227            return Ok(());
228        }
229        active_region_ids.extend(topic_regions.keys().copied());
230
231        let topic_stat = self.retrieve_topic_stat(topic);
232        let size_based_regions = topic_stat
233            .map(|(latest_entry_id, avg_record_size)| {
234                filter_regions_by_replay_size(
235                    topic,
236                    topic_regions.iter().map(|(region_id, value)| {
237                        (*region_id, value.min_entry_id().unwrap_or_default())
238                    }),
239                    avg_record_size as u64,
240                    latest_entry_id,
241                    self.checkpoint_trigger_size,
242                )
243            })
244            .unwrap_or_default();
245        // Periodic checkpoint persistence is intentionally independent of topic stats freshness:
246        // Kafka retention can advance even when recent write stats are unavailable.
247        let periodic_regions = filter_regions_for_periodic_checkpoint(
248            topic_regions.keys().copied(),
249            &self.last_checkpoint_persist_millis_by_region,
250            now_millis,
251            self.periodic_checkpoint_persist_interval,
252        );
253        let regions_to_persist = merge_region_ids(size_based_regions, periodic_regions);
254        let region_manifests = self
255            .leader_region_registry
256            .batch_get(topic_regions.keys().cloned());
257
258        let pruned_entry_id = self
259            .table_metadata_manager
260            .topic_name_manager()
261            .get(topic)
262            .await
263            .context(error::TableMetadataManagerSnafu)?
264            .map(|v| v.into_inner().pruned_entry_id);
265        debug!("Topic: {}, pruned entry id: {:?}", topic, pruned_entry_id);
266
267        match self
268            .persist_region_checkpoints(
269                topic,
270                &regions_to_persist,
271                &topic_regions,
272                &region_manifests,
273            )
274            .await
275        {
276            // Only mark regions that were actually written to KV. If the checkpoint is stale,
277            // already persisted, or the write fails, the next tick should retry.
278            Ok(region_ids) => mark_checkpoint_persisted(
279                &mut self.last_checkpoint_persist_millis_by_region,
280                &region_ids,
281                now_millis,
282            ),
283            Err(err) => error!(err; "Failed to persist region checkpoints for topic: {}", topic),
284        }
285
286        self.flush_regions_in_topic(topic, topic_stat, pruned_entry_id, region_manifests)
287            .await?;
288
289        Ok(())
290    }
291
292    /// Retrieves the latest entry id and average record size of a topic.
293    ///
294    /// Returns `None` if the topic is not found or the latest entry id is not recent.
295    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
296        let Some((latest_entry_id, timestamp)) =
297            self.topic_stats_registry.get_latest_entry_id(topic)
298        else {
299            debug!("No latest entry id found for topic: {}", topic);
300            return None;
301        };
302
303        let Some(stat) = self
304            .topic_stats_registry
305            .get_calculated_topic_stat(topic, self.region_flush_trigger_interval)
306        else {
307            debug!("No topic stat found for topic: {}", topic);
308            return None;
309        };
310
311        let now = current_time_millis();
312        if !is_recent(timestamp, now, RECENT_DURATION) {
313            debug!(
314                "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
315                topic, timestamp, now
316            );
317            return None;
318        }
319        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
320            debug!(
321                "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
322                topic, stat.end_ts, now
323            );
324            return None;
325        }
326
327        Some((latest_entry_id, stat.avg_record_size))
328    }
329
330    async fn persist_region_checkpoints(
331        &self,
332        topic: &str,
333        region_ids: &[RegionId],
334        topic_regions: &HashMap<RegionId, TopicRegionValue>,
335        leader_regions: &HashMap<RegionId, LeaderRegion>,
336    ) -> Result<Vec<RegionId>> {
337        let regions = region_ids
338            .iter()
339            .flat_map(|region_id| match leader_regions.get(region_id) {
340                Some(leader_region) => should_persist_region_checkpoint(
341                    leader_region,
342                    topic_regions
343                        .get(region_id)
344                        .cloned()
345                        .and_then(|value| value.checkpoint),
346                )
347                .map(|checkpoint| (*region_id, TopicRegionValue::new(Some(checkpoint)))),
348                None => None,
349            })
350            .collect::<Vec<_>>();
351
352        // `chunks` will panic if chunk size is zero, so return early if there are no regions to persist.
353        if regions.is_empty() {
354            return Ok(Vec::new());
355        }
356
357        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
358        let batch_size = max_txn_ops.min(regions.len());
359        debug!(
360            "persisting {} region checkpoints for topic '{}', regions: {:?}",
361            regions.len(),
362            topic,
363            regions,
364        );
365        for batch in regions.chunks(batch_size) {
366            let batch = batch
367                .iter()
368                .map(|(region_id, value)| (TopicRegionKey::new(*region_id, topic), Some(*value)))
369                .collect::<Vec<_>>();
370            self.table_metadata_manager
371                .topic_region_manager()
372                .batch_put(&batch)
373                .await
374                .context(error::TableMetadataManagerSnafu)?;
375        }
376
377        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
378            .with_label_values(&[topic])
379            .inc_by(regions.len() as u64);
380
381        Ok(regions
382            .into_iter()
383            .map(|(region_id, _)| region_id)
384            .collect())
385    }
386
387    async fn flush_regions_in_topic(
388        &self,
389        topic: &str,
390        topic_stat: Option<(u64, usize)>,
391        topic_pruned_entry_id: Option<u64>,
392        region_manifests: HashMap<RegionId, LeaderRegion>,
393    ) -> Result<()> {
394        let regions = region_manifests
395            .into_iter()
396            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
397            .collect::<Vec<_>>();
398
399        let regions_to_flush = if let Some((latest_entry_id, avg_record_size)) = topic_stat {
400            let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
401            if let Some((_, min_entry_id)) = min_entry_id {
402                let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
403                    .saturating_mul(avg_record_size as u64);
404                metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
405                    .with_label_values(&[topic])
406                    .set(replay_size as i64);
407            }
408
409            // Selects regions to flush from the set of active regions.
410            filter_regions_by_replay_size(
411                topic,
412                regions.iter().copied(),
413                avg_record_size as u64,
414                latest_entry_id,
415                self.flush_trigger_size,
416            )
417        } else {
418            Vec::new()
419        };
420        let pruned_regions_to_flush = filter_regions_below_topic_pruned_entry_id(
421            topic,
422            regions.into_iter(),
423            topic_pruned_entry_id,
424        );
425        let regions_to_flush = merge_region_ids(regions_to_flush, pruned_regions_to_flush);
426
427        // Sends flush instructions to datanodes.
428        if !regions_to_flush.is_empty() {
429            self.send_flush_instructions(&regions_to_flush).await?;
430            debug!(
431                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
432                regions_to_flush.len(),
433                topic,
434                regions_to_flush,
435            );
436        }
437
438        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
439            .with_label_values(&[topic])
440            .inc_by(regions_to_flush.len() as u64);
441
442        Ok(())
443    }
444
445    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
446        let leader_to_region_ids =
447            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
448        let flush_instructions = leader_to_region_ids
449            .into_iter()
450            .map(|(leader, region_ids)| {
451                let flush_instruction = Instruction::FlushRegions(
452                    FlushRegions::async_batch(region_ids)
453                        .with_reason(RegionFlushReason::RemoteWalPrune),
454                );
455                (leader, flush_instruction)
456            });
457
458        let tracing_ctx = TracingContext::from_current_span();
459        let tracing_header = tracing_ctx.to_w3c();
460        for (peer, flush_instruction) in flush_instructions {
461            let msg = MailboxMessage::json_message(
462                &format!("Flush regions: {}", flush_instruction),
463                &format!("Metasrv@{}", self.server_addr),
464                &format!("Datanode-{}@{}", peer.id, peer.addr),
465                common_time::util::current_time_millis(),
466                &flush_instruction,
467                Some(tracing_header.clone()),
468            )
469            .with_context(|_| error::SerializeToJsonSnafu {
470                input: flush_instruction.to_string(),
471            })?;
472            if let Err(e) = self
473                .mailbox
474                .send_oneway(&Channel::Datanode(peer.id), msg)
475                .await
476            {
477                error!(e; "Failed to send flush instruction to datanode {}", peer);
478            }
479        }
480
481        Ok(())
482    }
483}
484
485/// Determines whether a region checkpoint should be persisted based on current and persisted state.
486fn should_persist_region_checkpoint(
487    current: &LeaderRegion,
488    persisted: Option<ReplayCheckpoint>,
489) -> Option<ReplayCheckpoint> {
490    let new_checkpoint = ReplayCheckpoint::new(
491        current.manifest.replay_entry_id(),
492        current.manifest.metadata_replay_entry_id(),
493    );
494
495    let Some(persisted) = persisted else {
496        return Some(new_checkpoint);
497    };
498
499    if new_checkpoint > persisted {
500        return Some(new_checkpoint);
501    }
502    None
503}
504
505/// Filter regions based on the estimated replay size.
506///
507/// Returns the regions if its estimated replay size exceeds the given threshold.
508/// The estimated replay size is calculated as:
509/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
510fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
511    topic: &str,
512    regions: I,
513    avg_record_size: u64,
514    latest_entry_id: u64,
515    threshold: ReadableSize,
516) -> Vec<RegionId> {
517    let mut regions_to_flush = Vec::new();
518    for (region_id, entry_id) in regions {
519        if entry_id < latest_entry_id {
520            let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
521            if replay_size > threshold.as_bytes() {
522                debug!(
523                    "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
524                    region_id,
525                    ReadableSize(replay_size),
526                    threshold,
527                    entry_id,
528                    latest_entry_id,
529                    topic
530                );
531                regions_to_flush.push(region_id);
532            }
533        }
534    }
535
536    regions_to_flush
537}
538
539/// Filters regions whose prunable entry id is behind the topic pruned entry id.
540fn filter_regions_below_topic_pruned_entry_id<I: Iterator<Item = (RegionId, u64)>>(
541    topic: &str,
542    regions: I,
543    topic_pruned_entry_id: Option<u64>,
544) -> Vec<RegionId> {
545    let Some(topic_pruned_entry_id) = topic_pruned_entry_id else {
546        return Vec::new();
547    };
548
549    let mut regions_to_flush = Vec::new();
550    for (region_id, prunable_entry_id) in regions {
551        if prunable_entry_id < topic_pruned_entry_id {
552            debug!(
553                "Region {}: prunable entry id {} is below topic pruned entry id {}, topic: '{}'",
554                region_id, prunable_entry_id, topic_pruned_entry_id, topic,
555            );
556            regions_to_flush.push(region_id);
557        }
558    }
559
560    regions_to_flush
561}
562
563/// Filters regions that need periodic checkpoint persistence.
564fn filter_regions_for_periodic_checkpoint<I>(
565    regions: I,
566    last_persisted: &HashMap<RegionId, i64>,
567    now_millis: i64,
568    interval: Duration,
569) -> Vec<RegionId>
570where
571    I: Iterator<Item = RegionId>,
572{
573    let interval_millis = interval.as_millis() as i64;
574    regions
575        .filter(|region_id| {
576            last_persisted
577                .get(region_id)
578                .is_none_or(|last_persist_millis| {
579                    now_millis.saturating_sub(*last_persist_millis) >= interval_millis
580                })
581        })
582        .collect()
583}
584
585/// Merges two region id lists and removes duplicates.
586fn merge_region_ids(left: Vec<RegionId>, right: Vec<RegionId>) -> Vec<RegionId> {
587    left.into_iter()
588        .chain(right)
589        .collect::<HashSet<_>>()
590        .into_iter()
591        .collect()
592}
593
594/// Marks checkpoint persistence timestamps for regions.
595fn mark_checkpoint_persisted(
596    last_persisted: &mut HashMap<RegionId, i64>,
597    region_ids: &[RegionId],
598    now_millis: i64,
599) {
600    for region_id in region_ids {
601        last_persisted.insert(*region_id, now_millis);
602    }
603}
604
605/// Retains checkpoint persistence records for active regions.
606fn retain_checkpoint_persist_records(
607    last_persisted: &mut HashMap<RegionId, i64>,
608    active_region_ids: &HashSet<RegionId>,
609) {
610    last_persisted.retain(|region_id, _| active_region_ids.contains(region_id));
611}
612
613/// Group regions by leader.
614///
615/// The regions are grouped by the leader of the region.
616async fn group_regions_by_leader(
617    table_metadata_manager: &TableMetadataManagerRef,
618    regions_to_flush: &[RegionId],
619) -> Result<HashMap<Peer, Vec<RegionId>>> {
620    let table_ids = regions_to_flush
621        .iter()
622        .map(|region_id| region_id.table_id())
623        .collect::<HashSet<_>>()
624        .into_iter()
625        .collect::<Vec<_>>();
626    let table_ids_table_routes = table_metadata_manager
627        .table_route_manager()
628        .batch_get_physical_table_routes(&table_ids)
629        .await
630        .context(error::TableMetadataManagerSnafu)?;
631
632    let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
633    for region_id in regions_to_flush {
634        let table_id = region_id.table_id();
635        let table_route = table_ids_table_routes
636            .get(&table_id)
637            .context(error::TableRouteNotFoundSnafu { table_id })?;
638        let Some(region_route) = table_route
639            .region_routes
640            .iter()
641            .find(|r| r.region.id == *region_id)
642        else {
643            continue;
644        };
645        let Some(peer) = &region_route.leader_peer else {
646            continue;
647        };
648
649        match peer_region_ids_map.get_mut(peer) {
650            Some(region_ids) => {
651                region_ids.push(*region_id);
652            }
653            None => {
654                peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
655            }
656        }
657    }
658    Ok(peer_region_ids_map)
659}
660
/// Check if the timestamp is recent.
///
/// The timestamp is recent if the difference between `now` and `timestamp` (both in
/// milliseconds) is strictly less than `duration`. A timestamp later than `now` therefore
/// counts as recent.
///
/// A `duration` too large to fit in `i64` milliseconds is clamped to `i64::MAX` instead of
/// being truncated, so a huge window never wraps into a small or negative value.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    // `as_millis` returns `u128`; a plain `as i64` cast would silently wrap.
    let window_millis = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
    now.saturating_sub(timestamp) < window_millis
}
668
669#[cfg(test)]
670mod tests {
671    use std::sync::Arc;
672
673    use common_base::readable_size::ReadableSize;
674    use common_meta::instruction::FlushStrategy;
675    use common_meta::key::TableMetadataManager;
676    use common_meta::kv_backend::memory::MemoryKvBackend;
677    use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
678    use common_meta::sequence::SequenceBuilder;
679    use common_meta::stats::topic::TopicStatsRegistry;
680    use store_api::storage::RegionId;
681
682    use super::*;
683    use crate::handler::HeartbeatMailbox;
684    use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata};
685
686    #[test]
687    fn test_is_recent() {
688        let now = current_time_millis();
689        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
690        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
691    }
692
693    #[test]
694    fn test_filter_regions_for_periodic_checkpoint() {
695        let now_millis = 10_000;
696        let interval = Duration::from_secs(5);
697        let regions = vec![region_id(1, 1), region_id(1, 2), region_id(1, 3)];
698        let last_persisted = HashMap::from([
699            (region_id(1, 1), now_millis - 4_000),
700            (region_id(1, 2), now_millis - 5_000),
701        ]);
702
703        let result = filter_regions_for_periodic_checkpoint(
704            regions.into_iter(),
705            &last_persisted,
706            now_millis,
707            interval,
708        );
709
710        assert_eq!(result, vec![region_id(1, 2), region_id(1, 3)]);
711    }
712
713    #[test]
714    fn test_merge_region_ids() {
715        let merged = merge_region_ids(
716            vec![region_id(1, 1), region_id(1, 2)],
717            vec![region_id(1, 2), region_id(1, 3)],
718        );
719        let merged = merged.into_iter().collect::<HashSet<_>>();
720
721        assert_eq!(
722            merged,
723            HashSet::from([region_id(1, 1), region_id(1, 2), region_id(1, 3)])
724        );
725    }
726
727    #[test]
728    fn test_mark_checkpoint_persisted() {
729        let now_millis = 10_000;
730        let mut last_persisted = HashMap::from([(region_id(1, 1), 1_000)]);
731
732        mark_checkpoint_persisted(
733            &mut last_persisted,
734            &[region_id(1, 1), region_id(1, 2)],
735            now_millis,
736        );
737
738        assert_eq!(last_persisted.get(&region_id(1, 1)), Some(&now_millis));
739        assert_eq!(last_persisted.get(&region_id(1, 2)), Some(&now_millis));
740    }
741
742    #[test]
743    fn test_retain_checkpoint_persist_records() {
744        let mut last_persisted = HashMap::from([
745            (region_id(1, 1), 1_000),
746            (region_id(1, 2), 2_000),
747            (region_id(1, 3), 3_000),
748        ]);
749        let active_regions = HashSet::from([region_id(1, 1), region_id(1, 3)]);
750
751        retain_checkpoint_persist_records(&mut last_persisted, &active_regions);
752
753        assert_eq!(last_persisted.len(), 2);
754        assert!(last_persisted.contains_key(&region_id(1, 1)));
755        assert!(last_persisted.contains_key(&region_id(1, 3)));
756    }
757
758    fn region_id(table: u32, region: u32) -> RegionId {
759        RegionId::new(table, region)
760    }
761
762    #[test]
763    fn test_no_regions_to_flush_when_none_exceed_threshold() {
764        let topic = "test_topic";
765        let avg_record_size = 10;
766        let latest_entry_id = 100;
767        let flush_trigger_size = ReadableSize(1000); // 1000 bytes
768
769        // All regions have prunable_entry_id close to latest_entry_id, so replay_size is small
770        let regions = vec![
771            (region_id(1, 1), 99), // replay_size = (100-99)*10 = 10
772            (region_id(1, 2), 98), // replay_size = 20
773            (region_id(1, 3), 95), // replay_size = 50
774        ];
775
776        let result = filter_regions_by_replay_size(
777            topic,
778            regions.into_iter(),
779            avg_record_size,
780            latest_entry_id,
781            flush_trigger_size,
782        );
783        assert!(result.is_empty());
784    }
785
786    #[test]
787    fn test_regions_to_flush_when_some_exceed_threshold() {
788        let topic = "test_topic";
789        let avg_record_size = 10;
790        let latest_entry_id = 100;
791        let flush_trigger_size = ReadableSize(50); // 50 bytes
792
793        // Only region 1,3 will exceed threshold: (100-90)*10 = 100 > 50
794        let regions = vec![
795            (region_id(1, 1), 99), // replay_size = 10
796            (region_id(1, 2), 98), // replay_size = 20
797            (region_id(1, 3), 90), // replay_size = 100
798        ];
799
800        let result = filter_regions_by_replay_size(
801            topic,
802            regions.into_iter(),
803            avg_record_size,
804            latest_entry_id,
805            flush_trigger_size,
806        );
807        assert_eq!(result, vec![region_id(1, 3)]);
808    }
809
810    #[test]
811    fn test_regions_to_flush_with_zero_avg_record_size() {
812        let topic = "test_topic";
813        let avg_record_size = 0;
814        let latest_entry_id = 100;
815        let flush_trigger_size = ReadableSize(1);
816
817        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];
818
819        // replay_size will always be 0, so none should be flushed
820        let result = filter_regions_by_replay_size(
821            topic,
822            regions.into_iter(),
823            avg_record_size,
824            latest_entry_id,
825            flush_trigger_size,
826        );
827        assert!(result.is_empty());
828    }
829
830    #[test]
831    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
832        let topic = "test_topic";
833        let avg_record_size = 10;
834        let latest_entry_id = 100;
835        let flush_trigger_size = ReadableSize(10);
836
837        let regions = vec![
838            (region_id(1, 1), 100), // prunable_entry_id == latest_entry_id, should not be flushed
839            (region_id(1, 2), 99),  // replay_size = 10
840        ];
841
842        let result = filter_regions_by_replay_size(
843            topic,
844            regions.into_iter(),
845            avg_record_size,
846            latest_entry_id,
847            flush_trigger_size,
848        );
849        // Only region 1,2 should be flushed if replay_size > 10
850        assert!(result.is_empty());
851    }
852
853    #[test]
854    fn test_multiple_regions_to_flush() {
855        let topic = "test_topic";
856        let avg_record_size = 5;
857        let latest_entry_id = 200;
858        let flush_trigger_size = ReadableSize(20);
859
860        let regions = vec![
861            (region_id(1, 1), 190), // replay_size = 50
862            (region_id(1, 2), 180), // replay_size = 100
863            (region_id(1, 3), 199), // replay_size = 5
864            (region_id(1, 4), 200), // replay_size = 0
865        ];
866
867        let result = filter_regions_by_replay_size(
868            topic,
869            regions.into_iter(),
870            avg_record_size,
871            latest_entry_id,
872            flush_trigger_size,
873        );
874        // Only regions 1,1 and 1,2 should be flushed
875        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
876    }
877
878    #[test]
879    fn test_filter_regions_below_topic_pruned_entry_id_none() {
880        let topic = "test_topic";
881        let regions = vec![(region_id(1, 1), 90), (region_id(1, 2), 100)];
882
883        let result = filter_regions_below_topic_pruned_entry_id(topic, regions.into_iter(), None);
884
885        assert!(result.is_empty());
886    }
887
888    #[test]
889    fn test_filter_regions_below_topic_pruned_entry_id() {
890        let topic = "test_topic";
891        let regions = vec![
892            (region_id(1, 1), 99),  // below the topic pruned entry id
893            (region_id(1, 2), 100), // equal to the topic pruned entry id
894            (region_id(1, 3), 101), // above the topic pruned entry id
895            (region_id(1, 4), 80),  // below the topic pruned entry id
896        ];
897
898        let result =
899            filter_regions_below_topic_pruned_entry_id(topic, regions.into_iter(), Some(100));
900
901        assert_eq!(result, vec![region_id(1, 1), region_id(1, 4)]);
902    }
903
904    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
905        LeaderRegion {
906            datanode_id: 1,
907            manifest: LeaderRegionManifestInfo::Metric {
908                data_manifest_version: 1,
909                data_flushed_entry_id: replay_entry_id,
910                data_topic_latest_entry_id: 0,
911                metadata_manifest_version: 1,
912                metadata_flushed_entry_id: metadata_replay_entry_id,
913                metadata_topic_latest_entry_id: 0,
914            },
915        }
916    }
917
918    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
919        LeaderRegion {
920            datanode_id: 1,
921            manifest: LeaderRegionManifestInfo::Mito {
922                manifest_version: 1,
923                flushed_entry_id: replay_entry_id,
924                topic_latest_entry_id: 0,
925            },
926        }
927    }
928
929    #[test]
930    fn test_should_persist_region_checkpoint() {
931        // `persisted` is none
932        let current = metric_leader_region(100, 10);
933        let result = should_persist_region_checkpoint(&current, None).unwrap();
934        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
935
936        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
937        let current = mito_leader_region(100);
938        let result =
939            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
940                .unwrap();
941        assert_eq!(result, ReplayCheckpoint::new(100, None));
942
943        let current = metric_leader_region(100, 10);
944        let result =
945            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
946                .unwrap();
947        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
948
949        // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
950        let current = metric_leader_region(100, 10);
951        let result =
952            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
953                .unwrap();
954        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
955
956        // `persisted.metadata_entry_id` is none
957        let current = metric_leader_region(100, 10);
958        let result =
959            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
960                .unwrap();
961        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));
962
963        // `current.manifest.metadata_replay_entry_id()` is none
964        let current = mito_leader_region(100);
965        let result =
966            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
967                .is_none();
968        assert!(result);
969
970        // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
971        let current = metric_leader_region(100, 10);
972        let result =
973            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
974        assert!(result.is_none());
975        let current = mito_leader_region(100);
976        let result =
977            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
978        assert!(result.is_none());
979
980        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
981        // `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()`
982        let current = metric_leader_region(80, 11);
983        let result =
984            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
985        assert!(result.is_none());
986        let current = mito_leader_region(80);
987        let result =
988            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
989        assert!(result.is_none());
990    }
991
992    #[tokio::test]
993    async fn test_persist_region_checkpoints_returns_written_region_ids() {
994        let kv_backend = Arc::new(MemoryKvBackend::new());
995        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
996        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
997        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
998        let mailbox_sequence = SequenceBuilder::new(
999            "test_persist_region_checkpoints_returns_written_region_ids",
1000            kv_backend,
1001        )
1002        .build();
1003        let mailbox_ctx = MailboxContext::new(mailbox_sequence);
1004
1005        let (trigger, _ticker) = RegionFlushTrigger::new(
1006            table_metadata_manager.clone(),
1007            leader_region_registry,
1008            topic_stats_registry,
1009            mailbox_ctx.mailbox().clone(),
1010            "127.0.0.1:3002".to_string(),
1011            ReadableSize(1),
1012            ReadableSize(1),
1013            DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
1014            DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
1015        );
1016
1017        let topic = "test_topic";
1018        let region_to_write = region_id(1, 1);
1019        let region_already_persisted = region_id(1, 2);
1020        let region_without_leader = region_id(1, 3);
1021        let topic_regions = HashMap::from([
1022            (region_to_write, TopicRegionValue::new(None)),
1023            (
1024                region_already_persisted,
1025                TopicRegionValue::new(Some(ReplayCheckpoint::new(100, None))),
1026            ),
1027            (region_without_leader, TopicRegionValue::new(None)),
1028        ]);
1029        let leader_regions = HashMap::from([
1030            (region_to_write, mito_leader_region(100)),
1031            (region_already_persisted, mito_leader_region(100)),
1032        ]);
1033
1034        let written_region_ids = trigger
1035            .persist_region_checkpoints(
1036                topic,
1037                &[
1038                    region_to_write,
1039                    region_already_persisted,
1040                    region_without_leader,
1041                ],
1042                &topic_regions,
1043                &leader_regions,
1044            )
1045            .await
1046            .unwrap();
1047
1048        assert_eq!(written_region_ids, vec![region_to_write]);
1049        let persisted = table_metadata_manager
1050            .topic_region_manager()
1051            .get(TopicRegionKey::new(region_to_write, topic))
1052            .await
1053            .unwrap()
1054            .unwrap();
1055        assert_eq!(persisted.checkpoint, Some(ReplayCheckpoint::new(100, None)));
1056        let skipped = table_metadata_manager
1057            .topic_region_manager()
1058            .get(TopicRegionKey::new(region_already_persisted, topic))
1059            .await
1060            .unwrap();
1061        assert!(skipped.is_none());
1062    }
1063
1064    #[tokio::test]
1065    async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() {
1066        let kv_backend = Arc::new(MemoryKvBackend::new());
1067        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
1068        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
1069        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
1070        let mailbox_sequence =
1071            SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build();
1072        let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);
1073
1074        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
1075        mailbox_ctx
1076            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
1077            .await;
1078
1079        let (trigger, _ticker) = RegionFlushTrigger::new(
1080            table_metadata_manager.clone(),
1081            leader_region_registry.clone(),
1082            topic_stats_registry,
1083            mailbox_ctx.mailbox().clone(),
1084            "127.0.0.1:3002".to_string(),
1085            ReadableSize(1),
1086            ReadableSize(1),
1087            DEFAULT_REGION_FLUSH_TRIGGER_INTERVAL,
1088            DEFAULT_PERIODIC_CHECKPOINT_PERSIST_INTERVAL,
1089        );
1090
1091        let topic = "test_topic".to_string();
1092        new_wal_prune_metadata(
1093            table_metadata_manager,
1094            leader_region_registry,
1095            1,
1096            1,
1097            &[0],
1098            topic,
1099        )
1100        .await;
1101
1102        let region_id = RegionId::new(0, 0);
1103        trigger.send_flush_instructions(&[region_id]).await.unwrap();
1104
1105        let response = rx.recv().await.unwrap().unwrap();
1106        let msg = response.mailbox_message.unwrap();
1107        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
1108        let Instruction::FlushRegions(flush_regions) = instruction else {
1109            panic!("Expected FlushRegions instruction");
1110        };
1111
1112        assert_eq!(flush_regions.region_ids, vec![region_id]);
1113        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
1114        assert_eq!(
1115            flush_regions.reason,
1116            Some(RegionFlushReason::RemoteWalPrune)
1117        );
1118    }
1119}