servers/mysql/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19    Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
20    DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
21    Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
22    Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
23    TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
24    TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
25};
26use arrow_schema::{DataType, IntervalUnit, TimeUnit};
27use common_decimal::Decimal128;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_query::{Output, OutputData};
31use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
32use common_telemetry::{debug, error};
33use common_time::time::Time;
34use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
35use datafusion_common::ScalarValue;
36use datatypes::prelude::ConcreteDataType;
37use datatypes::schema::SchemaRef;
38use datatypes::types::jsonb_to_string;
39use futures::StreamExt;
40use opensrv_mysql::{
41    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
42};
43use session::context::QueryContextRef;
44use snafu::prelude::*;
45use tokio::io::AsyncWrite;
46
47use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
48use crate::metrics::*;
49
50/// Try to write multiple output to the writer if possible.
51pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
52    w: QueryResultWriter<'_, W>,
53    query_context: QueryContextRef,
54    outputs: Vec<Result<Output>>,
55) -> Result<()> {
56    let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
57    for output in outputs {
58        let result_writer = writer.take().context(error::InternalSnafu {
59            err_msg: "Sending multiple result set is unsupported",
60        })?;
61        writer = result_writer.try_write_one(output).await?;
62    }
63
64    if let Some(result_writer) = writer {
65        result_writer.finish().await?;
66    }
67    Ok(())
68}
69
70/// Handle GreptimeDB error, convert it to MySQL error
71pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
72    let status_code = e.status_code();
73    let kind = mysql_error_kind(&status_code);
74
75    if status_code.should_log_error() {
76        let root_error = e.root_cause().unwrap_or(&e);
77        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
78    } else {
79        debug!(
80            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
81            status_code,
82            query_ctx.get_db_string(),
83            e
84        );
85    };
86    let msg = e.output_msg();
87    // Inline the status code to output message for MySQL
88    let err_msg = format!("({status_code}): {msg}");
89
90    (kind, err_msg)
91}
92
93struct QueryResult {
94    schema: SchemaRef,
95    stream: SendableRecordBatchStream,
96}
97
98pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
99    writer: QueryResultWriter<'a, W>,
100    query_context: QueryContextRef,
101}
102
103impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
104    pub fn new(
105        writer: QueryResultWriter<'a, W>,
106        query_context: QueryContextRef,
107    ) -> MysqlResultWriter<'a, W> {
108        MysqlResultWriter::<'a, W> {
109            writer,
110            query_context,
111        }
112    }
113
114    /// Try to write one result set. If there are more than one result set, return `Some`.
115    pub async fn try_write_one(
116        self,
117        output: Result<Output>,
118    ) -> Result<Option<MysqlResultWriter<'a, W>>> {
119        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
120        // a local variable.
121        match output {
122            Ok(output) => match output.data {
123                OutputData::Stream(stream) => {
124                    let query_result = QueryResult {
125                        schema: stream.schema(),
126                        stream,
127                    };
128                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
129                }
130                OutputData::RecordBatches(recordbatches) => {
131                    let query_result = QueryResult {
132                        schema: recordbatches.schema(),
133                        stream: recordbatches.as_stream(),
134                    };
135                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
136                }
137                OutputData::AffectedRows(rows) => {
138                    let next_writer = Self::write_affected_rows(self.writer, rows).await?;
139                    return Ok(Some(MysqlResultWriter::new(
140                        next_writer,
141                        self.query_context,
142                    )));
143                }
144            },
145            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
146        }
147        Ok(None)
148    }
149
150    /// Indicate no more result set to write. No need to call this if there is only one result set.
151    pub async fn finish(self) -> Result<()> {
152        self.writer.no_more_results().await?;
153        Ok(())
154    }
155
156    async fn write_affected_rows(
157        w: QueryResultWriter<'a, W>,
158        rows: usize,
159    ) -> Result<QueryResultWriter<'a, W>> {
160        let next_writer = w
161            .complete_one(OkResponse {
162                affected_rows: rows as u64,
163                ..Default::default()
164            })
165            .await?;
166        Ok(next_writer)
167    }
168
169    async fn write_query_result(
170        mut query_result: QueryResult,
171        writer: QueryResultWriter<'a, W>,
172        query_context: QueryContextRef,
173    ) -> Result<()> {
174        match create_mysql_column_def(&query_result.schema) {
175            Ok(column_def) => {
176                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
177                // to return a new QueryResultWriter.
178                let mut row_writer = writer.start(&column_def).await?;
179                while let Some(record_batch) = query_result.stream.next().await {
180                    match record_batch {
181                        Ok(record_batch) => {
182                            Self::write_recordbatch(
183                                &mut row_writer,
184                                record_batch,
185                                query_context.clone(),
186                                &query_result.schema,
187                            )
188                            .await?
189                        }
190                        Err(e) => {
191                            let (kind, err) = handle_err(e, query_context);
192                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
193                            row_writer.finish_error(kind, &err.as_bytes()).await?;
194
195                            return Ok(());
196                        }
197                    }
198                }
199                row_writer.finish().await?;
200                Ok(())
201            }
202            Err(error) => Self::write_query_error(error, writer, query_context).await,
203        }
204    }
205
206    async fn write_recordbatch(
207        row_writer: &mut RowWriter<'_, W>,
208        record_batch: RecordBatch,
209        query_context: QueryContextRef,
210        schema: &SchemaRef,
211    ) -> Result<()> {
212        let record_batch = record_batch.into_df_record_batch();
213        for i in 0..record_batch.num_rows() {
214            for (j, column) in record_batch.columns().iter().enumerate() {
215                if column.is_null(i) {
216                    row_writer.write_col(None::<u8>)?;
217                    continue;
218                }
219
220                match column.data_type() {
221                    DataType::Null => {
222                        row_writer.write_col(None::<u8>)?;
223                    }
224                    DataType::Boolean => {
225                        let array = column.as_boolean();
226                        row_writer.write_col(array.value(i) as i8)?;
227                    }
228                    DataType::UInt8 => {
229                        let array = column.as_primitive::<UInt8Type>();
230                        row_writer.write_col(array.value(i))?;
231                    }
232                    DataType::UInt16 => {
233                        let array = column.as_primitive::<UInt16Type>();
234                        row_writer.write_col(array.value(i))?;
235                    }
236                    DataType::UInt32 => {
237                        let array = column.as_primitive::<UInt32Type>();
238                        row_writer.write_col(array.value(i))?;
239                    }
240                    DataType::UInt64 => {
241                        let array = column.as_primitive::<UInt64Type>();
242                        row_writer.write_col(array.value(i))?;
243                    }
244                    DataType::Int8 => {
245                        let array = column.as_primitive::<Int8Type>();
246                        row_writer.write_col(array.value(i))?;
247                    }
248                    DataType::Int16 => {
249                        let array = column.as_primitive::<Int16Type>();
250                        row_writer.write_col(array.value(i))?;
251                    }
252                    DataType::Int32 => {
253                        let array = column.as_primitive::<Int32Type>();
254                        row_writer.write_col(array.value(i))?;
255                    }
256                    DataType::Int64 => {
257                        let array = column.as_primitive::<Int64Type>();
258                        row_writer.write_col(array.value(i))?;
259                    }
260                    DataType::Float32 => {
261                        let array = column.as_primitive::<Float32Type>();
262                        row_writer.write_col(array.value(i))?;
263                    }
264                    DataType::Float64 => {
265                        let array = column.as_primitive::<Float64Type>();
266                        row_writer.write_col(array.value(i))?;
267                    }
268                    DataType::Utf8 => {
269                        let array = column.as_string::<i32>();
270                        row_writer.write_col(array.value(i))?;
271                    }
272                    DataType::Utf8View => {
273                        let array = column.as_string_view();
274                        row_writer.write_col(array.value(i))?;
275                    }
276                    DataType::LargeUtf8 => {
277                        let array = column.as_string::<i64>();
278                        row_writer.write_col(array.value(i))?;
279                    }
280                    DataType::Binary => {
281                        let array = column.as_binary::<i32>();
282                        let v = array.value(i);
283                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
284                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
285                            row_writer.write_col(s)?;
286                        } else {
287                            row_writer.write_col(v)?;
288                        }
289                    }
290                    DataType::BinaryView => {
291                        let array = column.as_binary_view();
292                        let v = array.value(i);
293                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
294                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
295                            row_writer.write_col(s)?;
296                        } else {
297                            row_writer.write_col(v)?;
298                        }
299                    }
300                    DataType::LargeBinary => {
301                        let array = column.as_binary::<i64>();
302                        let v = array.value(i);
303                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
304                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
305                            row_writer.write_col(s)?;
306                        } else {
307                            row_writer.write_col(v)?;
308                        }
309                    }
310                    DataType::Date32 => {
311                        let array = column.as_primitive::<Date32Type>();
312                        let v = Date::new(array.value(i));
313                        row_writer.write_col(v.to_chrono_date())?;
314                    }
315                    DataType::Timestamp(time_unit, _) => {
316                        let v = match time_unit {
317                            TimeUnit::Second => {
318                                let array = column.as_primitive::<TimestampSecondType>();
319                                array.value(i)
320                            }
321                            TimeUnit::Millisecond => {
322                                let array = column.as_primitive::<TimestampMillisecondType>();
323                                array.value(i)
324                            }
325                            TimeUnit::Microsecond => {
326                                let array = column.as_primitive::<TimestampMicrosecondType>();
327                                array.value(i)
328                            }
329                            TimeUnit::Nanosecond => {
330                                let array = column.as_primitive::<TimestampNanosecondType>();
331                                array.value(i)
332                            }
333                        };
334                        let v = Timestamp::new(v, time_unit.into());
335                        let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
336                        row_writer.write_col(v)?;
337                    }
338                    DataType::Interval(interval_unit) => match interval_unit {
339                        IntervalUnit::YearMonth => {
340                            let array = column.as_primitive::<IntervalYearMonthType>();
341                            let v: IntervalYearMonth = array.value(i).into();
342                            row_writer.write_col(v.to_iso8601_string())?;
343                        }
344                        IntervalUnit::DayTime => {
345                            let array = column.as_primitive::<IntervalDayTimeType>();
346                            let v: IntervalDayTime = array.value(i).into();
347                            row_writer.write_col(v.to_iso8601_string())?;
348                        }
349                        IntervalUnit::MonthDayNano => {
350                            let array = column.as_primitive::<IntervalMonthDayNanoType>();
351                            let v: IntervalMonthDayNano = array.value(i).into();
352                            row_writer.write_col(v.to_iso8601_string())?;
353                        }
354                    },
355                    DataType::Duration(time_unit) => match time_unit {
356                        TimeUnit::Second => {
357                            let array = column.as_primitive::<DurationSecondType>();
358                            let v = array.value(i);
359                            row_writer.write_col(Duration::from_secs(v as u64))?;
360                        }
361                        TimeUnit::Millisecond => {
362                            let array = column.as_primitive::<DurationMillisecondType>();
363                            let v = array.value(i);
364                            row_writer.write_col(Duration::from_millis(v as u64))?;
365                        }
366                        TimeUnit::Microsecond => {
367                            let array = column.as_primitive::<DurationMicrosecondType>();
368                            let v = array.value(i);
369                            row_writer.write_col(Duration::from_micros(v as u64))?;
370                        }
371                        TimeUnit::Nanosecond => {
372                            let array = column.as_primitive::<DurationNanosecondType>();
373                            let v = array.value(i);
374                            row_writer.write_col(Duration::from_nanos(v as u64))?;
375                        }
376                    },
377                    DataType::List(_) => {
378                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
379                        row_writer.write_col(v.to_string())?;
380                    }
381                    DataType::Struct(_) => {
382                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
383                        row_writer.write_col(v.to_string())?;
384                    }
385                    DataType::Time32(time_unit) => {
386                        let time = match time_unit {
387                            TimeUnit::Second => {
388                                let array = column.as_primitive::<Time32SecondType>();
389                                Time::new_second(array.value(i) as i64)
390                            }
391                            TimeUnit::Millisecond => {
392                                let array = column.as_primitive::<Time32MillisecondType>();
393                                Time::new_millisecond(array.value(i) as i64)
394                            }
395                            _ => unreachable!(
396                                "`DataType::Time32` has only second and millisecond time units"
397                            ),
398                        };
399                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
400                        row_writer.write_col(v)?;
401                    }
402                    DataType::Time64(time_unit) => {
403                        let time = match time_unit {
404                            TimeUnit::Microsecond => {
405                                let array = column.as_primitive::<Time64MicrosecondType>();
406                                Time::new_microsecond(array.value(i))
407                            }
408                            TimeUnit::Nanosecond => {
409                                let array = column.as_primitive::<Time64NanosecondType>();
410                                Time::new_nanosecond(array.value(i))
411                            }
412                            _ => unreachable!(
413                                "`DataType::Time64` has only microsecond and nanosecond time units"
414                            ),
415                        };
416                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
417                        row_writer.write_col(v)?;
418                    }
419                    DataType::Decimal128(precision, scale) => {
420                        let array = column.as_primitive::<Decimal128Type>();
421                        let v = Decimal128::new(array.value(i), *precision, *scale);
422                        row_writer.write_col(v.to_string())?;
423                    }
424                    _ => {
425                        return NotSupportedSnafu {
426                            feat: format!("convert {} to MySQL value", column.data_type()),
427                        }
428                        .fail();
429                    }
430                }
431            }
432            row_writer.end_row().await?;
433        }
434        Ok(())
435    }
436
437    async fn write_query_error(
438        error: impl ErrorExt,
439        w: QueryResultWriter<'a, W>,
440        query_context: QueryContextRef,
441    ) -> Result<()> {
442        METRIC_ERROR_COUNTER
443            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
444            .inc();
445
446        let (kind, err) = handle_err(error, query_context);
447        debug!("Write query error, kind: {:?}, err: {}", kind, err);
448        w.error(kind, err.as_bytes()).await?;
449        Ok(())
450    }
451}
452
453pub(crate) fn create_mysql_column(
454    data_type: &ConcreteDataType,
455    column_name: &str,
456) -> Result<Column> {
457    let column_type = match data_type {
458        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
459        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
460            Ok(ColumnType::MYSQL_TYPE_TINY)
461        }
462        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
463            Ok(ColumnType::MYSQL_TYPE_SHORT)
464        }
465        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
466        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
467            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
468        }
469        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
470        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
471        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
472            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
473        }
474        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
475        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
476        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
477        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
478        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
479        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
480        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
481        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
482        ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
483        ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
484        _ => error::UnsupportedDataTypeSnafu {
485            data_type,
486            reason: "not implemented",
487        }
488        .fail(),
489    };
490    let mut colflags = ColumnFlags::empty();
491    match data_type {
492        ConcreteDataType::UInt16(_)
493        | ConcreteDataType::UInt8(_)
494        | ConcreteDataType::UInt32(_)
495        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
496        _ => {}
497    };
498    column_type.map(|column_type| Column {
499        column: column_name.to_string(),
500        coltype: column_type,
501
502        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
503        //   implementation, will revisit them again in the future.
504        table: String::default(),
505        colflags,
506    })
507}
508
509/// Creates MySQL columns definition from our column schema.
510pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
511    schema
512        .column_schemas()
513        .iter()
514        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
515        .collect()
516}
517
518fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
519    match status_code {
520        StatusCode::Success => ErrorKind::ER_YES,
521        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
522        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
523        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
524        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
525        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
526        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
527            ErrorKind::ER_TABLE_EXISTS_ERROR
528        }
529        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
530        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
531        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
532        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
533        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
534        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
535            ErrorKind::ER_ACCESS_DENIED_ERROR
536        }
537        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
538        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
539            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
540        }
541        StatusCode::Unexpected
542        | StatusCode::Internal
543        | StatusCode::IllegalState
544        | StatusCode::PlanQuery
545        | StatusCode::EngineExecuteQuery
546        | StatusCode::RegionNotReady
547        | StatusCode::RegionBusy
548        | StatusCode::TableUnavailable
549        | StatusCode::StorageUnavailable
550        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
551        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
552        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
553        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
554        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
555        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
556        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
557        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
558        StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
559        StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
560    }
561}