1use std::any::Any;
16use std::sync::{Arc, Mutex};
17use std::time::Duration;
18
19use ahash::{HashMap, HashSet};
20use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
21use async_stream::stream;
22use common_catalog::parse_catalog_and_schema_from_db_string;
23use common_error::ext::BoxedError;
24use common_plugins::GREPTIME_EXEC_READ_COST;
25use common_query::request::QueryRequest;
26use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
27use common_recordbatch::error::ExternalSnafu;
28use common_recordbatch::{
29 DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
30};
31use common_telemetry::tracing_context::TracingContext;
32use datafusion::execution::{SessionState, TaskContext};
33use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
34use datafusion::physical_plan::metrics::{
35 Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
36};
37use datafusion::physical_plan::{
38 DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
39};
40use datafusion_common::{Column as ColumnExpr, Result};
41use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
42use datafusion_physical_expr::expressions::Column;
43use datafusion_physical_expr::{Distribution, EquivalenceProperties, PhysicalSortExpr};
44use datatypes::schema::{Schema, SchemaRef};
45use futures_util::StreamExt;
46use greptime_proto::v1::region::RegionRequestHeader;
47use meter_core::data::ReadItem;
48use meter_macros::read_meter;
49use session::context::QueryContextRef;
50use snafu::ResultExt;
51use store_api::storage::RegionId;
52use table::table_name::TableName;
53use tokio::time::Instant;
54
55use crate::error::ConvertSchemaSnafu;
56use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
57use crate::region_query::RegionQueryHandlerRef;
58
/// Logical plan node named `"MergeScan"` that wraps another [`LogicalPlan`].
///
/// The wrapped plan supplies this node's schema and is printed as
/// `remote_input` in the explain output; it is not reported through
/// `inputs()` by this type's `UserDefinedLogicalNodeCore` implementation.
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The wrapped plan (see `input()`).
    input: LogicalPlan,
    /// Flag stored at construction and echoed by `is_placeholder()` and the
    /// explain output.
    // NOTE(review): what "placeholder" means is decided by code outside this
    // section — confirm with the planner that builds this node.
    is_placeholder: bool,
    /// Column names stored at construction and exposed by `partition_cols()`.
    // NOTE(review): presumably the table's partition columns — confirm.
    partition_cols: Vec<String>,
}
67
68impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
69 fn name(&self) -> &str {
70 Self::name()
71 }
72
73 fn inputs(&self) -> Vec<&LogicalPlan> {
76 vec![]
77 }
78
79 fn schema(&self) -> &datafusion_common::DFSchemaRef {
80 self.input.schema()
81 }
82
83 fn expressions(&self) -> Vec<datafusion_expr::Expr> {
85 vec![]
86 }
87
88 fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
89 write!(
90 f,
91 "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
92 self.is_placeholder, self.input
93 )
94 }
95
96 fn with_exprs_and_inputs(
97 &self,
98 _exprs: Vec<datafusion::prelude::Expr>,
99 _inputs: Vec<LogicalPlan>,
100 ) -> Result<Self> {
101 Ok(self.clone())
102 }
103}
104
105impl MergeScanLogicalPlan {
106 pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
107 Self {
108 input,
109 is_placeholder,
110 partition_cols,
111 }
112 }
113
114 pub fn name() -> &'static str {
115 "MergeScan"
116 }
117
118 pub fn into_logical_plan(self) -> LogicalPlan {
120 LogicalPlan::Extension(Extension {
121 node: Arc::new(self),
122 })
123 }
124
125 pub fn is_placeholder(&self) -> bool {
126 self.is_placeholder
127 }
128
129 pub fn input(&self) -> &LogicalPlan {
130 &self.input
131 }
132
133 pub fn partition_cols(&self) -> &[String] {
134 &self.partition_cols
135 }
136}
137
/// Physical plan that sends `plan` to a set of regions through
/// `region_query_handler` and streams the returned record batches.
///
/// Regions are spread over `target_partition` output partitions; see
/// `to_stream` for how one partition picks its regions.
pub struct MergeScanExec {
    /// Table name; within this section it is only surfaced in the `Debug` output.
    table: TableName,
    /// Regions to query.
    regions: Vec<RegionId>,
    /// Logical plan placed into every region request.
    plan: LogicalPlan,
    /// Schema converted from `arrow_schema`; used to rebuild every received batch.
    schema: SchemaRef,
    /// Arrow schema reported through `ExecutionPlan::schema`.
    arrow_schema: ArrowSchemaRef,
    /// Handler whose `do_get` is called once per region.
    region_query_handler: RegionQueryHandlerRef,
    /// DataFusion metrics set that backs the `MergeScanMetric` values.
    metric: ExecutionPlanMetricsSet,
    /// Plan properties (ordering, partitioning, emission type, boundedness).
    properties: PlanProperties,
    /// Most recent metrics reported by each region's stream, keyed by region.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    /// Timing information collected per output partition, keyed by partition index.
    partition_metrics: Arc<Mutex<HashMap<usize, PartitionMetrics>>>,
    /// Query context forwarded in the request header; also the source of the
    /// channel, read preference and verbose-explain flag.
    query_ctx: QueryContextRef,
    /// Number of output partitions.
    target_partition: usize,
    /// Column names used to build the initial hash partitioning.
    partition_cols: Vec<String>,
}
155
156impl std::fmt::Debug for MergeScanExec {
157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
158 f.debug_struct("MergeScanExec")
159 .field("table", &self.table)
160 .field("regions", &self.regions)
161 .field("schema", &self.schema)
162 .field("plan", &self.plan)
163 .finish()
164 }
165}
166
167impl MergeScanExec {
168 #[allow(clippy::too_many_arguments)]
169 pub fn new(
170 session_state: &SessionState,
171 table: TableName,
172 regions: Vec<RegionId>,
173 plan: LogicalPlan,
174 arrow_schema: &ArrowSchema,
175 region_query_handler: RegionQueryHandlerRef,
176 query_ctx: QueryContextRef,
177 target_partition: usize,
178 partition_cols: Vec<String>,
179 ) -> Result<Self> {
180 let arrow_schema = Arc::new(arrow_schema.clone());
184
185 let eq_properties = if let LogicalPlan::Sort(sort) = &plan
193 && target_partition >= regions.len()
194 {
195 let lex_ordering = sort
196 .expr
197 .iter()
198 .map(|sort_expr| {
199 let physical_expr = session_state
200 .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
201 Ok(PhysicalSortExpr::new(
202 physical_expr,
203 SortOptions {
204 descending: !sort_expr.asc,
205 nulls_first: sort_expr.nulls_first,
206 },
207 ))
208 })
209 .collect::<Result<Vec<_>>>()?;
210 EquivalenceProperties::new_with_orderings(arrow_schema.clone(), vec![lex_ordering])
211 } else {
212 EquivalenceProperties::new(arrow_schema.clone())
213 };
214
215 let partition_exprs = partition_cols
216 .iter()
217 .filter_map(|col| {
218 session_state
219 .create_physical_expr(
220 Expr::Column(ColumnExpr::new_unqualified(col)),
221 plan.schema(),
222 )
223 .ok()
224 })
225 .collect();
226 let partitioning = Partitioning::Hash(partition_exprs, target_partition);
227
228 let properties = PlanProperties::new(
229 eq_properties,
230 partitioning,
231 EmissionType::Incremental,
232 Boundedness::Bounded,
233 );
234 let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
235 Ok(Self {
236 table,
237 regions,
238 plan,
239 schema,
240 arrow_schema,
241 region_query_handler,
242 metric: ExecutionPlanMetricsSet::new(),
243 sub_stage_metrics: Arc::default(),
244 partition_metrics: Arc::default(),
245 properties,
246 query_ctx,
247 target_partition,
248 partition_cols,
249 })
250 }
251
/// Builds the record batch stream for one output `partition`.
///
/// The partition handles the regions at indices `partition`,
/// `partition + target_partition`, `partition + 2 * target_partition`, ...
/// one after another. For every region a `QueryRequest` carrying `plan` is
/// sent through the region query handler, and each returned batch is rebuilt
/// against `self.schema` before it is yielded.
///
/// Besides yielding batches, the stream updates the DataFusion metrics, the
/// per-region `sub_stage_metrics`, the per-partition `partition_metrics` and
/// the `MERGE_SCAN_*` process metrics.
///
/// Nothing is requested from any region until the returned stream is polled;
/// request and batch errors are reported as stream items.
pub fn to_stream(
    &self,
    context: Arc<TaskContext>,
    partition: usize,
) -> Result<SendableRecordBatchStream> {
    // Everything the stream needs is cloned or copied up front so the stream
    // does not borrow from `self`.
    let regions = self.regions.clone();
    let region_query_handler = self.region_query_handler.clone();
    let metric = MergeScanMetric::new(&self.metric);
    let schema = self.schema.clone();
    let query_ctx = self.query_ctx.clone();
    let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
    let partition_metrics_moved = self.partition_metrics.clone();
    let plan = self.plan.clone();
    let target_partition = self.target_partition;
    // NOTE(review): the task id is used as the database name and the session
    // id is parsed as a JSON tracing context — this relies on the caller
    // storing those values there; confirm against the code that builds the
    // `TaskContext`.
    let dbname = context.task_id().unwrap_or_default();
    let tracing_context = TracingContext::from_json(context.session_id().as_str());
    let current_channel = self.query_ctx.channel();
    let read_preference = self.query_ctx.read_preference();
    let explain_verbose = self.query_ctx.explain_verbose();

    let stream = Box::pin(stream!({
        // Observe the region count once per query, not once per partition.
        if partition == 0 {
            MERGE_SCAN_REGIONS.observe(regions.len() as f64);
        }

        // `_finish_timer` is kept alive until the stream body ends.
        // NOTE(review): presumably the guard records its elapsed time when
        // dropped — see DataFusion's `Time::timer`.
        let _finish_timer = metric.finish_time().timer();
        let mut ready_timer = metric.ready_time().timer();
        let mut first_consume_timer = Some(metric.first_consume_time().timer());

        // Strided assignment of regions to this partition.
        for region_id in regions
            .iter()
            .skip(partition)
            .step_by(target_partition)
            .copied()
        {
            let request = QueryRequest {
                header: Some(RegionRequestHeader {
                    tracing_context: tracing_context.to_w3c(),
                    dbname: dbname.clone(),
                    query_context: Some(query_ctx.as_ref().into()),
                }),
                region_id,
                plan: plan.clone(),
            };
            let region_start = Instant::now();
            let do_get_start = Instant::now();

            if explain_verbose {
                common_telemetry::info!(
                    "Merge scan one region, partition: {}, region_id: {}",
                    partition,
                    region_id
                );
            }

            // A failed request bumps the error counter and ends the stream
            // with that error.
            let mut stream = region_query_handler
                .do_get(read_preference, request)
                .await
                .map_err(|e| {
                    MERGE_SCAN_ERRORS_TOTAL.inc();
                    BoxedError::new(e)
                })
                .context(ExternalSnafu)?;
            let do_get_cost = do_get_start.elapsed();

            ready_timer.stop();

            // `poll_duration` sums only the time spent waiting on the region
            // stream; the timer is restarted after each `yield`, so time
            // spent in the consumer is excluded.
            let mut poll_duration = Duration::ZERO;
            let mut poll_timer = Instant::now();
            while let Some(batch) = stream.next().await {
                let poll_elapsed = poll_timer.elapsed();
                poll_duration += poll_elapsed;

                let batch = batch?;
                // Rebuild the batch from its columns against our own schema.
                let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                metric.record_output_batch_rows(batch.num_rows());
                // Only the very first batch of this partition stops the timer.
                if let Some(mut first_consume_timer) = first_consume_timer.take() {
                    first_consume_timer.stop();
                }

                // Keep the latest metrics snapshot for this region; the lock
                // guard is released at the end of this block, before `yield`.
                if let Some(metrics) = stream.metrics() {
                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                yield Ok(batch);
                // Restart the poll timer after the consumer handed control back.
                poll_timer = Instant::now();
            }
            let total_cost = region_start.elapsed();

            let region_metrics = RegionMetrics {
                region_id,
                poll_duration,
                do_get_cost,
                total_cost,
            };

            {
                // Scoped so the guard is dropped before the code below runs.
                let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
                let partition_metrics = partition_metrics_guard
                    .entry(partition)
                    .or_insert_with(|| PartitionMetrics::new(partition, explain_verbose));
                partition_metrics.add_region_metrics(region_metrics);
            }

            if explain_verbose {
                common_telemetry::info!(
                    "Merge scan finish one region, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                );
            }

            // Final metrics of the finished region stream: feed the read
            // meter and store the snapshot for this region.
            if let Some(metrics) = stream.metrics() {
                let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                let value = read_meter!(
                    c,
                    s,
                    ReadItem {
                        cpu_time: metrics.elapsed_compute as u64,
                        table_scan: metrics.memory_usage as u64
                    },
                    current_channel as u8
                );
                metric.record_greptime_exec_cost(value as usize);

                let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                sub_stage_metrics.insert(region_id, metrics);
            }

            MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
        }

        {
            // Mark the partition as finished (which also logs its summary).
            // No entry exists when this partition completed no region.
            let mut partition_metrics_guard = partition_metrics_moved.lock().unwrap();
            if let Some(partition_metrics) = partition_metrics_guard.get_mut(&partition) {
                partition_metrics.finish();
            }
        }
    }));

    Ok(Box::pin(RecordBatchStreamWrapper {
        schema: self.schema.clone(),
        stream,
        output_ordering: None,
        metrics: Default::default(),
    }))
}
409
410 pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
411 let Distribution::HashPartitioned(hash_exprs) = distribution else {
412 return None;
414 };
415
416 if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
417 && curr_dist == &hash_exprs
418 {
419 return None;
421 }
422
423 let mut hash_cols = HashSet::default();
424 for expr in &hash_exprs {
425 if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
426 hash_cols.insert(col_expr.name());
427 }
428 }
429 for col in &self.partition_cols {
430 if !hash_cols.contains(col.as_str()) {
431 return None;
433 }
434 }
435
436 Some(Self {
437 table: self.table.clone(),
438 regions: self.regions.clone(),
439 plan: self.plan.clone(),
440 schema: self.schema.clone(),
441 arrow_schema: self.arrow_schema.clone(),
442 region_query_handler: self.region_query_handler.clone(),
443 metric: self.metric.clone(),
444 properties: PlanProperties::new(
445 self.properties.eq_properties.clone(),
446 Partitioning::Hash(hash_exprs, self.target_partition),
447 self.properties.emission_type,
448 self.properties.boundedness,
449 ),
450 sub_stage_metrics: self.sub_stage_metrics.clone(),
451 partition_metrics: self.partition_metrics.clone(),
452 query_ctx: self.query_ctx.clone(),
453 target_partition: self.target_partition,
454 partition_cols: self.partition_cols.clone(),
455 })
456 }
457
458 fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
459 let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
460 Ok(Arc::new(schema))
461 }
462
463 pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
464 self.sub_stage_metrics
465 .lock()
466 .unwrap()
467 .values()
468 .cloned()
469 .collect()
470 }
471
472 pub fn partition_count(&self) -> usize {
473 self.target_partition
474 }
475
476 pub fn region_count(&self) -> usize {
477 self.regions.len()
478 }
479
480 fn partition_metrics(&self) -> Vec<PartitionMetrics> {
481 self.partition_metrics
482 .lock()
483 .unwrap()
484 .values()
485 .cloned()
486 .collect()
487 }
488}
489
/// Timing information recorded for one region scanned by a merge-scan partition.
#[derive(Debug, Clone)]
struct RegionMetrics {
    /// The region these timings belong to.
    region_id: RegionId,
    /// Accumulated time spent waiting for batches from the region's stream.
    poll_duration: Duration,
    /// Time taken by the `do_get` call that opened the region's stream.
    do_get_cost: Duration,
    /// Time from just before the `do_get` call until the region's stream was exhausted.
    total_cost: Duration,
}
499
/// Aggregated timing information for one output partition of a merge scan.
///
/// A summary is logged when `finish` is first called, or on drop if the value
/// was never finished. Because the type is `Clone`, dropping an unfinished
/// clone logs as well.
#[derive(Debug, Clone)]
struct PartitionMetrics {
    /// Index of the partition.
    partition: usize,
    /// Metrics of every region added so far, in insertion order.
    region_metrics: Vec<RegionMetrics>,
    /// Sum of `poll_duration` over all added regions.
    total_poll_duration: Duration,
    /// Sum of `do_get_cost` over all added regions.
    total_do_get_cost: Duration,
    /// Number of regions added so far.
    total_regions: usize,
    /// Selects the log level of the summary: `info` when true, `debug` otherwise.
    explain_verbose: bool,
    /// Set by `finish`; suppresses a second summary from `finish` or `drop`.
    finished: bool,
}
511
512impl PartitionMetrics {
513 fn new(partition: usize, explain_verbose: bool) -> Self {
514 Self {
515 partition,
516 region_metrics: Vec::new(),
517 total_poll_duration: Duration::ZERO,
518 total_do_get_cost: Duration::ZERO,
519 total_regions: 0,
520 explain_verbose,
521 finished: false,
522 }
523 }
524
525 fn add_region_metrics(&mut self, region_metrics: RegionMetrics) {
526 self.total_poll_duration += region_metrics.poll_duration;
527 self.total_do_get_cost += region_metrics.do_get_cost;
528 self.total_regions += 1;
529 self.region_metrics.push(region_metrics);
530 }
531
532 fn finish(&mut self) {
534 if self.finished {
535 return;
536 }
537 self.finished = true;
538 self.log_metrics();
539 }
540
541 fn log_metrics(&self) {
543 if self.explain_verbose {
544 common_telemetry::info!(
545 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
546 self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
547 );
548 } else {
549 common_telemetry::debug!(
550 "MergeScan partition {} finished: {} regions, total_poll_duration: {:?}, total_do_get_cost: {:?}",
551 self.partition, self.total_regions, self.total_poll_duration, self.total_do_get_cost
552 );
553 }
554 }
555}
556
557impl Drop for PartitionMetrics {
558 fn drop(&mut self) {
559 if !self.finished {
560 self.log_metrics();
561 }
562 }
563}
564
565impl ExecutionPlan for MergeScanExec {
566 fn as_any(&self) -> &dyn Any {
567 self
568 }
569
570 fn schema(&self) -> ArrowSchemaRef {
571 self.arrow_schema.clone()
572 }
573
574 fn properties(&self) -> &PlanProperties {
575 &self.properties
576 }
577
578 fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
579 vec![]
580 }
581
582 fn with_new_children(
585 self: Arc<Self>,
586 _children: Vec<Arc<dyn ExecutionPlan>>,
587 ) -> Result<Arc<dyn ExecutionPlan>> {
588 Ok(self.clone())
589 }
590
591 fn execute(
592 &self,
593 partition: usize,
594 context: Arc<TaskContext>,
595 ) -> Result<DfSendableRecordBatchStream> {
596 Ok(Box::pin(DfRecordBatchStreamAdapter::new(
597 self.to_stream(context, partition)?,
598 )))
599 }
600
601 fn metrics(&self) -> Option<MetricsSet> {
602 Some(self.metric.clone_inner())
603 }
604
605 fn name(&self) -> &str {
606 "MergeScanExec"
607 }
608}
609
610impl DisplayAs for MergeScanExec {
611 fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
612 write!(f, "MergeScanExec: peers=[")?;
613 for region_id in self.regions.iter() {
614 write!(f, "{}, ", region_id)?;
615 }
616 write!(f, "]")?;
617
618 if matches!(t, DisplayFormatType::Verbose) {
619 let partition_metrics = self.partition_metrics();
620 if !partition_metrics.is_empty() {
621 write!(f, ", metrics={{")?;
622 for (i, pm) in partition_metrics.iter().enumerate() {
623 if i > 0 {
624 write!(f, ", ")?;
625 }
626 write!(f, "\"partition_{}\":{{\"regions\":{},\"total_poll_duration\":\"{:?}\",\"total_do_get_cost\":\"{:?}\",\"region_metrics\":[",
627 pm.partition, pm.total_regions,
628 pm.total_poll_duration,
629 pm.total_do_get_cost)?;
630 for (j, rm) in pm.region_metrics.iter().enumerate() {
631 if j > 0 {
632 write!(f, ",")?;
633 }
634 write!(f, "{{\"region_id\":\"{}\",\"poll_duration\":\"{:?}\",\"do_get_cost\":\"{:?}\",\"total_cost\":\"{:?}\"}}",
635 rm.region_id,
636 rm.poll_duration,
637 rm.do_get_cost,
638 rm.total_cost)?;
639 }
640 write!(f, "]}}")?;
641 }
642 write!(f, "}}")?;
643 }
644 }
645
646 Ok(())
647 }
648}
649
/// Handles to the DataFusion metrics recorded by one merge-scan stream.
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Stopped right after the first region's `do_get` call returns successfully.
    ready_time: Time,
    /// Stopped when the first batch of the stream has been received.
    first_consume_time: Time,
    /// Timer guard is held for the whole body of the stream.
    finish_time: Time,
    /// Total number of rows in the batches produced by the stream.
    output_rows: Count,

    /// Accumulated value returned by the read meter for each finished region.
    greptime_exec_cost: Gauge,
}
664
665impl MergeScanMetric {
666 pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
667 Self {
668 ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
669 first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
670 finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
671 output_rows: MetricBuilder::new(metric).output_rows(1),
672 greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
673 }
674 }
675
676 pub fn ready_time(&self) -> &Time {
677 &self.ready_time
678 }
679
680 pub fn first_consume_time(&self) -> &Time {
681 &self.first_consume_time
682 }
683
684 pub fn finish_time(&self) -> &Time {
685 &self.finish_time
686 }
687
688 pub fn record_output_batch_rows(&self, num_rows: usize) {
689 self.output_rows.add(num_rows);
690 }
691
692 pub fn record_greptime_exec_cost(&self, metrics: usize) {
693 self.greptime_exec_cost.add(metrics);
694 }
695}