1mod relation;
19
20use api::helper::{pb_value_to_value_ref, value_to_grpc_value};
21use api::v1::Row as ProtoRow;
22use datatypes::data_type::ConcreteDataType;
23use datatypes::types::cast;
24use datatypes::value::Value;
25use get_size2::GetSize;
26use itertools::Itertools;
27pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30
31use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};
32use crate::utils::get_value_heap_size;
33
34pub type Diff = i64;
39
40pub type Timestamp = i64;
42
43pub type Duration = i64;
45
46pub type DiffRow = (Row, Timestamp, Diff);
48
49pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
51
52pub const BROADCAST_CAP: usize = 1024;
56
57pub const SEND_BUF_CAP: usize = BROADCAST_CAP * 2;
59
60pub const BATCH_SIZE: usize = 32 * 16384;
62
63pub fn value_to_internal_ts(value: Value) -> Result<i64, EvalError> {
67 let is_supported_time_type = |arg: &Value| {
68 let ty = arg.data_type();
69 matches!(
70 ty,
71 ConcreteDataType::Date(..) | ConcreteDataType::Timestamp(..)
72 )
73 };
74 match value {
75 Value::Int64(ts) => Ok(ts),
76 arg if is_supported_time_type(&arg) => {
77 let arg_ty = arg.data_type();
78 let res = cast(arg, &ConcreteDataType::timestamp_millisecond_datatype()).context({
79 CastValueSnafu {
80 from: arg_ty,
81 to: ConcreteDataType::timestamp_millisecond_datatype(),
82 }
83 })?;
84 if let Value::Timestamp(ts) = res {
85 Ok(ts.value())
86 } else {
87 unreachable!()
88 }
89 }
90 _ => InvalidArgumentSnafu {
91 reason: format!("Expect a time type or i64, got {:?}", value.data_type()),
92 }
93 .fail(),
94 }
95}
96
97#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
102pub struct Row {
103 pub inner: Vec<Value>,
105}
106
107impl GetSize for Row {
108 fn get_heap_size(&self) -> usize {
109 self.inner.iter().map(get_value_heap_size).sum()
110 }
111}
112
113impl Row {
114 pub fn empty() -> Self {
116 Self { inner: vec![] }
117 }
118
119 pub fn is_empty(&self) -> bool {
121 self.inner.is_empty()
122 }
123
124 pub fn new(row: Vec<Value>) -> Self {
126 Self { inner: row }
127 }
128
129 pub fn get(&self, idx: usize) -> Option<&Value> {
131 self.inner.get(idx)
132 }
133
134 pub fn clear(&mut self) {
136 self.inner.clear();
137 }
138
139 pub fn packer(&mut self) -> &mut Vec<Value> {
143 self.inner.clear();
144 &mut self.inner
145 }
146
147 pub fn pack<I>(iter: I) -> Row
149 where
150 I: IntoIterator<Item = Value>,
151 {
152 Self {
153 inner: iter.into_iter().collect(),
154 }
155 }
156
157 pub fn unpack(self) -> Vec<Value> {
159 self.inner
160 }
161
162 pub fn extend<I>(&mut self, iter: I)
164 where
165 I: IntoIterator<Item = Value>,
166 {
167 self.inner.extend(iter);
168 }
169
170 pub fn into_iter(self) -> impl Iterator<Item = Value> {
172 self.inner.into_iter()
173 }
174
175 pub fn iter(&self) -> impl Iterator<Item = &Value> {
177 self.inner.iter()
178 }
179
180 pub fn len(&self) -> usize {
182 self.inner.len()
183 }
184}
185
186impl From<Vec<Value>> for Row {
187 fn from(row: Vec<Value>) -> Self {
188 Row::new(row)
189 }
190}
191
192impl From<ProtoRow> for Row {
193 fn from(row: ProtoRow) -> Self {
194 Row::pack(
195 row.values
196 .iter()
197 .map(|pb_val| -> Value { pb_value_to_value_ref(pb_val, &None).into() }),
198 )
199 }
200}
201
202impl From<Row> for ProtoRow {
203 fn from(row: Row) -> Self {
204 let values = row
205 .unpack()
206 .into_iter()
207 .map(value_to_grpc_value)
208 .collect_vec();
209 ProtoRow { values }
210 }
211}
212#[cfg(test)]
213mod test {
214 use common_time::{Date, Timestamp};
215
216 use super::*;
217
218 #[test]
219 fn test_row() {
220 let row = Row::empty();
221 let row_1 = Row::new(vec![]);
222 assert_eq!(row, row_1);
223 let mut row_2 = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
224 assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
225 row_2.clear();
226 assert_eq!(row_2.get(0), None);
227 row_2
228 .packer()
229 .extend(vec![Value::Int32(1), Value::Int32(2)]);
230 assert_eq!(row_2.get(0), Some(&Value::Int32(1)));
231 row_2.extend(vec![Value::Int32(1), Value::Int32(2)]);
232 assert_eq!(row_2.len(), 4);
233 let row_3 = Row::pack(row_2.into_iter());
234 assert_eq!(row_3.len(), 4);
235 let row_4 = Row::pack(row_3.iter().cloned());
236 assert_eq!(row_3, row_4);
237 }
238
239 #[test]
240 fn test_cast_to_internal_ts() {
241 {
242 let a = Value::from(1i32);
243 let b = Value::from(1i64);
244 let c = Value::Timestamp(Timestamp::new_millisecond(1i64));
245 let d = Value::from(1.0);
246
247 assert!(value_to_internal_ts(a).is_err());
248 assert_eq!(value_to_internal_ts(b).unwrap(), 1i64);
249 assert_eq!(value_to_internal_ts(c).unwrap(), 1i64);
250 assert!(value_to_internal_ts(d).is_err());
251 }
252
253 {
254 let a = Value::Date(Date::new(1));
256 assert_eq!(value_to_internal_ts(a).unwrap(), 86400 * 1000i64);
257 let b = Value::Timestamp(common_time::Timestamp::new_second(1));
258 assert_eq!(value_to_internal_ts(b).unwrap(), 1000i64);
259 let c = Value::Time(common_time::time::Time::new_second(1));
260 assert!(matches!(
261 value_to_internal_ts(c),
262 Err(EvalError::InvalidArgument { .. })
263 ));
264 }
265 }
266}