pipeline/
error.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::any::Any;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_macro::stack_trace_debug;
20use datatypes::timestamp::TimestampNanosecond;
21use snafu::{Location, Snafu};
22use vrl::value::Kind;
23
24#[derive(Snafu)]
25#[snafu(visibility(pub))]
26#[stack_trace_debug]
27pub enum Error {
28    #[snafu(display("Empty input field"))]
29    EmptyInputField {
30        #[snafu(implicit)]
31        location: Location,
32    },
33
34    #[snafu(display("Missing input field"))]
35    MissingInputField {
36        #[snafu(implicit)]
37        location: Location,
38    },
39
40    #[snafu(display(
41        "Field renaming must be a string pair of 'key' and 'rename_to', got: {value:?}"
42    ))]
43    InvalidFieldRename {
44        value: yaml_rust::Yaml,
45        #[snafu(implicit)]
46        location: Location,
47    },
48
49    #[snafu(display("Processor must be a map"))]
50    ProcessorMustBeMap {
51        #[snafu(implicit)]
52        location: Location,
53    },
54
55    #[snafu(display("Processor {processor}: missing field: {field}"))]
56    ProcessorMissingField {
57        processor: String,
58        field: String,
59        #[snafu(implicit)]
60        location: Location,
61    },
62
63    #[snafu(display("Processor {processor}: expect string value, but got {v:?}"))]
64    ProcessorExpectString {
65        processor: String,
66        v: vrl::value::Value,
67        #[snafu(implicit)]
68        location: Location,
69    },
70
71    #[snafu(display("Processor {processor}: unsupported value {val}"))]
72    ProcessorUnsupportedValue {
73        processor: String,
74        val: String,
75        #[snafu(implicit)]
76        location: Location,
77    },
78
79    #[snafu(display("Processor key must be a string"))]
80    ProcessorKeyMustBeString {
81        #[snafu(implicit)]
82        location: Location,
83    },
84
85    #[snafu(display("Processor {kind}: failed to parse {value}"))]
86    ProcessorFailedToParseString {
87        kind: String,
88        value: String,
89        #[snafu(implicit)]
90        location: Location,
91    },
92
93    #[snafu(display("Processor must have a string key"))]
94    ProcessorMustHaveStringKey {
95        #[snafu(implicit)]
96        location: Location,
97    },
98
99    #[snafu(display("Unsupported {processor} processor"))]
100    UnsupportedProcessor {
101        processor: String,
102        #[snafu(implicit)]
103        location: Location,
104    },
105
106    #[snafu(display("Field {field} must be a {ty}"))]
107    FieldMustBeType {
108        field: String,
109        ty: String,
110        #[snafu(implicit)]
111        location: Location,
112    },
113
114    #[snafu(display("Field parse from string failed: {field}"))]
115    FailedParseFieldFromString {
116        #[snafu(source)]
117        error: Box<dyn std::error::Error + Send + Sync>,
118        field: String,
119        #[snafu(implicit)]
120        location: Location,
121    },
122
123    #[snafu(display("Failed to parse {key} as int: {value}"))]
124    FailedToParseIntKey {
125        key: String,
126        value: String,
127        #[snafu(source)]
128        error: std::num::ParseIntError,
129        #[snafu(implicit)]
130        location: Location,
131    },
132
133    #[snafu(display("Failed to parse {value} to int"))]
134    FailedToParseInt {
135        value: String,
136        #[snafu(source)]
137        error: std::num::ParseIntError,
138        #[snafu(implicit)]
139        location: Location,
140    },
141    #[snafu(display("Failed to parse {key} as float: {value}"))]
142    FailedToParseFloatKey {
143        key: String,
144        value: String,
145        #[snafu(source)]
146        error: std::num::ParseFloatError,
147        #[snafu(implicit)]
148        location: Location,
149    },
150
151    #[snafu(display("Processor {kind}: {key} not found in intermediate keys"))]
152    IntermediateKeyIndex {
153        kind: String,
154        key: String,
155        #[snafu(implicit)]
156        location: Location,
157    },
158
159    #[snafu(display("Cmcd {k} missing value in {s}"))]
160    CmcdMissingValue {
161        k: String,
162        s: String,
163        #[snafu(implicit)]
164        location: Location,
165    },
166    #[snafu(display("Part: {part} missing key in {s}"))]
167    CmcdMissingKey {
168        part: String,
169        s: String,
170        #[snafu(implicit)]
171        location: Location,
172    },
173    #[snafu(display("Key must be a string, but got {k:?}"))]
174    KeyMustBeString {
175        k: yaml_rust::Yaml,
176        #[snafu(implicit)]
177        location: Location,
178    },
179
180    #[snafu(display("Csv read error"))]
181    CsvRead {
182        #[snafu(implicit)]
183        location: Location,
184        #[snafu(source)]
185        error: csv::Error,
186    },
187    #[snafu(display("Expected at least one record from csv format, but got none"))]
188    CsvNoRecord {
189        #[snafu(implicit)]
190        location: Location,
191    },
192
193    #[snafu(display("Separator '{separator}' must be a single character, but got '{value}'"))]
194    CsvSeparatorName {
195        separator: String,
196        value: String,
197        #[snafu(implicit)]
198        location: Location,
199    },
200
201    #[snafu(display("Quote '{quote}' must be a single character, but got '{value}'"))]
202    CsvQuoteName {
203        quote: String,
204        value: String,
205        #[snafu(implicit)]
206        location: Location,
207    },
208
209    #[snafu(display("Parse date timezone error {value}"))]
210    DateParseTimezone {
211        value: String,
212        #[snafu(source)]
213        error: chrono_tz::ParseError,
214        #[snafu(implicit)]
215        location: Location,
216    },
217
218    #[snafu(display("Parse date error {value}"))]
219    DateParse {
220        value: String,
221        #[snafu(source)]
222        error: chrono::ParseError,
223        #[snafu(implicit)]
224        location: Location,
225    },
226
227    #[snafu(display("Failed to get local timezone"))]
228    DateFailedToGetLocalTimezone {
229        #[snafu(implicit)]
230        location: Location,
231    },
232
233    #[snafu(display("Invalid Pattern: '{s}'. {detail}"))]
234    DissectInvalidPattern {
235        s: String,
236        detail: String,
237        #[snafu(implicit)]
238        location: Location,
239    },
240
241    #[snafu(display("Empty pattern is not allowed"))]
242    DissectEmptyPattern {
243        #[snafu(implicit)]
244        location: Location,
245    },
246    #[snafu(display("Split: '{split}' exceeds the input"))]
247    DissectSplitExceedsInput {
248        split: String,
249        #[snafu(implicit)]
250        location: Location,
251    },
252    #[snafu(display("Split: '{split}' does not match the input '{input}'"))]
253    DissectSplitNotMatchInput {
254        split: String,
255        input: String,
256        #[snafu(implicit)]
257        location: Location,
258    },
259    #[snafu(display("Consecutive names are not allowed: '{name1}' '{name2}'"))]
260    DissectConsecutiveNames {
261        name1: String,
262        name2: String,
263        #[snafu(implicit)]
264        location: Location,
265    },
266    #[snafu(display("No matching pattern found"))]
267    DissectNoMatchingPattern {
268        #[snafu(implicit)]
269        location: Location,
270    },
271    #[snafu(display("Modifier '{m}' already set, but found {modifier}"))]
272    DissectModifierAlreadySet {
273        m: String,
274        modifier: String,
275        #[snafu(implicit)]
276        location: Location,
277    },
278
279    #[snafu(display("Append Order modifier is already set to '{n}', cannot be set to {order}"))]
280    DissectAppendOrderAlreadySet {
281        n: String,
282        order: u32,
283        #[snafu(implicit)]
284        location: Location,
285    },
286    #[snafu(display("Order can only be set to Append Modifier, current modifier is {m}"))]
287    DissectOrderOnlyAppend {
288        m: String,
289        #[snafu(implicit)]
290        location: Location,
291    },
292
293    #[snafu(display("Order can only be set to Append Modifier"))]
294    DissectOrderOnlyAppendModifier {
295        #[snafu(implicit)]
296        location: Location,
297    },
298
299    #[snafu(display("End modifier already set: '{m}'"))]
300    DissectEndModifierAlreadySet {
301        m: String,
302        #[snafu(implicit)]
303        location: Location,
304    },
305    #[snafu(display("Invalid resolution: {resolution}"))]
306    EpochInvalidResolution {
307        resolution: String,
308        #[snafu(implicit)]
309        location: Location,
310    },
311    #[snafu(display("Pattern is required"))]
312    GsubPatternRequired {
313        #[snafu(implicit)]
314        location: Location,
315    },
316    #[snafu(display("Replacement is required"))]
317    GsubReplacementRequired {
318        #[snafu(implicit)]
319        location: Location,
320    },
321    #[snafu(display("Invalid regex pattern: {pattern}"))]
322    Regex {
323        #[snafu(source)]
324        error: regex::Error,
325        pattern: String,
326        #[snafu(implicit)]
327        location: Location,
328    },
329    #[snafu(display("Separator is required"))]
330    JoinSeparatorRequired {
331        #[snafu(implicit)]
332        location: Location,
333    },
334    #[snafu(display("Invalid method: {method}"))]
335    LetterInvalidMethod {
336        method: String,
337        #[snafu(implicit)]
338        location: Location,
339    },
340    #[snafu(display("No named group found in regex {origin}"))]
341    RegexNamedGroupNotFound {
342        origin: String,
343        #[snafu(implicit)]
344        location: Location,
345    },
346    #[snafu(display("No valid field found in {processor} processor"))]
347    RegexNoValidField {
348        processor: String,
349        #[snafu(implicit)]
350        location: Location,
351    },
352    #[snafu(display("No valid pattern found in {processor} processor"))]
353    RegexNoValidPattern {
354        processor: String,
355        #[snafu(implicit)]
356        location: Location,
357    },
358    #[snafu(display("Invalid method: {s}"))]
359    UrlEncodingInvalidMethod {
360        s: String,
361        #[snafu(implicit)]
362        location: Location,
363    },
364    #[snafu(display("Wrong digest pattern: {pattern}"))]
365    DigestPatternInvalid {
366        pattern: String,
367        #[snafu(implicit)]
368        location: Location,
369    },
370    #[snafu(display("Invalid transform on_failure value: {value}"))]
371    TransformOnFailureInvalidValue {
372        value: String,
373        #[snafu(implicit)]
374        location: Location,
375    },
376    #[snafu(display("Transform element must be a map"))]
377    TransformElementMustBeMap {
378        #[snafu(implicit)]
379        location: Location,
380    },
381    #[snafu(display("Transform fields must be set."))]
382    TransformFieldMustBeSet {
383        #[snafu(implicit)]
384        location: Location,
385    },
386    #[snafu(display("Transform {fields:?} type MUST BE set."))]
387    TransformTypeMustBeSet {
388        fields: String,
389        #[snafu(implicit)]
390        location: Location,
391    },
392    #[snafu(display("Column name must be unique, but got duplicated: {duplicates}"))]
393    TransformColumnNameMustBeUnique {
394        duplicates: String,
395        #[snafu(implicit)]
396        location: Location,
397    },
398    #[snafu(display(
399        "Illegal to set multiple timestamp Index columns, please set only one: {columns}"
400    ))]
401    TransformMultipleTimestampIndex {
402        columns: String,
403        #[snafu(implicit)]
404        location: Location,
405    },
406    #[snafu(display(
407        "Transform must have exactly one field specified as timestamp Index, but got {count}: {columns}"
408    ))]
409    TransformTimestampIndexCount {
410        count: usize,
411        columns: String,
412        #[snafu(implicit)]
413        location: Location,
414    },
415    #[snafu(display(
416        "Exactly one time-related processor and one timestamp value is required to use auto transform. `ignore_missing` can not be set to true."
417    ))]
418    AutoTransformOneTimestamp {
419        #[snafu(implicit)]
420        location: Location,
421    },
422    #[snafu(display("Invalid Pipeline doc version number: {}", version))]
423    InvalidVersionNumber {
424        version: String,
425        #[snafu(implicit)]
426        location: Location,
427    },
428    #[snafu(display("Type: {ty} value not supported for Epoch"))]
429    CoerceUnsupportedEpochType {
430        ty: String,
431        #[snafu(implicit)]
432        location: Location,
433    },
434    #[snafu(display("Failed to coerce string value '{s}' to type '{ty}'"))]
435    CoerceStringToType {
436        s: String,
437        ty: String,
438        #[snafu(implicit)]
439        location: Location,
440    },
441    #[snafu(display("Can not coerce json type to {ty}"))]
442    CoerceJsonTypeTo {
443        ty: String,
444        #[snafu(implicit)]
445        location: Location,
446    },
447    #[snafu(display(
448        "Can not coerce {ty} to json type. we only consider object and array to be json types."
449    ))]
450    CoerceTypeToJson {
451        ty: String,
452        #[snafu(implicit)]
453        location: Location,
454    },
455    #[snafu(display("Failed to coerce value: {msg}"))]
456    CoerceIncompatibleTypes {
457        msg: String,
458        #[snafu(implicit)]
459        location: Location,
460    },
461    #[snafu(display(
462        "Invalid resolution: '{resolution}'. Available resolutions: {valid_resolution}"
463    ))]
464    ValueInvalidResolution {
465        resolution: String,
466        valid_resolution: String,
467        #[snafu(implicit)]
468        location: Location,
469    },
470
471    #[snafu(display("Failed to parse type: '{t}'"))]
472    ValueParseType {
473        t: String,
474        #[snafu(implicit)]
475        location: Location,
476    },
477
478    #[snafu(display("Failed to parse {ty}: {v}"))]
479    ValueParseInt {
480        ty: String,
481        v: String,
482        #[snafu(source)]
483        error: std::num::ParseIntError,
484        #[snafu(implicit)]
485        location: Location,
486    },
487
488    #[snafu(display("Failed to parse {ty}: {v}"))]
489    ValueParseFloat {
490        ty: String,
491        v: String,
492        #[snafu(source)]
493        error: std::num::ParseFloatError,
494        #[snafu(implicit)]
495        location: Location,
496    },
497
498    #[snafu(display("Failed to parse {ty}: {v}"))]
499    ValueParseBoolean {
500        ty: String,
501        v: String,
502        #[snafu(source)]
503        error: std::str::ParseBoolError,
504        #[snafu(implicit)]
505        location: Location,
506    },
507    #[snafu(display("Default value not unsupported for type {value}"))]
508    ValueDefaultValueUnsupported {
509        value: String,
510        #[snafu(implicit)]
511        location: Location,
512    },
513
514    #[snafu(display("Unsupported yaml type: {value:?}"))]
515    ValueUnsupportedYamlType {
516        value: yaml_rust::Yaml,
517        #[snafu(implicit)]
518        location: Location,
519    },
520
521    #[snafu(display("key in Hash must be a string, but got {value:?}"))]
522    ValueYamlKeyMustBeString {
523        value: yaml_rust::Yaml,
524        #[snafu(implicit)]
525        location: Location,
526    },
527
528    #[snafu(display("Yaml load error."))]
529    YamlLoad {
530        #[snafu(source)]
531        error: yaml_rust::ScanError,
532        #[snafu(implicit)]
533        location: Location,
534    },
535    #[snafu(display("Yaml parse error."))]
536    YamlParse {
537        #[snafu(implicit)]
538        location: Location,
539    },
540    #[snafu(display("Column options error"))]
541    ColumnOptions {
542        #[snafu(source)]
543        source: api::error::Error,
544        #[snafu(implicit)]
545        location: Location,
546    },
547    #[snafu(display("Unsupported index type: {value}"))]
548    UnsupportedIndexType {
549        value: String,
550        #[snafu(implicit)]
551        location: Location,
552    },
553    #[snafu(display("Failed to parse json"))]
554    JsonParse {
555        #[snafu(source)]
556        error: serde_json::Error,
557        #[snafu(implicit)]
558        location: Location,
559    },
560    #[snafu(display(
561        "Column datatype mismatch. For column: {column}, expected datatype: {expected}, actual datatype: {actual}"
562    ))]
563    IdentifyPipelineColumnTypeMismatch {
564        column: String,
565        expected: String,
566        actual: String,
567        #[snafu(implicit)]
568        location: Location,
569    },
570    #[snafu(display("Parse json path error"))]
571    JsonPathParse {
572        #[snafu(implicit)]
573        location: Location,
574        #[snafu(source)]
575        error: jsonpath_rust::JsonPathParserError,
576    },
577    #[snafu(display("Json path result index not number"))]
578    JsonPathParseResultIndex {
579        #[snafu(implicit)]
580        location: Location,
581    },
582    #[snafu(display("Field is required for dispatcher"))]
583    FieldRequiredForDispatcher,
584    #[snafu(display("Table_suffix is required for dispatcher rule"))]
585    TableSuffixRequiredForDispatcherRule,
586    #[snafu(display("Value is required for dispatcher rule"))]
587    ValueRequiredForDispatcherRule,
588
589    #[snafu(display("Pipeline table not found"))]
590    PipelineTableNotFound {
591        #[snafu(implicit)]
592        location: Location,
593    },
594
595    #[snafu(display("Failed to insert pipeline to pipelines table"))]
596    InsertPipeline {
597        #[snafu(source)]
598        source: operator::error::Error,
599        #[snafu(implicit)]
600        location: Location,
601    },
602
603    #[snafu(display("Pipeline not found, name: {}, version: {}", name, version.map(|ts| ts.0.to_iso8601_string()).unwrap_or("latest".to_string())))]
604    PipelineNotFound {
605        name: String,
606        version: Option<TimestampNanosecond>,
607        #[snafu(implicit)]
608        location: Location,
609    },
610
611    #[snafu(display(
612        "Multiple pipelines with different schemas found, but none under current schema. Please replicate one of them or delete until only one schema left. name: {}, current_schema: {}, schemas: {}",
613        name,
614        current_schema,
615        schemas,
616    ))]
617    MultiPipelineWithDiffSchema {
618        name: String,
619        current_schema: String,
620        schemas: String,
621        #[snafu(implicit)]
622        location: Location,
623    },
624
625    #[snafu(display(
626        "The return value's length of the record batch does not match, see debug log for details"
627    ))]
628    RecordBatchLenNotMatch {
629        #[snafu(implicit)]
630        location: Location,
631    },
632
633    #[snafu(display("Failed to collect record batch"))]
634    CollectRecords {
635        #[snafu(implicit)]
636        location: Location,
637        #[snafu(source)]
638        source: common_recordbatch::error::Error,
639    },
640
641    #[snafu(display("A valid table suffix template is required for tablesuffix section"))]
642    RequiredTableSuffixTemplate,
643
644    #[snafu(display("Invalid table suffix template, input: {}", input))]
645    InvalidTableSuffixTemplate {
646        input: String,
647        #[snafu(implicit)]
648        location: Location,
649    },
650
651    #[snafu(display("Failed to compile VRL, {}", msg))]
652    CompileVrl {
653        msg: String,
654        #[snafu(implicit)]
655        location: Location,
656    },
657
658    #[snafu(display("Failed to execute VRL, {}", msg))]
659    ExecuteVrl {
660        msg: String,
661        #[snafu(implicit)]
662        location: Location,
663    },
664    #[snafu(display("Invalid timestamp value: {}", input))]
665    InvalidTimestamp {
666        input: String,
667        #[snafu(implicit)]
668        location: Location,
669    },
670
671    #[snafu(display("Invalid epoch value '{}' for resolution '{}'", value, resolution))]
672    InvalidEpochForResolution {
673        value: i64,
674        resolution: String,
675        #[snafu(implicit)]
676        location: Location,
677    },
678    #[snafu(display("Please don't use regex in Vrl script"))]
679    VrlRegexValue {
680        #[snafu(implicit)]
681        location: Location,
682    },
683
684    #[snafu(display(
685        "Vrl script should return object or array in the end, got `{:?}`",
686        result_kind
687    ))]
688    VrlReturnValue {
689        result_kind: Kind,
690        #[snafu(implicit)]
691        location: Location,
692    },
693
694    #[snafu(display("Failed to cast type, msg: {}", msg))]
695    CastType {
696        msg: String,
697        #[snafu(implicit)]
698        location: Location,
699    },
700
701    #[snafu(display("Top level value must be map"))]
702    ValueMustBeMap {
703        #[snafu(implicit)]
704        location: Location,
705    },
706
707    #[snafu(display(
708        "Array element at index {index} must be an object for one-to-many transformation, got {actual_type}"
709    ))]
710    ArrayElementMustBeObject {
711        index: usize,
712        actual_type: String,
713        #[snafu(implicit)]
714        location: Location,
715    },
716
717    #[snafu(display("Failed to transform array element at index {index}: {source}"))]
718    TransformArrayElement {
719        index: usize,
720        #[snafu(source)]
721        source: Box<Error>,
722        #[snafu(implicit)]
723        location: Location,
724    },
725
726    #[snafu(display("Failed to build DataFusion logical plan"))]
727    BuildDfLogicalPlan {
728        #[snafu(source)]
729        error: datafusion_common::DataFusionError,
730        #[snafu(implicit)]
731        location: Location,
732    },
733
734    #[snafu(display("Failed to execute internal statement"))]
735    ExecuteInternalStatement {
736        #[snafu(source)]
737        source: query::error::Error,
738        #[snafu(implicit)]
739        location: Location,
740    },
741
742    #[snafu(display("Failed to create dataframe"))]
743    DataFrame {
744        #[snafu(source)]
745        source: query::error::Error,
746        #[snafu(implicit)]
747        location: Location,
748    },
749
750    #[snafu(display("General catalog error"))]
751    Catalog {
752        #[snafu(source)]
753        source: catalog::error::Error,
754        #[snafu(implicit)]
755        location: Location,
756    },
757
758    #[snafu(display("Failed to create table"))]
759    CreateTable {
760        #[snafu(source)]
761        source: operator::error::Error,
762        #[snafu(implicit)]
763        location: Location,
764    },
765
766    #[snafu(display("Invalid pipeline version format: {}", version))]
767    InvalidPipelineVersion {
768        version: String,
769        #[snafu(implicit)]
770        location: Location,
771    },
772
773    #[snafu(display("Invalid custom time index config: {}, reason: {}", config, reason))]
774    InvalidCustomTimeIndex {
775        config: String,
776        reason: String,
777        #[snafu(implicit)]
778        location: Location,
779    },
780
781    #[snafu(display("Pipeline is required for this API."))]
782    PipelineMissing {
783        #[snafu(implicit)]
784        location: Location,
785    },
786
787    #[snafu(display("Time index must be non null."))]
788    TimeIndexMustBeNonNull {
789        #[snafu(implicit)]
790        location: Location,
791    },
792
793    #[snafu(display("Float is NaN"))]
794    FloatIsNan {
795        #[snafu(source)]
796        error: ordered_float::FloatIsNan,
797        #[snafu(implicit)]
798        location: Location,
799    },
800
801    #[snafu(display("Unsupported type in pipeline: {}", ty))]
802    UnsupportedTypeInPipeline {
803        ty: String,
804        #[snafu(implicit)]
805        location: Location,
806    },
807
808    #[snafu(transparent)]
809    GreptimeProto {
810        source: api::error::Error,
811        #[snafu(implicit)]
812        location: Location,
813    },
814
815    #[snafu(transparent)]
816    Datatypes {
817        source: datatypes::error::Error,
818        #[snafu(implicit)]
819        location: Location,
820    },
821}
822
823pub type Result<T> = std::result::Result<T, Error>;
824
825impl ErrorExt for Error {
826    fn status_code(&self) -> StatusCode {
827        use Error::*;
828        match self {
829            CastType { .. } => StatusCode::Unexpected,
830            PipelineTableNotFound { .. } => StatusCode::TableNotFound,
831            InsertPipeline { source, .. } => source.status_code(),
832            CollectRecords { source, .. } => source.status_code(),
833            PipelineNotFound { .. }
834            | InvalidPipelineVersion { .. }
835            | InvalidCustomTimeIndex { .. }
836            | TimeIndexMustBeNonNull { .. } => StatusCode::InvalidArguments,
837            MultiPipelineWithDiffSchema { .. }
838            | ValueMustBeMap { .. }
839            | ArrayElementMustBeObject { .. } => StatusCode::IllegalState,
840            TransformArrayElement { source, .. } => source.status_code(),
841            BuildDfLogicalPlan { .. } | RecordBatchLenNotMatch { .. } => StatusCode::Internal,
842            ExecuteInternalStatement { source, .. } => source.status_code(),
843            DataFrame { source, .. } => source.status_code(),
844            Catalog { source, .. } => source.status_code(),
845            CreateTable { source, .. } => source.status_code(),
846
847            EmptyInputField { .. }
848            | MissingInputField { .. }
849            | InvalidFieldRename { .. }
850            | ProcessorMustBeMap { .. }
851            | ProcessorMissingField { .. }
852            | ProcessorExpectString { .. }
853            | ProcessorUnsupportedValue { .. }
854            | ProcessorKeyMustBeString { .. }
855            | ProcessorFailedToParseString { .. }
856            | ProcessorMustHaveStringKey { .. }
857            | UnsupportedProcessor { .. }
858            | FieldMustBeType { .. }
859            | FailedParseFieldFromString { .. }
860            | FailedToParseIntKey { .. }
861            | FailedToParseInt { .. }
862            | FailedToParseFloatKey { .. }
863            | IntermediateKeyIndex { .. }
864            | CmcdMissingValue { .. }
865            | CmcdMissingKey { .. }
866            | KeyMustBeString { .. }
867            | CsvRead { .. }
868            | CsvNoRecord { .. }
869            | CsvSeparatorName { .. }
870            | CsvQuoteName { .. }
871            | DateParseTimezone { .. }
872            | DateParse { .. }
873            | DateFailedToGetLocalTimezone { .. }
874            | DissectInvalidPattern { .. }
875            | DissectEmptyPattern { .. }
876            | DissectSplitExceedsInput { .. }
877            | DissectSplitNotMatchInput { .. }
878            | DissectConsecutiveNames { .. }
879            | DissectNoMatchingPattern { .. }
880            | DissectModifierAlreadySet { .. }
881            | DissectAppendOrderAlreadySet { .. }
882            | DissectOrderOnlyAppend { .. }
883            | DissectOrderOnlyAppendModifier { .. }
884            | DissectEndModifierAlreadySet { .. }
885            | EpochInvalidResolution { .. }
886            | GsubPatternRequired { .. }
887            | GsubReplacementRequired { .. }
888            | Regex { .. }
889            | JoinSeparatorRequired { .. }
890            | LetterInvalidMethod { .. }
891            | RegexNamedGroupNotFound { .. }
892            | RegexNoValidField { .. }
893            | RegexNoValidPattern { .. }
894            | UrlEncodingInvalidMethod { .. }
895            | DigestPatternInvalid { .. }
896            | TransformOnFailureInvalidValue { .. }
897            | TransformElementMustBeMap { .. }
898            | TransformFieldMustBeSet { .. }
899            | TransformTypeMustBeSet { .. }
900            | TransformColumnNameMustBeUnique { .. }
901            | TransformMultipleTimestampIndex { .. }
902            | TransformTimestampIndexCount { .. }
903            | AutoTransformOneTimestamp { .. }
904            | InvalidVersionNumber { .. }
905            | CoerceUnsupportedEpochType { .. }
906            | CoerceStringToType { .. }
907            | CoerceJsonTypeTo { .. }
908            | CoerceTypeToJson { .. }
909            | CoerceIncompatibleTypes { .. }
910            | ValueInvalidResolution { .. }
911            | ValueParseType { .. }
912            | ValueParseInt { .. }
913            | ValueParseFloat { .. }
914            | ValueParseBoolean { .. }
915            | ValueDefaultValueUnsupported { .. }
916            | ValueUnsupportedYamlType { .. }
917            | ValueYamlKeyMustBeString { .. }
918            | YamlLoad { .. }
919            | YamlParse { .. }
920            | ColumnOptions { .. }
921            | UnsupportedIndexType { .. }
922            | IdentifyPipelineColumnTypeMismatch { .. }
923            | JsonParse { .. }
924            | JsonPathParse { .. }
925            | JsonPathParseResultIndex { .. }
926            | FieldRequiredForDispatcher
927            | TableSuffixRequiredForDispatcherRule
928            | ValueRequiredForDispatcherRule
929            | RequiredTableSuffixTemplate
930            | InvalidTableSuffixTemplate { .. }
931            | CompileVrl { .. }
932            | ExecuteVrl { .. }
933            | InvalidTimestamp { .. }
934            | VrlRegexValue { .. }
935            | VrlReturnValue { .. }
936            | PipelineMissing { .. } => StatusCode::InvalidArguments,
937
938            FloatIsNan { .. }
939            | InvalidEpochForResolution { .. }
940            | UnsupportedTypeInPipeline { .. } => StatusCode::InvalidArguments,
941
942            GreptimeProto { source, .. } => source.status_code(),
943            Datatypes { source, .. } => source.status_code(),
944        }
945    }
946
947    fn as_any(&self) -> &dyn Any {
948        self
949    }
950}