1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::execution::{SessionState, TaskContext};
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::metrics::{
30 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
31};
32use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion::physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
35 SendableRecordBatchStream,
36};
37use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
38use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
39use datafusion_physical_expr::expressions::Column;
40use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
41use futures_util::StreamExt;
42use greptime_proto::v1::region::RegionRequestHeader;
43use meter_core::data::ReadItem;
44use meter_macros::read_meter;
45use session::context::QueryContextRef;
46use store_api::storage::RegionId;
47use table::table_name::TableName;
48use tokio::time::Instant;
49use tracing::{Instrument, Span};
50
51use crate::dist_plan::analyzer::AliasMapping;
52use crate::dist_plan::analyzer::utils::patch_batch_timezone;
53use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
54use crate::region_query::RegionQueryHandlerRef;
55
/// Logical extension node wrapping the plan that is shipped to the remote regions.
///
/// `input` is printed as `remote_input` in EXPLAIN output. The node reports no inputs and no
/// expressions to DataFusion (see `inputs`/`expressions`), so it behaves as a leaf in the
/// logical plan tree.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// Plan executed on the remote regions; its schema is also this node's schema.
    input: LogicalPlan,
    /// Flag exposed through `is_placeholder()` and printed in EXPLAIN output.
    // NOTE(review): presumably marks a node that is not yet a real distributed scan — confirm
    // against the analyzer that sets it.
    is_placeholder: bool,
    /// Partition columns of the scanned table with the set of aliases each one is known by in
    /// `input`'s output (consumed by `MergeScanExec`).
    partition_cols: AliasMapping,
}
64
65impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
66 fn name(&self) -> &str {
67 Self::name()
68 }
69
70 fn inputs(&self) -> Vec<&LogicalPlan> {
73 vec![]
74 }
75
76 fn schema(&self) -> &datafusion_common::DFSchemaRef {
77 self.input.schema()
78 }
79
80 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
82 vec![]
83 }
84
85 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
86 write!(
87 f,
88 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
89 self.is_placeholder, self.input
90 )
91 }
92
93 fn with_exprs_and_inputs(
94 &self,
95 _exprs: Vec<datafusion::prelude::Expr>,
96 _inputs: Vec<LogicalPlan>,
97 ) -> Result<Self> {
98 Ok(self.clone())
99 }
100}
101
102impl MergeScanLogicalPlan {
103 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
104 Self {
105 input,
106 is_placeholder,
107 partition_cols,
108 }
109 }
110
111 pub fn name() -> &'static str {
112 "MergeScan"
113 }
114
115 pub fn into_logical_plan(self) -> LogicalPlan {
117 LogicalPlan::Extension(Extension {
118 node: Arc::new(self),
119 })
120 }
121
122 pub fn is_placeholder(&self) -> bool {
123 self.is_placeholder
124 }
125
126 pub fn input(&self) -> &LogicalPlan {
127 &self.input
128 }
129
130 pub fn partition_cols(&self) -> &AliasMapping {
131 &self.partition_cols
132 }
133}
134
/// Physical plan node that runs a logical plan on a set of remote regions and merges the
/// returned record batch streams into `target_partition` output partitions.
pub struct MergeScanExec {
    /// Table the scanned regions belong to.
    table: TableName,
    /// Regions to query; region at index `i` is read by output partition `i % target_partition`.
    regions: Vec<RegionId>,
    /// Logical plan sent to every region.
    plan: LogicalPlan,
    /// Output schema; columns of every remote batch are re-assembled against it.
    arrow_schema: ArrowSchemaRef,
    /// Handler used to issue the per-region queries.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metric set of this node.
    metric: ExecutionPlanMetricsSet,
    /// Cached plan properties (ordering, partitioning, emission type, boundedness).
    properties: PlanProperties,
    /// Latest metrics reported by each region's remote stream, keyed by region.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Per-output-partition timing summaries, shared by all partition streams.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Context of the query this node belongs to.
    query_ctx: QueryContextRef,
    /// Number of output partitions.
    target_partition: usize,
    /// Partition columns of the table and their aliases in the plan's output.
    partition_cols: AliasMapping,
}
151
152impl std::fmt::Debug for MergeScanExec {
153 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
154 f.debug_struct("MergeScanExec")
155 .field("table", &self.table)
156 .field("regions", &self.regions)
157 .field("plan", &self.plan)
158 .finish()
159 }
160}
161
162impl MergeScanExec {
163 #[allow(clippy::too_many_arguments)]
164 pub fn new(
165 session_state: &SessionState,
166 table: TableName,
167 regions: Vec<RegionId>,
168 plan: LogicalPlan,
169 arrow_schema: &ArrowSchema,
170 region_query_handler: RegionQueryHandlerRef,
171 query_ctx: QueryContextRef,
172 target_partition: usize,
173 partition_cols: AliasMapping,
174 ) -> Result<Self> {
175 let arrow_schema = Arc::new(arrow_schema.clone());
179
180 let eq_properties = if let LogicalPlan::Sort(sort) = &plan
188 && target_partition >= regions.len()
189 {
190 let lex_ordering = sort
191 .expr
192 .iter()
193 .map(|sort_expr| {
194 let physical_expr = session_state
195 .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
196 Ok(PhysicalSortExpr::new(
197 physical_expr,
198 SortOptions {
199 descending: !sort_expr.asc,
200 nulls_first: sort_expr.nulls_first,
201 },
202 ))
203 })
204 .collect::<Result<Vec<_>>>()?;
205 EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
206 } else {
207 EquivalenceProperties::new(arrow_schema.clone())
208 };
209
210 let partition_exprs = partition_cols
211 .iter()
212 .filter_map(|col| {
213 if let Some(first_alias) = col.1.first() {
214 session_state
215 .create_physical_expr(
216 Expr::Column(ColumnExpr::new_unqualified(
217 first_alias.name().to_string(),
218 )),
219 plan.schema(),
220 )
221 .ok()
222 } else {
223 None
224 }
225 })
226 .collect();
227 let partitioning = Partitioning::Hash(partition_exprs, target_partition);
228
229 let properties = PlanProperties::new(
230 eq_properties,
231 partitioning,
232 EmissionType::Incremental,
233 Boundedness::Bounded,
234 );
235 Ok(Self {
236 table,
237 regions,
238 plan,
239 arrow_schema,
240 region_query_handler,
241 metric: ExecutionPlanMetricsSet::new(),
242 sub_stage_metrics: Arc::default(),
243 partition_metrics: Arc::default(),
244 properties,
245 query_ctx,
246 target_partition,
247 partition_cols,
248 })
249 }
250
251 pub fn to_stream(
252 &self,
253 context: Arc<TaskContext>,
254 partition: usize,
255 ) -> Result<SendableRecordBatchStream> {
256 let regions = self.regions.clone();
258 let region_query_handler = self.region_query_handler.clone();
259 let metric = MergeScanMetric::new(&self.metric);
260 let arrow_schema = self.arrow_schema.clone();
261 let query_ctx = self.query_ctx.clone();
262 let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
263 let partition_metrics_moved = self.partition_metrics.clone();
264 let plan = self.plan.clone();
265 let target_partition = self.target_partition;
266 let dbname = context.task_id().unwrap_or_default();
267 let tracing_context = TracingContext::from_json(context.session_id().as_str());
268 let current_channel = self.query_ctx.channel();
269 let read_preference = self.query_ctx.read_preference();
270 let explain_verbose = self.query_ctx.explain_verbose();
271
272 let stream = Box::pin(stream!({
273 if partition == 0 {
275 MERGE_SCAN_REGIONS.observe(regions.len() as f64);
276 }
277
278 let _finish_timer = metric.finish_time().timer();
279 let mut ready_timer = metric.ready_time().timer();
280 let mut first_consume_timer = Some(metric.first_consume_time().timer());
281
282 for region_id in regions
283 .iter()
284 .skip(partition)
285 .step_by(target_partition)
286 .copied()
287 {
288 let region_span = tracing_context.attach(tracing::info_span!(
289 parent: &Span::current(),
290 "merge_scan_region",
291 region_id = %region_id,
292 partition = partition
293 ));
294 let request = QueryRequest {
295 header: Some(RegionRequestHeader {
296 tracing_context: tracing_context.to_w3c(),
297 dbname: dbname.clone(),
298 query_context: Some(query_ctx.as_ref().into()),
299 }),
300 region_id,
301 plan: plan.clone(),
302 };
303 let region_start = Instant::now();
304 let do_get_start = Instant::now();
305
306 if explain_verbose {
307 common_telemetry::info!(
308 "Merge scan one region, partition: {}, region_id: {}",
309 partition,
310 region_id
311 );
312 }
313
314 let mut stream = region_query_handler
315 .do_get(read_preference, request)
316 .instrument(region_span.clone())
317 .await
318 .map_err(|e| {
319 MERGE_SCAN_ERRORS_TOTAL.inc();
320 DataFusionError::External(Box::new(e))
321 })?;
322 let do_get_cost = do_get_start.elapsed();
323
324 ready_timer.stop();
325
326 let mut poll_duration = Duration::ZERO;
327 let mut poll_timer = Instant::now();
328 while let Some(batch) = stream.next().instrument(region_span.clone()).await {
329 let poll_elapsed = poll_timer.elapsed();
330 poll_duration += poll_elapsed;
331
332 let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
333 let batch = patch_batch_timezone(
334 arrow_schema.clone(),
335 batch.into_df_record_batch().columns().to_vec(),
336 )?;
337 metric.record_output_batch_rows(batch.num_rows());
338 if let Some(mut first_consume_timer) = first_consume_timer.take() {
339 first_consume_timer.stop();
340 }
341
342 if let Some(metrics) = stream.metrics() {
343 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
344 sub_stage_metrics.insert(region_id, metrics);
345 }
346
347 yield Ok(batch);
348 poll_timer = Instant::now();
350 }
351 let total_cost = region_start.elapsed();
352
353 let region_metrics = RegionMetrics {
355 region_id,
356 poll_duration,
357 do_get_cost,
358 total_cost,
359 };
360
361 {
363 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
364 let partition_metrics = partition_metrics_guard
365 .entry(partition)
366 .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
367 partition_metrics.add_region_metrics(region_metrics);
368 }
369
370 if explain_verbose {
371 common_telemetry::info!(
372 "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
373 partition,
374 region_id,
375 poll_duration,
376 metric.first_consume_time(),
377 do_get_cost
378 );
379 }
380
381 if let Some(metrics) = stream.metrics() {
383 let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
384 let value = read_meter!(
385 c,
386 s,
387 ReadItem {
388 cpu_time: metrics.elapsed_compute as u64,
389 table_scan: metrics.memory_usage as u64
390 },
391 current_channel as u8
392 );
393 metric.record_greptime_exec_cost(value as usize);
394
395 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
397 sub_stage_metrics.insert(region_id, metrics);
398 }
399
400 MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
401 }
402
403 {
405 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
406 if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
407 partition_metrics.finish();
408 }
409 }
410 }));
411
412 Ok(Box::pin(RecordBatchStreamAdapter::new(
413 self.arrow_schema.clone(),
414 stream,
415 )))
416 }
417
418 pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
419 let Distribution::HashPartitioned(hash_exprs) = distribution else {
420 return None;
422 };
423
424 if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
425 && curr_dist == &hash_exprs
426 {
427 return None;
429 }
430
431 let all_partition_col_aliases: HashSet<_> = self
432 .partition_cols
433 .values()
434 .flat_map(|aliases| aliases.iter().map(|c| c.name()))
435 .collect();
436 let mut overlaps = vec![];
437 for expr in &hash_exprs {
438 if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
439 && all_partition_col_aliases.contains(col_expr.name())
440 {
441 overlaps.push(expr.clone());
442 }
443 }
444
445 if overlaps.is_empty() {
446 return None;
447 }
448
449 Some(Self {
450 table: self.table.clone(),
451 regions: self.regions.clone(),
452 plan: self.plan.clone(),
453 arrow_schema: self.arrow_schema.clone(),
454 region_query_handler: self.region_query_handler.clone(),
455 metric: self.metric.clone(),
456 properties: PlanProperties::new(
457 self.properties.eq_properties.clone(),
458 Partitioning::Hash(overlaps, self.target_partition),
459 self.properties.emission_type,
460 self.properties.boundedness,
461 ),
462 sub_stage_metrics: self.sub_stage_metrics.clone(),
463 partition_metrics: self.partition_metrics.clone(),
464 query_ctx: self.query_ctx.clone(),
465 target_partition: self.target_partition,
466 partition_cols: self.partition_cols.clone(),
467 })
468 }
469
470 pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
471 self.sub_stage_metrics
472 .lock()
473 .unwrap()
474 .values()
475 .cloned()
476 .collect()
477 }
478
479 pub fn partition_count(&self) -> usize {
480 self.target_partition
481 }
482
483 pub fn region_count(&self) -> usize {
484 self.regions.len()
485 }
486
487 fn partition_metrics(&self) -> Vec<PartitionMetrics> {
488 self.partition_metrics
489 .lock()
490 .unwrap()
491 .values()
492 .cloned()
493 .collect()
494 }
495}
496
/// Timings of the work one partition stream did for a single region.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// Region the timings belong to.
    region_id: RegionId,
    /// Time spent waiting on the region's remote stream for batches.
    poll_duration: Duration,
    /// Time taken by the `do_get` call that opened the remote stream.
    do_get_cost: Duration,
    /// Wall time from just before the request until the remote stream was exhausted,
    /// including time spent downstream while batches were yielded.
    total_cost: Duration,
}
506
/// Timing summary of one output partition of a merge scan, accumulated region by region.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Output partition index.
    partition: usize,
    /// Per-region timings, in the order the regions were recorded.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over `region_metrics`.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over `region_metrics`.
    total_do_get_cost: Duration,
    /// Number of regions recorded so far.
    total_regions: usize,
    /// Selects info-level (`true`) or debug-level logging of the summary.
    explain_verbose: bool,
    /// Set once the summary has been logged by `finish`; `Drop` logs summaries that never
    /// reached this point.
    finished: bool,
}
518
519impl PartitionMetrics {
520 fn new(partition: usize, explain_verbose: bool) -> Self {
521 Self {
522 partition,
523 region_metrics: Vec::new(),
524 total_poll_duration: Duration::ZERO,
525 total_do_get_cost: Duration::ZERO,
526 total_regions: 0,
527 explain_verbose,
528 finished: false,
529 }
530 }
531
532 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
533 self.total_poll_duration += region_metrics.poll_duration;
534 self.total_do_get_cost += region_metrics.do_get_cost;
535 self.total_regions += 1;
536 self.region_metrics.push(region_metrics);
537 }
538
539 fn finish(&mut self) {
541 if self.finished {
542 return;
543 }
544 self.finished = true;
545 self.log_metrics();
546 }
547
548 fn log_metrics(&self) {
550 if self.explain_verbose {
551 common_telemetry::info!(
552 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
553 self.partition,
554 self.total_regions,
555 self.total_poll_duration,
556 self.total_do_get_cost
557 );
558 } else {
559 common_telemetry::debug!(
560 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
561 self.partition,
562 self.total_regions,
563 self.total_poll_duration,
564 self.total_do_get_cost
565 );
566 }
567 }
568}
569
570impl Drop for PartitionMetrics {
571 fn drop(&mut self) {
572 if !self.finished {
573 self.log_metrics();
574 }
575 }
576}
577
578impl ExecutionPlan for MergeScanExec {
579 fn as_any(&self) -> &dyn Any {
580 self
581 }
582
583 fn schema(&self) -> ArrowSchemaRef {
584 self.arrow_schema.clone()
585 }
586
587 fn properties(&self) -> &PlanProperties {
588 &self.properties
589 }
590
591 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
592 vec![]
593 }
594
595 fn with_new_children(
598 self: Arc<Self>,
599 _children: Vec<Arc<dyn ExecutionPlan>>,
600 ) -> Result<Arc<dyn ExecutionPlan>> {
601 Ok(self.clone())
602 }
603
604 fn execute(
605 &self,
606 partition: usize,
607 context: Arc<TaskContext>,
608 ) -> Result<SendableRecordBatchStream> {
609 self.to_stream(context, partition)
610 }
611
612 fn metrics(&self) -> Option<MetricsSet> {
613 Some(self.metric.clone_inner())
614 }
615
616 fn name(&self) -> &str {
617 "MergeScanExec"
618 }
619}
620
621impl DisplayAs for MergeScanExec {
622 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
623 write!(f, "MergeScanExec: peers=[")?;
624 for region_id in self.regions.iter() {
625 write!(f, "{}, ", region_id)?;
626 }
627 write!(f, "]")?;
628
629 if matches!(t, DisplayFormatType::Verbose) {
630 let partition_metrics = self.partition_metrics();
631 if !partition_metrics.is_empty() {
632 write!(f, ", metrics={{")?;
633 for (i, pm) in partition_metrics.iter().enumerate() {
634 if i > 0 {
635 write!(f, ", ")?;
636 }
637 write!(
638 f,
639 "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
640 pm.partition,
641 pm.total_regions,
642 pm.total_poll_duration,
643 pm.total_do_get_cost
644 )?;
645 for (j, rm) in pm.region_metrics.iter().enumerate() {
646 if j > 0 {
647 write!(f, ",")?;
648 }
649 write!(
650 f,
651 "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
652 rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
653 )?;
654 }
655 write!(f, "]}}")?;
656 }
657 write!(f, "}}")?;
658 }
659 }
660
661 Ok(())
662 }
663}
664
/// Metrics recorded by each partition stream of a `MergeScanExec`.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Started when the partition stream starts; stopped once a region's `do_get` returns.
    ready_time: Time,
    /// Started when the partition stream starts; stopped when the first batch arrives.
    first_consume_time: Time,
    /// Covers the partition stream body from start until it ends or is dropped.
    finish_time: Time,
    /// Rows forwarded to the consumer.
    output_rows: Count,

    /// Accumulated read-meter cost of the remote queries.
    greptime_exec_cost: Gauge,
}
679
680impl MergeScanMetric {
681 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
682 Self {
683 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
684 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
685 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
686 output_rows: MetricBuilder::new(metric).output_rows(1),
687 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
688 }
689 }
690
691 pub fn ready_time(&self) -> &Time {
692 &self.ready_time
693 }
694
695 pub fn first_consume_time(&self) -> &Time {
696 &self.first_consume_time
697 }
698
699 pub fn finish_time(&self) -> &Time {
700 &self.finish_time
701 }
702
703 pub fn record_output_batch_rows(&self, num_rows: usize) {
704 self.output_rows.add(num_rows);
705 }
706
707 pub fn record_greptime_exec_cost(&self, metrics: usize) {
708 self.greptime_exec_cost.add(metrics);
709 }
710}