meta_srv/region/flush_trigger.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};

use api::v1::meta::MailboxMessage;
use common_base::readable_size::ReadableSize;
use common_meta::instruction::{FlushRegions, Instruction};
use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
use common_meta::key::TableMetadataManagerRef;
use common_meta::peer::Peer;
use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
use common_meta::stats::topic::TopicStatsRegistryRef;
use common_telemetry::{debug, error, info, warn};
use common_time::util::current_time_millis;
use common_wal::config::kafka::common::{
    DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
};
use snafu::{OptionExt, ResultExt};
use store_api::storage::RegionId;
use tokio::sync::mpsc::{Receiver, Sender};

use crate::error::{self, Result};
use crate::service::mailbox::{Channel, MailboxRef};
use crate::{define_ticker, metrics};

/// The interval of the region flush ticker.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// The maximum age for a topic's stats to be considered recent.
const RECENT_DURATION: Duration = Duration::from_secs(300);

/// [`Event`] represents the types of events processed by the region flush trigger.
///
/// Variants:
/// - `Tick`: periodically triggers the region flush check.
pub(crate) enum Event {
    Tick,
}

pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

define_ticker!(
    /// [RegionFlushTicker] periodically sends [Event::Tick] to drive the region flush check.
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);

/// [`RegionFlushTrigger`] is used to ensure that the estimated WAL replay size
/// stays below a certain threshold by triggering a region flush when the estimated
/// WAL replay size exceeds that threshold. This helps improve datanode startup
/// speed and reduce the overall startup time.
///
/// The estimated WAL replay size is calculated as:
/// `(latest_entry_id - flushed_entry_id) * avg_record_size`
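///
/// For example, with `latest_entry_id = 10_000`, `flushed_entry_id = 2_000`, and an
/// average record size of 512 bytes, the estimated replay size is
/// `(10_000 - 2_000) * 512 = 4_096_000` bytes (about 3.9 MiB).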
pub struct RegionFlushTrigger {
    /// The metadata manager.
    table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    leader_region_registry: LeaderRegionRegistryRef,
    /// The topic stats registry.
    topic_stats_registry: TopicStatsRegistryRef,
    /// The mailbox to send messages.
    mailbox: MailboxRef,
    /// The server address.
    server_addr: String,
    /// The replay size threshold that triggers a region flush.
    flush_trigger_size: ReadableSize,
    /// The replay size threshold that triggers persisting a region checkpoint.
    checkpoint_trigger_size: ReadableSize,
    /// The receiver of events.
    receiver: Receiver<Event>,
}

impl RegionFlushTrigger {
    /// Creates a new [`RegionFlushTrigger`].
    pub(crate) fn new(
        table_metadata_manager: TableMetadataManagerRef,
        leader_region_registry: LeaderRegionRegistryRef,
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
        server_addr: String,
        mut flush_trigger_size: ReadableSize,
        mut checkpoint_trigger_size: ReadableSize,
    ) -> (Self, RegionFlushTicker) {
        if flush_trigger_size.as_bytes() == 0 {
            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
            warn!(
                "flush_trigger_size is not set, using default value: {}",
                flush_trigger_size
            );
        }
        if checkpoint_trigger_size.as_bytes() == 0 {
            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
            warn!(
                "checkpoint_trigger_size is not set, using default value: {}",
                checkpoint_trigger_size
            );
        }
        let (tx, rx) = Self::channel();
        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
        let region_flush_trigger = Self {
            table_metadata_manager,
            leader_region_registry,
            topic_stats_registry,
            mailbox,
            server_addr,
            flush_trigger_size,
            checkpoint_trigger_size,
            receiver: rx,
        };
        (region_flush_trigger, region_flush_ticker)
    }

    fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Starts the region flush trigger.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("Region flush trigger started");
        Ok(())
    }

    async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => self.handle_tick().await,
            }
        }
    }

    async fn handle_tick(&self) {
        if let Err(e) = self.trigger_flush().await {
            error!(e; "Failed to trigger flush");
        }
    }

    async fn trigger_flush(&self) -> Result<()> {
        let now = Instant::now();
        let topics = self
            .table_metadata_manager
            .topic_name_manager()
            .range()
            .await
            .context(error::TableMetadataManagerSnafu)?;

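        // Handle each topic independently so that a failure in one topic does not
        // prevent flushing regions in the others.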
        for topic in &topics {
            let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
                continue;
            };
            if let Err(e) = self
                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
                .await
            {
                error!(e; "Failed to flush regions in topic: {}", topic);
            }
        }

        debug!(
            "Triggered flush for {} topics in {:?}",
            topics.len(),
            now.elapsed()
        );
        Ok(())
    }

    /// Retrieves the latest entry id and average record size of a topic.
    ///
    /// Returns `None` if the topic is not found or its stats are not recent.
    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
        let Some((latest_entry_id, timestamp)) =
            self.topic_stats_registry.get_latest_entry_id(topic)
        else {
            debug!("No latest entry id found for topic: {}", topic);
            return None;
        };

        let Some(stat) = self
            .topic_stats_registry
            .get_calculated_topic_stat(topic, TICKER_INTERVAL)
        else {
            debug!("No topic stat found for topic: {}", topic);
            return None;
        };

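        // Skip topics whose stats are stale, since the replay size estimate would be
        // unreliable.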
        let now = current_time_millis();
        if !is_recent(timestamp, now, RECENT_DURATION) {
            debug!(
                "Latest entry id of topic '{}' is not recent (timestamp: {}, now: {})",
                topic, timestamp, now
            );
            return None;
        }
        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
            debug!(
                "Calculated stat of topic '{}' is not recent (end_ts: {}, now: {})",
                topic, stat.end_ts, now
            );
            return None;
        }

        Some((latest_entry_id, stat.avg_record_size))
    }

    async fn persist_region_checkpoints(
        &self,
        topic: &str,
        region_ids: &[RegionId],
        topic_regions: &HashMap<RegionId, TopicRegionValue>,
        leader_regions: &HashMap<RegionId, LeaderRegion>,
    ) -> Result<()> {
        let regions = region_ids
            .iter()
            .flat_map(|region_id| match leader_regions.get(region_id) {
                Some(leader_region) => should_persist_region_checkpoint(
                    leader_region,
                    topic_regions
                        .get(region_id)
                        .cloned()
                        .and_then(|value| value.checkpoint),
                )
                .map(|checkpoint| {
                    (
                        TopicRegionKey::new(*region_id, topic),
                        Some(TopicRegionValue::new(Some(checkpoint))),
                    )
                }),
                None => None,
            })
            .collect::<Vec<_>>();

        // `chunks` will panic if the chunk size is zero, so return early when there are
        // no regions to persist.
        if regions.is_empty() {
            return Ok(());
        }

        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
        let batch_size = max_txn_ops.min(regions.len());
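        // Write checkpoints in chunks so that each `batch_put` stays within the kv
        // backend's transaction limit.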
        for batch in regions.chunks(batch_size) {
            self.table_metadata_manager
                .topic_region_manager()
                .batch_put(batch)
                .await
                .context(error::TableMetadataManagerSnafu)?;
        }

        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions.len() as u64);

        Ok(())
    }

    async fn flush_regions_in_topic(
        &self,
        topic: &str,
        latest_entry_id: u64,
        avg_record_size: usize,
    ) -> Result<()> {
        let topic_regions = self
            .table_metadata_manager
            .topic_region_manager()
            .regions(topic)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        if topic_regions.is_empty() {
            debug!("No regions found for topic: {}", topic);
            return Ok(());
        }

        // Filters regions whose checkpoints need to be persisted.
        let regions_to_persist = filter_regions_by_replay_size(
            topic,
            topic_regions
                .iter()
                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
            avg_record_size as u64,
            latest_entry_id,
            self.checkpoint_trigger_size,
        );
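        // Fetch the in-memory leader state for every region in the topic.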
        let region_manifests = self
            .leader_region_registry
            .batch_get(topic_regions.keys().cloned());

        if let Err(err) = self
            .persist_region_checkpoints(
                topic,
                &regions_to_persist,
                &topic_regions,
                &region_manifests,
            )
            .await
        {
            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
        }

        let regions = region_manifests
            .into_iter()
            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
            .collect::<Vec<_>>();
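        // The topic's estimated replay size is bounded by its slowest region, i.e. the
        // one with the minimum prunable entry id.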
        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
        if let Some((_, min_entry_id)) = min_entry_id {
            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                .saturating_mul(avg_record_size as u64);
            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
                .with_label_values(&[topic])
                .set(replay_size as i64);
        }

        // Selects regions to flush from the set of active regions.
        let regions_to_flush = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size as u64,
            latest_entry_id,
            self.flush_trigger_size,
        );

        // Sends flush instructions to datanodes.
        if !regions_to_flush.is_empty() {
            self.send_flush_instructions(&regions_to_flush).await?;
            debug!(
                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                regions_to_flush.len(),
                topic,
                regions_to_flush,
            );
        }

        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions_to_flush.len() as u64);

        Ok(())
    }

    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
        let leader_to_region_ids =
            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
        let flush_instructions = leader_to_region_ids
            .into_iter()
            .map(|(leader, region_ids)| {
                let flush_instruction =
                    Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
                (leader, flush_instruction)
            });

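        // Send a one-way flush message per datanode; a send failure is logged and does
        // not abort delivery to the remaining datanodes.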
        for (peer, flush_instruction) in flush_instructions {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            if let Err(e) = self
                .mailbox
                .send_oneway(&Channel::Datanode(peer.id), msg)
                .await
            {
                error!(e; "Failed to send flush instruction to datanode {}", peer);
            }
        }

        Ok(())
    }
}

/// Determines whether a region checkpoint should be persisted based on current and persisted state.
fn should_persist_region_checkpoint(
    current: &LeaderRegion,
    persisted: Option<ReplayCheckpoint>,
) -> Option<ReplayCheckpoint> {
    let new_checkpoint = ReplayCheckpoint::new(
        current.manifest.replay_entry_id(),
        current.manifest.metadata_replay_entry_id(),
    );

    let Some(persisted) = persisted else {
        return Some(new_checkpoint);
    };

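    // `ReplayCheckpoint` compares `(entry_id, metadata_entry_id)` pairs; persist only
    // when the new checkpoint strictly advances past the persisted one.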
    if new_checkpoint > persisted {
        return Some(new_checkpoint);
    }
    None
}

/// Filters regions based on their estimated replay size.
///
/// Returns the regions whose estimated replay size exceeds the given threshold.
/// The estimated replay size is calculated as:
/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
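///
/// For example, with `avg_record_size = 10`, `latest_entry_id = 100`, and a 50-byte
/// threshold, a region whose prunable entry id is 90 has an estimated replay size of
/// `(100 - 90) * 10 = 100` bytes and is selected.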
fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
    topic: &str,
    regions: I,
    avg_record_size: u64,
    latest_entry_id: u64,
    threshold: ReadableSize,
) -> Vec<RegionId> {
    let mut regions_to_flush = Vec::new();
    for (region_id, entry_id) in regions {
        if entry_id < latest_entry_id {
            let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
            if replay_size > threshold.as_bytes() {
                debug!(
                    "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
                    region_id, ReadableSize(replay_size), threshold, entry_id, latest_entry_id, topic
                );
                regions_to_flush.push(region_id);
            }
        }
    }

    regions_to_flush
}

/// Groups regions by their leader peer.
///
/// Regions without a matching region route or leader peer are skipped.
async fn group_regions_by_leader(
    table_metadata_manager: &TableMetadataManagerRef,
    regions_to_flush: &[RegionId],
) -> Result<HashMap<Peer, Vec<RegionId>>> {
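    // Collect the distinct table ids so each table's route is fetched only once.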
    let table_ids = regions_to_flush
        .iter()
        .map(|region_id| region_id.table_id())
        .collect::<HashSet<_>>()
        .into_iter()
        .collect::<Vec<_>>();
    let table_ids_table_routes = table_metadata_manager
        .table_route_manager()
        .batch_get_physical_table_routes(&table_ids)
        .await
        .context(error::TableMetadataManagerSnafu)?;

    let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
    for region_id in regions_to_flush {
        let table_id = region_id.table_id();
        let table_route = table_ids_table_routes
            .get(&table_id)
            .context(error::TableRouteNotFoundSnafu { table_id })?;
        let Some(region_route) = table_route
            .region_routes
            .iter()
            .find(|r| r.region.id == *region_id)
        else {
            continue;
        };
        let Some(peer) = &region_route.leader_peer else {
            continue;
        };

        peer_region_ids_map
            .entry(peer.clone())
            .or_default()
            .push(*region_id);
    }
    Ok(peer_region_ids_map)
}

/// Checks whether the timestamp is recent.
///
/// A timestamp is recent if the difference between `now` and the timestamp is less than `duration`.
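///
/// For example, with a one-second window, `is_recent(now - 999, now, Duration::from_secs(1))`
/// holds, while `is_recent(now - 1001, now, Duration::from_secs(1))` does not.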
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let duration = duration.as_millis() as i64;
    now.saturating_sub(timestamp) < duration
}

#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000); // 1000 bytes

        // All regions have prunable_entry_id close to latest_entry_id, so replay_size is small
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = (100-99)*10 = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 95), // replay_size = 50
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50); // 50 bytes

        // Only region 1,3 will exceed threshold: (100-90)*10 = 100 > 50
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 90), // replay_size = 100
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        // replay_size will always be 0, so none should be flushed
        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        let regions = vec![
            (region_id(1, 1), 100), // prunable_entry_id == latest_entry_id, should not be flushed
            (region_id(1, 2), 99),  // replay_size = 10
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Region (1, 2)'s replay size (10) equals the threshold and does not exceed it,
        // so nothing is flushed.
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        let regions = vec![
            (region_id(1, 1), 190), // replay_size = 50
            (region_id(1, 2), 180), // replay_size = 100
            (region_id(1, 3), 199), // replay_size = 5
            (region_id(1, 4), 200), // replay_size = 0
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Only regions 1,1 and 1,2 should be flushed
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // `persisted` is none
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is none
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `current.manifest.metadata_replay_entry_id()` is none
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))));
        assert!(result.is_none());

        // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // `persisted.entry_id` is greater than `current.manifest.replay_entry_id()`,
        // so the checkpoint must not be persisted regardless of the metadata entry id.
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }
}
716}