servers/
row_writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
21    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
22};
23use common_grpc::precision::Precision;
24use common_time::timestamp::TimeUnit;
25use common_time::timestamp::TimeUnit::Nanosecond;
26use common_time::Timestamp;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{
30    IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
31};
32
33/// The intermediate data structure for building the write request.
34/// It constructs the `schema` and `rows` as all input data row
35/// parsing is completed.
36pub struct TableData {
37    schema: Vec<ColumnSchema>,
38    rows: Vec<Row>,
39    column_indexes: HashMap<String, usize>,
40}
41
42impl TableData {
43    pub fn new(num_columns: usize, num_rows: usize) -> Self {
44        Self {
45            schema: Vec::with_capacity(num_columns),
46            rows: Vec::with_capacity(num_rows),
47            column_indexes: HashMap::with_capacity(num_columns),
48        }
49    }
50
51    #[inline]
52    pub fn num_columns(&self) -> usize {
53        self.schema.len()
54    }
55
56    #[inline]
57    pub fn num_rows(&self) -> usize {
58        self.rows.len()
59    }
60
61    #[inline]
62    pub fn alloc_one_row(&self) -> Vec<Value> {
63        vec![Value { value_data: None }; self.num_columns()]
64    }
65
66    #[inline]
67    pub fn add_row(&mut self, values: Vec<Value>) {
68        self.rows.push(Row { values })
69    }
70
71    #[allow(dead_code)]
72    pub fn columns(&self) -> &Vec<ColumnSchema> {
73        &self.schema
74    }
75
76    pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
77        (self.schema, self.rows)
78    }
79}
80
81pub struct MultiTableData {
82    table_data_map: HashMap<String, TableData>,
83}
84
85impl Default for MultiTableData {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl MultiTableData {
92    pub fn new() -> Self {
93        Self {
94            table_data_map: HashMap::new(),
95        }
96    }
97
98    pub fn get_or_default_table_data(
99        &mut self,
100        table_name: impl ToString,
101        num_columns: usize,
102        num_rows: usize,
103    ) -> &mut TableData {
104        self.table_data_map
105            .entry(table_name.to_string())
106            .or_insert_with(|| TableData::new(num_columns, num_rows))
107    }
108
109    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
110        self.table_data_map
111            .insert(table_name.to_string(), table_data);
112    }
113
114    #[allow(dead_code)]
115    pub fn num_tables(&self) -> usize {
116        self.table_data_map.len()
117    }
118
119    /// Returns the request and number of rows in it.
120    pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
121        let mut total_rows = 0;
122        let inserts = self
123            .table_data_map
124            .into_iter()
125            .map(|(table_name, table_data)| {
126                total_rows += table_data.num_rows();
127                let num_columns = table_data.num_columns();
128                let (schema, mut rows) = table_data.into_schema_and_rows();
129                for row in &mut rows {
130                    if num_columns > row.values.len() {
131                        row.values.resize(num_columns, Value { value_data: None });
132                    }
133                }
134
135                RowInsertRequest {
136                    table_name,
137                    rows: Some(Rows { schema, rows }),
138                }
139            })
140            .collect::<Vec<_>>();
141        let row_insert_requests = RowInsertRequests { inserts };
142
143        (row_insert_requests, total_rows)
144    }
145}
146
147/// Write data as tags into the table data.
148pub fn write_tags(
149    table_data: &mut TableData,
150    tags: impl Iterator<Item = (String, String)>,
151    one_row: &mut Vec<Value>,
152) -> Result<()> {
153    let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, ValueData::StringValue(v)));
154    write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
155}
156
157/// Write data as fields into the table data.
158pub fn write_fields(
159    table_data: &mut TableData,
160    fields: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
161    one_row: &mut Vec<Value>,
162) -> Result<()> {
163    write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
164}
165
166/// Write data as a tag into the table data.
167pub fn write_tag(
168    table_data: &mut TableData,
169    name: impl ToString,
170    value: impl ToString,
171    one_row: &mut Vec<Value>,
172) -> Result<()> {
173    write_by_semantic_type(
174        table_data,
175        SemanticType::Tag,
176        std::iter::once((
177            name.to_string(),
178            ColumnDataType::String,
179            ValueData::StringValue(value.to_string()),
180        )),
181        one_row,
182    )
183}
184
185/// Write float64 data as a field into the table data.
186pub fn write_f64(
187    table_data: &mut TableData,
188    name: impl ToString,
189    value: f64,
190    one_row: &mut Vec<Value>,
191) -> Result<()> {
192    write_fields(
193        table_data,
194        std::iter::once((
195            name.to_string(),
196            ColumnDataType::Float64,
197            ValueData::F64Value(value),
198        )),
199        one_row,
200    )
201}
202
203fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
204    ColumnSchema {
205        column_name: name.to_string(),
206        datatype: ColumnDataType::Binary as i32,
207        semantic_type: SemanticType::Field as i32,
208        datatype_extension: Some(ColumnDataTypeExtension {
209            type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
210        }),
211        ..Default::default()
212    }
213}
214
215pub fn write_json(
216    table_data: &mut TableData,
217    name: impl ToString,
218    value: jsonb::Value,
219    one_row: &mut Vec<Value>,
220) -> Result<()> {
221    write_by_schema(
222        table_data,
223        std::iter::once((
224            build_json_column_schema(name),
225            ValueData::BinaryValue(value.to_vec()),
226        )),
227        one_row,
228    )
229}
230
231fn write_by_schema(
232    table_data: &mut TableData,
233    kv_iter: impl Iterator<Item = (ColumnSchema, ValueData)>,
234    one_row: &mut Vec<Value>,
235) -> Result<()> {
236    let TableData {
237        schema,
238        column_indexes,
239        ..
240    } = table_data;
241
242    for (column_schema, value) in kv_iter {
243        let index = column_indexes.get(&column_schema.column_name);
244        if let Some(index) = index {
245            check_schema_number(
246                column_schema.datatype,
247                column_schema.semantic_type,
248                &schema[*index],
249            )?;
250            one_row[*index].value_data = Some(value);
251        } else {
252            let index = schema.len();
253            let key = column_schema.column_name.clone();
254            schema.push(column_schema);
255            column_indexes.insert(key, index);
256            one_row.push(Value {
257                value_data: Some(value),
258            });
259        }
260    }
261
262    Ok(())
263}
264
265fn write_by_semantic_type(
266    table_data: &mut TableData,
267    semantic_type: SemanticType,
268    ktv_iter: impl Iterator<Item = (String, ColumnDataType, ValueData)>,
269    one_row: &mut Vec<Value>,
270) -> Result<()> {
271    let TableData {
272        schema,
273        column_indexes,
274        ..
275    } = table_data;
276
277    for (name, datatype, value) in ktv_iter {
278        let index = column_indexes.get(&name);
279        if let Some(index) = index {
280            check_schema(datatype, semantic_type, &schema[*index])?;
281            one_row[*index].value_data = Some(value);
282        } else {
283            let index = schema.len();
284            schema.push(ColumnSchema {
285                column_name: name.clone(),
286                datatype: datatype as i32,
287                semantic_type: semantic_type as i32,
288                ..Default::default()
289            });
290            column_indexes.insert(name, index);
291            one_row.push(Value {
292                value_data: Some(value),
293            });
294        }
295    }
296
297    Ok(())
298}
299
300/// Write timestamp data as milliseconds into the table data.
301pub fn write_ts_to_millis(
302    table_data: &mut TableData,
303    name: impl ToString,
304    ts: Option<i64>,
305    precision: Precision,
306    one_row: &mut Vec<Value>,
307) -> Result<()> {
308    write_ts_to(
309        table_data,
310        name,
311        ts,
312        precision,
313        TimestampType::Millis,
314        one_row,
315    )
316}
317
318/// Write timestamp data as nanoseconds into the table data.
319pub fn write_ts_to_nanos(
320    table_data: &mut TableData,
321    name: impl ToString,
322    ts: Option<i64>,
323    precision: Precision,
324    one_row: &mut Vec<Value>,
325) -> Result<()> {
326    write_ts_to(
327        table_data,
328        name,
329        ts,
330        precision,
331        TimestampType::Nanos,
332        one_row,
333    )
334}
335
/// Storage unit of the timestamp column written by `write_ts_to`.
enum TimestampType {
    /// Milliseconds, stored as a `TimestampMillisecond` column.
    Millis,
    /// Nanoseconds, stored as a `TimestampNanosecond` column.
    Nanos,
}
340
341fn write_ts_to(
342    table_data: &mut TableData,
343    name: impl ToString,
344    ts: Option<i64>,
345    precision: Precision,
346    ts_type: TimestampType,
347    one_row: &mut Vec<Value>,
348) -> Result<()> {
349    let TableData {
350        schema,
351        column_indexes,
352        ..
353    } = table_data;
354    let name = name.to_string();
355
356    let ts = match ts {
357        Some(timestamp) => match ts_type {
358            TimestampType::Millis => precision.to_millis(timestamp),
359            TimestampType::Nanos => precision.to_nanos(timestamp),
360        }
361        .with_context(|| TimestampOverflowSnafu {
362            error: format!(
363                "timestamp {} overflow with precision {}",
364                timestamp, precision
365            ),
366        })?,
367        None => {
368            let timestamp = Timestamp::current_time(Nanosecond);
369            let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
370            let timestamp = timestamp
371                .convert_to(unit)
372                .with_context(|| TimePrecisionSnafu {
373                    name: precision.to_string(),
374                })?
375                .into();
376            match ts_type {
377                TimestampType::Millis => precision.to_millis(timestamp),
378                TimestampType::Nanos => precision.to_nanos(timestamp),
379            }
380            .with_context(|| TimestampOverflowSnafu {
381                error: format!(
382                    "timestamp {} overflow with precision {}",
383                    timestamp, precision
384                ),
385            })?
386        }
387    };
388
389    let (datatype, ts) = match ts_type {
390        TimestampType::Millis => (
391            ColumnDataType::TimestampMillisecond,
392            ValueData::TimestampMillisecondValue(ts),
393        ),
394        TimestampType::Nanos => (
395            ColumnDataType::TimestampNanosecond,
396            ValueData::TimestampNanosecondValue(ts),
397        ),
398    };
399
400    let index = column_indexes.get(&name);
401    if let Some(index) = index {
402        check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
403        one_row[*index].value_data = Some(ts);
404    } else {
405        let index = schema.len();
406        schema.push(ColumnSchema {
407            column_name: name.clone(),
408            datatype: datatype as i32,
409            semantic_type: SemanticType::Timestamp as i32,
410            ..Default::default()
411        });
412        column_indexes.insert(name, index);
413        one_row.push(ts.into())
414    }
415
416    Ok(())
417}
418
419fn check_schema(
420    datatype: ColumnDataType,
421    semantic_type: SemanticType,
422    schema: &ColumnSchema,
423) -> Result<()> {
424    check_schema_number(datatype as i32, semantic_type as i32, schema)
425}
426
427fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
428    ensure!(
429        schema.datatype == datatype,
430        IncompatibleSchemaSnafu {
431            column_name: &schema.column_name,
432            datatype: "datatype",
433            expected: schema.datatype,
434            actual: datatype,
435        }
436    );
437
438    ensure!(
439        schema.semantic_type == semantic_type,
440        IncompatibleSchemaSnafu {
441            column_name: &schema.column_name,
442            datatype: "semantic_type",
443            expected: schema.semantic_type,
444            actual: semantic_type,
445        }
446    );
447
448    Ok(())
449}