1use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
21 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
22};
23use common_grpc::precision::Precision;
24use common_time::timestamp::TimeUnit;
25use common_time::timestamp::TimeUnit::Nanosecond;
26use common_time::Timestamp;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{
30 IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
31};
32
/// Buffered rows for a single table, together with the column schema discovered so far.
///
/// A column is appended to `schema` the first time its name is written. Rows allocated
/// before a column was appended can therefore hold fewer values than `schema` has columns.
pub struct TableData {
    /// Column definitions in the order they were first written.
    schema: Vec<ColumnSchema>,
    /// Buffered rows; each value sits at its column's position in `schema`.
    rows: Vec<Row>,
    /// Maps a column name to its position in `schema` (and in each row's values).
    column_indexes: HashMap<String, usize>,
}
41
42impl TableData {
43 pub fn new(num_columns: usize, num_rows: usize) -> Self {
44 Self {
45 schema: Vec::with_capacity(num_columns),
46 rows: Vec::with_capacity(num_rows),
47 column_indexes: HashMap::with_capacity(num_columns),
48 }
49 }
50
51 #[inline]
52 pub fn num_columns(&self) -> usize {
53 self.schema.len()
54 }
55
56 #[inline]
57 pub fn num_rows(&self) -> usize {
58 self.rows.len()
59 }
60
61 #[inline]
62 pub fn alloc_one_row(&self) -> Vec<Value> {
63 vec![Value { value_data: None }; self.num_columns()]
64 }
65
66 #[inline]
67 pub fn add_row(&mut self, values: Vec<Value>) {
68 self.rows.push(Row { values })
69 }
70
71 #[allow(dead_code)]
72 pub fn columns(&self) -> &Vec<ColumnSchema> {
73 &self.schema
74 }
75
76 pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
77 (self.schema, self.rows)
78 }
79}
80
/// Row data for several tables, grouped by table name.
///
/// Converted into a single batch with `into_row_insert_requests`.
pub struct MultiTableData {
    /// Per-table buffered data, keyed by table name.
    table_data_map: HashMap<String, TableData>,
}
84
85impl Default for MultiTableData {
86 fn default() -> Self {
87 Self::new()
88 }
89}
90
91impl MultiTableData {
92 pub fn new() -> Self {
93 Self {
94 table_data_map: HashMap::new(),
95 }
96 }
97
98 pub fn get_or_default_table_data(
99 &mut self,
100 table_name: impl ToString,
101 num_columns: usize,
102 num_rows: usize,
103 ) -> &mut TableData {
104 self.table_data_map
105 .entry(table_name.to_string())
106 .or_insert_with(|| TableData::new(num_columns, num_rows))
107 }
108
109 pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
110 self.table_data_map
111 .insert(table_name.to_string(), table_data);
112 }
113
114 #[allow(dead_code)]
115 pub fn num_tables(&self) -> usize {
116 self.table_data_map.len()
117 }
118
119 pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
121 let mut total_rows = 0;
122 let inserts = self
123 .table_data_map
124 .into_iter()
125 .map(|(table_name, table_data)| {
126 total_rows += table_data.num_rows();
127 let num_columns = table_data.num_columns();
128 let (schema, mut rows) = table_data.into_schema_and_rows();
129 for row in &mut rows {
130 if num_columns > row.values.len() {
131 row.values.resize(num_columns, Value { value_data: None });
132 }
133 }
134
135 RowInsertRequest {
136 table_name,
137 rows: Some(Rows { schema, rows }),
138 }
139 })
140 .collect::<Vec<_>>();
141 let row_insert_requests = RowInsertRequests { inserts };
142
143 (row_insert_requests, total_rows)
144 }
145}
146
147pub fn write_tags(
149 table_data: &mut TableData,
150 tags: impl Iterator<Item = (String, String)>,
151 one_row: &mut Vec<Value>,
152) -> Result<()> {
153 let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
154 write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
155}
156
157pub fn write_fields(
159 table_data: &mut TableData,
160 fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
161 one_row: &mut Vec<Value>,
162) -> Result<()> {
163 write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
164}
165
166pub fn write_tag(
168 table_data: &mut TableData,
169 name: impl ToString,
170 value: impl ToString,
171 one_row: &mut Vec<Value>,
172) -> Result<()> {
173 write_by_semantic_type(
174 table_data,
175 SemanticType::Tag,
176 std::iter::once((
177 name.to_string(),
178 ColumnDataType::String,
179 Some(ValueData::StringValue(value.to_string())),
180 )),
181 one_row,
182 )
183}
184
185pub fn write_f64(
187 table_data: &mut TableData,
188 name: impl ToString,
189 value: f64,
190 one_row: &mut Vec<Value>,
191) -> Result<()> {
192 write_fields(
193 table_data,
194 std::iter::once((
195 name.to_string(),
196 ColumnDataType::Float64,
197 Some(ValueData::F64Value(value)),
198 )),
199 one_row,
200 )
201}
202
203fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
204 ColumnSchema {
205 column_name: name.to_string(),
206 datatype: ColumnDataType::Binary as i32,
207 semantic_type: SemanticType::Field as i32,
208 datatype_extension: Some(ColumnDataTypeExtension {
209 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
210 }),
211 ..Default::default()
212 }
213}
214
215pub fn write_json(
216 table_data: &mut TableData,
217 name: impl ToString,
218 value: jsonb::Value,
219 one_row: &mut Vec<Value>,
220) -> Result<()> {
221 write_by_schema(
222 table_data,
223 std::iter::once((
224 build_json_column_schema(name),
225 Some(ValueData::BinaryValue(value.to_vec())),
226 )),
227 one_row,
228 )
229}
230
231fn write_by_schema(
232 table_data: &mut TableData,
233 kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
234 one_row: &mut Vec<Value>,
235) -> Result<()> {
236 let TableData {
237 schema,
238 column_indexes,
239 ..
240 } = table_data;
241
242 for (column_schema, value) in kv_iter {
243 let index = column_indexes.get(&column_schema.column_name);
244 if let Some(index) = index {
245 check_schema_number(
246 column_schema.datatype,
247 column_schema.semantic_type,
248 &schema[*index],
249 )?;
250 one_row[*index].value_data = value;
251 } else {
252 let index = schema.len();
253 let key = column_schema.column_name.clone();
254 schema.push(column_schema);
255 column_indexes.insert(key, index);
256 one_row.push(Value { value_data: value });
257 }
258 }
259
260 Ok(())
261}
262
263fn write_by_semantic_type(
264 table_data: &mut TableData,
265 semantic_type: SemanticType,
266 ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
267 one_row: &mut Vec<Value>,
268) -> Result<()> {
269 let TableData {
270 schema,
271 column_indexes,
272 ..
273 } = table_data;
274
275 for (name, datatype, value) in ktv_iter {
276 let index = column_indexes.get(&name);
277 if let Some(index) = index {
278 check_schema(datatype, semantic_type, &schema[*index])?;
279 one_row[*index].value_data = value;
280 } else {
281 let index = schema.len();
282 schema.push(ColumnSchema {
283 column_name: name.clone(),
284 datatype: datatype as i32,
285 semantic_type: semantic_type as i32,
286 ..Default::default()
287 });
288 column_indexes.insert(name, index);
289 one_row.push(Value { value_data: value });
290 }
291 }
292
293 Ok(())
294}
295
296pub fn write_ts_to_millis(
298 table_data: &mut TableData,
299 name: impl ToString,
300 ts: Option<i64>,
301 precision: Precision,
302 one_row: &mut Vec<Value>,
303) -> Result<()> {
304 write_ts_to(
305 table_data,
306 name,
307 ts,
308 precision,
309 TimestampType::Millis,
310 one_row,
311 )
312}
313
314pub fn write_ts_to_nanos(
316 table_data: &mut TableData,
317 name: impl ToString,
318 ts: Option<i64>,
319 precision: Precision,
320 one_row: &mut Vec<Value>,
321) -> Result<()> {
322 write_ts_to(
323 table_data,
324 name,
325 ts,
326 precision,
327 TimestampType::Nanos,
328 one_row,
329 )
330}
331
/// Target unit for a timestamp column written by `write_ts_to`.
enum TimestampType {
    /// Stored as `ColumnDataType::TimestampMillisecond`.
    Millis,
    /// Stored as `ColumnDataType::TimestampNanosecond`.
    Nanos,
}
336
337fn write_ts_to(
338 table_data: &mut TableData,
339 name: impl ToString,
340 ts: Option<i64>,
341 precision: Precision,
342 ts_type: TimestampType,
343 one_row: &mut Vec<Value>,
344) -> Result<()> {
345 let TableData {
346 schema,
347 column_indexes,
348 ..
349 } = table_data;
350 let name = name.to_string();
351
352 let ts = match ts {
353 Some(timestamp) => match ts_type {
354 TimestampType::Millis => precision.to_millis(timestamp),
355 TimestampType::Nanos => precision.to_nanos(timestamp),
356 }
357 .with_context(|| TimestampOverflowSnafu {
358 error: format!(
359 "timestamp {} overflow with precision {}",
360 timestamp, precision
361 ),
362 })?,
363 None => {
364 let timestamp = Timestamp::current_time(Nanosecond);
365 let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
366 let timestamp = timestamp
367 .convert_to(unit)
368 .with_context(|| TimePrecisionSnafu {
369 name: precision.to_string(),
370 })?
371 .into();
372 match ts_type {
373 TimestampType::Millis => precision.to_millis(timestamp),
374 TimestampType::Nanos => precision.to_nanos(timestamp),
375 }
376 .with_context(|| TimestampOverflowSnafu {
377 error: format!(
378 "timestamp {} overflow with precision {}",
379 timestamp, precision
380 ),
381 })?
382 }
383 };
384
385 let (datatype, ts) = match ts_type {
386 TimestampType::Millis => (
387 ColumnDataType::TimestampMillisecond,
388 ValueData::TimestampMillisecondValue(ts),
389 ),
390 TimestampType::Nanos => (
391 ColumnDataType::TimestampNanosecond,
392 ValueData::TimestampNanosecondValue(ts),
393 ),
394 };
395
396 let index = column_indexes.get(&name);
397 if let Some(index) = index {
398 check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
399 one_row[*index].value_data = Some(ts);
400 } else {
401 let index = schema.len();
402 schema.push(ColumnSchema {
403 column_name: name.clone(),
404 datatype: datatype as i32,
405 semantic_type: SemanticType::Timestamp as i32,
406 ..Default::default()
407 });
408 column_indexes.insert(name, index);
409 one_row.push(ts.into())
410 }
411
412 Ok(())
413}
414
415fn check_schema(
416 datatype: ColumnDataType,
417 semantic_type: SemanticType,
418 schema: &ColumnSchema,
419) -> Result<()> {
420 check_schema_number(datatype as i32, semantic_type as i32, schema)
421}
422
423fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
424 ensure!(
425 schema.datatype == datatype,
426 IncompatibleSchemaSnafu {
427 column_name: &schema.column_name,
428 datatype: "datatype",
429 expected: schema.datatype,
430 actual: datatype,
431 }
432 );
433
434 ensure!(
435 schema.semantic_type == semantic_type,
436 IncompatibleSchemaSnafu {
437 column_name: &schema.column_name,
438 datatype: "semantic_type",
439 expected: schema.semantic_type,
440 actual: semantic_type,
441 }
442 );
443
444 Ok(())
445}