1use std::sync::Arc;
16
17use api::v1::value::ValueData;
18use api::v1::ColumnDataType;
19use common_time::timestamp::TimeUnit;
20use common_time::Timestamp;
21use datatypes::timestamp::TimestampNanosecond;
22use itertools::Itertools;
23use session::context::Channel;
24use snafu::ensure;
25use util::to_pipeline_version;
26
27use crate::error::{CastTypeSnafu, InvalidCustomTimeIndexSnafu, PipelineMissingSnafu, Result};
28use crate::etl::value::time::{MS_RESOLUTION, NS_RESOLUTION, S_RESOLUTION, US_RESOLUTION};
29use crate::table::PipelineTable;
30use crate::{GreptimePipelineParams, Pipeline, Value};
31
32mod pipeline_cache;
33pub mod pipeline_operator;
34pub mod table;
35pub mod util;
36
/// Version of a pipeline, expressed as an optional nanosecond timestamp.
///
/// NOTE(review): `None` presumably means "no explicit version requested"
/// (e.g. use the latest) — confirm against the pipeline lookup code.
pub type PipelineVersion = Option<TimestampNanosecond>;

/// A pipeline paired with a timestamp.
///
/// NOTE(review): the timestamp is presumably the pipeline's version — confirm
/// against the code that produces these tuples.
pub type PipelineInfo = (Timestamp, PipelineRef);

