servers/
row_writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::value::ValueData;
19use api::v1::{
20    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
21    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
22};
23use common_grpc::precision::Precision;
24use common_time::timestamp::TimeUnit;
25use common_time::timestamp::TimeUnit::Nanosecond;
26use common_time::Timestamp;
27use snafu::{ensure, OptionExt, ResultExt};
28
29use crate::error::{
30    IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
31};
32
33/// The intermediate data structure for building the write request.
34/// It constructs the `schema` and `rows` as all input data row
35/// parsing is completed.
36pub struct TableData {
37    schema: Vec<ColumnSchema>,
38    rows: Vec<Row>,
39    column_indexes: HashMap<String, usize>,
40}
41
42impl TableData {
43    pub fn new(num_columns: usize, num_rows: usize) -> Self {
44        Self {
45            schema: Vec::with_capacity(num_columns),
46            rows: Vec::with_capacity(num_rows),
47            column_indexes: HashMap::with_capacity(num_columns),
48        }
49    }
50
51    #[inline]
52    pub fn num_columns(&self) -> usize {
53        self.schema.len()
54    }
55
56    #[inline]
57    pub fn num_rows(&self) -> usize {
58        self.rows.len()
59    }
60
61    #[inline]
62    pub fn alloc_one_row(&self) -> Vec<Value> {
63        vec![Value { value_data: None }; self.num_columns()]
64    }
65
66    #[inline]
67    pub fn add_row(&mut self, values: Vec<Value>) {
68        self.rows.push(Row { values })
69    }
70
71    #[allow(dead_code)]
72    pub fn columns(&self) -> &Vec<ColumnSchema> {
73        &self.schema
74    }
75
76    pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
77        (self.schema, self.rows)
78    }
79}
80
81pub struct MultiTableData {
82    table_data_map: HashMap<String, TableData>,
83}
84
85impl Default for MultiTableData {
86    fn default() -> Self {
87        Self::new()
88    }
89}
90
91impl MultiTableData {
92    pub fn new() -> Self {
93        Self {
94            table_data_map: HashMap::new(),
95        }
96    }
97
98    pub fn get_or_default_table_data(
99        &mut self,
100        table_name: impl ToString,
101        num_columns: usize,
102        num_rows: usize,
103    ) -> &mut TableData {
104        self.table_data_map
105            .entry(table_name.to_string())
106            .or_insert_with(|| TableData::new(num_columns, num_rows))
107    }
108
109    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
110        self.table_data_map
111            .insert(table_name.to_string(), table_data);
112    }
113
114    #[allow(dead_code)]
115    pub fn num_tables(&self) -> usize {
116        self.table_data_map.len()
117    }
118
119    /// Returns the request and number of rows in it.
120    pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
121        let mut total_rows = 0;
122        let inserts = self
123            .table_data_map
124            .into_iter()
125            .map(|(table_name, table_data)| {
126                total_rows += table_data.num_rows();
127                let num_columns = table_data.num_columns();
128                let (schema, mut rows) = table_data.into_schema_and_rows();
129                for row in &mut rows {
130                    if num_columns > row.values.len() {
131                        row.values.resize(num_columns, Value { value_data: None });
132                    }
133                }
134
135                RowInsertRequest {
136                    table_name,
137                    rows: Some(Rows { schema, rows }),
138                }
139            })
140            .collect::<Vec<_>>();
141        let row_insert_requests = RowInsertRequests { inserts };
142
143        (row_insert_requests, total_rows)
144    }
145}
146
147/// Write data as tags into the table data.
148pub fn write_tags(
149    table_data: &mut TableData,
150    tags: impl Iterator<Item = (String, String)>,
151    one_row: &mut Vec<Value>,
152) -> Result<()> {
153    let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
154    write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
155}
156
157/// Write data as fields into the table data.
158pub fn write_fields(
159    table_data: &mut TableData,
160    fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
161    one_row: &mut Vec<Value>,
162) -> Result<()> {
163    write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
164}
165
166/// Write data as a tag into the table data.
167pub fn write_tag(
168    table_data: &mut TableData,
169    name: impl ToString,
170    value: impl ToString,
171    one_row: &mut Vec<Value>,
172) -> Result<()> {
173    write_by_semantic_type(
174        table_data,
175        SemanticType::Tag,
176        std::iter::once((
177            name.to_string(),
178            ColumnDataType::String,
179            Some(ValueData::StringValue(value.to_string())),
180        )),
181        one_row,
182    )
183}
184
185/// Write float64 data as a field into the table data.
186pub fn write_f64(
187    table_data: &mut TableData,
188    name: impl ToString,
189    value: f64,
190    one_row: &mut Vec<Value>,
191) -> Result<()> {
192    write_fields(
193        table_data,
194        std::iter::once((
195            name.to_string(),
196            ColumnDataType::Float64,
197            Some(ValueData::F64Value(value)),
198        )),
199        one_row,
200    )
201}
202
203fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
204    ColumnSchema {
205        column_name: name.to_string(),
206        datatype: ColumnDataType::Binary as i32,
207        semantic_type: SemanticType::Field as i32,
208        datatype_extension: Some(ColumnDataTypeExtension {
209            type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
210        }),
211        ..Default::default()
212    }
213}
214
215pub fn write_json(
216    table_data: &mut TableData,
217    name: impl ToString,
218    value: jsonb::Value,
219    one_row: &mut Vec<Value>,
220) -> Result<()> {
221    write_by_schema(
222        table_data,
223        std::iter::once((
224            build_json_column_schema(name),
225            Some(ValueData::BinaryValue(value.to_vec())),
226        )),
227        one_row,
228    )
229}
230
231fn write_by_schema(
232    table_data: &mut TableData,
233    kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
234    one_row: &mut Vec<Value>,
235) -> Result<()> {
236    let TableData {
237        schema,
238        column_indexes,
239        ..
240    } = table_data;
241
242    for (column_schema, value) in kv_iter {
243        let index = column_indexes.get(&column_schema.column_name);
244        if let Some(index) = index {
245            check_schema_number(
246                column_schema.datatype,
247                column_schema.semantic_type,
248                &schema[*index],
249            )?;
250            one_row[*index].value_data = value;
251        } else {
252            let index = schema.len();
253            let key = column_schema.column_name.clone();
254            schema.push(column_schema);
255            column_indexes.insert(key, index);
256            one_row.push(Value { value_data: value });
257        }
258    }
259
260    Ok(())
261}
262
263fn write_by_semantic_type(
264    table_data: &mut TableData,
265    semantic_type: SemanticType,
266    ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
267    one_row: &mut Vec<Value>,
268) -> Result<()> {
269    let TableData {
270        schema,
271        column_indexes,
272        ..
273    } = table_data;
274
275    for (name, datatype, value) in ktv_iter {
276        let index = column_indexes.get(&name);
277        if let Some(index) = index {
278            check_schema(datatype, semantic_type, &schema[*index])?;
279            one_row[*index].value_data = value;
280        } else {
281            let index = schema.len();
282            schema.push(ColumnSchema {
283                column_name: name.clone(),
284                datatype: datatype as i32,
285                semantic_type: semantic_type as i32,
286                ..Default::default()
287            });
288            column_indexes.insert(name, index);
289            one_row.push(Value { value_data: value });
290        }
291    }
292
293    Ok(())
294}
295
296/// Write timestamp data as milliseconds into the table data.
297pub fn write_ts_to_millis(
298    table_data: &mut TableData,
299    name: impl ToString,
300    ts: Option<i64>,
301    precision: Precision,
302    one_row: &mut Vec<Value>,
303) -> Result<()> {
304    write_ts_to(
305        table_data,
306        name,
307        ts,
308        precision,
309        TimestampType::Millis,
310        one_row,
311    )
312}
313
314/// Write timestamp data as nanoseconds into the table data.
315pub fn write_ts_to_nanos(
316    table_data: &mut TableData,
317    name: impl ToString,
318    ts: Option<i64>,
319    precision: Precision,
320    one_row: &mut Vec<Value>,
321) -> Result<()> {
322    write_ts_to(
323        table_data,
324        name,
325        ts,
326        precision,
327        TimestampType::Nanos,
328        one_row,
329    )
330}
331
/// Selects the unit in which `write_ts_to` stores a timestamp column.
enum TimestampType {
    /// Store as a millisecond timestamp.
    Millis,
    /// Store as a nanosecond timestamp.
    Nanos,
}
336
337fn write_ts_to(
338    table_data: &mut TableData,
339    name: impl ToString,
340    ts: Option<i64>,
341    precision: Precision,
342    ts_type: TimestampType,
343    one_row: &mut Vec<Value>,
344) -> Result<()> {
345    let TableData {
346        schema,
347        column_indexes,
348        ..
349    } = table_data;
350    let name = name.to_string();
351
352    let ts = match ts {
353        Some(timestamp) => match ts_type {
354            TimestampType::Millis => precision.to_millis(timestamp),
355            TimestampType::Nanos => precision.to_nanos(timestamp),
356        }
357        .with_context(|| TimestampOverflowSnafu {
358            error: format!(
359                "timestamp {} overflow with precision {}",
360                timestamp, precision
361            ),
362        })?,
363        None => {
364            let timestamp = Timestamp::current_time(Nanosecond);
365            let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
366            let timestamp = timestamp
367                .convert_to(unit)
368                .with_context(|| TimePrecisionSnafu {
369                    name: precision.to_string(),
370                })?
371                .into();
372            match ts_type {
373                TimestampType::Millis => precision.to_millis(timestamp),
374                TimestampType::Nanos => precision.to_nanos(timestamp),
375            }
376            .with_context(|| TimestampOverflowSnafu {
377                error: format!(
378                    "timestamp {} overflow with precision {}",
379                    timestamp, precision
380                ),
381            })?
382        }
383    };
384
385    let (datatype, ts) = match ts_type {
386        TimestampType::Millis => (
387            ColumnDataType::TimestampMillisecond,
388            ValueData::TimestampMillisecondValue(ts),
389        ),
390        TimestampType::Nanos => (
391            ColumnDataType::TimestampNanosecond,
392            ValueData::TimestampNanosecondValue(ts),
393        ),
394    };
395
396    let index = column_indexes.get(&name);
397    if let Some(index) = index {
398        check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
399        one_row[*index].value_data = Some(ts);
400    } else {
401        let index = schema.len();
402        schema.push(ColumnSchema {
403            column_name: name.clone(),
404            datatype: datatype as i32,
405            semantic_type: SemanticType::Timestamp as i32,
406            ..Default::default()
407        });
408        column_indexes.insert(name, index);
409        one_row.push(ts.into())
410    }
411
412    Ok(())
413}
414
415fn check_schema(
416    datatype: ColumnDataType,
417    semantic_type: SemanticType,
418    schema: &ColumnSchema,
419) -> Result<()> {
420    check_schema_number(datatype as i32, semantic_type as i32, schema)
421}
422
423fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
424    ensure!(
425        schema.datatype == datatype,
426        IncompatibleSchemaSnafu {
427            column_name: &schema.column_name,
428            datatype: "datatype",
429            expected: schema.datatype,
430            actual: datatype,
431        }
432    );
433
434    ensure!(
435        schema.semantic_type == semantic_type,
436        IncompatibleSchemaSnafu {
437            column_name: &schema.column_name,
438            datatype: "semantic_type",
439            expected: schema.semantic_type,
440            actual: semantic_type,
441        }
442    );
443
444    Ok(())
445}