servers/
prom_row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::string::ToString;
17
18use ahash::HashMap;
19use api::prom_store::remote::Sample;
20use api::v1::value::ValueData;
21use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
22use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
23use pipeline::{ContextOpt, ContextReq};
24use prost::DecodeError;
25
26use crate::http::PromValidationMode;
27use crate::proto::PromLabel;
28use crate::repeated_field::Clear;
29
/// Prometheus remote write context.
///
/// Tables sharing the same context are grouped into one request context when
/// `TablesBuilder::as_insert_requests` builds the final insert requests.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PromCtx {
    /// Schema to set on the request context; `None` leaves it unset.
    pub schema: Option<String>,
    /// Physical table name to set on the request context; `None` leaves it unset.
    pub physical_table: Option<String>,
}
36
37/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
38#[derive(Default, Debug)]
39pub(crate) struct TablesBuilder {
40    // schema -> table -> table_builder
41    tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
42}
43
44impl Clear for TablesBuilder {
45    fn clear(&mut self) {
46        self.tables.clear();
47    }
48}
49
50impl TablesBuilder {
51    /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
52    pub(crate) fn get_or_create_table_builder(
53        &mut self,
54        prom_ctx: PromCtx,
55        table_name: String,
56        label_num: usize,
57        row_num: usize,
58    ) -> &mut TableBuilder {
59        self.tables
60            .entry(prom_ctx)
61            .or_default()
62            .entry(table_name)
63            .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
64    }
65
66    /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
67    pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
68        self.tables
69            .drain()
70            .map(|(prom, mut tables)| {
71                // create context opt
72                let mut opt = ContextOpt::default();
73                if let Some(physical_table) = prom.physical_table {
74                    opt.set_physical_table(physical_table);
75                }
76                if let Some(schema) = prom.schema {
77                    opt.set_schema(schema);
78                }
79
80                // create and set context req
81                let mut ctx_req = ContextReq::default();
82                let reqs = tables
83                    .drain()
84                    .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
85                ctx_req.add_rows(opt, reqs);
86
87                ctx_req
88            })
89            .fold(ContextReq::default(), |mut req, reqs| {
90                req.merge(reqs);
91                req
92            })
93    }
94}
95
96/// Builder for one table.
97#[derive(Debug)]
98pub(crate) struct TableBuilder {
99    /// Column schemas.
100    schema: Vec<ColumnSchema>,
101    /// Rows written.
102    rows: Vec<Row>,
103    /// Indices of columns inside `schema`.
104    col_indexes: HashMap<String, usize>,
105}
106
107impl Default for TableBuilder {
108    fn default() -> Self {
109        Self::with_capacity(2, 0)
110    }
111}
112
113impl TableBuilder {
114    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
115        let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
116        col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
117        col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
118
119        let mut schema = Vec::with_capacity(cols);
120        schema.push(ColumnSchema {
121            column_name: GREPTIME_TIMESTAMP.to_string(),
122            datatype: ColumnDataType::TimestampMillisecond as i32,
123            semantic_type: SemanticType::Timestamp as i32,
124            datatype_extension: None,
125            options: None,
126        });
127
128        schema.push(ColumnSchema {
129            column_name: GREPTIME_VALUE.to_string(),
130            datatype: ColumnDataType::Float64 as i32,
131            semantic_type: SemanticType::Field as i32,
132            datatype_extension: None,
133            options: None,
134        });
135
136        Self {
137            schema,
138            rows: Vec::with_capacity(rows),
139            col_indexes,
140        }
141    }
142
143    /// Adds a set of labels and samples to table builder.
144    pub(crate) fn add_labels_and_samples(
145        &mut self,
146        labels: &[PromLabel],
147        samples: &[Sample],
148        prom_validation_mode: PromValidationMode,
149    ) -> Result<(), DecodeError> {
150        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
151
152        for PromLabel { name, value } in labels {
153            let tag_name = prom_validation_mode.decode_string(name)?;
154            let tag_value = prom_validation_mode.decode_string(value)?;
155            let tag_value = Some(ValueData::StringValue(tag_value));
156            let tag_num = self.col_indexes.len();
157
158            match self.col_indexes.entry(tag_name) {
159                Entry::Occupied(e) => {
160                    row[*e.get()].value_data = tag_value;
161                }
162                Entry::Vacant(e) => {
163                    let column_name = e.key().clone();
164                    e.insert(tag_num);
165                    self.schema.push(ColumnSchema {
166                        column_name,
167                        datatype: ColumnDataType::String as i32,
168                        semantic_type: SemanticType::Tag as i32,
169                        datatype_extension: None,
170                        options: None,
171                    });
172                    row.push(Value {
173                        value_data: tag_value,
174                    });
175                }
176            }
177        }
178
179        if samples.len() == 1 {
180            let sample = &samples[0];
181            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
182            row[1].value_data = Some(ValueData::F64Value(sample.value));
183            self.rows.push(Row { values: row });
184            return Ok(());
185        }
186        for sample in samples {
187            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
188            row[1].value_data = Some(ValueData::F64Value(sample.value));
189            self.rows.push(Row {
190                values: row.clone(),
191            });
192        }
193
194        Ok(())
195    }
196
197    /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data.
198    pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
199        let mut rows = std::mem::take(&mut self.rows);
200        let schema = std::mem::take(&mut self.schema);
201        let col_num = schema.len();
202        for row in &mut rows {
203            if row.values.len() < col_num {
204                row.values.resize(col_num, Value { value_data: None });
205            }
206        }
207
208        RowInsertRequest {
209            table_name,
210            rows: Some(Rows { schema, rows }),
211        }
212    }
213}
214
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from static name/value strings.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Wraps a string into a tag cell value.
    fn string_value(s: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(s.to_string())),
        }
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(0))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.0))
                },
                string_value("v0"),
                string_value("v1"),
                Value { value_data: None },
            ],
            rows[0].values
        );

        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(1))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.1))
                },
                string_value("v0"),
                Value { value_data: None },
                string_value("v2"),
            ],
            rows[1].values
        );

        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: Bytes::from_static(&[0xFF, 0xFF, 0xFF]),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }

    #[test]
    fn test_table_builder_multiple_samples() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0")],
                &[
                    Sample {
                        value: 1.0,
                        timestamp: 10,
                    },
                    Sample {
                        value: 2.0,
                        timestamp: 20,
                    },
                ],
                PromValidationMode::Strict,
            )
            .unwrap();

        let rows = builder
            .as_row_insert_request("test".to_string())
            .rows
            .unwrap()
            .rows;
        assert_eq!(2, rows.len());
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(10))
                },
                Value {
                    value_data: Some(ValueData::F64Value(1.0))
                },
                string_value("v0"),
            ],
            rows[0].values
        );
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(20))
                },
                Value {
                    value_data: Some(ValueData::F64Value(2.0))
                },
                string_value("v0"),
            ],
            rows[1].values
        );
    }

    #[test]
    fn test_table_builder_no_samples() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(&[label("tag0", "v0")], &[], PromValidationMode::Strict)
            .unwrap();

        let rows = builder
            .as_row_insert_request("test".to_string())
            .rows
            .unwrap()
            .rows;
        assert!(rows.is_empty());
    }
}