servers/prom_remote_write/
row_builder.rs1use std::collections::HashMap;
16
17use api::prom_store::remote::Sample;
18use api::v1::helper::{field_column_schema, time_index_column_schema};
19use api::v1::value::ValueData;
20use api::v1::{ColumnDataType, ColumnSchema, Row, RowInsertRequest, Rows, SemanticType, Value};
21use common_query::prelude::{greptime_timestamp, greptime_value};
22use pipeline::{ContextOpt, ContextReq};
23use prost::DecodeError;
24
25use crate::prom_remote_write::PromValidationMode;
26use crate::prom_remote_write::types::PromLabel;
27use crate::prom_remote_write::validation::validate_label_name;
28use crate::repeated_field::Clear;
29
/// Routing context attached to a group of Prometheus remote-write tables.
///
/// When converted into insert requests, `schema` and `physical_table` (if set)
/// are forwarded to the request's `ContextOpt`. Field order matters: the derived
/// `Ord`/`PartialOrd` compare `schema` first, then `physical_table`.
#[derive(Clone, Debug, Default, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct PromCtx {
    /// Target schema (database); `None` leaves the default untouched.
    pub schema: Option<String>,
    /// Target physical table; `None` leaves the default untouched.
    pub physical_table: Option<String>,
}
35
36#[derive(Default, Debug)]
37pub struct TablesBuilder<'a> {
38 pub tables: HashMap<PromCtx, HashMap<String, TableBuilder<'a>>>,
39 pub(crate) raw_data: Vec<u8>,
40}
41
42impl<'a> Clear for TablesBuilder<'a> {
43 fn clear(&mut self) {
44 self.tables.clear();
45 self.raw_data.clear();
46 }
47}
48
49impl<'a> TablesBuilder<'a> {
50 pub(crate) fn get_or_create_table_builder(
51 &mut self,
52 prom_ctx: PromCtx,
53 table_name: String,
54 label_num: usize,
55 row_num: usize,
56 ) -> &mut TableBuilder<'a> {
57 self.tables
58 .entry(prom_ctx)
59 .or_default()
60 .entry(table_name)
61 .or_insert_with(|| TableBuilder::with_capacity(label_num + 2, row_num))
62 }
63
64 pub(crate) fn as_insert_requests(&mut self) -> ContextReq {
65 self.tables
66 .drain()
67 .map(|(prom, mut tables)| {
68 let mut opt = ContextOpt::default();
69 if let Some(physical_table) = prom.physical_table {
70 opt.set_physical_table(physical_table);
71 }
72 if let Some(schema) = prom.schema {
73 opt.set_schema(schema);
74 }
75
76 let mut ctx_req = ContextReq::default();
77 let reqs = tables
78 .drain()
79 .map(|(table_name, mut table)| table.as_row_insert_request(table_name));
80 ctx_req.add_rows(opt, reqs);
81
82 ctx_req
83 })
84 .fold(ContextReq::default(), |mut req, reqs| {
85 req.merge(reqs);
86 req
87 })
88 }
89
90 pub(crate) fn set_raw_data(&mut self, buf: Vec<u8>) {
91 self.raw_data = buf;
92 }
93}
94
95#[derive(Debug)]
96pub struct TableBuilder<'a> {
97 schema: Vec<ColumnSchema>,
98 rows: Vec<Row>,
99 col_indexes: HashMap<&'a [u8], usize>,
100}
101
102impl<'a> Default for TableBuilder<'a> {
103 fn default() -> Self {
104 Self::with_capacity(2, 0)
105 }
106}
107
108impl<'a> TableBuilder<'a> {
109 pub(crate) fn with_capacity(cols: usize, rows: usize) -> Self {
110 let mut col_indexes = HashMap::with_capacity_and_hasher(cols, Default::default());
111 col_indexes.insert(greptime_timestamp().as_bytes(), 0);
112 col_indexes.insert(greptime_value().as_bytes(), 1);
113
114 let mut schema = Vec::with_capacity(cols);
115 schema.push(time_index_column_schema(
116 greptime_timestamp(),
117 ColumnDataType::TimestampMillisecond,
118 ));
119 schema.push(field_column_schema(
120 greptime_value(),
121 ColumnDataType::Float64,
122 ));
123
124 Self {
125 schema,
126 rows: Vec::with_capacity(rows),
127 col_indexes,
128 }
129 }
130
131 pub(crate) fn add_labels_and_samples(
132 &mut self,
133 labels: &[PromLabel],
134 samples: &[Sample],
135 prom_validation_mode: PromValidationMode,
136 ) -> Result<(), DecodeError> {
137 let mut row = vec![Value { value_data: None }; self.col_indexes.len()];
138
139 for PromLabel { name, value } in labels {
140 if !validate_label_name(name) {
141 return Err(DecodeError::new(format!(
142 "Invalid label name: `{}`",
143 String::from_utf8_lossy(name)
144 )));
145 }
146 let raw_tag_name = *name;
147 let tag_value = Some(ValueData::StringValue(
148 prom_validation_mode.decode_string(value)?,
149 ));
150 let tag_num = self.col_indexes.len();
151
152 if let Some(e) = self.col_indexes.get_mut(raw_tag_name) {
153 row[*e].value_data = tag_value;
154 continue;
155 }
156
157 let tag_name = unsafe { std::str::from_utf8_unchecked(raw_tag_name) };
158 self.schema.push(ColumnSchema {
159 column_name: tag_name.to_owned(),
160 datatype: ColumnDataType::String as i32,
161 semantic_type: SemanticType::Tag as i32,
162 ..Default::default()
163 });
164 self.col_indexes.insert(raw_tag_name, tag_num);
165
166 row.push(Value {
167 value_data: tag_value,
168 });
169 }
170
171 if samples.len() == 1 {
172 let sample = &samples[0];
173 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
174 row[1].value_data = Some(ValueData::F64Value(sample.value));
175 self.rows.push(Row { values: row });
176 return Ok(());
177 }
178 for sample in samples {
179 row[0].value_data = Some(ValueData::TimestampMillisecondValue(sample.timestamp));
180 row[1].value_data = Some(ValueData::F64Value(sample.value));
181 self.rows.push(Row {
182 values: row.clone(),
183 });
184 }
185
186 Ok(())
187 }
188
189 pub fn as_row_insert_request(&mut self, table_name: String) -> RowInsertRequest {
190 let mut rows = std::mem::take(&mut self.rows);
191 let schema = std::mem::take(&mut self.schema);
192 let col_num = schema.len();
193 for row in &mut rows {
194 if row.values.len() < col_num {
195 row.values.resize(col_num, Value { value_data: None });
196 }
197 }
198
199 RowInsertRequest {
200 table_name,
201 rows: Some(Rows { schema, rows }),
202 }
203 }
204
205 pub fn tags(&self) -> impl Iterator<Item = &String> {
206 self.schema
207 .iter()
208 .filter(|v| v.semantic_type == SemanticType::Tag as i32)
209 .map(|c| &c.column_name)
210 }
211}
212
#[cfg(test)]
mod tests {
    use api::prom_store::remote::Sample;
    use prost::DecodeError;

