servers/mysql/
writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::io;
16use std::time::Duration;
17
18use arrow::array::{Array, AsArray};
19use arrow::datatypes::{
20    Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
21    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
22    UInt16Type, UInt32Type, UInt64Type,
23};
24use arrow_schema::{DataType, IntervalUnit};
25use common_decimal::Decimal128;
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
30use common_telemetry::{debug, error};
31use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
32use datafusion_common::ScalarValue;
33use datatypes::prelude::ConcreteDataType;
34use datatypes::schema::SchemaRef;
35use datatypes::types::jsonb_to_string;
36use futures::StreamExt;
37use opensrv_mysql::{
38    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
39};
40use session::SessionRef;
41use session::context::QueryContextRef;
42use snafu::prelude::*;
43use tokio::io::AsyncWrite;
44
45use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
46use crate::metrics::*;
47
48/// Try to write multiple output to the writer if possible.
49pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
50    w: QueryResultWriter<'_, W>,
51    query_context: QueryContextRef,
52    session: SessionRef,
53    outputs: Vec<Result<Output>>,
54) -> Result<()> {
55    if let Some(warning) = query_context.warning() {
56        session.add_warning(warning);
57    }
58
59    let mut writer = Some(MysqlResultWriter::new(
60        w,
61        query_context.clone(),
62        session.clone(),
63    ));
64    for output in outputs {
65        let result_writer = writer.take().context(error::InternalSnafu {
66            err_msg: "Sending multiple result set is unsupported",
67        })?;
68        writer = result_writer.try_write_one(output).await?;
69    }
70
71    if let Some(result_writer) = writer {
72        result_writer.finish().await?;
73    }
74    Ok(())
75}
76
77/// Handle GreptimeDB error, convert it to MySQL error
78pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
79    let status_code = e.status_code();
80    let kind = mysql_error_kind(&status_code);
81
82    if status_code.should_log_error() {
83        let root_error = e.root_cause().unwrap_or(&e);
84        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
85    } else {
86        debug!(
87            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
88            status_code,
89            query_ctx.get_db_string(),
90            e
91        );
92    };
93    let msg = e.output_msg();
94    // Inline the status code to output message for MySQL
95    let err_msg = format!("({status_code}): {msg}");
96
97    (kind, err_msg)
98}
99
100struct QueryResult {
101    schema: SchemaRef,
102    stream: SendableRecordBatchStream,
103}
104
105pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
106    writer: QueryResultWriter<'a, W>,
107    query_context: QueryContextRef,
108    session: SessionRef,
109}
110
111impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
112    pub fn new(
113        writer: QueryResultWriter<'a, W>,
114        query_context: QueryContextRef,
115        session: SessionRef,
116    ) -> MysqlResultWriter<'a, W> {
117        MysqlResultWriter::<'a, W> {
118            writer,
119            query_context,
120            session,
121        }
122    }
123
124    /// Try to write one result set. If there are more than one result set, return `Some`.
125    pub async fn try_write_one(
126        self,
127        output: Result<Output>,
128    ) -> io::Result<Option<MysqlResultWriter<'a, W>>> {
129        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
130        // a local variable.
131        match output {
132            Ok(output) => match output.data {
133                OutputData::Stream(stream) => {
134                    let query_result = QueryResult {
135                        schema: stream.schema(),
136                        stream,
137                    };
138                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
139                }
140                OutputData::RecordBatches(recordbatches) => {
141                    let query_result = QueryResult {
142                        schema: recordbatches.schema(),
143                        stream: recordbatches.as_stream(),
144                    };
145                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
146                }
147                OutputData::AffectedRows(rows) => {
148                    let next_writer =
149                        Self::write_affected_rows(self.writer, rows, &self.session).await?;
150                    return Ok(Some(MysqlResultWriter::new(
151                        next_writer,
152                        self.query_context,
153                        self.session,
154                    )));
155                }
156            },
157            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
158        }
159        Ok(None)
160    }
161
162    /// Indicate no more result set to write. No need to call this if there is only one result set.
163    pub async fn finish(self) -> Result<()> {
164        self.writer.no_more_results().await?;
165        Ok(())
166    }
167
168    async fn write_affected_rows(
169        w: QueryResultWriter<'a, W>,
170        rows: usize,
171        session: &SessionRef,
172    ) -> io::Result<QueryResultWriter<'a, W>> {
173        let warnings = session.warnings_count() as u16;
174
175        let next_writer = w
176            .complete_one(OkResponse {
177                affected_rows: rows as u64,
178                warnings,
179                ..Default::default()
180            })
181            .await?;
182        Ok(next_writer)
183    }
184
185    async fn write_query_result(
186        mut query_result: QueryResult,
187        writer: QueryResultWriter<'a, W>,
188        query_context: QueryContextRef,
189    ) -> io::Result<()> {
190        match create_mysql_column_def(&query_result.schema) {
191            Ok(column_def) => {
192                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
193                // to return a new QueryResultWriter.
194                let mut row_writer = writer.start(&column_def).await?;
195                while let Some(record_batch) = query_result.stream.next().await {
196                    match record_batch {
197                        Ok(record_batch) => {
198                            if let Err(e) = Self::write_recordbatch(
199                                &mut row_writer,
200                                record_batch,
201                                query_context.clone(),
202                                &query_result.schema,
203                            )
204                            .await
205                            {
206                                let (kind, err) = handle_err(e, query_context);
207                                row_writer.finish_error(kind, &err.as_bytes()).await?;
208                                return Ok(());
209                            }
210                        }
211                        Err(e) => {
212                            let (kind, err) = handle_err(e, query_context);
213                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
214                            row_writer.finish_error(kind, &err.as_bytes()).await?;
215
216                            return Ok(());
217                        }
218                    }
219                }
220                row_writer.finish().await?;
221                Ok(())
222            }
223            Err(error) => Self::write_query_error(error, writer, query_context).await,
224        }
225    }
226
227    async fn write_recordbatch(
228        row_writer: &mut RowWriter<'_, W>,
229        record_batch: RecordBatch,
230        query_context: QueryContextRef,
231        schema: &SchemaRef,
232    ) -> Result<()> {
233        let record_batch = record_batch.into_df_record_batch();
234        for i in 0..record_batch.num_rows() {
235            for (j, column) in record_batch.columns().iter().enumerate() {
236                if column.is_null(i) {
237                    row_writer.write_col(None::<u8>)?;
238                    continue;
239                }
240
241                match column.data_type() {
242                    DataType::Null => {
243                        row_writer.write_col(None::<u8>)?;
244                    }
245                    DataType::Boolean => {
246                        let array = column.as_boolean();
247                        row_writer.write_col(array.value(i) as i8)?;
248                    }
249                    DataType::UInt8 => {
250                        let array = column.as_primitive::<UInt8Type>();
251                        row_writer.write_col(array.value(i))?;
252                    }
253                    DataType::UInt16 => {
254                        let array = column.as_primitive::<UInt16Type>();
255                        row_writer.write_col(array.value(i))?;
256                    }
257                    DataType::UInt32 => {
258                        let array = column.as_primitive::<UInt32Type>();
259                        row_writer.write_col(array.value(i))?;
260                    }
261                    DataType::UInt64 => {
262                        let array = column.as_primitive::<UInt64Type>();
263                        row_writer.write_col(array.value(i))?;
264                    }
265                    DataType::Int8 => {
266                        let array = column.as_primitive::<Int8Type>();
267                        row_writer.write_col(array.value(i))?;
268                    }
269                    DataType::Int16 => {
270                        let array = column.as_primitive::<Int16Type>();
271                        row_writer.write_col(array.value(i))?;
272                    }
273                    DataType::Int32 => {
274                        let array = column.as_primitive::<Int32Type>();
275                        row_writer.write_col(array.value(i))?;
276                    }
277                    DataType::Int64 => {
278                        let array = column.as_primitive::<Int64Type>();
279                        row_writer.write_col(array.value(i))?;
280                    }
281                    DataType::Float32 => {
282                        let array = column.as_primitive::<Float32Type>();
283                        row_writer.write_col(array.value(i))?;
284                    }
285                    DataType::Float64 => {
286                        let array = column.as_primitive::<Float64Type>();
287                        row_writer.write_col(array.value(i))?;
288                    }
289                    DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => {
290                        let v = datatypes::arrow_array::string_array_value(column, i);
291                        row_writer.write_col(v)?;
292                    }
293                    DataType::Binary | DataType::BinaryView | DataType::LargeBinary => {
294                        let v = datatypes::arrow_array::binary_array_value(column, i);
295                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
296                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
297                            row_writer.write_col(s)?;
298                        } else {
299                            row_writer.write_col(v)?;
300                        }
301                    }
302                    DataType::Date32 => {
303                        let array = column.as_primitive::<Date32Type>();
304                        let v = Date::new(array.value(i));
305                        row_writer.write_col(v.to_chrono_date())?;
306                    }
307                    DataType::Timestamp(_, _) => {
308                        let v = datatypes::arrow_array::timestamp_array_value(column, i);
309                        let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
310                        row_writer.write_col(v)?;
311                    }
312                    DataType::Interval(interval_unit) => match interval_unit {
313                        IntervalUnit::YearMonth => {
314                            let array = column.as_primitive::<IntervalYearMonthType>();
315                            let v: IntervalYearMonth = array.value(i).into();
316                            row_writer.write_col(v.to_iso8601_string())?;
317                        }
318                        IntervalUnit::DayTime => {
319                            let array = column.as_primitive::<IntervalDayTimeType>();
320                            let v: IntervalDayTime = array.value(i).into();
321                            row_writer.write_col(v.to_iso8601_string())?;
322                        }
323                        IntervalUnit::MonthDayNano => {
324                            let array = column.as_primitive::<IntervalMonthDayNanoType>();
325                            let v: IntervalMonthDayNano = array.value(i).into();
326                            row_writer.write_col(v.to_iso8601_string())?;
327                        }
328                    },
329                    DataType::Duration(_) => {
330                        let v: Duration =
331                            datatypes::arrow_array::duration_array_value(column, i).into();
332                        row_writer.write_col(v)?;
333                    }
334                    DataType::List(_) | DataType::Struct(_) => {
335                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
336                        row_writer.write_col(v.to_string())?;
337                    }
338                    DataType::Time32(_) | DataType::Time64(_) => {
339                        let time = datatypes::arrow_array::time_array_value(column, i);
340                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
341                        row_writer.write_col(v)?;
342                    }
343                    DataType::Decimal128(precision, scale) => {
344                        let array = column.as_primitive::<Decimal128Type>();
345                        let v = Decimal128::new(array.value(i), *precision, *scale);
346                        row_writer.write_col(v.to_string())?;
347                    }
348                    _ => {
349                        return NotSupportedSnafu {
350                            feat: format!("convert {} to MySQL value", column.data_type()),
351                        }
352                        .fail();
353                    }
354                }
355            }
356            row_writer.end_row().await?;
357        }
358        Ok(())
359    }
360
361    async fn write_query_error(
362        error: impl ErrorExt,
363        w: QueryResultWriter<'a, W>,
364        query_context: QueryContextRef,
365    ) -> io::Result<()> {
366        METRIC_ERROR_COUNTER
367            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
368            .inc();
369
370        let (kind, err) = handle_err(error, query_context);
371        debug!("Write query error, kind: {:?}, err: {}", kind, err);
372        w.error(kind, err.as_bytes()).await?;
373        Ok(())
374    }
375}
376
377pub(crate) fn create_mysql_column(
378    data_type: &ConcreteDataType,
379    column_name: &str,
380) -> Result<Column> {
381    let column_type = match data_type {
382        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
383        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
384            Ok(ColumnType::MYSQL_TYPE_TINY)
385        }
386        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
387            Ok(ColumnType::MYSQL_TYPE_SHORT)
388        }
389        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
390        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
391            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
392        }
393        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
394        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
395        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
396            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
397        }
398        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
399        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
400        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
401        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
402        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
403        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
404        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
405        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
406        ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
407        ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
408        _ => error::UnsupportedDataTypeSnafu {
409            data_type,
410            reason: "not implemented",
411        }
412        .fail(),
413    };
414    let mut colflags = ColumnFlags::empty();
415    match data_type {
416        ConcreteDataType::UInt16(_)
417        | ConcreteDataType::UInt8(_)
418        | ConcreteDataType::UInt32(_)
419        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
420        _ => {}
421    };
422    column_type.map(|column_type| Column {
423        column: column_name.to_string(),
424        coltype: column_type,
425        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
426        //   implementation, will revisit them again in the future.
427        table: String::default(),
428        collen: 0, // 0 means "use default".
429        colflags,
430    })
431}
432
433/// Creates MySQL columns definition from our column schema.
434pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
435    schema
436        .column_schemas()
437        .iter()
438        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
439        .collect()
440}
441
442fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
443    match status_code {
444        StatusCode::Success => ErrorKind::ER_YES,
445        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
446        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
447        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
448        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
449        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
450        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
451            ErrorKind::ER_TABLE_EXISTS_ERROR
452        }
453        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
454        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
455        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
456        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
457        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
458        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
459            ErrorKind::ER_ACCESS_DENIED_ERROR
460        }
461        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
462        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
463            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
464        }
465        StatusCode::Unexpected
466        | StatusCode::Internal
467        | StatusCode::IllegalState
468        | StatusCode::PlanQuery
469        | StatusCode::EngineExecuteQuery
470        | StatusCode::RegionNotReady
471        | StatusCode::RegionBusy
472        | StatusCode::TableUnavailable
473        | StatusCode::StorageUnavailable
474        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
475        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
476        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
477        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
478        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
479        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
480        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
481        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
482        StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
483        StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
484        StatusCode::Suspended => ErrorKind::ER_SERVER_SHUTDOWN,
485    }
486}