1pub mod columnar_value;
16pub mod error;
17mod function;
18pub mod logical_plan;
19pub mod prelude;
20pub mod request;
21mod signature;
22pub mod stream;
23#[cfg(any(test, feature = "testing"))]
24pub mod test_util;
25
26use std::fmt::{Debug, Display, Formatter};
27use std::sync::Arc;
28
29use api::greptime_proto::v1::add_column_location::LocationType;
30use api::greptime_proto::v1::AddColumnLocation as Location;
31use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
32use datafusion::physical_plan::ExecutionPlan;
33use serde::{Deserialize, Serialize};
34use sqlparser_derive::{Visit, VisitMut};
35
/// Pairs an output payload ([`OutputData`]) with its accompanying metadata
/// ([`OutputMeta`]).
#[derive(Debug)]
pub struct Output {
    /// The payload itself: a row count, materialized record batches, or a stream.
    pub data: OutputData,
    /// Metadata attached to the payload (optional plan and a cost value).
    pub meta: OutputMeta,
}
42
/// The payload carried by an [`Output`].
///
/// `Debug` is implemented by hand for this type rather than derived; the
/// `Stream` variant is rendered through the stream's `name()`.
pub enum OutputData {
    /// A plain row count.
    AffectedRows(OutputRows),
    /// A `RecordBatches` value (type from `common_recordbatch`).
    RecordBatches(RecordBatches),
    /// A `SendableRecordBatchStream` (type from `common_recordbatch`).
    Stream(SendableRecordBatchStream),
}
50
/// Metadata that travels alongside an [`OutputData`] payload.
///
/// The derived `Default` yields no plan and a cost of `0`.
#[derive(Debug, Default)]
pub struct OutputMeta {
    /// Optional DataFusion execution plan.
    /// NOTE(review): presumably the plan that produced the output — confirm with callers.
    pub plan: Option<Arc<dyn ExecutionPlan>>,
    /// Cost value associated with the output; the unit is not specified in this file.
    pub cost: OutputCost,
}
58
59impl Output {
60 pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
61 Self {
62 data: OutputData::AffectedRows(affected_rows),
63 meta: Default::default(),
64 }
65 }
66
67 pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
68 Self {
69 data: OutputData::RecordBatches(recordbatches),
70 meta: Default::default(),
71 }
72 }
73
74 pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
75 Self {
76 data: OutputData::Stream(stream),
77 meta: Default::default(),
78 }
79 }
80
81 pub fn new(data: OutputData, meta: OutputMeta) -> Self {
82 Self { data, meta }
83 }
84
85 pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
86 match self.data {
87 OutputData::AffectedRows(rows) => (rows, self.meta.cost),
88 _ => (0, self.meta.cost),
89 }
90 }
91}
92
93impl Debug for OutputData {
94 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
95 match self {
96 OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
97 OutputData::RecordBatches(recordbatches) => {
98 write!(f, "OutputData::RecordBatches({recordbatches:?})")
99 }
100 OutputData::Stream(s) => {
101 write!(f, "OutputData::Stream(<{}>)", s.name())
102 }
103 }
104 }
105}
106
107impl OutputMeta {
108 pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
109 Self { plan, cost }
110 }
111
112 pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
113 Self {
114 plan: Some(plan),
115 cost: 0,
116 }
117 }
118
119 pub fn new_with_cost(cost: usize) -> Self {
120 Self { plan: None, cost }
121 }
122}
123
/// Position at which a column is added.
///
/// Its `Display` form is `FIRST` or `AFTER <column_name>`, and a reference to it
/// converts into the protobuf `AddColumnLocation` message from `api::greptime_proto::v1`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
pub enum AddColumnLocation {
    /// Place the column first.
    First,
    /// Place the column after the column named `column_name`.
    After { column_name: String },
}
129
130impl Display for AddColumnLocation {
131 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
132 match self {
133 AddColumnLocation::First => write!(f, r#"FIRST"#),
134 AddColumnLocation::After { column_name } => {
135 write!(f, r#"AFTER {column_name}"#)
136 }
137 }
138 }
139}
140
141impl From<&AddColumnLocation> for Location {
142 fn from(value: &AddColumnLocation) -> Self {
143 match value {
144 AddColumnLocation::First => Location {
145 location_type: LocationType::First.into(),
146 after_column_name: String::default(),
147 },
148 AddColumnLocation::After { column_name } => Location {
149 location_type: LocationType::After.into(),
150 after_column_name: column_name.to_string(),
151 },
152 }
153 }
154}
155
/// Row count type, as carried by `OutputData::AffectedRows`.
pub type OutputRows = usize;
/// Cost value type, as stored in `OutputMeta::cost`; the unit is not specified in this file.
pub type OutputCost = usize;