flow/adapter/
util.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Util functions for adapter
16
17use std::sync::Arc;
18
19use api::helper::ColumnDataTypeWrapper;
20use api::v1::column_def::options_from_column_schema;
21use api::v1::{ColumnDataType, ColumnDataTypeExtension, CreateTableExpr, SemanticType};
22use common_error::ext::BoxedError;
23use common_meta::key::table_info::TableInfoValue;
24use datatypes::prelude::ConcreteDataType;
25use datatypes::schema::{ColumnDefaultConstraint, ColumnSchema};
26use itertools::Itertools;
27use operator::expr_helper;
28use session::context::QueryContextBuilder;
29use snafu::{OptionExt, ResultExt};
30use table::table_reference::TableReference;
31
32use crate::adapter::table_source::TableDesc;
33use crate::adapter::{TableName, WorkerHandle, AUTO_CREATED_PLACEHOLDER_TS_COL};
34use crate::error::{Error, ExternalSnafu, UnexpectedSnafu};
35use crate::repr::{ColumnType, RelationDesc, RelationType};
36use crate::StreamingEngine;
37impl StreamingEngine {
38    /// Get a worker handle for creating flow, using round robin to select a worker
39    pub(crate) async fn get_worker_handle_for_create_flow(&self) -> &WorkerHandle {
40        let use_idx = {
41            let mut selector = self.worker_selector.lock().await;
42            if *selector >= self.worker_handles.len() {
43                *selector = 0
44            };
45            let use_idx = *selector;
46            *selector += 1;
47            use_idx
48        };
49        // Safety: selector is always in bound
50        &self.worker_handles[use_idx]
51    }
52
53    /// Create table from given schema(will adjust to add auto column if needed), return true if table is created
54    pub(crate) async fn create_table_from_relation(
55        &self,
56        flow_name: &str,
57        table_name: &TableName,
58        relation_desc: &RelationDesc,
59    ) -> Result<bool, Error> {
60        if self.fetch_table_pk_schema(table_name).await?.is_some() {
61            return Ok(false);
62        }
63        let (pks, tys, _) = self.adjust_auto_created_table_schema(relation_desc).await?;
64
65        //create sink table using pks, column types and is_ts_auto
66
67        let proto_schema = column_schemas_to_proto(tys.clone(), &pks)?;
68
69        // create sink table
70        let create_expr = expr_helper::create_table_expr_by_column_schemas(
71            &TableReference {
72                catalog: &table_name[0],
73                schema: &table_name[1],
74                table: &table_name[2],
75            },
76            &proto_schema,
77            "mito",
78            Some(&format!("Sink table for flow {}", flow_name)),
79        )
80        .map_err(BoxedError::new)
81        .context(ExternalSnafu)?;
82
83        self.submit_create_sink_table_ddl(create_expr).await?;
84        Ok(true)
85    }
86
87    /// Try fetch table with adjusted schema(added auto column if needed)
88    pub(crate) async fn try_fetch_existing_table(
89        &self,
90        table_name: &TableName,
91    ) -> Result<Option<(bool, Vec<api::v1::ColumnSchema>)>, Error> {
92        if let Some((primary_keys, time_index, schema)) =
93            self.fetch_table_pk_schema(table_name).await?
94        {
95            // check if the last column is the auto created timestamp column, hence the table is auto created from
96            // flow's plan type
97            let is_auto_create = {
98                let correct_name = schema
99                    .last()
100                    .map(|s| s.name == AUTO_CREATED_PLACEHOLDER_TS_COL)
101                    .unwrap_or(false);
102                let correct_time_index = time_index == Some(schema.len() - 1);
103                correct_name && correct_time_index
104            };
105            let proto_schema = column_schemas_to_proto(schema, &primary_keys)?;
106            Ok(Some((is_auto_create, proto_schema)))
107        } else {
108            Ok(None)
109        }
110    }
111
112    /// submit a create table ddl
113    pub(crate) async fn submit_create_sink_table_ddl(
114        &self,
115        mut create_table: CreateTableExpr,
116    ) -> Result<(), Error> {
117        let stmt_exec = {
118            self.frontend_invoker
119                .read()
120                .await
121                .as_ref()
122                .map(|f| f.statement_executor())
123        }
124        .context(UnexpectedSnafu {
125            reason: "Failed to get statement executor",
126        })?;
127        let ctx = Arc::new(
128            QueryContextBuilder::default()
129                .current_catalog(create_table.catalog_name.clone())
130                .current_schema(create_table.schema_name.clone())
131                .build(),
132        );
133        stmt_exec
134            .create_table_inner(&mut create_table, None, ctx)
135            .await
136            .map_err(BoxedError::new)
137            .context(ExternalSnafu)?;
138
139        Ok(())
140    }
141}
142
143pub fn table_info_value_to_relation_desc(
144    table_info_value: TableInfoValue,
145) -> Result<TableDesc, Error> {
146    let raw_schema = table_info_value.table_info.meta.schema;
147    let (column_types, col_names): (Vec<_>, Vec<_>) = raw_schema
148        .column_schemas
149        .clone()
150        .into_iter()
151        .map(|col| {
152            (
153                ColumnType {
154                    nullable: col.is_nullable(),
155                    scalar_type: col.data_type,
156                },
157                Some(col.name),
158            )
159        })
160        .unzip();
161
162    let key = table_info_value.table_info.meta.primary_key_indices;
163    let keys = vec![crate::repr::Key::from(key)];
164
165    let time_index = raw_schema.timestamp_index;
166    let relation_desc = RelationDesc {
167        typ: RelationType {
168            column_types,
169            keys,
170            time_index,
171            // by default table schema's column are all non-auto
172            auto_columns: vec![],
173        },
174        names: col_names,
175    };
176    let default_values = raw_schema
177        .column_schemas
178        .iter()
179        .map(|c| {
180            c.default_constraint().cloned().or_else(|| {
181                if c.is_nullable() {
182                    Some(ColumnDefaultConstraint::null_value())
183                } else {
184                    None
185                }
186            })
187        })
188        .collect_vec();
189
190    Ok(TableDesc::new(relation_desc, default_values))
191}
192
193pub fn from_proto_to_data_type(
194    column_schema: &api::v1::ColumnSchema,
195) -> Result<ConcreteDataType, Error> {
196    let wrapper =
197        ColumnDataTypeWrapper::try_new(column_schema.datatype, column_schema.datatype_extension)
198            .map_err(BoxedError::new)
199            .context(ExternalSnafu)?;
200    let cdt = ConcreteDataType::from(wrapper);
201
202    Ok(cdt)
203}
204
205/// convert `ColumnSchema` lists to it's corresponding proto type
206pub fn column_schemas_to_proto(
207    column_schemas: Vec<ColumnSchema>,
208    primary_keys: &[String],
209) -> Result<Vec<api::v1::ColumnSchema>, Error> {
210    let column_datatypes: Vec<(ColumnDataType, Option<ColumnDataTypeExtension>)> = column_schemas
211        .iter()
212        .map(|c| {
213            ColumnDataTypeWrapper::try_from(c.data_type.clone())
214                .map(|w| w.to_parts())
215                .map_err(BoxedError::new)
216                .context(ExternalSnafu)
217        })
218        .try_collect()?;
219
220    let ret = column_schemas
221        .iter()
222        .zip(column_datatypes)
223        .map(|(schema, datatype)| {
224            let semantic_type = if schema.is_time_index() {
225                SemanticType::Timestamp
226            } else if primary_keys.contains(&schema.name) {
227                SemanticType::Tag
228            } else {
229                SemanticType::Field
230            } as i32;
231
232            api::v1::ColumnSchema {
233                column_name: schema.name.clone(),
234                datatype: datatype.0 as i32,
235                semantic_type,
236                datatype_extension: datatype.1,
237                options: options_from_column_schema(schema),
238            }
239        })
240        .collect();
241    Ok(ret)
242}
243
244/// Convert `RelationDesc` to `ColumnSchema` list,
245/// if the column name is not present, use `col_{idx}` as the column name
246pub fn relation_desc_to_column_schemas_with_fallback(schema: &RelationDesc) -> Vec<ColumnSchema> {
247    schema
248        .typ()
249        .column_types
250        .clone()
251        .into_iter()
252        .enumerate()
253        .map(|(idx, typ)| {
254            let name = schema
255                .names
256                .get(idx)
257                .cloned()
258                .flatten()
259                .unwrap_or(format!("col_{}", idx));
260            let ret = ColumnSchema::new(name, typ.scalar_type, typ.nullable);
261            if schema.typ().time_index == Some(idx) {
262                ret.with_time_index(true)
263            } else {
264                ret
265            }
266        })
267        .collect_vec()
268}