1use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::Timestamp;
26use common_time::timestamp::TimeUnit;
27use common_time::timestamp::TimeUnit::Nanosecond;
28use snafu::{OptionExt, ResultExt, ensure};
29
30use crate::error::{
31 IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
34pub struct TableData {
38 schema: Vec<ColumnSchema>,
39 rows: Vec<Row>,
40 column_indexes: HashMap<String, usize>,
41}
42
43impl TableData {
44 pub fn new(num_columns: usize, num_rows: usize) -> Self {
45 Self {
46 schema: Vec::with_capacity(num_columns),
47 rows: Vec::with_capacity(num_rows),
48 column_indexes: HashMap::with_capacity(num_columns),
49 }
50 }
51
52 #[inline]
53 pub fn num_columns(&self) -> usize {
54 self.schema.len()
55 }
56
57 #[inline]
58 pub fn num_rows(&self) -> usize {
59 self.rows.len()
60 }
61
62 #[inline]
63 pub fn alloc_one_row(&self) -> Vec<Value> {
64 vec![Value { value_data: None }; self.num_columns()]
65 }
66
67 #[inline]
68 pub fn add_row(&mut self, values: Vec<Value>) {
69 self.rows.push(Row { values })
70 }
71
72 #[inline]
73 pub fn reserve_rows(&mut self, additional: usize) {
74 self.rows.reserve(additional);
75 }
76
77 #[allow(dead_code)]
78 pub fn columns(&self) -> &Vec<ColumnSchema> {
79 &self.schema
80 }
81
82 pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
83 (self.schema, self.rows)
84 }
85
86 pub fn write_field_unchecked(
93 &mut self,
94 name: impl ToString,
95 datatype: ColumnDataType,
96 value: Option<ValueData>,
97 one_row: &mut Vec<Value>,
98 ) {
99 let name = name.to_string();
100 if let Some(index) = self.column_indexes.get(&name).copied() {
101 one_row[index].value_data = value;
102 } else {
103 let index = self.schema.len();
104 self.schema.push(ColumnSchema {
105 column_name: name.clone(),
106 datatype: datatype as i32,
107 semantic_type: SemanticType::Field as i32,
108 ..Default::default()
109 });
110 self.column_indexes.insert(name, index);
111 one_row.push(Value { value_data: value });
112 }
113 }
114}
115
116pub struct MultiTableData {
117 table_data_map: HashMap<String, TableData>,
118}
119
120impl Default for MultiTableData {
121 fn default() -> Self {
122 Self::new()
123 }
124}
125
126impl MultiTableData {
127 pub fn new() -> Self {
128 Self {
129 table_data_map: HashMap::new(),
130 }
131 }
132
133 pub fn get_or_default_table_data(
134 &mut self,
135 table_name: impl ToString,
136 num_columns: usize,
137 num_rows: usize,
138 ) -> &mut TableData {
139 self.table_data_map
140 .entry(table_name.to_string())
141 .or_insert_with(|| TableData::new(num_columns, num_rows))
142 }
143
144 pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
145 self.table_data_map
146 .insert(table_name.to_string(), table_data);
147 }
148
149 #[allow(dead_code)]
150 pub fn num_tables(&self) -> usize {
151 self.table_data_map.len()
152 }
153
154 pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
156 let mut total_rows = 0;
157 let inserts = self
158 .table_data_map
159 .into_iter()
160 .map(|(table_name, table_data)| {
161 total_rows += table_data.num_rows();
162 let num_columns = table_data.num_columns();
163 let (schema, mut rows) = table_data.into_schema_and_rows();
164 for row in &mut rows {
165 if num_columns > row.values.len() {
166 row.values.resize(num_columns, Value { value_data: None });
167 }
168 }
169
170 RowInsertRequest {
171 table_name,
172 rows: Some(Rows { schema, rows }),
173 }
174 })
175 .collect::<Vec<_>>();
176 let row_insert_requests = RowInsertRequests { inserts };
177
178 (row_insert_requests, total_rows)
179 }
180}
181
182pub fn write_tags(
184 table_data: &mut TableData,
185 tags: impl Iterator<Item = (String, String)>,
186 one_row: &mut Vec<Value>,
187) -> Result<()> {
188 let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
189 write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
190}
191
192pub fn write_fields(
194 table_data: &mut TableData,
195 fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
196 one_row: &mut Vec<Value>,
197) -> Result<()> {
198 write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
199}
200
201pub fn write_tag(
203 table_data: &mut TableData,
204 name: impl ToString,
205 value: impl ToString,
206 one_row: &mut Vec<Value>,
207) -> Result<()> {
208 write_by_semantic_type(
209 table_data,
210 SemanticType::Tag,
211 std::iter::once((
212 name.to_string(),
213 ColumnDataType::String,
214 Some(ValueData::StringValue(value.to_string())),
215 )),
216 one_row,
217 )
218}
219
220pub fn write_f64(
222 table_data: &mut TableData,
223 name: impl ToString,
224 value: f64,
225 one_row: &mut Vec<Value>,
226) -> Result<()> {
227 write_fields(
228 table_data,
229 std::iter::once((
230 name.to_string(),
231 ColumnDataType::Float64,
232 Some(ValueData::F64Value(value)),
233 )),
234 one_row,
235 )
236}
237
238fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
239 ColumnSchema {
240 column_name: name.to_string(),
241 datatype: ColumnDataType::Binary as i32,
242 semantic_type: SemanticType::Field as i32,
243 datatype_extension: Some(ColumnDataTypeExtension {
244 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
245 }),
246 ..Default::default()
247 }
248}
249
250pub fn write_json(
251 table_data: &mut TableData,
252 name: impl ToString,
253 value: jsonb::Value,
254 one_row: &mut Vec<Value>,
255) -> Result<()> {
256 write_by_schema(
257 table_data,
258 std::iter::once((
259 build_json_column_schema(name),
260 Some(ValueData::BinaryValue(value.to_vec())),
261 )),
262 one_row,
263 )
264}
265
266fn write_by_schema(
267 table_data: &mut TableData,
268 kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
269 one_row: &mut Vec<Value>,
270) -> Result<()> {
271 let TableData {
272 schema,
273 column_indexes,
274 ..
275 } = table_data;
276
277 for (column_schema, value) in kv_iter {
278 let index = column_indexes.get(&column_schema.column_name);
279 if let Some(index) = index {
280 check_schema_number(
281 column_schema.datatype,
282 column_schema.semantic_type,
283 &schema[*index],
284 )?;
285 one_row[*index].value_data = value;
286 } else {
287 let index = schema.len();
288 let key = column_schema.column_name.clone();
289 schema.push(column_schema);
290 column_indexes.insert(key, index);
291 one_row.push(Value { value_data: value });
292 }
293 }
294
295 Ok(())
296}
297
298fn write_by_semantic_type(
299 table_data: &mut TableData,
300 semantic_type: SemanticType,
301 ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
302 one_row: &mut Vec<Value>,
303) -> Result<()> {
304 let TableData {
305 schema,
306 column_indexes,
307 ..
308 } = table_data;
309
310 for (name, datatype, value) in ktv_iter {
311 let index = column_indexes.get(&name);
312 if let Some(index) = index {
313 check_schema(datatype, semantic_type, &schema[*index])?;
314 one_row[*index].value_data = value;
315 } else {
316 let index = schema.len();
317 schema.push(ColumnSchema {
318 column_name: name.clone(),
319 datatype: datatype as i32,
320 semantic_type: semantic_type as i32,
321 ..Default::default()
322 });
323 column_indexes.insert(name, index);
324 one_row.push(Value { value_data: value });
325 }
326 }
327
328 Ok(())
329}
330
331pub fn write_ts_to_millis(
333 table_data: &mut TableData,
334 name: impl ToString,
335 ts: Option<i64>,
336 precision: Precision,
337 one_row: &mut Vec<Value>,
338) -> Result<()> {
339 write_ts_to(
340 table_data,
341 name,
342 ts,
343 precision,
344 TimestampType::Millis,
345 one_row,
346 )
347}
348
349pub fn write_ts_to_nanos(
351 table_data: &mut TableData,
352 name: impl ToString,
353 ts: Option<i64>,
354 precision: Precision,
355 one_row: &mut Vec<Value>,
356) -> Result<()> {
357 write_ts_to(
358 table_data,
359 name,
360 ts,
361 precision,
362 TimestampType::Nanos,
363 one_row,
364 )
365}
366
/// Resolution in which a timestamp column is stored.
enum TimestampType {
    /// Store the timestamp as milliseconds.
    Millis,
    /// Store the timestamp as nanoseconds.
    Nanos,
}
371
372fn write_ts_to(
373 table_data: &mut TableData,
374 name: impl ToString,
375 ts: Option<i64>,
376 precision: Precision,
377 ts_type: TimestampType,
378 one_row: &mut Vec<Value>,
379) -> Result<()> {
380 let TableData {
381 schema,
382 column_indexes,
383 ..
384 } = table_data;
385 let name = name.to_string();
386
387 let ts = match ts {
388 Some(timestamp) => match ts_type {
389 TimestampType::Millis => precision.to_millis(timestamp),
390 TimestampType::Nanos => precision.to_nanos(timestamp),
391 }
392 .with_context(|| TimestampOverflowSnafu {
393 error: format!(
394 "timestamp {} overflow with precision {}",
395 timestamp, precision
396 ),
397 })?,
398 None => {
399 let timestamp = Timestamp::current_time(Nanosecond);
400 let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
401 let timestamp = timestamp
402 .convert_to(unit)
403 .with_context(|| TimePrecisionSnafu {
404 name: precision.to_string(),
405 })?
406 .into();
407 match ts_type {
408 TimestampType::Millis => precision.to_millis(timestamp),
409 TimestampType::Nanos => precision.to_nanos(timestamp),
410 }
411 .with_context(|| TimestampOverflowSnafu {
412 error: format!(
413 "timestamp {} overflow with precision {}",
414 timestamp, precision
415 ),
416 })?
417 }
418 };
419
420 let (datatype, ts) = match ts_type {
421 TimestampType::Millis => (
422 ColumnDataType::TimestampMillisecond,
423 ValueData::TimestampMillisecondValue(ts),
424 ),
425 TimestampType::Nanos => (
426 ColumnDataType::TimestampNanosecond,
427 ValueData::TimestampNanosecondValue(ts),
428 ),
429 };
430
431 let index = column_indexes.get(&name);
432 if let Some(index) = index {
433 check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
434 one_row[*index].value_data = Some(ts);
435 } else {
436 let index = schema.len();
437 schema.push(time_index_column_schema(&name, datatype));
438 column_indexes.insert(name, index);
439 one_row.push(ts.into())
440 }
441
442 Ok(())
443}
444
445fn check_schema(
446 datatype: ColumnDataType,
447 semantic_type: SemanticType,
448 schema: &ColumnSchema,
449) -> Result<()> {
450 check_schema_number(datatype as i32, semantic_type as i32, schema)
451}
452
453fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
454 ensure!(
455 schema.datatype == datatype,
456 IncompatibleSchemaSnafu {
457 column_name: &schema.column_name,
458 datatype: "datatype",
459 expected: schema.datatype,
460 actual: datatype,
461 }
462 );
463
464 ensure!(
465 schema.semantic_type == semantic_type,
466 IncompatibleSchemaSnafu {
467 column_name: &schema.column_name,
468 datatype: "semantic_type",
469 expected: schema.semantic_type,
470 actual: semantic_type,
471 }
472 );
473
474 Ok(())
475}