1use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::Timestamp;
26use common_time::timestamp::TimeUnit;
27use common_time::timestamp::TimeUnit::Nanosecond;
28use snafu::{OptionExt, ResultExt, ensure};
29
30use crate::error::{
31 IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
34pub struct TableData {
38 schema: Vec<ColumnSchema>,
39 rows: Vec<Row>,
40 column_indexes: HashMap<String, usize>,
41}
42
43impl TableData {
44 pub fn new(num_columns: usize, num_rows: usize) -> Self {
45 Self {
46 schema: Vec::with_capacity(num_columns),
47 rows: Vec::with_capacity(num_rows),
48 column_indexes: HashMap::with_capacity(num_columns),
49 }
50 }
51
52 #[inline]
53 pub fn num_columns(&self) -> usize {
54 self.schema.len()
55 }
56
57 #[inline]
58 pub fn num_rows(&self) -> usize {
59 self.rows.len()
60 }
61
62 #[inline]
63 pub fn alloc_one_row(&self) -> Vec<Value> {
64 vec![Value { value_data: None }; self.num_columns()]
65 }
66
67 #[inline]
68 pub fn add_row(&mut self, values: Vec<Value>) {
69 self.rows.push(Row { values })
70 }
71
72 #[allow(dead_code)]
73 pub fn columns(&self) -> &Vec<ColumnSchema> {
74 &self.schema
75 }
76
77 pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
78 (self.schema, self.rows)
79 }
80
81 pub fn write_field_unchecked(
88 &mut self,
89 name: impl ToString,
90 datatype: ColumnDataType,
91 value: Option<ValueData>,
92 one_row: &mut Vec<Value>,
93 ) {
94 let name = name.to_string();
95 if let Some(index) = self.column_indexes.get(&name).copied() {
96 one_row[index].value_data = value;
97 } else {
98 let index = self.schema.len();
99 self.schema.push(ColumnSchema {
100 column_name: name.clone(),
101 datatype: datatype as i32,
102 semantic_type: SemanticType::Field as i32,
103 ..Default::default()
104 });
105 self.column_indexes.insert(name, index);
106 one_row.push(Value { value_data: value });
107 }
108 }
109}
110
111pub struct MultiTableData {
112 table_data_map: HashMap<String, TableData>,
113}
114
115impl Default for MultiTableData {
116 fn default() -> Self {
117 Self::new()
118 }
119}
120
121impl MultiTableData {
122 pub fn new() -> Self {
123 Self {
124 table_data_map: HashMap::new(),
125 }
126 }
127
128 pub fn get_or_default_table_data(
129 &mut self,
130 table_name: impl ToString,
131 num_columns: usize,
132 num_rows: usize,
133 ) -> &mut TableData {
134 self.table_data_map
135 .entry(table_name.to_string())
136 .or_insert_with(|| TableData::new(num_columns, num_rows))
137 }
138
139 pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
140 self.table_data_map
141 .insert(table_name.to_string(), table_data);
142 }
143
144 #[allow(dead_code)]
145 pub fn num_tables(&self) -> usize {
146 self.table_data_map.len()
147 }
148
149 pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
151 let mut total_rows = 0;
152 let inserts = self
153 .table_data_map
154 .into_iter()
155 .map(|(table_name, table_data)| {
156 total_rows += table_data.num_rows();
157 let num_columns = table_data.num_columns();
158 let (schema, mut rows) = table_data.into_schema_and_rows();
159 for row in &mut rows {
160 if num_columns > row.values.len() {
161 row.values.resize(num_columns, Value { value_data: None });
162 }
163 }
164
165 RowInsertRequest {
166 table_name,
167 rows: Some(Rows { schema, rows }),
168 }
169 })
170 .collect::<Vec<_>>();
171 let row_insert_requests = RowInsertRequests { inserts };
172
173 (row_insert_requests, total_rows)
174 }
175}
176
177pub fn write_tags(
179 table_data: &mut TableData,
180 tags: impl Iterator<Item = (String, String)>,
181 one_row: &mut Vec<Value>,
182) -> Result<()> {
183 let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
184 write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
185}
186
187pub fn write_fields(
189 table_data: &mut TableData,
190 fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
191 one_row: &mut Vec<Value>,
192) -> Result<()> {
193 write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
194}
195
196pub fn write_tag(
198 table_data: &mut TableData,
199 name: impl ToString,
200 value: impl ToString,
201 one_row: &mut Vec<Value>,
202) -> Result<()> {
203 write_by_semantic_type(
204 table_data,
205 SemanticType::Tag,
206 std::iter::once((
207 name.to_string(),
208 ColumnDataType::String,
209 Some(ValueData::StringValue(value.to_string())),
210 )),
211 one_row,
212 )
213}
214
215pub fn write_f64(
217 table_data: &mut TableData,
218 name: impl ToString,
219 value: f64,
220 one_row: &mut Vec<Value>,
221) -> Result<()> {
222 write_fields(
223 table_data,
224 std::iter::once((
225 name.to_string(),
226 ColumnDataType::Float64,
227 Some(ValueData::F64Value(value)),
228 )),
229 one_row,
230 )
231}
232
233fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
234 ColumnSchema {
235 column_name: name.to_string(),
236 datatype: ColumnDataType::Binary as i32,
237 semantic_type: SemanticType::Field as i32,
238 datatype_extension: Some(ColumnDataTypeExtension {
239 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
240 }),
241 ..Default::default()
242 }
243}
244
245pub fn write_json(
246 table_data: &mut TableData,
247 name: impl ToString,
248 value: jsonb::Value,
249 one_row: &mut Vec<Value>,
250) -> Result<()> {
251 write_by_schema(
252 table_data,
253 std::iter::once((
254 build_json_column_schema(name),
255 Some(ValueData::BinaryValue(value.to_vec())),
256 )),
257 one_row,
258 )
259}
260
261fn write_by_schema(
262 table_data: &mut TableData,
263 kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
264 one_row: &mut Vec<Value>,
265) -> Result<()> {
266 let TableData {
267 schema,
268 column_indexes,
269 ..
270 } = table_data;
271
272 for (column_schema, value) in kv_iter {
273 let index = column_indexes.get(&column_schema.column_name);
274 if let Some(index) = index {
275 check_schema_number(
276 column_schema.datatype,
277 column_schema.semantic_type,
278 &schema[*index],
279 )?;
280 one_row[*index].value_data = value;
281 } else {
282 let index = schema.len();
283 let key = column_schema.column_name.clone();
284 schema.push(column_schema);
285 column_indexes.insert(key, index);
286 one_row.push(Value { value_data: value });
287 }
288 }
289
290 Ok(())
291}
292
293fn write_by_semantic_type(
294 table_data: &mut TableData,
295 semantic_type: SemanticType,
296 ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
297 one_row: &mut Vec<Value>,
298) -> Result<()> {
299 let TableData {
300 schema,
301 column_indexes,
302 ..
303 } = table_data;
304
305 for (name, datatype, value) in ktv_iter {
306 let index = column_indexes.get(&name);
307 if let Some(index) = index {
308 check_schema(datatype, semantic_type, &schema[*index])?;
309 one_row[*index].value_data = value;
310 } else {
311 let index = schema.len();
312 schema.push(ColumnSchema {
313 column_name: name.clone(),
314 datatype: datatype as i32,
315 semantic_type: semantic_type as i32,
316 ..Default::default()
317 });
318 column_indexes.insert(name, index);
319 one_row.push(Value { value_data: value });
320 }
321 }
322
323 Ok(())
324}
325
326pub fn write_ts_to_millis(
328 table_data: &mut TableData,
329 name: impl ToString,
330 ts: Option<i64>,
331 precision: Precision,
332 one_row: &mut Vec<Value>,
333) -> Result<()> {
334 write_ts_to(
335 table_data,
336 name,
337 ts,
338 precision,
339 TimestampType::Millis,
340 one_row,
341 )
342}
343
344pub fn write_ts_to_nanos(
346 table_data: &mut TableData,
347 name: impl ToString,
348 ts: Option<i64>,
349 precision: Precision,
350 one_row: &mut Vec<Value>,
351) -> Result<()> {
352 write_ts_to(
353 table_data,
354 name,
355 ts,
356 precision,
357 TimestampType::Nanos,
358 one_row,
359 )
360}
361
/// The unit a timestamp is converted to before it is written.
enum TimestampType {
    /// Store the timestamp as a millisecond timestamp value.
    Millis,
    /// Store the timestamp as a nanosecond timestamp value.
    Nanos,
}
366
367fn write_ts_to(
368 table_data: &mut TableData,
369 name: impl ToString,
370 ts: Option<i64>,
371 precision: Precision,
372 ts_type: TimestampType,
373 one_row: &mut Vec<Value>,
374) -> Result<()> {
375 let TableData {
376 schema,
377 column_indexes,
378 ..
379 } = table_data;
380 let name = name.to_string();
381
382 let ts = match ts {
383 Some(timestamp) => match ts_type {
384 TimestampType::Millis => precision.to_millis(timestamp),
385 TimestampType::Nanos => precision.to_nanos(timestamp),
386 }
387 .with_context(|| TimestampOverflowSnafu {
388 error: format!(
389 "timestamp {} overflow with precision {}",
390 timestamp, precision
391 ),
392 })?,
393 None => {
394 let timestamp = Timestamp::current_time(Nanosecond);
395 let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
396 let timestamp = timestamp
397 .convert_to(unit)
398 .with_context(|| TimePrecisionSnafu {
399 name: precision.to_string(),
400 })?
401 .into();
402 match ts_type {
403 TimestampType::Millis => precision.to_millis(timestamp),
404 TimestampType::Nanos => precision.to_nanos(timestamp),
405 }
406 .with_context(|| TimestampOverflowSnafu {
407 error: format!(
408 "timestamp {} overflow with precision {}",
409 timestamp, precision
410 ),
411 })?
412 }
413 };
414
415 let (datatype, ts) = match ts_type {
416 TimestampType::Millis => (
417 ColumnDataType::TimestampMillisecond,
418 ValueData::TimestampMillisecondValue(ts),
419 ),
420 TimestampType::Nanos => (
421 ColumnDataType::TimestampNanosecond,
422 ValueData::TimestampNanosecondValue(ts),
423 ),
424 };
425
426 let index = column_indexes.get(&name);
427 if let Some(index) = index {
428 check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
429 one_row[*index].value_data = Some(ts);
430 } else {
431 let index = schema.len();
432 schema.push(time_index_column_schema(&name, datatype));
433 column_indexes.insert(name, index);
434 one_row.push(ts.into())
435 }
436
437 Ok(())
438}
439
440fn check_schema(
441 datatype: ColumnDataType,
442 semantic_type: SemanticType,
443 schema: &ColumnSchema,
444) -> Result<()> {
445 check_schema_number(datatype as i32, semantic_type as i32, schema)
446}
447
448fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
449 ensure!(
450 schema.datatype == datatype,
451 IncompatibleSchemaSnafu {
452 column_name: &schema.column_name,
453 datatype: "datatype",
454 expected: schema.datatype,
455 actual: datatype,
456 }
457 );
458
459 ensure!(
460 schema.semantic_type == semantic_type,
461 IncompatibleSchemaSnafu {
462 column_name: &schema.column_name,
463 datatype: "semantic_type",
464 expected: schema.semantic_type,
465 actual: semantic_type,
466 }
467 );
468
469 Ok(())
470}