servers/mysql/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19    Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
20    Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
21    UInt16Type, UInt32Type, UInt64Type,
22};
23use arrow_schema::{DataType, IntervalUnit};
24use common_decimal::Decimal128;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
29use common_telemetry::{debug, error};
30use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
31use datafusion_common::ScalarValue;
32use datatypes::prelude::ConcreteDataType;
33use datatypes::schema::SchemaRef;
34use datatypes::types::jsonb_to_string;
35use futures::StreamExt;
36use opensrv_mysql::{
37    Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
38};
39use session::SessionRef;
40use session::context::QueryContextRef;
41use snafu::prelude::*;
42use tokio::io::AsyncWrite;
43
44use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
45use crate::metrics::*;
46
47/// Try to write multiple output to the writer if possible.
48pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
49    w: QueryResultWriter<'_, W>,
50    query_context: QueryContextRef,
51    session: SessionRef,
52    outputs: Vec<Result<Output>>,
53) -> Result<()> {
54    if let Some(warning) = query_context.warning() {
55        session.add_warning(warning);
56    }
57
58    let mut writer = Some(MysqlResultWriter::new(
59        w,
60        query_context.clone(),
61        session.clone(),
62    ));
63    for output in outputs {
64        let result_writer = writer.take().context(error::InternalSnafu {
65            err_msg: "Sending multiple result set is unsupported",
66        })?;
67        writer = result_writer.try_write_one(output).await?;
68    }
69
70    if let Some(result_writer) = writer {
71        result_writer.finish().await?;
72    }
73    Ok(())
74}
75
76/// Handle GreptimeDB error, convert it to MySQL error
77pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
78    let status_code = e.status_code();
79    let kind = mysql_error_kind(&status_code);
80
81    if status_code.should_log_error() {
82        let root_error = e.root_cause().unwrap_or(&e);
83        error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
84    } else {
85        debug!(
86            "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
87            status_code,
88            query_ctx.get_db_string(),
89            e
90        );
91    };
92    let msg = e.output_msg();
93    // Inline the status code to output message for MySQL
94    let err_msg = format!("({status_code}): {msg}");
95
96    (kind, err_msg)
97}
98
/// A query result waiting to be sent to the MySQL client as a result set.
struct QueryResult {
    /// Schema reported by `stream`; the MySQL column definitions are built from it.
    schema: SchemaRef,
    /// Record batches whose rows are written to the client.
    stream: SendableRecordBatchStream,
}
103
/// Writes GreptimeDB query outputs to a MySQL client, either as result sets or as OK packets.
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
    /// opensrv-mysql writer for the response to the current client request.
    writer: QueryResultWriter<'a, W>,
    /// Context of the query being answered; supplies the timezone and the db name used in logs.
    query_context: QueryContextRef,
    /// Session the query runs in; its warning count is reported in OK packets.
    session: SessionRef,
}
109
110impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
111    pub fn new(
112        writer: QueryResultWriter<'a, W>,
113        query_context: QueryContextRef,
114        session: SessionRef,
115    ) -> MysqlResultWriter<'a, W> {
116        MysqlResultWriter::<'a, W> {
117            writer,
118            query_context,
119            session,
120        }
121    }
122
123    /// Try to write one result set. If there are more than one result set, return `Some`.
124    pub async fn try_write_one(
125        self,
126        output: Result<Output>,
127    ) -> Result<Option<MysqlResultWriter<'a, W>>> {
128        // We don't support sending multiple query result because the RowWriter's lifetime is bound to
129        // a local variable.
130        match output {
131            Ok(output) => match output.data {
132                OutputData::Stream(stream) => {
133                    let query_result = QueryResult {
134                        schema: stream.schema(),
135                        stream,
136                    };
137                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
138                }
139                OutputData::RecordBatches(recordbatches) => {
140                    let query_result = QueryResult {
141                        schema: recordbatches.schema(),
142                        stream: recordbatches.as_stream(),
143                    };
144                    Self::write_query_result(query_result, self.writer, self.query_context).await?;
145                }
146                OutputData::AffectedRows(rows) => {
147                    let next_writer =
148                        Self::write_affected_rows(self.writer, rows, &self.session).await?;
149                    return Ok(Some(MysqlResultWriter::new(
150                        next_writer,
151                        self.query_context,
152                        self.session,
153                    )));
154                }
155            },
156            Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
157        }
158        Ok(None)
159    }
160
161    /// Indicate no more result set to write. No need to call this if there is only one result set.
162    pub async fn finish(self) -> Result<()> {
163        self.writer.no_more_results().await?;
164        Ok(())
165    }
166
167    async fn write_affected_rows(
168        w: QueryResultWriter<'a, W>,
169        rows: usize,
170        session: &SessionRef,
171    ) -> Result<QueryResultWriter<'a, W>> {
172        let warnings = session.warnings_count() as u16;
173
174        let next_writer = w
175            .complete_one(OkResponse {
176                affected_rows: rows as u64,
177                warnings,
178                ..Default::default()
179            })
180            .await?;
181        Ok(next_writer)
182    }
183
184    async fn write_query_result(
185        mut query_result: QueryResult,
186        writer: QueryResultWriter<'a, W>,
187        query_context: QueryContextRef,
188    ) -> Result<()> {
189        match create_mysql_column_def(&query_result.schema) {
190            Ok(column_def) => {
191                // The RowWriter's lifetime is bound to `column_def` thus we can't use finish_one()
192                // to return a new QueryResultWriter.
193                let mut row_writer = writer.start(&column_def).await?;
194                while let Some(record_batch) = query_result.stream.next().await {
195                    match record_batch {
196                        Ok(record_batch) => {
197                            Self::write_recordbatch(
198                                &mut row_writer,
199                                record_batch,
200                                query_context.clone(),
201                                &query_result.schema,
202                            )
203                            .await?
204                        }
205                        Err(e) => {
206                            let (kind, err) = handle_err(e, query_context);
207                            debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
208                            row_writer.finish_error(kind, &err.as_bytes()).await?;
209
210                            return Ok(());
211                        }
212                    }
213                }
214                row_writer.finish().await?;
215                Ok(())
216            }
217            Err(error) => Self::write_query_error(error, writer, query_context).await,
218        }
219    }
220
221    async fn write_recordbatch(
222        row_writer: &mut RowWriter<'_, W>,
223        record_batch: RecordBatch,
224        query_context: QueryContextRef,
225        schema: &SchemaRef,
226    ) -> Result<()> {
227        let record_batch = record_batch.into_df_record_batch();
228        for i in 0..record_batch.num_rows() {
229            for (j, column) in record_batch.columns().iter().enumerate() {
230                if column.is_null(i) {
231                    row_writer.write_col(None::<u8>)?;
232                    continue;
233                }
234
235                match column.data_type() {
236                    DataType::Null => {
237                        row_writer.write_col(None::<u8>)?;
238                    }
239                    DataType::Boolean => {
240                        let array = column.as_boolean();
241                        row_writer.write_col(array.value(i) as i8)?;
242                    }
243                    DataType::UInt8 => {
244                        let array = column.as_primitive::<UInt8Type>();
245                        row_writer.write_col(array.value(i))?;
246                    }
247                    DataType::UInt16 => {
248                        let array = column.as_primitive::<UInt16Type>();
249                        row_writer.write_col(array.value(i))?;
250                    }
251                    DataType::UInt32 => {
252                        let array = column.as_primitive::<UInt32Type>();
253                        row_writer.write_col(array.value(i))?;
254                    }
255                    DataType::UInt64 => {
256                        let array = column.as_primitive::<UInt64Type>();
257                        row_writer.write_col(array.value(i))?;
258                    }
259                    DataType::Int8 => {
260                        let array = column.as_primitive::<Int8Type>();
261                        row_writer.write_col(array.value(i))?;
262                    }
263                    DataType::Int16 => {
264                        let array = column.as_primitive::<Int16Type>();
265                        row_writer.write_col(array.value(i))?;
266                    }
267                    DataType::Int32 => {
268                        let array = column.as_primitive::<Int32Type>();
269                        row_writer.write_col(array.value(i))?;
270                    }
271                    DataType::Int64 => {
272                        let array = column.as_primitive::<Int64Type>();
273                        row_writer.write_col(array.value(i))?;
274                    }
275                    DataType::Float32 => {
276                        let array = column.as_primitive::<Float32Type>();
277                        row_writer.write_col(array.value(i))?;
278                    }
279                    DataType::Float64 => {
280                        let array = column.as_primitive::<Float64Type>();
281                        row_writer.write_col(array.value(i))?;
282                    }
283                    DataType::Utf8 => {
284                        let array = column.as_string::<i32>();
285                        row_writer.write_col(array.value(i))?;
286                    }
287                    DataType::Utf8View => {
288                        let array = column.as_string_view();
289                        row_writer.write_col(array.value(i))?;
290                    }
291                    DataType::LargeUtf8 => {
292                        let array = column.as_string::<i64>();
293                        row_writer.write_col(array.value(i))?;
294                    }
295                    DataType::Binary => {
296                        let array = column.as_binary::<i32>();
297                        let v = array.value(i);
298                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
299                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
300                            row_writer.write_col(s)?;
301                        } else {
302                            row_writer.write_col(v)?;
303                        }
304                    }
305                    DataType::BinaryView => {
306                        let array = column.as_binary_view();
307                        let v = array.value(i);
308                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
309                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
310                            row_writer.write_col(s)?;
311                        } else {
312                            row_writer.write_col(v)?;
313                        }
314                    }
315                    DataType::LargeBinary => {
316                        let array = column.as_binary::<i64>();
317                        let v = array.value(i);
318                        if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
319                            let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
320                            row_writer.write_col(s)?;
321                        } else {
322                            row_writer.write_col(v)?;
323                        }
324                    }
325                    DataType::Date32 => {
326                        let array = column.as_primitive::<Date32Type>();
327                        let v = Date::new(array.value(i));
328                        row_writer.write_col(v.to_chrono_date())?;
329                    }
330                    DataType::Timestamp(_, _) => {
331                        let v = datatypes::arrow_array::timestamp_array_value(column, i);
332                        let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
333                        row_writer.write_col(v)?;
334                    }
335                    DataType::Interval(interval_unit) => match interval_unit {
336                        IntervalUnit::YearMonth => {
337                            let array = column.as_primitive::<IntervalYearMonthType>();
338                            let v: IntervalYearMonth = array.value(i).into();
339                            row_writer.write_col(v.to_iso8601_string())?;
340                        }
341                        IntervalUnit::DayTime => {
342                            let array = column.as_primitive::<IntervalDayTimeType>();
343                            let v: IntervalDayTime = array.value(i).into();
344                            row_writer.write_col(v.to_iso8601_string())?;
345                        }
346                        IntervalUnit::MonthDayNano => {
347                            let array = column.as_primitive::<IntervalMonthDayNanoType>();
348                            let v: IntervalMonthDayNano = array.value(i).into();
349                            row_writer.write_col(v.to_iso8601_string())?;
350                        }
351                    },
352                    DataType::Duration(_) => {
353                        let v: Duration =
354                            datatypes::arrow_array::duration_array_value(column, i).into();
355                        row_writer.write_col(v)?;
356                    }
357                    DataType::List(_) => {
358                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
359                        row_writer.write_col(v.to_string())?;
360                    }
361                    DataType::Struct(_) => {
362                        let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
363                        row_writer.write_col(v.to_string())?;
364                    }
365                    DataType::Time32(_) | DataType::Time64(_) => {
366                        let time = datatypes::arrow_array::time_array_value(column, i);
367                        let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
368                        row_writer.write_col(v)?;
369                    }
370                    DataType::Decimal128(precision, scale) => {
371                        let array = column.as_primitive::<Decimal128Type>();
372                        let v = Decimal128::new(array.value(i), *precision, *scale);
373                        row_writer.write_col(v.to_string())?;
374                    }
375                    _ => {
376                        return NotSupportedSnafu {
377                            feat: format!("convert {} to MySQL value", column.data_type()),
378                        }
379                        .fail();
380                    }
381                }
382            }
383            row_writer.end_row().await?;
384        }
385        Ok(())
386    }
387
388    async fn write_query_error(
389        error: impl ErrorExt,
390        w: QueryResultWriter<'a, W>,
391        query_context: QueryContextRef,
392    ) -> Result<()> {
393        METRIC_ERROR_COUNTER
394            .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
395            .inc();
396
397        let (kind, err) = handle_err(error, query_context);
398        debug!("Write query error, kind: {:?}, err: {}", kind, err);
399        w.error(kind, err.as_bytes()).await?;
400        Ok(())
401    }
402}
403
404pub(crate) fn create_mysql_column(
405    data_type: &ConcreteDataType,
406    column_name: &str,
407) -> Result<Column> {
408    let column_type = match data_type {
409        ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
410        ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
411            Ok(ColumnType::MYSQL_TYPE_TINY)
412        }
413        ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
414            Ok(ColumnType::MYSQL_TYPE_SHORT)
415        }
416        ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
417        ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
418            Ok(ColumnType::MYSQL_TYPE_LONGLONG)
419        }
420        ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
421        ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
422        ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
423            Ok(ColumnType::MYSQL_TYPE_VARCHAR)
424        }
425        ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
426        ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
427        ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
428        ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
429        ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
430        ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
431        ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
432        ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
433        ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
434        ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
435        _ => error::UnsupportedDataTypeSnafu {
436            data_type,
437            reason: "not implemented",
438        }
439        .fail(),
440    };
441    let mut colflags = ColumnFlags::empty();
442    match data_type {
443        ConcreteDataType::UInt16(_)
444        | ConcreteDataType::UInt8(_)
445        | ConcreteDataType::UInt32(_)
446        | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
447        _ => {}
448    };
449    column_type.map(|column_type| Column {
450        column: column_name.to_string(),
451        coltype: column_type,
452        // TODO(LFC): Currently "table" and "colflags" are not relevant in MySQL server
453        //   implementation, will revisit them again in the future.
454        table: String::default(),
455        collen: 0, // 0 means "use default".
456        colflags,
457    })
458}
459
460/// Creates MySQL columns definition from our column schema.
461pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
462    schema
463        .column_schemas()
464        .iter()
465        .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
466        .collect()
467}
468
469fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
470    match status_code {
471        StatusCode::Success => ErrorKind::ER_YES,
472        StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
473        StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
474        StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
475        StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
476        StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
477        StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
478            ErrorKind::ER_TABLE_EXISTS_ERROR
479        }
480        StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
481        StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
482        StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
483        StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
484        StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
485        StatusCode::PermissionDenied | StatusCode::AccessDenied => {
486            ErrorKind::ER_ACCESS_DENIED_ERROR
487        }
488        StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
489        StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
490            ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
491        }
492        StatusCode::Unexpected
493        | StatusCode::Internal
494        | StatusCode::IllegalState
495        | StatusCode::PlanQuery
496        | StatusCode::EngineExecuteQuery
497        | StatusCode::RegionNotReady
498        | StatusCode::RegionBusy
499        | StatusCode::TableUnavailable
500        | StatusCode::StorageUnavailable
501        | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
502        StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
503        StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
504        StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
505        StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
506        StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
507        StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
508        StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
509        StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
510        StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
511    }
512}