flow/repr/
relation.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use datafusion_common::DFSchema;
16use datatypes::data_type::DataType;
17use datatypes::prelude::ConcreteDataType;
18use itertools::Itertools;
19use serde::{Deserialize, Serialize};
20use snafu::{ensure, OptionExt, ResultExt};
21
22use crate::error::{DatafusionSnafu, InternalSnafu, InvalidQuerySnafu, Result, UnexpectedSnafu};
23use crate::expr::{SafeMfpPlan, ScalarExpr};
24
25/// a set of column indices that are "keys" for the collection.
26#[derive(Default, Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
27pub struct Key {
28    /// indicate whose column form key
29    pub column_indices: Vec<usize>,
30}
31
32impl Key {
33    /// create a new Key
34    pub fn new() -> Self {
35        Default::default()
36    }
37
38    /// create a new Key from a vector of column indices
39    pub fn from(mut column_indices: Vec<usize>) -> Self {
40        column_indices.sort_unstable();
41        Self { column_indices }
42    }
43
44    /// Add a column to Key
45    pub fn add_col(&mut self, col: usize) {
46        self.column_indices.push(col);
47    }
48
49    /// Remove a column from Key
50    pub fn remove_col(&mut self, col: usize) {
51        self.column_indices.retain(|&r| r != col);
52    }
53
54    /// get all columns in Key
55    pub fn get(&self) -> &Vec<usize> {
56        &self.column_indices
57    }
58
59    /// True if Key is empty
60    pub fn is_empty(&self) -> bool {
61        self.column_indices.is_empty()
62    }
63
64    /// True if all columns in self are also in other
65    pub fn subset_of(&self, other: &Key) -> bool {
66        self.column_indices
67            .iter()
68            .all(|c| other.column_indices.contains(c))
69    }
70}
71
72/// The type of a relation.
73#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
74pub struct RelationType {
75    /// The type for each column, in order.
76    pub column_types: Vec<ColumnType>,
77    /// Sets of indices that are "keys" for the collection.
78    ///
79    /// Each element in this list is a set of column indices, each with the
80    /// property that the collection contains at most one record with each
81    /// distinct set of values for each column. Alternately, for a specific set
82    /// of values assigned to the these columns there is at most one record.
83    ///
84    /// A collection can contain multiple sets of keys, although it is common to
85    /// have either zero or one sets of key indices.
86    pub keys: Vec<Key>,
87    /// optionally indicate the column that is TIME INDEX
88    pub time_index: Option<usize>,
89    /// mark all the columns that are added automatically by flow, but are not present in original sql
90    pub auto_columns: Vec<usize>,
91}
92
93impl RelationType {
94    pub fn with_autos(mut self, auto_cols: &[usize]) -> Self {
95        self.auto_columns = auto_cols.to_vec();
96        self
97    }
98
99    /// Trying to apply a mpf on current types, will return a new RelationType
100    /// with the new types, will also try to preserve keys&time index information
101    /// if the old key&time index columns are preserve in given mfp
102    ///
103    /// i.e. old column of size 3, with a mfp's
104    ///
105    /// project = `[2, 1]`,
106    ///
107    /// the old key = `[1]`, old time index = `[2]`,
108    ///
109    /// then new key=`[1]`, new time index=`[0]`
110    ///
111    /// note that this function will remove empty keys like key=`[]` will be removed
112    pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
113        let mfp = &mfp.mfp;
114        let mut all_types = self.column_types.clone();
115        for expr in &mfp.expressions {
116            let expr_typ = expr.typ(&self.column_types)?;
117            all_types.push(expr_typ);
118        }
119        let all_types = all_types;
120        let mfp_out_types = mfp
121            .projection
122            .iter()
123            .map(|i| {
124                all_types.get(*i).cloned().with_context(|| UnexpectedSnafu {
125                    reason: format!(
126                        "MFP index out of bound, len is {}, but the index is {}",
127                        all_types.len(),
128                        *i
129                    ),
130                })
131            })
132            .try_collect()?;
133
134        let old_to_new_col = mfp.get_old_to_new_mapping();
135
136        // since it's just a mfp, we also try to preserve keys&time index information, if they survive mfp transform
137        let keys = self
138            .keys
139            .iter()
140            .filter_map(|key| {
141                key.column_indices
142                    .iter()
143                    .map(|old| old_to_new_col.get(old).cloned())
144                    .collect::<Option<Vec<_>>>()
145                    // remove empty keys
146                    .and_then(|v| if v.is_empty() { None } else { Some(v) })
147                    .map(Key::from)
148            })
149            .collect_vec();
150
151        let time_index = self
152            .time_index
153            .and_then(|old| old_to_new_col.get(&old).cloned());
154        let auto_columns = self
155            .auto_columns
156            .iter()
157            .filter_map(|old| old_to_new_col.get(old).cloned())
158            .collect_vec();
159        Ok(Self {
160            column_types: mfp_out_types,
161            keys,
162            time_index,
163            auto_columns,
164        })
165    }
166    /// Constructs a `RelationType` representing the relation with no columns and
167    /// no keys.
168    pub fn empty() -> Self {
169        RelationType::new(vec![])
170    }
171
172    /// Constructs a new `RelationType` from specified column types.
173    ///
174    /// The `RelationType` will have no keys.
175    pub fn new(column_types: Vec<ColumnType>) -> Self {
176        RelationType {
177            column_types,
178            keys: Vec::new(),
179            time_index: None,
180            auto_columns: vec![],
181        }
182    }
183
184    /// Adds a new key for the relation. Also sorts the key indices.
185    ///
186    /// will ignore empty key
187    pub fn with_key(mut self, mut indices: Vec<usize>) -> Self {
188        if indices.is_empty() {
189            return self;
190        }
191        indices.sort_unstable();
192        let key = Key::from(indices);
193        if !self.keys.contains(&key) {
194            self.keys.push(key);
195        }
196        self
197    }
198
199    /// Adds new keys for the relation. Also sorts the key indices.
200    ///
201    /// will ignore empty keys
202    pub fn with_keys(mut self, keys: Vec<Vec<usize>>) -> Self {
203        for key in keys {
204            self = self.with_key(key)
205        }
206        self
207    }
208
209    /// will also remove time index from keys if it's in keys
210    pub fn with_time_index(mut self, time_index: Option<usize>) -> Self {
211        self.time_index = time_index;
212        for key in &mut self.keys {
213            key.remove_col(time_index.unwrap_or(usize::MAX));
214        }
215        // remove empty keys
216        self.keys.retain(|key| !key.is_empty());
217        self
218    }
219
220    /// Computes the number of columns in the relation.
221    pub fn arity(&self) -> usize {
222        self.column_types.len()
223    }
224
225    /// Gets the index of the columns used when creating a default index.
226    pub fn default_key(&self) -> Vec<usize> {
227        if let Some(key) = self.keys.first() {
228            if key.is_empty() {
229                (0..self.column_types.len()).collect()
230            } else {
231                key.get().clone()
232            }
233        } else {
234            (0..self.column_types.len()).collect()
235        }
236    }
237
238    /// True if any collection described by `self` could safely be described by `other`.
239    ///
240    /// In practice this means checking that the scalar types match exactly, and that the
241    /// nullability of `self` is at least as strict as `other`, and that all keys of `other`
242    /// contain some key of `self` (as a set of key columns is less strict than any subset).
243    pub fn subtypes(&self, other: &RelationType) -> bool {
244        if self.column_types.len() != other.column_types.len() {
245            return false;
246        }
247
248        for (col1, col2) in self.column_types.iter().zip(other.column_types.iter()) {
249            if col1.nullable && !col2.nullable {
250                return false;
251            }
252            if col1.scalar_type != col2.scalar_type {
253                return false;
254            }
255        }
256
257        let all_keys = other
258            .keys
259            .iter()
260            .all(|key1| self.keys.iter().any(|key2| key1.subset_of(key2)));
261        if !all_keys {
262            return false;
263        }
264
265        true
266    }
267
268    /// Return relation describe with column names
269    pub fn into_named(self, names: Vec<Option<ColumnName>>) -> RelationDesc {
270        RelationDesc { typ: self, names }
271    }
272
273    /// Return relation describe without column names
274    pub fn into_unnamed(self) -> RelationDesc {
275        RelationDesc {
276            names: vec![None; self.column_types.len()],
277            typ: self,
278        }
279    }
280}
281
282/// The type of a `Value`
283///
284/// [`ColumnType`] bundles information about the scalar type of a datum (e.g.,
285/// Int32 or String) with its nullability.
286///
287/// To construct a column type, either initialize the struct directly, or
288/// use the [`ScalarType::nullable`] method.
289#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
290pub struct ColumnType {
291    /// The underlying scalar type (e.g., Int32 or String) of this column.
292    pub scalar_type: ConcreteDataType,
293    /// Whether this datum can be null.
294    #[serde(default = "return_true")]
295    pub nullable: bool,
296}
297
298impl ColumnType {
299    /// Constructs a new `ColumnType` from a scalar type and a nullability flag.
300    pub fn new(scalar_type: ConcreteDataType, nullable: bool) -> Self {
301        ColumnType {
302            scalar_type,
303            nullable,
304        }
305    }
306
307    /// Constructs a new `ColumnType` from a scalar type, with nullability set to
308    /// ***true***
309    pub fn new_nullable(scalar_type: ConcreteDataType) -> Self {
310        ColumnType {
311            scalar_type,
312            nullable: true,
313        }
314    }
315
316    /// Returns the scalar type of this column.
317    pub fn scalar_type(&self) -> &ConcreteDataType {
318        &self.scalar_type
319    }
320
321    /// Returns true if this column can be null.
322    pub fn nullable(&self) -> bool {
323        self.nullable
324    }
325}
326
327/// This method exists solely for the purpose of making ColumnType nullable by
328/// default in unit tests. The default value of a bool is false, and the only
329/// way to make an object take on any other value by default is to pass it a
330/// function that returns the desired default value. See
331/// <https://github.com/serde-rs/serde/issues/1030>
332#[inline(always)]
333fn return_true() -> bool {
334    true
335}
336
337/// A description of the shape of a relation.
338///
339/// It bundles a [`RelationType`] with the name of each column in the relation.
340/// Individual column names are optional.
341#[derive(Clone, Debug, Eq, PartialEq, Ord, PartialOrd, Serialize, Deserialize, Hash)]
342pub struct RelationDesc {
343    pub typ: RelationType,
344    pub names: Vec<Option<ColumnName>>,
345}
346
347impl RelationDesc {
348    pub fn len(&self) -> Result<usize> {
349        ensure!(
350            self.typ.column_types.len() == self.names.len(),
351            InternalSnafu {
352                reason: "Expect typ and names field to be of same length"
353            }
354        );
355        Ok(self.names.len())
356    }
357
358    pub fn to_df_schema(&self) -> Result<DFSchema> {
359        let fields: Vec<_> = self
360            .iter()
361            .enumerate()
362            .map(|(i, (name, typ))| {
363                let name = name.clone().unwrap_or(format!("Col_{i}"));
364                let nullable = typ.nullable;
365                let data_type = typ.scalar_type.clone().as_arrow_type();
366                arrow_schema::Field::new(name, data_type, nullable)
367            })
368            .collect();
369        let arrow_schema = arrow_schema::Schema::new(fields);
370
371        DFSchema::try_from(arrow_schema.clone()).with_context(|_e| DatafusionSnafu {
372            context: format!("Error when converting to DFSchema: {:?}", arrow_schema),
373        })
374    }
375
376    /// apply mfp, and also project col names for the projected columns
377    pub fn apply_mfp(&self, mfp: &SafeMfpPlan) -> Result<Self> {
378        // TODO(discord9): find a way to deduce name at best effect
379        let names = {
380            let mfp = &mfp.mfp;
381            let mut names = self.names.clone();
382            for expr in &mfp.expressions {
383                if let ScalarExpr::Column(i) = expr {
384                    names.push(self.names.get(*i).cloned().flatten());
385                } else {
386                    names.push(None);
387                }
388            }
389            mfp.projection
390                .iter()
391                .map(|i| names.get(*i).cloned().flatten())
392                .collect_vec()
393        };
394        Ok(Self {
395            typ: self.typ.apply_mfp(mfp)?,
396            names,
397        })
398    }
399}
400
401impl RelationDesc {
402    /// Constructs a new `RelationDesc` that represents the empty relation
403    /// with no columns and no keys.
404    pub fn empty() -> Self {
405        RelationDesc {
406            typ: RelationType::empty(),
407            names: vec![],
408        }
409    }
410
411    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
412    /// over column names.
413    ///
414    pub fn try_new<I, N>(typ: RelationType, names: I) -> Result<Self>
415    where
416        I: IntoIterator<Item = N>,
417        N: Into<Option<ColumnName>>,
418    {
419        let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
420        ensure!(
421            typ.arity() == names.len(),
422            InvalidQuerySnafu {
423                reason: format!(
424                    "Length mismatch between RelationType {:?} and column names {:?}",
425                    typ.column_types, names
426                )
427            }
428        );
429        Ok(RelationDesc { typ, names })
430    }
431
432    /// Constructs a new `RelationDesc` from a `RelationType` and an iterator
433    /// over column names.
434    ///
435    /// # Panics
436    ///
437    /// Panics if the arity of the `RelationType` is not equal to the number of
438    /// items in `names`.
439    pub fn new_unchecked<I, N>(typ: RelationType, names: I) -> Self
440    where
441        I: IntoIterator<Item = N>,
442        N: Into<Option<ColumnName>>,
443    {
444        let names: Vec<_> = names.into_iter().map(|name| name.into()).collect();
445        assert_eq!(typ.arity(), names.len());
446        RelationDesc { typ, names }
447    }
448
449    pub fn from_names_and_types<I, T, N>(iter: I) -> Self
450    where
451        I: IntoIterator<Item = (N, T)>,
452        T: Into<ColumnType>,
453        N: Into<Option<ColumnName>>,
454    {
455        let (names, types): (Vec<_>, Vec<_>) = iter.into_iter().unzip();
456        let types = types.into_iter().map(Into::into).collect();
457        let typ = RelationType::new(types);
458        Self::new_unchecked(typ, names)
459    }
460    /// Concatenates a `RelationDesc` onto the end of this `RelationDesc`.
461    pub fn concat(mut self, other: Self) -> Self {
462        let self_len = self.typ.column_types.len();
463        self.names.extend(other.names);
464        self.typ.column_types.extend(other.typ.column_types);
465        for k in other.typ.keys {
466            let k = k
467                .column_indices
468                .into_iter()
469                .map(|idx| idx + self_len)
470                .collect();
471            self = self.with_key(k);
472        }
473        self
474    }
475
476    /// Appends a column with the specified name and type.
477    pub fn with_column<N>(mut self, name: N, column_type: ColumnType) -> Self
478    where
479        N: Into<Option<ColumnName>>,
480    {
481        self.typ.column_types.push(column_type);
482        self.names.push(name.into());
483        self
484    }
485
486    /// Adds a new key for the relation.
487    pub fn with_key(mut self, indices: Vec<usize>) -> Self {
488        self.typ = self.typ.with_key(indices);
489        self
490    }
491
492    /// Builds a new relation description with the column names replaced with
493    /// new names.
494    ///
495    pub fn try_with_names<I, N>(self, names: I) -> Result<Self>
496    where
497        I: IntoIterator<Item = N>,
498        N: Into<Option<ColumnName>>,
499    {
500        Self::try_new(self.typ, names)
501    }
502
503    /// Computes the number of columns in the relation.
504    pub fn arity(&self) -> usize {
505        self.typ.arity()
506    }
507
508    /// Returns the relation type underlying this relation description.
509    pub fn typ(&self) -> &RelationType {
510        &self.typ
511    }
512
513    /// Returns an iterator over the columns in this relation.
514    pub fn iter(&self) -> impl Iterator<Item = (&Option<ColumnName>, &ColumnType)> {
515        self.iter_names().zip(self.iter_types())
516    }
517
518    /// Returns an iterator over the types of the columns in this relation.
519    pub fn iter_types(&self) -> impl Iterator<Item = &ColumnType> {
520        self.typ.column_types.iter()
521    }
522
523    /// Returns an iterator over the names of the columns in this relation.
524    pub fn iter_names(&self) -> impl Iterator<Item = &Option<ColumnName>> {
525        self.names.iter()
526    }
527
528    /// Finds a column by name.
529    ///
530    /// Returns the index and type of the column named `name`. If no column with
531    /// the specified name exists, returns `None`. If multiple columns have the
532    /// specified name, the leftmost column is returned.
533    pub fn get_by_name(&self, name: &ColumnName) -> Option<(usize, &ColumnType)> {
534        self.iter_names()
535            .position(|n| n.as_ref() == Some(name))
536            .map(|i| (i, &self.typ.column_types[i]))
537    }
538
539    /// Gets the name of the `i`th column.
540    ///
541    /// # Panics
542    ///
543    /// Panics if `i` is not a valid column index.
544    pub fn get_name(&self, i: usize) -> &Option<ColumnName> {
545        &self.names[i]
546    }
547}
548
549/// The name of a column in a [`RelationDesc`].
550pub type ColumnName = String;