servers/
prom_row_builder.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::string::ToString;
18
19use api::prom_store::remote::Sample;
20use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
23use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
24use pipeline::{ContextOpt, ContextReq};
25use prost::DecodeError;
26
27use crate::http::PromValidationMode;
28use crate::proto::PromLabel;
29use crate::repeated_field::Clear;
30
/// Context of a Prometheus remote write request.
///
/// Both fields are optional; the derived [`Default`] leaves them unset.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PromCtx {
    /// Schema name, when one was given.
    pub schema: Option<String>,
    /// Physical table name, when one was given.
    pub physical_table: Option<String>,
}
37
38/// [TablesBuilder] serves as an intermediate container to build [RowInsertRequests].
39#[derive(Default, Debug)]
40pub struct TablesBuilder {
41    // schema -> table -> table_builder
42    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
43}
44
45impl Clear for TablesBuilder {
46    fn clear(&mut self) {
47        self.tables.clear();
48    }
49}
50
51impl TablesBuilder {
52    /// Gets table builder with given table name. Creates an empty [TableBuilder] if not exist.
53    pub(crate) fn get_or_create_table_builder(
54        &mut self,
55        prom_ctx: PromCtx,
56        table_name: String,
57        label_num: usize,
58        row_num: usize,
59    ) -> &mut TableBuilder {
60        self.tables
61            .entry(prom_ctx)
62            .or_default()
63            .entry(table_name)
64            .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
65    }
66
67    /// Converts [TablesBuilder] to [RowInsertRequests] and row numbers and clears inner states.
68    pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
69        self.tables
70            .drain()
71            .map(|(prom, mut tables)| {
72                // create context opt
73                let mut opt = ContextOpt::default();
74                if let Some(physical_table) = prom.physical_table {
75                    opt.set_physical_table(physical_table);
76                }
77                if let Some(schema) = prom.schema {
78                    opt.set_schema(schema);
79                }
80
81                // create and set context req
82                let mut ctx_req = ContextReq::default();
83                let reqs = tables
84                    .drain()
85                    .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
86                ctx_req.add_rows(opt, reqs);
87
88                ctx_req
89            })
90            .fold(ContextReq::default(), |mut req, reqs| {
91                req.merge(reqs);
92                req
93            })
94    }
95}
96
97/// Builder for one table.
98#[derive(Debug)]
99pub struct TableBuilder {
100    /// Column schemas.
101    schema: Vec<ColumnSchema>,
102    /// Rows written.
103    rows: Vec<Row>,
104    /// Indices of columns inside `schema`.
105    col_indexes: HashMap<String, usize>,
106}
107
108impl Default for TableBuilder {
109    fn default() -> Self {
110        Self::with_capacity(2, 0)
111    }
112}
113
114impl TableBuilder {
115    pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
116        let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
117        col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
118        col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
119
120        let mut schema = Vec::with_capacity(cols);
121        schema.push(time_index_column_schema(
122            GREPTIME_TIMESTAMP,
123            ColumnDataType::TimestampMillisecond,
124        ));
125        schema.push(field_column_schema(GREPTIME_VALUE, ColumnDataType::Float64));
126
127        Self {
128            schema,
129            rows: Vec::with_capacity(rows),
130            col_indexes,
131        }
132    }
133
134    /// Adds a set of labels and samples to table builder.
135    pub(crate) fn add_labels_and_samples(
136        &mut self,
137        labels: &[PromLabel],
138        samples: &[Sample],
139        prom_validation_mode: PromValidationMode,
140    ) -> Result<(), DecodeError> {
141        let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
142
143        for PromLabel { name, value } in labels {
144            let tag_name = prom_validation_mode.decode_string(name)?;
145            let tag_value = prom_validation_mode.decode_string(value)?;
146            let tag_value = Some(ValueData::StringValue(tag_value));
147            let tag_num = self.col_indexes.len();
148
149            match self.col_indexes.entry(tag_name) {
150                Entry::Occupied(e) => {
151                    row[*e.get()].value_data = tag_value;
152                }
153                Entry::Vacant(e) => {
154                    self.schema
155                        .push(tag_column_schema(e.key(), ColumnDataType::String));
156                    e.insert(tag_num);
157                    row.push(Value {
158                        value_data: tag_value,
159                    });
160                }
161            }
162        }
163
164        if samples.len() == 1 {
165            let sample = &samples[0];
166            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
167            row[1].value_data = Some(ValueData::F64Value(sample.value));
168            self.rows.push(Row { values: row });
169            return Ok(());
170        }
171        for sample in samples {
172            row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
173            row[1].value_data = Some(ValueData::F64Value(sample.value));
174            self.rows.push(Row {
175                values: row.clone(),
176            });
177        }
178
179        Ok(())
180    }
181
182    /// Converts [TableBuilder] to [RowInsertRequest] and clears buffered data.
183    pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
184        let mut rows = std::mem::take(&mut self.rows);
185        let schema = std::mem::take(&mut self.schema);
186        let col_num = schema.len();
187        for row in &mut rows {
188            if row.values.len() < col_num {
189                row.values.resize(col_num, Value { value_data: None });
190            }
191        }
192
193        RowInsertRequest {
194            table_name,
195            rows: Some(Rows { schema, rows }),
196        }
197    }
198
199    pub fn tags(&self) -> impl Iterator<Item = &String> {
200        self.schema
201            .iter()
202            .filter(|v| v.semantic_type == SemanticType::Tag as i32)
203            .map(|c| &c.column_name)
204    }
205}
206
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use arrow::datatypes::ToByteSlice;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from static strings.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Wraps a millisecond timestamp into a [Value].
    fn timestamp_value(millis: i64) -> Value {
        Value {
            value_data: Some(ValueData::TimestampMillisecondValue(millis)),
        }
    }

    /// Wraps a float into a [Value].
    fn float_value(number: f64) -> Value {
        Value {
            value_data: Some(ValueData::F64Value(number)),
        }
    }

    /// Wraps a string into a [Value].
    fn string_value(text: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(text.to_string())),
        }
    }

    /// A [Value] holding no data.
    fn null_value() -> Value {
        Value { value_data: None }
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // The first series carries `tag0` and `tag1`.
        let _ = builder.add_labels_and_samples(
            &[label("tag0", "v0"), label("tag1", "v1")],
            &[Sample {
                value: 0.0,
                timestamp: 0,
            }],
            PromValidationMode::Strict,
        );

        // The second series carries `tag0` and `tag2`, but not `tag1`.
        let _ = builder.add_labels_and_samples(
            &[label("tag0", "v0"), label("tag2", "v2")],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // The first row has no value for `tag2`.
        assert_eq!(
            vec![
                timestamp_value(0),
                float_value(0.0),
                string_value("v0"),
                string_value("v1"),
                null_value(),
            ],
            rows[0].values
        );

        // The second row has no value for `tag1`.
        assert_eq!(
            vec![
                timestamp_value(1),
                float_value(0.1),
                string_value("v0"),
                null_value(),
                string_value("v2"),
            ],
            rows[1].values
        );

        // A label value that is not valid UTF-8 is rejected in strict mode.
        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];

        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: invalid_utf8_bytes.to_byte_slice().into(),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}
315}