1use std::time::{Duration, Instant};
16
17use api::v1::meta::{HeartbeatRequest, Role};
18use api::v1::value::ValueData;
19use api::v1::{ColumnSchema, Row, RowInsertRequest, RowInsertRequests, Rows, Value};
20use client::DEFAULT_CATALOG_NAME;
21use client::inserter::{Context as InserterContext, Inserter};
22use common_catalog::consts::DEFAULT_PRIVATE_SCHEMA_NAME;
23use common_macro::{Schema, ToRow};
24use common_meta::DatanodeId;
25use common_meta::datanode::RegionStat;
26use common_telemetry::warn;
27use dashmap::DashMap;
28use store_api::region_engine::RegionRole;
29use store_api::storage::RegionId;
30
31use crate::error::Result;
32use crate::handler::{HandleControl, HeartbeatAccumulator, HeartbeatHandler};
33use crate::metasrv::Context;
34
/// Heartbeat handler that writes per-region statistics reported by datanodes into the
/// region statistics history table through an [`Inserter`].
pub struct PersistStatsHandler {
    /// Sink that receives the row insert requests.
    inserter: Box<dyn Inserter>,
    /// Snapshot taken at the last successful persist of each region; used as the baseline
    /// when computing the next `write_bytes_delta`.
    last_persisted_region_stats: DashMap<RegionId, PersistedRegionStat>,
    /// Moment of the last successful persist for each datanode.
    last_persisted_time: DashMap<DatanodeId, Instant>,
    /// Minimum time that must elapse between two persists for the same datanode.
    persist_interval: Duration,
}
42
/// Name of the table that receives the persisted region statistics rows.
const META_REGION_STATS_HISTORY_TABLE_NAME: &str = "region_statistics_history";
/// Inserter context used for every write issued by this handler: the default catalog and
/// the default private schema.
const DEFAULT_CONTEXT: InserterContext = InserterContext {
    catalog: DEFAULT_CATALOG_NAME,
    schema: DEFAULT_PRIVATE_SCHEMA_NAME,
};
50
/// The subset of a [`RegionStat`] that is remembered after a successful persist.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct PersistedRegionStat {
    /// Region the snapshot belongs to.
    region_id: RegionId,
    /// Value of the region's `written_bytes` counter when the snapshot was taken.
    written_bytes: u64,
}
56
57impl From<&RegionStat> for PersistedRegionStat {
58 fn from(stat: &RegionStat) -> Self {
59 Self {
60 region_id: stat.id,
61 written_bytes: stat.written_bytes,
62 }
63 }
64}
65
// One row of the region statistics history table. The `ToRow` and `Schema` derives
// generate the `to_row()` and `schema()` functions used by this file, so the field
// declarations below define the columns of the row.
#[derive(ToRow, Schema)]
struct PersistRegionStat<'a> {
    // Identity of the region, both decomposed (`table_id`, `region_number`) and as the
    // raw `u64` region id.
    table_id: u32,
    region_id: u64,
    region_number: u32,
    manifest_size: u64,
    // Datanode that reported the statistics.
    datanode_id: u64,
    // Engine name, borrowed from the source `RegionStat`.
    #[col(datatype = "string")]
    engine: &'a str,
    num_rows: u64,
    sst_num: u64,
    sst_size: u64,
    // Growth of the region's `written_bytes` counter since the previously persisted
    // snapshot; 0 when there is no usable baseline.
    write_bytes_delta: u64,
    // Stored under the column name `greptime_timestamp` with the `Timestamp` semantic type.
    #[col(
        name = "greptime_timestamp",
        semantic = "Timestamp",
        datatype = "TimestampMillisecond"
    )]
    timestamp_millis: i64,
}
87
88fn compute_persist_region_stat(
90 region_stat: &RegionStat,
91 datanode_id: DatanodeId,
92 timestamp_millis: i64,
93 persisted_region_stat: Option<PersistedRegionStat>,
94) -> PersistRegionStat<'_> {
95 let write_bytes_delta = persisted_region_stat
96 .and_then(|persisted_region_stat| {
97 region_stat
98 .written_bytes
99 .checked_sub(persisted_region_stat.written_bytes)
100 })
101 .unwrap_or_default();
102
103 PersistRegionStat {
104 table_id: region_stat.id.table_id(),
105 region_id: region_stat.id.as_u64(),
106 region_number: region_stat.id.region_number(),
107 manifest_size: region_stat.manifest_size,
108 datanode_id,
109 engine: region_stat.engine.as_str(),
110 num_rows: region_stat.num_rows,
111 sst_num: region_stat.sst_num,
112 sst_size: region_stat.sst_size,
113 write_bytes_delta,
114 timestamp_millis,
115 }
116}
117
118fn to_persisted_if_leader(
119 region_stat: &RegionStat,
120 last_persisted_region_stats: &DashMap<RegionId, PersistedRegionStat>,
121 datanode_id: DatanodeId,
122 timestamp_millis: i64,
123) -> Option<(Row, PersistedRegionStat)> {
124 if matches!(region_stat.role, RegionRole::Leader) {
125 let persisted_region_stat = last_persisted_region_stats.get(®ion_stat.id).map(|s| *s);
126 Some((
127 compute_persist_region_stat(
128 region_stat,
129 datanode_id,
130 timestamp_millis,
131 persisted_region_stat,
132 )
133 .to_row(),
134 PersistedRegionStat::from(region_stat),
135 ))
136 } else {
137 None
138 }
139}
140
/// Rounds `ts` (milliseconds) to a multiple of `interval`, truncating toward zero.
///
/// # Panics
///
/// Panics if `interval` is shorter than one millisecond.
fn align_ts(ts: i64, interval: Duration) -> i64 {
    let interval_millis = interval.as_millis();
    assert!(
        interval_millis != 0,
        "interval must be greater than zero"
    );

    let step = interval_millis as i64;
    (ts / step) * step
}
152
153impl PersistStatsHandler {
154 pub fn new(inserter: Box<dyn Inserter>, mut persist_interval: Duration) -> Self {
156 if persist_interval < Duration::from_mins(10) {
157 warn!("persist_interval is less than 10 minutes, set to 10 minutes");
158 persist_interval = Duration::from_mins(10);
159 }
160
161 Self {
162 inserter,
163 last_persisted_region_stats: DashMap::new(),
164 last_persisted_time: DashMap::new(),
165 persist_interval,
166 }
167 }
168
169 fn should_persist(&self, datanode_id: DatanodeId) -> bool {
170 let Some(last_persisted_time) = self.last_persisted_time.get(&datanode_id) else {
171 return true;
172 };
173
174 last_persisted_time.elapsed() >= self.persist_interval
175 }
176
177 async fn persist(
178 &self,
179 timestamp_millis: i64,
180 datanode_id: DatanodeId,
181 region_stats: &[RegionStat],
182 ) {
183 let aligned_ts = align_ts(timestamp_millis, self.persist_interval);
185 let (rows, incoming_region_stats): (Vec<_>, Vec<_>) = region_stats
186 .iter()
187 .flat_map(|region_stat| {
188 to_persisted_if_leader(
189 region_stat,
190 &self.last_persisted_region_stats,
191 datanode_id,
192 aligned_ts,
193 )
194 })
195 .unzip();
196
197 if rows.is_empty() {
198 return;
199 }
200
201 if let Err(err) = self
202 .inserter
203 .insert_rows(
204 &DEFAULT_CONTEXT,
205 RowInsertRequests {
206 inserts: vec![RowInsertRequest {
207 table_name: META_REGION_STATS_HISTORY_TABLE_NAME.to_string(),
208 rows: Some(Rows {
209 schema: PersistRegionStat::schema(),
210 rows,
211 }),
212 }],
213 },
214 )
215 .await
216 {
217 warn!(
218 "Failed to persist region stats, datanode_id: {}, error: {:?}",
219 datanode_id, err
220 );
221 return;
222 }
223
224 self.last_persisted_time.insert(datanode_id, Instant::now());
225 for s in incoming_region_stats {
226 self.last_persisted_region_stats.insert(s.region_id, s);
227 }
228 }
229}
230
231#[async_trait::async_trait]
232impl HeartbeatHandler for PersistStatsHandler {
233 fn is_acceptable(&self, role: Role) -> bool {
234 role == Role::Datanode
235 }
236
237 async fn handle(
238 &self,
239 _req: &HeartbeatRequest,
240 _: &mut Context,
241 acc: &mut HeartbeatAccumulator,
242 ) -> Result<HandleControl> {
243 let Some(current_stat) = acc.stat.as_ref() else {
244 return Ok(HandleControl::Continue);
245 };
246
247 if !self.should_persist(current_stat.id) {
248 return Ok(HandleControl::Continue);
249 }
250
251 self.persist(
252 current_stat.timestamp_millis,
253 current_stat.id,
254 ¤t_stat.region_stats,
255 )
256 .await;
257
258 Ok(HandleControl::Continue)
259 }
260}
261
#[cfg(test)]
mod tests {
    use std::sync::{Arc, Mutex};

