use std::any::Any;
use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use datafusion::arrow::array::{BooleanArray, Float64Array};
use datafusion::arrow::compute;
use datafusion::common::{DFSchema, DFSchemaRef, Result as DataFusionResult, Statistics};
use datafusion::error::DataFusionError;
use datafusion::execution::context::TaskContext;
use datafusion::logical_expr::{EmptyRelation, Expr, LogicalPlan, UserDefinedLogicalNodeCore};
use datafusion::physical_plan::expressions::Column as ColumnExpr;
use datafusion::physical_plan::metrics::{
    BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder, MetricValue, MetricsSet,
};
use datafusion::physical_plan::{
    DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, PlanProperties, RecordBatchStream,
    SendableRecordBatchStream,
};
use datatypes::arrow::array::TimestampMillisecondArray;
use datatypes::arrow::datatypes::SchemaRef;
use datatypes::arrow::record_batch::RecordBatch;
use futures::{ready, Stream, StreamExt};
use greptime_proto::substrait_extension as pb;
use prost::Message;
use snafu::ResultExt;

use crate::error::{DeserializeSnafu, Result};
use crate::extension_plan::{Millisecond, METRIC_NUM_SERIES};
use crate::metrics::PROMQL_SERIES_COUNT;

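/// Logical plan node that normalizes sample points for PromQL evaluation:
/// it biases each timestamp back by `offset` milliseconds and, when
/// `need_filter_out_nan` is set, removes rows containing NaN values.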
#[derive(Debug, PartialEq, Eq, Hash, PartialOrd)]
pub struct SeriesNormalize {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: LogicalPlan,
}

impl UserDefinedLogicalNodeCore for SeriesNormalize {
    fn name(&self) -> &str {
        Self::name()
    }

    fn inputs(&self) -> Vec<&LogicalPlan> {
        vec![&self.input]
    }

    fn schema(&self) -> &DFSchemaRef {
        self.input.schema()
    }

    fn expressions(&self) -> Vec<Expr> {
        vec![]
    }

    fn fmt_for_explain(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(
            f,
            "PromSeriesNormalize: offset=[{}], time index=[{}], filter NaN: [{}]",
            self.offset, self.time_index_column_name, self.need_filter_out_nan
        )
    }

    fn with_exprs_and_inputs(
        &self,
        _exprs: Vec<Expr>,
        inputs: Vec<LogicalPlan>,
    ) -> DataFusionResult<Self> {
        if inputs.is_empty() {
            return Err(DataFusionError::Internal(
                "SeriesNormalize should have at least one input".to_string(),
            ));
        }

        Ok(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: inputs.into_iter().next().unwrap(),
            tag_columns: self.tag_columns.clone(),
        })
    }
}

impl SeriesNormalize {
    pub fn new<N: AsRef<str>>(
        offset: Millisecond,
        time_index_column_name: N,
        need_filter_out_nan: bool,
        tag_columns: Vec<String>,
        input: LogicalPlan,
    ) -> Self {
        Self {
            offset,
            time_index_column_name: time_index_column_name.as_ref().to_string(),
            need_filter_out_nan,
            tag_columns,
            input,
        }
    }

    pub const fn name() -> &'static str {
        "SeriesNormalize"
    }

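    /// Convert this logical node into a physical [`SeriesNormalizeExec`] over
    /// the given physical input, with a fresh metrics set.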
    pub fn to_execution_plan(&self, exec_input: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
        Arc::new(SeriesNormalizeExec {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: exec_input,
            tag_columns: self.tag_columns.clone(),
            metric: ExecutionPlanMetricsSet::new(),
        })
    }

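    /// Serialize this node to protobuf bytes using the `greptime_proto`
    /// substrait extension message.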
    pub fn serialize(&self) -> Vec<u8> {
        pb::SeriesNormalize {
            offset: self.offset,
            time_index: self.time_index_column_name.clone(),
            filter_nan: self.need_filter_out_nan,
            tag_columns: self.tag_columns.clone(),
        }
        .encode_to_vec()
    }

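    /// Deserialize a node from protobuf bytes. The input plan is a placeholder
    /// [`EmptyRelation`] that the caller is expected to replace.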
    pub fn deserialize(bytes: &[u8]) -> Result<Self> {
        let pb_normalize = pb::SeriesNormalize::decode(bytes).context(DeserializeSnafu)?;
        let placeholder_plan = LogicalPlan::EmptyRelation(EmptyRelation {
            produce_one_row: false,
            schema: Arc::new(DFSchema::empty()),
        });
        Ok(Self::new(
            pb_normalize.offset,
            pb_normalize.time_index,
            pb_normalize.filter_nan,
            pb_normalize.tag_columns,
            placeholder_plan,
        ))
    }
}

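/// Physical counterpart of [`SeriesNormalize`].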
#[derive(Debug)]
pub struct SeriesNormalizeExec {
    offset: Millisecond,
    time_index_column_name: String,
    need_filter_out_nan: bool,
    tag_columns: Vec<String>,

    input: Arc<dyn ExecutionPlan>,
    metric: ExecutionPlanMetricsSet,
}

impl ExecutionPlan for SeriesNormalizeExec {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.input.schema()
    }

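    // Hash-partitioning the input on the tag columns keeps all rows of a given
    // time series in the same partition, so each partition can be normalized
    // independently.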
    fn required_input_distribution(&self) -> Vec<Distribution> {
        let schema = self.input.schema();
        vec![Distribution::HashPartitioned(
            self.tag_columns
                .iter()
                .map(|tag| Arc::new(ColumnExpr::new_with_schema(tag, &schema).unwrap()) as _)
                .collect(),
        )]
    }

    fn properties(&self) -> &PlanProperties {
        self.input.properties()
    }

    fn children(&self) -> Vec<&Arc<dyn ExecutionPlan>> {
        vec![&self.input]
    }

    fn with_new_children(
        self: Arc<Self>,
        children: Vec<Arc<dyn ExecutionPlan>>,
    ) -> DataFusionResult<Arc<dyn ExecutionPlan>> {
        assert!(!children.is_empty());
        Ok(Arc::new(Self {
            offset: self.offset,
            time_index_column_name: self.time_index_column_name.clone(),
            need_filter_out_nan: self.need_filter_out_nan,
            input: children[0].clone(),
            tag_columns: self.tag_columns.clone(),
            metric: self.metric.clone(),
        }))
    }

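    // Besides the baseline metrics, register a per-partition counter under
    // METRIC_NUM_SERIES; the stream below bumps it once per input batch.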
    fn execute(
        &self,
        partition: usize,
        context: Arc<TaskContext>,
    ) -> DataFusionResult<SendableRecordBatchStream> {
        let baseline_metric = BaselineMetrics::new(&self.metric, partition);
        let metrics_builder = MetricBuilder::new(&self.metric);
        let num_series = Count::new();
        metrics_builder
            .with_partition(partition)
            .build(MetricValue::Count {
                name: METRIC_NUM_SERIES.into(),
                count: num_series.clone(),
            });

        let input = self.input.execute(partition, context)?;
        let schema = input.schema();
        let time_index = schema
            .column_with_name(&self.time_index_column_name)
            .expect("time index column not found")
            .0;
        Ok(Box::pin(SeriesNormalizeStream {
            offset: self.offset,
            time_index,
            need_filter_out_nan: self.need_filter_out_nan,
            schema,
            input,
            metric: baseline_metric,
            num_series,
        }))
    }

    fn metrics(&self) -> Option<MetricsSet> {
        Some(self.metric.clone_inner())
    }

    fn partition_statistics(&self, partition: Option<usize>) -> DataFusionResult<Statistics> {
        self.input.partition_statistics(partition)
    }

    fn name(&self) -> &str {
        "SeriesNormalizeExec"
    }
}

impl DisplayAs for SeriesNormalizeExec {
    fn fmt_as(&self, t: DisplayFormatType, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        match t {
            DisplayFormatType::Default
            | DisplayFormatType::Verbose
            | DisplayFormatType::TreeRender => {
                write!(
                    f,
                    "PromSeriesNormalizeExec: offset=[{}], time index=[{}], filter NaN: [{}]",
                    self.offset, self.time_index_column_name, self.need_filter_out_nan
                )
            }
        }
    }
}

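/// Stream that applies [`SeriesNormalize`]'s transformation to each record
/// batch produced by its input.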
pub struct SeriesNormalizeStream {
    offset: Millisecond,
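    // Index of the time index column in the input schema.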
    time_index: usize,
    need_filter_out_nan: bool,

    schema: SchemaRef,
    input: SendableRecordBatchStream,
    metric: BaselineMetrics,
    num_series: Count,
}

impl SeriesNormalizeStream {
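    /// Normalize one record batch: bias the time index column by `offset` and,
    /// when `need_filter_out_nan` is set, drop rows containing NaN values.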
    pub fn normalize(&self, input: RecordBatch) -> DataFusionResult<RecordBatch> {
        let ts_column = input
            .column(self.time_index)
            .as_any()
            .downcast_ref::<TimestampMillisecondArray>()
            .ok_or_else(|| {
                DataFusionError::Execution(
                    "time index column cannot be downcast to TimestampMillisecondArray".into(),
                )
            })?;

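        // Shift every timestamp back by `offset` milliseconds. A zero offset
        // reuses the original column without rewriting it.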
        let ts_column_biased = if self.offset == 0 {
            Arc::new(ts_column.clone()) as _
        } else {
            Arc::new(TimestampMillisecondArray::from_iter(
                ts_column.iter().map(|ts| ts.map(|ts| ts - self.offset)),
            ))
        };
        let mut columns = input.columns().to_vec();
        columns[self.time_index] = ts_column_biased;

        let result_batch = RecordBatch::try_new(input.schema(), columns)?;
        if !self.need_filter_out_nan {
            return Ok(result_batch);
        }

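        // Build a boolean mask that is false for every row where any Float64
        // column holds a NaN, then filter the batch with it.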
        let mut filter = vec![true; input.num_rows()];
        for column in result_batch.columns() {
            if let Some(float_column) = column.as_any().downcast_ref::<Float64Array>() {
                for (i, flag) in filter.iter_mut().enumerate() {
                    if float_column.value(i).is_nan() {
                        *flag = false;
                    }
                }
            }
        }

        let result = compute::filter_record_batch(&result_batch, &BooleanArray::from(filter))
            .map_err(|e| DataFusionError::ArrowError(Box::new(e), None))?;
        Ok(result)
    }
}

impl RecordBatchStream for SeriesNormalizeStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for SeriesNormalizeStream {
    type Item = DataFusionResult<RecordBatch>;

    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let poll = match ready!(self.input.poll_next_unpin(cx)) {
            Some(Ok(batch)) => {
                self.num_series.add(1);
                let timer = std::time::Instant::now();
                let result = self.normalize(batch);
                self.metric.elapsed_compute().add_elapsed(timer);
                Poll::Ready(Some(result))
            }
            None => {
                PROMQL_SERIES_COUNT.observe(self.num_series.value() as f64);
                Poll::Ready(None)
            }
            Some(Err(e)) => Poll::Ready(Some(Err(e))),
        };
        self.metric.record_poll(poll)
    }
}

#[cfg(test)]
mod test {
    use datafusion::arrow::array::Float64Array;
    use datafusion::arrow::datatypes::{
        ArrowPrimitiveType, DataType, Field, Schema, TimestampMillisecondType,
    };
    use datafusion::datasource::memory::MemorySourceConfig;
    use datafusion::datasource::source::DataSourceExec;
    use datafusion::prelude::SessionContext;
    use datatypes::arrow::array::TimestampMillisecondArray;
    use datatypes::arrow_array::StringArray;

    use super::*;

    const TIME_INDEX_COLUMN: &str = "timestamp";

    fn prepare_test_data() -> DataSourceExec {
        let schema = Arc::new(Schema::new(vec![
            Field::new(TIME_INDEX_COLUMN, TimestampMillisecondType::DATA_TYPE, true),
            Field::new("value", DataType::Float64, true),
            Field::new("path", DataType::Utf8, true),
        ]));
        let timestamp_column = Arc::new(TimestampMillisecondArray::from(vec![
            60_000, 120_000, 0, 30_000, 90_000,
        ])) as _;
        let field_column = Arc::new(Float64Array::from(vec![0.0, 1.0, 10.0, 100.0, 1000.0])) as _;
        let path_column = Arc::new(StringArray::from(vec!["foo", "foo", "foo", "foo", "foo"])) as _;
        let data = RecordBatch::try_new(
            schema.clone(),
            vec![timestamp_column, field_column, path_column],
        )
        .unwrap();

        DataSourceExec::new(Arc::new(
            MemorySourceConfig::try_new(&[vec![data]], schema, None).unwrap(),
        ))
    }

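    // With a zero offset the timestamps pass through unchanged, and since no
    // value is NaN, the NaN filter keeps every row.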
    #[tokio::test]
    async fn test_sort_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 0,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            tag_columns: vec!["path".to_string()],
            metric: ExecutionPlanMetricsSet::new(),
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:01:00 | 0.0    | foo  |\
            \n| 1970-01-01T00:02:00 | 1.0    | foo  |\
            \n| 1970-01-01T00:00:00 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:30 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:30 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }

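    // A 1_000 ms offset shifts every timestamp back by exactly one second.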
    #[tokio::test]
    async fn test_offset_record_batch() {
        let memory_exec = Arc::new(prepare_test_data());
        let normalize_exec = Arc::new(SeriesNormalizeExec {
            offset: 1_000,
            time_index_column_name: TIME_INDEX_COLUMN.to_string(),
            need_filter_out_nan: true,
            input: memory_exec,
            metric: ExecutionPlanMetricsSet::new(),
            tag_columns: vec!["path".to_string()],
        });
        let session_context = SessionContext::default();
        let result = datafusion::physical_plan::collect(normalize_exec, session_context.task_ctx())
            .await
            .unwrap();
        let result_literal = datatypes::arrow::util::pretty::pretty_format_batches(&result)
            .unwrap()
            .to_string();

        let expected = String::from(
            "+---------------------+--------+------+\
            \n| timestamp           | value  | path |\
            \n+---------------------+--------+------+\
            \n| 1970-01-01T00:00:59 | 0.0    | foo  |\
            \n| 1970-01-01T00:01:59 | 1.0    | foo  |\
            \n| 1969-12-31T23:59:59 | 10.0   | foo  |\
            \n| 1970-01-01T00:00:29 | 100.0  | foo  |\
            \n| 1970-01-01T00:01:29 | 1000.0 | foo  |\
            \n+---------------------+--------+------+",
        );

        assert_eq!(result_literal, expected);
    }
}