use std::any::Any;
use std::sync::{Arc, Mutex};
use std::time::Duration;

use ahash::HashSet;
use arrow_schema::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef, SortOptions};
use async_stream::stream;
use common_catalog::parse_catalog_and_schema_from_db_string;
use common_error::ext::BoxedError;
use common_plugins::GREPTIME_EXEC_READ_COST;
use common_query::request::QueryRequest;
use common_recordbatch::adapter::{DfRecordBatchStreamAdapter, RecordBatchMetrics};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
    DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing_context::TracingContext;
use datafusion::execution::{SessionState, TaskContext};
use datafusion::physical_plan::execution_plan::{Boundedness, EmissionType};
use datafusion::physical_plan::metrics::{
    Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, MetricsSet, Time,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties,
};
use datafusion_common::{Column as ColumnExpr, Result};
use datafusion_expr::{Expr, Extension, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion_physical_expr::expressions::Column;
use datafusion_physical_expr::{
    Distribution, EquivalenceProperties, LexOrdering, PhysicalSortExpr,
};
use datatypes::schema::{Schema, SchemaRef};
use futures_util::StreamExt;
use greptime_proto::v1::region::RegionRequestHeader;
use meter_core::data::ReadItem;
use meter_macros::read_meter;
use session::context::QueryContextRef;
use snafu::ResultExt;
use store_api::storage::RegionId;
use table::table_name::TableName;
use tokio::time::Instant;

use crate::error::ConvertSchemaSnafu;
use crate::metrics::{MERGE_SCAN_ERRORS_TOTAL, MERGE_SCAN_POLL_ELAPSED, MERGE_SCAN_REGIONS};
use crate::region_query::RegionQueryHandlerRef;

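/// Logical plan node for a distributed merge scan. The wrapped `input` plan is
/// sent to every region of the target table, and the per-region result streams
/// are merged by [`MergeScanExec`].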
#[derive(Debug, Hash, PartialOrd, PartialEq, Eq, Clone)]
pub struct MergeScanLogicalPlan {
    /// The sub-plan to execute on each region.
    input: LogicalPlan,
    /// Whether this node is a placeholder.
    is_placeholder: bool,
    /// Columns the table is partitioned by.
    partition_cols: Vec<String>,
}

impl UserDefinedLogicalNodeCore for MergeScanLogicalPlan {
    fn name(&self) -> &str {
        Self::name()
    }

    // Return no inputs so optimizer rules do not descend into the region
    // sub-plan; it remains reachable through `self.input()`.
    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![]
    }

    fn schema(&self) -> &datafusion_common::DFSchemaRef {
        self.input.schema()
    }

    // Expose no expressions for the same reason as `inputs()`.
    fn expressions(&self) -> Vec<datafusion_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScan [is_placeholder={}]", self.is_placeholder)
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<datafusion::prelude::Expr>,
        _inputs: Vec<LogicalPlan>,
    ) -> Result<Self> {
        Ok(self.clone())
    }
}

impl MergeScanLogicalPlan {
    pub fn new(input: LogicalPlan, is_placeholder: bool, partition_cols: Vec<String>) -> Self {
        Self {
            input,
            is_placeholder,
            partition_cols,
        }
    }

    pub fn name() -> &'static str {
        "MergeScan"
    }

    /// Wrap this node into a [`LogicalPlan::Extension`].
    pub fn into_logical_plan(self) -> LogicalPlan {
        LogicalPlan::Extension(Extension {
            node: Arc::new(self),
        })
    }

    pub fn is_placeholder(&self) -> bool {
        self.is_placeholder
    }

    pub fn input(&self) -> &LogicalPlan {
        &self.input
    }

    pub fn partition_cols(&self) -> &[String] {
        &self.partition_cols
    }
}

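/// Physical plan that fans the region sub-plan out to all `regions` and merges
/// the returned record batch streams, with each output partition polling a
/// disjoint subset of the regions.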
pub struct MergeScanExec {
    table: TableName,
    regions: Vec<RegionId>,
    plan: LogicalPlan,
    schema: SchemaRef,
    arrow_schema: ArrowSchemaRef,
    region_query_handler: RegionQueryHandlerRef,
    metric: ExecutionPlanMetricsSet,
    properties: PlanProperties,
    /// Metrics reported by the per-region sub-stage streams.
    sub_stage_metrics: Arc<Mutex<Vec<RecordBatchMetrics>>>,
    /// Context of the query this scan belongs to.
    query_ctx: QueryContextRef,
    target_partition: usize,
    partition_cols: Vec<String>,
}

impl std::fmt::Debug for MergeScanExec {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("MergeScanExec")
            .field("table", &self.table)
            .field("regions", &self.regions)
            .field("schema", &self.schema)
            .finish()
    }
}

impl MergeScanExec {
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        session_state: &SessionState,
        table: TableName,
        regions: Vec<RegionId>,
        plan: LogicalPlan,
        arrow_schema: &ArrowSchema,
        region_query_handler: RegionQueryHandlerRef,
        query_ctx: QueryContextRef,
        target_partition: usize,
        partition_cols: Vec<String>,
    ) -> Result<Self> {
        let arrow_schema = Arc::new(arrow_schema.clone());

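        // Preserve the input's sort ordering only when every output partition
        // maps to at most one region (target_partition >= regions.len()).
        // Otherwise batches from several regions would interleave within a
        // partition and the per-region order would be lost.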
        let eq_properties = if let LogicalPlan::Sort(sort) = &plan
            && target_partition >= regions.len()
        {
            let lex_ordering = sort
                .expr
                .iter()
                .map(|sort_expr| {
                    let physical_expr = session_state
                        .create_physical_expr(sort_expr.expr.clone(), plan.schema())?;
                    Ok(PhysicalSortExpr::new(
                        physical_expr,
                        SortOptions {
                            descending: !sort_expr.asc,
                            nulls_first: sort_expr.nulls_first,
                        },
                    ))
                })
                .collect::<Result<Vec<_>>>()?;
            EquivalenceProperties::new_with_orderings(
                arrow_schema.clone(),
                &[LexOrdering::new(lex_ordering)],
            )
        } else {
            EquivalenceProperties::new(arrow_schema.clone())
        };

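        // Declare hash partitioning on the table's partition columns. Columns
        // that fail to resolve against the plan schema are silently skipped.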
        let partition_exprs = partition_cols
            .iter()
            .filter_map(|col| {
                session_state
                    .create_physical_expr(
                        Expr::Column(ColumnExpr::new_unqualified(col)),
                        plan.schema(),
                    )
                    .ok()
            })
            .collect();
        let partitioning = Partitioning::Hash(partition_exprs, target_partition);

        let properties = PlanProperties::new(
            eq_properties,
            partitioning,
            EmissionType::Incremental,
            Boundedness::Bounded,
        );
        let schema = Self::arrow_schema_to_schema(arrow_schema.clone())?;
        Ok(Self {
            table,
            regions,
            plan,
            schema,
            arrow_schema,
            region_query_handler,
            metric: ExecutionPlanMetricsSet::new(),
            sub_stage_metrics: Arc::default(),
            properties,
            query_ctx,
            target_partition,
            partition_cols,
        })
    }

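    /// Builds the record batch stream for one output partition: queries every
    /// region assigned to `partition` in turn and forwards their batches.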
    pub fn to_stream(
        &self,
        context: Arc<TaskContext>,
        partition: usize,
    ) -> Result<SendableRecordBatchStream> {
        // Clone everything the generated stream captures, since it must own
        // its state ('static).
        let regions = self.regions.clone();
        let region_query_handler = self.region_query_handler.clone();
        let metric = MergeScanMetric::new(&self.metric);
        let schema = self.schema.clone();
        let query_ctx = self.query_ctx.clone();
        let sub_stage_metrics_moved = self.sub_stage_metrics.clone();
        let plan = self.plan.clone();
        let target_partition = self.target_partition;
        let dbname = context.task_id().unwrap_or_default();
        let tracing_context = TracingContext::from_json(context.session_id().as_str());
        let current_channel = self.query_ctx.channel();
        let read_preference = self.query_ctx.read_preference();

        let stream = Box::pin(stream!({
            // Only report the region count once per MergeScan, from partition 0.
            if partition == 0 {
                MERGE_SCAN_REGIONS.observe(regions.len() as f64);
            }

            let _finish_timer = metric.finish_time().timer();
            let mut ready_timer = metric.ready_time().timer();
            let mut first_consume_timer = Some(metric.first_consume_time().timer());

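            // Partition `p` scans regions p, p + target_partition,
            // p + 2 * target_partition, ..., so the partitions together cover
            // every region exactly once.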
            for region_id in regions
                .iter()
                .skip(partition)
                .step_by(target_partition)
                .copied()
            {
                let request = QueryRequest {
                    header: Some(RegionRequestHeader {
                        tracing_context: tracing_context.to_w3c(),
                        dbname: dbname.clone(),
                        query_context: Some(query_ctx.as_ref().into()),
                    }),
                    region_id,
                    plan: plan.clone(),
                };
                let do_get_start = Instant::now();
                let mut stream = region_query_handler
                    .do_get(read_preference, request)
                    .await
                    .map_err(|e| {
                        MERGE_SCAN_ERRORS_TOTAL.inc();
                        BoxedError::new(e)
                    })
                    .context(ExternalSnafu)?;
                let do_get_cost = do_get_start.elapsed();

                ready_timer.stop();

                let mut poll_duration = Duration::ZERO;
                let mut poll_timer = Instant::now();
                while let Some(batch) = stream.next().await {
                    let poll_elapsed = poll_timer.elapsed();
                    poll_duration += poll_elapsed;

                    let batch = batch?;
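                    // Rebuild the batch against the merged schema so that the
                    // column schema stays consistent across regions.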
                    let batch = RecordBatch::new(schema.clone(), batch.columns().iter().cloned())?;
                    metric.record_output_batch_rows(batch.num_rows());
                    if let Some(mut first_consume_timer) = first_consume_timer.take() {
                        first_consume_timer.stop();
                    }
                    yield Ok(batch);
                    // Restart the poll timer after yielding, so time spent by
                    // the downstream consumer is not counted as poll time.
                    poll_timer = Instant::now();
                }
                common_telemetry::debug!(
                    "Merge scan stop poll stream, partition: {}, region_id: {}, poll_duration: {:?}, first_consume: {}, do_get_cost: {:?}",
                    partition, region_id, poll_duration, metric.first_consume_time(), do_get_cost
                );

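                // When the region server reports sub-stage metrics, meter the
                // read cost and stash the metrics for later inspection.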
                if let Some(metrics) = stream.metrics() {
                    let (c, s) = parse_catalog_and_schema_from_db_string(&dbname);
                    let value = read_meter!(
                        c,
                        s,
                        ReadItem {
                            cpu_time: metrics.elapsed_compute as u64,
                            table_scan: metrics.memory_usage as u64
                        },
                        current_channel as u8
                    );
                    metric.record_greptime_exec_cost(value as usize);

                    sub_stage_metrics_moved.lock().unwrap().push(metrics);
                }

                MERGE_SCAN_POLL_ELAPSED.observe(poll_duration.as_secs_f64());
            }
        }));

        Ok(Box::pin(RecordBatchStreamWrapper {
            schema: self.schema.clone(),
            stream,
            output_ordering: None,
            metrics: Default::default(),
        }))
    }

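    /// Tries to replace this plan's output partitioning with the required hash
    /// `distribution`. Returns `None` when the requirement is not hash
    /// partitioning, already matches the current partitioning, or does not
    /// cover every partition column.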
    pub fn try_with_new_distribution(&self, distribution: Distribution) -> Option<Self> {
        let Distribution::HashPartitioned(hash_exprs) = distribution else {
            // Only hash distribution can be adopted here.
            return None;
        };

        if let Partitioning::Hash(curr_dist, _) = &self.properties.partitioning
            && curr_dist == &hash_exprs
        {
            // The current partitioning already satisfies the requirement.
            return None;
        }

        let mut hash_cols = HashSet::default();
        for expr in &hash_exprs {
            if let Some(col_expr) = expr.as_any().downcast_ref::<Column>() {
                hash_cols.insert(col_expr.name());
            }
        }
        for col in &self.partition_cols {
            if !hash_cols.contains(col.as_str()) {
                // The requested hash keys must cover every partition column,
                // otherwise the claimed distribution would not hold.
                return None;
            }
        }

        Some(Self {
            table: self.table.clone(),
            regions: self.regions.clone(),
            plan: self.plan.clone(),
            schema: self.schema.clone(),
            arrow_schema: self.arrow_schema.clone(),
            region_query_handler: self.region_query_handler.clone(),
            metric: self.metric.clone(),
            properties: PlanProperties::new(
                self.properties.eq_properties.clone(),
                Partitioning::Hash(hash_exprs, self.target_partition),
                self.properties.emission_type,
                self.properties.boundedness,
            ),
            sub_stage_metrics: self.sub_stage_metrics.clone(),
            query_ctx: self.query_ctx.clone(),
            target_partition: self.target_partition,
            partition_cols: self.partition_cols.clone(),
        })
    }

    fn arrow_schema_to_schema(arrow_schema: ArrowSchemaRef) -> Result<SchemaRef> {
        let schema = Schema::try_from(arrow_schema).context(ConvertSchemaSnafu)?;
        Ok(Arc::new(schema))
    }

    pub fn sub_stage_metrics(&self) -> Vec<RecordBatchMetrics> {
        self.sub_stage_metrics.lock().unwrap().clone()
    }

    pub fn partition_count(&self) -> usize {
        self.target_partition
    }

    pub fn region_count(&self) -> usize {
        self.regions.len()
    }
}

impl ExecutionPlan for MergeScanExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> ArrowSchemaRef {
        self.arrow_schema.clone()
    }

    fn properties(&self) -> &PlanProperties {
        &self.properties
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![]
    }

    // This node has no children, so the new children are ignored and the
    // plan is returned unchanged.
    fn with_new_children(
        self: Arc<Self>,
        _children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        Ok(self.clone())
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> Result<DfSendableRecordBatchStream> {
        Ok(Box::pin(DfRecordBatchStreamAdapter::new(
            self.to_stream(context, partition)?,
        )))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn name(&self) -> &str {
        "MergeScanExec"
    }
}

impl DisplayAs for MergeScanExec {
    fn fmt_as(&self, _t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MergeScanExec: peers=[")?;
        for region_id in self.regions.iter() {
            write!(f, "{}, ", region_id)?;
        }
        write!(f, "]")
    }
}

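/// Execution metrics of a [`MergeScanExec`].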
#[derive(Debug, Clone)]
struct MergeScanMetric {
    /// Time elapsed until the first region stream is ready to poll.
    ready_time: Time,
    /// Time elapsed until the first record batch is consumed downstream.
    first_consume_time: Time,
    /// Total time until the merge scan stream finishes.
    finish_time: Time,
    /// Number of rows this merge scan has produced.
    output_rows: Count,

    /// Aggregated read cost metered from the sub-stage metrics.
    greptime_exec_cost: Gauge,
}

impl MergeScanMetric {
    pub fn new(metric: &ExecutionPlanMetricsSet) -> Self {
        Self {
            ready_time: MetricBuilder::new(metric).subset_time("ready_time", 1),
            first_consume_time: MetricBuilder::new(metric).subset_time("first_consume_time", 1),
            finish_time: MetricBuilder::new(metric).subset_time("finish_time", 1),
            output_rows: MetricBuilder::new(metric).output_rows(1),
            greptime_exec_cost: MetricBuilder::new(metric).gauge(GREPTIME_EXEC_READ_COST, 1),
        }
    }

    pub fn ready_time(&self) -> &Time {
        &self.ready_time
    }

    pub fn first_consume_time(&self) -> &Time {
        &self.first_consume_time
    }

    pub fn finish_time(&self) -> &Time {
        &self.finish_time
    }

    pub fn record_output_batch_rows(&self, num_rows: usize) {
        self.output_rows.add(num_rows);
    }

    pub fn record_greptime_exec_cost(&self, cost: usize) {
        self.greptime_exec_cost.add(cost);
    }
}