1use common_telemetry::debug;
16use datatypes::data_type::DataType;
17use snafu::{ensure, ResultExt};
18use sqlx::MySqlPool;
19
20use crate::error::{self, Result, UnexpectedSnafu};
21use crate::ir::create_expr::ColumnOption;
22use crate::ir::{Column, Ident};
23
24#[derive(Debug, sqlx::FromRow)]
25pub struct ColumnEntry {
26 pub table_schema: String,
27 pub table_name: String,
28 pub column_name: String,
29 pub data_type: String,
30 pub semantic_type: String,
31 pub column_default: Option<String>,
32 pub is_nullable: String,
33}
34
35fn is_nullable(str: &str) -> bool {
36 str.to_uppercase() == "YES"
37}
38
39enum SemanticType {
40 Timestamp,
41 Field,
42 Tag,
43}
44
45fn semantic_type(str: &str) -> Option<SemanticType> {
46 match str {
47 "TIMESTAMP" => Some(SemanticType::Timestamp),
48 "FIELD" => Some(SemanticType::Field),
49 "TAG" => Some(SemanticType::Tag),
50 _ => None,
51 }
52}
53
54impl PartialEq<Column> for ColumnEntry {
55 fn eq(&self, other: &Column) -> bool {
56 if other.name.value != self.column_name {
58 debug!(
59 "expected name: {}, got: {}",
60 other.name.value, self.column_name
61 );
62 return false;
63 }
64 if other.column_type.name() != self.data_type {
66 debug!(
67 "expected column_type: {}, got: {}",
68 other.column_type.name(),
69 self.data_type
70 );
71 return false;
72 }
73 match &self.column_default {
75 Some(value) => {
76 let default_value_opt = other.options.iter().find(|opt| {
77 matches!(
78 opt,
79 ColumnOption::DefaultFn(_) | ColumnOption::DefaultValue(_)
80 )
81 });
82 if default_value_opt.is_none() {
83 debug!("default value options is not found");
84 return false;
85 }
86 let default_value = match default_value_opt.unwrap() {
87 ColumnOption::DefaultValue(v) => v.to_string(),
88 ColumnOption::DefaultFn(f) => f.to_string(),
89 _ => unreachable!(),
90 };
91 if &default_value != value {
92 debug!("expected default value: {default_value}, got: {value}");
93 return false;
94 }
95 }
96 None => {
97 if other.options.iter().any(|opt| {
98 matches!(
99 opt,
100 ColumnOption::DefaultFn(_) | ColumnOption::DefaultValue(_)
101 )
102 }) {
103 return false;
104 }
105 }
106 };
107 if is_nullable(&self.is_nullable) {
109 if other
111 .options
112 .iter()
113 .any(|opt| matches!(opt, ColumnOption::NotNull))
114 {
115 debug!("ColumnOption::NotNull is found");
116 return false;
117 }
118 } else {
119 if !other
121 .options
122 .iter()
123 .any(|opt| matches!(opt, ColumnOption::NotNull | ColumnOption::TimeIndex))
124 {
125 debug!("ColumnOption::NotNull or ColumnOption::TimeIndex is not found");
126 return false;
127 }
128 }
129 match semantic_type(&self.semantic_type) {
131 Some(SemanticType::Tag) => {
132 if !other
133 .options
134 .iter()
135 .any(|opt| matches!(opt, ColumnOption::PrimaryKey))
136 {
137 debug!("ColumnOption::PrimaryKey is not found");
138 return false;
139 }
140 }
141 Some(SemanticType::Field) => {
142 if other
143 .options
144 .iter()
145 .any(|opt| matches!(opt, ColumnOption::PrimaryKey | ColumnOption::TimeIndex))
146 {
147 debug!("unexpected ColumnOption::PrimaryKey or ColumnOption::TimeIndex");
148 return false;
149 }
150 }
151 Some(SemanticType::Timestamp) => {
152 if !other
153 .options
154 .iter()
155 .any(|opt| matches!(opt, ColumnOption::TimeIndex))
156 {
157 debug!("ColumnOption::TimeIndex is not found");
158 return false;
159 }
160 }
161 None => {
162 debug!("unknown semantic type: {}", self.semantic_type);
163 return false;
164 }
165 };
166
167 true
168 }
169}
170
171pub fn assert_eq(fetched_columns: &[ColumnEntry], columns: &[Column]) -> Result<()> {
173 ensure!(
174 columns.len() == fetched_columns.len(),
175 error::AssertSnafu {
176 reason: format!(
177 "Expected columns length: {}, got: {}",
178 columns.len(),
179 fetched_columns.len(),
180 )
181 }
182 );
183
184 for (idx, fetched) in fetched_columns.iter().enumerate() {
185 ensure!(
186 fetched == &columns[idx],
187 error::AssertSnafu {
188 reason: format!(
189 "ColumnEntry {fetched:?} is not equal to Column {:?}",
190 columns[idx]
191 )
192 }
193 );
194 }
195
196 Ok(())
197}
198
199pub async fn fetch_columns(
201 db: &MySqlPool,
202 schema_name: Ident,
203 table_name: Ident,
204) -> Result<Vec<ColumnEntry>> {
205 let sql = "SELECT table_schema, table_name, column_name, greptime_data_type as data_type, semantic_type, column_default, is_nullable FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
206 sqlx::query_as::<_, ColumnEntry>(sql)
207 .bind(schema_name.value.to_string())
208 .bind(table_name.value.to_string())
209 .fetch_all(db)
210 .await
211 .context(error::ExecuteQuerySnafu { sql })
212}
213
214#[allow(unused)]
216pub async fn valid_marker_value(
217 db: &MySqlPool,
218 _schema_name: Ident,
219 table_name: Ident,
220) -> Result<bool> {
221 let sql = format!("SELECT * FROM {table_name}");
222 let res = sqlx::query(&sql)
225 .persistent(false)
226 .fetch_all(db)
227 .await
228 .context(error::ExecuteQuerySnafu { sql })?;
229 if res.len() != 1 {
230 UnexpectedSnafu {
231 violated: format!(
232 "Expected one row after alter table, found {}:{:?}",
233 res.len(),
234 res
235 ),
236 }
237 .fail()?
238 }
239 Ok(true)
241}
242
243#[cfg(test)]
244mod tests {
245 use datatypes::data_type::{ConcreteDataType, DataType};
246 use datatypes::value::Value;
247
248 use super::ColumnEntry;
249 use crate::ir::create_expr::ColumnOption;
250 use crate::ir::{Column, Ident};
251
252 #[test]
253 fn test_column_eq() {
254 common_telemetry::init_default_ut_logging();
255 let column_entry = ColumnEntry {
256 table_schema: String::new(),
257 table_name: String::new(),
258 column_name: "test".to_string(),
259 data_type: ConcreteDataType::int8_datatype().name(),
260 semantic_type: "FIELD".to_string(),
261 column_default: None,
262 is_nullable: "Yes".to_string(),
263 };
264 let column = Column {
266 name: Ident::new("test"),
267 column_type: ConcreteDataType::int8_datatype(),
268 options: vec![],
269 };
270 assert!(column_entry == column);
271 let column = Column {
273 name: Ident::with_quote('\'', "test"),
274 column_type: ConcreteDataType::int8_datatype(),
275 options: vec![],
276 };
277 assert!(column_entry == column);
278 let column_entry = ColumnEntry {
280 table_schema: String::new(),
281 table_name: String::new(),
282 column_name: "test".to_string(),
283 data_type: ConcreteDataType::int8_datatype().to_string(),
284 semantic_type: "FIELD".to_string(),
285 column_default: Some("1".to_string()),
286 is_nullable: "Yes".to_string(),
287 };
288 let column = Column {
289 name: Ident::with_quote('\'', "test"),
290 column_type: ConcreteDataType::int8_datatype(),
291 options: vec![ColumnOption::DefaultValue(Value::from(1))],
292 };
293 assert!(column_entry == column);
294 let column_entry = ColumnEntry {
296 table_schema: String::new(),
297 table_name: String::new(),
298 column_name: "test".to_string(),
299 data_type: ConcreteDataType::int8_datatype().to_string(),
300 semantic_type: "FIELD".to_string(),
301 column_default: Some("Hello()".to_string()),
302 is_nullable: "Yes".to_string(),
303 };
304 let column = Column {
305 name: Ident::with_quote('\'', "test"),
306 column_type: ConcreteDataType::int8_datatype(),
307 options: vec![ColumnOption::DefaultFn("Hello()".to_string())],
308 };
309 assert!(column_entry == column);
310 }
311}