1use std::collections::hash_map::Entry;
16use std::string::ToString;
17
18use ahash::HashMap;
19use api::prom_store::remote::Sample;
20use api::v1::value::ValueData;
21use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
22use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
23use prost::DecodeError;
24
25use crate::http::PromValidationMode;
26use crate::proto::{decode_string, PromLabel};
27use crate::repeated_field::Clear;
28
29#[derive(Default, Debug)]
31pub(crate) struct TablesBuilder {
32 tables: HashMap<String, TableBuilder>,
33}
34
35impl Clear for TablesBuilder {
36 fn clear(&mut self) {
37 self.tables.clear();
38 }
39}
40
41impl TablesBuilder {
42 pub(crate) fn get_or_create_table_builder(
44 &mut self,
45 table_name: String,
46 label_num: usize,
47 row_num: usize,
48 ) -> &mut TableBuilder {
49 self.tables
50 .entry(table_name)
51 .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
52 }
53
54 pub(crate) fn as_insert_requests(&mut self) -> Vec<RowInsertRequest> {
56 self.tables
57 .drain()
58 .map(|(name, mut table)| table.as_row_insert_request(name))
59 .collect()
60 }
61}
62
63#[derive(Debug)]
65pub(crate) struct TableBuilder {
66 schema: Vec<ColumnSchema>,
68 rows: Vec<Row>,
70 col_indexes: HashMap<String, usize>,
72}
73
74impl Default for TableBuilder {
75 fn default() -> Self {
76 Self::with_capacity(2, 0)
77 }
78}
79
80impl TableBuilder {
81 pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
82 let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
83 col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
84 col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
85
86 let mut schema = Vec::with_capacity(cols);
87 schema.push(ColumnSchema {
88 column_name: GREPTIME_TIMESTAMP.to_string(),
89 datatype: ColumnDataType::TimestampMillisecond as i32,
90 semantic_type: SemanticType::Timestamp as i32,
91 datatype_extension: None,
92 options: None,
93 });
94
95 schema.push(ColumnSchema {
96 column_name: GREPTIME_VALUE.to_string(),
97 datatype: ColumnDataType::Float64 as i32,
98 semantic_type: SemanticType::Field as i32,
99 datatype_extension: None,
100 options: None,
101 });
102
103 Self {
104 schema,
105 rows: Vec::with_capacity(rows),
106 col_indexes,
107 }
108 }
109
110 pub(crate) fn add_labels_and_samples(
112 &mut self,
113 labels: &[PromLabel],
114 samples: &[Sample],
115 prom_validation_mode: PromValidationMode,
116 ) -> Result<(), DecodeError> {
117 let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
118
119 for PromLabel { name, value } in labels {
120 let tag_name = decode_string(name, prom_validation_mode)?;
121 let tag_value = decode_string(value, prom_validation_mode)?;
122 let tag_value = Some(ValueData::StringValue(tag_value));
123 let tag_num = self.col_indexes.len();
124
125 match self.col_indexes.entry(tag_name) {
126 Entry::Occupied(e) => {
127 row[*e.get()].value_data = tag_value;
128 }
129 Entry::Vacant(e) => {
130 let column_name = e.key().clone();
131 e.insert(tag_num);
132 self.schema.push(ColumnSchema {
133 column_name,
134 datatype: ColumnDataType::String as i32,
135 semantic_type: SemanticType::Tag as i32,
136 datatype_extension: None,
137 options: None,
138 });
139 row.push(Value {
140 value_data: tag_value,
141 });
142 }
143 }
144 }
145
146 if samples.len() == 1 {
147 let sample = &samples[0];
148 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
149 row[1].value_data = Some(ValueData::F64Value(sample.value));
150 self.rows.push(Row { values: row });
151 return Ok(());
152 }
153 for sample in samples {
154 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
155 row[1].value_data = Some(ValueData::F64Value(sample.value));
156 self.rows.push(Row {
157 values: row.clone(),
158 });
159 }
160
161 Ok(())
162 }
163
164 pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
166 let mut rows = std::mem::take(&mut self.rows);
167 let schema = std::mem::take(&mut self.schema);
168 let col_num = schema.len();
169 for row in &mut rows {
170 if row.values.len() < col_num {
171 row.values.resize(col_num, Value { value_data: None });
172 }
173 }
174
175 RowInsertRequest {
176 table_name,
177 rows: Some(Rows { schema, rows }),
178 }
179 }
180}
181
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use arrow::datatypes::ToByteSlice;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from two static strings.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Builds a string cell.
    fn tag(value: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(value.to_string())),
        }
    }

    /// Builds a null cell.
    fn null() -> Value {
        Value { value_data: None }
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // Set-up calls must succeed; unwrap so a failure is reported here
        // instead of surfacing later as a row-count mismatch.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        let inserted = request.rows.unwrap();

        // Two built-in columns plus the three distinct tags, in first-seen order.
        assert_eq!(5, inserted.schema.len());
        assert_eq!("tag0", inserted.schema[2].column_name);
        assert_eq!("tag1", inserted.schema[3].column_name);
        assert_eq!("tag2", inserted.schema[4].column_name);

        let rows = inserted.rows;
        assert_eq!(2, rows.len());

        // The first row predates `tag2`, so it is padded with a trailing null.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(0))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.0))
                },
                tag("v0"),
                tag("v1"),
                null(),
            ],
            rows[0].values
        );

        // The second row has no `tag1`, so that cell stays null.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(1))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.1))
                },
                tag("v0"),
                null(),
                tag("v2"),
            ],
            rows[1].values
        );

        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];

        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: invalid_utf8_bytes.to_byte_slice().into(),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}