1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
23use common_meta::key::TableMetadataManagerRef;
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::{debug, error, info, warn};
28use common_time::util::current_time_millis;
29use common_wal::config::kafka::common::{
30 DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
31};
32use snafu::{OptionExt, ResultExt};
33use store_api::storage::RegionId;
34use tokio::sync::mpsc::{Receiver, Sender};
35
36use crate::error::{self, Result};
37use crate::service::mailbox::{Channel, MailboxRef};
38use crate::{define_ticker, metrics};
39
/// Interval at which the ticker emits [`Event::Tick`], i.e. how often the
/// trigger re-evaluates every topic for flush/checkpoint work.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// Freshness window for topic statistics; stats older than this are treated
/// as stale and the topic is skipped for the current tick.
const RECENT_DURATION: Duration = Duration::from_secs(300);
45
/// Events consumed by the [`RegionFlushTrigger`] event loop.
pub(crate) enum Event {
    /// Periodic tick from the ticker; starts one flush-evaluation pass.
    Tick,
}
53
/// Shared handle to the ticker that periodically emits [`Event::Tick`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

// Generates the `RegionFlushTicker` type which sends `Event::Tick` on every
// interval to the receiver held by `RegionFlushTrigger`.
define_ticker!(
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
62
/// Periodically estimates per-topic WAL replay sizes and, for regions whose
/// estimate exceeds the configured thresholds, persists replay checkpoints
/// and sends flush instructions to the owning datanodes.
pub struct RegionFlushTrigger {
    /// Access to table/topic/region metadata stored in the kv backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// In-memory registry of leader regions and their manifest state.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Source of per-topic WAL statistics (latest entry id, record sizes).
    topic_stats_registry: TopicStatsRegistryRef,
    /// Mailbox used to send flush instructions to datanodes.
    mailbox: MailboxRef,
    /// Address of this metasrv; used as the "from" of mailbox messages.
    server_addr: String,
    /// Estimated replay size above which a region flush is triggered.
    flush_trigger_size: ReadableSize,
    /// Estimated replay size above which a region checkpoint is persisted.
    checkpoint_trigger_size: ReadableSize,
    /// Receives [`Event`]s from the ticker.
    receiver: Receiver<Event>,
}
88
impl RegionFlushTrigger {
    /// Creates the trigger together with its ticker.
    ///
    /// Zero-valued trigger sizes are replaced with the crate defaults
    /// (`DEFAULT_FLUSH_TRIGGER_SIZE` / `DEFAULT_CHECKPOINT_TRIGGER_SIZE`)
    /// and a warning is logged, so a misconfigured `0` never disables the
    /// mechanism entirely.
    pub(crate) fn new(
        table_metadata_manager: TableMetadataManagerRef,
        leader_region_registry: LeaderRegionRegistryRef,
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
        server_addr: String,
        mut flush_trigger_size: ReadableSize,
        mut checkpoint_trigger_size: ReadableSize,
    ) -> (Self, RegionFlushTicker) {
        if flush_trigger_size.as_bytes() == 0 {
            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
            warn!(
                "flush_trigger_size is not set, using default value: {}",
                flush_trigger_size
            );
        }
        if checkpoint_trigger_size.as_bytes() == 0 {
            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
            warn!(
                "checkpoint_trigger_size is not set, using default value: {}",
                checkpoint_trigger_size
            );
        }
        let (tx, rx) = Self::channel();
        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
        let region_flush_trigger = Self {
            table_metadata_manager,
            leader_region_registry,
            topic_stats_registry,
            mailbox,
            server_addr,
            flush_trigger_size,
            checkpoint_trigger_size,
            receiver: rx,
        };
        (region_flush_trigger, region_flush_ticker)
    }

    /// Bounded channel connecting the ticker (sender) to the event loop.
    fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Consumes `self` and spawns the event loop on the global runtime.
    ///
    /// Currently infallible; the `Result` return keeps the start-up API
    /// uniform with other components.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("Region flush trigger started");
        Ok(())
    }

    /// Event loop: processes ticks until the ticker side is dropped.
    async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => self.handle_tick().await,
            }
        }
    }

    /// Handles one tick; errors are logged and never propagated so a bad
    /// pass cannot kill the loop.
    async fn handle_tick(&self) {
        if let Err(e) = self.trigger_flush().await {
            error!(e; "Failed to trigger flush");
        }
    }

    /// Walks every known WAL topic and evaluates its regions for
    /// checkpointing and flushing.
    ///
    /// Per-topic failures are logged and do not abort the remaining topics;
    /// only the initial topic listing can fail this method.
    async fn trigger_flush(&self) -> Result<()> {
        let now = Instant::now();
        let topics = self
            .table_metadata_manager
            .topic_name_manager()
            .range()
            .await
            .context(error::TableMetadataManagerSnafu)?;

        for topic in &topics {
            // Topics without fresh stats are skipped (see retrieve_topic_stat).
            let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
                continue;
            };
            if let Err(e) = self
                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
                .await
            {
                error!(e; "Failed to flush regions in topic: {}", topic);
            }
        }

        debug!(
            "Triggered flush for {} topics in {:?}",
            topics.len(),
            now.elapsed()
        );
        Ok(())
    }

    /// Fetches `(latest_entry_id, avg_record_size)` for `topic`.
    ///
    /// Returns `None` (the topic is skipped this tick) when either statistic
    /// is missing or older than [`RECENT_DURATION`], to avoid acting on
    /// stale data.
    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
        let Some((latest_entry_id, timestamp)) =
            self.topic_stats_registry.get_latest_entry_id(topic)
        else {
            debug!("No latest entry id found for topic: {}", topic);
            return None;
        };

        let Some(stat) = self
            .topic_stats_registry
            .get_calculated_topic_stat(topic, TICKER_INTERVAL)
        else {
            debug!("No topic stat found for topic: {}", topic);
            return None;
        };

        let now = current_time_millis();
        if !is_recent(timestamp, now, RECENT_DURATION) {
            // NOTE(review): the "now"/"stat timestamp" placeholders appear
            // swapped relative to the arguments (timestamp, now) — confirm
            // and fix the message wording.
            debug!(
                "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
                topic, timestamp, now
            );
            return None;
        }
        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
            // NOTE(review): same placeholder/argument swap as above.
            debug!(
                "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
                topic, stat.end_ts, now
            );
            return None;
        }

        Some((latest_entry_id, stat.avg_record_size))
    }

    /// Persists advanced replay checkpoints for `region_ids` of `topic`.
    ///
    /// For each region with a known leader, a fresh checkpoint is computed
    /// from the leader manifest and written only when it strictly advances
    /// the persisted one (see [`should_persist_region_checkpoint`]). Writes
    /// are chunked to respect the kv backend's transaction-op limit.
    async fn persist_region_checkpoints(
        &self,
        topic: &str,
        region_ids: &[RegionId],
        topic_regions: &HashMap<RegionId, TopicRegionValue>,
        leader_regions: &HashMap<RegionId, LeaderRegion>,
    ) -> Result<()> {
        let regions = region_ids
            .iter()
            .flat_map(|region_id| match leader_regions.get(region_id) {
                Some(leader_region) => should_persist_region_checkpoint(
                    leader_region,
                    topic_regions
                        .get(region_id)
                        .cloned()
                        .and_then(|value| value.checkpoint),
                )
                .map(|checkpoint| {
                    (
                        TopicRegionKey::new(*region_id, topic),
                        Some(TopicRegionValue::new(Some(checkpoint))),
                    )
                }),
                // No leader entry: nothing to checkpoint for this region.
                None => None,
            })
            .collect::<Vec<_>>();

        if regions.is_empty() {
            return Ok(());
        }

        // NOTE(review): assumes max_txn_ops() >= 1; `chunks(0)` would panic —
        // confirm the backend guarantees a positive limit.
        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
        let batch_size = max_txn_ops.min(regions.len());
        for batch in regions.chunks(batch_size) {
            self.table_metadata_manager
                .topic_region_manager()
                .batch_put(batch)
                .await
                .context(error::TableMetadataManagerSnafu)?;
        }

        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions.len() as u64);

        Ok(())
    }

    /// Evaluates all regions of `topic`: persists checkpoints, exports the
    /// topic's estimated replay size, and sends flush instructions for
    /// regions whose estimated replay size exceeds `flush_trigger_size`.
    async fn flush_regions_in_topic(
        &self,
        topic: &str,
        latest_entry_id: u64,
        avg_record_size: usize,
    ) -> Result<()> {
        let topic_regions = self
            .table_metadata_manager
            .topic_region_manager()
            .regions(topic)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        if topic_regions.is_empty() {
            debug!("No regions found for topic: {}", topic);
            return Ok(());
        }

        // Candidates for checkpoint persistence: replay size estimated from
        // the *persisted* min entry id against the checkpoint threshold.
        let regions_to_persist = filter_regions_by_replay_size(
            topic,
            topic_regions
                .iter()
                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
            avg_record_size as u64,
            latest_entry_id,
            self.checkpoint_trigger_size,
        );
        let region_manifests = self
            .leader_region_registry
            .batch_get(topic_regions.keys().cloned());

        // Checkpoint persistence is best-effort: a failure is logged and must
        // not prevent flush instructions from being sent below.
        if let Err(err) = self
            .persist_region_checkpoints(
                topic,
                &regions_to_persist,
                &topic_regions,
                &region_manifests,
            )
            .await
        {
            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
        }

        let regions = region_manifests
            .into_iter()
            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
            .collect::<Vec<_>>();
        // The slowest region (smallest prunable entry id) bounds the topic's
        // replay size; export the estimate as a gauge.
        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
        if let Some((_, min_entry_id)) = min_entry_id {
            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                .saturating_mul(avg_record_size as u64);
            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
                .with_label_values(&[topic])
                .set(replay_size as i64);
        }

        // Candidates for flushing: replay size estimated from the in-memory
        // prunable entry id against the flush threshold.
        let regions_to_flush = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size as u64,
            latest_entry_id,
            self.flush_trigger_size,
        );

        if !regions_to_flush.is_empty() {
            self.send_flush_instructions(&regions_to_flush).await?;
            debug!(
                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                regions_to_flush.len(),
                topic,
                regions_to_flush,
            );
        }

        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions_to_flush.len() as u64);

        Ok(())
    }

    /// Groups `regions_to_flush` by leader datanode and sends one async
    /// batch flush instruction per leader through the mailbox
    /// (fire-and-forget via `send_oneway`).
    ///
    /// Send failures are logged per peer and do not abort the other sends;
    /// only serialization failures propagate as errors.
    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
        let leader_to_region_ids =
            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
        let flush_instructions = leader_to_region_ids
            .into_iter()
            .map(|(leader, region_ids)| {
                let flush_instruction =
                    Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
                (leader, flush_instruction)
            });

        for (peer, flush_instruction) in flush_instructions {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            if let Err(e) = self
                .mailbox
                .send_oneway(&Channel::Datanode(peer.id), msg)
                .await
            {
                error!(e; "Failed to send flush instruction to datanode {}", peer);
            }
        }

        Ok(())
    }
}
388
389fn should_persist_region_checkpoint(
391 current: &LeaderRegion,
392 persisted: Option<ReplayCheckpoint>,
393) -> Option<ReplayCheckpoint> {
394 let new_checkpoint = ReplayCheckpoint::new(
395 current.manifest.replay_entry_id(),
396 current.manifest.metadata_replay_entry_id(),
397 );
398
399 let Some(persisted) = persisted else {
400 return Some(new_checkpoint);
401 };
402
403 if new_checkpoint > persisted {
404 return Some(new_checkpoint);
405 }
406 None
407}
408
409fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
415 topic: &str,
416 regions: I,
417 avg_record_size: u64,
418 latest_entry_id: u64,
419 threshold: ReadableSize,
420) -> Vec<RegionId> {
421 let mut regions_to_flush = Vec::new();
422 for (region_id, entry_id) in regions {
423 if entry_id < latest_entry_id {
424 let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
425 if replay_size > threshold.as_bytes() {
426 debug!(
427 "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
428 region_id, ReadableSize(replay_size), threshold, entry_id, latest_entry_id, topic
429 );
430 regions_to_flush.push(region_id);
431 }
432 }
433 }
434
435 regions_to_flush
436}
437
438async fn group_regions_by_leader(
442 table_metadata_manager: &TableMetadataManagerRef,
443 regions_to_flush: &[RegionId],
444) -> Result<HashMap<Peer, Vec<RegionId>>> {
445 let table_ids = regions_to_flush
446 .iter()
447 .map(|region_id| region_id.table_id())
448 .collect::<HashSet<_>>()
449 .into_iter()
450 .collect::<Vec<_>>();
451 let table_ids_table_routes = table_metadata_manager
452 .table_route_manager()
453 .batch_get_physical_table_routes(&table_ids)
454 .await
455 .context(error::TableMetadataManagerSnafu)?;
456
457 let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
458 for region_id in regions_to_flush {
459 let table_id = region_id.table_id();
460 let table_route = table_ids_table_routes
461 .get(&table_id)
462 .context(error::TableRouteNotFoundSnafu { table_id })?;
463 let Some(region_route) = table_route
464 .region_routes
465 .iter()
466 .find(|r| r.region.id == *region_id)
467 else {
468 continue;
469 };
470 let Some(peer) = ®ion_route.leader_peer else {
471 continue;
472 };
473
474 match peer_region_ids_map.get_mut(peer) {
475 Some(region_ids) => {
476 region_ids.push(*region_id);
477 }
478 None => {
479 peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
480 }
481 }
482 }
483 Ok(peer_region_ids_map)
484}
485
/// Returns true when `timestamp` (milliseconds) lies within `duration` of
/// `now` (milliseconds).
///
/// The subtraction saturates, so a `timestamp` in the future (or garbage far
/// in the past) cannot overflow. The window is clamped to `i64::MAX` ms via
/// `try_from` instead of a raw `as` cast, which would silently truncate huge
/// durations to a wrong — possibly negative — window.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let window_ms = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
    now.saturating_sub(timestamp) < window_ms
}
493
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        // 999 ms old is inside a 1 s window; 1001 ms old is outside.
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    /// Shorthand for building a `RegionId` from table and region numbers.
    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000);
        // Largest estimate: (100 - 95) * 10 = 50 bytes, well under 1000.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 95),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50);
        // Only (1, 3) exceeds: (100 - 90) * 10 = 100 > 50.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 90),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        // Zero record size makes every estimate zero: nothing can exceed.
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        // (1, 1) is fully caught up (entry id == latest) and is skipped;
        // (1, 2) estimates to exactly 10, which does not exceed the threshold.
        let regions = vec![
            (region_id(1, 1), 100),
            (region_id(1, 2), 99),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        // (1, 1): 50 bytes and (1, 2): 100 bytes exceed 20;
        // (1, 3): 5 bytes does not; (1, 4) is caught up.
        let regions = vec![
            (region_id(1, 1), 190),
            (region_id(1, 2), 180),
            (region_id(1, 3), 199),
            (region_id(1, 4), 200),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    /// Builds a metric-engine leader region with the given data/metadata
    /// replay entry ids.
    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    /// Builds a mito-engine leader region with the given replay entry id.
    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // No persisted checkpoint yet: always persist.
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Data entry id advanced (90 -> 100): persist.
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Same data entry id but metadata advanced (8 -> 10): persist.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Metadata appearing (None -> Some) also counts as an advance.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Mito checkpoint (100, None) does not advance over (100, Some(8)).
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        // Equal checkpoints: nothing to persist.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // Regressed checkpoints (current behind persisted): never persist.
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }
}