1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_plugins::GREPTIME_EXEC_READ_COST;
24use common_query::request::QueryRequest;
25use common_recordbatch::adapter::RecordBatchMetrics;
26use common_telemetry::tracing_context::TracingContext;
27use datafusion::arrow::record_batch::RecordBatch;
28use datafusion::execution::{SessionState, TaskContext};
29use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
30use datafusion::physical_plan::metrics::{
31 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
32};
33use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
34use datafusion::physical_plan::{
35 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
36 SendableRecordBatchStream,
37};
38use datafusion_common::{Column as ColumnExpr, DataFusionError, Result};
39use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
40use datafusion_physical_expr::expressions::Column;
41use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
42use futures_util::StreamExt;
43use greptime_proto::v1::region::RegionRequestHeader;
44use meter_core::data::ReadItem;
45use meter_macros::read_meter;
46use session::context::QueryContextRef;
47use store_api::storage::RegionId;
48use table::table_name::TableName;
49use tokio::time::Instant;
50
51use crate::dist_plan::analyzer::AliasMapping;
52use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
53use crate::region_query::RegionQueryHandlerRef;
54
/// Logical plan node named `"MergeScan"` that wraps another [`LogicalPlan`].
///
/// The wrapped plan provides this node's schema and is printed as `remote_input` in the
/// explain output. The `UserDefinedLogicalNodeCore` impl in this file reports no inputs and
/// no expressions for this node.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The wrapped logical plan.
    input: LogicalPlan,
    /// Flag returned by `is_placeholder()` and printed in the explain output.
    /// NOTE(review): presumably marks a node that is not backed by a real scan yet — confirm
    /// against the planner that builds this node.
    is_placeholder: bool,
    /// Alias mapping for the partition columns, returned by `partition_cols()`.
    /// NOTE(review): looks like partition column -> its aliases in this plan; confirm
    /// against `AliasMapping`.
    partition_cols: AliasMapping,
}
63
64impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
65 fn name(&self) -> &str {
66 Self::name()
67 }
68
69 fn inputs(&self) -> Vec<&LogicalPlan> {
72 vec![]
73 }
74
75 fn schema(&self) -> &datafusion_common::DFSchemaRef {
76 self.input.schema()
77 }
78
79 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
81 vec![]
82 }
83
84 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
85 write!(
86 f,
87 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
88 self.is_placeholder, self.input
89 )
90 }
91
92 fn with_exprs_and_inputs(
93 &self,
94 _exprs: Vec<datafusion::prelude::Expr>,
95 _inputs: Vec<LogicalPlan>,
96 ) -> Result<Self> {
97 Ok(self.clone())
98 }
99}
100
101impl MergeScanLogicalPlan {
102 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
103 Self {
104 input,
105 is_placeholder,
106 partition_cols,
107 }
108 }
109
110 pub fn name() -> &'static str {
111 "MergeScan"
112 }
113
114 pub fn into_logical_plan(self) -> LogicalPlan {
116 LogicalPlan::Extension(Extension {
117 node: Arc::new(self),
118 })
119 }
120
121 pub fn is_placeholder(&self) -> bool {
122 self.is_placeholder
123 }
124
125 pub fn input(&self) -> &LogicalPlan {
126 &self.input
127 }
128
129 pub fn partition_cols(&self) -> &AliasMapping {
130 &self.partition_cols
131 }
132}
133
/// Physical plan node that sends `plan` to the regions in `regions` through
/// `region_query_handler` and streams back the record batches it receives.
pub struct MergeScanExec {
    /// Table name; shown in the `Debug` output.
    table: TableName,
    /// Regions to query. Output partition `p` handles the regions at indices
    /// `p`, `p + target_partition`, `p + 2 * target_partition`, ...
    regions: Vec<RegionId>,
    /// Logical plan shipped to each region inside the query request.
    plan: LogicalPlan,
    /// Schema of the output stream; every received batch is re-wrapped with it.
    arrow_schema: ArrowSchemaRef,
    /// Handler whose `do_get` is called to run the query against one region.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metric set from which the per-stream metrics are created.
    metric: ExecutionPlanMetricsSet,
    /// Plan properties (equivalence properties, partitioning, emission type, boundedness).
    properties: PlanProperties,
    /// Most recent metrics reported by each region's stream, keyed by region id.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Timing metrics collected per output partition, keyed by partition index.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Query context: forwarded in the request header and the source of the channel,
    /// read preference and explain-verbose flag used while streaming.
    query_ctx: QueryContextRef,
    /// Number of output partitions; also the stride used to spread regions over partitions.
    target_partition: usize,
    /// Alias mapping for the partition columns, used to derive hash partitioning expressions.
    partition_cols: AliasMapping,
}
150
151impl std::fmt::Debug for MergeScanExec {
152 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
153 f.debug_struct("MergeScanExec")
154 .field("table", &self.table)
155 .field("regions", &self.regions)
156 .field("plan", &self.plan)
157 .finish()
158 }
159}
160
161impl MergeScanExec {
162 #[allow(clippy::too_many_arguments)]
163 pub fn new(
164 session_state: &SessionState,
165 table: TableName,
166 regions: Vec<RegionId>,
167 plan: LogicalPlan,
168 arrow_schema: &ArrowSchema,
169 region_query_handler: RegionQueryHandlerRef,
170 query_ctx: QueryContextRef,
171 target_partition: usize,
172 partition_cols: AliasMapping,
173 ) -> Result<Self> {
174 let arrow_schema = Arc::new(arrow_schema.clone());
178
179 let eq_properties = if let LogicalPlan::Sort(sort) = &plan
187 && target_partition >= regions.len()
188 {
189 let lex_ordering = sort
190 .expr
191 .iter()
192 .map(|sort_expr| {
193 let physical_expr = session_state
194 .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
195 Ok(PhysicalSortExpr::new(
196 physical_expr,
197 SortOptions {
198 descending: !sort_expr.asc,
199 nulls_first: sort_expr.nulls_first,
200 },
201 ))
202 })
203 .collect::<Result<Vec<_>>>()?;
204 EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
205 } else {
206 EquivalenceProperties::new(arrow_schema.clone())
207 };
208
209 let partition_exprs = partition_cols
210 .iter()
211 .filter_map(|col| {
212 if let Some(first_alias) = col.1.first() {
213 session_state
214 .create_physical_expr(
215 Expr::Column(ColumnExpr::new_unqualified(
216 first_alias.name().to_string(),
217 )),
218 plan.schema(),
219 )
220 .ok()
221 } else {
222 None
223 }
224 })
225 .collect();
226 let partitioning = Partitioning::Hash(partition_exprs, target_partition);
227
228 let properties = PlanProperties::new(
229 eq_properties,
230 partitioning,
231 EmissionType::Incremental,
232 Boundedness::Bounded,
233 );
234 Ok(Self {
235 table,
236 regions,
237 plan,
238 arrow_schema,
239 region_query_handler,
240 metric: ExecutionPlanMetricsSet::new(),
241 sub_stage_metrics: Arc::default(),
242 partition_metrics: Arc::default(),
243 properties,
244 query_ctx,
245 target_partition,
246 partition_cols,
247 })
248 }
249
250 pub fn to_stream(
251 &self,
252 context: Arc<TaskContext>,
253 partition: usize,
254 ) -> Result<SendableRecordBatchStream> {
255 let regions = self.regions.clone();
257 let region_query_handler = self.region_query_handler.clone();
258 let metric = MergeScanMetric::new(&self.metric);
259 let arrow_schema = self.arrow_schema.clone();
260 let query_ctx = self.query_ctx.clone();
261 let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
262 let partition_metrics_moved = self.partition_metrics.clone();
263 let plan = self.plan.clone();
264 let target_partition = self.target_partition;
265 let dbname = context.task_id().unwrap_or_default();
266 let tracing_context = TracingContext::from_json(context.session_id().as_str());
267 let current_channel = self.query_ctx.channel();
268 let read_preference = self.query_ctx.read_preference();
269 let explain_verbose = self.query_ctx.explain_verbose();
270
271 let stream = Box::pin(stream!({
272 if partition == 0 {
274 MERGE_SCAN_REGIONS.observe(regions.len() as f64);
275 }
276
277 let _finish_timer = metric.finish_time().timer();
278 let mut ready_timer = metric.ready_time().timer();
279 let mut first_consume_timer = Some(metric.first_consume_time().timer());
280
281 for region_id in regions
282 .iter()
283 .skip(partition)
284 .step_by(target_partition)
285 .copied()
286 {
287 let request = QueryRequest {
288 header: Some(RegionRequestHeader {
289 tracing_context: tracing_context.to_w3c(),
290 dbname: dbname.clone(),
291 query_context: Some(query_ctx.as_ref().into()),
292 }),
293 region_id,
294 plan: plan.clone(),
295 };
296 let region_start = Instant::now();
297 let do_get_start = Instant::now();
298
299 if explain_verbose {
300 common_telemetry::info!(
301 "Merge scan one region, partition: {}, region_id: {}",
302 partition,
303 region_id
304 );
305 }
306
307 let mut stream = region_query_handler
308 .do_get(read_preference, request)
309 .await
310 .map_err(|e| {
311 MERGE_SCAN_ERRORS_TOTAL.inc();
312 DataFusionError::External(Box::new(e))
313 })?;
314 let do_get_cost = do_get_start.elapsed();
315
316 ready_timer.stop();
317
318 let mut poll_duration = Duration::ZERO;
319 let mut poll_timer = Instant::now();
320 while let Some(batch) = stream.next().await {
321 let poll_elapsed = poll_timer.elapsed();
322 poll_duration += poll_elapsed;
323
324 let batch = batch.map_err(|e| DataFusionError::External(Box::new(e)))?;
325 let batch = RecordBatch::try_new(
326 arrow_schema.clone(),
327 batch.into_df_record_batch().columns().to_vec(),
328 )?;
329 metric.record_output_batch_rows(batch.num_rows());
330 if let Some(mut first_consume_timer) = first_consume_timer.take() {
331 first_consume_timer.stop();
332 }
333
334 if let Some(metrics) = stream.metrics() {
335 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
336 sub_stage_metrics.insert(region_id, metrics);
337 }
338
339 yield Ok(batch);
340 poll_timer = Instant::now();
342 }
343 let total_cost = region_start.elapsed();
344
345 let region_metrics = RegionMetrics {
347 region_id,
348 poll_duration,
349 do_get_cost,
350 total_cost,
351 };
352
353 {
355 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
356 let partition_metrics = partition_metrics_guard
357 .entry(partition)
358 .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
359 partition_metrics.add_region_metrics(region_metrics);
360 }
361
362 if explain_verbose {
363 common_telemetry::info!(
364 "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
365 partition,
366 region_id,
367 poll_duration,
368 metric.first_consume_time(),
369 do_get_cost
370 );
371 }
372
373 if let Some(metrics) = stream.metrics() {
375 let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
376 let value = read_meter!(
377 c,
378 s,
379 ReadItem {
380 cpu_time: metrics.elapsed_compute as u64,
381 table_scan: metrics.memory_usage as u64
382 },
383 current_channel as u8
384 );
385 metric.record_greptime_exec_cost(value as usize);
386
387 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
389 sub_stage_metrics.insert(region_id, metrics);
390 }
391
392 MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
393 }
394
395 {
397 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
398 if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
399 partition_metrics.finish();
400 }
401 }
402 }));
403
404 Ok(Box::pin(RecordBatchStreamAdapter::new(
405 self.arrow_schema.clone(),
406 stream,
407 )))
408 }
409
410 pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
411 let Distribution::HashPartitioned(hash_exprs) = distribution else {
412 return None;
414 };
415
416 if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
417 && curr_dist == &hash_exprs
418 {
419 return None;
421 }
422
423 let all_partition_col_aliases: HashSet<_> = self
424 .partition_cols
425 .values()
426 .flat_map(|aliases| aliases.iter().map(|c| c.name()))
427 .collect();
428 let mut overlaps = vec![];
429 for expr in &hash_exprs {
430 if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
431 && all_partition_col_aliases.contains(col_expr.name())
432 {
433 overlaps.push(expr.clone());
434 }
435 }
436
437 if overlaps.is_empty() {
438 return None;
439 }
440
441 Some(Self {
442 table: self.table.clone(),
443 regions: self.regions.clone(),
444 plan: self.plan.clone(),
445 arrow_schema: self.arrow_schema.clone(),
446 region_query_handler: self.region_query_handler.clone(),
447 metric: self.metric.clone(),
448 properties: PlanProperties::new(
449 self.properties.eq_properties.clone(),
450 Partitioning::Hash(overlaps, self.target_partition),
451 self.properties.emission_type,
452 self.properties.boundedness,
453 ),
454 sub_stage_metrics: self.sub_stage_metrics.clone(),
455 partition_metrics: self.partition_metrics.clone(),
456 query_ctx: self.query_ctx.clone(),
457 target_partition: self.target_partition,
458 partition_cols: self.partition_cols.clone(),
459 })
460 }
461
462 pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
463 self.sub_stage_metrics
464 .lock()
465 .unwrap()
466 .values()
467 .cloned()
468 .collect()
469 }
470
    /// Returns the number of output partitions (`target_partition`).
    pub fn partition_count(&self) -> usize {
        self.target_partition
    }
