meta_srv/region/flush_trigger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::meta::MailboxMessage;
use common_base::readable_size::ReadableSize;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::TableMetadataManagerRef;
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
use common_meta::peer::Peer;
use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_telemetry::tracing_context::TracingContext;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::common::{
    DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
};
use snafu::{OptionExt, ResultExt};
use store_api::region_request::RegionFlushReason;
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};

use crate::error::{self, Result};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::{define_ticker, metrics};

/// The interval of the region flush ticker.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// The duration within which topic stats are considered recent.
const RECENT_DURATION: Duration = Duration::from_secs(300);

/// [`Event`] represents the types of events processed by the region flush ticker.
///
/// Variants:
/// - `Tick`: triggers the periodic region flush check.
pub enum Event {
    Tick,
}

pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

define_ticker!(
    /// [RegionFlushTicker] periodically emits [`Event::Tick`] to drive the [`RegionFlushTrigger`].
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);

/// [`RegionFlushTrigger`] keeps the estimated WAL replay size below a threshold by
/// triggering a region flush whenever the estimate exceeds that threshold. This
/// shortens WAL replay on datanode restart and thus reduces overall startup time.
///
/// The estimated WAL replay size is calculated as:
/// `(latest_entry_id - flushed_entry_id) * avg_record_size`
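///
/// For example (illustrative numbers): with `latest_entry_id = 150_000`,
/// `flushed_entry_id = 50_000`, and `avg_record_size = 1024` bytes, the estimate is
/// `100_000 * 1024` bytes, roughly 97.7 MiB, which would exceed a 64 MiB
/// `flush_trigger_size`.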
pub struct RegionFlushTrigger {
    /// The metadata manager.
    table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    leader_region_registry: LeaderRegionRegistryRef,
    /// The topic stats registry.
    topic_stats_registry: TopicStatsRegistryRef,
    /// The mailbox to send messages.
    mailbox: MailboxRef,
    /// The server address.
    server_addr: String,
    /// The flush trigger size.
    flush_trigger_size: ReadableSize,
    /// The checkpoint trigger size.
    checkpoint_trigger_size: ReadableSize,
    /// The receiver of events.
    receiver: Receiver<Event>,
}

impl RegionFlushTrigger {
    /// Creates a new [`RegionFlushTrigger`].
    pub(crate) fn new(
        table_metadata_manager: TableMetadataManagerRef,
        leader_region_registry: LeaderRegionRegistryRef,
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
        server_addr: String,
        mut flush_trigger_size: ReadableSize,
        mut checkpoint_trigger_size: ReadableSize,
    ) -> (Self, RegionFlushTicker) {
        if flush_trigger_size.as_bytes() == 0 {
            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
            warn!(
                "flush_trigger_size is not set, using default value: {}",
                flush_trigger_size
            );
        }
        if checkpoint_trigger_size.as_bytes() == 0 {
            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
            warn!(
                "checkpoint_trigger_size is not set, using default value: {}",
                checkpoint_trigger_size
            );
        }
        let (tx, rx) = Self::channel();
        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
        let region_flush_trigger = Self {
            table_metadata_manager,
            leader_region_registry,
            topic_stats_registry,
            mailbox,
            server_addr,
            flush_trigger_size,
            checkpoint_trigger_size,
            receiver: rx,
        };
        (region_flush_trigger, region_flush_ticker)
    }

    fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Starts the region flush trigger.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("Region flush trigger started");
        Ok(())
    }

    async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => self.handle_tick().await,
            }
        }
    }

    async fn handle_tick(&self) {
        if let Err(e) = self.trigger_flush().await {
            error!(e; "Failed to trigger flush");
        }
    }

    async fn trigger_flush(&self) -> Result<()> {
        let now = Instant::now();
        let topics = self
            .table_metadata_manager
            .topic_name_manager()
            .range()
            .await
            .context(error::TableMetadataManagerSnafu)?;

        for topic in &topics {
            let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
                continue;
            };
            if let Err(e) = self
                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
                .await
            {
                error!(e; "Failed to flush regions in topic: {}", topic);
            }
        }

        debug!(
            "Triggered flush for {} topics in {:?}",
            topics.len(),
            now.elapsed()
        );
        Ok(())
    }

    /// Retrieves the latest entry id and average record size of a topic.
    ///
    /// Returns `None` if no stats exist for the topic, or if the latest entry id or
    /// the calculated stat is stale (older than [`RECENT_DURATION`]).
    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
        let Some((latest_entry_id, timestamp)) =
            self.topic_stats_registry.get_latest_entry_id(topic)
        else {
            debug!("No latest entry id found for topic: {}", topic);
            return None;
        };

        let Some(stat) = self
            .topic_stats_registry
            .get_calculated_topic_stat(topic, TICKER_INTERVAL)
        else {
            debug!("No topic stat found for topic: {}", topic);
            return None;
        };

        let now = current_time_millis();
        if !is_recent(timestamp, now, RECENT_DURATION) {
            debug!(
                "Latest entry id of topic '{}' is not recent (now: {}, entry id timestamp: {})",
                topic, now, timestamp
            );
            return None;
        }
        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
            debug!(
                "Calculated stat of topic '{}' is not recent (now: {}, stat timestamp: {})",
                topic, now, stat.end_ts
            );
            return None;
        }

        Some((latest_entry_id, stat.avg_record_size))
    }

    async fn persist_region_checkpoints(
        &self,
        topic: &str,
        region_ids: &[RegionId],
        topic_regions: &HashMap<RegionId, TopicRegionValue>,
        leader_regions: &HashMap<RegionId, LeaderRegion>,
    ) -> Result<()> {
        let regions = region_ids
            .iter()
            .flat_map(|region_id| match leader_regions.get(region_id) {
                Some(leader_region) => should_persist_region_checkpoint(
                    leader_region,
                    topic_regions
                        .get(region_id)
                        .cloned()
                        .and_then(|value| value.checkpoint),
                )
                .map(|checkpoint| {
                    (
                        TopicRegionKey::new(*region_id, topic),
                        Some(TopicRegionValue::new(Some(checkpoint))),
                    )
                }),
                None => None,
            })
            .collect::<Vec<_>>();

        // `chunks` panics if the chunk size is zero, so return early when there are
        // no regions to persist.
        if regions.is_empty() {
            return Ok(());
        }

        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
        let batch_size = max_txn_ops.min(regions.len());
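        // Illustrative example: with `max_txn_ops = 128` and 300 checkpoints to persist,
        // `chunks(128)` yields batches of 128, 128, and 44, each written in its own transaction.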
        for batch in regions.chunks(batch_size) {
            self.table_metadata_manager
                .topic_region_manager()
                .batch_put(batch)
                .await
                .context(error::TableMetadataManagerSnafu)?;
        }

        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions.len() as u64);

        Ok(())
    }

    async fn flush_regions_in_topic(
        &self,
        topic: &str,
        latest_entry_id: u64,
        avg_record_size: usize,
    ) -> Result<()> {
        let topic_regions = self
            .table_metadata_manager
            .topic_region_manager()
            .regions(topic)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        if topic_regions.is_empty() {
            debug!("No regions found for topic: {}", topic);
            return Ok(());
        }

        // Filter regions whose checkpoints need to be persisted.
        let regions_to_persist = filter_regions_by_replay_size(
            topic,
            topic_regions
                .iter()
                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
            avg_record_size as u64,
            latest_entry_id,
            self.checkpoint_trigger_size,
        );
        let region_manifests = self
            .leader_region_registry
            .batch_get(topic_regions.keys().cloned());

        if let Err(err) = self
            .persist_region_checkpoints(
                topic,
                &regions_to_persist,
                &topic_regions,
                &region_manifests,
            )
            .await
        {
            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
        }

        let regions = region_manifests
            .into_iter()
            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
            .collect::<Vec<_>>();
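        // The topic-level replay estimate is driven by the slowest region: WAL replay
        // for the topic has to start from the minimum prunable entry id among its regions.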
        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
        if let Some((_, min_entry_id)) = min_entry_id {
            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                .saturating_mul(avg_record_size as u64);
            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
                .with_label_values(&[topic])
                .set(replay_size as i64);
        }

        // Selects regions to flush from the set of active regions.
        let regions_to_flush = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size as u64,
            latest_entry_id,
            self.flush_trigger_size,
        );

        // Sends flush instructions to datanodes.
        if !regions_to_flush.is_empty() {
            self.send_flush_instructions(&regions_to_flush).await?;
            debug!(
                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                regions_to_flush.len(),
                topic,
                regions_to_flush,
            );
        }

        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions_to_flush.len() as u64);

        Ok(())
    }

    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
        let leader_to_region_ids =
            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
        let flush_instructions = leader_to_region_ids
            .into_iter()
            .map(|(leader, region_ids)| {
                let flush_instruction = Instruction::FlushRegions(
                    FlushRegions::async_batch(region_ids)
                        .with_reason(RegionFlushReason::RemoteWalPrune),
                );
                (leader, flush_instruction)
            });

        let tracing_ctx = TracingContext::from_current_span();
        let tracing_header = tracing_ctx.to_w3c();
        for (peer, flush_instruction) in flush_instructions {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
                Some(tracing_header.clone()),
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            if let Err(e) = self
                .mailbox
                .send_oneway(&Channel::Datanode(peer.id), msg)
                .await
            {
                error!(e; "Failed to send flush instruction to datanode {}", peer);
            }
        }

        Ok(())
    }
}

/// Determines whether a region checkpoint should be persisted based on current and persisted state.
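///
/// A checkpoint is persisted only when it is strictly newer than the persisted one
/// (per [`ReplayCheckpoint`]'s ordering); e.g. moving from `(90, Some(10))` to
/// `(100, Some(10))` is persisted, while an unchanged `(100, Some(10))` is not.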
fn should_persist_region_checkpoint(
    current: &LeaderRegion,
    persisted: Option<ReplayCheckpoint>,
) -> Option<ReplayCheckpoint> {
    let new_checkpoint = ReplayCheckpoint::new(
        current.manifest.replay_entry_id(),
        current.manifest.metadata_replay_entry_id(),
    );

    let Some(persisted) = persisted else {
        return Some(new_checkpoint);
    };

    if new_checkpoint > persisted {
        return Some(new_checkpoint);
    }
    None
}

/// Filters regions by their estimated replay size.
///
/// Returns the regions whose estimated replay size exceeds the given threshold.
/// The estimated replay size is calculated as:
/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
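///
/// For example (the numbers mirror the tests below): with `latest_entry_id = 100`,
/// `avg_record_size = 10`, and a 50-byte threshold, a region whose entry id is 90 has
/// an estimated replay size of `(100 - 90) * 10 = 100` bytes and is selected, while
/// one at 98 (20 bytes) is not.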
fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
    topic: &str,
    regions: I,
    avg_record_size: u64,
    latest_entry_id: u64,
    threshold: ReadableSize,
) -> Vec<RegionId> {
    let mut regions_to_flush = Vec::new();
    for (region_id, entry_id) in regions {
        if entry_id < latest_entry_id {
            let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
            if replay_size > threshold.as_bytes() {
                debug!(
                    "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
                    region_id,
                    ReadableSize(replay_size),
                    threshold,
                    entry_id,
                    latest_entry_id,
                    topic
                );
                regions_to_flush.push(region_id);
            }
        }
    }

    regions_to_flush
}

/// Groups regions by their leader peer.
async fn group_regions_by_leader(
    table_metadata_manager: &TableMetadataManagerRef,
    regions_to_flush: &[RegionId],
) -> Result<HashMap<Peer, Vec<RegionId>>> {
    let table_ids = regions_to_flush
        .iter()
        .map(|region_id| region_id.table_id())
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();
    let table_ids_table_routes = table_metadata_manager
        .table_route_manager()
        .batch_get_physical_table_routes(&table_ids)
        .await
        .context(error::TableMetadataManagerSnafu)?;

    let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
    for region_id in regions_to_flush {
        let table_id = region_id.table_id();
        let table_route = table_ids_table_routes
            .get(&table_id)
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        let Some(region_route) = table_route
            .region_routes
            .iter()
            .find(|r| r.region.id == *region_id)
        else {
            continue;
        };
        let Some(peer) = &region_route.leader_peer else {
            continue;
        };

        peer_region_ids_map
            .entry(peer.clone())
            .or_default()
            .push(*region_id);
    }
    Ok(peer_region_ids_map)
}

/// Checks if the timestamp is recent.
///
/// A timestamp is recent if the difference between `now` and the timestamp is less than `duration`.
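///
/// E.g. with a 1-second duration, a timestamp 999 ms in the past is recent, while one
/// 1001 ms in the past is not.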
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let duration = duration.as_millis() as i64;
    now.saturating_sub(timestamp) < duration
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_meta::instruction::FlushStrategy;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
    use common_meta::sequence::SequenceBuilder;
    use common_meta::stats::topic::TopicStatsRegistry;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata};

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000); // 1000 bytes

        // All regions have prunable_entry_id close to latest_entry_id, so replay_size is small
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = (100-99)*10 = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 95), // replay_size = 50
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50); // 50 bytes

        // Only region (1, 3) exceeds the threshold: (100-90)*10 = 100 > 50
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 90), // replay_size = 100
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        // replay_size will always be 0, so none should be flushed
        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        let regions = vec![
            (region_id(1, 1), 100), // prunable_entry_id == latest_entry_id, should not be flushed
            (region_id(1, 2), 99),  // replay_size = 10
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Region (1, 2)'s replay_size equals the threshold (10); the check is strict (>),
        // so nothing is flushed
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        let regions = vec![
            (region_id(1, 1), 190), // replay_size = 50
            (region_id(1, 2), 180), // replay_size = 100
            (region_id(1, 3), 199), // replay_size = 5
            (region_id(1, 4), 200), // replay_size = 0
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Only regions (1, 1) and (1, 2) exceed the threshold
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // `persisted` is none
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is none
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `current.manifest.metadata_replay_entry_id()` is none
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
        // `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()`
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
        let mailbox_sequence =
            SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build();
        let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;

        let (trigger, _ticker) = RegionFlushTrigger::new(
            table_metadata_manager.clone(),
            leader_region_registry.clone(),
            topic_stats_registry,
            mailbox_ctx.mailbox().clone(),
            "127.0.0.1:3002".to_string(),
            ReadableSize(1),
            ReadableSize(1),
        );

        let topic = "test_topic".to_string();
        new_wal_prune_metadata(
            table_metadata_manager,
            leader_region_registry,
            1,
            1,
            &[0],
            topic,
        )
        .await;

        let region_id = RegionId::new(0, 0);
        trigger.send_flush_instructions(&[region_id]).await.unwrap();

        let response = rx.recv().await.unwrap().unwrap();
        let msg = response.mailbox_message.unwrap();
        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        let Instruction::FlushRegions(flush_regions) = instruction else {
            panic!("Expected FlushRegions instruction");
        };

        assert_eq!(flush_regions.region_ids, vec![region_id]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert_eq!(
            flush_regions.reason,
            Some(RegionFlushReason::RemoteWalPrune)
        );
    }
}
791}