1use std::collections::BTreeMap;
16use std::pin::Pin;
17use std::sync::Arc;
18use std::task::{Context, Poll};
19
20use common_recordbatch::adapter::{RecordBatchMetrics, RegionWatermarkEntry};
21use common_recordbatch::{OrderOption, RecordBatch, RecordBatchStream, SendableRecordBatchStream};
22use common_telemetry::warn;
23use datafusion::physical_plan::ExecutionPlan;
24use datatypes::schema::SchemaRef;
25use futures::Stream;
26use futures_util::ready;
27use lazy_static::lazy_static;
28use prometheus::*;
29
30use crate::dist_plan::MergeScanExec;
31
/// Per-region state accumulated while merging the watermark reports of merge scans.
///
/// The state is advanced by `merge_merge_scan_region_watermarks` and collapsed into an
/// `Option<u64>` by `finalize_region_watermarks`; only [`MergeState::Proved`] finalizes to
/// `Some`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum MergeState {
    /// The region was listed by a merge scan, but no watermark entry has been merged for it.
    Participated,
    /// An entry without a watermark was merged for the region; later values do not change it.
    Unproved,
    /// Every watermark merged so far for the region carries this same value.
    Proved(u64),
    /// Differing watermark values were merged for the same region.
    Conflict {
        /// The distinct values seen, in the order they were first merged.
        watermarks: Vec<u64>,
    },
}
50
lazy_static! {
    /// Histogram family `greptime_query_stage_elapsed`, partitioned by the `stage` label.
    /// NOTE(review): the bucket bounds look like seconds — confirm against the call sites.
    pub static ref QUERY_STAGE_ELAPSED: HistogramVec = register_histogram_vec!(
        "greptime_query_stage_elapsed",
        "query engine time elapsed during each stage",
        &["stage"],
        vec![0.005, 0.01, 0.05, 0.1, 0.5, 1.0, 5.0, 10.0, 60.0, 300.0]
    )
    .unwrap();
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "parse_sql"`.
    pub static ref PARSE_SQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["parse_sql"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "parse_promql"`.
    pub static ref PARSE_PROMQL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["parse_promql"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "optimize_logicalplan"`.
    pub static ref OPTIMIZE_LOGICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["optimize_logicalplan"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "optimize_physicalplan"`.
    pub static ref OPTIMIZE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["optimize_physicalplan"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "create_physicalplan"`.
    pub static ref CREATE_PHYSICAL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["create_physicalplan"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "execute_plan"`.
    pub static ref EXEC_PLAN_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["execute_plan"]);
    /// Child of [`QUERY_STAGE_ELAPSED`] with `stage = "merge_scan_poll"`.
    pub static ref MERGE_SCAN_POLL_ELAPSED: Histogram = QUERY_STAGE_ELAPSED
        .with_label_values(&["merge_scan_poll"]);

    /// Histogram `greptime_query_merge_scan_regions`, registered with the default buckets.
    pub static ref MERGE_SCAN_REGIONS: Histogram = register_histogram!(
        "greptime_query_merge_scan_regions",
        "query merge scan regions"
    )
    .unwrap();
    /// Counter `greptime_query_merge_scan_errors_total`.
    pub static ref MERGE_SCAN_ERRORS_TOTAL: IntCounter = register_int_counter!(
        "greptime_query_merge_scan_errors_total",
        "query merge scan errors total"
    )
    .unwrap();
    /// Counter `greptime_push_down_fallback_errors_total`.
    pub static ref PUSH_DOWN_FALLBACK_ERRORS_TOTAL: IntCounter = register_int_counter!(
        "greptime_push_down_fallback_errors_total",
        "query push down fallback errors total"
    )
    .unwrap();

    /// Gauge `greptime_query_memory_pool_usage_bytes`.
    pub static ref QUERY_MEMORY_POOL_USAGE_BYTES: IntGauge = register_int_gauge!(
        "greptime_query_memory_pool_usage_bytes",
        "current query memory pool usage in bytes"
    )
    .unwrap();

    /// Counter `greptime_query_memory_pool_rejected_total`.
    pub static ref QUERY_MEMORY_POOL_REJECTED_TOTAL: IntCounter = register_int_counter!(
        "greptime_query_memory_pool_rejected_total",
        "total number of query memory allocations rejected"
    )
    .unwrap();
}
103
104pub struct OnDone<F> {
106 stream: SendableRecordBatchStream,
107 callback: Option<F>,
108}
109
110impl<F> OnDone<F> {
111 pub fn new(stream: SendableRecordBatchStream, callback: F) -> Self {
113 Self {
114 stream,
115 callback: Some(callback),
116 }
117 }
118}
119
120impl<F: FnOnce() + Unpin> RecordBatchStream for OnDone<F> {
121 fn name(&self) -> &str {
122 self.stream.name()
123 }
124
125 fn schema(&self) -> SchemaRef {
126 self.stream.schema()
127 }
128
129 fn output_ordering(&self) -> Option<&[OrderOption]> {
130 self.stream.output_ordering()
131 }
132
133 fn metrics(&self) -> Option<RecordBatchMetrics> {
134 self.stream.metrics()
135 }
136}
137
138impl<F: FnOnce() + Unpin> Stream for OnDone<F> {
139 type Item = common_recordbatch::error::Result<RecordBatch>;
140
141 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
142 match ready!(Pin::new(&mut self.stream).poll_next(cx)) {
143 Some(rb) => Poll::Ready(Some(rb)),
144 None => {
145 if let Some(callback) = self.callback.take() {
146 callback();
147 }
148 Poll::Ready(None)
149 }
150 }
151 }
152
153 fn size_hint(&self) -> (usize, Option<usize>) {
154 self.stream.size_hint()
155 }
156}
157
158pub struct RegionWatermarkMetricsStream {
159 stream: SendableRecordBatchStream,
160 plan: Arc<dyn ExecutionPlan>,
161}
162
163impl RegionWatermarkMetricsStream {
164 pub fn new(stream: SendableRecordBatchStream, plan: Arc<dyn ExecutionPlan>) -> Self {
165 Self { stream, plan }
166 }
167}
168
169impl RecordBatchStream for RegionWatermarkMetricsStream {
170 fn name(&self) -> &str {
171 self.stream.name()
172 }
173
174 fn schema(&self) -> SchemaRef {
175 self.stream.schema()
176 }
177
178 fn output_ordering(&self) -> Option<&[OrderOption]> {
179 self.stream.output_ordering()
180 }
181
182 fn metrics(&self) -> Option<RecordBatchMetrics> {
183 let mut metrics = self.stream.metrics()?;
184 let region_watermarks = collect_region_watermarks(self.plan.clone());
185 if !region_watermarks.is_empty() {
186 metrics.region_watermarks = region_watermarks;
187 }
188 Some(metrics)
189 }
190}
191
192impl Stream for RegionWatermarkMetricsStream {
193 type Item = common_recordbatch::error::Result<RecordBatch>;
194
195 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
196 Pin::new(&mut self.stream).poll_next(cx)
197 }
198
199 fn size_hint(&self) -> (usize, Option<usize>) {
200 self.stream.size_hint()
201 }
202}
203
204pub fn terminal_recordbatch_metrics_from_plan(
205 plan: Arc<dyn ExecutionPlan>,
206) -> Option<RecordBatchMetrics> {
207 let region_watermarks = collect_region_watermarks(plan);
208 if region_watermarks.is_empty() {
209 None
210 } else {
211 Some(RecordBatchMetrics {
212 region_watermarks,
213 ..Default::default()
214 })
215 }
216}
217
218fn collect_region_watermarks(plan: Arc<dyn ExecutionPlan>) -> Vec<RegionWatermarkEntry> {
219 let mut merged = BTreeMap::<u64, MergeState>::new();
220 let mut stack = vec![plan];
221
222 while let Some(plan) = stack.pop() {
223 if let Some(merge_scan) = plan.as_any().downcast_ref::<MergeScanExec>() {
224 merge_merge_scan_region_watermarks(
225 &mut merged,
226 merge_scan
227 .regions()
228 .iter()
229 .map(|region_id| region_id.as_u64()),
230 merge_scan.sub_stage_metrics(),
231 );
232 }
233
234 stack.extend(plan.children().into_iter().cloned());
235 }
236
237 finalize_region_watermarks(merged)
238}
239
240fn merge_merge_scan_region_watermarks(
241 merged: &mut BTreeMap<u64, MergeState>,
242 regions: impl IntoIterator<Item = u64>,
243 sub_stage_metrics: impl IntoIterator<Item = RecordBatchMetrics>,
244) {
245 for region_id in regions {
246 merged.entry(region_id).or_insert(MergeState::Participated);
247 }
248
249 for metrics in sub_stage_metrics {
250 for entry in metrics.region_watermarks {
251 merged
252 .entry(entry.region_id)
253 .and_modify(|existing| match entry.watermark {
254 None => match existing {
255 MergeState::Participated | MergeState::Proved(_) => {
256 *existing = MergeState::Unproved;
257 }
258 MergeState::Unproved | MergeState::Conflict { .. } => {}
259 },
260 Some(seq) => match existing {
261 MergeState::Participated => {
262 *existing = MergeState::Proved(seq);
263 }
264 MergeState::Unproved => {}
265 MergeState::Proved(existing_seq) if *existing_seq == seq => {}
266 MergeState::Proved(existing_seq) => {
267 let old_seq = *existing_seq;
268 *existing = MergeState::Conflict {
269 watermarks: vec![old_seq, seq],
270 };
271 }
272 MergeState::Conflict { watermarks } => {
273 if !watermarks.contains(&seq) {
274 watermarks.push(seq);
275 }
276 }
277 },
278 })
279 .or_insert(match entry.watermark {
280 Some(seq) => MergeState::Proved(seq),
281 None => MergeState::Unproved,
282 });
283 }
284 }
285}
286
287fn finalize_region_watermarks(merged: BTreeMap<u64, MergeState>) -> Vec<RegionWatermarkEntry> {
288 merged
289 .into_iter()
290 .map(|(region_id, state)| RegionWatermarkEntry {
291 region_id,
292 watermark: match state {
293 MergeState::Participated => None,
294 MergeState::Unproved => None,
295 MergeState::Proved(seq) => Some(seq),
296 MergeState::Conflict { watermarks } => {
297 warn!(
298 "Conflicting proved watermarks for region {}: {:?}; degrading to unproved",
299 region_id, watermarks
300 );
301 None
302 }
303 },
304 })
305 .collect()
306}
307
#[cfg(test)]
mod tests {
    use datafusion::arrow::datatypes::Schema as ArrowSchema;
    use datafusion::physical_plan::empty::EmptyExec;

