Skip to main content

servers/
row_writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16
17use api::v1::column_data_type_extension::TypeExt;
18use api::v1::helper::time_index_column_schema;
19use api::v1::value::ValueData;
20use api::v1::{
21    ColumnDataType, ColumnDataTypeExtension, ColumnSchema, JsonTypeExtension, Row,
22    RowInsertRequest, RowInsertRequests, Rows, SemanticType, Value,
23};
24use common_grpc::precision::Precision;
25use common_time::Timestamp;
26use common_time::timestamp::TimeUnit;
27use common_time::timestamp::TimeUnit::Nanosecond;
28use snafu::{OptionExt, ResultExt, ensure};
29
30use crate::error::{
31    IncompatibleSchemaSnafu, Result, RowWriterSnafu, TimePrecisionSnafu, TimestampOverflowSnafu,
32};
33
34/// The intermediate data structure for building the write request.
35/// It constructs the `schema` and `rows` as all input data row
36/// parsing is completed.
37pub struct TableData {
38    schema: Vec<ColumnSchema>,
39    rows: Vec<Row>,
40    column_indexes: HashMap<String, usize>,
41}
42
43impl TableData {
44    pub fn new(num_columns: usize, num_rows: usize) -> Self {
45        Self {
46            schema: Vec::with_capacity(num_columns),
47            rows: Vec::with_capacity(num_rows),
48            column_indexes: HashMap::with_capacity(num_columns),
49        }
50    }
51
52    #[inline]
53    pub fn num_columns(&self) -> usize {
54        self.schema.len()
55    }
56
57    #[inline]
58    pub fn num_rows(&self) -> usize {
59        self.rows.len()
60    }
61
62    #[inline]
63    pub fn alloc_one_row(&self) -> Vec<Value> {
64        vec![Value { value_data: None }; self.num_columns()]
65    }
66
67    #[inline]
68    pub fn add_row(&mut self, values: Vec<Value>) {
69        self.rows.push(Row { values })
70    }
71
72    #[inline]
73    pub fn reserve_rows(&mut self, additional: usize) {
74        self.rows.reserve(additional);
75    }
76
77    #[allow(dead_code)]
78    pub fn columns(&self) -> &Vec<ColumnSchema> {
79        &self.schema
80    }
81
82    pub fn into_schema_and_rows(self) -> (Vec<ColumnSchema>, Vec<Row>) {
83        (self.schema, self.rows)
84    }
85
86    /// Writes a field value without enforcing that later writes use the same datatype
87    /// as the first-seen schema entry.
88    ///
89    /// The OTLP trace v1 path uses this to preserve raw mixed values inside one request
90    /// so the frontend can reconcile them later against both the full batch and the
91    /// existing table schema.
92    pub fn write_field_unchecked(
93        &mut self,
94        name: impl ToString,
95        datatype: ColumnDataType,
96        value: Option<ValueData>,
97        one_row: &mut Vec<Value>,
98    ) {
99        let name = name.to_string();
100        if let Some(index) = self.column_indexes.get(&name).copied() {
101            one_row[index].value_data = value;
102        } else {
103            let index = self.schema.len();
104            self.schema.push(ColumnSchema {
105                column_name: name.clone(),
106                datatype: datatype as i32,
107                semantic_type: SemanticType::Field as i32,
108                ..Default::default()
109            });
110            self.column_indexes.insert(name, index);
111            one_row.push(Value { value_data: value });
112        }
113    }
114}
115
116pub struct MultiTableData {
117    table_data_map: HashMap<String, TableData>,
118}
119
120impl Default for MultiTableData {
121    fn default() -> Self {
122        Self::new()
123    }
124}
125
126impl MultiTableData {
127    pub fn new() -> Self {
128        Self {
129            table_data_map: HashMap::new(),
130        }
131    }
132
133    pub fn get_or_default_table_data(
134        &mut self,
135        table_name: impl ToString,
136        num_columns: usize,
137        num_rows: usize,
138    ) -> &mut TableData {
139        self.table_data_map
140            .entry(table_name.to_string())
141            .or_insert_with(|| TableData::new(num_columns, num_rows))
142    }
143
144    pub fn add_table_data(&mut self, table_name: impl ToString, table_data: TableData) {
145        self.table_data_map
146            .insert(table_name.to_string(), table_data);
147    }
148
149    #[allow(dead_code)]
150    pub fn num_tables(&self) -> usize {
151        self.table_data_map.len()
152    }
153
154    /// Returns the request and number of rows in it.
155    pub fn into_row_insert_requests(self) -> (RowInsertRequests, usize) {
156        let mut total_rows = 0;
157        let inserts = self
158            .table_data_map
159            .into_iter()
160            .map(|(table_name, table_data)| {
161                total_rows += table_data.num_rows();
162                let num_columns = table_data.num_columns();
163                let (schema, mut rows) = table_data.into_schema_and_rows();
164                for row in &mut rows {
165                    if num_columns > row.values.len() {
166                        row.values.resize(num_columns, Value { value_data: None });
167                    }
168                }
169
170                RowInsertRequest {
171                    table_name,
172                    rows: Some(Rows { schema, rows }),
173                }
174            })
175            .collect::<Vec<_>>();
176        let row_insert_requests = RowInsertRequests { inserts };
177
178        (row_insert_requests, total_rows)
179    }
180}
181
182/// Write data as tags into the table data.
183pub fn write_tags(
184    table_data: &mut TableData,
185    tags: impl Iterator<Item = (String, String)>,
186    one_row: &mut Vec<Value>,
187) -> Result<()> {
188    let ktv_iter = tags.map(|(k, v)| (k, ColumnDataType::String, Some(ValueData::StringValue(v))));
189    write_by_semantic_type(table_data, SemanticType::Tag, ktv_iter, one_row)
190}
191
192/// Write data as fields into the table data.
193pub fn write_fields(
194    table_data: &mut TableData,
195    fields: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
196    one_row: &mut Vec<Value>,
197) -> Result<()> {
198    write_by_semantic_type(table_data, SemanticType::Field, fields, one_row)
199}
200
201/// Write data as a tag into the table data.
202pub fn write_tag(
203    table_data: &mut TableData,
204    name: impl ToString,
205    value: impl ToString,
206    one_row: &mut Vec<Value>,
207) -> Result<()> {
208    write_by_semantic_type(
209        table_data,
210        SemanticType::Tag,
211        std::iter::once((
212            name.to_string(),
213            ColumnDataType::String,
214            Some(ValueData::StringValue(value.to_string())),
215        )),
216        one_row,
217    )
218}
219
220/// Write float64 data as a field into the table data.
221pub fn write_f64(
222    table_data: &mut TableData,
223    name: impl ToString,
224    value: f64,
225    one_row: &mut Vec<Value>,
226) -> Result<()> {
227    write_fields(
228        table_data,
229        std::iter::once((
230            name.to_string(),
231            ColumnDataType::Float64,
232            Some(ValueData::F64Value(value)),
233        )),
234        one_row,
235    )
236}
237
238fn build_json_column_schema(name: impl ToString) -> ColumnSchema {
239    ColumnSchema {
240        column_name: name.to_string(),
241        datatype: ColumnDataType::Binary as i32,
242        semantic_type: SemanticType::Field as i32,
243        datatype_extension: Some(ColumnDataTypeExtension {
244            type_ext: Some(TypeExt::JsonType(JsonTypeExtension::JsonBinary.into())),
245        }),
246        ..Default::default()
247    }
248}
249
250pub fn write_json(
251    table_data: &mut TableData,
252    name: impl ToString,
253    value: jsonb::Value,
254    one_row: &mut Vec<Value>,
255) -> Result<()> {
256    write_by_schema(
257        table_data,
258        std::iter::once((
259            build_json_column_schema(name),
260            Some(ValueData::BinaryValue(value.to_vec())),
261        )),
262        one_row,
263    )
264}
265
266fn write_by_schema(
267    table_data: &mut TableData,
268    kv_iter: impl Iterator<Item = (ColumnSchema, Option<ValueData>)>,
269    one_row: &mut Vec<Value>,
270) -> Result<()> {
271    let TableData {
272        schema,
273        column_indexes,
274        ..
275    } = table_data;
276
277    for (column_schema, value) in kv_iter {
278        let index = column_indexes.get(&column_schema.column_name);
279        if let Some(index) = index {
280            check_schema_number(
281                column_schema.datatype,
282                column_schema.semantic_type,
283                &schema[*index],
284            )?;
285            one_row[*index].value_data = value;
286        } else {
287            let index = schema.len();
288            let key = column_schema.column_name.clone();
289            schema.push(column_schema);
290            column_indexes.insert(key, index);
291            one_row.push(Value { value_data: value });
292        }
293    }
294
295    Ok(())
296}
297
298fn write_by_semantic_type(
299    table_data: &mut TableData,
300    semantic_type: SemanticType,
301    ktv_iter: impl Iterator<Item = (String, ColumnDataType, Option<ValueData>)>,
302    one_row: &mut Vec<Value>,
303) -> Result<()> {
304    let TableData {
305        schema,
306        column_indexes,
307        ..
308    } = table_data;
309
310    for (name, datatype, value) in ktv_iter {
311        let index = column_indexes.get(&name);
312        if let Some(index) = index {
313            check_schema(datatype, semantic_type, &schema[*index])?;
314            one_row[*index].value_data = value;
315        } else {
316            let index = schema.len();
317            schema.push(ColumnSchema {
318                column_name: name.clone(),
319                datatype: datatype as i32,
320                semantic_type: semantic_type as i32,
321                ..Default::default()
322            });
323            column_indexes.insert(name, index);
324            one_row.push(Value { value_data: value });
325        }
326    }
327
328    Ok(())
329}
330
331/// Write timestamp data as milliseconds into the table data.
332pub fn write_ts_to_millis(
333    table_data: &mut TableData,
334    name: impl ToString,
335    ts: Option<i64>,
336    precision: Precision,
337    one_row: &mut Vec<Value>,
338) -> Result<()> {
339    write_ts_to(
340        table_data,
341        name,
342        ts,
343        precision,
344        TimestampType::Millis,
345        one_row,
346    )
347}
348
349/// Write timestamp data as nanoseconds into the table data.
350pub fn write_ts_to_nanos(
351    table_data: &mut TableData,
352    name: impl ToString,
353    ts: Option<i64>,
354    precision: Precision,
355    one_row: &mut Vec<Value>,
356) -> Result<()> {
357    write_ts_to(
358        table_data,
359        name,
360        ts,
361        precision,
362        TimestampType::Nanos,
363        one_row,
364    )
365}
366
/// Target resolution of the timestamp column written by `write_ts_to`.
enum TimestampType {
    /// Store the value as a millisecond timestamp.
    Millis,
    /// Store the value as a nanosecond timestamp.
    Nanos,
}
371
372fn write_ts_to(
373    table_data: &mut TableData,
374    name: impl ToString,
375    ts: Option<i64>,
376    precision: Precision,
377    ts_type: TimestampType,
378    one_row: &mut Vec<Value>,
379) -> Result<()> {
380    let TableData {
381        schema,
382        column_indexes,
383        ..
384    } = table_data;
385    let name = name.to_string();
386
387    let ts = match ts {
388        Some(timestamp) => match ts_type {
389            TimestampType::Millis => precision.to_millis(timestamp),
390            TimestampType::Nanos => precision.to_nanos(timestamp),
391        }
392        .with_context(|| TimestampOverflowSnafu {
393            error: format!(
394                "timestamp {} overflow with precision {}",
395                timestamp, precision
396            ),
397        })?,
398        None => {
399            let timestamp = Timestamp::current_time(Nanosecond);
400            let unit: TimeUnit = precision.try_into().context(RowWriterSnafu)?;
401            let timestamp = timestamp
402                .convert_to(unit)
403                .with_context(|| TimePrecisionSnafu {
404                    name: precision.to_string(),
405                })?
406                .into();
407            match ts_type {
408                TimestampType::Millis => precision.to_millis(timestamp),
409                TimestampType::Nanos => precision.to_nanos(timestamp),
410            }
411            .with_context(|| TimestampOverflowSnafu {
412                error: format!(
413                    "timestamp {} overflow with precision {}",
414                    timestamp, precision
415                ),
416            })?
417        }
418    };
419
420    let (datatype, ts) = match ts_type {
421        TimestampType::Millis => (
422            ColumnDataType::TimestampMillisecond,
423            ValueData::TimestampMillisecondValue(ts),
424        ),
425        TimestampType::Nanos => (
426            ColumnDataType::TimestampNanosecond,
427            ValueData::TimestampNanosecondValue(ts),
428        ),
429    };
430
431    let index = column_indexes.get(&name);
432    if let Some(index) = index {
433        check_schema(datatype, SemanticType::Timestamp, &schema[*index])?;
434        one_row[*index].value_data = Some(ts);
435    } else {
436        let index = schema.len();
437        schema.push(time_index_column_schema(&name, datatype));
438        column_indexes.insert(name, index);
439        one_row.push(ts.into())
440    }
441
442    Ok(())
443}
444
445fn check_schema(
446    datatype: ColumnDataType,
447    semantic_type: SemanticType,
448    schema: &ColumnSchema,
449) -> Result<()> {
450    check_schema_number(datatype as i32, semantic_type as i32, schema)
451}
452
453fn check_schema_number(datatype: i32, semantic_type: i32, schema: &ColumnSchema) -> Result<()> {
454    ensure!(
455        schema.datatype == datatype,
456        IncompatibleSchemaSnafu {
457            column_name: &schema.column_name,
458            datatype: "datatype",
459            expected: schema.datatype,
460            actual: datatype,
461        }
462    );
463
464    ensure!(
465        schema.semantic_type == semantic_type,
466        IncompatibleSchemaSnafu {
467            column_name: &schema.column_name,
468            datatype: "semantic_type",
469            expected: schema.semantic_type,
470            actual: semantic_type,
471        }
472    );
473
474    Ok(())
475}