pipeline/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use common_time::timestamp::TimeUnit;
20use common_time::Timestamp;
21use datatypes::timestamp::TimestampNanosecond;
22use itertools::Itertools;
23use snafu::ensure;
24use util::to_pipeline_version;
25
26use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
27use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
28use crate::table::PipelineTable;
29use crate::{GreptimePipelineParams, Pipeline, Value};
30
31pub mod pipeline_operator;
32pub mod table;
33pub mod util;
34
/// Pipeline version. An optional timestamp with nanosecond precision.
///
/// If the version is `None`, it means the latest version of the pipeline.
/// Users can specify the version by providing a timestamp string formatted as ISO 8601.
/// When it is used in a cache key, it is converted to an `i64` holding the number of
/// nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// Pipeline info. A tuple of timestamp and pipeline reference.
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared, reference-counted handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared, reference-counted handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
47
/// `SelectInfo` stores the keys selected from OpenTelemetry record attributes.
///
/// Each key is used to lift the matching value out of the attributes and also
/// serves as the column name in the table.
#[derive(Default)]
pub struct SelectInfo {
    /// The selected attribute keys.
    pub keys: Vec<String>,
}
54
55/// Try to convert a string to SelectInfo
56/// The string should be a comma-separated list of keys
57/// example: "key1,key2,key3"
58/// The keys will be sorted and deduplicated
59impl From<String> for SelectInfo {
60    fn from(value: String) -> Self {
61        let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
62        keys.dedup();
63
64        SelectInfo { keys }
65    }
66}
67
68impl SelectInfo {
69    pub fn is_empty(&self) -> bool {
70        self.keys.is_empty()
71    }
72}
73
/// Name of the built-in identity pipeline; `PipelineDefinition::from_name` maps this
/// name to `PipelineDefinition::GreptimeIdentityPipeline`.
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Name that `PipelineWay::from_name_and_default` maps to `PipelineWay::OtlpTraceDirectV0`.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Name that `PipelineWay::from_name_and_default` maps to `PipelineWay::OtlpTraceDirectV1`.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
77
/// Enum holding the information of a pipeline, which is either the pipeline itself
/// or the information that can be used to retrieve a pipeline from `PipelineHandler`.
#[derive(Debug, Clone)]
pub enum PipelineDefinition {
    /// The pipeline itself.
    Resolved(Arc<Pipeline>),
    /// A pipeline referenced by its name and (optional) version.
    ByNameAndValue((String, PipelineVersion)),
    /// The built-in identity pipeline, optionally carrying a custom time index.
    GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
}
86
87impl PipelineDefinition {
88    pub fn from_name(
89        name: &str,
90        version: PipelineVersion,
91        custom_time_index: Option<(String, bool)>,
92    ) -> Result<Self> {
93        if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
94            Ok(Self::GreptimeIdentityPipeline(
95                custom_time_index
96                    .map(|(config, ignore_errors)| {
97                        IdentityTimeIndex::from_config(config, ignore_errors)
98                    })
99                    .transpose()?,
100            ))
101        } else {
102            Ok(Self::ByNameAndValue((name.to_owned(), version)))
103        }
104    }
105
106    pub fn is_identity(&self) -> bool {
107        matches!(self, Self::GreptimeIdentityPipeline(_))
108    }
109
110    pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
111        if let Self::GreptimeIdentityPipeline(custom_ts) = self {
112            custom_ts.as_ref()
113        } else {
114            None
115        }
116    }
117}
118
/// Borrowed pair of a pipeline definition and its pipeline parameters.
pub struct PipelineContext<'a> {
    /// The pipeline definition this context refers to.
    pub pipeline_definition: &'a PipelineDefinition,
    /// The pipeline parameters this context refers to.
    pub pipeline_param: &'a GreptimePipelineParams,
}
123
124impl<'a> PipelineContext<'a> {
125    pub fn new(
126        pipeline_definition: &'a PipelineDefinition,
127        pipeline_param: &'a GreptimePipelineParams,
128    ) -> Self {
129        Self {
130            pipeline_definition,
131            pipeline_param,
132        }
133    }
134}
/// The way incoming data is handled: through a pipeline definition or through one of
/// the direct OTLP variants.
pub enum PipelineWay {
    /// Carries the [`SelectInfo`] to apply.
    /// NOTE(review): presumably direct OTLP log ingestion without a pipeline — confirm.
    OtlpLogDirect(Box<SelectInfo>),
    /// Use the given pipeline definition.
    Pipeline(PipelineDefinition),
    /// Selected by the pipeline name `greptime_trace_v0`.
    OtlpTraceDirectV0,
    /// Selected by the pipeline name `greptime_trace_v1`.
    OtlpTraceDirectV1,
}
141
142impl PipelineWay {
143    pub fn from_name_and_default(
144        name: Option<&str>,
145        version: Option<&str>,
146        default_pipeline: Option<PipelineWay>,
147    ) -> Result<PipelineWay> {
148        if let Some(pipeline_name) = name {
149            if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
150                Ok(PipelineWay::OtlpTraceDirectV1)
151            } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
152                Ok(PipelineWay::OtlpTraceDirectV0)
153            } else {
154                Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
155                    pipeline_name,
156                    to_pipeline_version(version)?,
157                    None,
158                )?))
159            }
160        } else if let Some(default_pipeline) = default_pipeline {
161            Ok(default_pipeline)
162        } else {
163            PipelineMissingSnafu.fail()
164        }
165    }
166}
167
/// `<type>` tag in a custom time index config that selects `IdentityTimeIndex::Epoch`.
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` tag in a custom time index config that selects `IdentityTimeIndex::DateStr`.
const IDENTITY_TS_DATESTR: &str = "datestr";
170
/// Custom time index of the identity pipeline, parsed from a
/// `<field>;<type>;<config>` string.
#[derive(Debug, Clone)]
pub enum IdentityTimeIndex {
    /// Epoch-number time index: `(field name, time unit, ignore_errors)`.
    Epoch(String, TimeUnit, bool),
    /// Date-string time index: `(field name, format string, ignore_errors)`.
    DateStr(String, String, bool),
}
176
177impl IdentityTimeIndex {
178    pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
179        let parts = config.split(';').collect::<Vec<&str>>();
180        ensure!(
181            parts.len() == 3,
182            InvalidCustomTimeIndexSnafu {
183                config,
184                reason: "config format: '<field>;<type>;<config>'",
185            }
186        );
187
188        let field = parts[0].to_string();
189        match parts[1] {
190            IDENTITY_TS_EPOCH => match parts[2] {
191                NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
192                    field,
193                    TimeUnit::Nanosecond,
194                    ignore_errors,
195                )),
196                US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
197                    field,
198                    TimeUnit::Microsecond,
199                    ignore_errors,
200                )),
201                MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
202                    field,
203                    TimeUnit::Millisecond,
204                    ignore_errors,
205                )),
206                S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
207                    field,
208                    TimeUnit::Second,
209                    ignore_errors,
210                )),
211                _ => InvalidCustomTimeIndexSnafu {
212                    config,
213                    reason: "epoch type must be one of ns, us, ms, s",
214                }
215                .fail(),
216            },
217            IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
218                field,
219                parts[2].to_string(),
220                ignore_errors,
221            )),
222            _ => InvalidCustomTimeIndexSnafu {
223                config,
224                reason: "identity time index type must be one of epoch, datestr",
225            }
226            .fail(),
227        }
228    }
229
230    pub fn get_column_name(&self) -> &String {
231        match self {
232            IdentityTimeIndex::Epoch(field, _, _) => field,
233            IdentityTimeIndex::DateStr(field, _, _) => field,
234        }
235    }
236
237    pub fn get_ignore_errors(&self) -> bool {
238        match self {
239            IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
240            IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
241        }
242    }
243
244    pub fn get_datatype(&self) -> ColumnDataType {
245        match self {
246            IdentityTimeIndex::Epoch(_, unit, _) => match unit {
247                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
248                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
249                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
250                TimeUnit::Second => ColumnDataType::TimestampSecond,
251            },
252            IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
253        }
254    }
255
256    pub fn get_timestamp(&self, value: Option<&Value>) -> Result<ValueData> {
257        match self {
258            IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
259                let v = match value {
260                    Some(Value::Int32(v)) => *v as i64,
261                    Some(Value::Int64(v)) => *v,
262                    Some(Value::Uint32(v)) => *v as i64,
263                    Some(Value::Uint64(v)) => *v as i64,
264                    Some(Value::String(s)) => match s.parse::<i64>() {
265                        Ok(v) => v,
266                        Err(_) => {
267                            return if_ignore_errors(
268                                *ignore_errors,
269                                *unit,
270                                format!("failed to convert {} to number", s),
271                            )
272                        }
273                    },
274                    Some(Value::Timestamp(timestamp)) => timestamp.to_unit(unit),
275                    Some(v) => {
276                        return if_ignore_errors(
277                            *ignore_errors,
278                            *unit,
279                            format!("unsupported value type to convert to timestamp: {}", v),
280                        )
281                    }
282                    None => {
283                        return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
284                    }
285                };
286                Ok(time_unit_to_value_data(*unit, v))
287            }
288            IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
289                let v = match value {
290                    Some(Value::String(s)) => s,
291                    Some(v) => {
292                        return if_ignore_errors(
293                            *ignore_errors,
294                            TimeUnit::Nanosecond,
295                            format!("unsupported value type to convert to date string: {}", v),
296                        );
297                    }
298                    None => {
299                        return if_ignore_errors(
300                            *ignore_errors,
301                            TimeUnit::Nanosecond,
302                            "missing field".to_string(),
303                        )
304                    }
305                };
306
307                let timestamp = match chrono::DateTime::parse_from_str(v, format) {
308                    Ok(ts) => ts,
309                    Err(_) => {
310                        return if_ignore_errors(
311                            *ignore_errors,
312                            TimeUnit::Nanosecond,
313                            format!("failed to parse date string: {}, format: {}", v, format),
314                        )
315                    }
316                };
317
318                Ok(ValueData::TimestampNanosecondValue(
319                    timestamp.timestamp_nanos_opt().unwrap_or_default(),
320                ))
321            }
322        }
323    }
324}
325
326fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
327    if ignore_errors {
328        Ok(time_unit_to_value_data(
329            unit,
330            Timestamp::current_time(unit).value(),
331        ))
332    } else {
333        CastTypeSnafu { msg }.fail()
334    }
335}
336
337fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
338    match unit {
339        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
340        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
341        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
342        TimeUnit::Second => ValueData::TimestampSecondValue(v),
343    }
344}