1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::BoxedError;
24use common_plugins::GREPTIME_EXEC_READ_COST;
25use common_query::request::QueryRequest;
26use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
27use common_recordbatch::error::ExternalSnafu;
28use common_recordbatch::{
29 DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
30};
31use common_telemetry::tracing_context::TracingContext;
32use datafusion::execution::{SessionState, TaskContext};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{
35 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
36};
37use datafusion::physical_plan::{
38 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
39};
40use datafusion_common::{Column as ColumnExpr, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
44use datatypes::schema::{Schema, SchemaRef};
45use futures_util::StreamExt;
46use greptime_proto::v1::region::RegionRequestHeader;
47use meter_core::data::ReadItem;
48use meter_macros::read_meter;
49use session::context::QueryContextRef;
50use snafu::ResultExt;
51use store_api::storage::RegionId;
52use table::table_name::TableName;
53use tokio::time::Instant;
54
55use crate::dist_plan::analyzer::AliasMapping;
56use crate::error::ConvertSchemaSnafu;
57use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
58use crate::region_query::RegionQueryHandlerRef;
59
/// Logical extension node (`MergeScan`) that wraps a sub-plan (`input`).
///
/// The node exposes `input`'s schema as its own but reports no inputs and no
/// expressions, so DataFusion treats it as a leaf.
// Field order matters: it drives the derived `PartialOrd` and `Hash`.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The wrapped sub-plan; its schema is this node's schema.
    input: LogicalPlan,
    /// Whether this node was built as a placeholder (shown in EXPLAIN output).
    is_placeholder: bool,
    /// Partition columns and the aliases they are visible under.
    // NOTE(review): presumably maps each table partition column to its aliases
    // in `input`'s output — confirm against `AliasMapping`'s definition.
    partition_cols: AliasMapping,
}
68
69impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
70 fn name(&self) -> &str {
71 Self::name()
72 }
73
74 fn inputs(&self) -> Vec<&LogicalPlan> {
77 vec![]
78 }
79
80 fn schema(&self) -> &datafusion_common::DFSchemaRef {
81 self.input.schema()
82 }
83
84 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
86 vec![]
87 }
88
89 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
90 write!(
91 f,
92 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
93 self.is_placeholder, self.input
94 )
95 }
96
97 fn with_exprs_and_inputs(
98 &self,
99 _exprs: Vec<datafusion::prelude::Expr>,
100 _inputs: Vec<LogicalPlan>,
101 ) -> Result<Self> {
102 Ok(self.clone())
103 }
104}
105
106impl MergeScanLogicalPlan {
107 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: AliasMapping) -> Self {
108 Self {
109 input,
110 is_placeholder,
111 partition_cols,
112 }
113 }
114
115 pub fn name() -> &'static str {
116 "MergeScan"
117 }
118
119 pub fn into_logical_plan(self) -> LogicalPlan {
121 LogicalPlan::Extension(Extension {
122 node: Arc::new(self),
123 })
124 }
125
126 pub fn is_placeholder(&self) -> bool {
127 self.is_placeholder
128 }
129
130 pub fn input(&self) -> &LogicalPlan {
131 &self.input
132 }
133
134 pub fn partition_cols(&self) -> &AliasMapping {
135 &self.partition_cols
136 }
137}
138
/// Physical operator that sends `plan` to every region in `regions` through
/// `region_query_handler` and streams the returned batches.
///
/// Regions are distributed round-robin over `target_partition` output
/// partitions: partition `p` reads `regions[p]`, `regions[p + target_partition]`, ...
pub struct MergeScanExec {
    /// Table being scanned (shown in `Debug` output).
    table: TableName,
    /// Regions to query, in assignment order.
    regions: Vec<RegionId>,
    /// Sub-plan sent to each region in a `QueryRequest`.
    plan: LogicalPlan,
    /// GreptimeDB form of the output schema; every batch is rebuilt against it.
    schema: SchemaRef,
    /// Arrow form of the output schema, returned from `ExecutionPlan::schema`.
    arrow_schema: ArrowSchemaRef,
    /// Issues the per-region `do_get` calls.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics set that `MergeScanMetric` registers into.
    metric: ExecutionPlanMetricsSet,
    /// Cached ordering/partitioning/emission properties.
    properties: PlanProperties,
    /// Most recent metrics reported by each region's stream (overwritten per update).
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Per-output-partition timing summaries, keyed by partition index.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Session context forwarded with each region request.
    query_ctx: QueryContextRef,
    /// Number of output partitions.
    target_partition: usize,
    /// Partition columns and their aliases, used to derive hash partitioning.
    partition_cols: AliasMapping,
}
156
157impl std::fmt::Debug for MergeScanExec {
158 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159 f.debug_struct("MergeScanExec")
160 .field("table", &self.table)
161 .field("regions", &self.regions)
162 .field("schema", &self.schema)
163 .field("plan", &self.plan)
164 .finish()
165 }
166}
167
168impl MergeScanExec {
169 #[allow(clippy::too_many_arguments)]
170 pub fn new(
171 session_state: &SessionState,
172 table: TableName,
173 regions: Vec<RegionId>,
174 plan: LogicalPlan,
175 arrow_schema: &ArrowSchema,
176 region_query_handler: RegionQueryHandlerRef,
177 query_ctx: QueryContextRef,
178 target_partition: usize,
179 partition_cols: AliasMapping,
180 ) -> Result<Self> {
181 let arrow_schema = Arc::new(arrow_schema.clone());
185
186 let eq_properties = if let LogicalPlan::Sort(sort) = &plan
194 && target_partition >= regions.len()
195 {
196 let lex_ordering = sort
197 .expr
198 .iter()
199 .map(|sort_expr| {
200 let physical_expr = session_state
201 .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
202 Ok(PhysicalSortExpr::new(
203 physical_expr,
204 SortOptions {
205 descending: !sort_expr.asc,
206 nulls_first: sort_expr.nulls_first,
207 },
208 ))
209 })
210 .collect::<Result<Vec<_>>>()?;
211 EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
212 } else {
213 EquivalenceProperties::new(arrow_schema.clone())
214 };
215
216 let partition_exprs = partition_cols
217 .iter()
218 .filter_map(|col| {
219 if let Some(first_alias) = col.1.first() {
220 session_state
221 .create_physical_expr(
222 Expr::Column(ColumnExpr::new_unqualified(
223 first_alias.name().to_string(),
224 )),
225 plan.schema(),
226 )
227 .ok()
228 } else {
229 None
230 }
231 })
232 .collect();
233 let partitioning = Partitioning::Hash(partition_exprs, target_partition);
234
235 let properties = PlanProperties::new(
236 eq_properties,
237 partitioning,
238 EmissionType::Incremental,
239 Boundedness::Bounded,
240 );
241 let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
242 Ok(Self {
243 table,
244 regions,
245 plan,
246 schema,
247 arrow_schema,
248 region_query_handler,
249 metric: ExecutionPlanMetricsSet::new(),
250 sub_stage_metrics: Arc::default(),
251 partition_metrics: Arc::default(),
252 properties,
253 query_ctx,
254 target_partition,
255 partition_cols,
256 })
257 }
258
259 pub fn to_stream(
260 &self,
261 context: Arc<TaskContext>,
262 partition: usize,
263 ) -> Result<SendableRecordBatchStream> {
264 let regions = self.regions.clone();
266 let region_query_handler = self.region_query_handler.clone();
267 let metric = MergeScanMetric::new(&self.metric);
268 let schema = self.schema.clone();
269 let query_ctx = self.query_ctx.clone();
270 let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
271 let partition_metrics_moved = self.partition_metrics.clone();
272 let plan = self.plan.clone();
273 let target_partition = self.target_partition;
274 let dbname = context.task_id().unwrap_or_default();
275 let tracing_context = TracingContext::from_json(context.session_id().as_str());
276 let current_channel = self.query_ctx.channel();
277 let read_preference = self.query_ctx.read_preference();
278 let explain_verbose = self.query_ctx.explain_verbose();
279
280 let stream = Box::pin(stream!({
281 if partition == 0 {
283 MERGE_SCAN_REGIONS.observe(regions.len() as f64);
284 }
285
286 let _finish_timer = metric.finish_time().timer();
287 let mut ready_timer = metric.ready_time().timer();
288 let mut first_consume_timer = Some(metric.first_consume_time().timer());
289
290 for region_id in regions
291 .iter()
292 .skip(partition)
293 .step_by(target_partition)
294 .copied()
295 {
296 let request = QueryRequest {
297 header: Some(RegionRequestHeader {
298 tracing_context: tracing_context.to_w3c(),
299 dbname: dbname.clone(),
300 query_context: Some(query_ctx.as_ref().into()),
301 }),
302 region_id,
303 plan: plan.clone(),
304 };
305 let region_start = Instant::now();
306 let do_get_start = Instant::now();
307
308 if explain_verbose {
309 common_telemetry::info!(
310 "Merge scan one region, partition: {}, region_id: {}",
311 partition,
312 region_id
313 );
314 }
315
316 let mut stream = region_query_handler
317 .do_get(read_preference, request)
318 .await
319 .map_err(|e| {
320 MERGE_SCAN_ERRORS_TOTAL.inc();
321 BoxedError::new(e)
322 })
323 .context(ExternalSnafu)?;
324 let do_get_cost = do_get_start.elapsed();
325
326 ready_timer.stop();
327
328 let mut poll_duration = Duration::ZERO;
329 let mut poll_timer = Instant::now();
330 while let Some(batch) = stream.next().await {
331 let poll_elapsed = poll_timer.elapsed();
332 poll_duration += poll_elapsed;
333
334 let batch = batch?;
335 let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
338 metric.record_output_batch_rows(batch.num_rows());
339 if let Some(mut first_consume_timer) = first_consume_timer.take() {
340 first_consume_timer.stop();
341 }
342
343 if let Some(metrics) = stream.metrics() {
344 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
345 sub_stage_metrics.insert(region_id, metrics);
346 }
347
348 yield Ok(batch);
349 poll_timer = Instant::now();
351 }
352 let total_cost = region_start.elapsed();
353
354 let region_metrics = RegionMetrics {
356 region_id,
357 poll_duration,
358 do_get_cost,
359 total_cost,
360 };
361
362 {
364 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
365 let partition_metrics = partition_metrics_guard
366 .entry(partition)
367 .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
368 partition_metrics.add_region_metrics(region_metrics);
369 }
370
371 if explain_verbose {
372 common_telemetry::info!(
373 "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
374 partition,
375 region_id,
376 poll_duration,
377 metric.first_consume_time(),
378 do_get_cost
379 );
380 }
381
382 if let Some(metrics) = stream.metrics() {
384 let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
385 let value = read_meter!(
386 c,
387 s,
388 ReadItem {
389 cpu_time: metrics.elapsed_compute as u64,
390 table_scan: metrics.memory_usage as u64
391 },
392 current_channel as u8
393 );
394 metric.record_greptime_exec_cost(value as usize);
395
396 let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
398 sub_stage_metrics.insert(region_id, metrics);
399 }
400
401 MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
402 }
403
404 {
406 let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
407 if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
408 partition_metrics.finish();
409 }
410 }
411 }));
412
413 Ok(Box::pin(RecordBatchStreamWrapper {
414 schema: self.schema.clone(),
415 stream,
416 output_ordering: None,
417 metrics: Default::default(),
418 }))
419 }
420
421 pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
422 let Distribution::HashPartitioned(hash_exprs) = distribution else {
423 return None;
425 };
426
427 if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
428 && curr_dist == &hash_exprs
429 {
430 return None;
432 }
433
434 let all_partition_col_aliases: HashSet<_> = self
435 .partition_cols
436 .values()
437 .flat_map(|aliases| aliases.iter().map(|c| c.name()))
438 .collect();
439 let mut overlaps = vec![];
440 for expr in &hash_exprs {
441 if let Some(col_expr) = expr.as_any().downcast_ref::<Column>()
442 && all_partition_col_aliases.contains(col_expr.name())
443 {
444 overlaps.push(expr.clone());
445 }
446 }
447
448 if overlaps.is_empty() {
449 return None;
450 }
451
452 Some(Self {
453 table: self.table.clone(),
454 regions: self.regions.clone(),
455 plan: self.plan.clone(),
456 schema: self.schema.clone(),
457 arrow_schema: self.arrow_schema.clone(),
458 region_query_handler: self.region_query_handler.clone(),
459 metric: self.metric.clone(),
460 properties: PlanProperties::new(
461 self.properties.eq_properties.clone(),
462 Partitioning::Hash(overlaps, self.target_partition),
463 self.properties.emission_type,
464 self.properties.boundedness,
465 ),
466 sub_stage_metrics: self.sub_stage_metrics.clone(),
467 partition_metrics: self.partition_metrics.clone(),
468 query_ctx: self.query_ctx.clone(),
469 target_partition: self.target_partition,
470 partition_cols: self.partition_cols.clone(),
471 })
472 }
473
474 fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
475 let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
476 Ok(Arc::new(schema))
477 }
478
479 pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
480 self.sub_stage_metrics
481 .lock()
482 .unwrap()
483 .values()
484 .cloned()
485 .collect()
486 }
487
488 pub fn partition_count(&self) -> usize {
489 self.target_partition
490 }
491
492 pub fn region_count(&self) -> usize {
493 self.regions.len()
494 }
495
496 fn partition_metrics(&self) -> Vec<PartitionMetrics> {
497 self.partition_metrics
498 .lock()
499 .unwrap()
500 .values()
501 .cloned()
502 .collect()
503 }
504}
505
/// Timings for one region scanned by one output partition of `MergeScanExec`.
// Field order matters: it drives the derived `Debug` output.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// The region these timings belong to.
    region_id: RegionId,
    /// Time spent awaiting `next()` on the region stream. Time the consumer
    /// spends on each yielded batch is excluded.
    poll_duration: Duration,
    /// Time from starting the region until `do_get` returned its stream.
    do_get_cost: Duration,
    /// Wall time from starting the region until its stream was exhausted.
    total_cost: Duration,
}
515
/// Aggregated timings for one output partition of `MergeScanExec`.
// Field order matters: it drives the derived `Debug` output.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Output partition index.
    partition: usize,
    /// Per-region records, in the order the regions completed.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over `region_metrics`.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over `region_metrics`.
    total_do_get_cost: Duration,
    /// Number of regions added so far.
    total_regions: usize,
    /// When set, the summary is logged at `info` instead of `debug`.
    explain_verbose: bool,
    /// Set once the summary has been logged, so `Drop` does not log it again.
    finished: bool,
}
527
528impl PartitionMetrics {
529 fn new(partition: usize, explain_verbose: bool) -> Self {
530 Self {
531 partition,
532 region_metrics: Vec::new(),
533 total_poll_duration: Duration::ZERO,
534 total_do_get_cost: Duration::ZERO,
535 total_regions: 0,
536 explain_verbose,
537 finished: false,
538 }
539 }
540
541 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
542 self.total_poll_duration += region_metrics.poll_duration;
543 self.total_do_get_cost += region_metrics.do_get_cost;
544 self.total_regions += 1;
545 self.region_metrics.push(region_metrics);
546 }
547
548 fn finish(&mut self) {
550 if self.finished {
551 return;
552 }
553 self.finished = true;
554 self.log_metrics();
555 }
556
557 fn log_metrics(&self) {
559 if self.explain_verbose {
560 common_telemetry::info!(
561 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
562 self.partition,
563 self.total_regions,
564 self.total_poll_duration,
565 self.total_do_get_cost
566 );
567 } else {
568 common_telemetry::debug!(
569 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
570 self.partition,
571 self.total_regions,
572 self.total_poll_duration,
573 self.total_do_get_cost
574 );
575 }
576 }
577}
578
579impl Drop for PartitionMetrics {
580 fn drop(&mut self) {
581 if !self.finished {
582 self.log_metrics();
583 }
584 }
585}
586
587impl ExecutionPlan for MergeScanExec {
588 fn as_any(&self) -> &dyn Any {
589 self
590 }
591
592 fn schema(&self) -> ArrowSchemaRef {
593 self.arrow_schema.clone()
594 }
595
596 fn properties(&self) -> &PlanProperties {
597 &self.properties
598 }
599
600 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
601 vec![]
602 }
603
604 fn with_new_children(
607 self: Arc<Self>,
608 _children: Vec<Arc<dyn ExecutionPlan>>,
609 ) -> Result<Arc<dyn ExecutionPlan>> {
610 Ok(self.clone())
611 }
612
613 fn execute(
614 &self,
615 partition: usize,
616 context: Arc<TaskContext>,
617 ) -> Result<DfSendableRecordBatchStream> {
618 Ok(Box::pin(DfRecordBatchStreamAdapter::new(
619 self.to_stream(context, partition)?,
620 )))
621 }
622
623 fn metrics(&self) -> Option<MetricsSet> {
624 Some(self.metric.clone_inner())
625 }
626
627 fn name(&self) -> &str {
628 "MergeScanExec"
629 }
630}
631
632impl DisplayAs for MergeScanExec {
633 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
634 write!(f, "MergeScanExec: peers=[")?;
635 for region_id in self.regions.iter() {
636 write!(f, "{}, ", region_id)?;
637 }
638 write!(f, "]")?;
639
640 if matches!(t, DisplayFormatType::Verbose) {
641 let partition_metrics = self.partition_metrics();
642 if !partition_metrics.is_empty() {
643 write!(f, ", metrics={{")?;
644 for (i, pm) in partition_metrics.iter().enumerate() {
645 if i > 0 {
646 write!(f, ", ")?;
647 }
648 write!(
649 f,
650 "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
651 pm.partition,
652 pm.total_regions,
653 pm.total_poll_duration,
654 pm.total_do_get_cost
655 )?;
656 for (j, rm) in pm.region_metrics.iter().enumerate() {
657 if j > 0 {
658 write!(f, ",")?;
659 }
660 write!(
661 f,
662 "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
663 rm.region_id, rm.poll_duration, rm.do_get_cost, rm.total_cost
664 )?;
665 }
666 write!(f, "]}}")?;
667 }
668 write!(f, "}}")?;
669 }
670 }
671
672 Ok(())
673 }
674}
675
/// DataFusion metrics recorded by each `MergeScanExec` output stream.
// Field order matters: it drives the derived `Debug` output.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Measured from the start of the stream until a region's `do_get` returns.
    ready_time: Time,
    /// Measured from the start of the stream until the first batch is received.
    first_consume_time: Time,
    /// Measured over the whole lifetime of the stream body.
    finish_time: Time,
    /// Number of rows emitted.
    output_rows: Count,

    /// Accumulated read cost returned by `read_meter!`.
    greptime_exec_cost: Gauge,
}
690
691impl MergeScanMetric {
692 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
693 Self {
694 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
695 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
696 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
697 output_rows: MetricBuilder::new(metric).output_rows(1),
698 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
699 }
700 }
701
702 pub fn ready_time(&self) -> &Time {
703 &self.ready_time
704 }
705
706 pub fn first_consume_time(&self) -> &Time {
707 &self.first_consume_time
708 }
709
710 pub fn finish_time(&self) -> &Time {
711 &self.finish_time
712 }
713
714 pub fn record_output_batch_rows(&self, num_rows: usize) {
715 self.output_rows.add(num_rows);
716 }
717
718 pub fn record_greptime_exec_cost(&self, metrics: usize) {
719 self.greptime_exec_cost.add(metrics);
720 }
721}