Skip to main content

servers/
row_writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::Timestamp;
26use common_time::timestamp::TimeUnit;
27use common_time::timestamp::TimeUnit::Nanosecond;
28use snafu::{OptionExt, ResultExt, ensure};
29
30use crate::error::{
31    IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
/// The intermediate data structure for building the write request.
/// It constructs the `schema` and `rows` as all input data row
/// parsing is completed.
pub struct TableData {
    // Column schemas in insertion order; row values align with this by index.
    schema: Vec<ColumnSchema>,
    // Accumulated rows; each `Row.values` is positionally parallel to `schema`.
    rows: Vec<Row>,
    // Column name -> index into `schema` (and into each row's values).
    column_indexes: HashMap<String, usize>,
}
42
43impl TableData {
44    pub fn new(num_columns: usize, num_rows: usize) -> Self {
45        Self {
46            schema: Vec::with_capacity(num_columns),
47            rows: Vec::with_capacity(num_rows),
48            column_indexes: HashMap::with_capacity(num_columns),
49        }
50    }
51
52    #[inline]
53    pub fn num_columns(&self) -> usize {
54        self.schema.len()
55    }
56
57    #[inline]
58    pub fn num_rows(&self) -> usize {
59        self.rows.len()
60    }
61
62    #[inline]
63    pub fn alloc_one_row(&self) -> Vec<Value> {
64        vec![Value { value_data: None }; self.num_columns()]
65    }
66
67    #[inline]
68    pub fn add_row(&mut self, values: Vec<Value>) {
69        self.rows.push(Row { values })
70    }
71
72    #[allow(dead_code)]
73    pub fn columns(&self) -> &Vec<ColumnSchema> {
74        &self.schema
75    }
76
77    pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
78        (self.schema, self.rows)
79    }
80
81    /// Writes a field value without enforcing that later writes use the same datatype
82    /// as the first-seen schema entry.
83    ///
84    /// The OTLP trace v1 path uses this to preserve raw mixed values inside one request
85    /// so the frontend can reconcile them later against both the full batch and the
86    /// existing table schema.
87    pub fn write_field_unchecked(
88        &mut self,
89        name: impl ToString,
90        datatype: ColumnDataType,
91        value: Option<ValueData>,
92        one_row: &mut Vec<Value>,
93    ) {
94        let name = name.to_string();
95        if let Some(index) = self.column_indexes.get(&name).copied() {
96            one_row[index].value_data = value;
97        } else {
98            let index = self.schema.len();
99            self.schema.push(ColumnSchema {
100                column_name: name.clone(),
101                datatype: datatype as i32,
102                semantic_type: SemanticType::Field as i32,
103                ..Default::default()
104            });
105            self.column_indexes.insert(name, index);
106            one_row.push(Value { value_data: value });
107        }
108    }
109}
110
/// Collects per-table [`TableData`] keyed by table name, so a single
/// request batch spanning multiple tables can be assembled incrementally.
pub struct MultiTableData {
    // Table name -> accumulated schema/rows for that table.
    table_data_map: HashMap<String, TableData>,
}
114
115impl Default for MultiTableData {
116    fn default() -> Self {
117        Self::new()
118    }
119}
120
121impl MultiTableData {
122    pub fn new() -> Self {
123        Self {
124            table_data_map: HashMap::new(),
125        }
126    }
127
128    pub fn get_or_default_table_data(
129        &mut self,
130        table_name: impl ToString,
131        num_columns: usize,
132        num_rows: usize,
133    ) -> &mut TableData {
134        self.table_data_map
135            .entry(table_name.to_string())
136            .or_insert_with(|| TableData::new(num_columns, num_rows))
137    }
138
139    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
140        self.table_data_map
141            .insert(table_name.to_string(), table_data);
142    }
143
144    #[allow(dead_code)]
145    pub fn num_tables(&self) -> usize {
146        self.table_data_map.len()
147    }
148
149    /// Returns the request and number of rows in it.
150    pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
151        let mut total_rows = 0;
152        let inserts = self
153            .table_data_map
154            .into_iter()
155            .map(|(table_name, table_data)| {
156                total_rows += table_data.num_rows();
157                let num_columns = table_data.num_columns();
158                let (schema, mut rows) = table_data.into_schema_and_rows();
159                for row in &mut rows {
160                    if num_columns > row.values.len() {
161                        row.values.resize(num_columns, Value { value_data: None });
162                    }
163                }
164
165                RowInsertRequest {
166                    table_name,
167                    rows: Some(Rows { schema, rows }),
168                }
169            })
170            .collect::<Vec<_>>();
171        let row_insert_requests = RowInsertRequests { inserts };
172
173        (row_insert_requests, total_rows)
174    }
175}
176
177/// Write data as tags into the table data.
178pub fn write_tags(
179    table_data: &mut TableData,
180    tags: impl Iterator<Item = (String, String)>,
181    one_row: &mut Vec<Value>,
182) -> Result<()> {
183    let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
184    write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
185}
186
187/// Write data as fields into the table data.
188pub fn write_fields(
189    table_data: &mut TableData,
190    fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
191    one_row: &mut Vec<Value>,
192) -> Result<()> {
193    write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
194}
195
196/// Write data as a tag into the table data.
197pub fn write_tag(
198    table_data: &mut TableData,
199    name: impl ToString,
200    value: impl ToString,
201    one_row: &mut Vec<Value>,
202) -> Result<()> {
203    write_by_semantic_type(
204        table_data,
205        SemanticType::Tag,
206        std::iter::once((
207            name.to_string(),
208            ColumnDataType::String,
209            Some(ValueData::StringValue(value.to_string())),
210        )),
211        one_row,
212    )
213}
214
215/// Write float64 data as a field into the table data.
216pub fn write_f64(
217    table_data: &mut TableData,
218    name: impl ToString,
219    value: f64,
220    one_row: &mut Vec<Value>,
221) -> Result<()> {
222    write_fields(
223        table_data,
224        std::iter::once((
225            name.to_string(),
226            ColumnDataType::Float64,
227            Some(ValueData::F64Value(value)),
228        )),
229        one_row,
230    )
231}
232
233fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
234    ColumnSchema {
235        column_name: name.to_string(),
236        datatype: ColumnDataType::Binary as i32,
237        semantic_type: SemanticType::Field as i32,
238        datatype_extension: Some(ColumnDataTypeExtension {
239            type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
240        }),
241        ..Default::default()
242    }
243}
244
245pub fn write_json(
246    table_data: &mut TableData,
247    name: impl ToString,
248    value: jsonb::Value,
249    one_row: &mut Vec<Value>,
250) -> Result<()> {
251    write_by_schema(
252        table_data,
253        std::iter::once((
254            build_json_column_schema(name),
255            Some(ValueData::BinaryValue(value.to_vec())),
256        )),
257        one_row,
258    )
259}
260
261fn write_by_schema(
262    table_data: &mut TableData,
263    kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
264    one_row: &mut Vec<Value>,
265) -> Result<()> {
266    let TableData {
267        schema,
268        column_indexes,
269        ..
270    } = table_data;
271
272    for (column_schema, value) in kv_iter {
273        let index = column_indexes.get(&column_schema.column_name);
274        if let Some(index) = index {
275            check_schema_number(
276                column_schema.datatype,
277                column_schema.semantic_type,
278                &schema[*index],
279            )?;
280            one_row[*index].value_data = value;
281        } else {
282            let index = schema.len();
283            let key = column_schema.column_name.clone();
284            schema.push(column_schema);
285            column_indexes.insert(key, index);
286            one_row.push(Value { value_data: value });
287        }
288    }
289
290    Ok(())
291}
292
293fn write_by_semantic_type(
294    table_data: &mut TableData,
295    semantic_type: SemanticType,
296    ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
297    one_row: &mut Vec<Value>,
298) -> Result<()> {
299    let TableData {
300        schema,
301        column_indexes,
302        ..
303    } = table_data;
304
305    for (name, datatype, value) in ktv_iter {
306        let index = column_indexes.get(&name);
307        if let Some(index) = index {
308            check_schema(datatype, semantic_type, &schema[*index])?;
309            one_row[*index].value_data = value;
310        } else {
311            let index = schema.len();
312            schema.push(ColumnSchema {
313                column_name: name.clone(),
314                datatype: datatype as i32,
315                semantic_type: semantic_type as i32,
316                ..Default::default()
317            });
318            column_indexes.insert(name, index);
319            one_row.push(Value { value_data: value });
320        }
321    }
322
323    Ok(())
324}
325
326/// Write timestamp data as milliseconds into the table data.
327pub fn write_ts_to_millis(
328    table_data: &mut TableData,
329    name: impl ToString,
330    ts: Option<i64>,
331    precision: Precision,
332    one_row: &mut Vec<Value>,
333) -> Result<()> {
334    write_ts_to(
335        table_data,
336        name,
337        ts,
338        precision,
339        TimestampType::Millis,
340        one_row,
341    )
342}
343
344/// Write timestamp data as nanoseconds into the table data.
345pub fn write_ts_to_nanos(
346    table_data: &mut TableData,
347    name: impl ToString,
348    ts: Option<i64>,
349    precision: Precision,
350    one_row: &mut Vec<Value>,
351) -> Result<()> {
352    write_ts_to(
353        table_data,
354        name,
355        ts,
356        precision,
357        TimestampType::Nanos,
358        one_row,
359    )
360}
361
/// Target storage resolution for the time index column.
enum TimestampType {
    // Store as milliseconds (TimestampMillisecond column).
    Millis,
    // Store as nanoseconds (TimestampNanosecond column).
    Nanos,
}
366
367fn write_ts_to(
368    table_data: &mut TableData,
369    name: impl ToString,
370    ts: Option<i64>,
371    precision: Precision,
372    ts_type: TimestampType,
373    one_row: &mut Vec<Value>,
374) -> Result<()> {
375    let TableData {
376        schema,
377        column_indexes,
378        ..
379    } = table_data;
380    let name = name.to_string();
381
382    let ts = match ts {
383        Some(timestamp) => match ts_type {
384            TimestampType::Millis => precision.to_millis(timestamp),
385            TimestampType::Nanos => precision.to_nanos(timestamp),
386        }
387        .with_context(|| TimestampOverflowSnafu {
388            error: format!(
389                "timestamp {} overflow with precision {}",
390                timestamp, precision
391            ),
392        })?,
393        None => {
394            let timestamp = Timestamp::current_time(Nanosecond);
395            let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
396            let timestamp = timestamp
397                .convert_to(unit)
398                .with_context(|| TimePrecisionSnafu {
399                    name: precision.to_string(),
400                })?
401                .into();
402            match ts_type {
403                TimestampType::Millis => precision.to_millis(timestamp),
404                TimestampType::Nanos => precision.to_nanos(timestamp),
405            }
406            .with_context(|| TimestampOverflowSnafu {
407                error: format!(
408                    "timestamp {} overflow with precision {}",
409                    timestamp, precision
410                ),
411            })?
412        }
413    };
414
415    let (datatype, ts) = match ts_type {
416        TimestampType::Millis => (
417            ColumnDataType::TimestampMillisecond,
418            ValueData::TimestampMillisecondValue(ts),
419        ),
420        TimestampType::Nanos => (
421            ColumnDataType::TimestampNanosecond,
422            ValueData::TimestampNanosecondValue(ts),
423        ),
424    };
425
426    let index = column_indexes.get(&name);
427    if let Some(index) = index {
428        check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
429        one_row[*index].value_data = Some(ts);
430    } else {
431        let index = schema.len();
432        schema.push(time_index_column_schema(&name, datatype));
433        column_indexes.insert(name, index);
434        one_row.push(ts.into())
435    }
436
437    Ok(())
438}
439
440fn check_schema(
441    datatype: ColumnDataType,
442    semantic_type: SemanticType,
443    schema: &ColumnSchema,
444) -> Result<()> {
445    check_schema_number(datatype as i32, semantic_type as i32, schema)
446}
447
448fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
449    ensure!(
450        schema.datatype == datatype,
451        IncompatibleSchemaSnafu {
452            column_name: &schema.column_name,
453            datatype: "datatype",
454            expected: schema.datatype,
455            actual: datatype,
456        }
457    );
458
459    ensure!(
460        schema.semantic_type == semantic_type,
461        IncompatibleSchemaSnafu {
462            column_name: &schema.column_name,
463            datatype: "semantic_type",
464            expected: schema.semantic_type,
465            actual: semantic_type,
466        }
467    );
468
469    Ok(())
470}