pipeline/
manager.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use chrono::{DateTime, Utc};
20use common_time::timestamp::TimeUnit;
21use common_time::Timestamp;
22use datatypes::timestamp::TimestampNanosecond;
23use itertools::Itertools;
24use session::context::Channel;
25use snafu::{ensure, OptionExt};
26use util::to_pipeline_version;
27use vrl::value::Value as VrlValue;
28
29use crate::error::{
30    CastTypeSnafu, InvalidCustomTimeIndexSnafu, InvalidTimestampSnafu, PipelineMissingSnafu, Result,
31};
32use crate::etl::value::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
33use crate::table::PipelineTable;
34use crate::{GreptimePipelineParams, Pipeline};
35
36mod pipeline_cache;
37pub mod pipeline_operator;
38pub mod table;
39pub mod util;
40
/// Pipeline version. An optional timestamp with nanosecond precision.
///
/// If the version is `None`, it means the latest version of the pipeline.
/// Users can specify the version by providing a timestamp string formatted as ISO 8601.
/// When it is used in a cache key, it is converted to an `i64` holding the number of
/// nanoseconds since the epoch.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// Pipeline info. A tuple of timestamp and pipeline reference.
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared, reference-counted handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared, reference-counted handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
53
/// `SelectInfo` stores the keys selected from the attributes of an OpenTelemetry record.
///
/// Each key is used to uplift the matching value out of the attributes and also serves
/// as the column name in the table.
#[derive(Default)]
pub struct SelectInfo {
    /// Selected attribute keys; the `From<String>` conversion produces them sorted and
    /// deduplicated.
    pub keys: Vec<String>,
}
60
61/// Try to convert a string to SelectInfo
62/// The string should be a comma-separated list of keys
63/// example: "key1,key2,key3"
64/// The keys will be sorted and deduplicated
65impl From<String> for SelectInfo {
66    fn from(value: String) -> Self {
67        let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
68        keys.dedup();
69
70        SelectInfo { keys }
71    }
72}
73
74impl SelectInfo {
75    pub fn is_empty(&self) -> bool {
76        self.keys.is_empty()
77    }
78}
79
/// Pipeline name that `PipelineDefinition::from_name` resolves to
/// `PipelineDefinition::GreptimeIdentityPipeline`.
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Pipeline name that `PipelineWay::from_name_and_default` resolves to
/// `PipelineWay::OtlpTraceDirectV0`.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Pipeline name that `PipelineWay::from_name_and_default` resolves to
/// `PipelineWay::OtlpTraceDirectV1`.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
83
/// Enum holding the information of a pipeline: either the pipeline itself,
/// or the information that can be used to retrieve a pipeline from `PipelineHandler`.
#[derive(Debug, Clone)]
pub enum PipelineDefinition {
    /// The pipeline itself, already available as a shared reference.
    Resolved(Arc<Pipeline>),
    /// A pipeline referred to by its name and (optional) version.
    ByNameAndValue((String, PipelineVersion)),
    /// The built-in identity pipeline, with an optional custom time index.
    GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
}
92
93impl PipelineDefinition {
94    pub fn from_name(
95        name: &str,
96        version: PipelineVersion,
97        custom_time_index: Option<(String, bool)>,
98    ) -> Result<Self> {
99        if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
100            Ok(Self::GreptimeIdentityPipeline(
101                custom_time_index
102                    .map(|(config, ignore_errors)| {
103                        IdentityTimeIndex::from_config(config, ignore_errors)
104                    })
105                    .transpose()?,
106            ))
107        } else {
108            Ok(Self::ByNameAndValue((name.to_owned(), version)))
109        }
110    }
111
112    pub fn is_identity(&self) -> bool {
113        matches!(self, Self::GreptimeIdentityPipeline(_))
114    }
115
116    pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
117        if let Self::GreptimeIdentityPipeline(custom_ts) = self {
118            custom_ts.as_ref()
119        } else {
120            None
121        }
122    }
123}
124
125pub struct PipelineContext<'a> {
126    pub pipeline_definition: &'a PipelineDefinition,
127    pub pipeline_param: &'a GreptimePipelineParams,
128    pub channel: Channel,
129}
130
131impl<'a> PipelineContext<'a> {
132    pub fn new(
133        pipeline_definition: &'a PipelineDefinition,
134        pipeline_param: &'a GreptimePipelineParams,
135        channel: Channel,
136    ) -> Self {
137        Self {
138            pipeline_definition,
139            pipeline_param,
140            channel,
141        }
142    }
143}
/// The way incoming data is processed, as resolved by
/// `PipelineWay::from_name_and_default`.
pub enum PipelineWay {
    /// Carries the [`SelectInfo`] keys.
    /// NOTE(review): presumably direct OTLP log ingestion without a pipeline — confirm.
    OtlpLogDirect(Box<SelectInfo>),
    /// Process the data with the given pipeline definition.
    Pipeline(PipelineDefinition),
    /// Selected by the pipeline name `greptime_trace_v0`.
    OtlpTraceDirectV0,
    /// Selected by the pipeline name `greptime_trace_v1`.
    OtlpTraceDirectV1,
}
150
151impl PipelineWay {
152    pub fn from_name_and_default(
153        name: Option<&str>,
154        version: Option<&str>,
155        default_pipeline: Option<PipelineWay>,
156    ) -> Result<PipelineWay> {
157        if let Some(pipeline_name) = name {
158            if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
159                Ok(PipelineWay::OtlpTraceDirectV1)
160            } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
161                Ok(PipelineWay::OtlpTraceDirectV0)
162            } else {
163                Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
164                    pipeline_name,
165                    to_pipeline_version(version)?,
166                    None,
167                )?))
168            }
169        } else if let Some(default_pipeline) = default_pipeline {
170            Ok(default_pipeline)
171        } else {
172            PipelineMissingSnafu.fail()
173        }
174    }
175}
176
/// `<type>` token of a custom time index config that selects `IdentityTimeIndex::Epoch`.
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` token of a custom time index config that selects `IdentityTimeIndex::DateStr`.
const IDENTITY_TS_DATESTR: &str = "datestr";
179
/// Custom time index configuration for the identity pipeline.
///
/// In both variants the first field is the name of the input field holding the time
/// value, and the last field is the `ignore_errors` flag.
#[derive(Debug, Clone)]
pub enum IdentityTimeIndex {
    /// `(field, unit, ignore_errors)`: the field holds an epoch value in `unit`.
    Epoch(String, TimeUnit, bool),
    /// `(field, format, ignore_errors)`: the field holds a date string that is parsed
    /// with `format`.
    DateStr(String, String, bool),
}
185
186impl IdentityTimeIndex {
187    pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
188        let parts = config.split(';').collect::<Vec<&str>>();
189        ensure!(
190            parts.len() == 3,
191            InvalidCustomTimeIndexSnafu {
192                config,
193                reason: "config format: '<field>;<type>;<config>'",
194            }
195        );
196
197        let field = parts[0].to_string();
198        match parts[1] {
199            IDENTITY_TS_EPOCH => match parts[2] {
200                NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
201                    field,
202                    TimeUnit::Nanosecond,
203                    ignore_errors,
204                )),
205                US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
206                    field,
207                    TimeUnit::Microsecond,
208                    ignore_errors,
209                )),
210                MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
211                    field,
212                    TimeUnit::Millisecond,
213                    ignore_errors,
214                )),
215                S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
216                    field,
217                    TimeUnit::Second,
218                    ignore_errors,
219                )),
220                _ => InvalidCustomTimeIndexSnafu {
221                    config,
222                    reason: "epoch type must be one of ns, us, ms, s",
223                }
224                .fail(),
225            },
226            IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
227                field,
228                parts[2].to_string(),
229                ignore_errors,
230            )),
231            _ => InvalidCustomTimeIndexSnafu {
232                config,
233                reason: "identity time index type must be one of epoch, datestr",
234            }
235            .fail(),
236        }
237    }
238
239    pub fn get_column_name(&self) -> &str {
240        match self {
241            IdentityTimeIndex::Epoch(field, _, _) => field,
242            IdentityTimeIndex::DateStr(field, _, _) => field,
243        }
244    }
245
246    pub fn get_ignore_errors(&self) -> bool {
247        match self {
248            IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
249            IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
250        }
251    }
252
253    pub fn get_datatype(&self) -> ColumnDataType {
254        match self {
255            IdentityTimeIndex::Epoch(_, unit, _) => match unit {
256                TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
257                TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
258                TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
259                TimeUnit::Second => ColumnDataType::TimestampSecond,
260            },
261            IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
262        }
263    }
264
265    pub fn get_timestamp_value(&self, value: Option<&VrlValue>) -> Result<ValueData> {
266        match self {
267            IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
268                let v = match value {
269                    Some(VrlValue::Integer(v)) => *v,
270                    Some(VrlValue::Bytes(s)) => match String::from_utf8_lossy(s).parse::<i64>() {
271                        Ok(v) => v,
272                        Err(_) => {
273                            return if_ignore_errors(
274                                *ignore_errors,
275                                *unit,
276                                format!(
277                                    "failed to convert {} to number",
278                                    String::from_utf8_lossy(s)
279                                ),
280                            )
281                        }
282                    },
283                    Some(VrlValue::Timestamp(timestamp)) => datetime_utc_to_unit(timestamp, unit)?,
284                    Some(v) => {
285                        return if_ignore_errors(
286                            *ignore_errors,
287                            *unit,
288                            format!("unsupported value type to convert to timestamp: {}", v),
289                        )
290                    }
291                    None => {
292                        return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
293                    }
294                };
295                Ok(time_unit_to_value_data(*unit, v))
296            }
297            IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
298                let v = match value {
299                    Some(VrlValue::Bytes(s)) => String::from_utf8_lossy(s),
300                    Some(v) => {
301                        return if_ignore_errors(
302                            *ignore_errors,
303                            TimeUnit::Nanosecond,
304                            format!("unsupported value type to convert to date string: {}", v),
305                        );
306                    }
307                    None => {
308                        return if_ignore_errors(
309                            *ignore_errors,
310                            TimeUnit::Nanosecond,
311                            "missing field".to_string(),
312                        )
313                    }
314                };
315
316                let timestamp = match chrono::DateTime::parse_from_str(&v, format) {
317                    Ok(ts) => ts,
318                    Err(_) => {
319                        return if_ignore_errors(
320                            *ignore_errors,
321                            TimeUnit::Nanosecond,
322                            format!("failed to parse date string: {}, format: {}", v, format),
323                        )
324                    }
325                };
326
327                Ok(ValueData::TimestampNanosecondValue(
328                    timestamp
329                        .timestamp_nanos_opt()
330                        .context(InvalidTimestampSnafu {
331                            input: timestamp.to_rfc3339(),
332                        })?,
333                ))
334            }
335        }
336    }
337}
338
339fn datetime_utc_to_unit(timestamp: &DateTime<Utc>, unit: &TimeUnit) -> Result<i64> {
340    let ts = match unit {
341        TimeUnit::Nanosecond => timestamp
342            .timestamp_nanos_opt()
343            .context(InvalidTimestampSnafu {
344                input: timestamp.to_rfc3339(),
345            })?,
346        TimeUnit::Microsecond => timestamp.timestamp_micros(),
347        TimeUnit::Millisecond => timestamp.timestamp_millis(),
348        TimeUnit::Second => timestamp.timestamp(),
349    };
350    Ok(ts)
351}
352
353fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
354    if ignore_errors {
355        Ok(time_unit_to_value_data(
356            unit,
357            Timestamp::current_time(unit).value(),
358        ))
359    } else {
360        CastTypeSnafu { msg }.fail()
361    }
362}
363
364fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
365    match unit {
366        TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
367        TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
368        TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
369        TimeUnit::Second => ValueData::TimestampSecondValue(v),
370    }
371}