use std::time::Duration;
use lazy_static::lazy_static;
use prometheus::*;
use puffin::puffin_manager::stager::StagerNotifier;
pub const STAGE_LABEL: &str = "stage";
pub const TYPE_LABEL: &str = "type";
const CACHE_EVICTION_CAUSE: &str = "cause";
pub const FLUSH_REASON: &str = "reason";
pub const FILE_TYPE_LABEL: &str = "file_type";
pub const WORKER_LABEL: &str = "worker";
pub const PARTITION_LABEL: &str = "partition";
pub const STAGING_TYPE: &str = "index_staging";
pub const RECYCLE_TYPE: &str = "recycle_bin";
lazy_static! {
pub static ref WRITE_BUFFER_BYTES: IntGauge =
register_int_gauge!("greptime_mito_write_buffer_bytes", "mito write buffer bytes").unwrap();
pub static ref MEMTABLE_DICT_BYTES: IntGauge =
register_int_gauge!("greptime_mito_memtable_dict_bytes", "mito memtable dictionary size in bytes").unwrap();
pub static ref REGION_COUNT: IntGaugeVec =
register_int_gauge_vec!(
"greptime_mito_region_count",
"mito region count in each worker",
&[WORKER_LABEL],
).unwrap();
pub static ref HANDLE_REQUEST_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_handle_request_elapsed",
"mito handle request elapsed",
&[TYPE_LABEL],
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
pub static ref FLUSH_REQUESTS_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_mito_flush_requests_total",
"mito flush requests total",
&[FLUSH_REASON]
)
.unwrap();
pub static ref FLUSH_ERRORS_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_errors_total", "mito flush errors total").unwrap();
pub static ref FLUSH_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_flush_elapsed",
"mito flush elapsed",
&[TYPE_LABEL],
exponential_buckets(1.0, 5.0, 6).unwrap(),
)
.unwrap();
pub static ref FLUSH_BYTES_TOTAL: IntCounter =
register_int_counter!("greptime_mito_flush_bytes_total", "mito flush bytes total").unwrap();
pub static ref INFLIGHT_FLUSH_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_flush_count",
"inflight flush count",
).unwrap();
pub static ref WRITE_STALL_TOTAL: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_write_stall_total",
"mito stalled write request in each worker",
&[WORKER_LABEL]
).unwrap();
pub static ref WRITE_REJECT_TOTAL: IntCounter =
register_int_counter!("greptime_mito_write_reject_total", "mito write reject total").unwrap();
pub static ref WRITE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_write_stage_elapsed",
"mito write stage elapsed",
&[STAGE_LABEL],
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref WRITE_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_mito_write_rows_total",
"mito write rows total",
&[TYPE_LABEL]
)
.unwrap();
pub static ref COMPACTION_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_compaction_stage_elapsed",
"mito compaction stage elapsed",
&[STAGE_LABEL],
exponential_buckets(1.0, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref COMPACTION_ELAPSED_TOTAL: Histogram =
register_histogram!(
"greptime_mito_compaction_total_elapsed",
"mito compaction total elapsed",
exponential_buckets(1.0, 10.0, 6).unwrap(),
).unwrap();
pub static ref COMPACTION_REQUEST_COUNT: IntCounter =
register_int_counter!("greptime_mito_compaction_requests_total", "mito compaction requests total").unwrap();
pub static ref COMPACTION_FAILURE_COUNT: IntCounter =
register_int_counter!("greptime_mito_compaction_failure_total", "mito compaction failure total").unwrap();
pub static ref INFLIGHT_COMPACTION_COUNT: IntGauge =
register_int_gauge!(
"greptime_mito_inflight_compaction_count",
"inflight compaction count",
).unwrap();
pub static ref READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_mito_read_stage_elapsed",
"mito read stage elapsed",
&[STAGE_LABEL],
exponential_buckets(0.01, 10.0, 7).unwrap(),
)
.unwrap();
pub static ref READ_STAGE_FETCH_PAGES: Histogram = READ_STAGE_ELAPSED.with_label_values(&["fetch_pages"]);
pub static ref IN_PROGRESS_SCAN: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_in_progress_scan",
"mito in progress scan per partition",
&[TYPE_LABEL, PARTITION_LABEL]
)
.unwrap();
pub static ref READ_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_total", "mito read rows total", &[TYPE_LABEL]).unwrap();
pub static ref MERGE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_merge_filter_rows_total", "mito merge filter rows total", &[TYPE_LABEL]).unwrap();
pub static ref READ_ROW_GROUPS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_row_groups_total", "mito read row groups total", &[TYPE_LABEL]).unwrap();
pub static ref PRECISE_FILTER_ROWS_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_precise_filter_rows_total", "mito precise filter rows total", &[TYPE_LABEL]).unwrap();
pub static ref READ_ROWS_IN_ROW_GROUP_TOTAL: IntCounterVec =
register_int_counter_vec!("greptime_mito_read_rows_in_row_group_total", "mito read rows in row group total", &[TYPE_LABEL]).unwrap();
pub static ref READ_SST_COUNT: Histogram = register_histogram!(
"greptime_mito_read_sst_count",
"Number of SSTs to scan in a scan task",
vec![1.0, 4.0, 8.0, 16.0, 32.0, 64.0, 256.0, 1024.0],
).unwrap();
pub static ref READ_ROWS_RETURN: Histogram = register_histogram!(
"greptime_mito_read_rows_return",
"Number of rows returned in a scan task",
exponential_buckets(100.0, 10.0, 8).unwrap(),
).unwrap();
pub static ref READ_BATCHES_RETURN: Histogram = register_histogram!(
"greptime_mito_read_batches_return",
"Number of rows returned in a scan task",
exponential_buckets(100.0, 10.0, 7).unwrap(),
).unwrap();
pub static ref CACHE_HIT: IntCounterVec = register_int_counter_vec!(
"greptime_mito_cache_hit",
"mito cache hit",
&[TYPE_LABEL]
)
.unwrap();
pub static ref CACHE_MISS: IntCounterVec = register_int_counter_vec!(
"greptime_mito_cache_miss",
"mito cache miss",
&[TYPE_LABEL]
)
.unwrap();
pub static ref CACHE_BYTES: IntGaugeVec = register_int_gauge_vec!(
"greptime_mito_cache_bytes",
"mito cache bytes",
&[TYPE_LABEL]
)
.unwrap();
pub static ref WRITE_CACHE_DOWNLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_write_cache_download_bytes_total",
"mito write cache download bytes total",
).unwrap();
pub static ref WRITE_CACHE_DOWNLOAD_ELAPSED: HistogramVec = register_histogram_vec!(
"mito_write_cache_download_elapsed",
"mito write cache download elapsed",
&[TYPE_LABEL],
exponential_buckets(0.1, 10.0, 6).unwrap(),
).unwrap();
pub static ref WRITE_CACHE_INFLIGHT_DOWNLOAD: IntGauge = register_int_gauge!(
"mito_write_cache_inflight_download_count",
"mito write cache inflight download tasks",
).unwrap();
pub static ref UPLOAD_BYTES_TOTAL: IntCounter = register_int_counter!(
"mito_upload_bytes_total",
"mito upload bytes total",
)
.unwrap();
pub static ref CACHE_EVICTION: IntCounterVec = register_int_counter_vec!(
"greptime_mito_cache_eviction",
"mito cache eviction",
&[TYPE_LABEL, CACHE_EVICTION_CAUSE]
).unwrap();
pub static ref INDEX_APPLY_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_apply_elapsed",
"index apply elapsed",
&[TYPE_LABEL],
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref INDEX_APPLY_MEMORY_USAGE: IntGauge = register_int_gauge!(
"greptime_index_apply_memory_usage",
"index apply memory usage",
)
.unwrap();
pub static ref INDEX_CREATE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_index_create_elapsed",
"index create elapsed",
&[STAGE_LABEL, TYPE_LABEL],
exponential_buckets(0.1, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref INDEX_CREATE_ROWS_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_create_rows_total",
"index create rows total",
&[TYPE_LABEL],
)
.unwrap();
pub static ref INDEX_CREATE_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_create_bytes_total",
"index create bytes total",
&[TYPE_LABEL],
)
.unwrap();
pub static ref INDEX_CREATE_MEMORY_USAGE: IntGaugeVec = register_int_gauge_vec!(
"greptime_index_create_memory_usage",
"index create memory usage",
&[TYPE_LABEL],
).unwrap();
pub static ref INDEX_IO_BYTES_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_bytes_total",
"index io bytes total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
pub static ref INDEX_PUFFIN_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "puffin"]);
pub static ref INDEX_PUFFIN_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "puffin"]);
pub static ref INDEX_INTERMEDIATE_READ_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["read", "intermediate"]);
pub static ref INDEX_INTERMEDIATE_WRITE_BYTES_TOTAL: IntCounter = INDEX_IO_BYTES_TOTAL
.with_label_values(&["write", "intermediate"]);
pub static ref INDEX_IO_OP_TOTAL: IntCounterVec = register_int_counter_vec!(
"greptime_index_io_op_total",
"index io op total",
&[TYPE_LABEL, FILE_TYPE_LABEL]
)
.unwrap();
pub static ref INDEX_PUFFIN_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "puffin"]);
pub static ref INDEX_PUFFIN_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "puffin"]);
pub static ref INDEX_PUFFIN_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "puffin"]);
pub static ref INDEX_PUFFIN_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "puffin"]);
pub static ref INDEX_INTERMEDIATE_READ_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["read", "intermediate"]);
pub static ref INDEX_INTERMEDIATE_SEEK_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["seek", "intermediate"]);
pub static ref INDEX_INTERMEDIATE_WRITE_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["write", "intermediate"]);
pub static ref INDEX_INTERMEDIATE_FLUSH_OP_TOTAL: IntCounter = INDEX_IO_OP_TOTAL
.with_label_values(&["flush", "intermediate"]);
pub static ref PARTITION_TREE_DATA_BUFFER_FREEZE_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_partition_tree_buffer_freeze_stage_elapsed",
"mito partition tree data buffer freeze stage elapsed",
&[STAGE_LABEL],
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref PARTITION_TREE_READ_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_partition_tree_read_stage_elapsed",
"mito partition tree read stage elapsed",
&[STAGE_LABEL],
exponential_buckets(0.01, 10.0, 6).unwrap(),
)
.unwrap();
pub static ref MANIFEST_OP_ELAPSED: HistogramVec = register_histogram_vec!(
"greptime_manifest_op_elapsed",
"mito manifest operation elapsed",
&["op"],
exponential_buckets(0.01, 10.0, 6).unwrap(),
).unwrap();
}
pub struct StagerMetrics {
cache_hit: IntCounter,
cache_miss: IntCounter,
staging_cache_bytes: IntGauge,
recycle_cache_bytes: IntGauge,
cache_eviction: IntCounter,
staging_miss_read: Histogram,
}
impl StagerMetrics {
pub fn new() -> Self {
Self {
cache_hit: CACHE_HIT.with_label_values(&[STAGING_TYPE]),
cache_miss: CACHE_MISS.with_label_values(&[STAGING_TYPE]),
staging_cache_bytes: CACHE_BYTES.with_label_values(&[STAGING_TYPE]),
recycle_cache_bytes: CACHE_BYTES.with_label_values(&[RECYCLE_TYPE]),
cache_eviction: CACHE_EVICTION.with_label_values(&[STAGING_TYPE, "size"]),
staging_miss_read: READ_STAGE_ELAPSED.with_label_values(&["staging_miss_read"]),
}
}
}
impl Default for StagerMetrics {
fn default() -> Self {
Self::new()
}
}
impl StagerNotifier for StagerMetrics {
fn on_cache_hit(&self, _size: u64) {
self.cache_hit.inc();
}
fn on_cache_miss(&self, _size: u64) {
self.cache_miss.inc();
}
fn on_cache_insert(&self, size: u64) {
self.staging_cache_bytes.add(size as i64);
}
fn on_load_dir(&self, duration: Duration) {
self.staging_miss_read.observe(duration.as_secs_f64());
}
fn on_load_blob(&self, duration: Duration) {
self.staging_miss_read.observe(duration.as_secs_f64());
}
fn on_cache_evict(&self, size: u64) {
self.cache_eviction.inc();
self.staging_cache_bytes.sub(size as i64);
}
fn on_recycle_insert(&self, size: u64) {
self.recycle_cache_bytes.add(size as i64);
}
fn on_recycle_clear(&self, size: u64) {
self.recycle_cache_bytes.sub(size as i64);
}
}