    use super::*;

    /// Builds metrics whose only populated field is `region_watermarks`.
    fn metrics_with_region_watermarks(entries: &[(u64, Option<u64>)]) -> RecordBatchMetrics {
        let region_watermarks = entries
            .iter()
            .map(|&(region_id, watermark)| RegionWatermarkEntry {
                region_id,
                watermark,
            })
            .collect();

        RecordBatchMetrics {
            region_watermarks,
            ..Default::default()
        }
    }

    /// Merges `regions` with one sub-stage report per `(region_id, watermark)` pair, in the
    /// given order, and returns the finalized entries.
    fn merge_and_finalize(
        regions: &[u64],
        reports: &[(u64, Option<u64>)],
    ) -> Vec<RegionWatermarkEntry> {
        let mut merged = BTreeMap::new();
        merge_merge_scan_region_watermarks(
            &mut merged,
            regions.iter().copied(),
            reports
                .iter()
                .map(|report| metrics_with_region_watermarks(std::slice::from_ref(report))),
        );
        finalize_region_watermarks(merged)
    }

    /// Shorthand for the expected output entries.
    fn entry(region_id: u64, watermark: Option<u64>) -> RegionWatermarkEntry {
        RegionWatermarkEntry {
            region_id,
            watermark,
        }
    }

