1use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use common_time::timestamp::TimeUnit;
20use common_time::Timestamp;
21use datatypes::timestamp::TimestampNanosecond;
22use itertools::Itertools;
23use snafu::ensure;
24use util::to_pipeline_version;
25
26use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
27use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
28use crate::table::PipelineTable;
29use crate::{GreptimePipelineParams, Pipeline, Value};
30
31pub mod pipeline_operator;
32pub mod table;
33pub mod util;
34
// NOTE(review): presumably `None` means "use the latest version" — confirm
// against the code that resolves pipelines by version.
/// Version of a pipeline: an optional timestamp with nanosecond precision.
pub type PipelineVersion = Option<TimestampNanosecond>;

// NOTE(review): the timestamp is presumably the pipeline's version or creation
// time — confirm against the producer of this tuple.
/// A [`Timestamp`] paired with a shared handle to a pipeline.
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared, reference-counted handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared, reference-counted handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
47
48#[derive(Default)]
51pub struct SelectInfo {
52 pub keys: Vec<String>,
53}
54
55impl From<String> for SelectInfo {
60 fn from(value: String) -> Self {
61 let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
62 keys.dedup();
63
64 SelectInfo { keys }
65 }
66}
67
68impl SelectInfo {
69 pub fn is_empty(&self) -> bool {
70 self.keys.is_empty()
71 }
72}
73
/// Pipeline name that makes [`PipelineDefinition::from_name`] return the
/// built-in identity pipeline instead of a named one.
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Pipeline name that [`PipelineWay::from_name_and_default`] maps to
/// [`PipelineWay::OtlpTraceDirectV0`].
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Pipeline name that [`PipelineWay::from_name_and_default`] maps to
/// [`PipelineWay::OtlpTraceDirectV1`].
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
77
78#[derive(Debug, Clone)]
81pub enum PipelineDefinition {
82 Resolved(Arc<Pipeline>),
83 ByNameAndValue((String, PipelineVersion)),
84 GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
85}
86
87impl PipelineDefinition {
88 pub fn from_name(
89 name: &str,
90 version: PipelineVersion,
91 custom_time_index: Option<(String, bool)>,
92 ) -> Result<Self> {
93 if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
94 Ok(Self::GreptimeIdentityPipeline(
95 custom_time_index
96 .map(|(config, ignore_errors)| {
97 IdentityTimeIndex::from_config(config, ignore_errors)
98 })
99 .transpose()?,
100 ))
101 } else {
102 Ok(Self::ByNameAndValue((name.to_owned(), version)))
103 }
104 }
105
106 pub fn is_identity(&self) -> bool {
107 matches!(self, Self::GreptimeIdentityPipeline(_))
108 }
109
110 pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
111 if let Self::GreptimeIdentityPipeline(custom_ts) = self {
112 custom_ts.as_ref()
113 } else {
114 None
115 }
116 }
117}
118
119pub struct PipelineContext<'a> {
120 pub pipeline_definition: &'a PipelineDefinition,
121 pub pipeline_param: &'a GreptimePipelineParams,
122}
123
124impl<'a> PipelineContext<'a> {
125 pub fn new(
126 pipeline_definition: &'a PipelineDefinition,
127 pipeline_param: &'a GreptimePipelineParams,
128 ) -> Self {
129 Self {
130 pipeline_definition,
131 pipeline_param,
132 }
133 }
134}
135pub enum PipelineWay {
136 OtlpLogDirect(Box<SelectInfo>),
137 Pipeline(PipelineDefinition),
138 OtlpTraceDirectV0,
139 OtlpTraceDirectV1,
140}
141
142impl PipelineWay {
143 pub fn from_name_and_default(
144 name: Option<&str>,
145 version: Option<&str>,
146 default_pipeline: Option<PipelineWay>,
147 ) -> Result<PipelineWay> {
148 if let Some(pipeline_name) = name {
149 if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
150 Ok(PipelineWay::OtlpTraceDirectV1)
151 } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
152 Ok(PipelineWay::OtlpTraceDirectV0)
153 } else {
154 Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
155 pipeline_name,
156 to_pipeline_version(version)?,
157 None,
158 )?))
159 }
160 } else if let Some(default_pipeline) = default_pipeline {
161 Ok(default_pipeline)
162 } else {
163 PipelineMissingSnafu.fail()
164 }
165 }
166}
167
/// `<type>` keyword in a custom time index config that selects
/// [`IdentityTimeIndex::Epoch`].
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` keyword in a custom time index config that selects
/// [`IdentityTimeIndex::DateStr`].
const IDENTITY_TS_DATESTR: &str = "datestr";
170
171#[derive(Debug, Clone)]
172pub enum IdentityTimeIndex {
173 Epoch(String, TimeUnit, bool),
174 DateStr(String, String, bool),
175}
176
177impl IdentityTimeIndex {
178 pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
179 let parts = config.split(';').collect::<Vec<&str>>();
180 ensure!(
181 parts.len() == 3,
182 InvalidCustomTimeIndexSnafu {
183 config,
184 reason: "config format: '<field>;<type>;<config>'",
185 }
186 );
187
188 let field = parts[0].to_string();
189 match parts[1] {
190 IDENTITY_TS_EPOCH => match parts[2] {
191 NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
192 field,
193 TimeUnit::Nanosecond,
194 ignore_errors,
195 )),
196 US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
197 field,
198 TimeUnit::Microsecond,
199 ignore_errors,
200 )),
201 MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
202 field,
203 TimeUnit::Millisecond,
204 ignore_errors,
205 )),
206 S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
207 field,
208 TimeUnit::Second,
209 ignore_errors,
210 )),
211 _ => InvalidCustomTimeIndexSnafu {
212 config,
213 reason: "epoch type must be one of ns, us, ms, s",
214 }
215 .fail(),
216 },
217 IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
218 field,
219 parts[2].to_string(),
220 ignore_errors,
221 )),
222 _ => InvalidCustomTimeIndexSnafu {
223 config,
224 reason: "identity time index type must be one of epoch, datestr",
225 }
226 .fail(),
227 }
228 }
229
230 pub fn get_column_name(&self) -> &String {
231 match self {
232 IdentityTimeIndex::Epoch(field, _, _) => field,
233 IdentityTimeIndex::DateStr(field, _, _) => field,
234 }
235 }
236
237 pub fn get_ignore_errors(&self) -> bool {
238 match self {
239 IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
240 IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
241 }
242 }
243
244 pub fn get_datatype(&self) -> ColumnDataType {
245 match self {
246 IdentityTimeIndex::Epoch(_, unit, _) => match unit {
247 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
248 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
249 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
250 TimeUnit::Second => ColumnDataType::TimestampSecond,
251 },
252 IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
253 }
254 }
255
256 pub fn get_timestamp(&self, value: Option<&Value>) -> Result<ValueData> {
257 match self {
258 IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
259 let v = match value {
260 Some(Value::Int32(v)) => *v as i64,
261 Some(Value::Int64(v)) => *v,
262 Some(Value::Uint32(v)) => *v as i64,
263 Some(Value::Uint64(v)) => *v as i64,
264 Some(Value::String(s)) => match s.parse::<i64>() {
265 Ok(v) => v,
266 Err(_) => {
267 return if_ignore_errors(
268 *ignore_errors,
269 *unit,
270 format!("failed to convert {} to number", s),
271 )
272 }
273 },
274 Some(Value::Timestamp(timestamp)) => timestamp.to_unit(unit),
275 Some(v) => {
276 return if_ignore_errors(
277 *ignore_errors,
278 *unit,
279 format!("unsupported value type to convert to timestamp: {}", v),
280 )
281 }
282 None => {
283 return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
284 }
285 };
286 Ok(time_unit_to_value_data(*unit, v))
287 }
288 IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
289 let v = match value {
290 Some(Value::String(s)) => s,
291 Some(v) => {
292 return if_ignore_errors(
293 *ignore_errors,
294 TimeUnit::Nanosecond,
295 format!("unsupported value type to convert to date string: {}", v),
296 );
297 }
298 None => {
299 return if_ignore_errors(
300 *ignore_errors,
301 TimeUnit::Nanosecond,
302 "missing field".to_string(),
303 )
304 }
305 };
306
307 let timestamp = match chrono::DateTime::parse_from_str(v, format) {
308 Ok(ts) => ts,
309 Err(_) => {
310 return if_ignore_errors(
311 *ignore_errors,
312 TimeUnit::Nanosecond,
313 format!("failed to parse date string: {}, format: {}", v, format),
314 )
315 }
316 };
317
318 Ok(ValueData::TimestampNanosecondValue(
319 timestamp.timestamp_nanos_opt().unwrap_or_default(),
320 ))
321 }
322 }
323 }
324}
325
326fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
327 if ignore_errors {
328 Ok(time_unit_to_value_data(
329 unit,
330 Timestamp::current_time(unit).value(),
331 ))
332 } else {
333 CastTypeSnafu { msg }.fail()
334 }
335}
336
337fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
338 match unit {
339 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
340 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
341 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
342 TimeUnit::Second => ValueData::TimestampSecondValue(v),
343 }
344}