1use std::collections::{HashMap, VecDeque};
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use common_telemetry::{debug, warn};
20use datafusion_common::HashSet;
21
22use crate::datanode::TopicStat;
23use crate::distributed_time_constants::{
24 TOPIC_STATS_REPORT_INTERVAL_SECS, TOPIC_STATS_RETENTION_SECS,
25};
26use crate::DatanodeId;
27
28pub type TopicStatsRegistryRef = Arc<TopicStatsRegistry>;
29
30pub struct TopicStatsRegistry {
32 inner: RwLock<TopicStatsStore>,
33}
34
35impl Default for TopicStatsRegistry {
36 fn default() -> Self {
37 Self::new(
38 Duration::from_secs(TOPIC_STATS_RETENTION_SECS),
39 Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
40 )
41 }
42}
43
44impl TopicStatsRegistry {
45 fn new(retention: Duration, window_size: Duration) -> Self {
50 let history_limit = (retention.as_secs() / window_size.as_secs()).max(10) as usize;
51 Self {
52 inner: RwLock::new(TopicStatsStore::new(history_limit, window_size)),
53 }
54 }
55
56 pub fn add_stat(&self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
58 let mut inner = self.inner.write().unwrap();
59 inner.add_stat(datanode_id, stat, millis_ts);
60 }
61
62 pub fn add_stats(&self, datanode_id: DatanodeId, stats: &[TopicStat], millis_ts: i64) {
64 if stats.is_empty() {
65 return;
66 }
67
68 let mut inner = self.inner.write().unwrap();
69 for stat in stats {
70 inner.add_stat(datanode_id, stat, millis_ts);
71 }
72 }
73
74 pub fn get_calculated_topic_stat(
76 &self,
77 topic: &str,
78 period: Duration,
79 ) -> Option<CalculatedTopicStat> {
80 let inner = self.inner.read().unwrap();
81 inner.get_calculated_topic_stat(topic, period)
82 }
83
84 pub fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
86 let inner = self.inner.read().unwrap();
87 inner.get_latest_entry_id(topic)
88 }
89}
90
/// Statistics of one topic for a single window, as stored in the per-topic history.
#[derive(Clone, Debug, Default, PartialEq)]
struct HistoryTopicStat {
    /// Latest entry id recorded for the topic in this window.
    pub latest_entry_id: u64,
    /// Record size recorded for the topic in this window.
    pub record_size: u64,
    /// Number of records recorded for the topic in this window.
    pub record_num: u64,
    /// Start timestamp of the window, in milliseconds.
    start_ts: i64,
}
102
/// Most recent stat of one topic as reported for a single datanode.
#[derive(Debug)]
struct PartialTopicStat {
    /// Latest entry id carried by the report.
    pub latest_entry_id: u64,
    /// Record size carried by the report.
    pub record_size: u64,
    /// Number of records carried by the report.
    pub record_num: u64,
    /// Timestamp of the report, in milliseconds.
    pub timestamp: i64,
}
114
115struct ActiveBucket {
116 buffer: HashMap<DatanodeId, HashMap<String, PartialTopicStat>>,
117 start_ts: i64,
118 window_size: Duration,
119}
120
121impl ActiveBucket {
122 fn new(timestamp: i64, window_sec: Duration) -> Self {
123 Self {
124 buffer: HashMap::new(),
125 start_ts: timestamp,
126 window_size: window_sec,
127 }
128 }
129
130 fn acceptable_ts(&self, millis_ts: i64) -> bool {
131 let acceptable = millis_ts >= self.start_ts
132 && millis_ts < self.start_ts + self.window_size.as_millis() as i64;
133 if !acceptable {
134 debug!(
135 "acceptable range: ts >= {} && ts < {}, ts: {}",
136 self.start_ts,
137 self.start_ts + self.window_size.as_millis() as i64,
138 millis_ts
139 );
140 }
141 acceptable
142 }
143
144 fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) -> bool {
149 if !self.acceptable_ts(millis_ts) {
150 return false;
151 }
152
153 let datanode_stats = self.buffer.entry(datanode_id).or_default();
154
155 if let Some(prev) = datanode_stats.get_mut(&stat.topic) {
157 if millis_ts > prev.timestamp {
158 *prev = PartialTopicStat {
159 latest_entry_id: stat.latest_entry_id,
160 record_size: stat.record_size,
161 record_num: stat.record_num,
162 timestamp: millis_ts,
163 };
164 } else {
165 warn!(
166 "Ignore stale topic stat for topic: {}, timestamp: {}, last recorded timestamp: {}",
167 stat.topic, millis_ts, prev.timestamp
168 );
169 }
170 } else {
171 datanode_stats.insert(
172 stat.topic.to_string(),
173 PartialTopicStat {
174 latest_entry_id: stat.latest_entry_id,
175 record_size: stat.record_size,
176 record_num: stat.record_num,
177 timestamp: millis_ts,
178 },
179 );
180 }
181 true
182 }
183
184 fn merge(self) -> HashMap<String, HistoryTopicStat> {
185 let all_topics = self
186 .buffer
187 .values()
188 .flat_map(|stats| stats.keys())
189 .collect::<HashSet<_>>();
190
191 let mut output = HashMap::with_capacity(all_topics.len());
192 for topic in all_topics {
193 let stats = self
194 .buffer
195 .values()
196 .flat_map(|stats| stats.get(topic))
197 .collect::<Vec<_>>();
198 debug!("stats: {:?} for topic: {}", stats, topic);
199 let latest_entry_id = stats
200 .iter()
201 .map(|stat| stat.latest_entry_id)
202 .max()
203 .unwrap_or(0);
204 let record_size = stats.iter().map(|stat| stat.record_size).sum::<u64>();
205 let record_num = stats.iter().map(|stat| stat.record_num).sum::<u64>();
206
207 output.insert(
208 topic.to_string(),
209 HistoryTopicStat {
210 latest_entry_id,
211 record_size,
212 record_num,
213 start_ts: self.start_ts,
214 },
215 );
216 }
217
218 output
219 }
220
221 #[cfg(test)]
223 fn get_stat(&self, datanode_id: DatanodeId, topic: &str) -> Option<&PartialTopicStat> {
224 self.buffer
225 .get(&datanode_id)
226 .and_then(|stats| stats.get(topic))
227 }
228}
229
230struct TopicStatsStore {
232 active_bucket: Option<ActiveBucket>,
234 history_by_topic: HashMap<String, VecDeque<HistoryTopicStat>>,
236 history_limit: usize,
238 window_size: Duration,
240}
241
242impl TopicStatsStore {
243 fn new(history_limit: usize, window_size: Duration) -> Self {
245 Self {
246 active_bucket: None,
247 history_by_topic: HashMap::new(),
248 history_limit,
249 window_size,
250 }
251 }
252
253 fn align_ts(millis_ts: i64) -> i64 {
255 (millis_ts / 1000) * 1000
256 }
257
258 fn rotate_active_bucket(&mut self, start_ts: i64) {
259 let aligned_ts = Self::align_ts(start_ts);
260 if let Some(old_bucket) = self.active_bucket.take() {
261 let merged = old_bucket.merge();
262 for (topic, stat) in merged {
263 debug!(
264 "Merge current topic: {}, stats into history: {:?}",
265 topic, stat
266 );
267 let history = self.history_by_topic.entry(topic).or_default();
268 history.push_back(stat);
269 if history.len() > self.history_limit {
270 history.pop_front();
271 }
272 }
273 }
274
275 self.active_bucket = Some(ActiveBucket::new(aligned_ts, self.window_size));
276 }
277
278 fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
280 let aligned_ts = Self::align_ts(millis_ts);
281
282 let need_rotate = match &self.active_bucket {
283 Some(bucket) => !bucket.acceptable_ts(aligned_ts),
284 None => true,
285 };
286
287 if need_rotate {
288 debug!("Rotate active bucket at ts: {}", aligned_ts);
289 self.rotate_active_bucket(aligned_ts);
290 }
291
292 let active_bucket = self.active_bucket.as_mut().unwrap();
294 let added = active_bucket.add_stat(datanode_id, stat, millis_ts);
295 debug_assert!(added);
296 }
297
298 fn get_calculated_topic_stat(
300 &self,
301 topic: &str,
302 period: Duration,
303 ) -> Option<CalculatedTopicStat> {
304 let stats = self.history_by_topic.get(topic)?;
305 calculate_topic_stat(stats, period)
306 }
307
308 fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
310 self.history_by_topic.get(topic).and_then(|stats| {
311 stats
312 .back()
313 .map(|stat| (stat.latest_entry_id, stat.start_ts))
314 })
315 }
316}
317
/// Aggregated statistics of a topic between two history entries.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CalculatedTopicStat {
    /// Average record size over the range: the growth in record size divided by the
    /// growth in record count (0 when no records were added).
    pub avg_record_size: usize,
    /// Start timestamp (milliseconds) of the older history entry used for the range.
    pub start_ts: i64,
    /// Start timestamp (milliseconds) of the newest history entry used for the range.
    pub end_ts: i64,
}
328
329fn calculate_topic_stat(
333 stats: &VecDeque<HistoryTopicStat>,
334 period: Duration,
335) -> Option<CalculatedTopicStat> {
336 if stats.len() < 2 {
337 return None;
338 }
339
340 let last_stat = stats.back().unwrap();
341 let first_stat = stats.front().unwrap();
342 if first_stat.start_ts + period.as_millis() as i64 > last_stat.start_ts {
344 return None;
345 }
346
347 let target_stat = stats
350 .iter()
351 .rev()
352 .skip(1)
353 .find(|stat| (stat.start_ts + period.as_millis() as i64) < last_stat.start_ts);
354
355 let target_stat = target_stat?;
356
357 if target_stat.record_size > last_stat.record_size
359 || target_stat.record_num > last_stat.record_num
360 {
361 return None;
362 }
363
364 let record_size = last_stat.record_size - target_stat.record_size;
366 let record_num = last_stat.record_num - target_stat.record_num;
367 let avg_record_size = record_size.checked_div(record_num).unwrap_or(0) as usize;
368
369 let start_ts = target_stat.start_ts;
370 let end_ts = last_stat.start_ts;
371 Some(CalculatedTopicStat {
372 avg_record_size,
373 start_ts,
374 end_ts,
375 })
376}
377
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use common_time::util::current_time_millis;

    use super::*;
    use crate::datanode::TopicStat;

    /// Builds a history entry with the given window start and counters; the
    /// `latest_entry_id` keeps its default value.
    fn merged_stat(ts: i64, record_size: u64, record_num: u64) -> HistoryTopicStat {
        HistoryTopicStat {
            start_ts: ts,
            record_size,
            record_num,
            ..Default::default()
        }
    }

    /// Builds a `TopicStat` report for `topic`.
    fn topic_stat(
        topic: &str,
        latest_entry_id: u64,
        record_size: u64,
        record_num: u64,
    ) -> TopicStat {
        TopicStat {
            topic: topic.to_string(),
            latest_entry_id,
            record_size,
            record_num,
        }
    }

    #[test]
    fn test_empty_stats() {
        let stats: VecDeque<HistoryTopicStat> = VecDeque::new();
        assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
    }

    #[test]
    fn test_single_stat() {
        let stats = VecDeque::from(vec![merged_stat(1000, 100, 2)]);
        assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
    }

    #[test]
    fn test_no_target_stat_found() {
        // The history spans one second only, far less than the requested period.
        let stats = VecDeque::from(vec![merged_stat(1000, 100, 2), merged_stat(2000, 200, 4)]);
        assert!(calculate_topic_stat(&stats, Duration::from_secs(100)).is_none());
    }

    #[test]
    fn test_target_stat_found() {
        let stats = VecDeque::from(vec![
            merged_stat(1000, 100, 2),
            merged_stat(3000, 200, 4),
            merged_stat(6000, 600, 6),
        ]);
        let stat = calculate_topic_stat(&stats, Duration::from_secs(2)).unwrap();
        // (600 - 200) / (6 - 4)
        assert_eq!(stat.avg_record_size, 200);
        assert_eq!(stat.start_ts, 3000);
        assert_eq!(stat.end_ts, 6000);
    }

    #[test]
    fn test_target_stat_decreasing() {
        let stats = VecDeque::from(vec![
            merged_stat(1000, 100, 2),
            merged_stat(3000, 200, 4),
            // Both counters are lower than in the target entry.
            merged_stat(6000, 100, 1),
        ]);
        let result = calculate_topic_stat(&stats, Duration::from_secs(2));
        assert!(result.is_none());
    }

    #[test]
    fn test_multiple_stats_target_found() {
        let stats = VecDeque::from(vec![
            merged_stat(1000, 100, 2),
            merged_stat(2000, 200, 4),
            merged_stat(4000, 400, 8),
            merged_stat(8000, 800, 16),
        ]);
        let stat = calculate_topic_stat(&stats, Duration::from_secs(3)).unwrap();
        // (800 - 400) / (16 - 8)
        assert_eq!(stat.avg_record_size, 50);
        assert_eq!(stat.start_ts, 4000);
        assert_eq!(stat.end_ts, 8000);
    }

    #[test]
    fn test_active_bucket() {
        let ts = current_time_millis();
        let window_size = Duration::from_secs(3);
        let window_millis = window_size.as_millis() as i64;
        let mut active_bucket = ActiveBucket::new(ts, window_size);

        // Reports from two datanodes for the same topic, and one for another topic.
        assert!(active_bucket.add_stat(0, &topic_stat("test", 1, 256, 1), ts + 10));
        assert!(active_bucket.add_stat(1, &topic_stat("test", 10, 5120, 10), ts + 10));
        assert!(active_bucket.add_stat(0, &topic_stat("test1", 2, 128, 2), ts + 9));

        // Past the end of the window.
        assert!(!active_bucket.add_stat(0, &topic_stat("test", 2, 2, 2), ts + window_millis + 1));

        // Before the start of the window.
        assert!(!active_bucket.add_stat(0, &topic_stat("test", 2, 2, 2), ts - 1));

        // A newer report replaces the stored one.
        assert!(active_bucket.add_stat(0, &topic_stat("test", 3, 1024, 3), ts + 11));
        assert_eq!(
            active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
            3
        );

        // A stale report is accepted by the window but leaves the stored stat untouched.
        assert!(active_bucket.add_stat(0, &topic_stat("test", 2, 512, 2), ts + 9));
        assert_eq!(
            active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
            3
        );

        let merged = active_bucket.merge();
        assert_eq!(merged.len(), 2);

        let test = merged.get("test").unwrap();
        assert_eq!(test.latest_entry_id, 10);
        assert_eq!(test.record_size, 5120 + 1024);
        assert_eq!(test.record_num, 10 + 3);

        let test1 = merged.get("test1").unwrap();
        assert_eq!(test1.latest_entry_id, 2);
        assert_eq!(test1.record_size, 128);
        assert_eq!(test1.record_num, 2);
        assert_eq!(test1.start_ts, ts);
    }

    #[test]
    fn test_topic_stats() {
        let topic_name = "test";
        let window_size = Duration::from_secs(60);
        let window_millis = window_size.as_millis() as i64;
        let mut topic_stats = TopicStatsStore::new(5, window_size);
        let ts = TopicStatsStore::align_ts(current_time_millis());

        debug!("add stat at ts: {}", ts);
        topic_stats.add_stat(0, &topic_stat(topic_name, 1, 1024, 1), ts);

        // Last millisecond that still belongs to the first window.
        let last_in_window = ts + window_millis - 1;
        debug!("add stat at ts: {}", last_in_window);
        topic_stats.add_stat(1, &topic_stat(topic_name, 4, 4096, 4), last_in_window);
        topic_stats.add_stat(1, &topic_stat("another_topic", 4, 4096, 4), last_in_window);

        // First millisecond of the next window; this triggers the rotation.
        let next_window = ts + window_millis;
        debug!("add stat at ts: {}", next_window);
        topic_stats.add_stat(1, &topic_stat(topic_name, 5, 8192, 5), next_window);

        let history = topic_stats.history_by_topic.get(topic_name).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(
            history[0],
            HistoryTopicStat {
                latest_entry_id: 4,
                record_size: 1024 + 4096,
                record_num: 1 + 4,
                start_ts: ts,
            }
        );
        assert!(topic_stats.active_bucket.is_some());
    }
}