    use super::*;

    /// Two series with overlapping labels produce one row each, the schema grows
    /// to the union of labels, and rows are padded with nulls for columns they
    /// lack; an invalid UTF-8 label value is rejected in strict mode.
    #[test]
    fn test_table_builder() {
        let mut builder = TableBuilder::default();
        builder
            .add_labels_and_samples(
                &[
                    PromLabel {
                        name: b"tag0",
                        value: b"v0",
                    },
                    PromLabel {
                        name: b"tag1",
                        value: b"v1",
                    },
                ],
                &[Sample {
                    value: 0.0,
                    timestamp: 0,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        builder
            .add_labels_and_samples(
                &[
                    PromLabel {
                        name: b"tag0",
                        value: b"v0",
                    },
                    PromLabel {
                        name: b"tag2",
                        value: b"v2",
                    },
                ],
                &[Sample {
                    value: 0.1,
                    timestamp: 1,
                }],
                PromValidationMode::Strict,
            )
            .unwrap();

        let request = builder.as_row_insert_request("test".to_string());
        assert_eq!("test", request.table_name);
        let rows = request.rows.unwrap();

        let column_names: Vec<&str> = rows
            .schema
            .iter()
            .map(|c| c.column_name.as_str())
            .collect();
        assert_eq!(
            vec![greptime_timestamp(), greptime_value(), "tag0", "tag1", "tag2"],
            column_names
        );

        assert_eq!(2, rows.rows.len());
        // Every row is padded to the full schema width.
        assert!(rows.rows.iter().all(|row| row.values.len() == 5));
        // `tag2` was unknown when the first row was built, so it is null there.
        assert_eq!(None, rows.rows[0].values[4].value_data);
        assert_eq!(
            Some(ValueData::StringValue("v2".to_string())),
            rows.rows[1].values[4].value_data
        );
        // The second series has no `tag1`.
        assert_eq!(None, rows.rows[1].values[3].value_data);

        let invalid_utf8_bytes = &[0xFF, 0xFF, 0xFF];
        let res = builder.add_labels_and_samples(
            &[PromLabel {
                name: b"tag0",
                value: invalid_utf8_bytes,
            }],
            &[Sample {
                value: 0.1,
                timestamp: 1,
            }],
            PromValidationMode::Strict,
        );
        assert_eq!(res, Err(DecodeError::new("invalid utf-8")));
    }
}