1use std::collections::hash_map::Entry;
16use std::string::ToString;
17
18use ahash::HashMap;
19use api::prom_store::remote::Sample;
20use api::v1::value::ValueData;
21use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
22use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
23use pipeline::{ContextOpt, ContextReq};
24use prost::DecodeError;
25
26use crate::http::PromValidationMode;
27use crate::proto::PromLabel;
28use crate::repeated_field::Clear;
29
/// Context that Prometheus table builders are grouped by.
///
/// Both fields are optional. When present, `TablesBuilder::as_insert_requests`
/// copies them into the `ContextOpt` attached to the generated insert requests.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PromCtx {
    /// Schema option to apply to the grouped requests, if any.
    pub schema: Option<String>,
    /// Physical table option to apply to the grouped requests, if any.
    pub physical_table: Option<String>,
}
36
37#[derive(Default, Debug)]
39pub(crate) struct TablesBuilder {
40 tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
42}
43
44impl Clear for TablesBuilder {
45 fn clear(&mut self) {
46 self.tables.clear();
47 }
48}
49
50impl TablesBuilder {
51 pub(crate) fn get_or_create_table_builder(
53 &mut self,
54 prom_ctx: PromCtx,
55 table_name: String,
56 label_num: usize,
57 row_num: usize,
58 ) -> &mut TableBuilder {
59 self.tables
60 .entry(prom_ctx)
61 .or_default()
62 .entry(table_name)
63 .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
64 }
65
66 pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
68 self.tables
69 .drain()
70 .map(|(prom, mut tables)| {
71 let mut opt = ContextOpt::default();
73 if let Some(physical_table) = prom.physical_table {
74 opt.set_physical_table(physical_table);
75 }
76 if let Some(schema) = prom.schema {
77 opt.set_schema(schema);
78 }
79
80 let mut ctx_req = ContextReq::default();
82 let reqs = tables
83 .drain()
84 .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
85 ctx_req.add_rows(opt, reqs);
86
87 ctx_req
88 })
89 .fold(ContextReq::default(), |mut req, reqs| {
90 req.merge(reqs);
91 req
92 })
93 }
94}
95
96#[derive(Debug)]
98pub(crate) struct TableBuilder {
99 schema: Vec<ColumnSchema>,
101 rows: Vec<Row>,
103 col_indexes: HashMap<String, usize>,
105}
106
107impl Default for TableBuilder {
108 fn default() -> Self {
109 Self::with_capacity(2, 0)
110 }
111}
112
113impl TableBuilder {
114 pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
115 let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
116 col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
117 col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
118
119 let mut schema = Vec::with_capacity(cols);
120 schema.push(ColumnSchema {
121 column_name: GREPTIME_TIMESTAMP.to_string(),
122 datatype: ColumnDataType::TimestampMillisecond as i32,
123 semantic_type: SemanticType::Timestamp as i32,
124 datatype_extension: None,
125 options: None,
126 });
127
128 schema.push(ColumnSchema {
129 column_name: GREPTIME_VALUE.to_string(),
130 datatype: ColumnDataType::Float64 as i32,
131 semantic_type: SemanticType::Field as i32,
132 datatype_extension: None,
133 options: None,
134 });
135
136 Self {
137 schema,
138 rows: Vec::with_capacity(rows),
139 col_indexes,
140 }
141 }
142
143 pub(crate) fn add_labels_and_samples(
145 &mut self,
146 labels: &[PromLabel],
147 samples: &[Sample],
148 prom_validation_mode: PromValidationMode,
149 ) -> Result<(), DecodeError> {
150 let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
151
152 for PromLabel { name, value } in labels {
153 let tag_name = prom_validation_mode.decode_string(name)?;
154 let tag_value = prom_validation_mode.decode_string(value)?;
155 let tag_value = Some(ValueData::StringValue(tag_value));
156 let tag_num = self.col_indexes.len();
157
158 match self.col_indexes.entry(tag_name) {
159 Entry::Occupied(e) => {
160 row[*e.get()].value_data = tag_value;
161 }
162 Entry::Vacant(e) => {
163 let column_name = e.key().clone();
164 e.insert(tag_num);
165 self.schema.push(ColumnSchema {
166 column_name,
167 datatype: ColumnDataType::String as i32,
168 semantic_type: SemanticType::Tag as i32,
169 datatype_extension: None,
170 options: None,
171 });
172 row.push(Value {
173 value_data: tag_value,
174 });
175 }
176 }
177 }
178
179 if samples.len() == 1 {
180 let sample = &samples[0];
181 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
182 row[1].value_data = Some(ValueData::F64Value(sample.value));
183 self.rows.push(Row { values: row });
184 return Ok(());
185 }
186 for sample in samples {
187 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
188 row[1].value_data = Some(ValueData::F64Value(sample.value));
189 self.rows.push(Row {
190 values: row.clone(),
191 });
192 }
193
194 Ok(())
195 }
196
197 pub(crate) fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
199 let mut rows = std::mem::take(&mut self.rows);
200 let schema = std::mem::take(&mut self.schema);
201 let col_num = schema.len();
202 for row in &mut rows {
203 if row.values.len() < col_num {
204 row.values.resize(col_num, Value { value_data: None });
205 }
206 }
207
208 RowInsertRequest {
209 table_name,
210 rows: Some(Rows { schema, rows }),
211 }
212 }
213}
214
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from static UTF-8 name and value.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Wraps a string into a string-typed cell.
    fn string_cell(s: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(s.to_string())),
        }
    }

    /// A cell without a value.
    fn null_cell() -> Value {
        Value { value_data: None }
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // Setup calls must succeed; failing here gives a clearer message than
        // a row-count mismatch further down.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .expect("valid labels must be accepted");

        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .expect("valid labels must be accepted");

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // The first row predates `tag2`, so it is padded with a null cell.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(0))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.0))
                },
                string_cell("v0"),
                string_cell("v1"),
                null_cell(),
            ],
            rows[0].values
        );

        // The second row has no `tag1`, which stays null.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(1))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.1))
                },
                string_cell("v0"),
                null_cell(),
                string_cell("v2"),
            ],
            rows[1].values
        );

        // A label value that is not valid UTF-8 is rejected in strict mode.
        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: Bytes::from_static(&[0xFF, 0xFF, 0xFF]),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}