use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use ahash::{HashMap, HashSet};
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
    Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;

use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;

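/// Logical plan node that marks the boundary of a distributed query: the
/// wrapped `input` is the plan fragment sent to remote regions, whose result
/// streams are later merged by [`MergeScanExec`]. The input is deliberately
/// hidden from the optimizer (see `inputs()`) so the remote fragment is not
/// optimized further after distribution planning.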
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The plan fragment to be executed on remote regions.
    input: LogicalPlan,
    /// Whether this plan node is a placeholder.
    is_placeholder: bool,
    /// Columns the table is partitioned by.
    partition_cols: Vec<String>,
}

impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
    fn name(&self) -> &str {
        Self::name()
    }

    // Prevent further optimization: the remote input is deliberately not exposed.
    // It can still be retrieved through `self.input()`.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    // Prevent further optimization: no expressions are exposed either.
    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }

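    /// Renders this node for `EXPLAIN` output. A sketch of the shape (the
    /// remote input plan is printed on its own lines):
    ///
    /// ```text
    /// MergeScan [is_placeholder=false, remote_input=[
    /// <input plan>
    /// ]]
    /// ```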
    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "MergeScan [is_placeholder={}, remote_input=[\n{}\n]]",
            self.is_placeholder, self.input
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<datafusion::prelude::Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(self.clone())
    }
}

impl MergeScanLogicalPlan {
    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
        Self {
            input,
            is_placeholder,
            partition_cols,
        }
    }

    pub fn name() -> &'static str {
        "MergeScan"
    }

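    /// Wraps this node into a [`LogicalPlan::Extension`] so it can be embedded
    /// in a DataFusion plan tree. A minimal sketch, assuming an existing
    /// `LogicalPlan` named `plan` and a hypothetical partition column `host`:
    ///
    /// ```ignore
    /// let merge_scan = MergeScanLogicalPlan::new(plan, false, vec!["host".to_string()]);
    /// let logical_plan: LogicalPlan = merge_scan.into_logical_plan();
    /// ```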
    pub fn into_logical_plan(self) -> LogicalPlan {
        LogicalPlan::Extension(Extension {
            node: Arc::new(self),
        })
    }

    pub fn is_placeholder(&self) -> bool {
        self.is_placeholder
    }

    pub fn input(&self) -> &LogicalPlan {
        &self.input
    }

    pub fn partition_cols(&self) -> &[String] {
        &self.partition_cols
    }
}

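/// Physical operator that sends the wrapped logical plan to every region in
/// `regions` and merges the returned record batch streams. Regions are
/// assigned to output partitions in a round-robin fashion: partition `p`
/// reads regions `p`, `p + target_partition`, `p + 2 * target_partition`, ...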
pub struct MergeScanExec {
    table: TableName,
    regions: Vec<RegionId>,
    plan: LogicalPlan,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    region_query_handler: RegionQueryHandlerRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
    /// Metrics reported by the per-region sub-stage streams, keyed by region id.
    sub_stage_metrics: Arc<Mutex<HashMap<RegionId, RecordBatchMetrics>>>,
    query_ctx: QueryContextRef,
    target_partition: usize,
    partition_cols: Vec<String>,
}

impl std::fmt::Debug for MergeScanExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
            .field("schema", &self.schema)
            .field("plan", &self.plan)
            .finish()
    }
}

impl MergeScanExec {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        session_state: &SessionState,
        table: TableName,
        regions: Vec<RegionId>,
        plan: LogicalPlan,
        arrow_schema: &ArrowSchema,
        region_query_handler: RegionQueryHandlerRef,
        query_ctx: QueryContextRef,
        target_partition: usize,
        partition_cols: Vec<String>,
    ) -> Result<Self> {
        let arrow_schema = Arc::new(arrow_schema.clone());

        // The plan can only claim the input's sort ordering when every output
        // partition maps to at most one region (`target_partition >= regions.len()`);
        // merging several independently sorted region streams into one partition
        // would not preserve the ordering.
        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
            && target_partition >= regions.len()
        {
            let lex_ordering = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    let physical_expr = session_state
                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
                    Ok(PhysicalSortExpr::new(
                        physical_expr,
                        SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            EquivalenceProperties::new_with_orderings(
                arrow_schema.clone(),
                &[LexOrdering::new(lex_ordering)],
            )
        } else {
            EquivalenceProperties::new(arrow_schema.clone())
        };

        let partition_exprs = partition_cols
            .iter()
            .filter_map(|col| {
                session_state
                    .create_physical_expr(
                        Expr::Column(ColumnExpr::new_unqualified(col)),
                        plan.schema(),
                    )
                    .ok()
            })
            .collect();
        let partitioning = Partitioning::Hash(partition_exprs, target_partition);

        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
        Ok(Self {
            table,
            regions,
            plan,
            schema,
            arrow_schema,
            region_query_handler,
            metric: ExecutionPlanMetricsSet::new(),
            sub_stage_metrics: Arc::default(),
            properties,
            query_ctx,
            target_partition,
            partition_cols,
        })
    }

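    /// Builds the record batch stream for one output partition. Each partition
    /// queries its assigned regions sequentially and forwards their batches.
    /// A worked sketch of the round-robin assignment with 5 regions and
    /// `target_partition = 2`:
    ///
    /// ```text
    /// partition 0 -> regions[0], regions[2], regions[4]
    /// partition 1 -> regions[1], regions[3]
    /// ```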
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = self.schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();

        let stream = Box::pin(stream!({
            // Report the region count only once, from the first partition.
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let do_get_start = Instant::now();
                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        BoxedError::new(e)
                    })
                    .context(ExternalSnafu)?;
                let do_get_cost = do_get_start.elapsed();

                ready_timer.stop();

                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch?;
                    // Reconstruct the batch with the plan schema to strip metadata
                    // and normalize column names.
                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }

                    if let Some(metrics) = stream.metrics() {
                        let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                        sub_stage_metrics.insert(region_id, metrics);
                    }

                    yield Ok(batch);
                    // Restart the poll timer so time spent downstream (after `yield`)
                    // is excluded from the poll duration.
                    poll_timer = Instant::now();
                }
                common_telemetry::debug!(
                    "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                );

                // After the region stream is drained, report its read cost and
                // keep the final metrics.
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    // Overwrite the per-batch snapshot with the final metrics.
                    let mut sub_stage_metrics = sub_stage_metrics_moved.lock().unwrap();
                    sub_stage_metrics.insert(region_id, metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }
        }));

        Ok(Box::pin(RecordBatchStreamWrapper {
            schema: self.schema.clone(),
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }))
    }

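    /// Tries to rebuild this plan with the hash distribution requested by the
    /// optimizer. Returns `None` when the request is not a hash distribution,
    /// when it matches the current partitioning, or when the requested hash
    /// columns do not cover all partition columns (in which case rows of the
    /// same table partition could land in different output partitions).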
    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
        let Distribution::HashPartitioned(hash_exprs) = distribution else {
            // Only hash distribution is supported.
            return None;
        };

        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
            && curr_dist == &hash_exprs
        {
            // Already distributed this way; nothing to change.
            return None;
        }

        let mut hash_cols = HashSet::default();
        for expr in &hash_exprs {
            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
                hash_cols.insert(col_expr.name());
            }
        }
        // Every partition column must appear in the requested hash columns.
        for col in &self.partition_cols {
            if !hash_cols.contains(col.as_str()) {
                return None;
            }
        }

        Some(Self {
            table: self.table.clone(),
            regions: self.regions.clone(),
            plan: self.plan.clone(),
            schema: self.schema.clone(),
            arrow_schema: self.arrow_schema.clone(),
            region_query_handler: self.region_query_handler.clone(),
            metric: self.metric.clone(),
            properties: PlanProperties::new(
                self.properties.eq_properties.clone(),
                Partitioning::Hash(hash_exprs, self.target_partition),
                self.properties.emission_type,
                self.properties.boundedness,
            ),
            sub_stage_metrics: self.sub_stage_metrics.clone(),
            query_ctx: self.query_ctx.clone(),
            target_partition: self.target_partition,
            partition_cols: self.partition_cols.clone(),
        })
    }

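    /// Converts an Arrow schema into the internal [`Schema`] wrapper used by
    /// record batches; fails if the schema cannot be represented.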
    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
        Ok(Arc::new(schema))
    }

    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
        self.sub_stage_metrics
            .lock()
            .unwrap()
            .values()
            .cloned()
            .collect()
    }

    pub fn partition_count(&self) -> usize {
        self.target_partition
    }

    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
}

impl ExecutionPlan for MergeScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // This is a leaf node: the remote input is not exposed as a child, so any
    // `_children` passed in are ignored.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<DfSendableRecordBatchStream> {
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
            self.to_stream(context, partition)?,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "MergeScanExec"
    }
}

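// `EXPLAIN` rendering of the physical node. A sketch of the output shape,
// with one entry per region id:
//
//   MergeScanExec: peers=[<region_id>, <region_id>, ]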
impl DisplayAs for MergeScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScanExec: peers=[")?;
        for region_id in self.regions.iter() {
            write!(f, "{}, ", region_id)?;
        }
        write!(f, "]")
    }
}

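/// Execution metrics for [`MergeScanExec`]: how long the scan took to become
/// ready, to have its first batch consumed, and to finish, plus the emitted
/// row count and the metered read cost.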
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Time elapsed until the first region stream is ready to poll.
    ready_time: Time,
    /// Time elapsed until the first record batch is consumed downstream.
    first_consume_time: Time,
    /// Total time elapsed until the merge scan stream finishes.
    finish_time: Time,
    /// Number of rows emitted by this operator.
    output_rows: Count,

    /// Read cost accumulated from region metrics, exposed as `GREPTIME_EXEC_READ_COST`.
    greptime_exec_cost: Gauge,
}

impl MergeScanMetric {
    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
        Self {
            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
            output_rows: MetricBuilder::new(metric).output_rows(1),
            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
        }
    }

    pub fn ready_time(&self) -> &Time {
        &self.ready_time
    }

    pub fn first_consume_time(&self) -> &Time {
        &self.first_consume_time
    }

    pub fn finish_time(&self) -> &Time {
        &self.finish_time
    }

    pub fn record_output_batch_rows(&self, num_rows: usize) {
        self.output_rows.add(num_rows);
    }

    pub fn record_greptime_exec_cost(&self, metrics: usize) {
        self.greptime_exec_cost.add(metrics);
    }
}