1pub mod columnar_value;
16pub mod error;
17pub mod logical_plan;
18pub mod prelude;
19pub mod request;
20pub mod stream;
21#[cfg(any(test, feature = "testing"))]
22pub mod test_util;
23
24use std::fmt::{Debug, Display, Formatter};
25use std::sync::Arc;
26
27use api::greptime_proto::v1::AddColumnLocation as Location;
28use api::greptime_proto::v1::add_column_location::LocationType;
29use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
30use datafusion::physical_plan::ExecutionPlan;
31use serde::{Deserialize, Serialize};
32use sqlparser_derive::{Visit, VisitMut};
33
34#[derive(Debug)]
36pub struct Output {
37 pub data: OutputData,
38 pub meta: OutputMeta,
39}
40
41pub enum OutputData {
44 AffectedRows(OutputRows),
45 RecordBatches(RecordBatches),
46 Stream(SendableRecordBatchStream),
47}
48
49#[derive(Debug, Default)]
51pub struct OutputMeta {
52 pub plan: Option<Arc<dyn ExecutionPlan>>,
54 pub cost: OutputCost,
55}
56
57impl Output {
58 pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
59 Self {
60 data: OutputData::AffectedRows(affected_rows),
61 meta: Default::default(),
62 }
63 }
64
65 pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
66 Self {
67 data: OutputData::RecordBatches(recordbatches),
68 meta: Default::default(),
69 }
70 }
71
72 pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
73 Self {
74 data: OutputData::Stream(stream),
75 meta: Default::default(),
76 }
77 }
78
79 pub fn new(data: OutputData, meta: OutputMeta) -> Self {
80 Self { data, meta }
81 }
82
83 pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
84 match self.data {
85 OutputData::AffectedRows(rows) => (rows, self.meta.cost),
86 _ => (0, self.meta.cost),
87 }
88 }
89}
90
91impl Debug for OutputData {
92 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
93 match self {
94 OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
95 OutputData::RecordBatches(recordbatches) => {
96 write!(f, "OutputData::RecordBatches({recordbatches:?})")
97 }
98 OutputData::Stream(s) => {
99 write!(f, "OutputData::Stream(<{}>)", s.name())
100 }
101 }
102 }
103}
104
105impl OutputMeta {
106 pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
107 Self { plan, cost }
108 }
109
110 pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
111 Self {
112 plan: Some(plan),
113 cost: 0,
114 }
115 }
116
117 pub fn new_with_cost(cost: usize) -> Self {
118 Self { plan: None, cost }
119 }
120}
121
122#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
123pub enum AddColumnLocation {
124 First,
125 After { column_name: String },
126}
127
128impl Display for AddColumnLocation {
129 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
130 match self {
131 AddColumnLocation::First => write!(f, r#"FIRST"#),
132 AddColumnLocation::After { column_name } => {
133 write!(f, r#"AFTER {column_name}"#)
134 }
135 }
136 }
137}
138
139impl From<&AddColumnLocation> for Location {
140 fn from(value: &AddColumnLocation) -> Self {
141 match value {
142 AddColumnLocation::First => Location {
143 location_type: LocationType::First.into(),
144 after_column_name: String::default(),
145 },
146 AddColumnLocation::After { column_name } => Location {
147 location_type: LocationType::After.into(),
148 after_column_name: column_name.to_string(),
149 },
150 }
151 }
152}
153
/// Row count carried by [`OutputData::AffectedRows`].
pub type OutputRows = usize;

/// Type of the `cost` field on [`OutputMeta`].
// NOTE(review): the unit of the cost value is not visible here — confirm.
pub type OutputCost = usize;