common_meta/stats/
topic.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::{HashMap, VecDeque};
16use std::sync::{Arc, RwLock};
17use std::time::Duration;
18
19use common_telemetry::{debug, warn};
20use datafusion_common::HashSet;
21
22use crate::datanode::TopicStat;
23use crate::distributed_time_constants::{
24    TOPIC_STATS_REPORT_INTERVAL_SECS, TOPIC_STATS_RETENTION_SECS,
25};
26use crate::DatanodeId;
27
/// Shared, reference-counted handle to a [`TopicStatsRegistry`].
pub type TopicStatsRegistryRef = Arc<TopicStatsRegistry>;

/// Manages statistics for all topics across the cluster.
///
/// Writers (`add_stat`/`add_stats`) take the internal lock exclusively; readers
/// (`get_calculated_topic_stat`/`get_latest_entry_id`) share it.
pub struct TopicStatsRegistry {
    /// The underlying stats store, guarded by a read-write lock.
    inner: RwLock<TopicStatsStore>,
}
34
35impl Default for TopicStatsRegistry {
36    fn default() -> Self {
37        Self::new(
38            Duration::from_secs(TOPIC_STATS_RETENTION_SECS),
39            Duration::from_secs(TOPIC_STATS_REPORT_INTERVAL_SECS),
40        )
41    }
42}
43
44impl TopicStatsRegistry {
45    /// Creates a new topic stats registry.
46    ///
47    /// # Panics
48    /// Panic if the window size is zero.
49    fn new(retention: Duration, window_size: Duration) -> Self {
50        let history_limit = (retention.as_secs() / window_size.as_secs()).max(10) as usize;
51        Self {
52            inner: RwLock::new(TopicStatsStore::new(history_limit, window_size)),
53        }
54    }
55
56    /// Adds a topic stat for a given datanode at a specific timestamp.
57    pub fn add_stat(&self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
58        let mut inner = self.inner.write().unwrap();
59        inner.add_stat(datanode_id, stat, millis_ts);
60    }
61
62    /// Adds a list of topic stats for a given datanode at a specific timestamp.
63    pub fn add_stats(&self, datanode_id: DatanodeId, stats: &[TopicStat], millis_ts: i64) {
64        if stats.is_empty() {
65            return;
66        }
67
68        let mut inner = self.inner.write().unwrap();
69        for stat in stats {
70            inner.add_stat(datanode_id, stat, millis_ts);
71        }
72    }
73
74    /// Gets the calculated topic stat for a given topic.
75    pub fn get_calculated_topic_stat(
76        &self,
77        topic: &str,
78        period: Duration,
79    ) -> Option<CalculatedTopicStat> {
80        let inner = self.inner.read().unwrap();
81        inner.get_calculated_topic_stat(topic, period)
82    }
83
84    /// Gets the latest entry id and timestamp for a given topic.
85    pub fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
86        let inner = self.inner.read().unwrap();
87        inner.get_latest_entry_id(topic)
88    }
89}
90
/// The stats of a topic merged from all datanodes that reported during one window.
#[derive(Debug, PartialEq, Clone, Default)]
struct HistoryTopicStat {
    /// The latest entry id of the topic (the maximum reported by any datanode in the window).
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic, summed over datanodes.
    /// `calculate_topic_stat` treats this as a cumulative counter and diffs it between windows.
    pub record_size: u64,
    /// The total number of records appended to the topic, summed over datanodes.
    /// `calculate_topic_stat` treats this as a cumulative counter and diffs it between windows.
    pub record_num: u64,
    /// The start timestamp (in milliseconds) of the window this stat was merged from.
    start_ts: i64,
}
102
/// A single datanode's most recent report for one topic within the active bucket.
#[derive(Debug)]
struct PartialTopicStat {
    /// The latest entry id of the topic, as reported by the datanode.
    pub latest_entry_id: u64,
    /// The total size in bytes of records appended to the topic, as reported by the datanode.
    pub record_size: u64,
    /// The total number of records appended to the topic, as reported by the datanode.
    pub record_num: u64,
    /// The timestamp (in milliseconds) the stat was added with; a report with a newer
    /// timestamp replaces this one, an older or equal one is ignored.
    pub timestamp: i64,
}
114
/// Collects the latest topic stats reported by each datanode during a single window
/// `[start_ts, start_ts + window_size)` (milliseconds).
struct ActiveBucket {
    /// Datanode id -> topic name -> the most recent stat reported in this window.
    buffer: HashMap<DatanodeId, HashMap<String, PartialTopicStat>>,
    /// Inclusive start of the window, in milliseconds.
    start_ts: i64,
    /// Length of the window.
    window_size: Duration,
}
120
121impl ActiveBucket {
122    fn new(timestamp: i64, window_sec: Duration) -> Self {
123        Self {
124            buffer: HashMap::new(),
125            start_ts: timestamp,
126            window_size: window_sec,
127        }
128    }
129
130    fn acceptable_ts(&self, millis_ts: i64) -> bool {
131        let acceptable = millis_ts >= self.start_ts
132            && millis_ts < self.start_ts + self.window_size.as_millis() as i64;
133        if !acceptable {
134            debug!(
135                "acceptable range: ts >= {} && ts < {}, ts: {}",
136                self.start_ts,
137                self.start_ts + self.window_size.as_millis() as i64,
138                millis_ts
139            );
140        }
141        acceptable
142    }
143
144    /// Add a topic stat to the current topic stats.
145    ///
146    /// Returns true if the topic stat is added successfully (stale stat will be ignored directly),
147    /// false if the topic stat is out of the window.
148    fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) -> bool {
149        if !self.acceptable_ts(millis_ts) {
150            return false;
151        }
152
153        let datanode_stats = self.buffer.entry(datanode_id).or_default();
154
155        // Overwrite the topic stat if it already exists.
156        if let Some(prev) = datanode_stats.get_mut(&stat.topic) {
157            if millis_ts > prev.timestamp {
158                *prev = PartialTopicStat {
159                    latest_entry_id: stat.latest_entry_id,
160                    record_size: stat.record_size,
161                    record_num: stat.record_num,
162                    timestamp: millis_ts,
163                };
164            } else {
165                warn!(
166                    "Ignore stale topic stat for topic: {}, timestamp: {}, last recorded timestamp: {}",
167                    stat.topic, millis_ts, prev.timestamp
168                );
169            }
170        } else {
171            datanode_stats.insert(
172                stat.topic.to_string(),
173                PartialTopicStat {
174                    latest_entry_id: stat.latest_entry_id,
175                    record_size: stat.record_size,
176                    record_num: stat.record_num,
177                    timestamp: millis_ts,
178                },
179            );
180        }
181        true
182    }
183
184    fn merge(self) -> HashMap<String, HistoryTopicStat> {
185        let all_topics = self
186            .buffer
187            .values()
188            .flat_map(|stats| stats.keys())
189            .collect::<HashSet<_>>();
190
191        let mut output = HashMap::with_capacity(all_topics.len());
192        for topic in all_topics {
193            let stats = self
194                .buffer
195                .values()
196                .flat_map(|stats| stats.get(topic))
197                .collect::<Vec<_>>();
198            debug!("stats: {:?} for topic: {}", stats, topic);
199            let latest_entry_id = stats
200                .iter()
201                .map(|stat| stat.latest_entry_id)
202                .max()
203                .unwrap_or(0);
204            let record_size = stats.iter().map(|stat| stat.record_size).sum::<u64>();
205            let record_num = stats.iter().map(|stat| stat.record_num).sum::<u64>();
206
207            output.insert(
208                topic.to_string(),
209                HistoryTopicStat {
210                    latest_entry_id,
211                    record_size,
212                    record_num,
213                    start_ts: self.start_ts,
214                },
215            );
216        }
217
218        output
219    }
220
221    /// Get the partial topic stat of a datanode.
222    #[cfg(test)]
223    fn get_stat(&self, datanode_id: DatanodeId, topic: &str) -> Option<&PartialTopicStat> {
224        self.buffer
225            .get(&datanode_id)
226            .and_then(|stats| stats.get(topic))
227    }
228}
229
/// Manages topic statistics over time, including active and historical buckets.
///
/// Incoming stats go into the active bucket; once a stat arrives past the bucket's window,
/// the bucket is merged into `history_by_topic` and a new one is started.
struct TopicStatsStore {
    /// The currently active bucket collecting stats (`None` until the first stat arrives).
    active_bucket: Option<ActiveBucket>,
    /// Historical merged buckets, grouped by topic, oldest at the front.
    history_by_topic: HashMap<String, VecDeque<HistoryTopicStat>>,
    /// Maximum number of historical windows to keep per topic.
    history_limit: usize,
    /// Duration of each stats window.
    window_size: Duration,
}
241
242impl TopicStatsStore {
243    /// Create a new topic stats.
244    fn new(history_limit: usize, window_size: Duration) -> Self {
245        Self {
246            active_bucket: None,
247            history_by_topic: HashMap::new(),
248            history_limit,
249            window_size,
250        }
251    }
252
253    /// Aligns the timestamp to the nearest second.
254    fn align_ts(millis_ts: i64) -> i64 {
255        (millis_ts / 1000) * 1000
256    }
257
258    fn rotate_active_bucket(&mut self, start_ts: i64) {
259        let aligned_ts = Self::align_ts(start_ts);
260        if let Some(old_bucket) = self.active_bucket.take() {
261            let merged = old_bucket.merge();
262            for (topic, stat) in merged {
263                debug!(
264                    "Merge current topic: {}, stats into history: {:?}",
265                    topic, stat
266                );
267                let history = self.history_by_topic.entry(topic).or_default();
268                history.push_back(stat);
269                if history.len() > self.history_limit {
270                    history.pop_front();
271                }
272            }
273        }
274
275        self.active_bucket = Some(ActiveBucket::new(aligned_ts, self.window_size));
276    }
277
278    /// Adds a topic stat for a given datanode at a specific timestamp.
279    fn add_stat(&mut self, datanode_id: DatanodeId, stat: &TopicStat, millis_ts: i64) {
280        let aligned_ts = Self::align_ts(millis_ts);
281
282        let need_rotate = match &self.active_bucket {
283            Some(bucket) => !bucket.acceptable_ts(aligned_ts),
284            None => true,
285        };
286
287        if need_rotate {
288            debug!("Rotate active bucket at ts: {}", aligned_ts);
289            self.rotate_active_bucket(aligned_ts);
290        }
291
292        // Safety: The current topic stats is initialized in the previous step.
293        let active_bucket = self.active_bucket.as_mut().unwrap();
294        let added = active_bucket.add_stat(datanode_id, stat, millis_ts);
295        debug_assert!(added);
296    }
297
298    /// Gets the calculated topic stat for a given topic.
299    fn get_calculated_topic_stat(
300        &self,
301        topic: &str,
302        period: Duration,
303    ) -> Option<CalculatedTopicStat> {
304        let stats = self.history_by_topic.get(topic)?;
305        calculate_topic_stat(stats, period)
306    }
307
308    /// Gets the latest entry id and timestamp for a given topic.
309    fn get_latest_entry_id(&self, topic: &str) -> Option<(u64, i64)> {
310        self.history_by_topic.get(topic).and_then(|stats| {
311            stats
312                .back()
313                .map(|stat| (stat.latest_entry_id, stat.start_ts))
314        })
315    }
316}
317
/// The calculated topic stat.
///
/// Produced by comparing two merged history windows of a topic: the newest window and an older
/// one that started at least the requested period earlier.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CalculatedTopicStat {
    /// Average size in bytes of the records appended between the two windows
    /// (zero if no records were appended).
    pub avg_record_size: usize,
    /// Start timestamp (in milliseconds) of the older window used for the calculation.
    pub start_ts: i64,
    /// Start timestamp (in milliseconds) of the newest window used for the calculation.
    pub end_ts: i64,
}
328
329/// Calculates the average record size for a topic within a specified time window based on recent merged statistics.
330///
331/// Returns `Some(CalculatedTopicStat)` if the calculation is successful, or `None` if insufficient data is available.
332fn calculate_topic_stat(
333    stats: &VecDeque<HistoryTopicStat>,
334    period: Duration,
335) -> Option<CalculatedTopicStat> {
336    if stats.len() < 2 {
337        return None;
338    }
339
340    let last_stat = stats.back().unwrap();
341    let first_stat = stats.front().unwrap();
342    // Not enough stats data.
343    if first_stat.start_ts + period.as_millis() as i64 > last_stat.start_ts {
344        return None;
345    }
346
347    // Find the first stat whose timestamp is less than the last stat's timestamp - period.as_millis() as i64.
348    // TODO(weny): Use binary search to find the target stat.
349    let target_stat = stats
350        .iter()
351        .rev()
352        .skip(1)
353        .find(|stat| (stat.start_ts + period.as_millis() as i64) < last_stat.start_ts);
354
355    let target_stat = target_stat?;
356
357    // The target stat's record size and record num should be less than the last stat's record size and record num.
358    if target_stat.record_size > last_stat.record_size
359        || target_stat.record_num > last_stat.record_num
360    {
361        return None;
362    }
363
364    // Safety: the last stat's record size and record num must be greater than the target stat's record size and record num.
365    let record_size = last_stat.record_size - target_stat.record_size;
366    let record_num = last_stat.record_num - target_stat.record_num;
367    let avg_record_size = record_size.checked_div(record_num).unwrap_or(0) as usize;
368
369    let start_ts = target_stat.start_ts;
370    let end_ts = last_stat.start_ts;
371    Some(CalculatedTopicStat {
372        avg_record_size,
373        start_ts,
374        end_ts,
375    })
376}
377
#[cfg(test)]
mod tests {
    use std::collections::VecDeque;

