1use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use chrono::{DateTime, Utc};
20use common_time::timestamp::TimeUnit;
21use common_time::Timestamp;
22use datatypes::timestamp::TimestampNanosecond;
23use itertools::Itertools;
24use session::context::Channel;
25use snafu::{ensure, OptionExt};
26use util::to_pipeline_version;
27use vrl::value::Value as VrlValue;
28
29use crate::error::{
30 CastTypeSnafu, InvalidCustomTimeIndexSnafu, InvalidTimestampSnafu, PipelineMissingSnafu, Result,
31};
32use crate::etl::value::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
33use crate::table::PipelineTable;
34use crate::{GreptimePipelineParams, Pipeline};
35
36mod pipeline_cache;
37pub mod pipeline_operator;
38pub mod table;
39pub mod util;
40
/// Optional pipeline version, stored as a nanosecond timestamp.
///
/// NOTE(review): `None` presumably means "latest version" — confirm against
/// `util::to_pipeline_version` and the pipeline table lookup.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// A pipeline paired with a [`Timestamp`] (presumably its version/creation time — TODO confirm).
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared, reference-counted handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared, reference-counted handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
53
/// A set of keys to select from the input.
///
/// Usually built from a comma-separated list via `From<String>`. `Debug` and `Clone` are
/// derived so the selection can be logged and copied along with the `PipelineWay` that
/// carries it.
#[derive(Debug, Clone, Default)]
pub struct SelectInfo {
    /// Selected key names. When built via `From<String>` they are sorted and de-duplicated;
    /// an empty list means no keys were selected.
    pub keys: Vec<String>,
}
60
61impl From<String> for SelectInfo {
66 fn from(value: String) -> Self {
67 let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
68 keys.dedup();
69
70 SelectInfo { keys }
71 }
72}
73
74impl SelectInfo {
75 pub fn is_empty(&self) -> bool {
76 self.keys.is_empty()
77 }
78}
79
/// Reserved pipeline name for the built-in identity pipeline; see
/// [`PipelineDefinition::from_name`].
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Reserved pipeline name that [`PipelineWay::from_name_and_default`] maps to
/// [`PipelineWay::OtlpTraceDirectV0`].
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Reserved pipeline name that [`PipelineWay::from_name_and_default`] maps to
/// [`PipelineWay::OtlpTraceDirectV1`].
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
83
/// How the pipeline for a request is specified.
#[derive(Debug, Clone)]
pub enum PipelineDefinition {
    /// Holds a [`Pipeline`] directly.
    Resolved(Arc<Pipeline>),
    /// A pipeline referenced by `(name, version)`.
    ///
    /// NOTE(review): presumably resolved later against the pipeline table — confirm.
    ByNameAndValue((String, PipelineVersion)),
    /// The built-in identity pipeline, optionally with a custom time index.
    GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
}
92
93impl PipelineDefinition {
94 pub fn from_name(
95 name: &str,
96 version: PipelineVersion,
97 custom_time_index: Option<(String, bool)>,
98 ) -> Result<Self> {
99 if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
100 Ok(Self::GreptimeIdentityPipeline(
101 custom_time_index
102 .map(|(config, ignore_errors)| {
103 IdentityTimeIndex::from_config(config, ignore_errors)
104 })
105 .transpose()?,
106 ))
107 } else {
108 Ok(Self::ByNameAndValue((name.to_owned(), version)))
109 }
110 }
111
112 pub fn is_identity(&self) -> bool {
113 matches!(self, Self::GreptimeIdentityPipeline(_))
114 }
115
116 pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
117 if let Self::GreptimeIdentityPipeline(custom_ts) = self {
118 custom_ts.as_ref()
119 } else {
120 None
121 }
122 }
123}
124
125pub struct PipelineContext<'a> {
126 pub pipeline_definition: &'a PipelineDefinition,
127 pub pipeline_param: &'a GreptimePipelineParams,
128 pub channel: Channel,
129}
130
131impl<'a> PipelineContext<'a> {
132 pub fn new(
133 pipeline_definition: &'a PipelineDefinition,
134 pipeline_param: &'a GreptimePipelineParams,
135 channel: Channel,
136 ) -> Self {
137 Self {
138 pipeline_definition,
139 pipeline_param,
140 channel,
141 }
142 }
143}
/// How incoming data is to be processed; see [`PipelineWay::from_name_and_default`].
pub enum PipelineWay {
    /// Direct OTLP log handling with the given key selection.
    ///
    /// NOTE(review): presumably the selected keys are extracted as columns — confirm at the
    /// call site.
    OtlpLogDirect(Box<SelectInfo>),
    /// Run the given pipeline definition.
    Pipeline(PipelineDefinition),
    /// Built-in OTLP trace handling, selected by `GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME`.
    OtlpTraceDirectV0,
    /// Built-in OTLP trace handling, selected by `GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME`.
    OtlpTraceDirectV1,
}
150
151impl PipelineWay {
152 pub fn from_name_and_default(
153 name: Option<&str>,
154 version: Option<&str>,
155 default_pipeline: Option<PipelineWay>,
156 ) -> Result<PipelineWay> {
157 if let Some(pipeline_name) = name {
158 if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
159 Ok(PipelineWay::OtlpTraceDirectV1)
160 } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
161 Ok(PipelineWay::OtlpTraceDirectV0)
162 } else {
163 Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
164 pipeline_name,
165 to_pipeline_version(version)?,
166 None,
167 )?))
168 }
169 } else if let Some(default_pipeline) = default_pipeline {
170 Ok(default_pipeline)
171 } else {
172 PipelineMissingSnafu.fail()
173 }
174 }
175}
176
/// `<type>` value in a custom time index config that selects an integer epoch.
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` value in a custom time index config that selects a formatted date string.
const IDENTITY_TS_DATESTR: &str = "datestr";

/// Custom time index for the identity pipeline, parsed by [`IdentityTimeIndex::from_config`].
#[derive(Debug, Clone)]
pub enum IdentityTimeIndex {
    /// `(field, unit, ignore_errors)`: the field holds an epoch value expressed in `unit`.
    Epoch(String, TimeUnit, bool),
    /// `(field, format, ignore_errors)`: the field holds a date string parsed with the chrono
    /// `format`.
    DateStr(String, String, bool),
}
185
186impl IdentityTimeIndex {
187 pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
188 let parts = config.split(';').collect::<Vec<&str>>();
189 ensure!(
190 parts.len() == 3,
191 InvalidCustomTimeIndexSnafu {
192 config,
193 reason: "config format: '<field>;<type>;<config>'",
194 }
195 );
196
197 let field = parts[0].to_string();
198 match parts[1] {
199 IDENTITY_TS_EPOCH => match parts[2] {
200 NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
201 field,
202 TimeUnit::Nanosecond,
203 ignore_errors,
204 )),
205 US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
206 field,
207 TimeUnit::Microsecond,
208 ignore_errors,
209 )),
210 MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
211 field,
212 TimeUnit::Millisecond,
213 ignore_errors,
214 )),
215 S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
216 field,
217 TimeUnit::Second,
218 ignore_errors,
219 )),
220 _ => InvalidCustomTimeIndexSnafu {
221 config,
222 reason: "epoch type must be one of ns, us, ms, s",
223 }
224 .fail(),
225 },
226 IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
227 field,
228 parts[2].to_string(),
229 ignore_errors,
230 )),
231 _ => InvalidCustomTimeIndexSnafu {
232 config,
233 reason: "identity time index type must be one of epoch, datestr",
234 }
235 .fail(),
236 }
237 }
238
239 pub fn get_column_name(&self) -> &str {
240 match self {
241 IdentityTimeIndex::Epoch(field, _, _) => field,
242 IdentityTimeIndex::DateStr(field, _, _) => field,
243 }
244 }
245
246 pub fn get_ignore_errors(&self) -> bool {
247 match self {
248 IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
249 IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
250 }
251 }
252
253 pub fn get_datatype(&self) -> ColumnDataType {
254 match self {
255 IdentityTimeIndex::Epoch(_, unit, _) => match unit {
256 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
257 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
258 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
259 TimeUnit::Second => ColumnDataType::TimestampSecond,
260 },
261 IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
262 }
263 }
264
265 pub fn get_timestamp_value(&self, value: Option<&VrlValue>) -> Result<ValueData> {
266 match self {
267 IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
268 let v = match value {
269 Some(VrlValue::Integer(v)) => *v,
270 Some(VrlValue::Bytes(s)) => match String::from_utf8_lossy(s).parse::<i64>() {
271 Ok(v) => v,
272 Err(_) => {
273 return if_ignore_errors(
274 *ignore_errors,
275 *unit,
276 format!(
277 "failed to convert {} to number",
278 String::from_utf8_lossy(s)
279 ),
280 )
281 }
282 },
283 Some(VrlValue::Timestamp(timestamp)) => datetime_utc_to_unit(timestamp, unit)?,
284 Some(v) => {
285 return if_ignore_errors(
286 *ignore_errors,
287 *unit,
288 format!("unsupported value type to convert to timestamp: {}", v),
289 )
290 }
291 None => {
292 return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
293 }
294 };
295 Ok(time_unit_to_value_data(*unit, v))
296 }
297 IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
298 let v = match value {
299 Some(VrlValue::Bytes(s)) => String::from_utf8_lossy(s),
300 Some(v) => {
301 return if_ignore_errors(
302 *ignore_errors,
303 TimeUnit::Nanosecond,
304 format!("unsupported value type to convert to date string: {}", v),
305 );
306 }
307 None => {
308 return if_ignore_errors(
309 *ignore_errors,
310 TimeUnit::Nanosecond,
311 "missing field".to_string(),
312 )
313 }
314 };
315
316 let timestamp = match chrono::DateTime::parse_from_str(&v, format) {
317 Ok(ts) => ts,
318 Err(_) => {
319 return if_ignore_errors(
320 *ignore_errors,
321 TimeUnit::Nanosecond,
322 format!("failed to parse date string: {}, format: {}", v, format),
323 )
324 }
325 };
326
327 Ok(ValueData::TimestampNanosecondValue(
328 timestamp
329 .timestamp_nanos_opt()
330 .context(InvalidTimestampSnafu {
331 input: timestamp.to_rfc3339(),
332 })?,
333 ))
334 }
335 }
336 }
337}
338
339fn datetime_utc_to_unit(timestamp: &DateTime<Utc>, unit: &TimeUnit) -> Result<i64> {
340 let ts = match unit {
341 TimeUnit::Nanosecond => timestamp
342 .timestamp_nanos_opt()
343 .context(InvalidTimestampSnafu {
344 input: timestamp.to_rfc3339(),
345 })?,
346 TimeUnit::Microsecond => timestamp.timestamp_micros(),
347 TimeUnit::Millisecond => timestamp.timestamp_millis(),
348 TimeUnit::Second => timestamp.timestamp(),
349 };
350 Ok(ts)
351}
352
353fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
354 if ignore_errors {
355 Ok(time_unit_to_value_data(
356 unit,
357 Timestamp::current_time(unit).value(),
358 ))
359 } else {
360 CastTypeSnafu { msg }.fail()
361 }
362}
363
364fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
365 match unit {
366 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
367 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
368 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
369 TimeUnit::Second => ValueData::TimestampSecondValue(v),
370 }
371}