1use std::time::{Duration, Instant};
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use api::v1::value::ValueData;
19use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
20use client::DEFAULT_CATALOG_NAME;
21use client::inserter::{Context as InserterContext, Inserter};
22use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
23use common_macro::{Schema, ToRow};
24use common_meta::DatanodeId;
25use common_meta::datanode::RegionStat;
26use common_telemetry::warn;
27use dashmap::DashMap;
28use store_api::region_engine::RegionRole;
29use store_api::storage::RegionId;
30
31use crate::error::Result;
32use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
33use crate::metasrv::Context;
34
/// Heartbeat handler that periodically writes the region statistics reported
/// by datanodes through an [`Inserter`].
pub struct PersistStatsHandler {
    /// Sink the region statistics rows are inserted through.
    inserter: Box<dyn Inserter>,
    /// Snapshot of each region taken at its last successful persist; used to
    /// compute `write_bytes_delta` for the next row.
    last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
    /// Instant of the last successful persist for each datanode.
    last_persisted_time: DashMap<DatanodeId, Instant>,
    /// Minimum time between two persists for the same datanode; also the
    /// width of the bucket row timestamps are aligned to.
    persist_interval: Duration,
}
42
/// Name of the table the region statistics rows are inserted into.
const META_REGION_STATS_HISTORY_TABLE_NAME: &str = "region_statistics_history";

