1use std::collections::{HashMap, HashSet};
16use std::sync::Arc;
17use std::time::{Duration, Instant};
18
19use api::v1::meta::MailboxMessage;
20use common_base::readable_size::ReadableSize;
21use common_meta::instruction::{FlushRegions, Instruction};
22use common_meta::key::TableMetadataManagerRef;
23use common_meta::key::topic_region::{ReplayCheckpoint, TopicRegionKey, TopicRegionValue};
24use common_meta::peer::Peer;
25use common_meta::region_registry::{LeaderRegion, LeaderRegionRegistryRef};
26use common_meta::stats::topic::TopicStatsRegistryRef;
27use common_telemetry::tracing_context::TracingContext;
28use common_telemetry::{debug, error, info, warn};
29use common_time::util::current_time_millis;
30use common_wal::config::kafka::common::{
31 DEFAULT_CHECKPOINT_TRIGGER_SIZE, DEFAULT_FLUSH_TRIGGER_SIZE,
32};
33use snafu::{OptionExt, ResultExt};
34use store_api::region_request::RegionFlushReason;
35use store_api::storage::RegionId;
36use tokio::sync::mpsc::{Receiver, Sender};
37
38use crate::error::{self, Result};
39use crate::service::mailbox::{Channel, MailboxRef};
40use crate::{define_ticker, metrics};
41
/// Interval between automatic flush-evaluation ticks.
const TICKER_INTERVAL: Duration = Duration::from_secs(60);

/// Window within which topic statistics are considered fresh enough to act
/// on; stats older than this cause the topic to be skipped for the round.
const RECENT_DURATION: Duration = Duration::from_secs(300);

/// Events consumed by the [`RegionFlushTrigger`] event loop.
pub enum Event {
    /// Periodic tick emitted by [`RegionFlushTicker`].
    Tick,
}

pub(crate) type RegionFlushTickerRef = Arc<RegionFlushTicker>;

