1use std::sync::Arc;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
22use common_error::ext::BoxedError;
23use common_meta::key::table_info::TableInfoValue;
24use datatypes::prelude::ConcreteDataType;
25use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
26use itertools::Itertools;
27use operator::expr_helper;
28use session::context::QueryContextBuilder;
29use snafu::{OptionExt, ResultExt};
30use table::table_reference::TableReference;
31
32use crate::adapter::table_source::TableDesc;
33use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL};
34use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
35use crate::repr::{ColumnType, RelationDesc, RelationType};
36use crate::StreamingEngine;
37impl StreamingEngine {
38 pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
40 let use_idx = {
41 let mut selector = self.worker_selector.lock().await;
42 if *selector >= self.worker_handles.len() {
43 *selector = 0
44 };
45 let use_idx = *selector;
46 *selector += 1;
47 use_idx
48 };
49 &self.worker_handles[use_idx]
51 }
52
53 pub(crate) async fn create_table_from_relation(
55 &self,
56 flow_name: &str,
57 table_name: &TableName,
58 relation_desc: &RelationDesc,
59 ) -> Result<bool, Error> {
60 if self.fetch_table_pk_schema(table_name).await?.is_some() {
61 return Ok(false);
62 }
63 let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;
64
65 let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;
68
69 let create_expr = expr_helper::create_table_expr_by_column_schemas(
71 &TableReference {
72 catalog: &table_name[0],
73 schema: &table_name[1],
74 table: &table_name[2],
75 },
76 &proto_schema,
77 "mito",
78 Some(&format!("Sink table for flow {}", flow_name)),
79 )
80 .map_err(BoxedError::new)
81 .context(ExternalSnafu)?;
82
83 self.submit_create_sink_table_ddl(create_expr).await?;
84 Ok(true)
85 }
86
87 pub(crate) async fn try_fetch_existing_table(
89 &self,
90 table_name: &TableName,
91 ) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
92 if let Some((primary_keys, time_index, schema)) =
93 self.fetch_table_pk_schema(table_name).await?
94 {
95 let is_auto_create = {
98 let correct_name = schema
99 .last()
100 .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
101 .unwrap_or(false);
102 let correct_time_index = time_index == Some(schema.len() - 1);
103 correct_name && correct_time_index
104 };
105 let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
106 Ok(Some((is_auto_create, proto_schema)))
107 } else {
108 Ok(None)
109 }
110 }
111
112 pub(crate) async fn submit_create_sink_table_ddl(
114 &self,
115 mut create_table: CreateTableExpr,
116 ) -> Result<(), Error> {
117 let stmt_exec = {
118 self.frontend_invoker
119 .read()
120 .await
121 .as_ref()
122 .map(|f| f.statement_executor())
123 }
124 .context(UnexpectedSnafu {
125 reason: "Failed to get statement executor",
126 })?;
127 let ctx = Arc::new(
128 QueryContextBuilder::default()
129 .current_catalog(create_table.catalog_name.clone())
130 .current_schema(create_table.schema_name.clone())
131 .build(),
132 );
133 stmt_exec
134 .create_table_inner(&mut create_table, None, ctx)
135 .await
136 .map_err(BoxedError::new)
137 .context(ExternalSnafu)?;
138
139 Ok(())
140 }
141}
142
143pub fn table_info_value_to_relation_desc(
144 table_info_value: TableInfoValue,
145) -> Result<TableDesc, Error> {
146 let raw_schema = table_info_value.table_info.meta.schema;
147 let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
148 .column_schemas
149 .clone()
150 .into_iter()
151 .map(|col| {
152 (
153 ColumnType {
154 nullable: col.is_nullable(),
155 scalar_type: col.data_type,
156 },
157 Some(col.name),
158 )
159 })
160 .unzip();
161
162 let key = table_info_value.table_info.meta.primary_key_indices;
163 let keys = vec![crate::repr::Key::from(key)];
164
165 let time_index = raw_schema.timestamp_index;
166 let relation_desc = RelationDesc {
167 typ: RelationType {
168 column_types,
169 keys,
170 time_index,
171 auto_columns: vec![],
173 },
174 names: col_names,
175 };
176 let default_values = raw_schema
177 .column_schemas
178 .iter()
179 .map(|c| {
180 c.default_constraint().cloned().or_else(|| {
181 if c.is_nullable() {
182 Some(ColumnDefaultConstraint::null_value())
183 } else {
184 None
185 }
186 })
187 })
188 .collect_vec();
189
190 Ok(TableDesc::new(relation_desc, default_values))
191}
192
193pub fn from_proto_to_data_type(
194 column_schema: &api::v1::ColumnSchema,
195) -> Result<ConcreteDataType, Error> {
196 let wrapper =
197 ColumnDataTypeWrapper::try_new(column_schema.datatype, column_schema.datatype_extension)
198 .map_err(BoxedError::new)
199 .context(ExternalSnafu)?;
200 let cdt = ConcreteDataType::from(wrapper);
201
202 Ok(cdt)
203}
204
205pub fn column_schemas_to_proto(
207 column_schemas: Vec<ColumnSchema>,
208 primary_keys: &[String],
209) -> Result<Vec<api::v1::ColumnSchema>, Error> {
210 let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
211 .iter()
212 .map(|c| {
213 ColumnDataTypeWrapper::try_from(c.data_type.clone())
214 .map(|w| w.to_parts())
215 .map_err(BoxedError::new)
216 .context(ExternalSnafu)
217 })
218 .try_collect()?;
219
220 let ret = column_schemas
221 .iter()
222 .zip(column_datatypes)
223 .map(|(schema, datatype)| {
224 let semantic_type = if schema.is_time_index() {
225 SemanticType::Timestamp
226 } else if primary_keys.contains(&schema.name) {
227 SemanticType::Tag
228 } else {
229 SemanticType::Field
230 } as i32;
231
232 api::v1::ColumnSchema {
233 column_name: schema.name.clone(),
234 datatype: datatype.0 as i32,
235 semantic_type,
236 datatype_extension: datatype.1,
237 options: options_from_column_schema(schema),
238 }
239 })
240 .collect();
241 Ok(ret)
242}
243
244pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> {
247 schema
248 .typ()
249 .column_types
250 .clone()
251 .into_iter()
252 .enumerate()
253 .map(|(idx, typ)| {
254 let name = schema
255 .names
256 .get(idx)
257 .cloned()
258 .flatten()
259 .unwrap_or(format!("col_{}", idx));
260 let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
261 if schema.typ().time_index == Some(idx) {
262 ret.with_time_index(true)
263 } else {
264 ret
265 }
266 })
267 .collect_vec()
268}