1use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21 ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22 RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::timestamp::TimeUnit;
26use common_time::timestamp::TimeUnit::Nanosecond;
27use common_time::Timestamp;
28use snafu::{ensure, OptionExt, ResultExt};
29
30use crate::error::{
31 IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
34pub struct TableData {
38 schema: Vec<ColumnSchema>,
39 rows: Vec<Row>,
40 column_indexes: HashMap<String, usize>,
41}
42
43impl TableData {
44 pub fn new(num_columns: usize, num_rows: usize) -> Self {
45 Self {
46 schema: Vec::with_capacity(num_columns),
47 rows: Vec::with_capacity(num_rows),
48 column_indexes: HashMap::with_capacity(num_columns),
49 }
50 }
51
52 #[inline]
53 pub fn num_columns(&self) -> usize {
54 self.schema.len()
55 }
56
57 #[inline]
58 pub fn num_rows(&self) -> usize {
59 self.rows.len()
60 }
61
62 #[inline]
63 pub fn alloc_one_row(&self) -> Vec<Value> {
64 vec![Value { value_data: None }; self.num_columns()]
65 }
66
67 #[inline]
68 pub fn add_row(&mut self, values: Vec<Value>) {
69 self.rows.push(Row { values })
70 }
71
72 #[allow(dead_code)]
73 pub fn columns(&self) -> &Vec<ColumnSchema> {
74 &self.schema
75 }
76
77 pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
78 (self.schema, self.rows)
79 }
80}
81
82pub struct MultiTableData {
83 table_data_map: HashMap<String, TableData>,
84}
85
86impl Default for MultiTableData {
87 fn default() -> Self {
88 Self::new()
89 }
90}
91
92impl MultiTableData {
93 pub fn new() -> Self {
94 Self {
95 table_data_map: HashMap::new(),
96 }
97 }
98
99 pub fn get_or_default_table_data(
100 &mut self,
101 table_name: impl ToString,
102 num_columns: usize,
103 num_rows: usize,
104 ) -> &mut TableData {
105 self.table_data_map
106 .entry(table_name.to_string())
107 .or_insert_with(|| TableData::new(num_columns, num_rows))
108 }
109
110 pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
111 self.table_data_map
112 .insert(table_name.to_string(), table_data);
113 }
114
115 #[allow(dead_code)]
116 pub fn num_tables(&self) -> usize {
117 self.table_data_map.len()
118 }
119
120 pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
122 let mut total_rows = 0;
123 let inserts = self
124 .table_data_map
125 .into_iter()
126 .map(|(table_name, table_data)| {
127 total_rows += table_data.num_rows();
128 let num_columns = table_data.num_columns();
129 let (schema, mut rows) = table_data.into_schema_and_rows();
130 for row in &mut rows {
131 if num_columns > row.values.len() {
132 row.values.resize(num_columns, Value { value_data: None });
133 }
134 }
135
136 RowInsertRequest {
137 table_name,
138 rows: Some(Rows { schema, rows }),
139 }
140 })
141 .collect::<Vec<_>>();
142 let row_insert_requests = RowInsertRequests { inserts };
143
144 (row_insert_requests, total_rows)
145 }
146}
147
148pub fn write_tags(
150 table_data: &mut TableData,
151 tags: impl Iterator<Item = (String, String)>,
152 one_row: &mut Vec<Value>,
153) -> Result<()> {
154 let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
155 write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
156}
157
158pub fn write_fields(
160 table_data: &mut TableData,
161 fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
162 one_row: &mut Vec<Value>,
163) -> Result<()> {
164 write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
165}
166
167pub fn write_tag(
169 table_data: &mut TableData,
170 name: impl ToString,
171 value: impl ToString,
172 one_row: &mut Vec<Value>,
173) -> Result<()> {
174 write_by_semantic_type(
175 table_data,
176 SemanticType::Tag,
177 std::iter::once((
178 name.to_string(),
179 ColumnDataType::String,
180 Some(ValueData::StringValue(value.to_string())),
181 )),
182 one_row,
183 )
184}
185
186pub fn write_f64(
188 table_data: &mut TableData,
189 name: impl ToString,
190 value: f64,
191 one_row: &mut Vec<Value>,
192) -> Result<()> {
193 write_fields(
194 table_data,
195 std::iter::once((
196 name.to_string(),
197 ColumnDataType::Float64,
198 Some(ValueData::F64Value(value)),
199 )),
200 one_row,
201 )
202}
203
204fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
205 ColumnSchema {
206 column_name: name.to_string(),
207 datatype: ColumnDataType::Binary as i32,
208 semantic_type: SemanticType::Field as i32,
209 datatype_extension: Some(ColumnDataTypeExtension {
210 type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
211 }),
212 ..Default::default()
213 }
214}
215
216pub fn write_json(
217 table_data: &mut TableData,
218 name: impl ToString,
219 value: jsonb::Value,
220 one_row: &mut Vec<Value>,
221) -> Result<()> {
222 write_by_schema(
223 table_data,
224 std::iter::once((
225 build_json_column_schema(name),
226 Some(ValueData::BinaryValue(value.to_vec())),
227 )),
228 one_row,
229 )
230}
231
232fn write_by_schema(
233 table_data: &mut TableData,
234 kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
235 one_row: &mut Vec<Value>,
236) -> Result<()> {
237 let TableData {
238 schema,
239 column_indexes,
240 ..
241 } = table_data;
242
243 for (column_schema, value) in kv_iter {
244 let index = column_indexes.get(&column_schema.column_name);
245 if let Some(index) = index {
246 check_schema_number(
247 column_schema.datatype,
248 column_schema.semantic_type,
249 &schema[*index],
250 )?;
251 one_row[*index].value_data = value;
252 } else {
253 let index = schema.len();
254 let key = column_schema.column_name.clone();
255 schema.push(column_schema);
256 column_indexes.insert(key, index);
257 one_row.push(Value { value_data: value });
258 }
259 }
260
261 Ok(())
262}
263
264fn write_by_semantic_type(
265 table_data: &mut TableData,
266 semantic_type: SemanticType,
267 ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
268 one_row: &mut Vec<Value>,
269) -> Result<()> {
270 let TableData {
271 schema,
272 column_indexes,
273 ..
274 } = table_data;
275
276 for (name, datatype, value) in ktv_iter {
277 let index = column_indexes.get(&name);
278 if let Some(index) = index {
279 check_schema(datatype, semantic_type, &schema[*index])?;
280 one_row[*index].value_data = value;
281 } else {
282 let index = schema.len();
283 schema.push(ColumnSchema {
284 column_name: name.clone(),
285 datatype: datatype as i32,
286 semantic_type: semantic_type as i32,
287 ..Default::default()
288 });
289 column_indexes.insert(name, index);
290 one_row.push(Value { value_data: value });
291 }
292 }
293
294 Ok(())
295}
296
297pub fn write_ts_to_millis(
299 table_data: &mut TableData,
300 name: impl ToString,
301 ts: Option<i64>,
302 precision: Precision,
303 one_row: &mut Vec<Value>,
304) -> Result<()> {
305 write_ts_to(
306 table_data,
307 name,
308 ts,
309 precision,
310 TimestampType::Millis,
311 one_row,
312 )
313}
314
315pub fn write_ts_to_nanos(
317 table_data: &mut TableData,
318 name: impl ToString,
319 ts: Option<i64>,
320 precision: Precision,
321 one_row: &mut Vec<Value>,
322) -> Result<()> {
323 write_ts_to(
324 table_data,
325 name,
326 ts,
327 precision,
328 TimestampType::Nanos,
329 one_row,
330 )
331}
332
/// Unit in which the time-index column is stored.
enum TimestampType {
    /// Stored as `TimestampMillisecond`.
    Millis,
    /// Stored as `TimestampNanosecond`.
    Nanos,
}
337
338fn write_ts_to(
339 table_data: &mut TableData,
340 name: impl ToString,
341 ts: Option<i64>,
342 precision: Precision,
343 ts_type: TimestampType,
344 one_row: &mut Vec<Value>,
345) -> Result<()> {
346 let TableData {
347 schema,
348 column_indexes,
349 ..
350 } = table_data;
351 let name = name.to_string();
352
353 let ts = match ts {
354 Some(timestamp) => match ts_type {
355 TimestampType::Millis => precision.to_millis(timestamp),
356 TimestampType::Nanos => precision.to_nanos(timestamp),
357 }
358 .with_context(|| TimestampOverflowSnafu {
359 error: format!(
360 "timestamp {} overflow with precision {}",
361 timestamp, precision
362 ),
363 })?,
364 None => {
365 let timestamp = Timestamp::current_time(Nanosecond);
366 let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
367 let timestamp = timestamp
368 .convert_to(unit)
369 .with_context(|| TimePrecisionSnafu {
370 name: precision.to_string(),
371 })?
372 .into();
373 match ts_type {
374 TimestampType::Millis => precision.to_millis(timestamp),
375 TimestampType::Nanos => precision.to_nanos(timestamp),
376 }
377 .with_context(|| TimestampOverflowSnafu {
378 error: format!(
379 "timestamp {} overflow with precision {}",
380 timestamp, precision
381 ),
382 })?
383 }
384 };
385
386 let (datatype, ts) = match ts_type {
387 TimestampType::Millis => (
388 ColumnDataType::TimestampMillisecond,
389 ValueData::TimestampMillisecondValue(ts),
390 ),
391 TimestampType::Nanos => (
392 ColumnDataType::TimestampNanosecond,
393 ValueData::TimestampNanosecondValue(ts),
394 ),
395 };
396
397 let index = column_indexes.get(&name);
398 if let Some(index) = index {
399 check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
400 one_row[*index].value_data = Some(ts);
401 } else {
402 let index = schema.len();
403 schema.push(time_index_column_schema(&name, datatype));
404 column_indexes.insert(name, index);
405 one_row.push(ts.into())
406 }
407
408 Ok(())
409}
410
411fn check_schema(
412 datatype: ColumnDataType,
413 semantic_type: SemanticType,
414 schema: &ColumnSchema,
415) -> Result<()> {
416 check_schema_number(datatype as i32, semantic_type as i32, schema)
417}
418
419fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
420 ensure!(
421 schema.datatype == datatype,
422 IncompatibleSchemaSnafu {
423 column_name: &schema.column_name,
424 datatype: "datatype",
425 expected: schema.datatype,
426 actual: datatype,
427 }
428 );
429
430 ensure!(
431 schema.semantic_type == semantic_type,
432 IncompatibleSchemaSnafu {
433 column_name: &schema.column_name,
434 datatype: "semantic_type",
435 expected: schema.semantic_type,
436 actual: semantic_type,
437 }
438 );
439
440 Ok(())
441}