    use common_time::util::current_time_millis;

    use super::*;
    use crate::datanode::TopicStat;

    /// Builds a merged history stat starting at `ts`; `latest_entry_id` is left at its default.
    fn merged_stat(ts: i64, record_size: u64, record_num: u64) -> HistoryTopicStat {
        HistoryTopicStat {
            start_ts: ts,
            record_size,
            record_num,
            ..Default::default()
        }
    }

    #[test]
    fn test_empty_stats() {
        let stats: VecDeque<HistoryTopicStat> = VecDeque::new();
        assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
    }

    #[test]
    fn test_single_stat() {
        // At least two windows are required to compute a delta.
        let mut stats = VecDeque::new();
        stats.push_back(merged_stat(1000, 100, 2));
        assert!(calculate_topic_stat(&stats, Duration::from_secs(10)).is_none());
    }

    #[test]
    fn test_no_target_stat_found() {
        let mut stats = VecDeque::new();
        stats.push_back(merged_stat(1000, 100, 2));
        stats.push_back(merged_stat(2000, 200, 4));
        // The period is longer than the span covered by the history, so no stat will be found.
        assert!(calculate_topic_stat(&stats, Duration::from_secs(100)).is_none());
    }

    #[test]
    fn test_target_stat_found() {
        let mut stats = VecDeque::new();
        stats.push_back(merged_stat(1000, 100, 2));
        stats.push_back(merged_stat(3000, 200, 4));
        stats.push_back(merged_stat(6000, 600, 6));
        let result = calculate_topic_stat(&stats, Duration::from_secs(2));
        assert!(result.is_some());
        let stat = result.unwrap();
        assert_eq!(stat.avg_record_size, 200); // (600 - 200) / (6 - 4)
        assert_eq!(stat.start_ts, 3000);
        assert_eq!(stat.end_ts, 6000);
    }

