servers/mysql/writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19    Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
20    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
21    UInt16Type, UInt32Type, UInt64Type,
22};
23use arrow_schema::{DataType, IntervalUnit};
24use common_decimal::Decimal128;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
29use common_telemetry::{debug, error};
30use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
31use datafusion_common::ScalarValue;
32use datatypes::prelude::ConcreteDataType;
33use datatypes::schema::SchemaRef;
34use datatypes::types::jsonb_to_string;
35use futures::StreamExt;
36use opensrv_mysql::{
37    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
38};
39use session::context::QueryContextRef;
40use snafu::prelude::*;
41use tokio::io::AsyncWrite;
42
43use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
44use crate::metrics::*;
45
46/// Try to write multiple output to the writer if possible.
47pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
48    w: QueryResultWriter<'_, W>,
49    query_context: QueryContextRef,
50    outputs: Vec<Result<Output>>,
51) -> Result<()> {
52    let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
53    for output in outputs {
54        let result_writer = writer.take().context(error::InternalSnafu {
55            err_msg: "Sending multiple result set is unsupported",
56        })?;
57        writer = result_writer.try_write_one(output).await?;
58    }
59
60    if let Some(result_writer) = writer {
61        result_writer.finish().await?;
62    }
63    Ok(())
64}
65
66/// Handle GreptimeDB error, convert it to MySQL error
67pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
68    let status_code = e.status_code();
69    let kind = mysql_error_kind(&status_code);
70
71    if status_code.should_log_error() {
72        let root_error = e.root_cause().unwrap_or(&e);
73        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
74    } else {
75        debug!(
76            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
77            status_code,
78            query_ctx.get_db_string(),
79            e
80        );
81    };
82    let msg = e.output_msg();
83    // Inline the status code to output message for MySQL
84    let err_msg = format!("({status_code}): {msg}");
85
86    (kind, err_msg)
87}
88
89struct QueryResult {
90    schema: SchemaRef,
91    stream: SendableRecordBatchStream,
92}
93
94pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
95    writer: QueryResultWriter<'a, W>,
96    query_context: QueryContextRef,
97}
98
99impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
100    pub fn new(
101        writer: QueryResultWriter<'a, W>,
102        query_context: QueryContextRef,
103    ) -> MysqlResultWriter<'a, W> {
104        MysqlResultWriter::<'a, W> {
105            writer,
106            query_context,
107        }
108    }
109
110    /// Try to write one result set. If there are more than one result set, return `Some`.
111    pub async fn try_write_one(
112        self,
113        output: Result<Output>,
114    ) -> Result<Option<MysqlResultWriter<'a, W>>> {
115        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
116        // a local variable.
117        match output {
118            Ok(output) => match output.data {
119                OutputData::Stream(stream) => {
120                    let query_result = QueryResult {
121                        schema: stream.schema(),
122                        stream,
123                    };
124                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
125                }
126                OutputData::RecordBatches(recordbatches) => {
127                    let query_result = QueryResult {
128                        schema: recordbatches.schema(),
129                        stream: recordbatches.as_stream(),
130                    };
131                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
132                }
133                OutputData::AffectedRows(rows) => {
134                    let next_writer = Self::write_affected_rows(self.writer, rows).await?;
135                    return Ok(Some(MysqlResultWriter::new(
136                        next_writer,
137                        self.query_context,
138                    )));
139                }
140            },
141            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
142        }
143        Ok(None)
144    }
145
146    /// Indicate no more result set to write. No need to call this if there is only one result set.
147    pub async fn finish(self) -> Result<()> {
148        self.writer.no_more_results().await?;
149        Ok(())
150    }
151
152    async fn write_affected_rows(
153        w: QueryResultWriter<'a, W>,
154        rows: usize,
155    ) -> Result<QueryResultWriter<'a, W>> {
156        let next_writer = w
157            .complete_one(OkResponse {
158                affected_rows: rows as u64,
159                ..Default::default()
160            })
161            .await?;
162        Ok(next_writer)
163    }
164
165    async fn write_query_result(
166        mut query_result: QueryResult,
167        writer: QueryResultWriter<'a, W>,
168        query_context: QueryContextRef,
169    ) -> Result<()> {
170        match create_mysql_column_def(&query_result.schema) {
171            Ok(column_def) => {
172                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
173                // to return a new QueryResultWriter.
174                let mut row_writer = writer.start(&column_def).await?;
175                while let Some(record_batch) = query_result.stream.next().await {
176                    match record_batch {
177                        Ok(record_batch) => {
178                            Self::write_recordbatch(
179                                &mut row_writer,
180                                record_batch,
181                                query_context.clone(),
182                                &query_result.schema,
183                            )
184                            .await?
185                        }
186                        Err(e) => {
187                            let (kind, err) = handle_err(e, query_context);
188                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
189                            row_writer.finish_error(kind, &err.as_bytes()).await?;
190
191                            return Ok(());
192                        }
193                    }
194                }
195                row_writer.finish().await?;
196                Ok(())
197            }
198            Err(error) => Self::write_query_error(error, writer, query_context).await,
199        }
200    }
201
202    async fn write_recordbatch(
203        row_writer: &mut RowWriter<'_, W>,
204        record_batch: RecordBatch,
205        query_context: QueryContextRef,
206        schema: &SchemaRef,
207    ) -> Result<()> {
208        let record_batch = record_batch.into_df_record_batch();
209        for i in 0..record_batch.num_rows() {
210            for (j, column) in record_batch.columns().iter().enumerate() {
211                if column.is_null(i) {
212                    row_writer.write_col(None::<u8>)?;
213                    continue;
214                }
215
216                match column.data_type() {
217                    DataType::Null => {
218                        row_writer.write_col(None::<u8>)?;
219                    }
220                    DataType::Boolean => {
221                        let array = column.as_boolean();
222                        row_writer.write_col(array.value(i) as i8)?;
223                    }
224                    DataType::UInt8 => {
225                        let array = column.as_primitive::<UInt8Type>();
226                        row_writer.write_col(array.value(i))?;
227                    }
228                    DataType::UInt16 => {
229                        let array = column.as_primitive::<UInt16Type>();
230                        row_writer.write_col(array.value(i))?;
231                    }
232                    DataType::UInt32 => {
233                        let array = column.as_primitive::<UInt32Type>();
234                        row_writer.write_col(array.value(i))?;
235                    }
236                    DataType::UInt64 => {
237                        let array = column.as_primitive::<UInt64Type>();
238                        row_writer.write_col(array.value(i))?;
239                    }
240                    DataType::Int8 => {
241                        let array = column.as_primitive::<Int8Type>();
242                        row_writer.write_col(array.value(i))?;
243                    }
244                    DataType::Int16 => {
245                        let array = column.as_primitive::<Int16Type>();
246                        row_writer.write_col(array.value(i))?;
247                    }
248                    DataType::Int32 => {
249                        let array = column.as_primitive::<Int32Type>();
250                        row_writer.write_col(array.value(i))?;
251                    }
252                    DataType::Int64 => {
253                        let array = column.as_primitive::<Int64Type>();
254                        row_writer.write_col(array.value(i))?;
255                    }
256                    DataType::Float32 => {
257                        let array = column.as_primitive::<Float32Type>();
258                        row_writer.write_col(array.value(i))?;
259                    }
260                    DataType::Float64 => {
261                        let array = column.as_primitive::<Float64Type>();
262                        row_writer.write_col(array.value(i))?;
263                    }
264                    DataType::Utf8 => {
265                        let array = column.as_string::<i32>();
266                        row_writer.write_col(array.value(i))?;
267                    }
268                    DataType::Utf8View => {
269                        let array = column.as_string_view();
270                        row_writer.write_col(array.value(i))?;
271                    }
272                    DataType::LargeUtf8 => {
273                        let array = column.as_string::<i64>();
274                        row_writer.write_col(array.value(i))?;
275                    }
276                    DataType::Binary => {
277                        let array = column.as_binary::<i32>();
278                        let v = array.value(i);
279                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
280                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
281                            row_writer.write_col(s)?;
282                        } else {
283                            row_writer.write_col(v)?;
284                        }
285                    }
286                    DataType::BinaryView => {
287                        let array = column.as_binary_view();
288                        let v = array.value(i);
289                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
290                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
291                            row_writer.write_col(s)?;
292                        } else {
293                            row_writer.write_col(v)?;
294                        }
295                    }
296                    DataType::LargeBinary => {
297                        let array = column.as_binary::<i64>();
298                        let v = array.value(i);
299                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
300                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
301                            row_writer.write_col(s)?;
302                        } else {
303                            row_writer.write_col(v)?;
304                        }
305                    }
306                    DataType::Date32 => {
307                        let array = column.as_primitive::<Date32Type>();
308                        let v = Date::new(array.value(i));
309                        row_writer.write_col(v.to_chrono_date())?;
310                    }
311                    DataType::Timestamp(_, _) => {
312                        let v = datatypes::arrow_array::timestamp_array_value(column, i);
313                        let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
314                        row_writer.write_col(v)?;
315                    }
316                    DataType::Interval(interval_unit) => match interval_unit {
317                        IntervalUnit::YearMonth => {
318                            let array = column.as_primitive::<IntervalYearMonthType>();
319                            let v: IntervalYearMonth = array.value(i).into();
320                            row_writer.write_col(v.to_iso8601_string())?;
321                        }
322                        IntervalUnit::DayTime => {
323                            let array = column.as_primitive::<IntervalDayTimeType>();
324                            let v: IntervalDayTime = array.value(i).into();
325                            row_writer.write_col(v.to_iso8601_string())?;
326                        }
327                        IntervalUnit::MonthDayNano => {
328                            let array = column.as_primitive::<IntervalMonthDayNanoType>();
329                            let v: IntervalMonthDayNano = array.value(i).into();
330                            row_writer.write_col(v.to_iso8601_string())?;
331                        }
332                    },
333                    DataType::Duration(_) => {
334                        let v: Duration =
335                            datatypes::arrow_array::duration_array_value(column, i).into();
336                        row_writer.write_col(v)?;
337                    }
338                    DataType::List(_) => {
339                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
340                        row_writer.write_col(v.to_string())?;
341                    }
342                    DataType::Struct(_) => {
343                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
344                        row_writer.write_col(v.to_string())?;
345                    }
346                    DataType::Time32(_) | DataType::Time64(_) => {
347                        let time = datatypes::arrow_array::time_array_value(column, i);
348                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
349                        row_writer.write_col(v)?;
350                    }
351                    DataType::Decimal128(precision, scale) => {
352                        let array = column.as_primitive::<Decimal128Type>();
353                        let v = Decimal128::new(array.value(i), *precision, *scale);
354                        row_writer.write_col(v.to_string())?;
355                    }
356                    _ => {
357                        return NotSupportedSnafu {
358                            feat: format!("convert {} to MySQL value", column.data_type()),
359                        }
360                        .fail();
361                    }
362                }
363            }
364            row_writer.end_row().await?;
365        }
366        Ok(())
367    }
368
369    async fn write_query_error(
370        error: impl ErrorExt,
371        w: QueryResultWriter<'a, W>,
372        query_context: QueryContextRef,
373    ) -> Result<()> {
374        METRIC_ERROR_COUNTER
375            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
376            .inc();
377
378        let (kind, err) = handle_err(error, query_context);
379        debug!("Write query error, kind: {:?}, err: {}", kind, err);
380        w.error(kind, err.as_bytes()).await?;
381        Ok(())
382    }
383}
384
385pub(crate) fn create_mysql_column(
386    data_type: &ConcreteDataType,
387    column_name: &str,
388) -> Result<Column> {
389    let column_type = match data_type {
390        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
391        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
392            Ok(ColumnType::MYSQL_TYPE_TINY)
393        }
394        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
395            Ok(ColumnType::MYSQL_TYPE_SHORT)
396        }
397        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
398        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
399            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
400        }
401        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
402        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
403        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
404            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
405        }
406        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
407        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
408        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
409        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
410        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
411        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
412        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
413        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
414        ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
415        ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
416        _ => error::UnsupportedDataTypeSnafu {
417            data_type,
418            reason: "not implemented",
419        }
420        .fail(),
421    };
422    let mut colflags = ColumnFlags::empty();
423    match data_type {
424        ConcreteDataType::UInt16(_)
425        | ConcreteDataType::UInt8(_)
426        | ConcreteDataType::UInt32(_)
427        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
428        _ => {}
429    };
430    column_type.map(|column_type| Column {
431        column: column_name.to_string(),
432        coltype: column_type,
433        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
434        //   implementation, will revisit them again in the future.
435        table: String::default(),
436        collen: 0, // 0 means "use default".
437        colflags,
438    })
439}
440
441/// Creates MySQL columns definition from our column schema.
442pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
443    schema
444        .column_schemas()
445        .iter()
446        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
447        .collect()
448}
449
450fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
451    match status_code {
452        StatusCode::Success => ErrorKind::ER_YES,
453        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
454        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
455        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
456        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
457        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
458        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
459            ErrorKind::ER_TABLE_EXISTS_ERROR
460        }
461        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
462        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
463        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
464        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
465        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
466        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
467            ErrorKind::ER_ACCESS_DENIED_ERROR
468        }
469        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
470        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
471            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
472        }
473        StatusCode::Unexpected
474        | StatusCode::Internal
475        | StatusCode::IllegalState
476        | StatusCode::PlanQuery
477        | StatusCode::EngineExecuteQuery
478        | StatusCode::RegionNotReady
479        | StatusCode::RegionBusy
480        | StatusCode::TableUnavailable
481        | StatusCode::StorageUnavailable
482        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
483        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
484        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
485        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
486        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
487        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
488        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
489        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
490        StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
491        StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
492    }
493}