/// Inserter context (catalog and schema) used for every region statistics insert.
const DEFAULT_CONTEXT: InserterContext = InserterContext {
    catalog: DEFAULT_CATALOG_NAME,
    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
50
/// The part of a [`RegionStat`] that is remembered after a successful persist
/// so the next row can report how many bytes were written in between.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PersistedRegionStat {
    /// Region the snapshot belongs to.
    region_id: RegionId,
    /// Value of `RegionStat::written_bytes` when the snapshot was taken.
    written_bytes: u64,
}
56
57impl From<&RegionStat> for PersistedRegionStat {
58 fn from(stat: &RegionStat) -> Self {
59 Self {
60 region_id: stat.id,
61 written_bytes: stat.written_bytes,
62 }
63 }
64}
65
/// One row of the region statistics history table.
///
/// The `ToRow` and `Schema` derives supply the `to_row()` and `schema()`
/// functions used when building the insert request.
#[derive(ToRow, Schema)]
struct PersistRegionStat<'a> {
    /// Table id component of the region id.
    table_id: u32,
    /// Full region id as a `u64`.
    region_id: u64,
    /// Region number component of the region id.
    region_number: u32,
    /// Manifest size reported for the region.
    manifest_size: u64,
    /// Datanode that reported the region.
    datanode_id: u64,
    /// Engine name of the region, borrowed from the source `RegionStat`.
    #[col(datatype = "string")]
    engine: &'a str,
    /// Row count reported for the region.
    num_rows: u64,
    /// Number of SST files reported for the region.
    sst_num: u64,
    /// Total SST size reported for the region.
    sst_size: u64,
    /// Bytes written since the last persisted snapshot of the region; zero
    /// when there is no snapshot or the counter did not increase.
    write_bytes_delta: u64,
    /// Row timestamp in milliseconds, stored in the `greptime_timestamp` column.
    #[col(
        name = "greptime_timestamp",
        semantic = "Timestamp",
        datatype = "TimestampMillisecond"
    )]
    timestamp_millis: i64,
}
87
88fn compute_persist_region_stat(
90 region_stat: &RegionStat,
91 datanode_id: DatanodeId,
92 timestamp_millis: i64,
93 persisted_region_stat: Option<PersistedRegionStat>,
94) -> PersistRegionStat<'_> {
95 let write_bytes_delta = persisted_region_stat
96 .and_then(|persisted_region_stat| {
97 region_stat
98 .written_bytes
99 .checked_sub(persisted_region_stat.written_bytes)
100 })
101 .unwrap_or_default();
102
103 PersistRegionStat {
104 table_id: region_stat.id.table_id(),
105 region_id: region_stat.id.as_u64(),
106 region_number: region_stat.id.region_number(),
107 manifest_size: region_stat.manifest_size,
108 datanode_id,
109 engine: region_stat.engine.as_str(),
110 num_rows: region_stat.num_rows,
111 sst_num: region_stat.sst_num,
112 sst_size: region_stat.sst_size,
113 write_bytes_delta,
114 timestamp_millis,
115 }
116}
117
118fn to_persisted_if_leader(
119 region_stat: &RegionStat,
120 last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
121 datanode_id: DatanodeId,
122 timestamp_millis: i64,
123) -> Option<(Row, PersistedRegionStat)> {
124 if matches!(
125 region_stat.role,
126 RegionRole::Leader | RegionRole::StagingLeader
127 ) {
128 let persisted_region_stat = last_persisted_region_stats.get(®ion_stat.id).map(|s| *s);
129 Some((
130 compute_persist_region_stat(
131 region_stat,
132 datanode_id,
133 timestamp_millis,
134 persisted_region_stat,
135 )
136 .to_row(),
137 PersistedRegionStat::from(region_stat),
138 ))
139 } else {
140 None
141 }
142}
143
/// Aligns `ts` (in milliseconds) to the start of its `interval`-wide bucket.
///
/// For a non-negative `ts` the result is the largest multiple of the interval
/// that does not exceed `ts`. Negative timestamps are rounded toward zero,
/// following integer division.
///
/// # Panics
/// Panics if `interval` is shorter than one millisecond.
fn align_ts(ts: i64, interval: Duration) -> i64 {
    let interval_millis = interval.as_millis();
    assert!(interval_millis != 0, "interval must be greater than zero");

    // `as_millis` returns a `u128`; an `as i64` cast would silently truncate a
    // huge interval into a zero or negative divisor.
    let Ok(interval_millis) = i64::try_from(interval_millis) else {
        // The interval is wider than any `i64` timestamp, so every timestamp
        // falls into the bucket that starts at zero.
        return 0;
    };

    ts / interval_millis * interval_millis
}
155
156impl PersistStatsHandler {
157 pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
159 if persist_interval < Duration::from_mins(10) {
160 warn!("persist_interval is less than 10 minutes, set to 10 minutes");
161 persist_interval = Duration::from_mins(10);
162 }
163
164 Self {
165 inserter,
166 last_persisted_region_stats: DashMap::new(),
167 last_persisted_time: DashMap::new(),
168 persist_interval,
169 }
170 }
171
172 fn should_persist(&self, datanode_id: DatanodeId) -> bool {
173 let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
174 return true;
175 };
176
177 last_persisted_time.elapsed() >= self.persist_interval
178 }
179
180 async fn persist(
181 &self,
182 timestamp_millis: i64,
183 datanode_id: DatanodeId,
184 region_stats: &[RegionStat],
185 ) {
186 let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
188 let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
189 .iter()
190 .flat_map(|region_stat| {
191 to_persisted_if_leader(
192 region_stat,
193 &self.last_persisted_region_stats,
194 datanode_id,
195 aligned_ts,
196 )
197 })
198 .unzip();
199
200 if rows.is_empty() {
201 return;
202 }
203
204 if let Err(err) = self
205 .inserter
206 .insert_rows(
207 &DEFAULT_CONTEXT,
208 RowInsertRequests {
209 inserts: vec![RowInsertRequest {
210 table_name: META_REGION_STATS_HISTORY_TABLE_NAME.to_string(),
211 rows: Some(Rows {
212 schema: PersistRegionStat::schema(),
213 rows,
214 }),
215 }],
216 },
217 )
218 .await
219 {
220 warn!(
221 "Failed to persist region stats, datanode_id: {}, error: {:?}",
222 datanode_id, err
223 );
224 return;
225 }
226
227 self.last_persisted_time.insert(datanode_id, Instant::now());
228 for s in incoming_region_stats {
229 self.last_persisted_region_stats.insert(s.region_id, s);
230 }
231 }
232}
233
234#[async_trait::async_trait]
235impl HeartbeatHandler for PersistStatsHandler {
236 fn is_acceptable(&self, role: Role) -> bool {
237 role == Role::Datanode
238 }
239
240 async fn handle(
241 &self,
242 _req: &HeartbeatRequest,
243 _: &mut Context,
244 acc: &mut HeartbeatAccumulator,
245 ) -> Result<HandleControl> {
246 let Some(current_stat) = acc.stat.as_ref() else {
247 return Ok(HandleControl::Continue);
248 };
249
250 if !self.should_persist(current_stat.id) {
251 return Ok(HandleControl::Continue);
252 }
253
254 self.persist(
255 current_stat.timestamp_millis,
256 current_stat.id,
257 ¤t_stat.region_stats,
258 )
259 .await;
260
261 Ok(HandleControl::Continue)
262 }
263}
264
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use client::inserter::{Context as InserterContext, InsertOptions};
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a leader [`RegionStat`] fixture; everything except the region
    /// id, `written_bytes` and `engine` is a fixed value.
    fn create_test_region_stat(
        table_id: u32,
        region_number: u32,
        written_bytes: u64,
        engine: &str,
    ) -> RegionStat {
        let region_id = RegionId::new(table_id, region_number);
        RegionStat {
            id: region_id,
            rcus: 100,
            wcus: 200,
            approximate_bytes: 1024,
            engine: engine.to_string(),
            role: RegionRole::Leader,
            num_rows: 1000,
            memtable_size: 512,
            manifest_size: 256,
            sst_size: 2048,
            sst_num: 5,
            index_size: 128,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: 100,
                file_removed_cnt: 0,
            },
            written_bytes,
            data_topic_latest_entry_id: 200,
            metadata_topic_latest_entry_id: 200,
        }
    }

    /// Asserts the fields of `result` that are copied unchanged from a
    /// `"mito"` fixture built by [`create_test_region_stat`].
    fn assert_fixture_fields(
        result: &PersistRegionStat<'_>,
        region_stat: &RegionStat,
        datanode_id: DatanodeId,
        timestamp_millis: i64,
    ) {
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_no_persisted_stat() {
        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let datanode_id = 123;
        let timestamp_millis = 1640995200000;

        let result = compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);

        assert_eq!(result.table_id, 1);
        assert_eq!(result.region_number, 1);
        // No previous snapshot, so no delta can be computed.
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_increase() {
        let region_stat = create_test_region_stat(2, 3, 1500, "mito");
        let datanode_id = 456;
        let timestamp_millis = 1640995260000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 2);
        assert_eq!(result.region_number, 3);
        // 1500 - 1000
        assert_eq!(result.write_bytes_delta, 500);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
        let region_stat = create_test_region_stat(3, 5, 800, "mito");
        let datanode_id = 789;
        let timestamp_millis = 1640995320000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1200,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 3);
        assert_eq!(result.region_number, 5);
        // The counter went backwards, so the delta is clamped to zero.
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_equal() {
        let region_stat = create_test_region_stat(4, 7, 2000, "mito");
        let datanode_id = 101;
        let timestamp_millis = 1640995380000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 2000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 4);
        assert_eq!(result.region_number, 7);
        // 2000 - 2000
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_overflow_protection() {
        // Extreme values: a plain `0 - u64::MAX` would underflow.
        let region_stat = create_test_region_stat(8, 15, 0, "mito");
        let datanode_id = 505;
        let timestamp_millis = 1640995620000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: u64::MAX,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 8);
        assert_eq!(result.region_number, 15);
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    /// Inserter that records every request instead of sending it anywhere.
    struct MockInserter {
        requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
    }

    #[async_trait::async_trait]
    impl Inserter for MockInserter {
        async fn insert_rows(
            &self,
            _context: &InserterContext<'_>,
            requests: api::v1::RowInsertRequests,
        ) -> client::error::Result<()> {
            self.requests.lock().unwrap().extend(requests.inserts);

            Ok(())
        }

        fn set_options(&mut self, _options: &InsertOptions) {}
    }

    #[tokio::test]
    async fn test_not_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();

        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                timestamp_millis: 1640995200000,
                region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
                ..Default::default()
            }),
            ..Default::default()
        };
        // Pretend the datanode was persisted just now, so the interval has not elapsed.
        handler.last_persisted_time.insert(1, Instant::now());
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        assert!(requests.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };

        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));

        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let timestamp_millis = 1640995200000;
        let datanode_id = 1;
        let region_id = RegionId::new(1, 1);
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: datanode_id,
                timestamp_millis,
                region_stats: vec![region_stat.clone()],
                ..Default::default()
            }),
            ..Default::default()
        };

        handler.last_persisted_region_stats.insert(
            region_id,
            PersistedRegionStat {
                region_id,
                written_bytes: 500,
            },
        );
        // The handler stamps rows with the aligned timestamp, so the expected
        // row must be aligned the same way rather than relying on the fixture
        // timestamp happening to sit on a bucket boundary.
        let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
            &region_stat,
            &handler.last_persisted_region_stats,
            datanode_id,
            align_ts(timestamp_millis, handler.persist_interval),
        )
        .unwrap();
        let before_insert_time = Instant::now();
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();
        let request = {
            let mut requests = requests.lock().unwrap();
            assert_eq!(requests.len(), 1);
            requests.pop().unwrap()
        };
        assert_eq!(
            request.table_name,
            META_REGION_STATS_HISTORY_TABLE_NAME.to_string()
        );
        assert_eq!(request.rows.unwrap().rows, vec![expected_row]);

        // The persist time of the datanode was refreshed.
        assert!(
            handler
                .last_persisted_time
                .get(&datanode_id)
                .unwrap()
                .gt(&before_insert_time)
        );

        // The region snapshot was replaced with the freshly persisted one.
        assert_eq!(
            handler
                .last_persisted_region_stats
                .get(&region_id)
                .unwrap()
                .value(),
            &expected_persisted_region_stat
        );
    }
}