1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::execution::{SessionState, TaskContext};
28use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
29use datafusion::physical_plan::metrics::{
30 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
31};
32use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
33use datafusion::physical_plan::{
34 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
35 SendableRecordBatchStream,
36};
37use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
38use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
39use datafusion_physical_expr::expressions::Column;
40use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
41use futures_util::StreamExt;
42use greptime_proto::v1::region::RegionRequestHeader;
43use meter_core::data::ReadItem;
44use meter_macros::read_meter;
45use session::context::QueryContextRef;
46use store_api::storage::RegionId;
47use table::table_name::TableName;
48use tokio::time::Instant;
49use tracing::{Instrument, Span};
50
51use crate::dist_plan::analyzer::AliasMapping;
52use crate::dist_plan::analyzer::utils::patch_batch_timezone;
53use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
54use crate::region_query::RegionQueryHandlerRef;
55
/// Logical plan node representing the frontend-side merge of results scanned
/// from multiple remote regions. The wrapped `input` is the plan fragment
/// shipped to and executed on each region.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    // Plan fragment executed remotely; also provides this node's schema.
    input: LogicalPlan,
    // NOTE(review): presumably marks a node inserted by the analyzer that has
    // not yet been materialized into a real merge scan — confirm with the
    // dist_plan analyzer that creates these.
    is_placeholder: bool,
    // Partition columns of the table, each mapped to its known aliases.
    partition_cols: AliasMapping,
}
64
65impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
66 fn name(&self) -> &str {
67 Self::name()
68 }
69
70 fn inputs(&self) -> Vec<&LogicalPlan> {
73 vec![]
74 }
75
76 fn schema(&self) -> &datafusion_common::DFSchemaRef {
77 self.input.schema()
78 }
79
80 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
82 vec![]
83 }
84
85 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
86 write!(
87 f,
88 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
89 self.is_placeholder, self.input
90 )
91 }
92
93 fn with_exprs_and_inputs(
94 &self,
95 _exprs: Vec<datafusion::prelude::Expr>,
96 _inputs: Vec<LogicalPlan>,
97 ) -> Result<Self> {
98 Ok(self.clone())
99 }
100}
101
102impl MergeScanLogicalPlan {
103 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
104 Self {
105 input,
106 is_placeholder,
107 partition_cols,
108 }
109 }
110
111 pub fn name() -> &'static str {
112 "MergeScan"
113 }
114
115 pub fn into_logical_plan(self) -> LogicalPlan {
117 LogicalPlan::Extension(Extension {
118 node: Arc::new(self),
119 })
120 }
121
122 pub fn is_placeholder(&self) -> bool {
123 self.is_placeholder
124 }
125
126 pub fn input(&self) -> &LogicalPlan {
127 &self.input
128 }
129
130 pub fn partition_cols(&self) -> &AliasMapping {
131 &self.partition_cols
132 }
133}
134
/// Physical operator that queries every region of a table and merges the
/// resulting record-batch streams into partitioned output.
pub struct MergeScanExec {
    // Table being scanned; shown in Debug output.
    table: TableName,
    // All regions this scan covers; distributed over output partitions.
    regions: Vec<RegionId>,
    // Logical plan fragment sent to each region via the query handler.
    plan: LogicalPlan,
    // Output schema of the merged stream.
    arrow_schema: ArrowSchemaRef,
    // Handler used to issue per-region `do_get` requests.
    region_query_handler: RegionQueryHandlerRef,
    // DataFusion metrics registry for this operator.
    metric: ExecutionPlanMetricsSet,
    // Precomputed partitioning/ordering/emission properties.
    properties: Arc<PlanProperties>,
    // Per-region metrics reported back by the remote streams.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    // Per-output-partition scan metrics, keyed by partition index.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    // Query context carrying channel, read preference, verbosity, etc.
    query_ctx: QueryContextRef,
    // Number of output partitions regions are distributed across.
    target_partition: usize,
    // Partition columns and their aliases, used to declare hash partitioning.
    partition_cols: AliasMapping,
}
151
impl std::fmt::Debug for MergeScanExec {
    /// Manual `Debug`: only `table`, `regions` and `plan` are shown; the
    /// remaining fields are omitted — NOTE(review): likely because the handler
    /// and context types do not implement `Debug`; confirm before deriving.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
            .field("plan", &self.plan)
            .finish()
    }
}
161
impl MergeScanExec {
    /// Creates a new [`MergeScanExec`], precomputing its plan properties.
    ///
    /// Ordering: when the remote plan is topped by a `Sort` and
    /// `target_partition >= regions.len()` — so each output partition reads at
    /// most one region (see [`Self::to_stream`]) — the sort ordering is kept
    /// in the equivalence properties, letting upstream operators skip a
    /// redundant re-sort.
    ///
    /// Partitioning: declared as hash-partitioned on the first alias of each
    /// partition column, with `target_partition` partitions.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        session_state: &SessionState,
        table: TableName,
        regions: Vec<RegionId>,
        plan: LogicalPlan,
        arrow_schema: &ArrowSchema,
        region_query_handler: RegionQueryHandlerRef,
        query_ctx: QueryContextRef,
        target_partition: usize,
        partition_cols: AliasMapping,
    ) -> Result<Self> {
        let arrow_schema = Arc::new(arrow_schema.clone());

        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
            && target_partition >= regions.len()
        {
            // Translate each logical sort expression into a physical one;
            // `asc` is inverted into `descending` for `SortOptions`.
            let lex_ordering = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    let physical_expr = session_state
                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
                    Ok(PhysicalSortExpr::new(
                        physical_expr,
                        SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
        } else {
            EquivalenceProperties::new(arrow_schema.clone())
        };

        // Build the hash-partitioning exprs from the first alias of each
        // partition column. NOTE(review): columns whose physical expression
        // fails to resolve are silently dropped via `.ok()`, which weakens the
        // declared partitioning — confirm this is intended.
        let partition_exprs = partition_cols
            .iter()
            .filter_map(|col| {
                if let Some(first_alias) = col.1.first() {
                    session_state
                        .create_physical_expr(
                            Expr::Column(ColumnExpr::new_unqualified(
                                first_alias.name().to_string(),
                            )),
                            plan.schema(),
                        )
                        .ok()
                } else {
                    None
                }
            })
            .collect();
        let partitioning = Partitioning::Hash(partition_exprs, target_partition);

        let properties = Arc::new(PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        ));
        Ok(Self {
            table,
            regions,
            plan,
            arrow_schema,
            region_query_handler,
            metric: ExecutionPlanMetricsSet::new(),
            sub_stage_metrics: Arc::default(),
            partition_metrics: Arc::default(),
            properties,
            query_ctx,
            target_partition,
            partition_cols,
        })
    }

    /// Builds the record-batch stream for output `partition`.
    ///
    /// Regions are assigned to partitions round-robin
    /// (`skip(partition).step_by(target_partition)`) and queried sequentially
    /// within a partition. Each mutex is locked in a short synchronous scope
    /// and released before the next `await`.
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // Clone everything the generator needs so it owns its state.
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let arrow_schema = self.arrow_schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let partition_metrics_moved = self.partition_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        // NOTE(review): the task id is used as the db string and later parsed
        // into catalog/schema — confirm upstream populates it that way.
        let dbname = context.task_id().unwrap_or_default();
        // NOTE(review): likewise, the session id is expected to carry a
        // serialized tracing context.
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();
        let explain_verbose = self.query_ctx.explain_verbose();

        let stream = Box::pin(stream!({
            // Record the region count once, from partition 0 only.
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            // `_finish_timer` runs until the generator is dropped (total time);
            // `ready_timer` runs until the first remote stream is obtained;
            // `first_consume_timer` until the first batch is yielded.
            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

            // Round-robin assignment of regions to this partition.
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let region_span = tracing_context.attach(tracing::info_span!(
                    parent: &Span::current(),
                    "merge_scan_region",
                    region_id = %region_id,
                    partition = partition
                ));
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let region_start = Instant::now();
                let do_get_start = Instant::now();

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan one region, partition: {}, region_id: {}",
                        partition,
                        region_id
                    );
                }

                // Issue the remote query; failures bump the error counter and
                // abort the whole stream.
                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .instrument(region_span.clone())
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        DataFusionError::External(Box::new(e))
                    })?;
                let do_get_cost = do_get_start.elapsed();

                ready_timer.stop();

                // Measure only the time spent awaiting the remote stream;
                // `poll_timer` is reset after each yield so downstream
                // consumption time is excluded.
                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().instrument(region_span.clone()).await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
                    // Rebuild the batch against our schema so timezone
                    // metadata matches the query context.
                    let batch = patch_batch_timezone(
                        arrow_schema.clone(),
                        batch.into_df_record_batch().columns().to_vec(),
                    )?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }

                    // Capture in-flight remote metrics; later snapshots for
                    // the same region overwrite earlier ones.
                    if let Some(metrics) = stream.metrics() {
                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                        sub_stage_metrics.insert(region_id, metrics);
                    }

                    yield Ok(batch);
                    poll_timer = Instant::now();
                }
                let total_cost = region_start.elapsed();

                let region_metrics = RegionMetrics {
                    region_id,
                    poll_duration,
                    do_get_cost,
                    total_cost,
                };

                // Scope the lock so it is dropped before any later `await`.
                {
                    let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                    let partition_metrics = partition_metrics_guard
                        .entry(partition)
                        .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
                    partition_metrics.add_region_metrics(region_metrics);
                }

                if explain_verbose {
                    common_telemetry::info!(
                        "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                        partition,
                        region_id,
                        poll_duration,
                        metric.first_consume_time(),
                        do_get_cost
                    );
                }

                // Final per-region metrics: meter the read cost for billing
                // and store the last metrics snapshot.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }

            // All regions done: mark this partition's metrics finished so they
            // are logged exactly once.
            {
                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
                    partition_metrics.finish();
                }
            }
        }));

        Ok(Box::pin(RecordBatchStreamAdapter::new(
            self.arrow_schema.clone(),
            stream,
        )))
    }

    /// Tries to narrow the declared output partitioning to the subset of
    /// `distribution`'s hash exprs that are known partition-column aliases.
    ///
    /// Returns `None` when the distribution is not hash-partitioned, already
    /// matches the current partitioning, or shares no columns with the
    /// table's partition columns.
    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
        let Distribution::HashPartitioned(hash_exprs) = distribution else {
            // Only hash distributions can be adopted.
            return None;
        };

        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
            && curr_dist == &hash_exprs
        {
            // Already partitioned this way; nothing to change.
            return None;
        }

        // Collect every alias of every partition column for the overlap test.
        let all_partition_col_aliases: HashSet<_> = self
            .partition_cols
            .values()
            .flat_map(|aliases| aliases.iter().map(|c| c.name()))
            .collect();
        // Keep only requested hash exprs that are plain columns matching a
        // partition-column alias.
        let overlaps: Vec<_> = hash_exprs
            .iter()
            .filter(|expr| {
                expr.as_any()
                    .downcast_ref::<Column>()
                    .is_some_and(|col_expr| all_partition_col_aliases.contains(col_expr.name()))
            })
            .cloned()
            .collect();

        if overlaps.is_empty() {
            return None;
        }

        // Same exec, but declaring the narrowed hash partitioning. Metrics
        // state (Arc'd) is shared with `self`.
        Some(Self {
            table: self.table.clone(),
            regions: self.regions.clone(),
            plan: self.plan.clone(),
            arrow_schema: self.arrow_schema.clone(),
            region_query_handler: self.region_query_handler.clone(),
            metric: self.metric.clone(),
            properties: Arc::new(PlanProperties::new(
                self.properties.eq_properties.clone(),
                Partitioning::Hash(overlaps, self.target_partition),
                self.properties.emission_type,
                self.properties.boundedness,
            )),
            sub_stage_metrics: self.sub_stage_metrics.clone(),
            partition_metrics: self.partition_metrics.clone(),
            query_ctx: self.query_ctx.clone(),
            target_partition: self.target_partition,
            partition_cols: self.partition_cols.clone(),
        })
    }

    /// Snapshot of the per-region metrics reported by remote streams.
    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
        self.sub_stage_metrics
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }

    /// Regions this scan covers.
    pub fn regions(&self) -> &[RegionId] {
        &self.regions
    }

    /// Number of output partitions.
    pub fn partition_count(&self) -> usize {
        self.target_partition
    }

    /// Number of regions scanned.
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }

    /// Snapshot of per-partition scan metrics, for verbose EXPLAIN output.
    // NOTE(review): these are clones of `PartitionMetrics`, which has a
    // logging `Drop`; unfinished clones will log again when dropped — confirm
    // that duplicate log lines are acceptable here.
    fn partition_metrics(&self) -> Vec<PartitionMetrics> {
        self.partition_metrics
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }
}
501
/// Timing metrics collected while scanning a single region.
#[derive(Debug, Clone)]
struct RegionMetrics {
    // Region these timings belong to.
    region_id: RegionId,
    // Time spent awaiting batches from the remote stream (excludes downstream
    // consumption time).
    poll_duration: Duration,
    // Time taken by the initial `do_get` call that opened the stream.
    do_get_cost: Duration,
    // Wall-clock time from issuing the request until the stream was drained.
    total_cost: Duration,
}
511
/// Aggregated scan metrics for one output partition of a merge scan.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    // Output partition index.
    partition: usize,
    // Per-region metrics, in scan order.
    region_metrics: Vec<RegionMetrics>,
    // Sum of `poll_duration` over all regions.
    total_poll_duration: Duration,
    // Sum of `do_get_cost` over all regions.
    total_do_get_cost: Duration,
    // Number of regions accumulated so far.
    total_regions: usize,
    // Chooses INFO vs DEBUG when the summary is logged.
    explain_verbose: bool,
    // Set by `finish()`; prevents double logging (see `Drop`).
    finished: bool,
}
523
524impl PartitionMetrics {
525 fn new(partition: usize, explain_verbose: bool) -> Self {
526 Self {
527 partition,
528 region_metrics: Vec::new(),
529 total_poll_duration: Duration::ZERO,
530 total_do_get_cost: Duration::ZERO,
531 total_regions: 0,
532 explain_verbose,
533 finished: false,
534 }
535 }
536
537 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
538 self.total_poll_duration += region_metrics.poll_duration;
539 self.total_do_get_cost += region_metrics.do_get_cost;
540 self.total_regions += 1;
541 self.region_metrics.push(region_metrics);
542 }
543
544 fn finish(&mut self) {
546 if self.finished {
547 return;
548 }
549 self.finished = true;
550 self.log_metrics();
551 }
552
553 fn log_metrics(&self) {
555 if self.explain_verbose {
556 common_telemetry::info!(
557 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
558 self.partition,
559 self.total_regions,
560 self.total_poll_duration,
561 self.total_do_get_cost
562 );
563 } else {
564 common_telemetry::debug!(
565 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
566 self.partition,
567 self.total_regions,
568 self.total_poll_duration,
569 self.total_do_get_cost
570 );
571 }
572 }
573}
574
575impl Drop for PartitionMetrics {
576 fn drop(&mut self) {
577 if !self.finished {
578 self.log_metrics();
579 }
580 }
581}
582
impl ExecutionPlan for MergeScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &Arc<PlanProperties> {
        &self.properties
    }

    // Leaf node: the remote plan fragment is not exposed as a child.
    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // As a leaf, rebuilding with (necessarily empty) children is just a clone.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    // Delegates to `to_stream`, which distributes regions round-robin across
    // partitions.
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<SendableRecordBatchStream> {
        self.to_stream(context, partition)
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "MergeScanExec"
    }
}
625
impl DisplayAs for MergeScanExec {
    /// EXPLAIN rendering: always lists the scanned regions; in `Verbose` mode
    /// additionally emits collected per-partition metrics as a hand-built
    /// JSON-like object.
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScanExec: peers=[")?;
        for region_id in self.regions.iter() {
            write!(f, "{}, ", region_id)?;
        }
        write!(f, "]")?;

        if matches!(t, DisplayFormatType::Verbose) {
            let partition_metrics = self.partition_metrics();
            // Only printed once at least one partition has reported metrics.
            if !partition_metrics.is_empty() {
                write!(f, ", metrics={{")?;
                for (i, pm) in partition_metrics.iter().enumerate() {
                    if i > 0 {
                        write!(f, ", ")?;
                    }
                    // One "partition_<n>" entry per partition, durations
                    // rendered via Debug (hence quoted strings).
                    write!(
                        f,
                        "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
                        pm.partition,
                        pm.total_regions,
                        pm.total_poll_duration,
                        pm.total_do_get_cost
                    )?;
                    for (j, rm) in pm.region_metrics.iter().enumerate() {
                        if j > 0 {
                            write!(f, ",")?;
                        }
                        write!(
                            f,
                            "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
                            rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
                        )?;
                    }
                    write!(f, "]}}")?;
                }
                write!(f, "}}")?;
            }
        }

        Ok(())
    }
}
669
/// DataFusion metrics recorded by [`MergeScanExec`] for one stream.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    // Time until the first remote stream was obtained (stopped after the
    // first `do_get` completes).
    ready_time: Time,
    // Time until the first batch was produced.
    first_consume_time: Time,
    // Total lifetime of the stream (timer held until the generator drops).
    finish_time: Time,
    // Total rows yielded.
    output_rows: Count,
    // Metered read cost accumulated from remote metrics (see `read_meter!`).
    greptime_exec_cost: Gauge,
}
684
685impl MergeScanMetric {
686 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
687 Self {
688 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
689 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
690 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
691 output_rows: MetricBuilder::new(metric).output_rows(1),
692 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
693 }
694 }
695
696 pub fn ready_time(&self) -> &Time {
697 &self.ready_time
698 }
699
700 pub fn first_consume_time(&self) -> &Time {
701 &self.first_consume_time
702 }
703
704 pub fn finish_time(&self) -> &Time {
705 &self.finish_time
706 }
707
708 pub fn record_output_batch_rows(&self, num_rows: usize) {
709 self.output_rows.add(num_rows);
710 }
711
712 pub fn record_greptime_exec_cost(&self, metrics: usize) {
713 self.greptime_exec_cost.add(metrics);
714 }
715}