474
    /// Returns the number of regions this node scans.
    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
478
479 fn partition_metrics(&self) -> Vec<PartitionMetrics> {
480 self.partition_metrics
481 .lock()
482 .unwrap()
483 .values()
484 .cloned()
485 .collect()
486 }
487}
488
/// Timing metrics recorded for one region scanned by a `MergeScanExec` partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// The region these timings belong to.
    region_id: RegionId,
    /// Accumulated time spent waiting for the region stream to produce its next item.
    poll_duration: Duration,
    /// Time taken by the `do_get` call that opened the region stream.
    do_get_cost: Duration,
    /// Wall-clock time from just before `do_get` was issued until the region stream ended.
    total_cost: Duration,
}
498
/// Metrics aggregated over all regions scanned by one output partition.
///
/// A summary is logged when `finish` is called, or on drop if `finish` was never called.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Index of the output partition.
    partition: usize,
    /// Metrics of each region, in the order they were added.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over all added regions.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over all added regions.
    total_do_get_cost: Duration,
    /// Number of regions added so far.
    total_regions: usize,
    /// Selects the log level of the summary: `info` when true, `debug` otherwise.
    explain_verbose: bool,
    /// Set once `finish` has logged the summary, so it is not logged again.
    finished: bool,
}
510
511impl PartitionMetrics {
512 fn new(partition: usize, explain_verbose: bool) -> Self {
513 Self {
514 partition,
515 region_metrics: Vec::new(),
516 total_poll_duration: Duration::ZERO,
517 total_do_get_cost: Duration::ZERO,
518 total_regions: 0,
519 explain_verbose,
520 finished: false,
521 }
522 }
523
524 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
525 self.total_poll_duration += region_metrics.poll_duration;
526 self.total_do_get_cost += region_metrics.do_get_cost;
527 self.total_regions += 1;
528 self.region_metrics.push(region_metrics);
529 }
530
531 fn finish(&mut self) {
533 if self.finished {
534 return;
535 }
536 self.finished = true;
537 self.log_metrics();
538 }
539
540 fn log_metrics(&self) {
542 if self.explain_verbose {
543 common_telemetry::info!(
544 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
545 self.partition,
546 self.total_regions,
547 self.total_poll_duration,
548 self.total_do_get_cost
549 );
550 } else {
551 common_telemetry::debug!(
552 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
553 self.partition,
554 self.total_regions,
555 self.total_poll_duration,
556 self.total_do_get_cost
557 );
558 }
559 }
560}
561
562impl Drop for PartitionMetrics {
563 fn drop(&mut self) {
564 if !self.finished {
565 self.log_metrics();
566 }
567 }
568}
569
570impl ExecutionPlan for MergeScanExec {
571 fn as_any(&self) -> &dyn Any {
572 self
573 }
574
575 fn schema(&self) -> ArrowSchemaRef {
576 self.arrow_schema.clone()
577 }
578
579 fn properties(&self) -> &PlanProperties {
580 &self.properties
581 }
582
583 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
584 vec![]
585 }
586
587 fn with_new_children(
590 self: Arc<Self>,
591 _children: Vec<Arc<dyn ExecutionPlan>>,
592 ) -> Result<Arc<dyn ExecutionPlan>> {
593 Ok(self.clone())
594 }
595
596 fn execute(
597 &self,
598 partition: usize,
599 context: Arc<TaskContext>,
600 ) -> Result<SendableRecordBatchStream> {
601 self.to_stream(context, partition)
602 }
603
604 fn metrics(&self) -> Option<MetricsSet> {
605 Some(self.metric.clone_inner())
606 }
607
608 fn name(&self) -> &str {
609 "MergeScanExec"
610 }
611}
612
613impl DisplayAs for MergeScanExec {
614 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
615 write!(f, "MergeScanExec: peers=[")?;
616 for region_id in self.regions.iter() {
617 write!(f, "{}, ", region_id)?;
618 }
619 write!(f, "]")?;
620
621 if matches!(t, DisplayFormatType::Verbose) {
622 let partition_metrics = self.partition_metrics();
623 if !partition_metrics.is_empty() {
624 write!(f, ", metrics={{")?;
625 for (i, pm) in partition_metrics.iter().enumerate() {
626 if i > 0 {
627 write!(f, ", ")?;
628 }
629 write!(
630 f,
631 "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
632 pm.partition,
633 pm.total_regions,
634 pm.total_poll_duration,
635 pm.total_do_get_cost
636 )?;
637 for (j, rm) in pm.region_metrics.iter().enumerate() {
638 if j > 0 {
639 write!(f, ",")?;
640 }
641 write!(
642 f,
643 "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
644 rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
645 )?;
646 }
647 write!(f, "]}}")?;
648 }
649 write!(f, "}}")?;
650 }
651 }
652
653 Ok(())
654 }
655}
656
/// Handles to the DataFusion metrics updated while a `MergeScanExec` stream runs.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Timer started when the stream starts and stopped after a region's `do_get` returns.
    ready_time: Time,
    /// Timer started when the stream starts and stopped when the first batch is received.
    first_consume_time: Time,
    /// Timer whose guard is held for the whole stream body.
    finish_time: Time,
    /// Number of rows in the batches received so far.
    output_rows: Count,

    /// Accumulated read cost computed by `read_meter!`, registered under
    /// `GREPTIME_EXEC_READ_COST`.
    greptime_exec_cost: Gauge,
}
671
672impl MergeScanMetric {
673 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
674 Self {
675 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
676 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
677 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
678 output_rows: MetricBuilder::new(metric).output_rows(1),
679 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
680 }
681 }
682
683 pub fn ready_time(&self) -> &Time {
684 &self.ready_time
685 }
686
687 pub fn first_consume_time(&self) -> &Time {
688 &self.first_consume_time
689 }
690
691 pub fn finish_time(&self) -> &Time {
692 &self.finish_time
693 }
694
695 pub fn record_output_batch_rows(&self, num_rows: usize) {
696 self.output_rows.add(num_rows);
697 }
698
699 pub fn record_greptime_exec_cost(&self, metrics: usize) {
700 self.greptime_exec_cost.add(metrics);
701 }
702}