flow/
repr.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Basically a wrapper around the `datatypes` crate
//! for basic data representation.
17
18mod relation;
19
20use api::helper::{pb_value_to_value_ref, value_to_grpc_value};
21use api::v1::Row as ProtoRow;
22use datatypes::data_type::ConcreteDataType;
23use datatypes::types::cast;
24use datatypes::value::Value;
25use get_size2::GetSize;
26use itertools::Itertools;
27pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30
31use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};
32use crate::utils::get_value_heap_size;
33
/// System-wide record count difference type. Useful for capturing data changes.
///
/// i.e. +1 means insert one record, -1 means remove one record,
/// and +/-n means insert/remove multiple duplicate records.
pub type Diff = i64;

/// System-wide default timestamp type, in milliseconds
pub type Timestamp = i64;

/// System-wide default duration type, in milliseconds
pub type Duration = i64;

/// Default type for a repr of changes to a collection:
/// a `(row, timestamp, diff)` triple.
pub type DiffRow = (Row, Timestamp, Diff);

/// Row with key-value pair, timestamp and diff:
/// a `((key, value), timestamp, diff)` triple.
pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
51
/// Broadcast channel capacity. This can matter for memory consumption, since it influences how many
/// updates can be buffered in memory across the entire dataflow.
/// TODO(discord9): add config for this, so cpu&mem usage can be balanced and configured by this
pub const BROADCAST_CAP: usize = 1024;

/// The maximum capacity of the send buffer (twice [`BROADCAST_CAP`]), to prevent the buffer from
/// growing too large
pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2;

/// The flow worker tries to accumulate at least this many rows before processing them
/// (unless one second has already passed)
pub const BATCH_SIZE: usize = 32 * 16384;
62
63/// Convert a value that is or can be converted to Datetime to internal timestamp
64///
65/// support types are: `Date`, `DateTime`, `TimeStamp`, `i64`
66pub fn value_to_internal_ts(value: Value) -> Result<i64, EvalError> {
67    let is_supported_time_type = |arg: &Value| {
68        let ty = arg.data_type();
69        matches!(
70            ty,
71            ConcreteDataType::Date(..) | ConcreteDataType::Timestamp(..)
72        )
73    };
74    match value {
75        Value::Int64(ts) => Ok(ts),
76        arg if is_supported_time_type(&arg) => {
77            let arg_ty = arg.data_type();
78            let res = cast(arg, &ConcreteDataType::timestamp_millisecond_datatype()).context({
79                CastValueSnafu {
80                    from: arg_ty,
81                    to: ConcreteDataType::timestamp_millisecond_datatype(),
82                }
83            })?;
84            if let Value::Timestamp(ts) = res {
85                Ok(ts.value())
86            } else {
87                unreachable!()
88            }
89        }
90        _ => InvalidArgumentSnafu {
91            reason: format!("Expect a time type or i64, got {:?}", value.data_type()),
92        }
93        .fail(),
94    }
95}
96
/// A row is a vector of values.
///
/// Equality, ordering, hashing and (de)serialization are all derived, so they
/// follow the derived behavior of the single `inner` field.
///
/// TODO(discord9): use a more efficient representation
/// i.e. more compact like raw u8 of \[tag0, value0, tag1, value1, ...\]
#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
pub struct Row {
    /// The inner vector of values
    pub inner: Vec<Value>,
}
106
107impl GetSize for Row {
108    fn get_heap_size(&self) -> usize {
109        self.inner.iter().map(get_value_heap_size).sum()
110    }
111}
112
113impl Row {
114    /// Create an empty row
115    pub fn empty() -> Self {
116        Self { inner: vec![] }
117    }
118
119    /// Returns true if the Row contains no elements.
120    pub fn is_empty(&self) -> bool {
121        self.inner.is_empty()
122    }
123
124    /// Create a row from a vector of values
125    pub fn new(row: Vec<Value>) -> Self {
126        Self { inner: row }
127    }
128
129    /// Get the value at the given index
130    pub fn get(&self, idx: usize) -> Option<&Value> {
131        self.inner.get(idx)
132    }
133
134    /// Clear the row
135    pub fn clear(&mut self) {
136        self.inner.clear();
137    }
138
139    /// clear and return the inner vector
140    ///
141    /// useful if you want to reuse the vector as a buffer
142    pub fn packer(&mut self) -> &mut Vec<Value> {
143        self.inner.clear();
144        &mut self.inner
145    }
146
147    /// pack a iterator of values into a row
148    pub fn pack<I>(iter: I) -> Row
149    where
150        I: IntoIterator<Item = Value>,
151    {
152        Self {
153            inner: iter.into_iter().collect(),
154        }
155    }
156
157    /// unpack a row into a vector of values
158    pub fn unpack(self) -> Vec<Value> {
159        self.inner
160    }
161
162    /// extend the row with values from an iterator
163    pub fn extend<I>(&mut self, iter: I)
164    where
165        I: IntoIterator<Item = Value>,
166    {
167        self.inner.extend(iter);
168    }
169
170    /// Creates a consuming iterator, that is, one that moves each value out of the `Row` (from start to end). The `Row` cannot be used after calling this
171    pub fn into_iter(self) -> impl Iterator<Item = Value> {
172        self.inner.into_iter()
173    }
174
175    /// Returns an iterator over the slice.
176    pub fn iter(&self) -> impl Iterator<Item = &Value> {
177        self.inner.iter()
178    }
179
180    /// Returns the number of elements in the row, also known as its 'length'.
181    pub fn len(&self) -> usize {
182        self.inner.len()
183    }
184}
185
186impl From<Vec<Value>> for Row {
187    fn from(row: Vec<Value>) -> Self {
188        Row::new(row)
189    }
190}
191
192impl From<ProtoRow> for Row {
193    fn from(row: ProtoRow) -> Self {
194        Row::pack(
195            row.values
196                .iter()
197                .map(|pb_val| -> Value { pb_value_to_value_ref(pb_val, &None).into() }),
198        )
199    }
200}
201
202impl From<Row> for ProtoRow {
203    fn from(row: Row) -> Self {
204        let values = row
205            .unpack()
206            .into_iter()
207            .map(value_to_grpc_value)
208            .collect_vec();
209        ProtoRow { values }
210    }
211}
#[cfg(test)]
mod test {
    use common_time::{Date, Timestamp};