    use client::inserter::{Context as InserterContext, InsertOptions};
    use common_meta::datanode::{RegionManifestInfo, RegionStat, Stat};
    use store_api::region_engine::RegionRole;
    use store_api::storage::RegionId;

    use super::*;
    use crate::handler::test_utils::TestEnv;

    /// Builds a leader [`RegionStat`] with fixed sizes and counters; only the region
    /// identity, `written_bytes` and the engine name vary between tests.
    fn create_test_region_stat(
        table_id: u32,
        region_number: u32,
        written_bytes: u64,
        engine: &str,
    ) -> RegionStat {
        let region_id = RegionId::new(table_id, region_number);
        RegionStat {
            id: region_id,
            rcus: 100,
            wcus: 200,
            approximate_bytes: 1024,
            engine: engine.to_string(),
            role: RegionRole::Leader,
            num_rows: 1000,
            memtable_size: 512,
            manifest_size: 256,
            sst_size: 2048,
            sst_num: 5,
            index_size: 128,
            region_manifest: RegionManifestInfo::Mito {
                manifest_version: 1,
                flushed_entry_id: 100,
                file_removed_cnt: 0,
            },
            written_bytes,
            data_topic_latest_entry_id: 200,
            metadata_topic_latest_entry_id: 200,
        }
    }

