1use std::time::{Duration, Instant};
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use api::v1::value::ValueData;
19use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
20use client::inserter::{Context as InserterContext, Inserter};
21use client::DEFAULT_CATALOG_NAME;
22use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
23use common_macro::{Schema, ToRow};
24use common_meta::datanode::RegionStat;
25use common_meta::DatanodeId;
26use common_telemetry::warn;
27use dashmap::DashMap;
28use store_api::region_engine::RegionRole;
29use store_api::storage::RegionId;
30
31use crate::error::Result;
32use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
33use crate::metasrv::Context;
34
35pub struct PersistStatsHandler {
37 inserter: Box<dyn Inserter>,
38 last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
39 last_persisted_time: DashMap<DatanodeId, Instant>,
40 persist_interval: Duration,
41}
42
43const META_REGION_STATS_HISTORY_TABLE_NAME: &str = "region_statistics_history";
45const DEFAULT_CONTEXT: InserterContext = InserterContext {
47 catalog: DEFAULT_CATALOG_NAME,
48 schema: DEFAULT_PRIVATE_SCHEMA_NAME,
49};
50
51#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52struct PersistedRegionStat {
53 region_id: RegionId,
54 written_bytes: u64,
55}
56
57impl From<&RegionStat> for PersistedRegionStat {
58 fn from(stat: &RegionStat) -> Self {
59 Self {
60 region_id: stat.id,
61 written_bytes: stat.written_bytes,
62 }
63 }
64}
65
66#[derive(ToRow, Schema)]
67struct PersistRegionStat<'a> {
68 table_id: u32,
69 region_id: u64,
70 region_number: u32,
71 manifest_size: u64,
72 datanode_id: u64,
73 #[col(datatype = "string")]
74 engine: &'a str,
75 num_rows: u64,
76 sst_num: u64,
77 sst_size: u64,
78 write_bytes_delta: u64,
79 #[col(
80 name = "greptime_timestamp",
81 semantic = "Timestamp",
82 datatype = "TimestampMillisecond"
83 )]
84 timestamp_millis: i64,
85}
86
87fn compute_persist_region_stat(
89 region_stat: &RegionStat,
90 datanode_id: DatanodeId,
91 timestamp_millis: i64,
92 persisted_region_stat: Option<PersistedRegionStat>,
93) -> PersistRegionStat {
94 let write_bytes_delta = persisted_region_stat
95 .and_then(|persisted_region_stat| {
96 region_stat
97 .written_bytes
98 .checked_sub(persisted_region_stat.written_bytes)
99 })
100 .unwrap_or_default();
101
102 PersistRegionStat {
103 table_id: region_stat.id.table_id(),
104 region_id: region_stat.id.as_u64(),
105 region_number: region_stat.id.region_number(),
106 manifest_size: region_stat.manifest_size,
107 datanode_id,
108 engine: region_stat.engine.as_str(),
109 num_rows: region_stat.num_rows,
110 sst_num: region_stat.sst_num,
111 sst_size: region_stat.sst_size,
112 write_bytes_delta,
113 timestamp_millis,
114 }
115}
116
117fn to_persisted_if_leader(
118 region_stat: &RegionStat,
119 last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
120 datanode_id: DatanodeId,
121 timestamp_millis: i64,
122) -> Option<(Row, PersistedRegionStat)> {
123 if matches!(region_stat.role, RegionRole::Leader) {
124 let persisted_region_stat = last_persisted_region_stats.get(®ion_stat.id).map(|s| *s);
125 Some((
126 compute_persist_region_stat(
127 region_stat,
128 datanode_id,
129 timestamp_millis,
130 persisted_region_stat,
131 )
132 .to_row(),
133 PersistedRegionStat::from(region_stat),
134 ))
135 } else {
136 None
137 }
138}
139
/// Aligns the millisecond timestamp `ts` to a multiple of `interval`.
///
/// The result is `ts` with the remainder of the division by the interval (in
/// whole milliseconds) removed. The division truncates toward zero, so
/// non-negative timestamps are rounded down. Intervals longer than `i64::MAX`
/// milliseconds are treated as `i64::MAX` milliseconds.
///
/// # Panics
///
/// Panics if `interval` is shorter than one millisecond.
fn align_ts(ts: i64, interval: Duration) -> i64 {
    let interval_millis = interval.as_millis();
    assert!(
        interval_millis != 0,
        "interval must be greater than zero"
    );

    // `as_millis` yields a `u128`. Clamp before narrowing so that a huge
    // interval cannot wrap into a negative or zero divisor; after the clamp
    // the cast is lossless.
    let interval_millis = interval_millis.min(i64::MAX as u128) as i64;
    ts / interval_millis * interval_millis
}
151
152impl PersistStatsHandler {
153 pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
155 if persist_interval < Duration::from_mins(10) {
156 warn!("persist_interval is less than 10 minutes, set to 10 minutes");
157 persist_interval = Duration::from_mins(10);
158 }
159
160 Self {
161 inserter,
162 last_persisted_region_stats: DashMap::new(),
163 last_persisted_time: DashMap::new(),
164 persist_interval,
165 }
166 }
167
168 fn should_persist(&self, datanode_id: DatanodeId) -> bool {
169 let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
170 return true;
171 };
172
173 last_persisted_time.elapsed() >= self.persist_interval
174 }
175
176 async fn persist(
177 &self,
178 timestamp_millis: i64,
179 datanode_id: DatanodeId,
180 region_stats: &[RegionStat],
181 ) {
182 let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
184 let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
185 .iter()
186 .flat_map(|region_stat| {
187 to_persisted_if_leader(
188 region_stat,
189 &self.last_persisted_region_stats,
190 datanode_id,
191 aligned_ts,
192 )
193 })
194 .unzip();
195
196 if rows.is_empty() {
197 return;
198 }
199
200 if let Err(err) = self
201 .inserter
202 .insert_rows(
203 &DEFAULT_CONTEXT,
204 RowInsertRequests {
205 inserts: vec![RowInsertRequest {
206 table_name: META_REGION_STATS_HISTORY_TABLE_NAME.to_string(),
207 rows: Some(Rows {
208 schema: PersistRegionStat::schema(),
209 rows,
210 }),
211 }],
212 },
213 )
214 .await
215 {
216 warn!(
217 "Failed to persist region stats, datanode_id: {}, error: {:?}",
218 datanode_id, err
219 );
220 return;
221 }
222
223 self.last_persisted_time.insert(datanode_id, Instant::now());
224 for s in incoming_region_stats {
225 self.last_persisted_region_stats.insert(s.region_id, s);
226 }
227 }
228}
229
230#[async_trait::async_trait]
231impl HeartbeatHandler for PersistStatsHandler {
232 fn is_acceptable(&self, role: Role) -> bool {
233 role == Role::Datanode
234 }
235
236 async fn handle(
237 &self,
238 _req: &HeartbeatRequest,
239 _: &mut Context,
240 acc: &mut HeartbeatAccumulator,
241 ) -> Result<HandleControl> {
242 let Some(current_stat) = acc.stat.as_ref() else {
243 return Ok(HandleControl::Continue);
244 };
245
246 if !self.should_persist(current_stat.id) {
247 return Ok(HandleControl::Continue);
248 }
249
250 self.persist(
251 current_stat.timestamp_millis,
252 current_stat.id,
253 ¤t_stat.region_stats,
254 )
255 .await;
256
257 Ok(HandleControl::Continue)
258 }
259}
260
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use client::inserter::{Context as InserterContext, InsertOptions};
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a leader `RegionStat` with fixed sizes and counters; only the
    /// region id, `written_bytes` and `engine` vary between tests.
    fn create_test_region_stat(
        table_id: u32,
        region_number: u32,
        written_bytes: u64,
        engine: &str,
    ) -> RegionStat {
        let region_id = RegionId::new(table_id, region_number);
        RegionStat {
            id: region_id,
            rcus: 100,
            wcus: 200,
            approximate_bytes: 1024,
            engine: engine.to_string(),
            role: RegionRole::Leader,
            num_rows: 1000,
            memtable_size: 512,
            manifest_size: 256,
            sst_size: 2048,
            sst_num: 5,
            index_size: 128,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: 100,
            },
            written_bytes,
            data_topic_latest_entry_id: 200,
            metadata_topic_latest_entry_id: 200,
        }
    }

    #[test]
    fn test_compute_persist_region_stat_with_no_persisted_stat() {
        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let datanode_id = 123;
        let timestamp_millis = 1640995200000;

        let result = compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);

        assert_eq!(result.table_id, 1);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 1);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        // No previous snapshot, so no delta can be computed.
        assert_eq!(result.write_bytes_delta, 0);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_increase() {
        let region_stat = create_test_region_stat(2, 3, 1500, "mito");
        let datanode_id = 456;
        let timestamp_millis = 1640995260000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 2);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 3);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        // 1500 - 1000
        assert_eq!(result.write_bytes_delta, 500);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
        let region_stat = create_test_region_stat(3, 5, 800, "mito");
        let datanode_id = 789;
        let timestamp_millis = 1640995320000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1200,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 3);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 5);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        // The counter went backwards (800 < 1200), so the delta clamps to 0.
        assert_eq!(result.write_bytes_delta, 0);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_equal() {
        let region_stat = create_test_region_stat(4, 7, 2000, "mito");
        let datanode_id = 101;
        let timestamp_millis = 1640995380000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 2000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 4);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 7);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        // 2000 - 2000
        assert_eq!(result.write_bytes_delta, 0);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_overflow_protection() {
        let region_stat = create_test_region_stat(8, 15, 500, "mito");
        let datanode_id = 505;
        let timestamp_millis = 1640995620000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 8);
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.region_number, 15);
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        // 500 - 1000 would underflow a u64; the delta must be 0 instead.
        assert_eq!(result.write_bytes_delta, 0);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    /// `Inserter` double that records every insert request it receives.
    struct MockInserter {
        requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
    }

    #[async_trait::async_trait]
    impl Inserter for MockInserter {
        async fn insert_rows(
            &self,
            _context: &InserterContext<'_>,
            requests: api::v1::RowInsertRequests,
        ) -> client::error::Result<()> {
            self.requests.lock().unwrap().extend(requests.inserts);

            Ok(())
        }

        fn set_options(&mut self, _options: &InsertOptions) {}
    }

    #[tokio::test]
    async fn test_not_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();

        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };
        // `new` raises the 10-second interval to its 10-minute minimum.
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                timestamp_millis: 1640995200000,
                region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
                ..Default::default()
            }),
            ..Default::default()
        };
        // Datanode 1 was persisted just now, so this heartbeat must be skipped.
        handler.last_persisted_time.insert(1, Instant::now());

        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        assert!(requests.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };

        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));

        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        // A multiple of the 10-minute interval, so alignment leaves it unchanged
        // and the expected row can be built from the raw timestamp.
        let timestamp_millis = 1640995200000;
        let datanode_id = 1;
        let region_id = RegionId::new(1, 1);
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: datanode_id,
                timestamp_millis,
                region_stats: vec![region_stat.clone()],
                ..Default::default()
            }),
            ..Default::default()
        };

        handler.last_persisted_region_stats.insert(
            region_id,
            PersistedRegionStat {
                region_id,
                written_bytes: 500,
            },
        );
        let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
            &region_stat,
            &handler.last_persisted_region_stats,
            datanode_id,
            timestamp_millis,
        )
        .unwrap();
        let before_insert_time = Instant::now();

        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        let request = {
            let mut requests = requests.lock().unwrap();
            assert_eq!(requests.len(), 1);
            requests.pop().unwrap()
        };
        assert_eq!(
            request.table_name,
            META_REGION_STATS_HISTORY_TABLE_NAME.to_string()
        );
        assert_eq!(request.rows.unwrap().rows, vec![expected_row]);

        // The persist time must have been recorded. `Instant` is only
        // guaranteed to be non-decreasing, so two reads may be equal.
        assert!(handler
            .last_persisted_time
            .get(&datanode_id)
            .unwrap()
            .ge(&before_insert_time));

        // The stored snapshot must now reflect the stats that were just written.
        assert_eq!(
            handler
                .last_persisted_region_stats
                .get(&region_id)
                .unwrap()
                .value(),
            &expected_persisted_region_stat
        );
    }
}