flow/batching_mode/
incremental_filter.rs1use common_telemetry::tracing::debug;
16use datafusion_expr::Expr;
17use datatypes::schema::Schema;
18
19use crate::batching_mode::state::FilterExprInfo;
20use crate::batching_mode::utils::IncrementalAggregateAnalysis;
21use crate::{Error, FlowId};
22
23pub(super) fn build_sink_dirty_time_window_filter_expr(
24 flow_id: FlowId,
25 analysis: &IncrementalAggregateAnalysis,
26 sink_schema: &Schema,
27 dirty_filter: Option<&FilterExprInfo>,
28) -> Result<Option<Expr>, Error> {
29 let Some(dirty_filter) = dirty_filter else {
30 return Ok(None);
31 };
32
33 let Some(sink_filter_col) =
34 infer_sink_time_window_filter_col(flow_id, analysis, sink_schema, dirty_filter)
35 else {
36 return Ok(None);
37 };
38
39 dirty_filter.predicate_for_col(&sink_filter_col)
40}
41
42fn infer_sink_time_window_filter_col(
43 flow_id: FlowId,
44 analysis: &IncrementalAggregateAnalysis,
45 sink_schema: &Schema,
46 dirty_filter: &FilterExprInfo,
47) -> Option<String> {
48 if analysis.group_key_names.is_empty() {
49 return None;
50 }
51
52 let is_timestamp_group_key = |name: &str| {
53 analysis.group_key_names.iter().any(|key| key == name)
54 && sink_schema
55 .column_schema_by_name(name)
56 .is_some_and(|col| col.data_type.is_timestamp())
57 };
58
59 if is_timestamp_group_key(&dirty_filter.col_name) {
60 return Some(dirty_filter.col_name.clone());
61 }
62
63 let candidates = analysis
64 .group_key_names
65 .iter()
66 .filter(|name| is_timestamp_group_key(name))
67 .cloned()
68 .collect::<Vec<_>>();
69
70 match candidates.as_slice() {
71 [name] => Some(name.clone()),
72 [] => {
73 debug!(
74 "Flow {} cannot infer sink dirty-window filter column: no timestamp group key in {:?}",
75 flow_id, analysis.group_key_names
76 );
77 None
78 }
79 _ => {
80 debug!(
81 "Flow {} cannot infer sink dirty-window filter column: ambiguous timestamp group keys {:?}",
82 flow_id, candidates
83 );
84 None
85 }
86 }
87}
88
#[cfg(test)]
mod test {
    use datatypes::prelude::ConcreteDataType;
    use datatypes::schema::ColumnSchema;
    use pretty_assertions::assert_eq;

    use super::*;
    use crate::adapter::AUTO_CREATED_UPDATE_AT_TS_COL;
    use crate::batching_mode::state::FilterExprInfo;
    use crate::batching_mode::utils::IncrementalAggregateAnalysis;

    /// Builds an analysis whose only populated field is the list of group keys.
    fn test_analysis_with_group_keys(group_key_names: Vec<&str>) -> IncrementalAggregateAnalysis {
        let group_key_names = group_key_names.into_iter().map(String::from).collect();
        IncrementalAggregateAnalysis {
            group_key_names,
            merge_columns: Vec::new(),
            literal_columns: Vec::new(),
            output_field_names: Vec::new(),
            unsupported_exprs: Vec::new(),
        }
    }

    /// Builds a dirty filter on `col_name` with no time ranges and a one-second window.
    fn test_dirty_filter(col_name: &str) -> FilterExprInfo {
        FilterExprInfo {
            expr: datafusion_expr::col(col_name),
            col_name: col_name.to_owned(),
            time_ranges: Vec::new(),
            window_size: chrono::Duration::seconds(1),
        }
    }

    /// Builds a sink schema of nullable columns from `(name, type)` pairs.
    fn test_sink_schema(columns: Vec<(&str, ConcreteDataType)>) -> Schema {
        let mut column_schemas = Vec::with_capacity(columns.len());
        for (name, data_type) in columns {
            column_schemas.push(ColumnSchema::new(name, data_type, true));
        }
        Schema::new(column_schemas)
    }

    /// Shorthand for the millisecond timestamp type used by every test schema.
    fn timestamp_ms() -> ConcreteDataType {
        ConcreteDataType::timestamp_millisecond_datatype()
    }

    /// Runs the inference for flow id 1 over fixtures built from the given pieces.
    fn infer_for(
        group_keys: Vec<&str>,
        sink_columns: Vec<(&str, ConcreteDataType)>,
        dirty_col: &str,
    ) -> Option<String> {
        let analysis = test_analysis_with_group_keys(group_keys);
        let sink_schema = test_sink_schema(sink_columns);
        let dirty_filter = test_dirty_filter(dirty_col);
        infer_sink_time_window_filter_col(1, &analysis, &sink_schema, &dirty_filter)
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_uses_matching_source_group_key() {
        // The dirty filter column is itself a timestamp group key in the sink.
        let inferred = infer_for(
            vec!["ts", "host"],
            vec![
                ("ts", timestamp_ms()),
                ("host", ConcreteDataType::string_datatype()),
            ],
            "ts",
        );

        assert_eq!(Some("ts".to_string()), inferred);
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_uses_unique_timestamp_group_key() {
        // "ts" is not a group key, but exactly one group key is a timestamp. The
        // auto-created update-at column is a timestamp too, yet it is not a group
        // key and therefore must not count as a candidate.
        let inferred = infer_for(
            vec!["host", "time_window"],
            vec![
                ("host", ConcreteDataType::string_datatype()),
                ("time_window", timestamp_ms()),
                (AUTO_CREATED_UPDATE_AT_TS_COL, timestamp_ms()),
            ],
            "ts",
        );

        assert_eq!(Some("time_window".to_string()), inferred);
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_global_aggregate() {
        // Without group keys nothing is inferred, even if the sink has a timestamp column.
        let inferred = infer_for(
            vec![],
            vec![
                ("number", ConcreteDataType::uint32_datatype()),
                ("time_window", timestamp_ms()),
            ],
            "ts",
        );

        assert_eq!(None, inferred);
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_without_timestamp_group_key() {
        // None of the group keys is a timestamp; the only timestamp column is not a group key.
        let inferred = infer_for(
            vec!["host", "device"],
            vec![
                ("host", ConcreteDataType::string_datatype()),
                ("device", ConcreteDataType::string_datatype()),
                (AUTO_CREATED_UPDATE_AT_TS_COL, timestamp_ms()),
            ],
            "ts",
        );

        assert_eq!(None, inferred);
    }

    #[test]
    fn test_infer_sink_time_window_filter_col_skips_ambiguous_timestamp_group_keys() {
        // Two timestamp group keys and a dirty column matching neither: ambiguous.
        let inferred = infer_for(
            vec!["ts", "time_window"],
            vec![("ts", timestamp_ms()), ("time_window", timestamp_ms())],
            "source_ts",
        );

        assert_eq!(None, inferred);
    }
}