1use std::collections::HashMap;
16use std::collections::hash_map::Entry;
17use std::string::ToString;
18
19use api::prom_store::remote::Sample;
20use api::v1::helper::{field_column_schema, tag_column_schema, time_index_column_schema};
21use api::v1::value::ValueData;
22use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
23use common_query::prelude::{greptime_timestamp, greptime_value};
24use pipeline::{ContextOpt, ContextReq};
25use prost::DecodeError;
26
27use crate::http::PromValidationMode;
28use crate::proto::PromLabel;
29use crate::repeated_field::Clear;
30
/// Context that Prometheus tables are grouped by: an optional schema and an
/// optional physical table.
///
/// It is the outer key of `TablesBuilder::tables`, which is why it derives
/// `Eq` and `Hash`.
#[derive(Debug, Default, Clone, Eq, PartialEq, Ord, PartialOrd, Hash)]
pub struct PromCtx {
    /// When set, passed to `ContextOpt::set_schema` while building insert requests.
    pub schema: Option<String>,
    /// When set, passed to `ContextOpt::set_physical_table` while building insert requests.
    pub physical_table: Option<String>,
}
37
/// Accumulates [`TableBuilder`]s, grouped first by [`PromCtx`] and then by table name.
#[derive(Default, Debug)]
pub struct TablesBuilder {
    /// For every context, the map from table name to the builder collecting that table's rows.
    pub tables: HashMap<PromCtx, HashMap<String, TableBuilder>>,
}
44
impl Clear for TablesBuilder {
    /// Removes every accumulated table builder, leaving `tables` empty.
    fn clear(&mut self) {
        self.tables.clear();
    }
}
50
51impl TablesBuilder {
52 pub(crate) fn get_or_create_table_builder(
54 &mut self,
55 prom_ctx: PromCtx,
56 table_name: String,
57 label_num: usize,
58 row_num: usize,
59 ) -> &mut TableBuilder {
60 self.tables
61 .entry(prom_ctx)
62 .or_default()
63 .entry(table_name)
64 .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
65 }
66
67 pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
69 self.tables
70 .drain()
71 .map(|(prom, mut tables)| {
72 let mut opt = ContextOpt::default();
74 if let Some(physical_table) = prom.physical_table {
75 opt.set_physical_table(physical_table);
76 }
77 if let Some(schema) = prom.schema {
78 opt.set_schema(schema);
79 }
80
81 let mut ctx_req = ContextReq::default();
83 let reqs = tables
84 .drain()
85 .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
86 ctx_req.add_rows(opt, reqs);
87
88 ctx_req
89 })
90 .fold(ContextReq::default(), |mut req, reqs| {
91 req.merge(reqs);
92 req
93 })
94 }
95}
96
/// Builds the schema and rows of a single table from Prometheus labels and samples.
#[derive(Debug)]
pub struct TableBuilder {
    /// Column schemas in column order; a new builder starts with the timestamp
    /// column followed by the value column.
    schema: Vec<ColumnSchema>,
    /// Rows buffered so far. A row can be shorter than `schema` when tag
    /// columns were added after it was pushed; it is padded when the insert
    /// request is built.
    rows: Vec<Row>,
    /// Maps a column name to its position in a row.
    col_indexes: HashMap<String, usize>,
}
107
impl Default for TableBuilder {
    /// Creates a builder with capacity for two columns and no pre-allocated
    /// row storage.
    fn default() -> Self {
        Self::with_capacity(2, 0)
    }
}
113
114impl TableBuilder {
115 pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
116 let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
117 col_indexes.insert(greptime_timestamp().to_string(), 0);
118 col_indexes.insert(greptime_value().to_string(), 1);
119
120 let mut schema = Vec::with_capacity(cols);
121 schema.push(time_index_column_schema(
122 greptime_timestamp(),
123 ColumnDataType::TimestampMillisecond,
124 ));
125 schema.push(field_column_schema(
126 greptime_value(),
127 ColumnDataType::Float64,
128 ));
129
130 Self {
131 schema,
132 rows: Vec::with_capacity(rows),
133 col_indexes,
134 }
135 }
136
137 pub(crate) fn add_labels_and_samples(
139 &mut self,
140 labels: &[PromLabel],
141 samples: &[Sample],
142 prom_validation_mode: PromValidationMode,
143 ) -> Result<(), DecodeError> {
144 let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
145
146 for PromLabel { name, value } in labels {
147 let tag_name = prom_validation_mode.decode_string(name)?;
148 let tag_value = prom_validation_mode.decode_string(value)?;
149 let tag_value = Some(ValueData::StringValue(tag_value));
150 let tag_num = self.col_indexes.len();
151
152 match self.col_indexes.entry(tag_name) {
153 Entry::Occupied(e) => {
154 row[*e.get()].value_data = tag_value;
155 }
156 Entry::Vacant(e) => {
157 self.schema
158 .push(tag_column_schema(e.key(), ColumnDataType::String));
159 e.insert(tag_num);
160 row.push(Value {
161 value_data: tag_value,
162 });
163 }
164 }
165 }
166
167 if samples.len() == 1 {
168 let sample = &samples[0];
169 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
170 row[1].value_data = Some(ValueData::F64Value(sample.value));
171 self.rows.push(Row { values: row });
172 return Ok(());
173 }
174 for sample in samples {
175 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
176 row[1].value_data = Some(ValueData::F64Value(sample.value));
177 self.rows.push(Row {
178 values: row.clone(),
179 });
180 }
181
182 Ok(())
183 }
184
185 pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
187 let mut rows = std::mem::take(&mut self.rows);
188 let schema = std::mem::take(&mut self.schema);
189 let col_num = schema.len();
190 for row in &mut rows {
191 if row.values.len() < col_num {
192 row.values.resize(col_num, Value { value_data: None });
193 }
194 }
195
196 RowInsertRequest {
197 table_name,
198 rows: Some(Rows { schema, rows }),
199 }
200 }
201
202 pub fn tags(&self) -> impl Iterator<Item = &String> {
203 self.schema
204 .iter()
205 .filter(|v| v.semantic_type == SemanticType::Tag as i32)
206 .map(|c| &c.column_name)
207 }
208}
209
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use api::v1::Value;
    use api::v1::value::ValueData;
    use bytes::Bytes;
    use prost::DecodeError;

    use crate::http::PromValidationMode;
    use crate::prom_row_builder::TableBuilder;
    use crate::proto::PromLabel;

    /// Builds a label from UTF-8 literals.
    fn label(name: &'static str, value: &'static str) -> PromLabel {
        PromLabel {
            name: Bytes::from(name),
            value: Bytes::from(value),
        }
    }

    /// Builds the string cell a tag value is expected to be stored as.
    fn tag_value(value: &str) -> Value {
        Value {
            value_data: Some(ValueData::StringValue(value.to_string())),
        }
    }

    /// Covers column discovery across rows, null padding of rows that predate
    /// a column, and rejection of invalid UTF-8 in strict mode.
    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();

        // The first series introduces tag0 and tag1.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag1", "v1")],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .expect("valid UTF-8 labels must be accepted");

        // The second series reuses tag0, omits tag1 and introduces tag2.
        builder
            .add_labels_and_samples(
                &[label("tag0", "v0"), label("tag2", "v2")],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .expect("valid UTF-8 labels must be accepted");

        let request = builder.as_row_insert_request("test".to_string());
        let rows = request.rows.unwrap().rows;
        assert_eq!(2, rows.len());

        // The first row predates tag2, so it is padded with a trailing null.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(0))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.0))
                },
                tag_value("v0"),
                tag_value("v1"),
                Value { value_data: None },
            ],
            rows[0].values
        );

        // The second row has no tag1, so that slot stays null.
        assert_eq!(
            vec![
                Value {
                    value_data: Some(ValueData::TimestampMillisecondValue(1))
                },
                Value {
                    value_data: Some(ValueData::F64Value(0.1))
                },
                tag_value("v0"),
                Value { value_data: None },
                tag_value("v2"),
            ],
            rows[1].values
        );

        // 0xFF can never appear in well-formed UTF-8.
        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: Bytes::from("tag0"),
                value: Bytes::from_static(&[0xFF, 0xFF, 0xFF]),
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}