/// Shared (`Arc`) handle to a [`PipelineTable`].
pub type PipelineTableRef = Arc<PipelineTable>;
/// Shared (`Arc`) handle to a [`Pipeline`].
pub type PipelineRef = Arc<Pipeline>;
49
50#[derive(Default)]
53pub struct SelectInfo {
54 pub keys: Vec<String>,
55}
56
57impl From<String> for SelectInfo {
62 fn from(value: String) -> Self {
63 let mut keys: Vec<String> = value.split(',').map(|s| s.to_string()).sorted().collect();
64 keys.dedup();
65
66 SelectInfo { keys }
67 }
68}
69
70impl SelectInfo {
71 pub fn is_empty(&self) -> bool {
72 self.keys.is_empty()
73 }
74}
75
/// Reserved pipeline name that selects the built-in identity pipeline.
pub const GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME: &str = "greptime_identity";
/// Reserved pipeline name that selects the built-in trace handling, version 0.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME: &str = "greptime_trace_v0";
/// Reserved pipeline name that selects the built-in trace handling, version 1.
pub const GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME: &str = "greptime_trace_v1";
79
/// Describes which pipeline should be applied.
#[derive(Debug, Clone)]
pub enum PipelineDefinition {
    /// A pipeline instance that is already at hand.
    Resolved(Arc<Pipeline>),
    /// A pipeline referenced by its name and (optional) version.
    /// NOTE(review): presumably looked up later through the pipeline table — confirm.
    ByNameAndValue((String, PipelineVersion)),
    /// The built-in identity pipeline, optionally with a custom time index.
    GreptimeIdentityPipeline(Option<IdentityTimeIndex>),
}
88
89impl PipelineDefinition {
90 pub fn from_name(
91 name: &str,
92 version: PipelineVersion,
93 custom_time_index: Option<(String, bool)>,
94 ) -> Result<Self> {
95 if name == GREPTIME_INTERNAL_IDENTITY_PIPELINE_NAME {
96 Ok(Self::GreptimeIdentityPipeline(
97 custom_time_index
98 .map(|(config, ignore_errors)| {
99 IdentityTimeIndex::from_config(config, ignore_errors)
100 })
101 .transpose()?,
102 ))
103 } else {
104 Ok(Self::ByNameAndValue((name.to_owned(), version)))
105 }
106 }
107
108 pub fn is_identity(&self) -> bool {
109 matches!(self, Self::GreptimeIdentityPipeline(_))
110 }
111
112 pub fn get_custom_ts(&self) -> Option<&IdentityTimeIndex> {
113 if let Self::GreptimeIdentityPipeline(custom_ts) = self {
114 custom_ts.as_ref()
115 } else {
116 None
117 }
118 }
119}
120
121pub struct PipelineContext<'a> {
122 pub pipeline_definition: &'a PipelineDefinition,
123 pub pipeline_param: &'a GreptimePipelineParams,
124 pub channel: Channel,
125}
126
127impl<'a> PipelineContext<'a> {
128 pub fn new(
129 pipeline_definition: &'a PipelineDefinition,
130 pipeline_param: &'a GreptimePipelineParams,
131 channel: Channel,
132 ) -> Self {
133 Self {
134 pipeline_definition,
135 pipeline_param,
136 channel,
137 }
138 }
139}
/// The way incoming data is handled.
pub enum PipelineWay {
    /// Direct handling of OTLP logs, carrying the [`SelectInfo`] to use.
    OtlpLogDirect(Box<SelectInfo>),
    /// Handling through the pipeline described by the [`PipelineDefinition`].
    Pipeline(PipelineDefinition),
    /// Direct handling of OTLP traces, version 0
    /// (chosen for [`GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME`]).
    OtlpTraceDirectV0,
    /// Direct handling of OTLP traces, version 1
    /// (chosen for [`GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME`]).
    OtlpTraceDirectV1,
}
146
147impl PipelineWay {
148 pub fn from_name_and_default(
149 name: Option<&str>,
150 version: Option<&str>,
151 default_pipeline: Option<PipelineWay>,
152 ) -> Result<PipelineWay> {
153 if let Some(pipeline_name) = name {
154 if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V1_NAME {
155 Ok(PipelineWay::OtlpTraceDirectV1)
156 } else if pipeline_name == GREPTIME_INTERNAL_TRACE_PIPELINE_V0_NAME {
157 Ok(PipelineWay::OtlpTraceDirectV0)
158 } else {
159 Ok(PipelineWay::Pipeline(PipelineDefinition::from_name(
160 pipeline_name,
161 to_pipeline_version(version)?,
162 None,
163 )?))
164 }
165 } else if let Some(default_pipeline) = default_pipeline {
166 Ok(default_pipeline)
167 } else {
168 PipelineMissingSnafu.fail()
169 }
170 }
171}
172
/// `<type>` keyword of a custom time index config selecting an epoch number.
const IDENTITY_TS_EPOCH: &str = "epoch";
/// `<type>` keyword of a custom time index config selecting a formatted date string.
const IDENTITY_TS_DATESTR: &str = "datestr";
175
/// Custom time index configuration for the identity pipeline.
///
/// In both variants the first element is the name of the field holding the
/// time value and the last one is the `ignore_errors` flag.
#[derive(Debug, Clone)]
pub enum IdentityTimeIndex {
    /// The field holds an epoch number expressed in the given [`TimeUnit`]:
    /// `(field, unit, ignore_errors)`.
    Epoch(String, TimeUnit, bool),
    /// The field holds a date string to be parsed with the given format:
    /// `(field, format, ignore_errors)`.
    DateStr(String, String, bool),
}
181
182impl IdentityTimeIndex {
183 pub fn from_config(config: String, ignore_errors: bool) -> Result<Self> {
184 let parts = config.split(';').collect::<Vec<&str>>();
185 ensure!(
186 parts.len() == 3,
187 InvalidCustomTimeIndexSnafu {
188 config,
189 reason: "config format: '<field>;<type>;<config>'",
190 }
191 );
192
193 let field = parts[0].to_string();
194 match parts[1] {
195 IDENTITY_TS_EPOCH => match parts[2] {
196 NS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
197 field,
198 TimeUnit::Nanosecond,
199 ignore_errors,
200 )),
201 US_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
202 field,
203 TimeUnit::Microsecond,
204 ignore_errors,
205 )),
206 MS_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
207 field,
208 TimeUnit::Millisecond,
209 ignore_errors,
210 )),
211 S_RESOLUTION => Ok(IdentityTimeIndex::Epoch(
212 field,
213 TimeUnit::Second,
214 ignore_errors,
215 )),
216 _ => InvalidCustomTimeIndexSnafu {
217 config,
218 reason: "epoch type must be one of ns, us, ms, s",
219 }
220 .fail(),
221 },
222 IDENTITY_TS_DATESTR => Ok(IdentityTimeIndex::DateStr(
223 field,
224 parts[2].to_string(),
225 ignore_errors,
226 )),
227 _ => InvalidCustomTimeIndexSnafu {
228 config,
229 reason: "identity time index type must be one of epoch, datestr",
230 }
231 .fail(),
232 }
233 }
234
235 pub fn get_column_name(&self) -> &String {
236 match self {
237 IdentityTimeIndex::Epoch(field, _, _) => field,
238 IdentityTimeIndex::DateStr(field, _, _) => field,
239 }
240 }
241
242 pub fn get_ignore_errors(&self) -> bool {
243 match self {
244 IdentityTimeIndex::Epoch(_, _, ignore_errors) => *ignore_errors,
245 IdentityTimeIndex::DateStr(_, _, ignore_errors) => *ignore_errors,
246 }
247 }
248
249 pub fn get_datatype(&self) -> ColumnDataType {
250 match self {
251 IdentityTimeIndex::Epoch(_, unit, _) => match unit {
252 TimeUnit::Nanosecond => ColumnDataType::TimestampNanosecond,
253 TimeUnit::Microsecond => ColumnDataType::TimestampMicrosecond,
254 TimeUnit::Millisecond => ColumnDataType::TimestampMillisecond,
255 TimeUnit::Second => ColumnDataType::TimestampSecond,
256 },
257 IdentityTimeIndex::DateStr(_, _, _) => ColumnDataType::TimestampNanosecond,
258 }
259 }
260
261 pub fn get_timestamp(&self, value: Option<&Value>) -> Result<ValueData> {
262 match self {
263 IdentityTimeIndex::Epoch(_, unit, ignore_errors) => {
264 let v = match value {
265 Some(Value::Int32(v)) => *v as i64,
266 Some(Value::Int64(v)) => *v,
267 Some(Value::Uint32(v)) => *v as i64,
268 Some(Value::Uint64(v)) => *v as i64,
269 Some(Value::String(s)) => match s.parse::<i64>() {
270 Ok(v) => v,
271 Err(_) => {
272 return if_ignore_errors(
273 *ignore_errors,
274 *unit,
275 format!("failed to convert {} to number", s),
276 )
277 }
278 },
279 Some(Value::Timestamp(timestamp)) => timestamp.to_unit(unit),
280 Some(v) => {
281 return if_ignore_errors(
282 *ignore_errors,
283 *unit,
284 format!("unsupported value type to convert to timestamp: {}", v),
285 )
286 }
287 None => {
288 return if_ignore_errors(*ignore_errors, *unit, "missing field".to_string())
289 }
290 };
291 Ok(time_unit_to_value_data(*unit, v))
292 }
293 IdentityTimeIndex::DateStr(_, format, ignore_errors) => {
294 let v = match value {
295 Some(Value::String(s)) => s,
296 Some(v) => {
297 return if_ignore_errors(
298 *ignore_errors,
299 TimeUnit::Nanosecond,
300 format!("unsupported value type to convert to date string: {}", v),
301 );
302 }
303 None => {
304 return if_ignore_errors(
305 *ignore_errors,
306 TimeUnit::Nanosecond,
307 "missing field".to_string(),
308 )
309 }
310 };
311
312 let timestamp = match chrono::DateTime::parse_from_str(v, format) {
313 Ok(ts) => ts,
314 Err(_) => {
315 return if_ignore_errors(
316 *ignore_errors,
317 TimeUnit::Nanosecond,
318 format!("failed to parse date string: {}, format: {}", v, format),
319 )
320 }
321 };
322
323 Ok(ValueData::TimestampNanosecondValue(
324 timestamp.timestamp_nanos_opt().unwrap_or_default(),
325 ))
326 }
327 }
328 }
329}
330
331fn if_ignore_errors(ignore_errors: bool, unit: TimeUnit, msg: String) -> Result<ValueData> {
332 if ignore_errors {
333 Ok(time_unit_to_value_data(
334 unit,
335 Timestamp::current_time(unit).value(),
336 ))
337 } else {
338 CastTypeSnafu { msg }.fail()
339 }
340}
341
342fn time_unit_to_value_data(unit: TimeUnit, v: i64) -> ValueData {
343 match unit {
344 TimeUnit::Nanosecond => ValueData::TimestampNanosecondValue(v),
345 TimeUnit::Microsecond => ValueData::TimestampMicrosecondValue(v),
346 TimeUnit::Millisecond => ValueData::TimestampMillisecondValue(v),
347 TimeUnit::Second => ValueData::TimestampSecondValue(v),
348 }
349}