Skip to main content

flow/batching_mode/
incremental_filter.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use common_telemetry::tracing::debug;
16use datafusion_expr::Expr;
17use datatypes::schema::Schema;
18
19use crate::batching_mode::state::FilterExprInfo;
20use crate::batching_mode::utils::IncrementalAggregateAnalysis;
21use crate::{Error, FlowId};
22
23pub(super) fn build_sink_dirty_time_window_filter_expr(
24    flow_id: FlowId,
25    analysis: &IncrementalAggregateAnalysis,
26    sink_schema: &Schema,
27    dirty_filter: Option<&FilterExprInfo>,
28) -> Result<Option<Expr>, Error> {
29    let Some(dirty_filter) = dirty_filter else {
30        return Ok(None);
31    };
32
33    let Some(sink_filter_col) =
34        infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, dirty_filter)
35    else {
36        return Ok(None);
37    };
38
39    dirty_filter.predicate_for_col(&sink_filter_col)
40}
41
42fn infer_sink_time_window_filter_col(
43    flow_id: FlowId,
44    analysis: &IncrementalAggregateAnalysis,
45    sink_schema: &Schema,
46    dirty_filter: &FilterExprInfo,
47) -> Option<String> {
48    if analysis.group_key_names.is_empty() {
49        return None;
50    }
51
52    let is_timestamp_group_key = |name: &str| {
53        analysis.group_key_names.iter().any(|key| key == name)
54            && sink_schema
55                .column_schema_by_name(name)
56                .is_some_and(|col| col.data_type.is_timestamp())
57    };
58
59    if is_timestamp_group_key(&dirty_filter.col_name) {
60        return Some(dirty_filter.col_name.clone());
61    }
62
63    let candidates = analysis
64        .group_key_names
65        .iter()
66        .filter(|name| is_timestamp_group_key(name))
67        .cloned()
68        .collect::<Vec<_>>();
69
70    match candidates.as_slice() {
71        [name] => Some(name.clone()),
72        [] => {
73            debug!(
74                "Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}",
75                flow_id, analysis.group_key_names
76            );
77            None
78        }
79        _ => {
80            debug!(
81                "Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}",
82                flow_id, candidates
83            );
84            None
85        }
86    }
87}
88
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL;
    use crate::batching_mode::state::FilterExprInfo;
    use crate::batching_mode::utils::IncrementalAggregateAnalysis;

    /// Builds an analysis whose only populated field is the list of group keys.
    fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis {
        IncrementalAggregateAnalysis {
            group_key_names: group_key_names
                .into_iter()
                .map(|name| name.to_string())
                .collect(),
            merge_columns: vec![],
            literal_columns: vec![],
            output_field_names: vec![],
            unsupported_exprs: vec![],
        }
    }

    /// Builds a dirty filter on `col_name` with no time ranges.
    fn test_dirty_filter(col_name: &str) -> FilterExprInfo {
        FilterExprInfo {
            expr: datafusion_expr::col(col_name),
            col_name: col_name.to_string(),
            time_ranges: vec![],
            window_size: chrono::Duration::seconds(1),
        }
    }

    /// Builds a sink schema of nullable columns from `(name, type)` pairs.
    fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema {
        Schema::new(
            columns
                .into_iter()
                .map(|(name, data_type)| ColumnSchema::new(name, data_type, true))
                .collect(),
        )
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() {
        let analysis = test_analysis_with_group_keys(vec!["ts", "host"]);
        let sink_schema = test_sink_schema(vec![
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            ("host", ConcreteDataType::string_datatype()),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            Some("ts".to_string()),
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() {
        let analysis = test_analysis_with_group_keys(vec!["host", "time_window"]);
        let sink_schema = test_sink_schema(vec![
            ("host", ConcreteDataType::string_datatype()),
            (
                "time_window",
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
            (
                AUTO_CREATED_UPDATE_AT_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            Some("time_window".to_string()),
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_global_aggregate() {
        let analysis = test_analysis_with_group_keys(vec![]);
        let sink_schema = test_sink_schema(vec![
            ("number", ConcreteDataType::uint32_datatype()),
            (
                "time_window",
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            None,
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() {
        let analysis = test_analysis_with_group_keys(vec!["host", "device"]);
        let sink_schema = test_sink_schema(vec![
            ("host", ConcreteDataType::string_datatype()),
            ("device", ConcreteDataType::string_datatype()),
            (
                AUTO_CREATED_UPDATE_AT_TS_COL,
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            None,
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() {
        let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]);
        let sink_schema = test_sink_schema(vec![
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            (
                "time_window",
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("source_ts");

        assert_eq!(
            None,
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    /// An exact match on the dirty filter's column wins even when several
    /// timestamp group keys would otherwise be ambiguous.
    #[test]
    fn test_infer_sink_time_window_filter_col_prefers_dirty_col_over_ambiguity() {
        let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]);
        let sink_schema = test_sink_schema(vec![
            ("ts", ConcreteDataType::timestamp_millisecond_datatype()),
            (
                "time_window",
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            Some("ts".to_string()),
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    /// When the dirty filter's column is a group key but not a timestamp in the
    /// sink, inference falls back to the unique timestamp group key.
    #[test]
    fn test_infer_sink_time_window_filter_col_falls_back_when_dirty_col_not_timestamp() {
        let analysis = test_analysis_with_group_keys(vec!["ts", "time_window"]);
        let sink_schema = test_sink_schema(vec![
            ("ts", ConcreteDataType::int64_datatype()),
            (
                "time_window",
                ConcreteDataType::timestamp_millisecond_datatype(),
            ),
        ]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            Some("time_window".to_string()),
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    /// A group key that does not exist in the sink schema is never chosen.
    #[test]
    fn test_infer_sink_time_window_filter_col_skips_group_key_missing_from_sink() {
        let analysis = test_analysis_with_group_keys(vec!["ts"]);
        let sink_schema = test_sink_schema(vec![("host", ConcreteDataType::string_datatype())]);
        let dirty_filter = test_dirty_filter("ts");

        assert_eq!(
            None,
            infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
        );
    }

    #[test]
    fn test_build_sink_dirty_time_window_filter_expr_without_dirty_filter() {
        let analysis = test_analysis_with_group_keys(vec!["ts"]);
        let sink_schema = test_sink_schema(vec![(
            "ts",
            ConcreteDataType::timestamp_millisecond_datatype(),
        )]);

        let result = build_sink_dirty_time_window_filter_expr(1, &analysis, &sink_schema, None);
        assert!(matches!(result, Ok(None)));
    }

    #[test]
    fn test_build_sink_dirty_time_window_filter_expr_skips_uninferable_col() {
        let analysis = test_analysis_with_group_keys(vec!["host"]);
        let sink_schema = test_sink_schema(vec![("host", ConcreteDataType::string_datatype())]);
        let dirty_filter = test_dirty_filter("ts");

        let result = build_sink_dirty_time_window_filter_expr(
            1,
            &analysis,
            &sink_schema,
            Some(&dirty_filter),
        );
        assert!(matches!(result, Ok(None)));
    }
}