servers/
row_writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::timestamp::TimeUnit;
26use common_time::timestamp::TimeUnit::Nanosecond;
27use common_time::Timestamp;
28use snafu::{ensure, OptionExt, ResultExt};
29
30use crate::error::{
31    IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
34/// The intermediate data structure for building the write request.
35/// It constructs the `schema` and `rows` as all input data row
36/// parsing is completed.
37pub struct TableData {
38    schema: Vec<ColumnSchema>,
39    rows: Vec<Row>,
40    column_indexes: HashMap<String, usize>,
41}
42
43impl TableData {
44    pub fn new(num_columns: usize, num_rows: usize) -> Self {
45        Self {
46            schema: Vec::with_capacity(num_columns),
47            rows: Vec::with_capacity(num_rows),
48            column_indexes: HashMap::with_capacity(num_columns),
49        }
50    }
51
52    #[inline]
53    pub fn num_columns(&self) -> usize {
54        self.schema.len()
55    }
56
57    #[inline]
58    pub fn num_rows(&self) -> usize {
59        self.rows.len()
60    }
61
62    #[inline]
63    pub fn alloc_one_row(&self) -> Vec<Value> {
64        vec![Value { value_data: None }; self.num_columns()]
65    }
66
67    #[inline]
68    pub fn add_row(&mut self, values: Vec<Value>) {
69        self.rows.push(Row { values })
70    }
71
72    #[allow(dead_code)]
73    pub fn columns(&self) -> &Vec<ColumnSchema> {
74        &self.schema
75    }
76
77    pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
78        (self.schema, self.rows)
79    }
80}
81
82pub struct MultiTableData {
83    table_data_map: HashMap<String, TableData>,
84}
85
86impl Default for MultiTableData {
87    fn default() -> Self {
88        Self::new()
89    }
90}
91
92impl MultiTableData {
93    pub fn new() -> Self {
94        Self {
95            table_data_map: HashMap::new(),
96        }
97    }
98
99    pub fn get_or_default_table_data(
100        &mut self,
101        table_name: impl ToString,
102        num_columns: usize,
103        num_rows: usize,
104    ) -> &mut TableData {
105        self.table_data_map
106            .entry(table_name.to_string())
107            .or_insert_with(|| TableData::new(num_columns, num_rows))
108    }
109
110    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
111        self.table_data_map
112            .insert(table_name.to_string(), table_data);
113    }
114
115    #[allow(dead_code)]
116    pub fn num_tables(&self) -> usize {
117        self.table_data_map.len()
118    }
119
120    /// Returns the request and number of rows in it.
121    pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
122        let mut total_rows = 0;
123        let inserts = self
124            .table_data_map
125            .into_iter()
126            .map(|(table_name, table_data)| {
127                total_rows += table_data.num_rows();
128                let num_columns = table_data.num_columns();
129                let (schema, mut rows) = table_data.into_schema_and_rows();
130                for row in &mut rows {
131                    if num_columns > row.values.len() {
132                        row.values.resize(num_columns, Value { value_data: None });
133                    }
134                }
135
136                RowInsertRequest {
137                    table_name,
138                    rows: Some(Rows { schema, rows }),
139                }
140            })
141            .collect::<Vec<_>>();
142        let row_insert_requests = RowInsertRequests { inserts };
143
144        (row_insert_requests, total_rows)
145    }
146}
147
148/// Write data as tags into the table data.
149pub fn write_tags(
150    table_data: &mut TableData,
151    tags: impl Iterator<Item = (String, String)>,
152    one_row: &mut Vec<Value>,
153) -> Result<()> {
154    let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
155    write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
156}
157
158/// Write data as fields into the table data.
159pub fn write_fields(
160    table_data: &mut TableData,
161    fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
162    one_row: &mut Vec<Value>,
163) -> Result<()> {
164    write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
165}
166
167/// Write data as a tag into the table data.
168pub fn write_tag(
169    table_data: &mut TableData,
170    name: impl ToString,
171    value: impl ToString,
172    one_row: &mut Vec<Value>,
173) -> Result<()> {
174    write_by_semantic_type(
175        table_data,
176        SemanticType::Tag,
177        std::iter::once((
178            name.to_string(),
179            ColumnDataType::String,
180            Some(ValueData::StringValue(value.to_string())),
181        )),
182        one_row,
183    )
184}
185
186/// Write float64 data as a field into the table data.
187pub fn write_f64(
188    table_data: &mut TableData,
189    name: impl ToString,
190    value: f64,
191    one_row: &mut Vec<Value>,
192) -> Result<()> {
193    write_fields(
194        table_data,
195        std::iter::once((
196            name.to_string(),
197            ColumnDataType::Float64,
198            Some(ValueData::F64Value(value)),
199        )),
200        one_row,
201    )
202}
203
204fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
205    ColumnSchema {
206        column_name: name.to_string(),
207        datatype: ColumnDataType::Binary as i32,
208        semantic_type: SemanticType::Field as i32,
209        datatype_extension: Some(ColumnDataTypeExtension {
210            type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
211        }),
212        ..Default::default()
213    }
214}
215
216pub fn write_json(
217    table_data: &mut TableData,
218    name: impl ToString,
219    value: jsonb::Value,
220    one_row: &mut Vec<Value>,
221) -> Result<()> {
222    write_by_schema(
223        table_data,
224        std::iter::once((
225            build_json_column_schema(name),
226            Some(ValueData::BinaryValue(value.to_vec())),
227        )),
228        one_row,
229    )
230}
231
232fn write_by_schema(
233    table_data: &mut TableData,
234    kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
235    one_row: &mut Vec<Value>,
236) -> Result<()> {
237    let TableData {
238        schema,
239        column_indexes,
240        ..
241    } = table_data;
242
243    for (column_schema, value) in kv_iter {
244        let index = column_indexes.get(&column_schema.column_name);
245        if let Some(index) = index {
246            check_schema_number(
247                column_schema.datatype,
248                column_schema.semantic_type,
249                &schema[*index],
250            )?;
251            one_row[*index].value_data = value;
252        } else {
253            let index = schema.len();
254            let key = column_schema.column_name.clone();
255            schema.push(column_schema);
256            column_indexes.insert(key, index);
257            one_row.push(Value { value_data: value });
258        }
259    }
260
261    Ok(())
262}
263
264fn write_by_semantic_type(
265    table_data: &mut TableData,
266    semantic_type: SemanticType,
267    ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
268    one_row: &mut Vec<Value>,
269) -> Result<()> {
270    let TableData {
271        schema,
272        column_indexes,
273        ..
274    } = table_data;
275
276    for (name, datatype, value) in ktv_iter {
277        let index = column_indexes.get(&name);
278        if let Some(index) = index {
279            check_schema(datatype, semantic_type, &schema[*index])?;
280            one_row[*index].value_data = value;
281        } else {
282            let index = schema.len();
283            schema.push(ColumnSchema {
284                column_name: name.clone(),
285                datatype: datatype as i32,
286                semantic_type: semantic_type as i32,
287                ..Default::default()
288            });
289            column_indexes.insert(name, index);
290            one_row.push(Value { value_data: value });
291        }
292    }
293
294    Ok(())
295}
296
297/// Write timestamp data as milliseconds into the table data.
298pub fn write_ts_to_millis(
299    table_data: &mut TableData,
300    name: impl ToString,
301    ts: Option<i64>,
302    precision: Precision,
303    one_row: &mut Vec<Value>,
304) -> Result<()> {
305    write_ts_to(
306        table_data,
307        name,
308        ts,
309        precision,
310        TimestampType::Millis,
311        one_row,
312    )
313}
314
315/// Write timestamp data as nanoseconds into the table data.
316pub fn write_ts_to_nanos(
317    table_data: &mut TableData,
318    name: impl ToString,
319    ts: Option<i64>,
320    precision: Precision,
321    one_row: &mut Vec<Value>,
322) -> Result<()> {
323    write_ts_to(
324        table_data,
325        name,
326        ts,
327        precision,
328        TimestampType::Nanos,
329        one_row,
330    )
331}
332
/// Target unit of the timestamp column produced by `write_ts_to`.
enum TimestampType {
    /// Store the timestamp as a millisecond timestamp.
    Millis,
    /// Store the timestamp as a nanosecond timestamp.
    Nanos,
}
337
338fn write_ts_to(
339    table_data: &mut TableData,
340    name: impl ToString,
341    ts: Option<i64>,
342    precision: Precision,
343    ts_type: TimestampType,
344    one_row: &mut Vec<Value>,
345) -> Result<()> {
346    let TableData {
347        schema,
348        column_indexes,
349        ..
350    } = table_data;
351    let name = name.to_string();
352
353    let ts = match ts {
354        Some(timestamp) => match ts_type {
355            TimestampType::Millis => precision.to_millis(timestamp),
356            TimestampType::Nanos => precision.to_nanos(timestamp),
357        }
358        .with_context(|| TimestampOverflowSnafu {
359            error: format!(
360                "timestamp {} overflow with precision {}",
361                timestamp, precision
362            ),
363        })?,
364        None => {
365            let timestamp = Timestamp::current_time(Nanosecond);
366            let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
367            let timestamp = timestamp
368                .convert_to(unit)
369                .with_context(|| TimePrecisionSnafu {
370                    name: precision.to_string(),
371                })?
372                .into();
373            match ts_type {
374                TimestampType::Millis => precision.to_millis(timestamp),
375                TimestampType::Nanos => precision.to_nanos(timestamp),
376            }
377            .with_context(|| TimestampOverflowSnafu {
378                error: format!(
379                    "timestamp {} overflow with precision {}",
380                    timestamp, precision
381                ),
382            })?
383        }
384    };
385
386    let (datatype, ts) = match ts_type {
387        TimestampType::Millis => (
388            ColumnDataType::TimestampMillisecond,
389            ValueData::TimestampMillisecondValue(ts),
390        ),
391        TimestampType::Nanos => (
392            ColumnDataType::TimestampNanosecond,
393            ValueData::TimestampNanosecondValue(ts),
394        ),
395    };
396
397    let index = column_indexes.get(&name);
398    if let Some(index) = index {
399        check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
400        one_row[*index].value_data = Some(ts);
401    } else {
402        let index = schema.len();
403        schema.push(time_index_column_schema(&name, datatype));
404        column_indexes.insert(name, index);
405        one_row.push(ts.into())
406    }
407
408    Ok(())
409}
410
411fn check_schema(
412    datatype: ColumnDataType,
413    semantic_type: SemanticType,
414    schema: &ColumnSchema,
415) -> Result<()> {
416    check_schema_number(datatype as i32, semantic_type as i32, schema)
417}
418
419fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
420    ensure!(
421        schema.datatype == datatype,
422        IncompatibleSchemaSnafu {
423            column_name: &schema.column_name,
424            datatype: "datatype",
425            expected: schema.datatype,
426            actual: datatype,
427        }
428    );
429
430    ensure!(
431        schema.semantic_type == semantic_type,
432        IncompatibleSchemaSnafu {
433            column_name: &schema.column_name,
434            datatype: "semantic_type",
435            expected: schema.semantic_type,
436            actual: semantic_type,
437        }
438    );
439
440    Ok(())
441}