1use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19 Date32Type, Decimal128Type, DurationMicrosecondType, DurationMillisecondType,
20 DurationNanosecondType, DurationSecondType, Float32Type, Float64Type, Int8Type, Int16Type,
21 Int32Type, Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType,
22 Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType,
23 TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType,
24 TimestampSecondType, UInt8Type, UInt16Type, UInt32Type, UInt64Type,
25};
26use arrow_schema::{DataType, IntervalUnit, TimeUnit};
27use common_decimal::Decimal128;
28use common_error::ext::ErrorExt;
29use common_error::status_code::StatusCode;
30use common_query::{Output, OutputData};
31use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
32use common_telemetry::{debug, error};
33use common_time::time::Time;
34use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth, Timestamp};
35use datafusion_common::ScalarValue;
36use datatypes::prelude::ConcreteDataType;
37use datatypes::schema::SchemaRef;
38use datatypes::types::jsonb_to_string;
39use futures::StreamExt;
40use opensrv_mysql::{
41 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
42};
43use session::context::QueryContextRef;
44use snafu::prelude::*;
45use tokio::io::AsyncWrite;
46
47use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
48use crate::metrics::*;
49
50pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
52 w: QueryResultWriter<'_, W>,
53 query_context: QueryContextRef,
54 outputs: Vec<Result<Output>>,
55) -> Result<()> {
56 let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
57 for output in outputs {
58 let result_writer = writer.take().context(error::InternalSnafu {
59 err_msg: "Sending multiple result set is unsupported",
60 })?;
61 writer = result_writer.try_write_one(output).await?;
62 }
63
64 if let Some(result_writer) = writer {
65 result_writer.finish().await?;
66 }
67 Ok(())
68}
69
70pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
72 let status_code = e.status_code();
73 let kind = mysql_error_kind(&status_code);
74
75 if status_code.should_log_error() {
76 let root_error = e.root_cause().unwrap_or(&e);
77 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
78 } else {
79 debug!(
80 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
81 status_code,
82 query_ctx.get_db_string(),
83 e
84 );
85 };
86 let msg = e.output_msg();
87 let err_msg = format!("({status_code}): {msg}");
89
90 (kind, err_msg)
91}
92
/// One query result set: the schema of the result and the stream producing its
/// record batches.
struct QueryResult {
    /// Schema used to build the MySQL column definitions and to look up the
    /// logical type of each column while encoding rows.
    schema: SchemaRef,
    /// Stream polled for record batches until it is exhausted or yields an error.
    stream: SendableRecordBatchStream,
}
97
/// Writes query outputs to a MySQL client through an `opensrv_mysql`
/// [`QueryResultWriter`].
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
    /// The underlying protocol writer; consumed when a result or error is written.
    writer: QueryResultWriter<'a, W>,
    /// Query context, used for the time zone when encoding values and for the
    /// database name when logging errors.
    query_context: QueryContextRef,
}
102
103impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
104 pub fn new(
105 writer: QueryResultWriter<'a, W>,
106 query_context: QueryContextRef,
107 ) -> MysqlResultWriter<'a, W> {
108 MysqlResultWriter::<'a, W> {
109 writer,
110 query_context,
111 }
112 }
113
114 pub async fn try_write_one(
116 self,
117 output: Result<Output>,
118 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
119 match output {
122 Ok(output) => match output.data {
123 OutputData::Stream(stream) => {
124 let query_result = QueryResult {
125 schema: stream.schema(),
126 stream,
127 };
128 Self::write_query_result(query_result, self.writer, self.query_context).await?;
129 }
130 OutputData::RecordBatches(recordbatches) => {
131 let query_result = QueryResult {
132 schema: recordbatches.schema(),
133 stream: recordbatches.as_stream(),
134 };
135 Self::write_query_result(query_result, self.writer, self.query_context).await?;
136 }
137 OutputData::AffectedRows(rows) => {
138 let next_writer = Self::write_affected_rows(self.writer, rows).await?;
139 return Ok(Some(MysqlResultWriter::new(
140 next_writer,
141 self.query_context,
142 )));
143 }
144 },
145 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
146 }
147 Ok(None)
148 }
149
150 pub async fn finish(self) -> Result<()> {
152 self.writer.no_more_results().await?;
153 Ok(())
154 }
155
156 async fn write_affected_rows(
157 w: QueryResultWriter<'a, W>,
158 rows: usize,
159 ) -> Result<QueryResultWriter<'a, W>> {
160 let next_writer = w
161 .complete_one(OkResponse {
162 affected_rows: rows as u64,
163 ..Default::default()
164 })
165 .await?;
166 Ok(next_writer)
167 }
168
169 async fn write_query_result(
170 mut query_result: QueryResult,
171 writer: QueryResultWriter<'a, W>,
172 query_context: QueryContextRef,
173 ) -> Result<()> {
174 match create_mysql_column_def(&query_result.schema) {
175 Ok(column_def) => {
176 let mut row_writer = writer.start(&column_def).await?;
179 while let Some(record_batch) = query_result.stream.next().await {
180 match record_batch {
181 Ok(record_batch) => {
182 Self::write_recordbatch(
183 &mut row_writer,
184 record_batch,
185 query_context.clone(),
186 &query_result.schema,
187 )
188 .await?
189 }
190 Err(e) => {
191 let (kind, err) = handle_err(e, query_context);
192 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
193 row_writer.finish_error(kind, &err.as_bytes()).await?;
194
195 return Ok(());
196 }
197 }
198 }
199 row_writer.finish().await?;
200 Ok(())
201 }
202 Err(error) => Self::write_query_error(error, writer, query_context).await,
203 }
204 }
205
206 async fn write_recordbatch(
207 row_writer: &mut RowWriter<'_, W>,
208 record_batch: RecordBatch,
209 query_context: QueryContextRef,
210 schema: &SchemaRef,
211 ) -> Result<()> {
212 let record_batch = record_batch.into_df_record_batch();
213 for i in 0..record_batch.num_rows() {
214 for (j, column) in record_batch.columns().iter().enumerate() {
215 if column.is_null(i) {
216 row_writer.write_col(None::<u8>)?;
217 continue;
218 }
219
220 match column.data_type() {
221 DataType::Null => {
222 row_writer.write_col(None::<u8>)?;
223 }
224 DataType::Boolean => {
225 let array = column.as_boolean();
226 row_writer.write_col(array.value(i) as i8)?;
227 }
228 DataType::UInt8 => {
229 let array = column.as_primitive::<UInt8Type>();
230 row_writer.write_col(array.value(i))?;
231 }
232 DataType::UInt16 => {
233 let array = column.as_primitive::<UInt16Type>();
234 row_writer.write_col(array.value(i))?;
235 }
236 DataType::UInt32 => {
237 let array = column.as_primitive::<UInt32Type>();
238 row_writer.write_col(array.value(i))?;
239 }
240 DataType::UInt64 => {
241 let array = column.as_primitive::<UInt64Type>();
242 row_writer.write_col(array.value(i))?;
243 }
244 DataType::Int8 => {
245 let array = column.as_primitive::<Int8Type>();
246 row_writer.write_col(array.value(i))?;
247 }
248 DataType::Int16 => {
249 let array = column.as_primitive::<Int16Type>();
250 row_writer.write_col(array.value(i))?;
251 }
252 DataType::Int32 => {
253 let array = column.as_primitive::<Int32Type>();
254 row_writer.write_col(array.value(i))?;
255 }
256 DataType::Int64 => {
257 let array = column.as_primitive::<Int64Type>();
258 row_writer.write_col(array.value(i))?;
259 }
260 DataType::Float32 => {
261 let array = column.as_primitive::<Float32Type>();
262 row_writer.write_col(array.value(i))?;
263 }
264 DataType::Float64 => {
265 let array = column.as_primitive::<Float64Type>();
266 row_writer.write_col(array.value(i))?;
267 }
268 DataType::Utf8 => {
269 let array = column.as_string::<i32>();
270 row_writer.write_col(array.value(i))?;
271 }
272 DataType::Utf8View => {
273 let array = column.as_string_view();
274 row_writer.write_col(array.value(i))?;
275 }
276 DataType::LargeUtf8 => {
277 let array = column.as_string::<i64>();
278 row_writer.write_col(array.value(i))?;
279 }
280 DataType::Binary => {
281 let array = column.as_binary::<i32>();
282 let v = array.value(i);
283 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
284 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
285 row_writer.write_col(s)?;
286 } else {
287 row_writer.write_col(v)?;
288 }
289 }
290 DataType::BinaryView => {
291 let array = column.as_binary_view();
292 let v = array.value(i);
293 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
294 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
295 row_writer.write_col(s)?;
296 } else {
297 row_writer.write_col(v)?;
298 }
299 }
300 DataType::LargeBinary => {
301 let array = column.as_binary::<i64>();
302 let v = array.value(i);
303 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
304 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
305 row_writer.write_col(s)?;
306 } else {
307 row_writer.write_col(v)?;
308 }
309 }
310 DataType::Date32 => {
311 let array = column.as_primitive::<Date32Type>();
312 let v = Date::new(array.value(i));
313 row_writer.write_col(v.to_chrono_date())?;
314 }
315 DataType::Timestamp(time_unit, _) => {
316 let v = match time_unit {
317 TimeUnit::Second => {
318 let array = column.as_primitive::<TimestampSecondType>();
319 array.value(i)
320 }
321 TimeUnit::Millisecond => {
322 let array = column.as_primitive::<TimestampMillisecondType>();
323 array.value(i)
324 }
325 TimeUnit::Microsecond => {
326 let array = column.as_primitive::<TimestampMicrosecondType>();
327 array.value(i)
328 }
329 TimeUnit::Nanosecond => {
330 let array = column.as_primitive::<TimestampNanosecondType>();
331 array.value(i)
332 }
333 };
334 let v = Timestamp::new(v, time_unit.into());
335 let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
336 row_writer.write_col(v)?;
337 }
338 DataType::Interval(interval_unit) => match interval_unit {
339 IntervalUnit::YearMonth => {
340 let array = column.as_primitive::<IntervalYearMonthType>();
341 let v: IntervalYearMonth = array.value(i).into();
342 row_writer.write_col(v.to_iso8601_string())?;
343 }
344 IntervalUnit::DayTime => {
345 let array = column.as_primitive::<IntervalDayTimeType>();
346 let v: IntervalDayTime = array.value(i).into();
347 row_writer.write_col(v.to_iso8601_string())?;
348 }
349 IntervalUnit::MonthDayNano => {
350 let array = column.as_primitive::<IntervalMonthDayNanoType>();
351 let v: IntervalMonthDayNano = array.value(i).into();
352 row_writer.write_col(v.to_iso8601_string())?;
353 }
354 },
355 DataType::Duration(time_unit) => match time_unit {
356 TimeUnit::Second => {
357 let array = column.as_primitive::<DurationSecondType>();
358 let v = array.value(i);
359 row_writer.write_col(Duration::from_secs(v as u64))?;
360 }
361 TimeUnit::Millisecond => {
362 let array = column.as_primitive::<DurationMillisecondType>();
363 let v = array.value(i);
364 row_writer.write_col(Duration::from_millis(v as u64))?;
365 }
366 TimeUnit::Microsecond => {
367 let array = column.as_primitive::<DurationMicrosecondType>();
368 let v = array.value(i);
369 row_writer.write_col(Duration::from_micros(v as u64))?;
370 }
371 TimeUnit::Nanosecond => {
372 let array = column.as_primitive::<DurationNanosecondType>();
373 let v = array.value(i);
374 row_writer.write_col(Duration::from_nanos(v as u64))?;
375 }
376 },
377 DataType::List(_) => {
378 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
379 row_writer.write_col(v.to_string())?;
380 }
381 DataType::Struct(_) => {
382 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
383 row_writer.write_col(v.to_string())?;
384 }
385 DataType::Time32(time_unit) => {
386 let time = match time_unit {
387 TimeUnit::Second => {
388 let array = column.as_primitive::<Time32SecondType>();
389 Time::new_second(array.value(i) as i64)
390 }
391 TimeUnit::Millisecond => {
392 let array = column.as_primitive::<Time32MillisecondType>();
393 Time::new_millisecond(array.value(i) as i64)
394 }
395 _ => unreachable!(
396 "`DataType::Time32` has only second and millisecond time units"
397 ),
398 };
399 let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
400 row_writer.write_col(v)?;
401 }
402 DataType::Time64(time_unit) => {
403 let time = match time_unit {
404 TimeUnit::Microsecond => {
405 let array = column.as_primitive::<Time64MicrosecondType>();
406 Time::new_microsecond(array.value(i))
407 }
408 TimeUnit::Nanosecond => {
409 let array = column.as_primitive::<Time64NanosecondType>();
410 Time::new_nanosecond(array.value(i))
411 }
412 _ => unreachable!(
413 "`DataType::Time64` has only microsecond and nanosecond time units"
414 ),
415 };
416 let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
417 row_writer.write_col(v)?;
418 }
419 DataType::Decimal128(precision, scale) => {
420 let array = column.as_primitive::<Decimal128Type>();
421 let v = Decimal128::new(array.value(i), *precision, *scale);
422 row_writer.write_col(v.to_string())?;
423 }
424 _ => {
425 return NotSupportedSnafu {
426 feat: format!("convert {} to MySQL value", column.data_type()),
427 }
428 .fail();
429 }
430 }
431 }
432 row_writer.end_row().await?;
433 }
434 Ok(())
435 }
436
437 async fn write_query_error(
438 error: impl ErrorExt,
439 w: QueryResultWriter<'a, W>,
440 query_context: QueryContextRef,
441 ) -> Result<()> {
442 METRIC_ERROR_COUNTER
443 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
444 .inc();
445
446 let (kind, err) = handle_err(error, query_context);
447 debug!("Write query error, kind: {:?}, err: {}", kind, err);
448 w.error(kind, err.as_bytes()).await?;
449 Ok(())
450 }
451}
452
453pub(crate) fn create_mysql_column(
454 data_type: &ConcreteDataType,
455 column_name: &str,
456) -> Result<Column> {
457 let column_type = match data_type {
458 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
459 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
460 Ok(ColumnType::MYSQL_TYPE_TINY)
461 }
462 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
463 Ok(ColumnType::MYSQL_TYPE_SHORT)
464 }
465 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
466 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
467 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
468 }
469 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
470 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
471 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
472 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
473 }
474 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
475 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
476 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
477 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
478 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
479 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
480 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
481 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
482 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
483 ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
484 _ => error::UnsupportedDataTypeSnafu {
485 data_type,
486 reason: "not implemented",
487 }
488 .fail(),
489 };
490 let mut colflags = ColumnFlags::empty();
491 match data_type {
492 ConcreteDataType::UInt16(_)
493 | ConcreteDataType::UInt8(_)
494 | ConcreteDataType::UInt32(_)
495 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
496 _ => {}
497 };
498 column_type.map(|column_type| Column {
499 column: column_name.to_string(),
500 coltype: column_type,
501
502 table: String::default(),
505 colflags,
506 })
507}
508
509pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
511 schema
512 .column_schemas()
513 .iter()
514 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
515 .collect()
516}
517
518fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
519 match status_code {
520 StatusCode::Success => ErrorKind::ER_YES,
521 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
522 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
523 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
524 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
525 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
526 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
527 ErrorKind::ER_TABLE_EXISTS_ERROR
528 }
529 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
530 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
531 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
532 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
533 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
534 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
535 ErrorKind::ER_ACCESS_DENIED_ERROR
536 }
537 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
538 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
539 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
540 }
541 StatusCode::Unexpected
542 | StatusCode::Internal
543 | StatusCode::IllegalState
544 | StatusCode::PlanQuery
545 | StatusCode::EngineExecuteQuery
546 | StatusCode::RegionNotReady
547 | StatusCode::RegionBusy
548 | StatusCode::TableUnavailable
549 | StatusCode::StorageUnavailable
550 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
551 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
552 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
553 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
554 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
555 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
556 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
557 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
558 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
559 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
560 }
561}