1mod relation;
19
20use api::helper::{pb_value_to_value_ref, to_grpc_value};
21use api::v1::Row as ProtoRow;
22use datatypes::data_type::ConcreteDataType;
23use datatypes::types::cast;
24use datatypes::value::Value;
25use get_size2::GetSize;
26use itertools::Itertools;
27pub(crate) use relation::{ColumnType, Key, RelationDesc, RelationType};
28use serde::{Deserialize, Serialize};
29use snafu::ResultExt;
30
31use crate::expr::error::{CastValueSnafu, EvalError, InvalidArgumentSnafu};
32use crate::utils::get_value_heap_size;
33
34pub type Diff = i64;
39
40pub type Timestamp = i64;
42
43pub type Duration = i64;
45
46pub type DiffRow = (Row, Timestamp, Diff);
48
49pub type KeyValDiffRow = ((Row, Row), Timestamp, Diff);
51
/// Base capacity (1024) from which [`SEND_BUF_CAP`] is derived.
///
/// NOTE(review): the name suggests this sizes a broadcast channel; its use
/// sites are not visible from this file — confirm.
pub const BROADCAST_CAP: usize = 1024;

/// Capacity of the send buffer: twice [`BROADCAST_CAP`], i.e. 2048.
pub const SEND_BUF_CAP: usize = 2 * BROADCAST_CAP;

/// Batch size in rows: `16384 * 32`, i.e. 524288.
///
/// NOTE(review): the unit (rows) is inferred from the surrounding types — confirm.
pub const BATCH_SIZE: usize = 16384 * 32;
62
63pub fn value_to_internal_ts(value: Value) -> Result<i64, EvalError> {
67 let is_supported_time_type = |arg: &Value| {
68 let ty = arg.data_type();
69 matches!(
70 ty,
71 ConcreteDataType::Date(..) | ConcreteDataType::Timestamp(..)
72 )
73 };
74 match value {
75 Value::Int64(ts) => Ok(ts),
76 arg if is_supported_time_type(&arg) => {
77 let arg_ty = arg.data_type();
78 let res = cast(arg, &ConcreteDataType::timestamp_millisecond_datatype()).context({
79 CastValueSnafu {
80 from: arg_ty,
81 to: ConcreteDataType::timestamp_millisecond_datatype(),
82 }
83 })?;
84 if let Value::Timestamp(ts) = res {
85 Ok(ts.value())
86 } else {
87 unreachable!()
88 }
89 }
90 _ => InvalidArgumentSnafu {
91 reason: format!("Expect a time type or i64, got {:?}", value.data_type()),
92 }
93 .fail(),
94 }
95}
96
97#[derive(Clone, Debug, Hash, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
102pub struct Row {
103 pub inner: Vec<Value>,
105}
106
107impl GetSize for Row {
108 fn get_heap_size(&self) -> usize {
109 self.inner.iter().map(get_value_heap_size).sum()
110 }
111}
112
113impl Row {
114 pub fn empty() -> Self {
116 Self { inner: vec![] }
117 }
118
119 pub fn is_empty(&self) -> bool {
121 self.inner.is_empty()
122 }
123
124 pub fn new(row: Vec<Value>) -> Self {
126 Self { inner: row }
127 }
128
129 pub fn get(&self, idx: usize) -> Option<&Value> {
131 self.inner.get(idx)
132 }
133
134 pub fn clear(&mut self) {
136 self.inner.clear();
137 }
138
139 pub fn packer(&mut self) -> &mut Vec<Value> {
143 self.inner.clear();
144 &mut self.inner
145 }
146
147 pub fn pack<I>(iter: I) -> Row
149 where
150 I: IntoIterator<Item = Value>,
151 {
152 Self {
153 inner: iter.into_iter().collect(),
154 }
155 }
156
157 pub fn unpack(self) -> Vec<Value> {
159 self.inner
160 }
161
162 pub fn extend<I>(&mut self, iter: I)
164 where
165 I: IntoIterator<Item = Value>,
166 {
167 self.inner.extend(iter);
168 }
169
170 pub fn into_iter(self) -> impl Iterator<Item = Value> {
172 self.inner.into_iter()
173 }
174
175 pub fn iter(&self) -> impl Iterator<Item = &Value> {
177 self.inner.iter()
178 }
179
180 pub fn len(&self) -> usize {
182 self.inner.len()
183 }
184}
185
186impl From<Vec<Value>> for Row {
187 fn from(row: Vec<Value>) -> Self {
188 Row::new(row)
189 }
190}
191
192impl From<ProtoRow> for Row {
193 fn from(row: ProtoRow) -> Self {
194 Row::pack(
195 row.values
196 .iter()
197 .map(|pb_val| -> Value { pb_value_to_value_ref(pb_val, None).into() }),
198 )
199 }
200}
201
202impl From<Row> for ProtoRow {
203 fn from(row: Row) -> Self {
204 let values = row.unpack().into_iter().map(to_grpc_value).collect_vec();
205 ProtoRow { values }
206 }
207}
#[cfg(test)]
mod test {
    use common_time::{Date, Timestamp};

    use super::*;

    /// Exercises the constructors, accessors and mutators of `Row`.
    #[test]
    fn test_row() {
        // Both ways of building an empty row agree.
        let from_empty = Row::empty();
        let from_new = Row::new(vec![]);
        assert_eq!(from_empty, from_new);

        // Indexed access before and after clearing.
        let mut working = Row::new(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(working.get(0), Some(&Value::Int32(1)));
        working.clear();
        assert_eq!(working.get(0), None);

        // Refill through the packer, then append two more values.
        working
            .packer()
            .extend(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(working.get(0), Some(&Value::Int32(1)));
        working.extend(vec![Value::Int32(1), Value::Int32(2)]);
        assert_eq!(working.len(), 4);

        // Round-trip through the owning and the borrowing iterators.
        let repacked = Row::pack(working.into_iter());
        assert_eq!(repacked.len(), 4);
        let cloned = Row::pack(repacked.iter().cloned());
        assert_eq!(repacked, cloned);
    }

    /// Checks which values `value_to_internal_ts` accepts and what it returns.
    #[test]
    fn test_cast_to_internal_ts() {
        // Numeric inputs: only i64 is accepted; a millisecond timestamp passes through.
        {
            let int32 = Value::from(1i32);
            let int64 = Value::from(1i64);
            let ts_millis = Value::Timestamp(Timestamp::new_millisecond(1i64));
            let float = Value::from(1.0);

            assert!(value_to_internal_ts(int32).is_err());
            assert_eq!(value_to_internal_ts(int64).unwrap(), 1i64);
            assert_eq!(value_to_internal_ts(ts_millis).unwrap(), 1i64);
            assert!(value_to_internal_ts(float).is_err());
        }

        // Time-like inputs: dates and timestamps are scaled to milliseconds,
        // while a time-of-day value is rejected.
        {
            let date = Value::Date(Date::new(1));
            assert_eq!(value_to_internal_ts(date).unwrap(), 86400 * 1000i64);
            let ts_second = Value::Timestamp(Timestamp::new_second(1));
            assert_eq!(value_to_internal_ts(ts_second).unwrap(), 1000i64);
            let time_of_day = Value::Time(common_time::time::Time::new_second(1));
            assert!(matches!(
                value_to_internal_ts(time_of_day),
                Err(EvalError::InvalidArgument { .. })
            ));
        }
    }
}