servers/mysql/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_query::{Output, OutputData};
20use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
21use common_telemetry::{debug, error};
22use datatypes::prelude::{ConcreteDataType, Value};
23use datatypes::schema::SchemaRef;
24use datatypes::types::json_type_value_to_string;
25use futures::StreamExt;
26use itertools::Itertools;
27use opensrv_mysql::{
28    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
29};
30use session::context::QueryContextRef;
31use snafu::prelude::*;
32use tokio::io::AsyncWrite;
33
34use crate::error::{self, ConvertSqlValueSnafu, Result};
35use crate::metrics::*;
36
37/// Try to write multiple output to the writer if possible.
38pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
39    w: QueryResultWriter<'_, W>,
40    query_context: QueryContextRef,
41    outputs: Vec<Result<Output>>,
42) -> Result<()> {
43    let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
44    for output in outputs {
45        let result_writer = writer.take().context(error::InternalSnafu {
46            err_msg: "Sending multiple result set is unsupported",
47        })?;
48        writer = result_writer.try_write_one(output).await?;
49    }
50
51    if let Some(result_writer) = writer {
52        result_writer.finish().await?;
53    }
54    Ok(())
55}
56
57/// Handle GreptimeDB error, convert it to MySQL error
58pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
59    let status_code = e.status_code();
60    let kind = mysql_error_kind(&status_code);
61
62    if status_code.should_log_error() {
63        let root_error = e.root_cause().unwrap_or(&e);
64        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
65    } else {
66        debug!(
67            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
68            status_code,
69            query_ctx.get_db_string(),
70            e
71        );
72    };
73    let msg = e.output_msg();
74    // Inline the status code to output message for MySQL
75    let err_msg = format!("({status_code}): {msg}");
76
77    (kind, err_msg)
78}
79
/// A query result to be sent to the client: a record batch stream paired with its schema.
struct QueryResult {
    /// Schema used to build the MySQL column definitions and to interpret each row.
    schema: SchemaRef,
    /// Stream yielding the record batches (or errors) of the result.
    stream: SendableRecordBatchStream,
}
84
/// Writes query outputs to a MySQL client on top of an `opensrv_mysql` [`QueryResultWriter`].
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
    /// The underlying protocol writer; consumed when a result set or an error is written.
    writer: QueryResultWriter<'a, W>,
    /// Context of the current query; provides the timezone and database name used
    /// while writing rows and reporting errors.
    query_context: QueryContextRef,
}
89
90impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
91    pub fn new(
92        writer: QueryResultWriter<'a, W>,
93        query_context: QueryContextRef,
94    ) -> MysqlResultWriter<'a, W> {
95        MysqlResultWriter::<'a, W> {
96            writer,
97            query_context,
98        }
99    }
100
101    /// Try to write one result set. If there are more than one result set, return `Some`.
102    pub async fn try_write_one(
103        self,
104        output: Result<Output>,
105    ) -> Result<Option<MysqlResultWriter<'a, W>>> {
106        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
107        // a local variable.
108        match output {
109            Ok(output) => match output.data {
110                OutputData::Stream(stream) => {
111                    let query_result = QueryResult {
112                        schema: stream.schema(),
113                        stream,
114                    };
115                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
116                }
117                OutputData::RecordBatches(recordbatches) => {
118                    let query_result = QueryResult {
119                        schema: recordbatches.schema(),
120                        stream: recordbatches.as_stream(),
121                    };
122                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
123                }
124                OutputData::AffectedRows(rows) => {
125                    let next_writer = Self::write_affected_rows(self.writer, rows).await?;
126                    return Ok(Some(MysqlResultWriter::new(
127                        next_writer,
128                        self.query_context,
129                    )));
130                }
131            },
132            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
133        }
134        Ok(None)
135    }
136
137    /// Indicate no more result set to write. No need to call this if there is only one result set.
138    pub async fn finish(self) -> Result<()> {
139        self.writer.no_more_results().await?;
140        Ok(())
141    }
142
143    async fn write_affected_rows(
144        w: QueryResultWriter<'a, W>,
145        rows: usize,
146    ) -> Result<QueryResultWriter<'a, W>> {
147        let next_writer = w
148            .complete_one(OkResponse {
149                affected_rows: rows as u64,
150                ..Default::default()
151            })
152            .await?;
153        Ok(next_writer)
154    }
155
156    async fn write_query_result(
157        mut query_result: QueryResult,
158        writer: QueryResultWriter<'a, W>,
159        query_context: QueryContextRef,
160    ) -> Result<()> {
161        match create_mysql_column_def(&query_result.schema) {
162            Ok(column_def) => {
163                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
164                // to return a new QueryResultWriter.
165                let mut row_writer = writer.start(&column_def).await?;
166                while let Some(record_batch) = query_result.stream.next().await {
167                    match record_batch {
168                        Ok(record_batch) => {
169                            Self::write_recordbatch(
170                                &mut row_writer,
171                                &record_batch,
172                                query_context.clone(),
173                                &query_result.schema,
174                            )
175                            .await?
176                        }
177                        Err(e) => {
178                            let (kind, err) = handle_err(e, query_context);
179                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
180                            row_writer.finish_error(kind, &err.as_bytes()).await?;
181
182                            return Ok(());
183                        }
184                    }
185                }
186                row_writer.finish().await?;
187                Ok(())
188            }
189            Err(error) => Self::write_query_error(error, writer, query_context).await,
190        }
191    }
192
193    async fn write_recordbatch(
194        row_writer: &mut RowWriter<'_, W>,
195        recordbatch: &RecordBatch,
196        query_context: QueryContextRef,
197        schema: &SchemaRef,
198    ) -> Result<()> {
199        for row in recordbatch.rows() {
200            for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
201                match value {
202                    Value::Null => row_writer.write_col(None::<u8>)?,
203                    Value::Boolean(v) => row_writer.write_col(v as i8)?,
204                    Value::UInt8(v) => row_writer.write_col(v)?,
205                    Value::UInt16(v) => row_writer.write_col(v)?,
206                    Value::UInt32(v) => row_writer.write_col(v)?,
207                    Value::UInt64(v) => row_writer.write_col(v)?,
208                    Value::Int8(v) => row_writer.write_col(v)?,
209                    Value::Int16(v) => row_writer.write_col(v)?,
210                    Value::Int32(v) => row_writer.write_col(v)?,
211                    Value::Int64(v) => row_writer.write_col(v)?,
212                    Value::Float32(v) => row_writer.write_col(v.0)?,
213                    Value::Float64(v) => row_writer.write_col(v.0)?,
214                    Value::String(v) => row_writer.write_col(v.as_utf8())?,
215                    Value::Binary(v) => match column.data_type {
216                        ConcreteDataType::Json(j) => {
217                            let s = json_type_value_to_string(&v, &j.format)
218                                .context(ConvertSqlValueSnafu)?;
219                            row_writer.write_col(s)?;
220                        }
221                        _ => {
222                            row_writer.write_col(v.deref())?;
223                        }
224                    },
225                    Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
226                    // convert timestamp to timezone of current connection
227                    Value::Timestamp(v) => row_writer.write_col(
228                        v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
229                    )?,
230                    Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
231                    Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
232                    Value::IntervalMonthDayNano(v) => {
233                        row_writer.write_col(v.to_iso8601_string())?
234                    }
235                    Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
236                    Value::List(v) => row_writer.write_col(format!(
237                        "[{}]",
238                        v.items().iter().map(|x| x.to_string()).join(", ")
239                    ))?,
240                    Value::Time(v) => row_writer
241                        .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
242                    Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
243                }
244            }
245            row_writer.end_row().await?;
246        }
247        Ok(())
248    }
249
250    async fn write_query_error(
251        error: impl ErrorExt,
252        w: QueryResultWriter<'a, W>,
253        query_context: QueryContextRef,
254    ) -> Result<()> {
255        METRIC_ERROR_COUNTER
256            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
257            .inc();
258
259        let (kind, err) = handle_err(error, query_context);
260        debug!("Write query error, kind: {:?}, err: {}", kind, err);
261        w.error(kind, err.as_bytes()).await?;
262        Ok(())
263    }
264}
265
266pub(crate) fn create_mysql_column(
267    data_type: &ConcreteDataType,
268    column_name: &str,
269) -> Result<Column> {
270    let column_type = match data_type {
271        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
272        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
273            Ok(ColumnType::MYSQL_TYPE_TINY)
274        }
275        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
276            Ok(ColumnType::MYSQL_TYPE_SHORT)
277        }
278        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
279        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
280            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
281        }
282        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
283        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
284        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
285            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
286        }
287        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
288        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
289        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
290        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
291        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
292        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
293        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
294        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
295        ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
296        _ => error::UnsupportedDataTypeSnafu {
297            data_type,
298            reason: "not implemented",
299        }
300        .fail(),
301    };
302    let mut colflags = ColumnFlags::empty();
303    match data_type {
304        ConcreteDataType::UInt16(_)
305        | ConcreteDataType::UInt8(_)
306        | ConcreteDataType::UInt32(_)
307        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
308        _ => {}
309    };
310    column_type.map(|column_type| Column {
311        column: column_name.to_string(),
312        coltype: column_type,
313
314        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
315        //   implementation, will revisit them again in the future.
316        table: String::default(),
317        colflags,
318    })
319}
320
321/// Creates MySQL columns definition from our column schema.
322pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
323    schema
324        .column_schemas()
325        .iter()
326        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
327        .collect()
328}
329
330fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
331    match status_code {
332        StatusCode::Success => ErrorKind::ER_YES,
333        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
334        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
335        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
336        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
337        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
338        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
339            ErrorKind::ER_TABLE_EXISTS_ERROR
340        }
341        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
342        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
343        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
344        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
345        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
346        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
347            ErrorKind::ER_ACCESS_DENIED_ERROR
348        }
349        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
350        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
351            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
352        }
353        StatusCode::Unexpected
354        | StatusCode::Internal
355        | StatusCode::IllegalState
356        | StatusCode::PlanQuery
357        | StatusCode::EngineExecuteQuery
358        | StatusCode::RegionNotReady
359        | StatusCode::RegionBusy
360        | StatusCode::TableUnavailable
361        | StatusCode::StorageUnavailable
362        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
363        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
364        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
365        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
366        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
367        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
368        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
369        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
370        StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
371        StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
372    }
373}