    #[test]
    fn test_target_stat_decreasing() {
        let mut stats = VecDeque::new();
        stats.push_back(merged_stat(1000, 100, 2));
        stats.push_back(merged_stat(3000, 200, 4));
        stats.push_back(merged_stat(6000, 100, 1)); // Reset or something wrong
        let result = calculate_topic_stat(&stats, Duration::from_secs(2));
        assert!(result.is_none());
    }

    #[test]
    fn test_multiple_stats_target_found() {
        let mut stats = VecDeque::new();
        stats.push_back(merged_stat(1000, 100, 2));
        stats.push_back(merged_stat(2000, 200, 4));
        stats.push_back(merged_stat(4000, 400, 8));
        stats.push_back(merged_stat(8000, 800, 16));
        let result = calculate_topic_stat(&stats, Duration::from_secs(3));
        assert!(result.is_some());
        let stat = result.unwrap();
        assert_eq!(stat.avg_record_size, 50); // (800 - 400) / (16 - 8)
        assert_eq!(stat.start_ts, 4000);
        assert_eq!(stat.end_ts, 8000);
    }

    #[test]
    fn test_active_bucket() {
        let ts = current_time_millis();
        let window_size = Duration::from_secs(3);
        let mut active_bucket = ActiveBucket::new(ts, window_size);

        assert!(active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 1,
                record_size: 256,
                record_num: 1,
            },
            ts + 10,
        ));

        assert!(active_bucket.add_stat(
            1,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 10,
                record_size: 5120,
                record_num: 10,
            },
            ts + 10,
        ));

        assert!(active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test1".to_string(),
                latest_entry_id: 2,
                record_size: 128,
                record_num: 2,
            },
            ts + 9,
        ));

        // Out of the window (past the end).
        assert!(!active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 2,
                record_size: 2,
                record_num: 2,
            },
            ts + window_size.as_millis() as i64 + 1,
        ));

        // Out of the window (before the start).
        assert!(!active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 2,
                record_size: 2,
                record_num: 2,
            },
            ts - 1
        ));

        // Overwrite the topic stat if the timestamp is larger.
        assert!(active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 3,
                record_size: 1024,
                record_num: 3,
            },
            ts + 11,
        ));
        assert_eq!(
            active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
            3
        );

        // Ignore stale topic stat: it is in the window (so `true`), but older than the stored one.
        assert!(active_bucket.add_stat(
            0,
            &TopicStat {
                topic: "test".to_string(),
                latest_entry_id: 2,
                record_size: 512,
                record_num: 2,
            },
            ts + 9,
        ));

        assert_eq!(
            active_bucket.get_stat(0, "test").unwrap().latest_entry_id,
            3
        );

        // Merging takes the max entry id and sums sizes/counts across datanodes.
        let merged = active_bucket.merge();
        assert_eq!(merged.len(), 2);
        assert_eq!(merged.get("test").unwrap().latest_entry_id, 10);
        assert_eq!(merged.get("test").unwrap().record_size, 5120 + 1024);
        assert_eq!(merged.get("test").unwrap().record_num, 10 + 3);

        assert_eq!(merged.get("test1").unwrap().latest_entry_id, 2);
        assert_eq!(merged.get("test1").unwrap().record_size, 128);
        assert_eq!(merged.get("test1").unwrap().record_num, 2);
        assert_eq!(merged.get("test1").unwrap().start_ts, ts);
    }

    #[test]
    fn test_topic_stats() {
        let topic_name = "test";
        let window_size = Duration::from_secs(60);
        let window_ms = window_size.as_millis() as i64;
        let mut topic_stats = TopicStatsStore::new(5, window_size);
        let ts = TopicStatsStore::align_ts(current_time_millis());

        // The first stat opens the active bucket at `ts`.
        debug!("add stat at ts: {}", ts);
        topic_stats.add_stat(
            0,
            &TopicStat {
                topic: topic_name.to_string(),
                latest_entry_id: 1,
                record_size: 1024,
                record_num: 1,
            },
            ts,
        );

        // The last millisecond of the window still belongs to the active bucket.
        debug!("add stat at ts: {}", ts + window_ms - 1);
        topic_stats.add_stat(
            1,
            &TopicStat {
                topic: topic_name.to_string(),
                latest_entry_id: 4,
                record_size: 4096,
                record_num: 4,
            },
            ts + window_ms - 1,
        );

        topic_stats.add_stat(
            1,
            &TopicStat {
                topic: "another_topic".to_string(),
                latest_entry_id: 4,
                record_size: 4096,
                record_num: 4,
            },
            ts + window_ms - 1,
        );

        // Add a stat that is out of the window: the active bucket is merged into the history
        // and a new bucket is started.
        debug!("add stat at ts: {}", ts + window_ms);
        topic_stats.add_stat(
            1,
            &TopicStat {
                topic: topic_name.to_string(),
                latest_entry_id: 5,
                record_size: 8192,
                record_num: 5,
            },
            ts + window_ms,
        );

        let history = topic_stats.history_by_topic.get(topic_name).unwrap();
        assert_eq!(history.len(), 1);
        assert_eq!(
            history[0],
            HistoryTopicStat {
                latest_entry_id: 4,
                record_size: 1024 + 4096,
                record_num: 1 + 4,
                start_ts: ts,
            }
        );
        assert!(topic_stats.active_bucket.is_some());
    }
}