1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{debug, error, info, warn};
29use common_time::util::current_time_millis;
30use common_wal::config::kafka::common::{
31 DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
32};
33use snafu::{OptionExt, ResultExt};
34use store_api::storage::RegionId;
35use tokio::sync::mpsc::{Receiver, Sender};
36
37use crate::error::{self, Result};
38use crate::service::mailbox::{Channel, MailboxRef};
39use crate::{define_ticker, metrics};
40
/// Interval between flush-check ticks.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// Window within which a topic stat timestamp counts as "recent"; older stats are ignored.
const RECENT_DURATION: Duration = Duration::from_secs(300);
46
/// Events consumed by [`RegionFlushTrigger`]'s event loop.
pub enum Event {
    /// Periodic tick emitted by the ticker.
    Tick,
}
54
/// Shared handle to the ticker that drives periodic flush checks.
pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

// Defines `RegionFlushTicker`, which periodically sends `Event::Tick`
// through the channel created in `RegionFlushTrigger::new`.
define_ticker!(
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
63
/// Periodically estimates the WAL replay size of each Kafka topic and, when a
/// threshold is exceeded, persists region replay checkpoints and instructs
/// datanodes to flush regions.
pub struct RegionFlushTrigger {
    /// Access to table/topic metadata in the kv backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// Registry of leader regions and their manifest info.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Registry of per-topic WAL statistics.
    topic_stats_registry: TopicStatsRegistryRef,
    /// Mailbox used to send flush instructions to datanodes.
    mailbox: MailboxRef,
    /// Address of this metasrv; used as the sender id in mailbox messages.
    server_addr: String,
    /// Estimated replay size above which a region flush is triggered.
    flush_trigger_size: ReadableSize,
    /// Estimated replay size above which a region checkpoint is persisted.
    checkpoint_trigger_size: ReadableSize,
    /// Receives tick events from the ticker.
    receiver: Receiver<Event>,
}
89
90impl RegionFlushTrigger {
91 pub(crate) fn new(
93 table_metadata_manager: TableMetadataManagerRef,
94 leader_region_registry: LeaderRegionRegistryRef,
95 topic_stats_registry: TopicStatsRegistryRef,
96 mailbox: MailboxRef,
97 server_addr: String,
98 mut flush_trigger_size: ReadableSize,
99 mut checkpoint_trigger_size: ReadableSize,
100 ) -> (Self, RegionFlushTicker) {
101 if flush_trigger_size.as_bytes() == 0 {
102 flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
103 warn!(
104 "flush_trigger_size is not set, using default value: {}",
105 flush_trigger_size
106 );
107 }
108 if checkpoint_trigger_size.as_bytes() == 0 {
109 checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
110 warn!(
111 "checkpoint_trigger_size is not set, using default value: {}",
112 checkpoint_trigger_size
113 );
114 }
115 let (tx, rx) = Self::channel();
116 let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
117 let region_flush_trigger = Self {
118 table_metadata_manager,
119 leader_region_registry,
120 topic_stats_registry,
121 mailbox,
122 server_addr,
123 flush_trigger_size,
124 checkpoint_trigger_size,
125 receiver: rx,
126 };
127 (region_flush_trigger, region_flush_ticker)
128 }
129
130 fn channel() -> (Sender<Event>, Receiver<Event>) {
131 tokio::sync::mpsc::channel(8)
132 }
133
134 pub fn try_start(mut self) -> Result<()> {
136 common_runtime::spawn_global(async move { self.run().await });
137 info!("Region flush trigger started");
138 Ok(())
139 }
140
141 async fn run(&mut self) {
142 while let Some(event) = self.receiver.recv().await {
143 match event {
144 Event::Tick => self.handle_tick().await,
145 }
146 }
147 }
148
149 async fn handle_tick(&self) {
150 if let Err(e) = self.trigger_flush().await {
151 error!(e; "Failed to trigger flush");
152 }
153 }
154
155 async fn trigger_flush(&self) -> Result<()> {
156 let now = Instant::now();
157 let topics = self
158 .table_metadata_manager
159 .topic_name_manager()
160 .range()
161 .await
162 .context(error::TableMetadataManagerSnafu)?;
163
164 for topic in &topics {
165 let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
166 continue;
167 };
168 if let Err(e) = self
169 .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
170 .await
171 {
172 error!(e; "Failed to flush regions in topic: {}", topic);
173 }
174 }
175
176 debug!(
177 "Triggered flush for {} topics in {:?}",
178 topics.len(),
179 now.elapsed()
180 );
181 Ok(())
182 }
183
184 fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
188 let Some((latest_entry_id, timestamp)) =
189 self.topic_stats_registry.get_latest_entry_id(topic)
190 else {
191 debug!("No latest entry id found for topic: {}", topic);
192 return None;
193 };
194
195 let Some(stat) = self
196 .topic_stats_registry
197 .get_calculated_topic_stat(topic, TICKER_INTERVAL)
198 else {
199 debug!("No topic stat found for topic: {}", topic);
200 return None;
201 };
202
203 let now = current_time_millis();
204 if !is_recent(timestamp, now, RECENT_DURATION) {
205 debug!(
206 "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
207 topic, timestamp, now
208 );
209 return None;
210 }
211 if !is_recent(stat.end_ts, now, RECENT_DURATION) {
212 debug!(
213 "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
214 topic, stat.end_ts, now
215 );
216 return None;
217 }
218
219 Some((latest_entry_id, stat.avg_record_size))
220 }
221
222 async fn persist_region_checkpoints(
223 &self,
224 topic: &str,
225 region_ids: &[RegionId],
226 topic_regions: &HashMap<RegionId, TopicRegionValue>,
227 leader_regions: &HashMap<RegionId, LeaderRegion>,
228 ) -> Result<()> {
229 let regions = region_ids
230 .iter()
231 .flat_map(|region_id| match leader_regions.get(region_id) {
232 Some(leader_region) => should_persist_region_checkpoint(
233 leader_region,
234 topic_regions
235 .get(region_id)
236 .cloned()
237 .and_then(|value| value.checkpoint),
238 )
239 .map(|checkpoint| {
240 (
241 TopicRegionKey::new(*region_id, topic),
242 Some(TopicRegionValue::new(Some(checkpoint))),
243 )
244 }),
245 None => None,
246 })
247 .collect::<Vec<_>>();
248
249 if regions.is_empty() {
251 return Ok(());
252 }
253
254 let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
255 let batch_size = max_txn_ops.min(regions.len());
256 for batch in regions.chunks(batch_size) {
257 self.table_metadata_manager
258 .topic_region_manager()
259 .batch_put(batch)
260 .await
261 .context(error::TableMetadataManagerSnafu)?;
262 }
263
264 metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
265 .with_label_values(&[topic])
266 .inc_by(regions.len() as u64);
267
268 Ok(())
269 }
270
271 async fn flush_regions_in_topic(
272 &self,
273 topic: &str,
274 latest_entry_id: u64,
275 avg_record_size: usize,
276 ) -> Result<()> {
277 let topic_regions = self
278 .table_metadata_manager
279 .topic_region_manager()
280 .regions(topic)
281 .await
282 .context(error::TableMetadataManagerSnafu)?;
283
284 if topic_regions.is_empty() {
285 debug!("No regions found for topic: {}", topic);
286 return Ok(());
287 }
288
289 let regions_to_persist = filter_regions_by_replay_size(
291 topic,
292 topic_regions
293 .iter()
294 .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
295 avg_record_size as u64,
296 latest_entry_id,
297 self.checkpoint_trigger_size,
298 );
299 let region_manifests = self
300 .leader_region_registry
301 .batch_get(topic_regions.keys().cloned());
302
303 if let Err(err) = self
304 .persist_region_checkpoints(
305 topic,
306 ®ions_to_persist,
307 &topic_regions,
308 ®ion_manifests,
309 )
310 .await
311 {
312 error!(err; "Failed to persist region checkpoints for topic: {}", topic);
313 }
314
315 let regions = region_manifests
316 .into_iter()
317 .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
318 .collect::<Vec<_>>();
319 let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
320 if let Some((_, min_entry_id)) = min_entry_id {
321 let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
322 .saturating_mul(avg_record_size as u64);
323 metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
324 .with_label_values(&[topic])
325 .set(replay_size as i64);
326 }
327
328 let regions_to_flush = filter_regions_by_replay_size(
330 topic,
331 regions.into_iter(),
332 avg_record_size as u64,
333 latest_entry_id,
334 self.flush_trigger_size,
335 );
336
337 if !regions_to_flush.is_empty() {
339 self.send_flush_instructions(®ions_to_flush).await?;
340 debug!(
341 "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
342 regions_to_flush.len(),
343 topic,
344 regions_to_flush,
345 );
346 }
347
348 metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
349 .with_label_values(&[topic])
350 .inc_by(regions_to_flush.len() as u64);
351
352 Ok(())
353 }
354
355 async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
356 let leader_to_region_ids =
357 group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
358 let flush_instructions = leader_to_region_ids
359 .into_iter()
360 .map(|(leader, region_ids)| {
361 let flush_instruction =
362 Instruction::FlushRegions(FlushRegions::async_batch(region_ids));
363 (leader, flush_instruction)
364 });
365
366 let tracing_ctx = TracingContext::from_current_span();
367 let tracing_header = tracing_ctx.to_w3c();
368 for (peer, flush_instruction) in flush_instructions {
369 let msg = MailboxMessage::json_message(
370 &format!("Flush regions: {}", flush_instruction),
371 &format!("Metasrv@{}", self.server_addr),
372 &format!("Datanode-{}@{}", peer.id, peer.addr),
373 common_time::util::current_time_millis(),
374 &flush_instruction,
375 Some(tracing_header.clone()),
376 )
377 .with_context(|_| error::SerializeToJsonSnafu {
378 input: flush_instruction.to_string(),
379 })?;
380 if let Err(e) = self
381 .mailbox
382 .send_oneway(&Channel::Datanode(peer.id), msg)
383 .await
384 {
385 error!(e; "Failed to send flush instruction to datanode {}", peer);
386 }
387 }
388
389 Ok(())
390 }
391}
392
393fn should_persist_region_checkpoint(
395 current: &LeaderRegion,
396 persisted: Option<ReplayCheckpoint>,
397) -> Option<ReplayCheckpoint> {
398 let new_checkpoint = ReplayCheckpoint::new(
399 current.manifest.replay_entry_id(),
400 current.manifest.metadata_replay_entry_id(),
401 );
402
403 let Some(persisted) = persisted else {
404 return Some(new_checkpoint);
405 };
406
407 if new_checkpoint > persisted {
408 return Some(new_checkpoint);
409 }
410 None
411}
412
413fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
419 topic: &str,
420 regions: I,
421 avg_record_size: u64,
422 latest_entry_id: u64,
423 threshold: ReadableSize,
424) -> Vec<RegionId> {
425 let mut regions_to_flush = Vec::new();
426 for (region_id, entry_id) in regions {
427 if entry_id < latest_entry_id {
428 let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
429 if replay_size > threshold.as_bytes() {
430 debug!(
431 "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
432 region_id,
433 ReadableSize(replay_size),
434 threshold,
435 entry_id,
436 latest_entry_id,
437 topic
438 );
439 regions_to_flush.push(region_id);
440 }
441 }
442 }
443
444 regions_to_flush
445}
446
447async fn group_regions_by_leader(
451 table_metadata_manager: &TableMetadataManagerRef,
452 regions_to_flush: &[RegionId],
453) -> Result<HashMap<Peer, Vec<RegionId>>> {
454 let table_ids = regions_to_flush
455 .iter()
456 .map(|region_id| region_id.table_id())
457 .collect::<HashSet<_>>()
458 .into_iter()
459 .collect::<Vec<_>>();
460 let table_ids_table_routes = table_metadata_manager
461 .table_route_manager()
462 .batch_get_physical_table_routes(&table_ids)
463 .await
464 .context(error::TableMetadataManagerSnafu)?;
465
466 let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
467 for region_id in regions_to_flush {
468 let table_id = region_id.table_id();
469 let table_route = table_ids_table_routes
470 .get(&table_id)
471 .context(error::TableRouteNotFoundSnafu { table_id })?;
472 let Some(region_route) = table_route
473 .region_routes
474 .iter()
475 .find(|r| r.region.id == *region_id)
476 else {
477 continue;
478 };
479 let Some(peer) = ®ion_route.leader_peer else {
480 continue;
481 };
482
483 match peer_region_ids_map.get_mut(peer) {
484 Some(region_ids) => {
485 region_ids.push(*region_id);
486 }
487 None => {
488 peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
489 }
490 }
491 }
492 Ok(peer_region_ids_map)
493}
494
/// Returns `true` when `timestamp` (ms) is within `duration` of `now` (ms).
///
/// A timestamp in the future of `now` is always considered recent.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    let elapsed_ms = now.saturating_sub(timestamp);
    elapsed_ms < duration.as_millis() as i64
}
502
#[cfg(test)]
mod tests {
    use common_base::readable_size::ReadableSize;
    use common_meta::region_registry::LeaderRegionManifestInfo;
    use store_api::storage::RegionId;

    use super::*;

    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000);
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 95),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50);
        let regions = vec![
            (region_id(1, 1), 99),
            (region_id(1, 2), 98),
            (region_id(1, 3), 90),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        let regions = vec![(region_id(1, 1), 100), (region_id(1, 2), 99)];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        let regions = vec![
            (region_id(1, 1), 190),
            (region_id(1, 2), 180),
            (region_id(1, 3), 199),
            (region_id(1, 4), 200),
        ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    #[test]
    fn test_should_persist_region_checkpoint() {
        // No persisted checkpoint yet: always persist.
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Entry id advanced past the persisted checkpoint.
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Same entry id but metadata entry id advanced.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Metadata entry id appeared where none was persisted.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        // Mito region has no metadata entry id: `Some(8)` persisted wins.
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        // Unchanged checkpoints are not re-persisted.
        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        // A regressed entry id never triggers persistence, even if the
        // metadata entry id advanced.
        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }
}