servers/
prom_row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::string::ToString;
18
19use api::prom_store::remote::Sample;
20use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
23use common_query::prelude::{greptime_timestamp, greptime_value};
24use pipeline::{ContextOpt, ContextReq};
25use prost::DecodeError;
26
27use crate::http::PromValidationMode;
28use crate::proto::PromLabel;
29use crate::repeated_field::Clear;
30
/// Prometheus remote write context.
///
/// Serves as the outer grouping key of [`TablesBuilder`]: all tables buffered under the
/// same context are converted with one shared [`ContextOpt`].
#[derive(Debug, Default, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PromCtx {
    /// Schema to set on the request context; nothing is set when `None`.
    pub schema: Option<String>,
    /// Physical table to set on the request context; nothing is set when `None`.
    pub physical_table: Option<String>,
}
37
38/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
39#[derive(Default, Debug)]
40pub struct TablesBuilder {
41    // schema -> table -> table_builder
42    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
43}
44
45impl Clear for TablesBuilder {
46    fn clear(&mut self) {
47        self.tables.clear();
48    }
49}
50
51impl TablesBuilder {
52    /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
53    pub(crate) fn get_or_create_table_builder(
54        &mut self,
55        prom_ctx: PromCtx,
56        table_name: String,
57        label_num: usize,
58        row_num: usize,
59    ) -> &mut TableBuilder {
60        self.tables
61            .entry(prom_ctx)
62            .or_default()
63            .entry(table_name)
64            .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
65    }
66
67    /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
68    pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
69        self.tables
70            .drain()
71            .map(|(prom, mut tables)| {
72                // create context opt
73                let mut opt = ContextOpt::default();
74                if let Some(physical_table) = prom.physical_table {
75                    opt.set_physical_table(physical_table);
76                }
77                if let Some(schema) = prom.schema {
78                    opt.set_schema(schema);
79                }
80
81                // create and set context req
82                let mut ctx_req = ContextReq::default();
83                let reqs = tables
84                    .drain()
85                    .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
86                ctx_req.add_rows(opt, reqs);
87
88                ctx_req
89            })
90            .fold(ContextReq::default(), |mut req, reqs| {
91                req.merge(reqs);
92                req
93            })
94    }
95}
96
97/// Builder for one table.
98#[derive(Debug)]
99pub struct TableBuilder {
100    /// Column schemas.
101    schema: Vec<ColumnSchema>,
102    /// Rows written.
103    rows: Vec<Row>,
104    /// Indices of columns inside `schema`.
105    col_indexes: HashMap<String, usize>,
106}
107
108impl Default for TableBuilder {
109    fn default() -> Self {
110        Self::with_capacity(2, 0)
111    }
112}
113
114impl TableBuilder {
115    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
116        let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
117        col_indexes.insert(greptime_timestamp().to_string(), 0);
118        col_indexes.insert(greptime_value().to_string(), 1);
119
120        let mut schema = Vec::with_capacity(cols);
121        schema.push(time_index_column_schema(
122            greptime_timestamp(),
123            ColumnDataType::TimestampMillisecond,
124        ));
125        schema.push(field_column_schema(
126            greptime_value(),
127            ColumnDataType::Float64,
128        ));
129
130        Self {
131            schema,
132            rows: Vec::with_capacity(rows),
133            col_indexes,
134        }
135    }
136
137    /// Adds a set of labels and samples to table builder.
138    pub(crate) fn add_labels_and_samples(
139        &mut self,
140        labels: &[PromLabel],
141        samples: &[Sample],
142        prom_validation_mode: PromValidationMode,
143    ) -> Result<(), DecodeError> {
144        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
145
146        for PromLabel { name, value } in labels {
147            let tag_name = prom_validation_mode.decode_string(name)?;
148            let tag_value = prom_validation_mode.decode_string(value)?;
149            let tag_value = Some(ValueData::StringValue(tag_value));
150            let tag_num = self.col_indexes.len();
151
152            match self.col_indexes.entry(tag_name) {
153                Entry::Occupied(e) => {
154                    row[*e.get()].value_data = tag_value;
155                }
156                Entry::Vacant(e) => {
157                    self.schema
158                        .push(tag_column_schema(e.key(), ColumnDataType::String));
159                    e.insert(tag_num);
160                    row.push(Value {
161                        value_data: tag_value,
162                    });
163                }
164            }
165        }
166
167        if samples.len() == 1 {
168            let sample = &samples[0];
169            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
170            row[1].value_data = Some(ValueData::F64Value(sample.value));
171            self.rows.push(Row { values: row });
172            return Ok(());
173        }
174        for sample in samples {
175            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
176            row[1].value_data = Some(ValueData::F64Value(sample.value));
177            self.rows.push(Row {
178                values: row.clone(),
179            });
180        }
181
182        Ok(())
183    }
184
185    /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data.
186    pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
187        let mut rows = std::mem::take(&mut self.rows);
188        let schema = std::mem::take(&mut self.schema);
189        let col_num = schema.len();
190        for row in &mut rows {
191            if row.values.len() < col_num {
192                row.values.resize(col_num, Value { value_data: None });
193            }
194        }
195
196        RowInsertRequest {
197            table_name,
198            rows: Some(Rows { schema, rows }),
199        }
200    }
201
202    pub fn tags(&self) -> impl Iterator<Item = &String> {
203        self.schema
204            .iter()
205            .filter(|v| v.semantic_type == SemanticType::Tag as i32)
206            .map(|c| &c.column_name)
207    }
208}
209
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::Value;
    use api::v1::value::ValueData;
    use arrow::datatypes::ToByteSlice;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from UTF-8 literals.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Wraps `data` into a non-null [`Value`].
    fn value_of(data: ValueData) -> Value {
        Value {
            value_data: Some(data),
        }
    }

    /// Builds a non-null string [`Value`].
    fn string_value(s: &str) -> Value {
        value_of(ValueData::StringValue(s.to_string()))
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // Unwrap instead of discarding the results: a decoding failure during setup should
        // be reported right here rather than as a row mismatch further down.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        assert_eq!("test", request.table_name);

        let rows = request.rows.unwrap();
        // timestamp + value + tag0 + tag1 + tag2
        assert_eq!(5, rows.schema.len());
        let rows = rows.rows;
        assert_eq!(2, rows.len());

        // The first row was written before `tag2` existed and is padded with a null.
        assert_eq!(
            vec![
                value_of(ValueData::TimestampMillisecondValue(0)),
                value_of(ValueData::F64Value(0.0)),
                string_value("v0"),
                string_value("v1"),
                Value { value_data: None },
            ],
            rows[0].values
        );

        // The second row has no `tag1` label, so that column stays null.
        assert_eq!(
            vec![
                value_of(ValueData::TimestampMillisecondValue(1)),
                value_of(ValueData::F64Value(0.1)),
                string_value("v0"),
                Value { value_data: None },
                string_value("v2"),
            ],
            rows[1].values
        );

        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];

        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: invalid_utf8_bytes.to_byte_slice().into(),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}