1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::{debug, error, info, warn};
28use common_time::util::current_time_millis;
29use common_wal::config::kafka::common::{
30 DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
31};
32use snafu::{OptionExt, ResultExt};
33use store_api::storage::RegionId;
34use tokio::sync::mpsc::{Receiver, Sender};
35
36use crate::error::{self, Result};
37use crate::service::mailbox::{Channel, MailboxRef};
38use crate::{define_ticker, metrics};
39
/// Interval at which the ticker emits [`Event::Tick`] to drive a new round of
/// flush/checkpoint evaluation.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// Maximum age for topic statistics; entry ids or calculated stats older than
/// this are treated as stale and the topic is skipped for the current tick.
const RECENT_DURATION: Duration = Duration::from_secs(300);
/// Events consumed by the [`RegionFlushTrigger`] event loop.
pub(crate) enum Event {
    /// Periodic tick produced by the [`RegionFlushTicker`].
    Tick,
}
53
/// Shared handle to the ticker driving the [`RegionFlushTrigger`].
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

// Generates the `RegionFlushTicker` type: a ticker that periodically sends
// `Event::Tick` over the channel it is constructed with.
define_ticker!(
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
62
/// Periodically estimates the WAL replay size of regions sharing Kafka topics
/// and, when a region's estimated replay size exceeds the configured
/// thresholds, persists its replay checkpoint and/or instructs its datanode to
/// flush the region.
pub struct RegionFlushTrigger {
    /// Access to table/topic metadata stored in the kv backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Registry of leader regions and their manifest information.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Registry providing per-topic latest entry id and record-size stats.
    topic_stats_registry: TopicStatsRegistryRef,
    /// Mailbox used to send flush instructions to datanodes.
    mailbox: MailboxRef,
    /// Address of this metasrv instance, used as the instruction sender id.
    server_addr: String,
    /// Estimated replay size above which a region flush is triggered.
    flush_trigger_size: ReadableSize,
    /// Estimated replay size above which a replay checkpoint is persisted.
    checkpoint_trigger_size: ReadableSize,
    /// Receiving side of the tick channel fed by the [`RegionFlushTicker`].
    receiver: Receiver<Event>,
}
88
impl RegionFlushTrigger {
    /// Creates the trigger together with its companion ticker.
    ///
    /// A zero `flush_trigger_size` or `checkpoint_trigger_size` is treated as
    /// "unset" and replaced by the corresponding default, with a warning.
    pub(crate) fn new(
        table_metadata_manager: TableMetadataManagerRef,
        leader_region_registry: LeaderRegionRegistryRef,
        topic_stats_registry: TopicStatsRegistryRef,
        mailbox: MailboxRef,
        server_addr: String,
        mut flush_trigger_size: ReadableSize,
        mut checkpoint_trigger_size: ReadableSize,
    ) -> (Self, RegionFlushTicker) {
        if flush_trigger_size.as_bytes() == 0 {
            flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
            warn!(
                "flush_trigger_size is not set, using default value: {}",
                flush_trigger_size
            );
        }
        if checkpoint_trigger_size.as_bytes() == 0 {
            checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
            warn!(
                "checkpoint_trigger_size is not set, using default value: {}",
                checkpoint_trigger_size
            );
        }
        let (tx, rx) = Self::channel();
        let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
        let region_flush_trigger = Self {
            table_metadata_manager,
            leader_region_registry,
            topic_stats_registry,
            mailbox,
            server_addr,
            flush_trigger_size,
            checkpoint_trigger_size,
            receiver: rx,
        };
        (region_flush_trigger, region_flush_ticker)
    }

    /// Builds the bounded event channel connecting the ticker to the trigger.
    fn channel() -> (Sender<Event>, Receiver<Event>) {
        tokio::sync::mpsc::channel(8)
    }

    /// Spawns the event loop on the global runtime, consuming `self`.
    pub fn try_start(mut self) -> Result<()> {
        common_runtime::spawn_global(async move { self.run().await });
        info!("Region flush trigger started");
        Ok(())
    }

    /// Event loop: processes ticks until all senders are dropped.
    async fn run(&mut self) {
        while let Some(event) = self.receiver.recv().await {
            match event {
                Event::Tick => self.handle_tick().await,
            }
        }
    }

    /// Handles one tick; failures are logged so the loop keeps running.
    async fn handle_tick(&self) {
        if let Err(e) = self.trigger_flush().await {
            error!(e; "Failed to trigger flush");
        }
    }

    /// Iterates all known topics and evaluates flush/checkpoint work per topic.
    ///
    /// Per-topic failures are logged and do not abort the remaining topics.
    async fn trigger_flush(&self) -> Result<()> {
        let now = Instant::now();
        let topics = self
            .table_metadata_manager
            .topic_name_manager()
            .range()
            .await
            .context(error::TableMetadataManagerSnafu)?;

        for topic in &topics {
            // Topics without fresh statistics are skipped this round.
            let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
                continue;
            };
            if let Err(e) = self
                .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
                .await
            {
                error!(e; "Failed to flush regions in topic: {}", topic);
            }
        }

        debug!(
            "Triggered flush for {} topics in {:?}",
            topics.len(),
            now.elapsed()
        );
        Ok(())
    }

    /// Returns `(latest_entry_id, avg_record_size)` for `topic`, or `None`
    /// when either statistic is missing or older than [`RECENT_DURATION`].
    fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
        let Some((latest_entry_id, timestamp)) =
            self.topic_stats_registry.get_latest_entry_id(topic)
        else {
            debug!("No latest entry id found for topic: {}", topic);
            return None;
        };

        let Some(stat) = self
            .topic_stats_registry
            .get_calculated_topic_stat(topic, TICKER_INTERVAL)
        else {
            debug!("No topic stat found for topic: {}", topic);
            return None;
        };

        // Both the latest entry id and the calculated stat must be recent,
        // otherwise replay-size estimates could be wildly off.
        let now = current_time_millis();
        if !is_recent(timestamp, now, RECENT_DURATION) {
            debug!(
                "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
                topic, timestamp, now
            );
            return None;
        }
        if !is_recent(stat.end_ts, now, RECENT_DURATION) {
            debug!(
                "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
                topic, stat.end_ts, now
            );
            return None;
        }

        Some((latest_entry_id, stat.avg_record_size))
    }

    /// Persists replay checkpoints for `region_ids` whose current leader-region
    /// manifest has advanced past the already-persisted checkpoint.
    ///
    /// Writes are batched to respect the kv backend's transaction-op limit.
    async fn persist_region_checkpoints(
        &self,
        topic: &str,
        region_ids: &[RegionId],
        topic_regions: &HashMap<RegionId, TopicRegionValue>,
        leader_regions: &HashMap<RegionId, LeaderRegion>,
    ) -> Result<()> {
        // Regions without a leader-region entry, or whose checkpoint has not
        // advanced, are dropped by the `flat_map`.
        let regions = region_ids
            .iter()
            .flat_map(|region_id| match leader_regions.get(region_id) {
                Some(leader_region) => should_persist_region_checkpoint(
                    leader_region,
                    topic_regions
                        .get(region_id)
                        .cloned()
                        .and_then(|value| value.checkpoint),
                )
                .map(|checkpoint| {
                    (
                        TopicRegionKey::new(*region_id, topic),
                        Some(TopicRegionValue::new(Some(checkpoint))),
                    )
                }),
                None => None,
            })
            .collect::<Vec<_>>();

        if regions.is_empty() {
            return Ok(());
        }

        // NOTE(review): `chunks` panics on a zero chunk size — this assumes
        // `max_txn_ops() >= 1`; confirm the kv backend guarantees that.
        let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
        let batch_size = max_txn_ops.min(regions.len());
        for batch in regions.chunks(batch_size) {
            self.table_metadata_manager
                .topic_region_manager()
                .batch_put(batch)
                .await
                .context(error::TableMetadataManagerSnafu)?;
        }

        metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions.len() as u64);

        Ok(())
    }

    /// For one topic: persists checkpoints for regions over the checkpoint
    /// threshold, records the topic's estimated replay size metric, and sends
    /// flush instructions for regions over the flush threshold.
    async fn flush_regions_in_topic(
        &self,
        topic: &str,
        latest_entry_id: u64,
        avg_record_size: usize,
    ) -> Result<()> {
        let topic_regions = self
            .table_metadata_manager
            .topic_region_manager()
            .regions(topic)
            .await
            .context(error::TableMetadataManagerSnafu)?;

        if topic_regions.is_empty() {
            debug!("No regions found for topic: {}", topic);
            return Ok(());
        }

        // Checkpoint candidates: judged by the persisted min entry id.
        let regions_to_persist = filter_regions_by_replay_size(
            topic,
            topic_regions
                .iter()
                .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
            avg_record_size as u64,
            latest_entry_id,
            self.checkpoint_trigger_size,
        );
        let region_manifests = self
            .leader_region_registry
            .batch_get(topic_regions.keys().cloned());

        // Checkpoint persistence is best-effort; a failure must not block
        // the flush instructions below.
        if let Err(err) = self
            .persist_region_checkpoints(
                topic,
                &regions_to_persist,
                &topic_regions,
                &region_manifests,
            )
            .await
        {
            error!(err; "Failed to persist region checkpoints for topic: {}", topic);
        }

        // The topic's replay size is bounded by its slowest (minimum) prunable
        // entry id across all regions.
        let regions = region_manifests
            .into_iter()
            .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
            .collect::<Vec<_>>();
        let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
        if let Some((_, min_entry_id)) = min_entry_id {
            let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
                .saturating_mul(avg_record_size as u64);
            metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
                .with_label_values(&[topic])
                .set(replay_size as i64);
        }

        // Flush candidates: judged by each region's prunable entry id.
        let regions_to_flush = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size as u64,
            latest_entry_id,
            self.flush_trigger_size,
        );

        if !regions_to_flush.is_empty() {
            self.send_flush_instructions(&regions_to_flush).await?;
            debug!(
                "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
                regions_to_flush.len(),
                topic,
                regions_to_flush,
            );
        }

        metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
            .with_label_values(&[topic])
            .inc_by(regions_to_flush.len() as u64);

        Ok(())
    }

    /// Groups `regions_to_flush` by their leader datanode and sends each
    /// leader one batched, asynchronous flush instruction via the mailbox.
    ///
    /// Send failures are logged per peer and do not fail the whole call.
    async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
        let leader_to_region_ids =
            group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
        let flush_instructions = leader_to_region_ids
            .into_iter()
            .map(|(leader, region_ids)| {
                let flush_instruction =
                    Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
                (leader, flush_instruction)
            });

        for (peer, flush_instruction) in flush_instructions {
            let msg = MailboxMessage::json_message(
                &format!("Flush regions: {}", flush_instruction),
                &format!("Metasrv@{}", self.server_addr),
                &format!("Datanode-{}@{}", peer.id, peer.addr),
                common_time::util::current_time_millis(),
                &flush_instruction,
            )
            .with_context(|_| error::SerializeToJsonSnafu {
                input: flush_instruction.to_string(),
            })?;
            if let Err(e) = self
                .mailbox
                .send_oneway(&Channel::Datanode(peer.id), msg)
                .await
            {
                error!(e; "Failed to send flush instruction to datanode {}", peer);
            }
        }

        Ok(())
    }
}
388
389fn should_persist_region_checkpoint(
391 current: &LeaderRegion,
392 persisted: Option<ReplayCheckpoint>,
393) -> Option<ReplayCheckpoint> {
394 let new_checkpoint = ReplayCheckpoint::new(
395 current.manifest.replay_entry_id(),
396 current.manifest.metadata_replay_entry_id(),
397 );
398
399 let Some(persisted) = persisted else {
400 return Some(new_checkpoint);
401 };
402
403 if new_checkpoint > persisted {
404 return Some(new_checkpoint);
405 }
406 None
407}
408
409fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
415 topic: &str,
416 regions: I,
417 avg_record_size: u64,
418 latest_entry_id: u64,
419 threshold: ReadableSize,
420) -> Vec<RegionId> {
421 let mut regions_to_flush = Vec::new();
422 for (region_id, entry_id) in regions {
423 if entry_id < latest_entry_id {
424 let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
425 if replay_size > threshold.as_bytes() {
426 debug!(
427 "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
428 region_id,
429 ReadableSize(replay_size),
430 threshold,
431 entry_id,
432 latest_entry_id,
433 topic
434 );
435 regions_to_flush.push(region_id);
436 }
437 }
438 }
439
440 regions_to_flush
441}
442
443async fn group_regions_by_leader(
447 table_metadata_manager: &TableMetadataManagerRef,
448 regions_to_flush: &[RegionId],
449) -> Result<HashMap<Peer, Vec<RegionId>>> {
450 let table_ids = regions_to_flush
451 .iter()
452 .map(|region_id| region_id.table_id())
453 .collect::<HashSet<_>>()
454 .into_iter()
455 .collect::<Vec<_>>();
456 let table_ids_table_routes = table_metadata_manager
457 .table_route_manager()
458 .batch_get_physical_table_routes(&table_ids)
459 .await
460 .context(error::TableMetadataManagerSnafu)?;
461
462 let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
463 for region_id in regions_to_flush {
464 let table_id = region_id.table_id();
465 let table_route = table_ids_table_routes
466 .get(&table_id)
467 .context(error::TableRouteNotFoundSnafu { table_id })?;
468 let Some(region_route) = table_route
469 .region_routes
470 .iter()
471 .find(|r| r.region.id == *region_id)
472 else {
473 continue;
474 };
475 let Some(peer) = ®ion_route.leader_peer else {
476 continue;
477 };
478
479 match peer_region_ids_map.get_mut(peer) {
480 Some(region_ids) => {
481 region_ids.push(*region_id);
482 }
483 None => {
484 peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
485 }
486 }
487 }
488 Ok(peer_region_ids_map)
489}
490
/// Returns `true` if `timestamp` (milliseconds) lies within `duration` of
/// `now`.
///
/// A timestamp in the future relative to `now` is always considered recent,
/// since the saturating difference is negative.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let window_ms = duration.as_millis() as i64;
    let elapsed_ms = now.saturating_sub(timestamp);
    elapsed_ms < window_ms
}
498
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        // 999 ms old: inside a 1 s window; 1001 ms old: outside.
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    // Shorthand for building a `RegionId` in tests.
    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000);
        // Replay sizes: 10, 20, 50 bytes — all below the 1000-byte threshold.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 95),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50);
        // Replay sizes: 10, 20, 100 bytes — only the last exceeds 50.
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 90),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        // Zero average record size makes every replay size estimate 0.
        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        // entry id == latest means nothing to replay; 99 gives 10 bytes,
        // which does not strictly exceed the 10-byte threshold.
        let regions = vec![
            (region_id(1, 1), 100),
            (region_id(1, 2), 99),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        // Replay sizes: 50, 100, 5, 0 bytes — the first two exceed 20.
        let regions = vec![
            (region_id(1, 1), 190),
            (region_id(1, 2), 180),
            (region_id(1, 3), 199),
            (region_id(1, 4), 200),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    // Builds a metric-engine leader region with the given replay entry ids.
    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    // Builds a mito-engine leader region with the given replay entry id.
    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // No persisted checkpoint yet: always persist.
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Data entry id advanced: persist.
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Same data entry id but metadata entry id advanced: persist.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Persisted metadata entry id absent, current present: persist.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Mito has no metadata entry id: candidate (100, None) does not beat
        // the persisted (100, Some(8)).
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        // Equal checkpoints: nothing to persist.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // Current checkpoint behind the persisted one (e.g. stale manifest):
        // never persist a regression.
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }
}