1use std::time::Duration;
16
17use lazy_static::lazy_static;
18use prometheus::*;
19use puffin::puffin_manager::stager::StagerNotifier;
20
21pub const STAGE_LABEL: &str = "stage";
23pub const TYPE_LABEL: &str = "type";
25const CACHE_EVICTION_CAUSE: &str = "cause";
26pub const FLUSH_REASON: &str = "reason";
28pub const FILE_TYPE_LABEL: &str = "file_type";
30pub const WORKER_LABEL: &str = "worker";
32pub const PARTITION_LABEL: &str = "partition";
34pub const STAGING_TYPE: &str = "index_staging";
36pub const RECYCLE_TYPE: &str = "recycle_bin";
38
39lazy_static! {
40 pub static ref WRITE_BUFFER_BYTES: IntGauge =
42 register_int_gauge!("greptime_mito_write_buffer_bytes", "mito write buffer bytes").unwrap();
43 pub static ref MEMTABLE_DICT_BYTES: IntGauge =
45 register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap();
46 pub static ref REGION_COUNT: IntGaugeVec =
48 register_int_gauge_vec!(
49 "greptime_mito_region_count",
50 "mito region count in each worker",
51 &[WORKER_LABEL],
52 ).unwrap();
53 pub static ref HANDLE_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
55 "greptime_mito_handle_request_elapsed",
56 "mito handle request elapsed",
57 &[TYPE_LABEL],
58 exponential_buckets(0.01, 10.0, 7).unwrap(),
60 )
61 .unwrap();
62
63 pub static ref FLUSH_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
67 "greptime_mito_flush_requests_total",
68 "mito flush requests total",
69 &[FLUSH_REASON]
70 )
71 .unwrap();
72 pub static ref FLUSH_FAILURE_TOTAL: IntCounter =
74 register_int_counter!("greptime_mito_flush_failure_total", "mito flush failure total").unwrap();
75 pub static ref FLUSH_ELAPSED: HistogramVec = register_histogram_vec!(
77 "greptime_mito_flush_elapsed",
78 "mito flush elapsed",
79 &[TYPE_LABEL],
80 exponential_buckets(1.0, 5.0, 6).unwrap(),
82 )
83 .unwrap();
84 pub static ref FLUSH_BYTES_TOTAL: IntCounter =
86 register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
87 pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
89 register_int_gauge!(
90 "greptime_mito_inflight_flush_count",
91 "inflight flush count",
92 ).unwrap();
93 pub static ref WRITE_REJECT_TOTAL: IntCounter =
100 register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
101 pub static ref WRITE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
103 "greptime_mito_write_stage_elapsed",
104 "mito write stage elapsed",
105 &[STAGE_LABEL],
106 exponential_buckets(0.01, 10.0, 6).unwrap(),
108 )
109 .unwrap();
110 pub static ref WRITE_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!(
112 "greptime_mito_write_rows_total",
113 "mito write rows total",
114 &[TYPE_LABEL]
115 )
116 .unwrap();
117 pub static ref COMPACTION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
131 "greptime_mito_compaction_stage_elapsed",
132 "mito compaction stage elapsed",
133 &[STAGE_LABEL],
134 exponential_buckets(1.0, 10.0, 6).unwrap(),
136 )
137 .unwrap();
138 pub static ref COMPACTION_ELAPSED_TOTAL: Histogram =
140 register_histogram!(
141 "greptime_mito_compaction_total_elapsed",
142 "mito compaction total elapsed",
143 exponential_buckets(1.0, 10.0, 6).unwrap(),
145 ).unwrap();
146 pub static ref COMPACTION_REQUEST_COUNT: IntCounter =
148 register_int_counter!("greptime_mito_compaction_requests_total", "mito compaction requests total").unwrap();
149 pub static ref COMPACTION_FAILURE_COUNT: IntCounter =
151 register_int_counter!("greptime_mito_compaction_failure_total", "mito compaction failure total").unwrap();
152
153 pub static ref INFLIGHT_COMPACTION_COUNT: IntGauge =
155 register_int_gauge!(
156 "greptime_mito_inflight_compaction_count",
157 "inflight compaction count",
158 ).unwrap();
159
160 pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
163 "greptime_mito_read_stage_elapsed",
164 "mito read stage elapsed",
165 &[STAGE_LABEL],
166 exponential_buckets(0.01, 10.0, 7).unwrap(),
168 )
169 .unwrap();
170 pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
171 pub static ref IN_PROGRESS_SCAN: IntGaugeVec = register_int_gauge_vec!(
173 "greptime_mito_in_progress_scan",
174 "mito in progress scan per partition",
175 &[TYPE_LABEL, PARTITION_LABEL]
176 )
177 .unwrap();
178 pub static ref READ_ROWS_TOTAL: IntCounterVec =
180 register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
181 pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
183 register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
184 pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
186 register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
187 pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec =
189 register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
190 pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec =
191 register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap();
192 pub static ref READ_SST_COUNT: Histogram = register_histogram!(
194 "greptime_mito_read_sst_count",
195 "Number of SSTs to scan in a scan task",
196 vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 256.0, 1024.0],
197 ).unwrap();
198 pub static ref READ_ROWS_RETURN: Histogram = register_histogram!(
200 "greptime_mito_read_rows_return",
201 "Number of rows returned in a scan task",
202 exponential_buckets(100.0, 10.0, 8).unwrap(),
203 ).unwrap();
204 pub static ref READ_BATCHES_RETURN: Histogram = register_histogram!(
206 "greptime_mito_read_batches_return",
207 "Number of rows returned in a scan task",
208 exponential_buckets(100.0, 10.0, 7).unwrap(),
209 ).unwrap();
210 pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!(
215 "greptime_mito_cache_hit",
216 "mito cache hit",
217 &[TYPE_LABEL]
218 )
219 .unwrap();
220 pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!(
222 "greptime_mito_cache_miss",
223 "mito cache miss",
224 &[TYPE_LABEL]
225 )
226 .unwrap();
227 pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!(
229 "greptime_mito_cache_bytes",
230 "mito cache bytes",
231 &[TYPE_LABEL]
232 )
233 .unwrap();
234 pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
236 "mito_write_cache_download_bytes_total",
237 "mito write cache download bytes total",
238 ).unwrap();
239 pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!(
241 "mito_write_cache_download_elapsed",
242 "mito write cache download elapsed",
243 &[TYPE_LABEL],
244 exponential_buckets(0.1, 10.0, 6).unwrap(),
246 ).unwrap();
247 pub static ref WRITE_CACHE_INFLIGHT_DOWNLOAD: IntGauge = register_int_gauge!(
249 "mito_write_cache_inflight_download_count",
250 "mito write cache inflight download tasks",
251 ).unwrap();
252 pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
254 "mito_upload_bytes_total",
255 "mito upload bytes total",
256 )
257 .unwrap();
258 pub static ref CACHE_EVICTION: IntCounterVec = register_int_counter_vec!(
260 "greptime_mito_cache_eviction",
261 "mito cache eviction",
262 &[TYPE_LABEL, CACHE_EVICTION_CAUSE]
263 ).unwrap();
264 pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!(
269 "greptime_index_apply_elapsed",
270 "index apply elapsed",
271 &[TYPE_LABEL],
272 exponential_buckets(0.01, 10.0, 6).unwrap(),
274 )
275 .unwrap();
276 pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
278 "greptime_index_apply_memory_usage",
279 "index apply memory usage",
280 )
281 .unwrap();
282 pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
284 "greptime_index_create_elapsed",
285 "index create elapsed",
286 &[STAGE_LABEL, TYPE_LABEL],
287 exponential_buckets(0.1, 10.0, 6).unwrap(),
289 )
290 .unwrap();
291 pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!(
293 "greptime_index_create_rows_total",
294 "index create rows total",
295 &[TYPE_LABEL],
296 )
297 .unwrap();
298 pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
300 "greptime_index_create_bytes_total",
301 "index create bytes total",
302 &[TYPE_LABEL],
303 )
304 .unwrap();
305 pub static ref INDEX_CREATE_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec!(
307 "greptime_index_create_memory_usage",
308 "index create memory usage",
309 &[TYPE_LABEL],
310 ).unwrap();
311 pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
313 "greptime_index_io_bytes_total",
314 "index io bytes total",
315 &[TYPE_LABEL, FILE_TYPE_LABEL]
316 )
317 .unwrap();
318 pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
320 .with_label_values(&["read", "puffin"]);
321 pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
323 .with_label_values(&["write", "puffin"]);
324 pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
326 .with_label_values(&["read", "intermediate"]);
327 pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
329 .with_label_values(&["write", "intermediate"]);
330
331 pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
333 "greptime_index_io_op_total",
334 "index io op total",
335 &[TYPE_LABEL, FILE_TYPE_LABEL]
336 )
337 .unwrap();
338 pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
340 .with_label_values(&["read", "puffin"]);
341 pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
343 .with_label_values(&["seek", "puffin"]);
344 pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
346 .with_label_values(&["write", "puffin"]);
347 pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
349 .with_label_values(&["flush", "puffin"]);
350 pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
352 .with_label_values(&["read", "intermediate"]);
353 pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
355 .with_label_values(&["seek", "intermediate"]);
356 pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
358 .with_label_values(&["write", "intermediate"]);
359 pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
361 .with_label_values(&["flush", "intermediate"]);
362 pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
366 "greptime_partition_tree_buffer_freeze_stage_elapsed",
367 "mito partition tree data buffer freeze stage elapsed",
368 &[STAGE_LABEL],
369 exponential_buckets(0.01, 10.0, 6).unwrap(),
371 )
372 .unwrap();
373
374 pub static ref PARTITION_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
376 "greptime_partition_tree_read_stage_elapsed",
377 "mito partition tree read stage elapsed",
378 &[STAGE_LABEL],
379 exponential_buckets(0.01, 10.0, 6).unwrap(),
381 )
382 .unwrap();
383
384 pub static ref MANIFEST_OP_ELAPSED: HistogramVec = register_histogram_vec!(
391 "greptime_manifest_op_elapsed",
392 "mito manifest operation elapsed",
393 &["op"],
394 exponential_buckets(0.01, 10.0, 6).unwrap(),
396 ).unwrap();
397
398
399 pub static ref REGION_WORKER_HANDLE_WRITE_ELAPSED: HistogramVec = register_histogram_vec!(
400 "greptime_region_worker_handle_write",
401 "elapsed time for handling writes in region worker loop",
402 &["stage"],
403 exponential_buckets(0.001, 10.0, 5).unwrap()
404 ).unwrap();
405
406}
407
408lazy_static! {
410 pub static ref COMPACTION_INPUT_BYTES: Counter = register_counter!(
412 "greptime_mito_compaction_input_bytes",
413 "mito compaction input file size",
414 ).unwrap();
415
416 pub static ref COMPACTION_OUTPUT_BYTES: Counter = register_counter!(
418 "greptime_mito_compaction_output_bytes",
419 "mito compaction output file size",
420 ).unwrap();
421
422 pub static ref MEMTABLE_ACTIVE_SERIES_COUNT: IntGauge = register_int_gauge!(
424 "greptime_mito_memtable_active_series_count",
425 "active time series count in TimeSeriesMemtable",
426 ).unwrap();
427
428 pub static ref MEMTABLE_ACTIVE_FIELD_BUILDER_COUNT: IntGauge = register_int_gauge!(
430 "greptime_mito_memtable_field_builder_count",
431 "active field builder count in TimeSeriesMemtable",
432 ).unwrap();
433
434 pub static ref WRITE_STALLING: IntGaugeVec = register_int_gauge_vec!(
436 "greptime_mito_write_stalling_count",
437 "mito stalled write request in each worker",
438 &[WORKER_LABEL]
439 ).unwrap();
440 pub static ref WRITE_STALL_TOTAL: IntCounter = register_int_counter!(
442 "greptime_mito_write_stall_total",
443 "Total number of stalled write requests"
444 ).unwrap();
445 pub static ref REQUEST_WAIT_TIME: HistogramVec = register_histogram_vec!(
447 "greptime_mito_request_wait_time",
448 "mito request wait time before being handled by region worker",
449 &[WORKER_LABEL],
450 exponential_buckets(0.001, 10.0, 8).unwrap(),
452 )
453 .unwrap();
454}
455
456pub struct StagerMetrics {
458 cache_hit: IntCounter,
459 cache_miss: IntCounter,
460 staging_cache_bytes: IntGauge,
461 recycle_cache_bytes: IntGauge,
462 cache_eviction: IntCounter,
463 staging_miss_read: Histogram,
464}
465
466impl StagerMetrics {
467 pub fn new() -> Self {
469 Self {
470 cache_hit: CACHE_HIT.with_label_values(&[STAGING_TYPE]),
471 cache_miss: CACHE_MISS.with_label_values(&[STAGING_TYPE]),
472 staging_cache_bytes: CACHE_BYTES.with_label_values(&[STAGING_TYPE]),
473 recycle_cache_bytes: CACHE_BYTES.with_label_values(&[RECYCLE_TYPE]),
474 cache_eviction: CACHE_EVICTION.with_label_values(&[STAGING_TYPE, "size"]),
475 staging_miss_read: READ_STAGE_ELAPSED.with_label_values(&["staging_miss_read"]),
476 }
477 }
478}
479
480impl Default for StagerMetrics {
481 fn default() -> Self {
482 Self::new()
483 }
484}
485
486impl StagerNotifier for StagerMetrics {
487 fn on_cache_hit(&self, _size: u64) {
488 self.cache_hit.inc();
489 }
490
491 fn on_cache_miss(&self, _size: u64) {
492 self.cache_miss.inc();
493 }
494
495 fn on_cache_insert(&self, size: u64) {
496 self.staging_cache_bytes.add(size as i64);
497 }
498
499 fn on_load_dir(&self, duration: Duration) {
500 self.staging_miss_read.observe(duration.as_secs_f64());
501 }
502
503 fn on_load_blob(&self, duration: Duration) {
504 self.staging_miss_read.observe(duration.as_secs_f64());
505 }
506
507 fn on_cache_evict(&self, size: u64) {
508 self.cache_eviction.inc();
509 self.staging_cache_bytes.sub(size as i64);
510 }
511
512 fn on_recycle_insert(&self, size: u64) {
513 self.recycle_cache_bytes.add(size as i64);
514 }
515
516 fn on_recycle_clear(&self, size: u64) {
517 self.recycle_cache_bytes.sub(size as i64);
518 }
519}