servers/mysql/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::ops::Deref;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_query::{Output, OutputData};
20use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
21use common_telemetry::{debug, error};
22use datatypes::prelude::{ConcreteDataType, Value};
23use datatypes::schema::SchemaRef;
24use datatypes::types::json_type_value_to_string;
25use futures::StreamExt;
26use opensrv_mysql::{
27    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
28};
29use session::context::QueryContextRef;
30use snafu::prelude::*;
31use tokio::io::AsyncWrite;
32
33use crate::error::{self, ConvertSqlValueSnafu, Error, Result};
34use crate::metrics::*;
35
36/// Try to write multiple output to the writer if possible.
37pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
38    w: QueryResultWriter<'_, W>,
39    query_context: QueryContextRef,
40    outputs: Vec<Result<Output>>,
41) -> Result<()> {
42    let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
43    for output in outputs {
44        let result_writer = writer.take().context(error::InternalSnafu {
45            err_msg: "Sending multiple result set is unsupported",
46        })?;
47        writer = result_writer.try_write_one(output).await?;
48    }
49
50    if let Some(result_writer) = writer {
51        result_writer.finish().await?;
52    }
53    Ok(())
54}
55
56/// Handle GreptimeDB error, convert it to MySQL error
57pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
58    let status_code = e.status_code();
59    let kind = mysql_error_kind(&status_code);
60
61    if status_code.should_log_error() {
62        let root_error = e.root_cause().unwrap_or(&e);
63        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
64    } else {
65        debug!(
66            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
67            status_code,
68            query_ctx.get_db_string(),
69            e
70        );
71    };
72    let msg = e.output_msg();
73    // Inline the status code to output message for MySQL
74    let err_msg = format!("({status_code}): {msg}");
75
76    (kind, err_msg)
77}
78
79struct QueryResult {
80    schema: SchemaRef,
81    stream: SendableRecordBatchStream,
82}
83
84pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
85    writer: QueryResultWriter<'a, W>,
86    query_context: QueryContextRef,
87}
88
89impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
90    pub fn new(
91        writer: QueryResultWriter<'a, W>,
92        query_context: QueryContextRef,
93    ) -> MysqlResultWriter<'a, W> {
94        MysqlResultWriter::<'a, W> {
95            writer,
96            query_context,
97        }
98    }
99
100    /// Try to write one result set. If there are more than one result set, return `Some`.
101    pub async fn try_write_one(
102        self,
103        output: Result<Output>,
104    ) -> Result<Option<MysqlResultWriter<'a, W>>> {
105        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
106        // a local variable.
107        match output {
108            Ok(output) => match output.data {
109                OutputData::Stream(stream) => {
110                    let query_result = QueryResult {
111                        schema: stream.schema(),
112                        stream,
113                    };
114                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
115                }
116                OutputData::RecordBatches(recordbatches) => {
117                    let query_result = QueryResult {
118                        schema: recordbatches.schema(),
119                        stream: recordbatches.as_stream(),
120                    };
121                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
122                }
123                OutputData::AffectedRows(rows) => {
124                    let next_writer = Self::write_affected_rows(self.writer, rows).await?;
125                    return Ok(Some(MysqlResultWriter::new(
126                        next_writer,
127                        self.query_context,
128                    )));
129                }
130            },
131            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
132        }
133        Ok(None)
134    }
135
136    /// Indicate no more result set to write. No need to call this if there is only one result set.
137    pub async fn finish(self) -> Result<()> {
138        self.writer.no_more_results().await?;
139        Ok(())
140    }
141
142    async fn write_affected_rows(
143        w: QueryResultWriter<'a, W>,
144        rows: usize,
145    ) -> Result<QueryResultWriter<'a, W>> {
146        let next_writer = w
147            .complete_one(OkResponse {
148                affected_rows: rows as u64,
149                ..Default::default()
150            })
151            .await?;
152        Ok(next_writer)
153    }
154
155    async fn write_query_result(
156        mut query_result: QueryResult,
157        writer: QueryResultWriter<'a, W>,
158        query_context: QueryContextRef,
159    ) -> Result<()> {
160        match create_mysql_column_def(&query_result.schema) {
161            Ok(column_def) => {
162                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
163                // to return a new QueryResultWriter.
164                let mut row_writer = writer.start(&column_def).await?;
165                while let Some(record_batch) = query_result.stream.next().await {
166                    match record_batch {
167                        Ok(record_batch) => {
168                            Self::write_recordbatch(
169                                &mut row_writer,
170                                &record_batch,
171                                query_context.clone(),
172                                &query_result.schema,
173                            )
174                            .await?
175                        }
176                        Err(e) => {
177                            let (kind, err) = handle_err(e, query_context);
178                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
179                            row_writer.finish_error(kind, &err.as_bytes()).await?;
180
181                            return Ok(());
182                        }
183                    }
184                }
185                row_writer.finish().await?;
186                Ok(())
187            }
188            Err(error) => Self::write_query_error(error, writer, query_context).await,
189        }
190    }
191
192    async fn write_recordbatch(
193        row_writer: &mut RowWriter<'_, W>,
194        recordbatch: &RecordBatch,
195        query_context: QueryContextRef,
196        schema: &SchemaRef,
197    ) -> Result<()> {
198        for row in recordbatch.rows() {
199            for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
200                match value {
201                    Value::Null => row_writer.write_col(None::<u8>)?,
202                    Value::Boolean(v) => row_writer.write_col(v as i8)?,
203                    Value::UInt8(v) => row_writer.write_col(v)?,
204                    Value::UInt16(v) => row_writer.write_col(v)?,
205                    Value::UInt32(v) => row_writer.write_col(v)?,
206                    Value::UInt64(v) => row_writer.write_col(v)?,
207                    Value::Int8(v) => row_writer.write_col(v)?,
208                    Value::Int16(v) => row_writer.write_col(v)?,
209                    Value::Int32(v) => row_writer.write_col(v)?,
210                    Value::Int64(v) => row_writer.write_col(v)?,
211                    Value::Float32(v) => row_writer.write_col(v.0)?,
212                    Value::Float64(v) => row_writer.write_col(v.0)?,
213                    Value::String(v) => row_writer.write_col(v.as_utf8())?,
214                    Value::Binary(v) => match column.data_type {
215                        ConcreteDataType::Json(j) => {
216                            let s = json_type_value_to_string(&v, &j.format)
217                                .context(ConvertSqlValueSnafu)?;
218                            row_writer.write_col(s)?;
219                        }
220                        _ => {
221                            row_writer.write_col(v.deref())?;
222                        }
223                    },
224                    Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
225                    // convert timestamp to timezone of current connection
226                    Value::Timestamp(v) => row_writer.write_col(
227                        v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
228                    )?,
229                    Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
230                    Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
231                    Value::IntervalMonthDayNano(v) => {
232                        row_writer.write_col(v.to_iso8601_string())?
233                    }
234                    Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
235                    Value::List(_) => {
236                        return Err(Error::Internal {
237                            err_msg: format!(
238                                "cannot write value {:?} in mysql protocol: unimplemented",
239                                &value
240                            ),
241                        })
242                    }
243                    Value::Time(v) => row_writer
244                        .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
245                    Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
246                }
247            }
248            row_writer.end_row().await?;
249        }
250        Ok(())
251    }
252
253    async fn write_query_error(
254        error: impl ErrorExt,
255        w: QueryResultWriter<'a, W>,
256        query_context: QueryContextRef,
257    ) -> Result<()> {
258        METRIC_ERROR_COUNTER
259            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
260            .inc();
261
262        let (kind, err) = handle_err(error, query_context);
263        debug!("Write query error, kind: {:?}, err: {}", kind, err);
264        w.error(kind, err.as_bytes()).await?;
265        Ok(())
266    }
267}
268
269pub(crate) fn create_mysql_column(
270    data_type: &ConcreteDataType,
271    column_name: &str,
272) -> Result<Column> {
273    let column_type = match data_type {
274        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
275        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
276            Ok(ColumnType::MYSQL_TYPE_TINY)
277        }
278        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
279            Ok(ColumnType::MYSQL_TYPE_SHORT)
280        }
281        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
282        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
283            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
284        }
285        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
286        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
287        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
288            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
289        }
290        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
291        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
292        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
293        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
294        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
295        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
296        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
297        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
298        _ => error::UnsupportedDataTypeSnafu {
299            data_type,
300            reason: "not implemented",
301        }
302        .fail(),
303    };
304    let mut colflags = ColumnFlags::empty();
305    match data_type {
306        ConcreteDataType::UInt16(_)
307        | ConcreteDataType::UInt8(_)
308        | ConcreteDataType::UInt32(_)
309        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
310        _ => {}
311    };
312    column_type.map(|column_type| Column {
313        column: column_name.to_string(),
314        coltype: column_type,
315
316        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
317        //   implementation, will revisit them again in the future.
318        table: String::default(),
319        colflags,
320    })
321}
322
323/// Creates MySQL columns definition from our column schema.
324pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
325    schema
326        .column_schemas()
327        .iter()
328        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
329        .collect()
330}
331
332fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
333    match status_code {
334        StatusCode::Success => ErrorKind::ER_YES,
335        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
336        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
337        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
338        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
339        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
340        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
341            ErrorKind::ER_TABLE_EXISTS_ERROR
342        }
343        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
344        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
345        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
346        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
347        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
348        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
349            ErrorKind::ER_ACCESS_DENIED_ERROR
350        }
351        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
352        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
353            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
354        }
355        StatusCode::Unexpected
356        | StatusCode::Internal
357        | StatusCode::IllegalState
358        | StatusCode::PlanQuery
359        | StatusCode::EngineExecuteQuery
360        | StatusCode::RegionNotReady
361        | StatusCode::RegionBusy
362        | StatusCode::TableUnavailable
363        | StatusCode::StorageUnavailable
364        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
365        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
366        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
367        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
368        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
369        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
370        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
371        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
372    }
373}