1use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
21 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
22};
23use common_grpc::precision::Precision;
24use common_time::timestamp::TimeUnit;
25use common_time::timestamp::TimeUnit::Nanosecond;
26use common_time::Timestamp;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{
30 IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
31};
32
33pub struct TableData {
37 schema: Vec<ColumnSchema>,
38 rows: Vec<Row>,
39 column_indexes: HashMap<String, usize>,
40}
41
42impl TableData {
43 pub fn new(num_columns: usize, num_rows: usize) -> Self {
44 Self {
45 schema: Vec::with_capacity(num_columns),
46 rows: Vec::with_capacity(num_rows),
47 column_indexes: HashMap::with_capacity(num_columns),
48 }
49 }
50
51 #[inline]
52 pub fn num_columns(&self) -> usize {
53 self.schema.len()
54 }
55
56 #[inline]
57 pub fn num_rows(&self) -> usize {
58 self.rows.len()
59 }
60
61 #[inline]
62 pub fn alloc_one_row(&self) -> Vec<Value> {
63 vec![Value { value_data: None }; self.num_columns()]
64 }
65
66 #[inline]
67 pub fn add_row(&mut self, values: Vec<Value>) {
68 self.rows.push(Row { values })
69 }
70
71 #[allow(dead_code)]
72 pub fn columns(&self) -> &Vec<ColumnSchema> {
73 &self.schema
74 }
75
76 pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
77 (self.schema, self.rows)
78 }
79}
80
81pub struct MultiTableData {
82 table_data_map: HashMap<String, TableData>,
83}
84
85impl Default for MultiTableData {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl MultiTableData {
92 pub fn new() -> Self {
93 Self {
94 table_data_map: HashMap::new(),
95 }
96 }
97
98 pub fn get_or_default_table_data(
99 &mut self,
100 table_name: impl ToString,
101 num_columns: usize,
102 num_rows: usize,
103 ) -> &mut TableData {
104 self.table_data_map
105 .entry(table_name.to_string())
106 .or_insert_with(|| TableData::new(num_columns, num_rows))
107 }
108
109 pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
110 self.table_data_map
111 .insert(table_name.to_string(), table_data);
112 }
113
114 #[allow(dead_code)]
115 pub fn num_tables(&self) -> usize {
116 self.table_data_map.len()
117 }
118
119 pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
121 let mut total_rows = 0;
122 let inserts = self
123 .table_data_map
124 .into_iter()
125 .map(|(table_name, table_data)| {
126 total_rows += table_data.num_rows();
127 let num_columns = table_data.num_columns();
128 let (schema, mut rows) = table_data.into_schema_and_rows();
129 for row in &mut rows {
130 if num_columns > row.values.len() {
131 row.values.resize(num_columns, Value { value_data: None });
132 }
133 }
134
135 RowInsertRequest {
136 table_name,
137 rows: Some(Rows { schema, rows }),
138 }
139 })
140 .collect::<Vec<_>>();
141 let row_insert_requests = RowInsertRequests { inserts };
142
143 (row_insert_requests, total_rows)
144 }
145}
146
147pub fn write_tags(
149 table_data: &mut TableData,
150 tags: impl Iterator<Item = (String, String)>,
151 one_row: &mut Vec<Value>,
152) -> Result<()> {
153 let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v)));
154 write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
155}
156
157pub fn write_fields(
159 table_data: &mut TableData,
160 fields: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
161 one_row: &mut Vec<Value>,
162) -> Result<()> {
163 write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
164}
165
166pub fn write_tag(
168 table_data: &mut TableData,
169 name: impl ToString,
170 value: impl ToString,
171 one_row: &mut Vec<Value>,
172) -> Result<()> {
173 write_by_semantic_type(
174 table_data,
175 SemanticType::Tag,
176 std::iter::once((
177 name.to_string(),
178 ColumnDataType::String,
179 ValueData::StringValue(value.to_string()),
180 )),
181 one_row,
182 )
183}
184
185pub fn write_f64(
187 table_data: &mut TableData,
188 name: impl ToString,
189 value: f64,
190 one_row: &mut Vec<Value>,
191) -> Result<()> {
192 write_fields(
193 table_data,
194 std::iter::once((
195 name.to_string(),
196 ColumnDataType::Float64,
197 ValueData::F64Value(value),
198 )),
199 one_row,
200 )
201}
202
203fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
204 ColumnSchema {
205 column_name: name.to_string(),
206 datatype: ColumnDataType::Binary as i32,
207 semantic_type: SemanticType::Field as i32,
208 datatype_extension: Some(ColumnDataTypeExtension {
209 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
210 }),
211 ..Default::default()
212 }
213}
214
215pub fn write_json(
216 table_data: &mut TableData,
217 name: impl ToString,
218 value: jsonb::Value,
219 one_row: &mut Vec<Value>,
220) -> Result<()> {
221 write_by_schema(
222 table_data,
223 std::iter::once((
224 build_json_column_schema(name),
225 ValueData::BinaryValue(value.to_vec()),
226 )),
227 one_row,
228 )
229}
230
231fn write_by_schema(
232 table_data: &mut TableData,
233 kv_iter: impl Iterator<Item = (ColumnSchema, ValueData)>,
234 one_row: &mut Vec<Value>,
235) -> Result<()> {
236 let TableData {
237 schema,
238 column_indexes,
239 ..
240 } = table_data;
241
242 for (column_schema, value) in kv_iter {
243 let index = column_indexes.get(&column_schema.column_name);
244 if let Some(index) = index {
245 check_schema_number(
246 column_schema.datatype,
247 column_schema.semantic_type,
248 &schema[*index],
249 )?;
250 one_row[*index].value_data = Some(value);
251 } else {
252 let index = schema.len();
253 let key = column_schema.column_name.clone();
254 schema.push(column_schema);
255 column_indexes.insert(key, index);
256 one_row.push(Value {
257 value_data: Some(value),
258 });
259 }
260 }
261
262 Ok(())
263}
264
265fn write_by_semantic_type(
266 table_data: &mut TableData,
267 semantic_type: SemanticType,
268 ktv_iter: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
269 one_row: &mut Vec<Value>,
270) -> Result<()> {
271 let TableData {
272 schema,
273 column_indexes,
274 ..
275 } = table_data;
276
277 for (name, datatype, value) in ktv_iter {
278 let index = column_indexes.get(&name);
279 if let Some(index) = index {
280 check_schema(datatype, semantic_type, &schema[*index])?;
281 one_row[*index].value_data = Some(value);
282 } else {
283 let index = schema.len();
284 schema.push(ColumnSchema {
285 column_name: name.clone(),
286 datatype: datatype as i32,
287 semantic_type: semantic_type as i32,
288 ..Default::default()
289 });
290 column_indexes.insert(name, index);
291 one_row.push(Value {
292 value_data: Some(value),
293 });
294 }
295 }
296
297 Ok(())
298}
299
300pub fn write_ts_to_millis(
302 table_data: &mut TableData,
303 name: impl ToString,
304 ts: Option<i64>,
305 precision: Precision,
306 one_row: &mut Vec<Value>,
307) -> Result<()> {
308 write_ts_to(
309 table_data,
310 name,
311 ts,
312 precision,
313 TimestampType::Millis,
314 one_row,
315 )
316}
317
318pub fn write_ts_to_nanos(
320 table_data: &mut TableData,
321 name: impl ToString,
322 ts: Option<i64>,
323 precision: Precision,
324 one_row: &mut Vec<Value>,
325) -> Result<()> {
326 write_ts_to(
327 table_data,
328 name,
329 ts,
330 precision,
331 TimestampType::Nanos,
332 one_row,
333 )
334}
335
/// Target unit of a timestamp column written by `write_ts_to`.
///
/// It is a plain fieldless tag, so it derives the usual value-type traits.
/// This lets it be copied, compared and printed in diagnostics.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum TimestampType {
    /// Stored as `ColumnDataType::TimestampMillisecond`.
    Millis,
    /// Stored as `ColumnDataType::TimestampNanosecond`.
    Nanos,
}
340
341fn write_ts_to(
342 table_data: &mut TableData,
343 name: impl ToString,
344 ts: Option<i64>,
345 precision: Precision,
346 ts_type: TimestampType,
347 one_row: &mut Vec<Value>,
348) -> Result<()> {
349 let TableData {
350 schema,
351 column_indexes,
352 ..
353 } = table_data;
354 let name = name.to_string();
355
356 let ts = match ts {
357 Some(timestamp) => match ts_type {
358 TimestampType::Millis => precision.to_millis(timestamp),
359 TimestampType::Nanos => precision.to_nanos(timestamp),
360 }
361 .with_context(|| TimestampOverflowSnafu {
362 error: format!(
363 "timestamp {} overflow with precision {}",
364 timestamp, precision
365 ),
366 })?,
367 None => {
368 let timestamp = Timestamp::current_time(Nanosecond);
369 let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
370 let timestamp = timestamp
371 .convert_to(unit)
372 .with_context(|| TimePrecisionSnafu {
373 name: precision.to_string(),
374 })?
375 .into();
376 match ts_type {
377 TimestampType::Millis => precision.to_millis(timestamp),
378 TimestampType::Nanos => precision.to_nanos(timestamp),
379 }
380 .with_context(|| TimestampOverflowSnafu {
381 error: format!(
382 "timestamp {} overflow with precision {}",
383 timestamp, precision
384 ),
385 })?
386 }
387 };
388
389 let (datatype, ts) = match ts_type {
390 TimestampType::Millis => (
391 ColumnDataType::TimestampMillisecond,
392 ValueData::TimestampMillisecondValue(ts),
393 ),
394 TimestampType::Nanos => (
395 ColumnDataType::TimestampNanosecond,
396 ValueData::TimestampNanosecondValue(ts),
397 ),
398 };
399
400 let index = column_indexes.get(&name);
401 if let Some(index) = index {
402 check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
403 one_row[*index].value_data = Some(ts);
404 } else {
405 let index = schema.len();
406 schema.push(ColumnSchema {
407 column_name: name.clone(),
408 datatype: datatype as i32,
409 semantic_type: SemanticType::Timestamp as i32,
410 ..Default::default()
411 });
412 column_indexes.insert(name, index);
413 one_row.push(ts.into())
414 }
415
416 Ok(())
417}
418
419fn check_schema(
420 datatype: ColumnDataType,
421 semantic_type: SemanticType,
422 schema: &ColumnSchema,
423) -> Result<()> {
424 check_schema_number(datatype as i32, semantic_type as i32, schema)
425}
426
427fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
428 ensure!(
429 schema.datatype == datatype,
430 IncompatibleSchemaSnafu {
431 column_name: &schema.column_name,
432 datatype: "datatype",
433 expected: schema.datatype,
434 actual: datatype,
435 }
436 );
437
438 ensure!(
439 schema.semantic_type == semantic_type,
440 IncompatibleSchemaSnafu {
441 column_name: &schema.column_name,
442 datatype: "semantic_type",
443 expected: schema.semantic_type,
444 actual: semantic_type,
445 }
446 );
447
448 Ok(())
449}