servers/
prom_row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::string::ToString;
17
18use ahash::HashMap;
19use api::prom_store::remote::Sample;
20use api::v1::value::ValueData;
21use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
22use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
23use prost::DecodeError;
24
25use crate::http::PromValidationMode;
26use crate::proto::{decode_string, PromLabel};
27use crate::repeated_field::Clear;
28
29/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
30#[derive(Default, Debug)]
31pub(crate) struct TablesBuilder {
32    tables: HashMap<String, TableBuilder>,
33}
34
35impl Clear for TablesBuilder {
36    fn clear(&mut self) {
37        self.tables.clear();
38    }
39}
40
41impl TablesBuilder {
42    /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
43    pub(crate) fn get_or_create_table_builder(
44        &mut self,
45        table_name: String,
46        label_num: usize,
47        row_num: usize,
48    ) -> &mut TableBuilder {
49        self.tables
50            .entry(table_name)
51            .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
52    }
53
54    /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
55    pub(crate) fn as_insert_requests(&mut self) -> Vec<RowInsertRequest> {
56        self.tables
57            .drain()
58            .map(|(name, mut table)| table.as_row_insert_request(name))
59            .collect()
60    }
61}
62
63/// Builder for one table.
64#[derive(Debug)]
65pub(crate) struct TableBuilder {
66    /// Column schemas.
67    schema: Vec<ColumnSchema>,
68    /// Rows written.
69    rows: Vec<Row>,
70    /// Indices of columns inside `schema`.
71    col_indexes: HashMap<String, usize>,
72}
73
74impl Default for TableBuilder {
75    fn default() -> Self {
76        Self::with_capacity(2, 0)
77    }
78}
79
80impl TableBuilder {
81    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
82        let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
83        col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
84        col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
85
86        let mut schema = Vec::with_capacity(cols);
87        schema.push(ColumnSchema {
88            column_name: GREPTIME_TIMESTAMP.to_string(),
89            datatype: ColumnDataType::TimestampMillisecond as i32,
90            semantic_type: SemanticType::Timestamp as i32,
91            datatype_extension: None,
92            options: None,
93        });
94
95        schema.push(ColumnSchema {
96            column_name: GREPTIME_VALUE.to_string(),
97            datatype: ColumnDataType::Float64 as i32,
98            semantic_type: SemanticType::Field as i32,
99            datatype_extension: None,
100            options: None,
101        });
102
103        Self {
104            schema,
105            rows: Vec::with_capacity(rows),
106            col_indexes,
107        }
108    }
109
110    /// Adds a set of labels and samples to table builder.
111    pub(crate) fn add_labels_and_samples(
112        &mut self,
113        labels: &[PromLabel],
114        samples: &[Sample],
115        prom_validation_mode: PromValidationMode,
116    ) -> Result<(), DecodeError> {
117        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
118
119        for PromLabel { name, value } in labels {
120            let tag_name = decode_string(name, prom_validation_mode)?;
121            let tag_value = decode_string(value, prom_validation_mode)?;
122            let tag_value = Some(ValueData::StringValue(tag_value));
123            let tag_num = self.col_indexes.len();
124
125            match self.col_indexes.entry(tag_name) {
126                Entry::Occupied(e) => {
127                    row[*e.get()].value_data = tag_value;
128                }
129                Entry::Vacant(e) => {
130                    let column_name = e.key().clone();
131                    e.insert(tag_num);
132                    self.schema.push(ColumnSchema {
133                        column_name,
134                        datatype: ColumnDataType::String as i32,
135                        semantic_type: SemanticType::Tag as i32,
136                        datatype_extension: None,
137                        options: None,
138                    });
139                    row.push(Value {
140                        value_data: tag_value,
141                    });
142                }
143            }
144        }
145
146        if samples.len() == 1 {
147            let sample = &samples[0];
148            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
149            row[1].value_data = Some(ValueData::F64Value(sample.value));
150            self.rows.push(Row { values: row });
151            return Ok(());
152        }
153        for sample in samples {
154            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
155            row[1].value_data = Some(ValueData::F64Value(sample.value));
156            self.rows.push(Row {
157                values: row.clone(),
158            });
159        }
160
161        Ok(())
162    }
163
164    /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data.
165    pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
166        let mut rows = std::mem::take(&mut self.rows);
167        let schema = std::mem::take(&mut self.schema);
168        let col_num = schema.len();
169        for row in &mut rows {
170            if row.values.len() < col_num {
171                row.values.resize(col_num, Value { value_data: None });
172            }
173        }
174
175        RowInsertRequest {
176            table_name,
177            rows: Some(Rows { schema, rows }),
178        }
179    }
180}
181
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from static UTF-8 name and value strings.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    fn ts_value(ts: i64) -> Value {
        Value {
            value_data: Some(ValueData::TimestampMillisecondValue(ts)),
        }
    }

    fn f64_value(v: f64) -> Value {
        Value {
            value_data: Some(ValueData::F64Value(v)),
        }
    }

    fn string_value(s: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(s.to_string())),
        }
    }

    fn null_value() -> Value {
        Value { value_data: None }
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();
        // Setup calls must succeed. `unwrap` surfaces a decode failure here instead of as a
        // confusing mismatch in the assertions below.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // The first row predates the `tag2` column and is padded with a trailing null.
        assert_eq!(
            vec![
                ts_value(0),
                f64_value(0.0),
                string_value("v0"),
                string_value("v1"),
                null_value(),
            ],
            rows[0].values
        );

        // The second row has no `tag1` label, so that slot stays null.
        assert_eq!(
            vec![
                ts_value(1),
                f64_value(0.1),
                string_value("v0"),
                null_value(),
                string_value("v2"),
            ],
            rows[1].values
        );

        // 0xFF never appears in valid UTF-8.
        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: Bytes::from_static(&[0xFF, 0xFF, 0xFF]),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }

    #[test]
    fn test_table_builder_multiple_samples() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0")],
                &[
                    Sample {
                        value: 1.0,
                        timestamp: 10,
                    },
                    Sample {
                        value: 2.0,
                        timestamp: 20,
                    },
                ],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // Each sample becomes its own row, in input order, sharing the same tag values.
        for (row, (ts, v)) in rows.iter().zip([(10, 1.0), (20, 2.0)]) {
            assert_eq!(
                vec![ts_value(ts), f64_value(v), string_value("v0")],
                row.values
            );
        }
    }
}