pipeline/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use common_time::timestamp::TimeUnit;
20use common_time::Timestamp;
21use datatypes::timestamp::TimestampNanosecond;
22use itertools::Itertools;
23use session::context::Channel;
24use snafu::ensure;
25use util::to_pipeline_version;
26
27use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
28use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
29use crate::table::PipelineTable;
30use crate::{GreptimePipelineParams, Pipeline, Value};
31
32mod pipeline_cache;
33pub mod pipeline_operator;
34pub mod table;
35pub mod util;
36
/// Pipeline version. An optional timestamp with nanosecond precision.
///
/// If the version is `None`, it means the latest version of the pipeline.
/// Users can specify the version by providing a timestamp string formatted as ISO 8601.
/// When it is used in a cache key, it is converted to an `i64` holding the number of
/// nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// Pipeline info. A tuple of a timestamp and a pipeline reference.
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared, reference-counted handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared, reference-counted handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
49
/// SelectInfo is used to store the selected keys from OpenTelemetry record attributes.
/// Each key is used to uplift a value from the attributes and serves as a column name
/// in the table.
#[derive(Debug, Clone, Default)]
pub struct SelectInfo {
    /// The attribute keys to select.
    pub keys: Vec<String>,
}
56
57/// Try to convert a string to SelectInfo
58/// The string should be a comma-separated list of keys
59/// example: "key1,key2,key3"
60/// The keys will be sorted and deduplicated
61impl From<String> for SelectInfo {
62    fn from(value: String) -> Self {
63        let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
64        keys.dedup();
65
66        SelectInfo { keys }
67    }
68}
69
70impl SelectInfo {
71    pub fn is_empty(&self) -> bool {
72        self.keys.is_empty()
73    }
74}
75
/// Pipeline name that selects [`PipelineDefinition::GreptimeIdentityPipeline`]
/// (see [`PipelineDefinition::from_name`]).
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Pipeline name that selects [`PipelineWay::OtlpTraceDirectV0`]
/// (see [`PipelineWay::from_name_and_default`]).
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Pipeline name that selects [`PipelineWay::OtlpTraceDirectV1`]
/// (see [`PipelineWay::from_name_and_default`]).
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
79
/// Enum for holding information about a pipeline: either the pipeline itself,
/// or information that can be used to retrieve a pipeline from `PipelineHandler`.
#[derive(Debug, Clone)]
pub enum PipelineDefinition {
    /// The pipeline itself.
    Resolved(Arc<Pipeline>),
    /// A pipeline referenced by `(name, version)`. Despite the variant name, the second
    /// element is a [`PipelineVersion`], where `None` means the latest version.
    ByNameAndValue((String, PipelineVersion)),
    /// The built-in identity pipeline, with an optional custom time index.
    GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
}
88
89impl PipelineDefinition {
90    pub fn from_name(
91        name: &str,
92        version: PipelineVersion,
93        custom_time_index: Option<(String, bool)>,
94    ) -> Result<Self> {
95        if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
96            Ok(Self::GreptimeIdentityPipeline(
97                custom_time_index
98                    .map(|(config, ignore_errors)| {
99                        IdentityTimeIndex::from_config(config, ignore_errors)
100                    })
101                    .transpose()?,
102            ))
103        } else {
104            Ok(Self::ByNameAndValue((name.to_owned(), version)))
105        }
106    }
107
108    pub fn is_identity(&self) -> bool {
109        matches!(self, Self::GreptimeIdentityPipeline(_))
110    }
111
112    pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
113        if let Self::GreptimeIdentityPipeline(custom_ts) = self {
114            custom_ts.as_ref()
115        } else {
116            None
117        }
118    }
119}
120
121pub struct PipelineContext<'a> {
122    pub pipeline_definition: &'a PipelineDefinition,
123    pub pipeline_param: &'a GreptimePipelineParams,
124    pub channel: Channel,
125}
126
127impl<'a> PipelineContext<'a> {
128    pub fn new(
129        pipeline_definition: &'a PipelineDefinition,
130        pipeline_param: &'a GreptimePipelineParams,
131        channel: Channel,
132    ) -> Self {
133        Self {
134            pipeline_definition,
135            pipeline_param,
136            channel,
137        }
138    }
139}
140pub enum PipelineWay {
141    OtlpLogDirect(Box<SelectInfo>),
142    Pipeline(PipelineDefinition),
143    OtlpTraceDirectV0,
144    OtlpTraceDirectV1,
145}
146
147impl PipelineWay {
148    pub fn from_name_and_default(
149        name: Option<&str>,
150        version: Option<&str>,
151        default_pipeline: Option<PipelineWay>,
152    ) -> Result<PipelineWay> {
153        if let Some(pipeline_name) = name {
154            if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
155                Ok(PipelineWay::OtlpTraceDirectV1)
156            } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
157                Ok(PipelineWay::OtlpTraceDirectV0)
158            } else {
159                Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
160                    pipeline_name,
161                    to_pipeline_version(version)?,
162                    None,
163                )?))
164            }
165        } else if let Some(default_pipeline) = default_pipeline {
166            Ok(default_pipeline)
167        } else {
168            PipelineMissingSnafu.fail()
169        }
170    }
171}
172
/// `<type>` value in a custom time index config that selects [`IdentityTimeIndex::Epoch`].
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` value in a custom time index config that selects [`IdentityTimeIndex::DateStr`].
const IDENTITY_TS_DATESTR: &str = "datestr";
175
/// Custom time index for the identity pipeline, parsed from a `<field>;<type>;<config>`
/// string by [`IdentityTimeIndex::from_config`].
#[derive(Debug, Clone)]
pub enum IdentityTimeIndex {
    /// `(field, unit, ignore_errors)`: the field holds an epoch value in `unit`.
    Epoch(String, TimeUnit, bool),
    /// `(field, format, ignore_errors)`: the field holds a date string parsed with the
    /// chrono `format`; the resulting time index is a nanosecond timestamp.
    DateStr(String, String, bool),
}
181
182impl IdentityTimeIndex {
183    pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
184        let parts = config.split(';').collect::<Vec<&str>>();
185        ensure!(
186            parts.len() == 3,
187            InvalidCustomTimeIndexSnafu {
188                config,
189                reason: "config format: '<field>;<type>;<config>'",
190            }
191        );
192
193        let field = parts[0].to_string();
194        match parts[1] {
195            IDENTITY_TS_EPOCH => match parts[2] {
196                NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
197                    field,
198                    TimeUnit::Nanosecond,
199                    ignore_errors,
200                )),
201                US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
202                    field,
203                    TimeUnit::Microsecond,
204                    ignore_errors,
205                )),
206                MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
207                    field,
208                    TimeUnit::Millisecond,
209                    ignore_errors,
210                )),
211                S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
212                    field,
213                    TimeUnit::Second,
214                    ignore_errors,
215                )),
216                _ => InvalidCustomTimeIndexSnafu {
217                    config,
218                    reason: "epoch type must be one of ns, us, ms, s",
219                }
220                .fail(),
221            },
222            IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
223                field,
224                parts[2].to_string(),
225                ignore_errors,
226            )),
227            _ => InvalidCustomTimeIndexSnafu {
228                config,
229                reason: "identity time index type must be one of epoch, datestr",
230            }
231            .fail(),
232        }
233    }
234
235    pub fn get_column_name(&self) -> &String {
236        match self {
237            IdentityTimeIndex::Epoch(field, _, _) => field,
238            IdentityTimeIndex::DateStr(field, _, _) => field,
239        }
240    }
241
242    pub fn get_ignore_errors(&self) -> bool {
243        match self {
244            IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
245            IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
246        }
247    }
248
249    pub fn get_datatype(&self) -> ColumnDataType {
250        match self {
251            IdentityTimeIndex::Epoch(_, unit, _) => match unit {
252                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
253                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
254                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
255                TimeUnit::Second => ColumnDataType::TimestampSecond,
256            },
257            IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
258        }
259    }
260
261    pub fn get_timestamp(&self, value: Option<&Value>) -> Result<ValueData> {
262        match self {
263            IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
264                let v = match value {
265                    Some(Value::Int32(v)) => *v as i64,
266                    Some(Value::Int64(v)) => *v,
267                    Some(Value::Uint32(v)) => *v as i64,
268                    Some(Value::Uint64(v)) => *v as i64,
269                    Some(Value::String(s)) => match s.parse::<i64>() {
270                        Ok(v) => v,
271                        Err(_) => {
272                            return if_ignore_errors(
273                                *ignore_errors,
274                                *unit,
275                                format!("failed to convert {} to number", s),
276                            )
277                        }
278                    },
279                    Some(Value::Timestamp(timestamp)) => timestamp.to_unit(unit),
280                    Some(v) => {
281                        return if_ignore_errors(
282                            *ignore_errors,
283                            *unit,
284                            format!("unsupported value type to convert to timestamp: {}", v),
285                        )
286                    }
287                    None => {
288                        return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
289                    }
290                };
291                Ok(time_unit_to_value_data(*unit, v))
292            }
293            IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
294                let v = match value {
295                    Some(Value::String(s)) => s,
296                    Some(v) => {
297                        return if_ignore_errors(
298                            *ignore_errors,
299                            TimeUnit::Nanosecond,
300                            format!("unsupported value type to convert to date string: {}", v),
301                        );
302                    }
303                    None => {
304                        return if_ignore_errors(
305                            *ignore_errors,
306                            TimeUnit::Nanosecond,
307                            "missing field".to_string(),
308                        )
309                    }
310                };
311
312                let timestamp = match chrono::DateTime::parse_from_str(v, format) {
313                    Ok(ts) => ts,
314                    Err(_) => {
315                        return if_ignore_errors(
316                            *ignore_errors,
317                            TimeUnit::Nanosecond,
318                            format!("failed to parse date string: {}, format: {}", v, format),
319                        )
320                    }
321                };
322
323                Ok(ValueData::TimestampNanosecondValue(
324                    timestamp.timestamp_nanos_opt().unwrap_or_default(),
325                ))
326            }
327        }
328    }
329}
330
331fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
332    if ignore_errors {
333        Ok(time_unit_to_value_data(
334            unit,
335            Timestamp::current_time(unit).value(),
336        ))
337    } else {
338        CastTypeSnafu { msg }.fail()
339    }
340}
341
342fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
343    match unit {
344        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
345        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
346        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
347        TimeUnit::Second => ValueData::TimestampSecondValue(v),
348    }
349}