    #[test]
    fn terminal_metrics_returns_none_without_merge_scan() {
        let schema = Arc::new(ArrowSchema::empty());
        let plan: Arc<dyn ExecutionPlan> = Arc::new(EmptyExec::new(schema));

        assert!(terminal_recordbatch_metrics_from_plan(plan).is_none());
    }

    #[test]
    fn merge_merge_scan_region_watermarks_marks_missing_watermarks_unproved() {
        let finalized = merge_and_finalize(&[1, 2], &[]);

        assert_eq!(finalized, vec![entry(1, None), entry(2, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_keeps_matching_proved_values() {
        let finalized = merge_and_finalize(&[42], &[(42, Some(7)), (42, Some(7))]);

        assert_eq!(finalized, vec![entry(42, Some(7))]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_degrades_conflicting_proved_values() {
        let finalized = merge_and_finalize(&[7], &[(7, Some(11)), (7, Some(13))]);

        assert_eq!(finalized, vec![entry(7, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value() {
        let finalized = merge_and_finalize(&[9], &[(9, Some(21)), (9, None)]);

        assert_eq!(finalized, vec![entry(9, None)]);
    }

    #[test]
    fn merge_merge_scan_region_watermarks_none_vetoes_proved_value_regardless_of_order() {
        let finalized = merge_and_finalize(&[9], &[(9, None), (9, Some(21))]);

        assert_eq!(finalized, vec![entry(9, None)]);
    }
}