1pub mod columnar_value;
16pub mod error;
17pub mod logical_plan;
18pub mod prelude;
19pub mod request;
20mod signature;
21pub mod stream;
22#[cfg(any(test, feature = "testing"))]
23pub mod test_util;
24
25use std::fmt::{Debug, Display, Formatter};
26use std::sync::Arc;
27
28use api::greptime_proto::v1::add_column_location::LocationType;
29use api::greptime_proto::v1::AddColumnLocation as Location;
30use common_recordbatch::{RecordBatches, SendableRecordBatchStream};
31use datafusion::physical_plan::ExecutionPlan;
32use serde::{Deserialize, Serialize};
33use sqlparser_derive::{Visit, VisitMut};
34
35#[derive(Debug)]
37pub struct Output {
38 pub data: OutputData,
39 pub meta: OutputMeta,
40}
41
42pub enum OutputData {
45 AffectedRows(OutputRows),
46 RecordBatches(RecordBatches),
47 Stream(SendableRecordBatchStream),
48}
49
50#[derive(Debug, Default)]
52pub struct OutputMeta {
53 pub plan: Option<Arc<dyn ExecutionPlan>>,
55 pub cost: OutputCost,
56}
57
58impl Output {
59 pub fn new_with_affected_rows(affected_rows: OutputRows) -> Self {
60 Self {
61 data: OutputData::AffectedRows(affected_rows),
62 meta: Default::default(),
63 }
64 }
65
66 pub fn new_with_record_batches(recordbatches: RecordBatches) -> Self {
67 Self {
68 data: OutputData::RecordBatches(recordbatches),
69 meta: Default::default(),
70 }
71 }
72
73 pub fn new_with_stream(stream: SendableRecordBatchStream) -> Self {
74 Self {
75 data: OutputData::Stream(stream),
76 meta: Default::default(),
77 }
78 }
79
80 pub fn new(data: OutputData, meta: OutputMeta) -> Self {
81 Self { data, meta }
82 }
83
84 pub fn extract_rows_and_cost(&self) -> (OutputRows, OutputCost) {
85 match self.data {
86 OutputData::AffectedRows(rows) => (rows, self.meta.cost),
87 _ => (0, self.meta.cost),
88 }
89 }
90}
91
92impl Debug for OutputData {
93 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
94 match self {
95 OutputData::AffectedRows(rows) => write!(f, "OutputData::AffectedRows({rows})"),
96 OutputData::RecordBatches(recordbatches) => {
97 write!(f, "OutputData::RecordBatches({recordbatches:?})")
98 }
99 OutputData::Stream(s) => {
100 write!(f, "OutputData::Stream(<{}>)", s.name())
101 }
102 }
103 }
104}
105
106impl OutputMeta {
107 pub fn new(plan: Option<Arc<dyn ExecutionPlan>>, cost: usize) -> Self {
108 Self { plan, cost }
109 }
110
111 pub fn new_with_plan(plan: Arc<dyn ExecutionPlan>) -> Self {
112 Self {
113 plan: Some(plan),
114 cost: 0,
115 }
116 }
117
118 pub fn new_with_cost(cost: usize) -> Self {
119 Self { plan: None, cost }
120 }
121}
122
123#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, Visit, VisitMut)]
124pub enum AddColumnLocation {
125 First,
126 After { column_name: String },
127}
128
129impl Display for AddColumnLocation {
130 fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
131 match self {
132 AddColumnLocation::First => write!(f, r#"FIRST"#),
133 AddColumnLocation::After { column_name } => {
134 write!(f, r#"AFTER {column_name}"#)
135 }
136 }
137 }
138}
139
140impl From<&AddColumnLocation> for Location {
141 fn from(value: &AddColumnLocation) -> Self {
142 match value {
143 AddColumnLocation::First => Location {
144 location_type: LocationType::First.into(),
145 after_column_name: String::default(),
146 },
147 AddColumnLocation::After { column_name } => Location {
148 location_type: LocationType::After.into(),
149 after_column_name: column_name.to_string(),
150 },
151 }
152 }
153}
154
155pub type OutputRows = usize;
156pub type OutputCost = usize;