1use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19 Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
20 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
21 UInt16Type, UInt32Type, UInt64Type,
22};
23use arrow_schema::{DataType, IntervalUnit};
24use common_decimal::Decimal128;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
29use common_telemetry::{debug, error};
30use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
31use datafusion_common::ScalarValue;
32use datatypes::prelude::ConcreteDataType;
33use datatypes::schema::SchemaRef;
34use datatypes::types::jsonb_to_string;
35use futures::StreamExt;
36use opensrv_mysql::{
37 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
38};
39use session::context::QueryContextRef;
40use snafu::prelude::*;
41use tokio::io::AsyncWrite;
42
43use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
44use crate::metrics::*;
45
46pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
48 w: QueryResultWriter<'_, W>,
49 query_context: QueryContextRef,
50 outputs: Vec<Result<Output>>,
51) -> Result<()> {
52 let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
53 for output in outputs {
54 let result_writer = writer.take().context(error::InternalSnafu {
55 err_msg: "Sending multiple result set is unsupported",
56 })?;
57 writer = result_writer.try_write_one(output).await?;
58 }
59
60 if let Some(result_writer) = writer {
61 result_writer.finish().await?;
62 }
63 Ok(())
64}
65
66pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
68 let status_code = e.status_code();
69 let kind = mysql_error_kind(&status_code);
70
71 if status_code.should_log_error() {
72 let root_error = e.root_cause().unwrap_or(&e);
73 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
74 } else {
75 debug!(
76 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
77 status_code,
78 query_ctx.get_db_string(),
79 e
80 );
81 };
82 let msg = e.output_msg();
83 let err_msg = format!("({status_code}): {msg}");
85
86 (kind, err_msg)
87}
88
89struct QueryResult {
90 schema: SchemaRef,
91 stream: SendableRecordBatchStream,
92}
93
94pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
95 writer: QueryResultWriter<'a, W>,
96 query_context: QueryContextRef,
97}
98
99impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
100 pub fn new(
101 writer: QueryResultWriter<'a, W>,
102 query_context: QueryContextRef,
103 ) -> MysqlResultWriter<'a, W> {
104 MysqlResultWriter::<'a, W> {
105 writer,
106 query_context,
107 }
108 }
109
110 pub async fn try_write_one(
112 self,
113 output: Result<Output>,
114 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
115 match output {
118 Ok(output) => match output.data {
119 OutputData::Stream(stream) => {
120 let query_result = QueryResult {
121 schema: stream.schema(),
122 stream,
123 };
124 Self::write_query_result(query_result, self.writer, self.query_context).await?;
125 }
126 OutputData::RecordBatches(recordbatches) => {
127 let query_result = QueryResult {
128 schema: recordbatches.schema(),
129 stream: recordbatches.as_stream(),
130 };
131 Self::write_query_result(query_result, self.writer, self.query_context).await?;
132 }
133 OutputData::AffectedRows(rows) => {
134 let next_writer = Self::write_affected_rows(self.writer, rows).await?;
135 return Ok(Some(MysqlResultWriter::new(
136 next_writer,
137 self.query_context,
138 )));
139 }
140 },
141 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
142 }
143 Ok(None)
144 }
145
146 pub async fn finish(self) -> Result<()> {
148 self.writer.no_more_results().await?;
149 Ok(())
150 }
151
152 async fn write_affected_rows(
153 w: QueryResultWriter<'a, W>,
154 rows: usize,
155 ) -> Result<QueryResultWriter<'a, W>> {
156 let next_writer = w
157 .complete_one(OkResponse {
158 affected_rows: rows as u64,
159 ..Default::default()
160 })
161 .await?;
162 Ok(next_writer)
163 }
164
165 async fn write_query_result(
166 mut query_result: QueryResult,
167 writer: QueryResultWriter<'a, W>,
168 query_context: QueryContextRef,
169 ) -> Result<()> {
170 match create_mysql_column_def(&query_result.schema) {
171 Ok(column_def) => {
172 let mut row_writer = writer.start(&column_def).await?;
175 while let Some(record_batch) = query_result.stream.next().await {
176 match record_batch {
177 Ok(record_batch) => {
178 Self::write_recordbatch(
179 &mut row_writer,
180 record_batch,
181 query_context.clone(),
182 &query_result.schema,
183 )
184 .await?
185 }
186 Err(e) => {
187 let (kind, err) = handle_err(e, query_context);
188 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
189 row_writer.finish_error(kind, &err.as_bytes()).await?;
190
191 return Ok(());
192 }
193 }
194 }
195 row_writer.finish().await?;
196 Ok(())
197 }
198 Err(error) => Self::write_query_error(error, writer, query_context).await,
199 }
200 }
201
202 async fn write_recordbatch(
203 row_writer: &mut RowWriter<'_, W>,
204 record_batch: RecordBatch,
205 query_context: QueryContextRef,
206 schema: &SchemaRef,
207 ) -> Result<()> {
208 let record_batch = record_batch.into_df_record_batch();
209 for i in 0..record_batch.num_rows() {
210 for (j, column) in record_batch.columns().iter().enumerate() {
211 if column.is_null(i) {
212 row_writer.write_col(None::<u8>)?;
213 continue;
214 }
215
216 match column.data_type() {
217 DataType::Null => {
218 row_writer.write_col(None::<u8>)?;
219 }
220 DataType::Boolean => {
221 let array = column.as_boolean();
222 row_writer.write_col(array.value(i) as i8)?;
223 }
224 DataType::UInt8 => {
225 let array = column.as_primitive::<UInt8Type>();
226 row_writer.write_col(array.value(i))?;
227 }
228 DataType::UInt16 => {
229 let array = column.as_primitive::<UInt16Type>();
230 row_writer.write_col(array.value(i))?;
231 }
232 DataType::UInt32 => {
233 let array = column.as_primitive::<UInt32Type>();
234 row_writer.write_col(array.value(i))?;
235 }
236 DataType::UInt64 => {
237 let array = column.as_primitive::<UInt64Type>();
238 row_writer.write_col(array.value(i))?;
239 }
240 DataType::Int8 => {
241 let array = column.as_primitive::<Int8Type>();
242 row_writer.write_col(array.value(i))?;
243 }
244 DataType::Int16 => {
245 let array = column.as_primitive::<Int16Type>();
246 row_writer.write_col(array.value(i))?;
247 }
248 DataType::Int32 => {
249 let array = column.as_primitive::<Int32Type>();
250 row_writer.write_col(array.value(i))?;
251 }
252 DataType::Int64 => {
253 let array = column.as_primitive::<Int64Type>();
254 row_writer.write_col(array.value(i))?;
255 }
256 DataType::Float32 => {
257 let array = column.as_primitive::<Float32Type>();
258 row_writer.write_col(array.value(i))?;
259 }
260 DataType::Float64 => {
261 let array = column.as_primitive::<Float64Type>();
262 row_writer.write_col(array.value(i))?;
263 }
264 DataType::Utf8 => {
265 let array = column.as_string::<i32>();
266 row_writer.write_col(array.value(i))?;
267 }
268 DataType::Utf8View => {
269 let array = column.as_string_view();
270 row_writer.write_col(array.value(i))?;
271 }
272 DataType::LargeUtf8 => {
273 let array = column.as_string::<i64>();
274 row_writer.write_col(array.value(i))?;
275 }
276 DataType::Binary => {
277 let array = column.as_binary::<i32>();
278 let v = array.value(i);
279 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
280 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
281 row_writer.write_col(s)?;
282 } else {
283 row_writer.write_col(v)?;
284 }
285 }
286 DataType::BinaryView => {
287 let array = column.as_binary_view();
288 let v = array.value(i);
289 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
290 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
291 row_writer.write_col(s)?;
292 } else {
293 row_writer.write_col(v)?;
294 }
295 }
296 DataType::LargeBinary => {
297 let array = column.as_binary::<i64>();
298 let v = array.value(i);
299 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
300 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
301 row_writer.write_col(s)?;
302 } else {
303 row_writer.write_col(v)?;
304 }
305 }
306 DataType::Date32 => {
307 let array = column.as_primitive::<Date32Type>();
308 let v = Date::new(array.value(i));
309 row_writer.write_col(v.to_chrono_date())?;
310 }
311 DataType::Timestamp(_, _) => {
312 let v = datatypes::arrow_array::timestamp_array_value(column, i);
313 let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
314 row_writer.write_col(v)?;
315 }
316 DataType::Interval(interval_unit) => match interval_unit {
317 IntervalUnit::YearMonth => {
318 let array = column.as_primitive::<IntervalYearMonthType>();
319 let v: IntervalYearMonth = array.value(i).into();
320 row_writer.write_col(v.to_iso8601_string())?;
321 }
322 IntervalUnit::DayTime => {
323 let array = column.as_primitive::<IntervalDayTimeType>();
324 let v: IntervalDayTime = array.value(i).into();
325 row_writer.write_col(v.to_iso8601_string())?;
326 }
327 IntervalUnit::MonthDayNano => {
328 let array = column.as_primitive::<IntervalMonthDayNanoType>();
329 let v: IntervalMonthDayNano = array.value(i).into();
330 row_writer.write_col(v.to_iso8601_string())?;
331 }
332 },
333 DataType::Duration(_) => {
334 let v: Duration =
335 datatypes::arrow_array::duration_array_value(column, i).into();
336 row_writer.write_col(v)?;
337 }
338 DataType::List(_) => {
339 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
340 row_writer.write_col(v.to_string())?;
341 }
342 DataType::Struct(_) => {
343 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
344 row_writer.write_col(v.to_string())?;
345 }
346 DataType::Time32(_) | DataType::Time64(_) => {
347 let time = datatypes::arrow_array::time_array_value(column, i);
348 let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
349 row_writer.write_col(v)?;
350 }
351 DataType::Decimal128(precision, scale) => {
352 let array = column.as_primitive::<Decimal128Type>();
353 let v = Decimal128::new(array.value(i), *precision, *scale);
354 row_writer.write_col(v.to_string())?;
355 }
356 _ => {
357 return NotSupportedSnafu {
358 feat: format!("convert {} to MySQL value", column.data_type()),
359 }
360 .fail();
361 }
362 }
363 }
364 row_writer.end_row().await?;
365 }
366 Ok(())
367 }
368
369 async fn write_query_error(
370 error: impl ErrorExt,
371 w: QueryResultWriter<'a, W>,
372 query_context: QueryContextRef,
373 ) -> Result<()> {
374 METRIC_ERROR_COUNTER
375 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
376 .inc();
377
378 let (kind, err) = handle_err(error, query_context);
379 debug!("Write query error, kind: {:?}, err: {}", kind, err);
380 w.error(kind, err.as_bytes()).await?;
381 Ok(())
382 }
383}
384
385pub(crate) fn create_mysql_column(
386 data_type: &ConcreteDataType,
387 column_name: &str,
388) -> Result<Column> {
389 let column_type = match data_type {
390 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
391 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
392 Ok(ColumnType::MYSQL_TYPE_TINY)
393 }
394 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
395 Ok(ColumnType::MYSQL_TYPE_SHORT)
396 }
397 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
398 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
399 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
400 }
401 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
402 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
403 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
404 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
405 }
406 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
407 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
408 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
409 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
410 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
411 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
412 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
413 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
414 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
415 ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
416 _ => error::UnsupportedDataTypeSnafu {
417 data_type,
418 reason: "not implemented",
419 }
420 .fail(),
421 };
422 let mut colflags = ColumnFlags::empty();
423 match data_type {
424 ConcreteDataType::UInt16(_)
425 | ConcreteDataType::UInt8(_)
426 | ConcreteDataType::UInt32(_)
427 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
428 _ => {}
429 };
430 column_type.map(|column_type| Column {
431 column: column_name.to_string(),
432 coltype: column_type,
433 table: String::default(),
436 collen: 0, colflags,
438 })
439}
440
441pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
443 schema
444 .column_schemas()
445 .iter()
446 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
447 .collect()
448}
449
450fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
451 match status_code {
452 StatusCode::Success => ErrorKind::ER_YES,
453 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
454 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
455 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
456 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
457 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
458 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
459 ErrorKind::ER_TABLE_EXISTS_ERROR
460 }
461 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
462 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
463 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
464 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
465 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
466 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
467 ErrorKind::ER_ACCESS_DENIED_ERROR
468 }
469 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
470 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
471 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
472 }
473 StatusCode::Unexpected
474 | StatusCode::Internal
475 | StatusCode::IllegalState
476 | StatusCode::PlanQuery
477 | StatusCode::EngineExecuteQuery
478 | StatusCode::RegionNotReady
479 | StatusCode::RegionBusy
480 | StatusCode::TableUnavailable
481 | StatusCode::StorageUnavailable
482 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
483 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
484 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
485 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
486 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
487 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
488 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
489 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
490 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
491 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
492 }
493}