use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{Stream, StreamExt, ready};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{
    METRIC_NUM_SERIES, Millisecond, resolve_column_name, serialize_column_index,
};
use crate::metrics::PROMQL_SERIES_COUNT;

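/// Logical plan node for PromQL series normalization: subtracts `offset`
/// milliseconds from every value of the time index column and, when
/// `need_filter_out_nan` is set, drops rows that contain NaN values.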
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: LogicalPlan,
    unfix: Option<UnfixIndices>,
}

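/// Column indices decoded from the serialized form, kept until the real input
/// schema is available to resolve them back into column names.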
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
struct UnfixIndices {
    pub time_index_idx: u64,
    pub tag_column_indices: Vec<u64>,
}

impl UserDefinedLogicalNodeCore for SeriesNormalize {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<datafusion::logical_expr::Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
            self.offset, self.time_index_column_name, self.need_filter_out_nan
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(DataFusionError::Internal(
                "SeriesNormalize should have at least one input".to_string(),
            ));
        }

        let input: LogicalPlan = inputs.into_iter().next().unwrap();
        let input_schema = input.schema();

        if let Some(unfix) = &self.unfix {
            let time_index_column_name = resolve_column_name(
                unfix.time_index_idx,
                input_schema,
                "SeriesNormalize",
                "time index",
            )?;

            let tag_columns = unfix
                .tag_column_indices
                .iter()
                .map(|idx| resolve_column_name(*idx, input_schema, "SeriesNormalize", "tag"))
                .collect::<DataFusionResult<Vec<String>>>()?;

            Ok(Self {
                offset: self.offset,
                time_index_column_name,
                need_filter_out_nan: self.need_filter_out_nan,
                tag_columns,
                input,
                unfix: None,
            })
        } else {
            Ok(Self {
                offset: self.offset,
                time_index_column_name: self.time_index_column_name.clone(),
                need_filter_out_nan: self.need_filter_out_nan,
                tag_columns: self.tag_columns.clone(),
                input,
                unfix: None,
            })
        }
    }
}

impl SeriesNormalize {
    pub fn new<N: AsRef<str>>(
        offset: Millisecond,
        time_index_column_name: N,
        need_filter_out_nan: bool,
        tag_columns: Vec<String>,
        input: LogicalPlan,
    ) -> Self {
        Self {
            offset,
            time_index_column_name: time_index_column_name.as_ref().to_string(),
            need_filter_out_nan,
            tag_columns,
            input,
            unfix: None,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesNormalize"
    }

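    /// Converts this logical node into its physical counterpart,
    /// [`SeriesNormalizeExec`], wrapping the given physical input.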
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesNormalizeExec {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: exec_input,
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

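    /// Encodes this node as a protobuf byte buffer. Columns are serialized as
    /// schema indices rather than names so the receiver can resolve them
    /// against its own schema.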
    pub fn serialize(&self) -> Vec<u8> {
        let time_index_idx =
            serialize_column_index(self.input.schema(), &self.time_index_column_name);

        let tag_column_indices = self
            .tag_columns
            .iter()
            .map(|name| serialize_column_index(self.input.schema(), name))
            .collect::<Vec<u64>>();

        pb::SeriesNormalize {
            offset: self.offset,
            time_index_idx,
            filter_nan: self.need_filter_out_nan,
            tag_column_indices,
            ..Default::default()
        }
        .encode_to_vec()
    }

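    /// Decodes a node from a protobuf byte buffer. The column indices are
    /// stored in `unfix` with empty column names and a placeholder input;
    /// both are filled in later by `with_exprs_and_inputs`.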
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });

        let unfix = UnfixIndices {
            time_index_idx: pb_normalize.time_index_idx,
            tag_column_indices: pb_normalize.tag_column_indices.clone(),
        };

        Ok(Self {
            offset: pb_normalize.offset,
            time_index_column_name: String::new(),
            need_filter_out_nan: pb_normalize.filter_nan,
            tag_columns: Vec::new(),
            input: placeholder_plan,
            unfix: Some(unfix),
        })
    }
}

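/// Physical plan node corresponding to [`SeriesNormalize`].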
#[derive(Debug)]
pub struct SeriesNormalizeExec {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesNormalizeExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: children[0].clone(),
            tag_columns: self.tag_columns.clone(),
            metric: self.metric.clone(),
        }))
    }

    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column_name)
            .expect("time index column not found")
            .0;
        Ok(Box::pin(SeriesNormalizeStream {
            offset: self.offset,
            time_index,
            need_filter_out_nan: self.need_filter_out_nan,
            schema,
            input,
            metric: baseline_metric,
            num_series,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn name(&self) -> &str {
        "SeriesNormalizeExec"
    }
}

impl DisplayAs for SeriesNormalizeExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
                    self.offset, self.time_index_column_name, self.need_filter_out_nan
                )
            }
        }
    }
}

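/// Stream that applies the normalization to the input batch by batch.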
pub struct SeriesNormalizeStream {
    offset: Millisecond,
    time_index: usize,
    need_filter_out_nan: bool,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    num_series: Count,
}

impl SeriesNormalizeStream {
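    /// Normalizes one record batch: shifts the time index column back by
    /// `offset` milliseconds and, if requested, filters out rows whose
    /// `Float64` columns contain NaN.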
    pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "failed to downcast time index column to TimestampMillisecondArray".into(),
                )
            })?;

        let ts_column_biased = if self.offset == 0 {
            Arc::new(ts_column.clone()) as _
        } else {
            Arc::new(TimestampMillisecondArray::from_iter(
                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
            ))
        };
        let mut columns = input.columns().to_vec();
        columns[self.time_index] = ts_column_biased;

        let result_batch = RecordBatch::try_new(input.schema(), columns)?;
        if !self.need_filter_out_nan {
            return Ok(result_batch);
        }

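        // Build a boolean mask that clears every row in which any Float64
        // column holds a NaN, then filter the batch with it.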
        let mut filter = vec![true; input.num_rows()];
        for column in result_batch.columns() {
            if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
                for (i, flag) in filter.iter_mut().enumerate() {
                    if float_column.value(i).is_nan() {
                        *flag = false;
                    }
                }
            }
        }

        let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        Ok(result)
    }
}

impl RecordBatchStream for SeriesNormalizeStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesNormalizeStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = match ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                self.num_series.add(1);
                let timer = std::time::Instant::now();
                let result = self.normalize(batch);
                self.metric.elapsed_compute().add_elapsed(timer);
                Poll::Ready(Some(result))
            }
            None => {
                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                Poll::Ready(None)
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
        };
        self.metric.record_poll(poll)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, field_column, path_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            tag_columns: vec!["path".to_string()],
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
            \n| 1970-01-01T00:00:00 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:30 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }

    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            tag_columns: vec!["path".to_string()],
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:00:59 | 0.0    | foo  |\
            \n| 1970-01-01T00:01:59 | 1.0    | foo  |\
            \n| 1969-12-31T23:59:59 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:29 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:29 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}