1use std::collections::HashMap;
16use std::sync::Arc;
17
18use common_plugins::GREPTIME_EXEC_PREFIX;
19use datafusion::physical_plan::metrics::MetricValue;
20use datafusion::physical_plan::ExecutionPlan;
21use headers::{Header, HeaderName, HeaderValue};
22use hyper::HeaderMap;
23use serde_json::Value;
24
25pub mod constants {
26 pub const GREPTIME_DB_HEADER_FORMAT: &str = "x-greptime-format";
42 pub const GREPTIME_DB_HEADER_TIMEOUT: &str = "x-greptime-timeout";
43 pub const GREPTIME_DB_HEADER_EXECUTION_TIME: &str = "x-greptime-execution-time";
44 pub const GREPTIME_DB_HEADER_METRICS: &str = "x-greptime-metrics";
45 pub const GREPTIME_DB_HEADER_NAME: &str = "x-greptime-db-name";
46 pub const GREPTIME_DB_HEADER_READ_PREFERENCE: &str = "x-greptime-read-preference";
47 pub const GREPTIME_TIMEZONE_HEADER_NAME: &str = "x-greptime-timezone";
48 pub const GREPTIME_DB_HEADER_ERROR_CODE: &str = common_error::GREPTIME_DB_HEADER_ERROR_CODE;
49
50 pub const GREPTIME_LOG_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-log-pipeline-name";
52 pub const GREPTIME_LOG_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-log-pipeline-version";
53
54 pub const GREPTIME_PIPELINE_NAME_HEADER_NAME: &str = "x-greptime-pipeline-name";
56 pub const GREPTIME_PIPELINE_VERSION_HEADER_NAME: &str = "x-greptime-pipeline-version";
57
58 pub const GREPTIME_LOG_TABLE_NAME_HEADER_NAME: &str = "x-greptime-log-table-name";
59 pub const GREPTIME_LOG_EXTRACT_KEYS_HEADER_NAME: &str = "x-greptime-log-extract-keys";
60 pub const GREPTIME_TRACE_TABLE_NAME_HEADER_NAME: &str = "x-greptime-trace-table-name";
61
62 pub const GREPTIME_OTLP_METRIC_PROMOTE_ALL_RESOURCE_ATTRS_HEADER_NAME: &str =
64 "x-greptime-otlp-metric-promote-all-resource-attrs";
65 pub const GREPTIME_OTLP_METRIC_PROMOTE_RESOURCE_ATTRS_HEADER_NAME: &str =
66 "x-greptime-otlp-metric-promote-resource-attrs";
67 pub const GREPTIME_OTLP_METRIC_IGNORE_RESOURCE_ATTRS_HEADER_NAME: &str =
68 "x-greptime-otlp-metric-ignore-resource-attrs";
69 pub const GREPTIME_OTLP_METRIC_PROMOTE_SCOPE_ATTRS_HEADER_NAME: &str =
70 "x-greptime-otlp-metric-promote-scope-attrs";
71
72 pub const GREPTIME_PIPELINE_PARAMS_HEADER: &str = "x-greptime-pipeline-params";
74}
75
76pub static GREPTIME_DB_HEADER_FORMAT: HeaderName =
77 HeaderName::from_static(constants::GREPTIME_DB_HEADER_FORMAT);
78pub static GREPTIME_DB_HEADER_EXECUTION_TIME: HeaderName =
79 HeaderName::from_static(constants::GREPTIME_DB_HEADER_EXECUTION_TIME);
80pub static GREPTIME_DB_HEADER_METRICS: HeaderName =
81 HeaderName::from_static(constants::GREPTIME_DB_HEADER_METRICS);
82
83pub static GREPTIME_DB_HEADER_NAME: HeaderName =
85 HeaderName::from_static(constants::GREPTIME_DB_HEADER_NAME);
86
87pub static GREPTIME_TIMEZONE_HEADER_NAME: HeaderName =
89 HeaderName::from_static(constants::GREPTIME_TIMEZONE_HEADER_NAME);
90
91pub static GREPTIME_DB_HEADER_READ_PREFERENCE: HeaderName =
93 HeaderName::from_static(constants::GREPTIME_DB_HEADER_READ_PREFERENCE);
94
95pub static CONTENT_TYPE_PROTOBUF_STR: &str = "application/x-protobuf";
96pub static CONTENT_TYPE_PROTOBUF: HeaderValue = HeaderValue::from_static(CONTENT_TYPE_PROTOBUF_STR);
97pub static CONTENT_ENCODING_SNAPPY: HeaderValue = HeaderValue::from_static("snappy");
98
99pub static CONTENT_TYPE_NDJSON_STR: &str = "application/x-ndjson";
100pub static CONTENT_TYPE_NDJSON_SUBTYPE_STR: &str = "x-ndjson";
101
102pub struct GreptimeDbName(Option<String>);
103
104impl Header for GreptimeDbName {
105 fn name() -> &'static HeaderName {
106 &GREPTIME_DB_HEADER_NAME
107 }
108
109 fn decode<'i, I>(values: &mut I) -> Result<Self, headers::Error>
110 where
111 Self: Sized,
112 I: Iterator<Item = &'i HeaderValue>,
113 {
114 if let Some(value) = values.next() {
115 let str_value = value.to_str().map_err(|_| headers::Error::invalid())?;
116 Ok(Self(Some(str_value.to_owned())))
117 } else {
118 Ok(Self(None))
119 }
120 }
121
122 fn encode<E: Extend<HeaderValue>>(&self, values: &mut E) {
123 if let Some(name) = &self.0 {
124 if let Ok(value) = HeaderValue::from_str(name) {
125 values.extend(std::iter::once(value));
126 }
127 }
128 }
129}
130
131impl GreptimeDbName {
132 pub fn value(&self) -> Option<&String> {
133 self.0.as_ref()
134 }
135}
136
137pub fn write_cost_header_map(cost: usize) -> HeaderMap {
139 let mut header_map = HeaderMap::new();
140 if cost > 0 {
141 let mut map: HashMap<String, Value> = HashMap::new();
142 map.insert(
143 common_plugins::GREPTIME_EXEC_WRITE_COST.to_string(),
144 Value::from(cost),
145 );
146 let _ = serde_json::to_string(&map)
147 .ok()
148 .and_then(|s| HeaderValue::from_str(&s).ok())
149 .and_then(|v| header_map.insert(&GREPTIME_DB_HEADER_METRICS, v));
150 }
151 header_map
152}
153
154fn collect_into_maps(name: &str, value: u64, maps: &mut [&mut HashMap<String, u64>]) {
155 if name.starts_with(GREPTIME_EXEC_PREFIX) && value > 0 {
156 maps.iter_mut().for_each(|map| {
157 map.entry(name.to_string())
158 .and_modify(|v| *v += value)
159 .or_insert(value);
160 });
161 }
162}
163
164pub fn collect_plan_metrics(plan: &Arc<dyn ExecutionPlan>, maps: &mut [&mut HashMap<String, u64>]) {
165 if let Some(m) = plan.metrics() {
166 m.iter().for_each(|m| match m.value() {
167 MetricValue::Count { name, count } => {
168 collect_into_maps(name, count.value() as u64, maps);
169 }
170 MetricValue::Gauge { name, gauge } => {
171 collect_into_maps(name, gauge.value() as u64, maps);
172 }
173 MetricValue::Time { name, time } => {
174 if name.starts_with(GREPTIME_EXEC_PREFIX) {
175 maps.iter_mut().for_each(|map| {
177 map.insert(name.to_string(), time.value() as u64);
178 });
179 }
180 }
181 _ => {}
182 });
183 }
184
185 for c in plan.children() {
186 collect_plan_metrics(c, maps);
187 }
188}