1use std::collections::hash_map::Entry;
16use std::collections::HashMap;
17use std::string::ToString;
18
19use api::prom_store::remote::Sample;
20use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
23use common_query::prelude::{GREPTIME_TIMESTAMP, GREPTIME_VALUE};
24use pipeline::{ContextOpt, ContextReq};
25use prost::DecodeError;
26
27use crate::http::PromValidationMode;
28use crate::proto::PromLabel;
29use crate::repeated_field::Clear;
30
/// Per-request routing context under which table builders are grouped.
///
/// It is used as the outer key of [`TablesBuilder::tables`]. When the
/// builders are drained, each present field is forwarded to the matching
/// setter on `ContextOpt`.
///
/// Field order is significant: the derived `Ord`/`PartialOrd` compare
/// `schema` first and `physical_table` second.
#[derive(Clone, Debug, Default, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct PromCtx {
    /// Schema to apply to the insert requests, if any.
    pub schema: Option<String>,
    /// Physical table to apply to the insert requests, if any.
    pub physical_table: Option<String>,
}
37
38#[derive(Default, Debug)]
40pub struct TablesBuilder {
41 pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
43}
44
45impl Clear for TablesBuilder {
46 fn clear(&mut self) {
47 self.tables.clear();
48 }
49}
50
51impl TablesBuilder {
52 pub(crate) fn get_or_create_table_builder(
54 &mut self,
55 prom_ctx: PromCtx,
56 table_name: String,
57 label_num: usize,
58 row_num: usize,
59 ) -> &mut TableBuilder {
60 self.tables
61 .entry(prom_ctx)
62 .or_default()
63 .entry(table_name)
64 .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
65 }
66
67 pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
69 self.tables
70 .drain()
71 .map(|(prom, mut tables)| {
72 let mut opt = ContextOpt::default();
74 if let Some(physical_table) = prom.physical_table {
75 opt.set_physical_table(physical_table);
76 }
77 if let Some(schema) = prom.schema {
78 opt.set_schema(schema);
79 }
80
81 let mut ctx_req = ContextReq::default();
83 let reqs = tables
84 .drain()
85 .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
86 ctx_req.add_rows(opt, reqs);
87
88 ctx_req
89 })
90 .fold(ContextReq::default(), |mut req, reqs| {
91 req.merge(reqs);
92 req
93 })
94 }
95}
96
97#[derive(Debug)]
99pub struct TableBuilder {
100 schema: Vec<ColumnSchema>,
102 rows: Vec<Row>,
104 col_indexes: HashMap<String, usize>,
106}
107
108impl Default for TableBuilder {
109 fn default() -> Self {
110 Self::with_capacity(2, 0)
111 }
112}
113
114impl TableBuilder {
115 pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
116 let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
117 col_indexes.insert(GREPTIME_TIMESTAMP.to_string(), 0);
118 col_indexes.insert(GREPTIME_VALUE.to_string(), 1);
119
120 let mut schema = Vec::with_capacity(cols);
121 schema.push(time_index_column_schema(
122 GREPTIME_TIMESTAMP,
123 ColumnDataType::TimestampMillisecond,
124 ));
125 schema.push(field_column_schema(GREPTIME_VALUE, ColumnDataType::Float64));
126
127 Self {
128 schema,
129 rows: Vec::with_capacity(rows),
130 col_indexes,
131 }
132 }
133
134 pub(crate) fn add_labels_and_samples(
136 &mut self,
137 labels: &[PromLabel],
138 samples: &[Sample],
139 prom_validation_mode: PromValidationMode,
140 ) -> Result<(), DecodeError> {
141 let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
142
143 for PromLabel { name, value } in labels {
144 let tag_name = prom_validation_mode.decode_string(name)?;
145 let tag_value = prom_validation_mode.decode_string(value)?;
146 let tag_value = Some(ValueData::StringValue(tag_value));
147 let tag_num = self.col_indexes.len();
148
149 match self.col_indexes.entry(tag_name) {
150 Entry::Occupied(e) => {
151 row[*e.get()].value_data = tag_value;
152 }
153 Entry::Vacant(e) => {
154 self.schema
155 .push(tag_column_schema(e.key(), ColumnDataType::String));
156 e.insert(tag_num);
157 row.push(Value {
158 value_data: tag_value,
159 });
160 }
161 }
162 }
163
164 if samples.len() == 1 {
165 let sample = &samples[0];
166 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
167 row[1].value_data = Some(ValueData::F64Value(sample.value));
168 self.rows.push(Row { values: row });
169 return Ok(());
170 }
171 for sample in samples {
172 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
173 row[1].value_data = Some(ValueData::F64Value(sample.value));
174 self.rows.push(Row {
175 values: row.clone(),
176 });
177 }
178
179 Ok(())
180 }
181
182 pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
184 let mut rows = std::mem::take(&mut self.rows);
185 let schema = std::mem::take(&mut self.schema);
186 let col_num = schema.len();
187 for row in &mut rows {
188 if row.values.len() < col_num {
189 row.values.resize(col_num, Value { value_data: None });
190 }
191 }
192
193 RowInsertRequest {
194 table_name,
195 rows: Some(Rows { schema, rows }),
196 }
197 }
198
199 pub fn tags(&self) -> impl Iterator<Item = &String> {
200 self.schema
201 .iter()
202 .filter(|v| v.semantic_type == SemanticType::Tag as i32)
203 .map(|c| &c.column_name)
204 }
205}
206
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::value::ValueData;
    use api::v1::Value;
    use arrow::datatypes::ToByteSlice;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from static UTF-8 strings.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Wraps optional value data into a `Value` cell.
    fn cell(value_data: Option<ValueData>) -> Value {
        Value { value_data }
    }

    /// Builds a string cell.
    fn string_cell(s: &str) -> Value {
        cell(Some(ValueData::StringValue(s.to_string())))
    }

    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // The success path must be asserted; ignoring the result would let a
        // decoding regression pass unnoticed until the row comparison below.
        let res = builder.add_labels_and_samples(
            &[label("tag0", "v0"), label("tag1", "v1")],
            &[Sample {
                value: 0.0,
                timestamp: 0,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(Ok(()), res);

        let res = builder.add_labels_and_samples(
            &[label("tag0", "v0"), label("tag2", "v2")],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(Ok(()), res);

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // The first row predates `tag2`, so its last column is padded.
        assert_eq!(
            vec![
                cell(Some(ValueData::TimestampMillisecondValue(0))),
                cell(Some(ValueData::F64Value(0.0))),
                string_cell("v0"),
                string_cell("v1"),
                cell(None),
            ],
            rows[0].values
        );

        // The second row has no `tag1`, so that column stays empty.
        assert_eq!(
            vec![
                cell(Some(ValueData::TimestampMillisecondValue(1))),
                cell(Some(ValueData::F64Value(0.1))),
                string_cell("v0"),
                cell(None),
                string_cell("v2"),
            ],
            rows[1].values
        );

        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];

        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: invalid_utf8_bytes.to_byte_slice().into(),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}