    /// Asserts every field of `result` that is either copied straight from the fixture
    /// created by `create_test_region_stat` (with engine `"mito"`) or passed through
    /// unchanged. Region identity parts and `write_bytes_delta` are asserted by each test.
    fn assert_fixture_fields(
        result: &PersistRegionStat<'_>,
        region_stat: &RegionStat,
        datanode_id: DatanodeId,
        timestamp_millis: i64,
    ) {
        assert_eq!(result.region_id, region_stat.id.as_u64());
        assert_eq!(result.manifest_size, 256);
        assert_eq!(result.datanode_id, datanode_id);
        assert_eq!(result.engine, "mito");
        assert_eq!(result.num_rows, 1000);
        assert_eq!(result.sst_num, 5);
        assert_eq!(result.sst_size, 2048);
        assert_eq!(result.timestamp_millis, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_no_persisted_stat() {
        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let datanode_id = 123;
        let timestamp_millis = 1640995200000;

        let result =
            compute_persist_region_stat(&region_stat, datanode_id, timestamp_millis, None);

        assert_eq!(result.table_id, 1);
        assert_eq!(result.region_number, 1);
        // Without a previous snapshot there is no baseline, so the delta is 0.
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_increase() {
        let region_stat = create_test_region_stat(2, 3, 1500, "mito");
        let datanode_id = 456;
        let timestamp_millis = 1640995260000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 2);
        assert_eq!(result.region_number, 3);
        // 1500 - 1000
        assert_eq!(result.write_bytes_delta, 500);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_decrease() {
        let region_stat = create_test_region_stat(3, 5, 800, "mito");
        let datanode_id = 789;
        let timestamp_millis = 1640995320000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1200,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 3);
        assert_eq!(result.region_number, 5);
        // The counter went backwards (800 < 1200), so the delta is clamped to 0.
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_persisted_stat_equal() {
        let region_stat = create_test_region_stat(4, 7, 2000, "mito");
        let datanode_id = 101;
        let timestamp_millis = 1640995380000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 2000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 4);
        assert_eq!(result.region_number, 7);
        // 2000 - 2000
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    #[test]
    fn test_compute_persist_region_stat_with_overflow_protection() {
        let region_stat = create_test_region_stat(8, 15, 500, "mito");
        let datanode_id = 505;
        let timestamp_millis = 1640995620000;
        let persisted_stat = PersistedRegionStat {
            region_id: region_stat.id,
            written_bytes: 1000,
        };

        let result = compute_persist_region_stat(
            &region_stat,
            datanode_id,
            timestamp_millis,
            Some(persisted_stat),
        );

        assert_eq!(result.table_id, 8);
        assert_eq!(result.region_number, 15);
        // 500 - 1000 would underflow a u64; the delta must be 0 rather than a wrapped value.
        assert_eq!(result.write_bytes_delta, 0);
        assert_fixture_fields(&result, &region_stat, datanode_id, timestamp_millis);
    }

    /// Inserter that records every request it receives and always succeeds.
    struct MockInserter {
        requests: Arc<Mutex<Vec<api::v1::RowInsertRequest>>>,
    }

    #[async_trait::async_trait]
    impl Inserter for MockInserter {
        async fn insert_rows(
            &self,
            _context: &InserterContext<'_>,
            requests: api::v1::RowInsertRequests,
        ) -> client::error::Result<()> {
            self.requests.lock().unwrap().extend(requests.inserts);

            Ok(())
        }

        fn set_options(&mut self, _options: &InsertOptions) {}
    }

    #[tokio::test]
    async fn test_not_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();

        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };
        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: 1,
                timestamp_millis: 1640995200000,
                region_stats: vec![create_test_region_stat(1, 1, 1000, "mito")],
                ..Default::default()
            }),
            ..Default::default()
        };

        // Pretend datanode 1 was persisted just now, so the interval has not elapsed.
        handler.last_persisted_time.insert(1, Instant::now());

        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();

        assert!(requests.lock().unwrap().is_empty());
    }

    #[tokio::test]
    async fn test_persist_region_stats() {
        let env = TestEnv::new();
        let mut ctx = env.ctx();
        let requests = Arc::new(Mutex::new(vec![]));
        let inserter = MockInserter {
            requests: requests.clone(),
        };

        let handler = PersistStatsHandler::new(Box::new(inserter), Duration::from_secs(10));

        let region_stat = create_test_region_stat(1, 1, 1000, "mito");
        let timestamp_millis = 1640995200000;
        let datanode_id = 1;
        let region_id = RegionId::new(1, 1);
        let mut acc = HeartbeatAccumulator {
            stat: Some(Stat {
                id: datanode_id,
                timestamp_millis,
                region_stats: vec![region_stat.clone()],
                ..Default::default()
            }),
            ..Default::default()
        };

        // Seed a previous snapshot so the persisted row carries a non-trivial delta.
        handler.last_persisted_region_stats.insert(
            region_id,
            PersistedRegionStat {
                region_id,
                written_bytes: 500,
            },
        );

        // `persist` stamps rows with the aligned timestamp, so the expectation must be
        // built from the same aligned value rather than the raw heartbeat timestamp.
        let aligned_ts = align_ts(timestamp_millis, handler.persist_interval);
        let (expected_row, expected_persisted_region_stat) = to_persisted_if_leader(
            &region_stat,
            &handler.last_persisted_region_stats,
            datanode_id,
            aligned_ts,
        )
        .unwrap();

        let before_insert_time = Instant::now();
        handler
            .handle(&HeartbeatRequest::default(), &mut ctx, &mut acc)
            .await
            .unwrap();

        let request = {
            let mut requests = requests.lock().unwrap();
            assert_eq!(requests.len(), 1);
            requests.pop().unwrap()
        };
        assert_eq!(
            request.table_name,
            META_REGION_STATS_HISTORY_TABLE_NAME.to_string()
        );
        assert_eq!(request.rows.unwrap().rows, vec![expected_row]);

        // The last persist time must have been recorded during `handle`. `Instant` is only
        // guaranteed to be non-decreasing, so two consecutive readings may be equal.
        assert!(*handler.last_persisted_time.get(&datanode_id).unwrap() >= before_insert_time);

        // The stored snapshot must have been replaced by the freshly persisted one.
        assert_eq!(
            *handler.last_persisted_region_stats.get(&region_id).unwrap(),
            expected_persisted_region_stat
        );
    }
}