meta_srv/region/flush_trigger.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{debug, error, info, warn};
29use common_time::util::current_time_millis;
30use common_wal::config::kafka::common::{
31    DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
32};
33use snafu::{OptionExt, ResultExt};
34use store_api::storage::RegionId;
35use tokio::sync::mpsc::{Receiver, Sender};
36
37use crate::error::{self, Result};
38use crate::service::mailbox::{Channel, MailboxRef};
39use crate::{define_ticker, metrics};
40
/// The interval of the region flush ticker.
///
/// Each tick triggers one scan of all WAL topics (see `trigger_flush`).
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// The duration of the recent period.
///
/// Topic stats older than this are considered stale and are skipped when
/// estimating replay sizes (see `retrieve_topic_stat`).
const RECENT_DURATION: Duration = Duration::from_secs(300);
46
/// [`Event`] represents various types of events that can be processed by the region flush ticker.
///
/// Variants:
/// - `Tick`: This event is used to trigger region flush trigger periodically.
pub enum Event {
    // Periodic tick; each one drives a single `trigger_flush` pass.
    Tick,
}
54
/// A shared, reference-counted handle to a [`RegionFlushTicker`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

define_ticker!(
    /// [RegionFlushTicker] is used to trigger region flush trigger periodically.
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
63
/// [`RegionFlushTrigger`] is used to ensure that the estimated WAL replay size
/// stays below a certain threshold by triggering a region flush when the estimated
/// WAL replay size exceeds that threshold. This helps improve datanode startup
/// speed and reduce the overall startup time.
///
/// The estimated WAL replay size is calculated as:
/// `(latest_entry_id - flushed_entry_id) * avg_record_size`
pub struct RegionFlushTrigger {
    /// The metadata manager.
    table_metadata_manager: TableMetadataManagerRef,
    /// The leader region registry.
    leader_region_registry: LeaderRegionRegistryRef,
    /// The topic stats registry.
    topic_stats_registry: TopicStatsRegistryRef,
    /// The mailbox to send messages.
    mailbox: MailboxRef,
    /// The server address; used as the sender identity of mailbox messages.
    server_addr: String,
    /// The flush trigger size: estimated replay sizes above this threshold
    /// cause a flush instruction to be sent to the region's datanode.
    flush_trigger_size: ReadableSize,
    /// The checkpoint trigger size: estimated replay sizes above this
    /// threshold cause a replay checkpoint to be persisted.
    checkpoint_trigger_size: ReadableSize,
    /// The receiver of events.
    receiver: Receiver<Event>,
}
89
90impl RegionFlushTrigger {
91    /// Creates a new [`RegionFlushTrigger`].
92    pub(crate) fn new(
93        table_metadata_manager: TableMetadataManagerRef,
94        leader_region_registry: LeaderRegionRegistryRef,
95        topic_stats_registry: TopicStatsRegistryRef,
96        mailbox: MailboxRef,
97        server_addr: String,
98        mut flush_trigger_size: ReadableSize,
99        mut checkpoint_trigger_size: ReadableSize,
100    ) -> (Self, RegionFlushTicker) {
101        if flush_trigger_size.as_bytes() == 0 {
102            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
103            warn!(
104                "flush_trigger_size is not set, using default value: {}",
105                flush_trigger_size
106            );
107        }
108        if checkpoint_trigger_size.as_bytes() == 0 {
109            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
110            warn!(
111                "checkpoint_trigger_size is not set, using default value: {}",
112                checkpoint_trigger_size
113            );
114        }
115        let (tx, rx) = Self::channel();
116        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
117        let region_flush_trigger = Self {
118            table_metadata_manager,
119            leader_region_registry,
120            topic_stats_registry,
121            mailbox,
122            server_addr,
123            flush_trigger_size,
124            checkpoint_trigger_size,
125            receiver: rx,
126        };
127        (region_flush_trigger, region_flush_ticker)
128    }
129
130    fn channel() -> (Sender<Event>, Receiver<Event>) {
131        tokio::sync::mpsc::channel(8)
132    }
133
134    /// Starts the region flush trigger.
135    pub fn try_start(mut self) -> Result<()> {
136        common_runtime::spawn_global(async move { self.run().await });
137        info!("Region flush trigger started");
138        Ok(())
139    }
140
141    async fn run(&mut self) {
142        while let Some(event) = self.receiver.recv().await {
143            match event {
144                Event::Tick => self.handle_tick().await,
145            }
146        }
147    }
148
149    async fn handle_tick(&self) {
150        if let Err(e) = self.trigger_flush().await {
151            error!(e; "Failed to trigger flush");
152        }
153    }
154
155    async fn trigger_flush(&self) -> Result<()> {
156        let now = Instant::now();
157        let topics = self
158            .table_metadata_manager
159            .topic_name_manager()
160            .range()
161            .await
162            .context(error::TableMetadataManagerSnafu)?;
163
164        for topic in &topics {
165            let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
166                continue;
167            };
168            if let Err(e) = self
169                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
170                .await
171            {
172                error!(e; "Failed to flush regions in topic: {}", topic);
173            }
174        }
175
176        debug!(
177            "Triggered flush for {} topics in {:?}",
178            topics.len(),
179            now.elapsed()
180        );
181        Ok(())
182    }
183
184    /// Retrieves the latest entry id and average record size of a topic.
185    ///
186    /// Returns `None` if the topic is not found or the latest entry id is not recent.
187    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
188        let Some((latest_entry_id, timestamp)) =
189            self.topic_stats_registry.get_latest_entry_id(topic)
190        else {
191            debug!("No latest entry id found for topic: {}", topic);
192            return None;
193        };
194
195        let Some(stat) = self
196            .topic_stats_registry
197            .get_calculated_topic_stat(topic, TICKER_INTERVAL)
198        else {
199            debug!("No topic stat found for topic: {}", topic);
200            return None;
201        };
202
203        let now = current_time_millis();
204        if !is_recent(timestamp, now, RECENT_DURATION) {
205            debug!(
206                "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
207                topic, timestamp, now
208            );
209            return None;
210        }
211        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
212            debug!(
213                "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
214                topic, stat.end_ts, now
215            );
216            return None;
217        }
218
219        Some((latest_entry_id, stat.avg_record_size))
220    }
221
222    async fn persist_region_checkpoints(
223        &self,
224        topic: &str,
225        region_ids: &[RegionId],
226        topic_regions: &HashMap<RegionId, TopicRegionValue>,
227        leader_regions: &HashMap<RegionId, LeaderRegion>,
228    ) -> Result<()> {
229        let regions = region_ids
230            .iter()
231            .flat_map(|region_id| match leader_regions.get(region_id) {
232                Some(leader_region) => should_persist_region_checkpoint(
233                    leader_region,
234                    topic_regions
235                        .get(region_id)
236                        .cloned()
237                        .and_then(|value| value.checkpoint),
238                )
239                .map(|checkpoint| {
240                    (
241                        TopicRegionKey::new(*region_id, topic),
242                        Some(TopicRegionValue::new(Some(checkpoint))),
243                    )
244                }),
245                None => None,
246            })
247            .collect::<Vec<_>>();
248
249        // The`chunks` will panic if chunks_size is zero, so we return early if there are no regions to persist.
250        if regions.is_empty() {
251            return Ok(());
252        }
253
254        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
255        let batch_size = max_txn_ops.min(regions.len());
256        for batch in regions.chunks(batch_size) {
257            self.table_metadata_manager
258                .topic_region_manager()
259                .batch_put(batch)
260                .await
261                .context(error::TableMetadataManagerSnafu)?;
262        }
263
264        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
265            .with_label_values(&[topic])
266            .inc_by(regions.len() as u64);
267
268        Ok(())
269    }
270
271    async fn flush_regions_in_topic(
272        &self,
273        topic: &str,
274        latest_entry_id: u64,
275        avg_record_size: usize,
276    ) -> Result<()> {
277        let topic_regions = self
278            .table_metadata_manager
279            .topic_region_manager()
280            .regions(topic)
281            .await
282            .context(error::TableMetadataManagerSnafu)?;
283
284        if topic_regions.is_empty() {
285            debug!("No regions found for topic: {}", topic);
286            return Ok(());
287        }
288
289        // Filters regions need to persist checkpoints.
290        let regions_to_persist = filter_regions_by_replay_size(
291            topic,
292            topic_regions
293                .iter()
294                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
295            avg_record_size as u64,
296            latest_entry_id,
297            self.checkpoint_trigger_size,
298        );
299        let region_manifests = self
300            .leader_region_registry
301            .batch_get(topic_regions.keys().cloned());
302
303        if let Err(err) = self
304            .persist_region_checkpoints(
305                topic,
306                &regions_to_persist,
307                &topic_regions,
308                &region_manifests,
309            )
310            .await
311        {
312            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
313        }
314
315        let regions = region_manifests
316            .into_iter()
317            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
318            .collect::<Vec<_>>();
319        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
320        if let Some((_, min_entry_id)) = min_entry_id {
321            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
322                .saturating_mul(avg_record_size as u64);
323            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
324                .with_label_values(&[topic])
325                .set(replay_size as i64);
326        }
327
328        // Selects regions to flush from the set of active regions.
329        let regions_to_flush = filter_regions_by_replay_size(
330            topic,
331            regions.into_iter(),
332            avg_record_size as u64,
333            latest_entry_id,
334            self.flush_trigger_size,
335        );
336
337        // Sends flush instructions to datanodes.
338        if !regions_to_flush.is_empty() {
339            self.send_flush_instructions(&regions_to_flush).await?;
340            debug!(
341                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
342                regions_to_flush.len(),
343                topic,
344                regions_to_flush,
345            );
346        }
347
348        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
349            .with_label_values(&[topic])
350            .inc_by(regions_to_flush.len() as u64);
351
352        Ok(())
353    }
354
355    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
356        let leader_to_region_ids =
357            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
358        let flush_instructions = leader_to_region_ids
359            .into_iter()
360            .map(|(leader, region_ids)| {
361                let flush_instruction =
362                    Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
363                (leader, flush_instruction)
364            });
365
366        let tracing_ctx = TracingContext::from_current_span();
367        let tracing_header = tracing_ctx.to_w3c();
368        for (peer, flush_instruction) in flush_instructions {
369            let msg = MailboxMessage::json_message(
370                &format!("Flush regions: {}", flush_instruction),
371                &format!("Metasrv@{}", self.server_addr),
372                &format!("Datanode-{}@{}", peer.id, peer.addr),
373                common_time::util::current_time_millis(),
374                &flush_instruction,
375                Some(tracing_header.clone()),
376            )
377            .with_context(|_| error::SerializeToJsonSnafu {
378                input: flush_instruction.to_string(),
379            })?;
380            if let Err(e) = self
381                .mailbox
382                .send_oneway(&Channel::Datanode(peer.id), msg)
383                .await
384            {
385                error!(e; "Failed to send flush instruction to datanode {}", peer);
386            }
387        }
388
389        Ok(())
390    }
391}
392
393/// Determines whether a region checkpoint should be persisted based on current and persisted state.
394fn should_persist_region_checkpoint(
395    current: &LeaderRegion,
396    persisted: Option<ReplayCheckpoint>,
397) -> Option<ReplayCheckpoint> {
398    let new_checkpoint = ReplayCheckpoint::new(
399        current.manifest.replay_entry_id(),
400        current.manifest.metadata_replay_entry_id(),
401    );
402
403    let Some(persisted) = persisted else {
404        return Some(new_checkpoint);
405    };
406
407    if new_checkpoint > persisted {
408        return Some(new_checkpoint);
409    }
410    None
411}
412
413/// Filter regions based on the estimated replay size.
414///
415/// Returns the regions if its estimated replay size exceeds the given threshold.
416/// The estimated replay size is calculated as:
417/// `(latest_entry_id - prunable_entry_id) * avg_record_size`
418fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
419    topic: &str,
420    regions: I,
421    avg_record_size: u64,
422    latest_entry_id: u64,
423    threshold: ReadableSize,
424) -> Vec<RegionId> {
425    let mut regions_to_flush = Vec::new();
426    for (region_id, entry_id) in regions {
427        if entry_id < latest_entry_id {
428            let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
429            if replay_size > threshold.as_bytes() {
430                debug!(
431                    "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
432                    region_id,
433                    ReadableSize(replay_size),
434                    threshold,
435                    entry_id,
436                    latest_entry_id,
437                    topic
438                );
439                regions_to_flush.push(region_id);
440            }
441        }
442    }
443
444    regions_to_flush
445}
446
447/// Group regions by leader.
448///
449/// The regions are grouped by the leader of the region.
450async fn group_regions_by_leader(
451    table_metadata_manager: &TableMetadataManagerRef,
452    regions_to_flush: &[RegionId],
453) -> Result<HashMap<Peer, Vec<RegionId>>> {
454    let table_ids = regions_to_flush
455        .iter()
456        .map(|region_id| region_id.table_id())
457        .collect::<HashSet<_>>()
458        .into_iter()
459        .collect::<Vec<_>>();
460    let table_ids_table_routes = table_metadata_manager
461        .table_route_manager()
462        .batch_get_physical_table_routes(&table_ids)
463        .await
464        .context(error::TableMetadataManagerSnafu)?;
465
466    let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
467    for region_id in regions_to_flush {
468        let table_id = region_id.table_id();
469        let table_route = table_ids_table_routes
470            .get(&table_id)
471            .context(error::TableRouteNotFoundSnafu { table_id })?;
472        let Some(region_route) = table_route
473            .region_routes
474            .iter()
475            .find(|r| r.region.id == *region_id)
476        else {
477            continue;
478        };
479        let Some(peer) = &region_route.leader_peer else {
480            continue;
481        };
482
483        match peer_region_ids_map.get_mut(peer) {
484            Some(region_ids) => {
485                region_ids.push(*region_id);
486            }
487            None => {
488                peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
489            }
490        }
491    }
492    Ok(peer_region_ids_map)
493}
494
/// Check if the timestamp is recent.
///
/// A timestamp counts as recent when it is strictly less than `duration`
/// (in milliseconds) older than `now`. Timestamps at or past the future side
/// of `now` are always recent.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let max_age_ms = duration.as_millis() as i64;
    let age_ms = now.saturating_sub(timestamp);
    age_ms < max_age_ms
}
502
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        // 1s window: 999ms old is recent, 1001ms old is not.
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    /// Shorthand for constructing a [`RegionId`] in tests.
    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000); // 1000 bytes

        // All regions have prunable_entry_id close to latest_entry_id, so replay_size is small
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = (100-99)*10 = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 95), // replay_size = 50
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50); // 50 bytes

        // Only region 1,3 will exceed threshold: (100-90)*10 = 100 > 50
        let regions = vec![
            (region_id(1, 1), 99), // replay_size = 10
            (region_id(1, 2), 98), // replay_size = 20
            (region_id(1, 3), 90), // replay_size = 100
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        // replay_size will always be 0, so none should be flushed
        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        let regions = vec![
            (region_id(1, 1), 100), // prunable_entry_id == latest_entry_id, should not be flushed
            (region_id(1, 2), 99),  // replay_size = 10
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Only region 1,2 should be flushed if replay_size > 10
        // (the comparison is strict, so replay_size == 10 does not qualify)
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        let regions = vec![
            (region_id(1, 1), 190), // replay_size = 50
            (region_id(1, 2), 180), // replay_size = 100
            (region_id(1, 3), 199), // replay_size = 5
            (region_id(1, 4), 200), // replay_size = 0
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        // Only regions 1,1 and 1,2 should be flushed
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    /// Builds a metric-engine leader region; the flushed entry ids below feed
    /// the manifest's data/metadata replay entry ids (see assertions in
    /// `test_should_persist_region_checkpoint`).
    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    /// Builds a mito-engine leader region; mito manifests carry no metadata
    /// replay entry id (checkpoints compare with `None`).
    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // `persisted` is none
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is less than `current.manifest.metadata_replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `persisted.metadata_entry_id` is none
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // `current.manifest.metadata_replay_entry_id()` is none
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        // `persisted.entry_id` is equal to `current.manifest.replay_entry_id()`
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // `persisted.entry_id` is less than `current.manifest.replay_entry_id()`
        // `persisted.metadata_entry_id` is greater than `current.manifest.metadata_replay_entry_id()`
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }
}