// Ticker that periodically sends `Event::Tick` to the flush trigger at
// `TICKER_INTERVAL` (expanded by the crate's `define_ticker!` macro).
define_ticker!(
    RegionFlushTicker,
    event_type = Event,
    event_value = Event::Tick
);
64
/// Periodically estimates per-topic WAL replay sizes and, for regions whose
/// estimated replay size exceeds the configured thresholds, persists replay
/// checkpoints and asks datanode leaders to flush.
pub struct RegionFlushTrigger {
    /// Accessor to topic/region/table-route metadata in the meta KV backend.
    table_metadata_manager: TableMetadataManagerRef,
    /// In-memory registry of leader regions and their manifest state.
    leader_region_registry: LeaderRegionRegistryRef,
    /// Source of per-topic latest-entry-id and record-size statistics.
    topic_stats_registry: TopicStatsRegistryRef,
    /// Mailbox used to send flush instructions to datanodes.
    mailbox: MailboxRef,
    /// Address of this metasrv; used as the sender in mailbox messages.
    server_addr: String,
    /// Estimated replay size above which a region flush is triggered.
    flush_trigger_size: ReadableSize,
    /// Estimated replay size above which a replay checkpoint is persisted.
    checkpoint_trigger_size: ReadableSize,
    /// Receives `Event`s from the companion ticker.
    receiver: Receiver<Event>,
}
90
91impl RegionFlushTrigger {
92 pub(crate) fn new(
94 table_metadata_manager: TableMetadataManagerRef,
95 leader_region_registry: LeaderRegionRegistryRef,
96 topic_stats_registry: TopicStatsRegistryRef,
97 mailbox: MailboxRef,
98 server_addr: String,
99 mut flush_trigger_size: ReadableSize,
100 mut checkpoint_trigger_size: ReadableSize,
101 ) -> (Self, RegionFlushTicker) {
102 if flush_trigger_size.as_bytes() == 0 {
103 flush_trigger_size = DEFAULT_FLUSH_TRIGGER_SIZE;
104 warn!(
105 "flush_trigger_size is not set, using default value: {}",
106 flush_trigger_size
107 );
108 }
109 if checkpoint_trigger_size.as_bytes() == 0 {
110 checkpoint_trigger_size = DEFAULT_CHECKPOINT_TRIGGER_SIZE;
111 warn!(
112 "checkpoint_trigger_size is not set, using default value: {}",
113 checkpoint_trigger_size
114 );
115 }
116 let (tx, rx) = Self::channel();
117 let region_flush_ticker = RegionFlushTicker::new(TICKER_INTERVAL, tx);
118 let region_flush_trigger = Self {
119 table_metadata_manager,
120 leader_region_registry,
121 topic_stats_registry,
122 mailbox,
123 server_addr,
124 flush_trigger_size,
125 checkpoint_trigger_size,
126 receiver: rx,
127 };
128 (region_flush_trigger, region_flush_ticker)
129 }
130
131 fn channel() -> (Sender<Event>, Receiver<Event>) {
132 tokio::sync::mpsc::channel(8)
133 }
134
135 pub fn try_start(mut self) -> Result<()> {
137 common_runtime::spawn_global(async move { self.run().await });
138 info!("Region flush trigger started");
139 Ok(())
140 }
141
142 async fn run(&mut self) {
143 while let Some(event) = self.receiver.recv().await {
144 match event {
145 Event::Tick => self.handle_tick().await,
146 }
147 }
148 }
149
150 async fn handle_tick(&self) {
151 if let Err(e) = self.trigger_flush().await {
152 error!(e; "Failed to trigger flush");
153 }
154 }
155
156 async fn trigger_flush(&self) -> Result<()> {
157 let now = Instant::now();
158 let topics = self
159 .table_metadata_manager
160 .topic_name_manager()
161 .range()
162 .await
163 .context(error::TableMetadataManagerSnafu)?;
164
165 for topic in &topics {
166 let Some((latest_entry_id, avg_record_size)) = self.retrieve_topic_stat(topic) else {
167 continue;
168 };
169 if let Err(e) = self
170 .flush_regions_in_topic(topic, latest_entry_id, avg_record_size)
171 .await
172 {
173 error!(e; "Failed to flush regions in topic: {}", topic);
174 }
175 }
176
177 debug!(
178 "Triggered flush for {} topics in {:?}",
179 topics.len(),
180 now.elapsed()
181 );
182 Ok(())
183 }
184
185 fn retrieve_topic_stat(&self, topic: &str) -> Option<(u64, usize)> {
189 let Some((latest_entry_id, timestamp)) =
190 self.topic_stats_registry.get_latest_entry_id(topic)
191 else {
192 debug!("No latest entry id found for topic: {}", topic);
193 return None;
194 };
195
196 let Some(stat) = self
197 .topic_stats_registry
198 .get_calculated_topic_stat(topic, TICKER_INTERVAL)
199 else {
200 debug!("No topic stat found for topic: {}", topic);
201 return None;
202 };
203
204 let now = current_time_millis();
205 if !is_recent(timestamp, now, RECENT_DURATION) {
206 debug!(
207 "Latest entry id of topic '{}': is not recent (now: {}, stat timestamp: {})",
208 topic, timestamp, now
209 );
210 return None;
211 }
212 if !is_recent(stat.end_ts, now, RECENT_DURATION) {
213 debug!(
214 "Calculated stat of topic '{}': is not recent (now: {}, stat timestamp: {})",
215 topic, stat.end_ts, now
216 );
217 return None;
218 }
219
220 Some((latest_entry_id, stat.avg_record_size))
221 }
222
223 async fn persist_region_checkpoints(
224 &self,
225 topic: &str,
226 region_ids: &[RegionId],
227 topic_regions: &HashMap<RegionId, TopicRegionValue>,
228 leader_regions: &HashMap<RegionId, LeaderRegion>,
229 ) -> Result<()> {
230 let regions = region_ids
231 .iter()
232 .flat_map(|region_id| match leader_regions.get(region_id) {
233 Some(leader_region) => should_persist_region_checkpoint(
234 leader_region,
235 topic_regions
236 .get(region_id)
237 .cloned()
238 .and_then(|value| value.checkpoint),
239 )
240 .map(|checkpoint| {
241 (
242 TopicRegionKey::new(*region_id, topic),
243 Some(TopicRegionValue::new(Some(checkpoint))),
244 )
245 }),
246 None => None,
247 })
248 .collect::<Vec<_>>();
249
250 if regions.is_empty() {
252 return Ok(());
253 }
254
255 let max_txn_ops = self.table_metadata_manager.kv_backend().max_txn_ops();
256 let batch_size = max_txn_ops.min(regions.len());
257 for batch in regions.chunks(batch_size) {
258 self.table_metadata_manager
259 .topic_region_manager()
260 .batch_put(batch)
261 .await
262 .context(error::TableMetadataManagerSnafu)?;
263 }
264
265 metrics::METRIC_META_TRIGGERED_REGION_CHECKPOINT_TOTAL
266 .with_label_values(&[topic])
267 .inc_by(regions.len() as u64);
268
269 Ok(())
270 }
271
272 async fn flush_regions_in_topic(
273 &self,
274 topic: &str,
275 latest_entry_id: u64,
276 avg_record_size: usize,
277 ) -> Result<()> {
278 let topic_regions = self
279 .table_metadata_manager
280 .topic_region_manager()
281 .regions(topic)
282 .await
283 .context(error::TableMetadataManagerSnafu)?;
284
285 if topic_regions.is_empty() {
286 debug!("No regions found for topic: {}", topic);
287 return Ok(());
288 }
289
290 let regions_to_persist = filter_regions_by_replay_size(
292 topic,
293 topic_regions
294 .iter()
295 .map(|(region_id, value)| (*region_id, value.min_entry_id().unwrap_or_default())),
296 avg_record_size as u64,
297 latest_entry_id,
298 self.checkpoint_trigger_size,
299 );
300 let region_manifests = self
301 .leader_region_registry
302 .batch_get(topic_regions.keys().cloned());
303
304 if let Err(err) = self
305 .persist_region_checkpoints(
306 topic,
307 ®ions_to_persist,
308 &topic_regions,
309 ®ion_manifests,
310 )
311 .await
312 {
313 error!(err; "Failed to persist region checkpoints for topic: {}", topic);
314 }
315
316 let regions = region_manifests
317 .into_iter()
318 .map(|(region_id, region)| (region_id, region.manifest.prunable_entry_id()))
319 .collect::<Vec<_>>();
320 let min_entry_id = regions.iter().min_by_key(|(_, entry_id)| *entry_id);
321 if let Some((_, min_entry_id)) = min_entry_id {
322 let replay_size = (latest_entry_id.saturating_sub(*min_entry_id))
323 .saturating_mul(avg_record_size as u64);
324 metrics::METRIC_META_TOPIC_ESTIMATED_REPLAY_SIZE
325 .with_label_values(&[topic])
326 .set(replay_size as i64);
327 }
328
329 let regions_to_flush = filter_regions_by_replay_size(
331 topic,
332 regions.into_iter(),
333 avg_record_size as u64,
334 latest_entry_id,
335 self.flush_trigger_size,
336 );
337
338 if !regions_to_flush.is_empty() {
340 self.send_flush_instructions(®ions_to_flush).await?;
341 debug!(
342 "Sent {} flush instructions to datanodes for topic: '{}', regions: {:?}",
343 regions_to_flush.len(),
344 topic,
345 regions_to_flush,
346 );
347 }
348
349 metrics::METRIC_META_TRIGGERED_REGION_FLUSH_TOTAL
350 .with_label_values(&[topic])
351 .inc_by(regions_to_flush.len() as u64);
352
353 Ok(())
354 }
355
356 async fn send_flush_instructions(&self, regions_to_flush: &[RegionId]) -> Result<()> {
357 let leader_to_region_ids =
358 group_regions_by_leader(&self.table_metadata_manager, regions_to_flush).await?;
359 let flush_instructions = leader_to_region_ids
360 .into_iter()
361 .map(|(leader, region_ids)| {
362 let flush_instruction = Instruction::FlushRegions(
363 FlushRegions::async_batch(region_ids)
364 .with_reason(RegionFlushReason::RemoteWalPrune),
365 );
366 (leader, flush_instruction)
367 });
368
369 let tracing_ctx = TracingContext::from_current_span();
370 let tracing_header = tracing_ctx.to_w3c();
371 for (peer, flush_instruction) in flush_instructions {
372 let msg = MailboxMessage::json_message(
373 &format!("Flush regions: {}", flush_instruction),
374 &format!("Metasrv@{}", self.server_addr),
375 &format!("Datanode-{}@{}", peer.id, peer.addr),
376 common_time::util::current_time_millis(),
377 &flush_instruction,
378 Some(tracing_header.clone()),
379 )
380 .with_context(|_| error::SerializeToJsonSnafu {
381 input: flush_instruction.to_string(),
382 })?;
383 if let Err(e) = self
384 .mailbox
385 .send_oneway(&Channel::Datanode(peer.id), msg)
386 .await
387 {
388 error!(e; "Failed to send flush instruction to datanode {}", peer);
389 }
390 }
391
392 Ok(())
393 }
394}
395
396fn should_persist_region_checkpoint(
398 current: &LeaderRegion,
399 persisted: Option<ReplayCheckpoint>,
400) -> Option<ReplayCheckpoint> {
401 let new_checkpoint = ReplayCheckpoint::new(
402 current.manifest.replay_entry_id(),
403 current.manifest.metadata_replay_entry_id(),
404 );
405
406 let Some(persisted) = persisted else {
407 return Some(new_checkpoint);
408 };
409
410 if new_checkpoint > persisted {
411 return Some(new_checkpoint);
412 }
413 None
414}
415
416fn filter_regions_by_replay_size<I: Iterator<Item = (RegionId, u64)>>(
422 topic: &str,
423 regions: I,
424 avg_record_size: u64,
425 latest_entry_id: u64,
426 threshold: ReadableSize,
427) -> Vec<RegionId> {
428 let mut regions_to_flush = Vec::new();
429 for (region_id, entry_id) in regions {
430 if entry_id < latest_entry_id {
431 let replay_size = (latest_entry_id - entry_id).saturating_mul(avg_record_size);
432 if replay_size > threshold.as_bytes() {
433 debug!(
434 "Region {}: estimated replay size {} exceeds threshold {}, entry id: {}, topic latest entry id: {}, topic: '{}'",
435 region_id,
436 ReadableSize(replay_size),
437 threshold,
438 entry_id,
439 latest_entry_id,
440 topic
441 );
442 regions_to_flush.push(region_id);
443 }
444 }
445 }
446
447 regions_to_flush
448}
449
450async fn group_regions_by_leader(
454 table_metadata_manager: &TableMetadataManagerRef,
455 regions_to_flush: &[RegionId],
456) -> Result<HashMap<Peer, Vec<RegionId>>> {
457 let table_ids = regions_to_flush
458 .iter()
459 .map(|region_id| region_id.table_id())
460 .collect::<HashSet<_>>()
461 .into_iter()
462 .collect::<Vec<_>>();
463 let table_ids_table_routes = table_metadata_manager
464 .table_route_manager()
465 .batch_get_physical_table_routes(&table_ids)
466 .await
467 .context(error::TableMetadataManagerSnafu)?;
468
469 let mut peer_region_ids_map: HashMap<Peer, Vec<RegionId>> = HashMap::new();
470 for region_id in regions_to_flush {
471 let table_id = region_id.table_id();
472 let table_route = table_ids_table_routes
473 .get(&table_id)
474 .context(error::TableRouteNotFoundSnafu { table_id })?;
475 let Some(region_route) = table_route
476 .region_routes
477 .iter()
478 .find(|r| r.region.id == *region_id)
479 else {
480 continue;
481 };
482 let Some(peer) = ®ion_route.leader_peer else {
483 continue;
484 };
485
486 match peer_region_ids_map.get_mut(peer) {
487 Some(region_ids) => {
488 region_ids.push(*region_id);
489 }
490 None => {
491 peer_region_ids_map.insert(peer.clone(), vec![*region_id]);
492 }
493 }
494 }
495 Ok(peer_region_ids_map)
496}
497
/// Returns `true` when `timestamp` (milliseconds) is within `duration` of
/// `now` (milliseconds). Timestamps in the future of `now` count as recent.
fn is_recent(timestamp: i64, now: i64, duration: Duration) -> bool {
    // Saturate instead of `as`-casting: durations exceeding i64::MAX ms
    // would otherwise wrap (possibly to a negative value), incorrectly
    // classifying every timestamp as stale.
    let duration_ms = i64::try_from(duration.as_millis()).unwrap_or(i64::MAX);
    now.saturating_sub(timestamp) < duration_ms
}
505
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use common_base::readable_size::ReadableSize;
    use common_meta::instruction::FlushStrategy;
    use common_meta::key::TableMetadataManager;
    use common_meta::kv_backend::memory::MemoryKvBackend;
    use common_meta::region_registry::{LeaderRegionManifestInfo, LeaderRegionRegistry};
    use common_meta::sequence::SequenceBuilder;
    use common_meta::stats::topic::TopicStatsRegistry;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::HeartbeatMailbox;
    use crate::procedure::test_util::{MailboxContext, new_wal_prune_metadata};

    // Timestamps just inside / just outside the window flip the result.
    #[test]
    fn test_is_recent() {
        let now = current_time_millis();
        assert!(is_recent(now - 999, now, Duration::from_secs(1)));
        assert!(!is_recent(now - 1001, now, Duration::from_secs(1)));
    }

    // Shorthand for constructing a `RegionId` in tests.
    fn region_id(table: u32, region: u32) -> RegionId {
        RegionId::new(table, region)
    }

    // All regions are close to the latest entry id, so none exceed the
    // (large) threshold and nothing is selected.
    #[test]
    fn test_no_regions_to_flush_when_none_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1000); let regions = vec![
            (region_id(1, 1), 99), (region_id(1, 2), 98), (region_id(1, 3), 95), ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    // Only the region whose estimated replay size (10 entries * 10 bytes)
    // exceeds the 50-byte threshold is selected.
    #[test]
    fn test_regions_to_flush_when_some_exceed_threshold() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(50); let regions = vec![
            (region_id(1, 1), 99), (region_id(1, 2), 98), (region_id(1, 3), 90), ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 3)]);
    }

    // A zero average record size makes every estimated replay size zero,
    // so nothing can exceed the threshold.
    #[test]
    fn test_regions_to_flush_with_zero_avg_record_size() {
        let topic = "test_topic";
        let avg_record_size = 0;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(1);

        let regions = vec![(region_id(1, 1), 50), (region_id(1, 2), 10)];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    // Regions at (or within one entry of) the latest entry id are never
    // selected: 100 is caught up, 99 leaves 10 bytes == threshold (not >).
    #[test]
    fn test_regions_to_flush_with_prunable_entry_id_equal_latest() {
        let topic = "test_topic";
        let avg_record_size = 10;
        let latest_entry_id = 100;
        let flush_trigger_size = ReadableSize(10);

        let regions = vec![
            (region_id(1, 1), 100), (region_id(1, 2), 99), ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert!(result.is_empty());
    }

    // Multiple regions exceeding the threshold are all selected, in input
    // order; boundary (199 -> 5 bytes) and caught-up (200) regions are not.
    #[test]
    fn test_multiple_regions_to_flush() {
        let topic = "test_topic";
        let avg_record_size = 5;
        let latest_entry_id = 200;
        let flush_trigger_size = ReadableSize(20);

        let regions = vec![
            (region_id(1, 1), 190), (region_id(1, 2), 180), (region_id(1, 3), 199), (region_id(1, 4), 200), ];

        let result = filter_regions_by_replay_size(
            topic,
            regions.into_iter(),
            avg_record_size,
            latest_entry_id,
            flush_trigger_size,
        );
        assert_eq!(result, vec![region_id(1, 1), region_id(1, 2)]);
    }

    // Builds a metric-engine leader region with the given data/metadata
    // replay (flushed) entry ids.
    fn metric_leader_region(replay_entry_id: u64, metadata_replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Metric {
                data_manifest_version: 1,
                data_flushed_entry_id: replay_entry_id,
                data_topic_latest_entry_id: 0,
                metadata_manifest_version: 1,
                metadata_flushed_entry_id: metadata_replay_entry_id,
                metadata_topic_latest_entry_id: 0,
            },
        }
    }

    // Builds a mito-engine leader region with the given replay (flushed)
    // entry id.
    fn mito_leader_region(replay_entry_id: u64) -> LeaderRegion {
        LeaderRegion {
            datanode_id: 1,
            manifest: LeaderRegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: replay_entry_id,
                topic_latest_entry_id: 0,
            },
        }
    }

    // Covers: no persisted checkpoint, strictly advancing checkpoints,
    // equal checkpoints (no-op), and regressing checkpoints (no-op).
    #[test]
    fn test_should_persist_region_checkpoint() {
        let current = metric_leader_region(100, 10);
        let result = should_persist_region_checkpoint(&current, None).unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, None));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)))
                .unwrap();
        assert_eq!(result, ReplayCheckpoint::new(100, Some(10)));

        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(8))))
                .is_none();
        assert!(result);

        let current = metric_leader_region(100, 10);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(100);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(100, None)));
        assert!(result.is_none());

        let current = metric_leader_region(80, 11);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
        let current = mito_leader_region(80);
        let result =
            should_persist_region_checkpoint(&current, Some(ReplayCheckpoint::new(90, Some(10))));
        assert!(result.is_none());
    }

    // End-to-end through the mailbox: the instruction received by the
    // datanode channel carries the region ids, the async strategy, and the
    // RemoteWalPrune flush reason.
    #[tokio::test]
    async fn test_send_flush_instructions_payload_includes_remote_wal_prune_reason() {
        let kv_backend = Arc::new(MemoryKvBackend::new());
        let table_metadata_manager = Arc::new(TableMetadataManager::new(kv_backend.clone()));
        let leader_region_registry = Arc::new(LeaderRegionRegistry::new());
        let topic_stats_registry = Arc::new(TopicStatsRegistry::default());
        let mailbox_sequence =
            SequenceBuilder::new("test_remote_wal_prune_flush_reason", kv_backend).build();
        let mut mailbox_ctx = MailboxContext::new(mailbox_sequence);

        let (tx, mut rx) = tokio::sync::mpsc::channel(1);
        mailbox_ctx
            .insert_heartbeat_response_receiver(Channel::Datanode(1), tx)
            .await;

        let (trigger, _ticker) = RegionFlushTrigger::new(
            table_metadata_manager.clone(),
            leader_region_registry.clone(),
            topic_stats_registry,
            mailbox_ctx.mailbox().clone(),
            "127.0.0.1:3002".to_string(),
            ReadableSize(1),
            ReadableSize(1),
        );

        let topic = "test_topic".to_string();
        new_wal_prune_metadata(
            table_metadata_manager,
            leader_region_registry,
            1,
            1,
            &[0],
            topic,
        )
        .await;

        let region_id = RegionId::new(0, 0);
        trigger.send_flush_instructions(&[region_id]).await.unwrap();

        let response = rx.recv().await.unwrap().unwrap();
        let msg = response.mailbox_message.unwrap();
        let instruction = HeartbeatMailbox::json_instruction(&msg).unwrap();
        let Instruction::FlushRegions(flush_regions) = instruction else {
            panic!("Expected FlushRegions instruction");
        };

        assert_eq!(flush_regions.region_ids, vec![region_id]);
        assert_eq!(flush_regions.strategy, FlushStrategy::Async);
        assert_eq!(
            flush_regions.reason,
            Some(RegionFlushReason::RemoteWalPrune)
        );
    }
}