    use super::*;

    /// Walks a `Row` through construction, lookup, clearing, buffer reuse,
    /// extension and re-packing from its iterators.
    #[test]
    fn test_row() {
        // Both ways of building an empty row must agree.
        let from_empty = Row::empty();
        let from_new = Row::new(vec![]);
        assert_eq!(from_empty, from_new);

        let mut scratch = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(scratch.get(0), Some(&Value::Int32(1)));
        scratch.clear();
        assert_eq!(scratch.get(0), None);

        // `packer` hands back the (cleared) inner vector so it can be refilled.
        let buffer = scratch.packer();
        buffer.extend(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(scratch.get(0), Some(&Value::Int32(1)));

        scratch.extend(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(scratch.len(), 4);

        let repacked = Row::pack(scratch.into_iter());
        assert_eq!(repacked.len(), 4);
        let copied = Row::pack(repacked.iter().cloned());
        assert_eq!(repacked, copied);
    }

    /// Checks which values `value_to_internal_ts` accepts and what it yields.
    #[test]
    fn test_cast_to_internal_ts() {
        // Non time related types: only `i64` passes, and it passes unchanged.
        let int32 = Value::from(1i32);
        let int64 = Value::from(1i64);
        let float = Value::from(1.0);
        assert!(value_to_internal_ts(int32).is_err());
        assert_eq!(value_to_internal_ts(int64).unwrap(), 1i64);
        assert!(value_to_internal_ts(float).is_err());

        // Millisecond timestamps keep their value.
        let ts_millis = Value::Timestamp(Timestamp::new_millisecond(1i64));
        assert_eq!(value_to_internal_ts(ts_millis).unwrap(), 1i64);

        // time related type
        let one_day = Value::Date(Date::new(1));
        assert_eq!(value_to_internal_ts(one_day).unwrap(), 86400 * 1000i64);

        let one_second = Value::Timestamp(common_time::Timestamp::new_second(1));
        assert_eq!(value_to_internal_ts(one_second).unwrap(), 1000i64);

        // `Time` is not accepted and is reported as an invalid argument.
        let time_of_day = Value::Time(common_time::time::Time::new_second(1));
        let outcome = value_to_internal_ts(time_of_day);
        assert!(matches!(outcome, Err(EvalError::InvalidArgument { .. })));
    }
}