1use datafusion_common::DFSchema;
16use datatypes::data_type::DataType;
17use datatypes::prelude::ConcreteDataType;
18use itertools::Itertools;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21
22use crate::error::{DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu};
23use crate::expr::{SafeMfpPlan, ScalarExpr};
24
/// A list of column indices that together form one key of a relation.
///
/// The derived comparison, ordering and hashing work element-wise on
/// `column_indices`, so two keys holding the same columns in a different
/// order are *not* equal. [`Key::from`] stores the indices in ascending order.
// NOTE(review): presumably a "key" means a set of columns whose values are
// unique per row — that meaning is not visible in this file; confirm.
#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct Key {
    /// Indices of the columns that make up this key.
    pub column_indices: Vec<usize>,
}
31
32impl Key {
33 pub fn new() -> Self {
35 Default::default()
36 }
37
38 pub fn from(mut column_indices: Vec<usize>) -> Self {
40 column_indices.sort_unstable();
41 Self { column_indices }
42 }
43
44 pub fn add_col(&mut self, col: usize) {
46 self.column_indices.push(col);
47 }
48
49 pub fn remove_col(&mut self, col: usize) {
51 self.column_indices.retain(|&r| r != col);
52 }
53
54 pub fn get(&self) -> &Vec<usize> {
56 &self.column_indices
57 }
58
59 pub fn is_empty(&self) -> bool {
61 self.column_indices.is_empty()
62 }
63
64 pub fn subset_of(&self, other: &Key) -> bool {
66 self.column_indices
67 .iter()
68 .all(|c| other.column_indices.contains(c))
69 }
70}
71
/// The type of a relation: the type of every column plus key, time-index and
/// auto-column metadata expressed as column indices.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationType {
    /// The type of each column, in column order.
    pub column_types: Vec<ColumnType>,
    /// The keys known for this relation; each entry lists the column indices
    /// of one key.
    pub keys: Vec<Key>,
    /// Index of the time-index column, if the relation has one.
    pub time_index: Option<usize>,
    /// Indices of the "auto" columns.
    // NOTE(review): what makes a column "auto" is not visible in this file —
    // presumably columns added automatically by the system; confirm.
    pub auto_columns: Vec<usize>,
}
92
93impl RelationType {
94 pub fn with_autos(mut self, auto_cols: &[usize]) -> Self {
95 self.auto_columns = auto_cols.to_vec();
96 self
97 }
98
99 pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
113 let mfp = &mfp.mfp;
114 let mut all_types = self.column_types.clone();
115 for expr in &mfp.expressions {
116 let expr_typ = expr.typ(&self.column_types)?;
117 all_types.push(expr_typ);
118 }
119 let all_types = all_types;
120 let mfp_out_types = mfp
121 .projection
122 .iter()
123 .map(|i| {
124 all_types.get(*i).cloned().with_context(|| UnexpectedSnafu {
125 reason: format!(
126 "MFP index out of bound, len is {}, but the index is {}",
127 all_types.len(),
128 *i
129 ),
130 })
131 })
132 .try_collect()?;
133
134 let old_to_new_col = mfp.get_old_to_new_mapping();
135
136 let keys = self
138 .keys
139 .iter()
140 .filter_map(|key| {
141 key.column_indices
142 .iter()
143 .map(|old| old_to_new_col.get(old).cloned())
144 .collect::<Option<Vec<_>>>()
145 .and_then(|v| if v.is_empty() { None } else { Some(v) })
147 .map(Key::from)
148 })
149 .collect_vec();
150
151 let time_index = self
152 .time_index
153 .and_then(|old| old_to_new_col.get(&old).cloned());
154 let auto_columns = self
155 .auto_columns
156 .iter()
157 .filter_map(|old| old_to_new_col.get(old).cloned())
158 .collect_vec();
159 Ok(Self {
160 column_types: mfp_out_types,
161 keys,
162 time_index,
163 auto_columns,
164 })
165 }
166 pub fn empty() -> Self {
169 RelationType::new(vec![])
170 }
171
172 pub fn new(column_types: Vec<ColumnType>) -> Self {
176 RelationType {
177 column_types,
178 keys: Vec::new(),
179 time_index: None,
180 auto_columns: vec![],
181 }
182 }
183
184 pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
188 if indices.is_empty() {
189 return self;
190 }
191 indices.sort_unstable();
192 let key = Key::from(indices);
193 if !self.keys.contains(&key) {
194 self.keys.push(key);
195 }
196 self
197 }
198
199 pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
203 for key in keys {
204 self = self.with_key(key)
205 }
206 self
207 }
208
209 pub fn with_time_index(mut self, time_index: Option<usize>) -> Self {
211 self.time_index = time_index;
212 for key in &mut self.keys {
213 key.remove_col(time_index.unwrap_or(usize::MAX));
214 }
215 self.keys.retain(|key| !key.is_empty());
217 self
218 }
219
220 pub fn arity(&self) -> usize {
222 self.column_types.len()
223 }
224
225 pub fn default_key(&self) -> Vec<usize> {
227 if let Some(key) = self.keys.first() {
228 if key.is_empty() {
229 (0..self.column_types.len()).collect()
230 } else {
231 key.get().clone()
232 }
233 } else {
234 (0..self.column_types.len()).collect()
235 }
236 }
237
238 pub fn subtypes(&self, other: &RelationType) -> bool {
244 if self.column_types.len() != other.column_types.len() {
245 return false;
246 }
247
248 for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
249 if col1.nullable && !col2.nullable {
250 return false;
251 }
252 if col1.scalar_type != col2.scalar_type {
253 return false;
254 }
255 }
256
257 let all_keys = other
258 .keys
259 .iter()
260 .all(|key1| self.keys.iter().any(|key2| key1.subset_of(key2)));
261 if !all_keys {
262 return false;
263 }
264
265 true
266 }
267
268 pub fn into_named(self, names: Vec<Option<ColumnName>>) -> RelationDesc {
270 RelationDesc { typ: self, names }
271 }
272
273 pub fn into_unnamed(self) -> RelationDesc {
275 RelationDesc {
276 names: vec![None; self.column_types.len()],
277 typ: self,
278 }
279 }
280}
281
/// The type of a single column: its scalar data type and whether it may hold
/// null values.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct ColumnType {
    /// The data type of the values in the column.
    pub scalar_type: ConcreteDataType,
    /// Whether the column may contain nulls. When the field is missing during
    /// deserialization it defaults to `true` (see `return_true`).
    #[serde(default = "return_true")]
    pub nullable: bool,
}
297
298impl ColumnType {
299 pub fn new(scalar_type: ConcreteDataType, nullable: bool) -> Self {
301 ColumnType {
302 scalar_type,
303 nullable,
304 }
305 }
306
307 pub fn new_nullable(scalar_type: ConcreteDataType) -> Self {
310 ColumnType {
311 scalar_type,
312 nullable: true,
313 }
314 }
315
316 pub fn scalar_type(&self) -> &ConcreteDataType {
318 &self.scalar_type
319 }
320
321 pub fn nullable(&self) -> bool {
323 self.nullable
324 }
325}
326
/// Always returns `true`.
///
/// serde's `default = "..."` attribute needs the path of a function rather
/// than a literal; this is the function named by the `nullable` field of
/// `ColumnType`.
#[inline(always)]
fn return_true() -> bool {
    true
}
336
/// A relation type together with an optional name for each of its columns.
///
/// `names` and `typ.column_types` are expected to have the same length, with
/// `names[i]` naming column `i`; `RelationDesc::len` reports an error when
/// they do not.
#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
pub struct RelationDesc {
    /// The column types and key metadata of the relation.
    pub typ: RelationType,
    /// The name of each column, `None` for an unnamed column.
    pub names: Vec<Option<ColumnName>>,
}
346
347impl RelationDesc {
348 pub fn len(&self) -> Result<usize> {
349 ensure!(
350 self.typ.column_types.len() == self.names.len(),
351 InternalSnafu {
352 reason: "Expect typ and names field to be of same length"
353 }
354 );
355 Ok(self.names.len())
356 }
357
358 pub fn to_df_schema(&self) -> Result<DFSchema> {
359 let fields: Vec<_> = self
360 .iter()
361 .enumerate()
362 .map(|(i, (name, typ))| {
363 let name = name.clone().unwrap_or(format!("Col_{i}"));
364 let nullable = typ.nullable;
365 let data_type = typ.scalar_type.clone().as_arrow_type();
366 arrow_schema::Field::new(name, data_type, nullable)
367 })
368 .collect();
369 let arrow_schema = arrow_schema::Schema::new(fields);
370
371 DFSchema::try_from(arrow_schema.clone()).with_context(|_e| DatafusionSnafu {
372 context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
373 })
374 }
375
376 pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
378 let names = {
380 let mfp = &mfp.mfp;
381 let mut names = self.names.clone();
382 for expr in &mfp.expressions {
383 if let ScalarExpr::Column(i) = expr {
384 names.push(self.names.get(*i).cloned().flatten());
385 } else {
386 names.push(None);
387 }
388 }
389 mfp.projection
390 .iter()
391 .map(|i| names.get(*i).cloned().flatten())
392 .collect_vec()
393 };
394 Ok(Self {
395 typ: self.typ.apply_mfp(mfp)?,
396 names,
397 })
398 }
399}
400
401impl RelationDesc {
402 pub fn empty() -> Self {
405 RelationDesc {
406 typ: RelationType::empty(),
407 names: vec![],
408 }
409 }
410
411 pub fn try_new<I, N>(typ: RelationType, names: I) -> Result<Self>
415 where
416 I: IntoIterator<Item = N>,
417 N: Into<Option<ColumnName>>,
418 {
419 let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
420 ensure!(
421 typ.arity() == names.len(),
422 InvalidQuerySnafu {
423 reason: format!(
424 "Length mismatch between RelationType {:?} and column names {:?}",
425 typ.column_types, names
426 )
427 }
428 );
429 Ok(RelationDesc { typ, names })
430 }
431
432 pub fn new_unchecked<I, N>(typ: RelationType, names: I) -> Self
440 where
441 I: IntoIterator<Item = N>,
442 N: Into<Option<ColumnName>>,
443 {
444 let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
445 assert_eq!(typ.arity(), names.len());
446 RelationDesc { typ, names }
447 }
448
449 pub fn from_names_and_types<I, T, N>(iter: I) -> Self
450 where
451 I: IntoIterator<Item = (N, T)>,
452 T: Into<ColumnType>,
453 N: Into<Option<ColumnName>>,
454 {
455 let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
456 let types = types.into_iter().map(Into::into).collect();
457 let typ = RelationType::new(types);
458 Self::new_unchecked(typ, names)
459 }
460 pub fn concat(mut self, other: Self) -> Self {
462 let self_len = self.typ.column_types.len();
463 self.names.extend(other.names);
464 self.typ.column_types.extend(other.typ.column_types);
465 for k in other.typ.keys {
466 let k = k
467 .column_indices
468 .into_iter()
469 .map(|idx| idx + self_len)
470 .collect();
471 self = self.with_key(k);
472 }
473 self
474 }
475
476 pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
478 where
479 N: Into<Option<ColumnName>>,
480 {
481 self.typ.column_types.push(column_type);
482 self.names.push(name.into());
483 self
484 }
485
486 pub fn with_key(mut self, indices: Vec<usize>) -> Self {
488 self.typ = self.typ.with_key(indices);
489 self
490 }
491
492 pub fn try_with_names<I, N>(self, names: I) -> Result<Self>
496 where
497 I: IntoIterator<Item = N>,
498 N: Into<Option<ColumnName>>,
499 {
500 Self::try_new(self.typ, names)
501 }
502
503 pub fn arity(&self) -> usize {
505 self.typ.arity()
506 }
507
508 pub fn typ(&self) -> &RelationType {
510 &self.typ
511 }
512
513 pub fn iter(&self) -> impl Iterator<Item = (&Option<ColumnName>, &ColumnType)> {
515 self.iter_names().zip(self.iter_types())
516 }
517
518 pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
520 self.typ.column_types.iter()
521 }
522
523 pub fn iter_names(&self) -> impl Iterator<Item = &Option<ColumnName>> {
525 self.names.iter()
526 }
527
528 pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
534 self.iter_names()
535 .position(|n| n.as_ref() == Some(name))
536 .map(|i| (i, &self.typ.column_types[i]))
537 }
538
539 pub fn get_name(&self, i: usize) -> &Option<ColumnName> {
545 &self.names[i]
546 }
547}
548
/// The name of a column in a relation.
pub type ColumnName = String;