1use std::time::Duration;
16
17use arrow::array::{Array, AsArray};
18use arrow::datatypes::{
19 Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
20 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
21 UInt16Type, UInt32Type, UInt64Type,
22};
23use arrow_schema::{DataType, IntervalUnit};
24use common_decimal::Decimal128;
25use common_error::ext::ErrorExt;
26use common_error::status_code::StatusCode;
27use common_query::{Output, OutputData};
28use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
29use common_telemetry::{debug, error};
30use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
31use datafusion_common::ScalarValue;
32use datatypes::prelude::ConcreteDataType;
33use datatypes::schema::SchemaRef;
34use datatypes::types::jsonb_to_string;
35use futures::StreamExt;
36use opensrv_mysql::{
37 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
38};
39use session::SessionRef;
40use session::context::QueryContextRef;
41use snafu::prelude::*;
42use tokio::io::AsyncWrite;
43
44use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
45use crate::metrics::*;
46
47pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
49 w: QueryResultWriter<'_, W>,
50 query_context: QueryContextRef,
51 session: SessionRef,
52 outputs: Vec<Result<Output>>,
53) -> Result<()> {
54 if let Some(warning) = query_context.warning() {
55 session.add_warning(warning);
56 }
57
58 let mut writer = Some(MysqlResultWriter::new(
59 w,
60 query_context.clone(),
61 session.clone(),
62 ));
63 for output in outputs {
64 let result_writer = writer.take().context(error::InternalSnafu {
65 err_msg: "Sending multiple result set is unsupported",
66 })?;
67 writer = result_writer.try_write_one(output).await?;
68 }
69
70 if let Some(result_writer) = writer {
71 result_writer.finish().await?;
72 }
73 Ok(())
74}
75
76pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
78 let status_code = e.status_code();
79 let kind = mysql_error_kind(&status_code);
80
81 if status_code.should_log_error() {
82 let root_error = e.root_cause().unwrap_or(&e);
83 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
84 } else {
85 debug!(
86 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
87 status_code,
88 query_ctx.get_db_string(),
89 e
90 );
91 };
92 let msg = e.output_msg();
93 let err_msg = format!("({status_code}): {msg}");
95
96 (kind, err_msg)
97}
98
99struct QueryResult {
100 schema: SchemaRef,
101 stream: SendableRecordBatchStream,
102}
103
104pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
105 writer: QueryResultWriter<'a, W>,
106 query_context: QueryContextRef,
107 session: SessionRef,
108}
109
110impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
111 pub fn new(
112 writer: QueryResultWriter<'a, W>,
113 query_context: QueryContextRef,
114 session: SessionRef,
115 ) -> MysqlResultWriter<'a, W> {
116 MysqlResultWriter::<'a, W> {
117 writer,
118 query_context,
119 session,
120 }
121 }
122
123 pub async fn try_write_one(
125 self,
126 output: Result<Output>,
127 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
128 match output {
131 Ok(output) => match output.data {
132 OutputData::Stream(stream) => {
133 let query_result = QueryResult {
134 schema: stream.schema(),
135 stream,
136 };
137 Self::write_query_result(query_result, self.writer, self.query_context).await?;
138 }
139 OutputData::RecordBatches(recordbatches) => {
140 let query_result = QueryResult {
141 schema: recordbatches.schema(),
142 stream: recordbatches.as_stream(),
143 };
144 Self::write_query_result(query_result, self.writer, self.query_context).await?;
145 }
146 OutputData::AffectedRows(rows) => {
147 let next_writer =
148 Self::write_affected_rows(self.writer, rows, &self.session).await?;
149 return Ok(Some(MysqlResultWriter::new(
150 next_writer,
151 self.query_context,
152 self.session,
153 )));
154 }
155 },
156 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
157 }
158 Ok(None)
159 }
160
161 pub async fn finish(self) -> Result<()> {
163 self.writer.no_more_results().await?;
164 Ok(())
165 }
166
167 async fn write_affected_rows(
168 w: QueryResultWriter<'a, W>,
169 rows: usize,
170 session: &SessionRef,
171 ) -> Result<QueryResultWriter<'a, W>> {
172 let warnings = session.warnings_count() as u16;
173
174 let next_writer = w
175 .complete_one(OkResponse {
176 affected_rows: rows as u64,
177 warnings,
178 ..Default::default()
179 })
180 .await?;
181 Ok(next_writer)
182 }
183
184 async fn write_query_result(
185 mut query_result: QueryResult,
186 writer: QueryResultWriter<'a, W>,
187 query_context: QueryContextRef,
188 ) -> Result<()> {
189 match create_mysql_column_def(&query_result.schema) {
190 Ok(column_def) => {
191 let mut row_writer = writer.start(&column_def).await?;
194 while let Some(record_batch) = query_result.stream.next().await {
195 match record_batch {
196 Ok(record_batch) => {
197 Self::write_recordbatch(
198 &mut row_writer,
199 record_batch,
200 query_context.clone(),
201 &query_result.schema,
202 )
203 .await?
204 }
205 Err(e) => {
206 let (kind, err) = handle_err(e, query_context);
207 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
208 row_writer.finish_error(kind, &err.as_bytes()).await?;
209
210 return Ok(());
211 }
212 }
213 }
214 row_writer.finish().await?;
215 Ok(())
216 }
217 Err(error) => Self::write_query_error(error, writer, query_context).await,
218 }
219 }
220
221 async fn write_recordbatch(
222 row_writer: &mut RowWriter<'_, W>,
223 record_batch: RecordBatch,
224 query_context: QueryContextRef,
225 schema: &SchemaRef,
226 ) -> Result<()> {
227 let record_batch = record_batch.into_df_record_batch();
228 for i in 0..record_batch.num_rows() {
229 for (j, column) in record_batch.columns().iter().enumerate() {
230 if column.is_null(i) {
231 row_writer.write_col(None::<u8>)?;
232 continue;
233 }
234
235 match column.data_type() {
236 DataType::Null => {
237 row_writer.write_col(None::<u8>)?;
238 }
239 DataType::Boolean => {
240 let array = column.as_boolean();
241 row_writer.write_col(array.value(i) as i8)?;
242 }
243 DataType::UInt8 => {
244 let array = column.as_primitive::<UInt8Type>();
245 row_writer.write_col(array.value(i))?;
246 }
247 DataType::UInt16 => {
248 let array = column.as_primitive::<UInt16Type>();
249 row_writer.write_col(array.value(i))?;
250 }
251 DataType::UInt32 => {
252 let array = column.as_primitive::<UInt32Type>();
253 row_writer.write_col(array.value(i))?;
254 }
255 DataType::UInt64 => {
256 let array = column.as_primitive::<UInt64Type>();
257 row_writer.write_col(array.value(i))?;
258 }
259 DataType::Int8 => {
260 let array = column.as_primitive::<Int8Type>();
261 row_writer.write_col(array.value(i))?;
262 }
263 DataType::Int16 => {
264 let array = column.as_primitive::<Int16Type>();
265 row_writer.write_col(array.value(i))?;
266 }
267 DataType::Int32 => {
268 let array = column.as_primitive::<Int32Type>();
269 row_writer.write_col(array.value(i))?;
270 }
271 DataType::Int64 => {
272 let array = column.as_primitive::<Int64Type>();
273 row_writer.write_col(array.value(i))?;
274 }
275 DataType::Float32 => {
276 let array = column.as_primitive::<Float32Type>();
277 row_writer.write_col(array.value(i))?;
278 }
279 DataType::Float64 => {
280 let array = column.as_primitive::<Float64Type>();
281 row_writer.write_col(array.value(i))?;
282 }
283 DataType::Utf8 => {
284 let array = column.as_string::<i32>();
285 row_writer.write_col(array.value(i))?;
286 }
287 DataType::Utf8View => {
288 let array = column.as_string_view();
289 row_writer.write_col(array.value(i))?;
290 }
291 DataType::LargeUtf8 => {
292 let array = column.as_string::<i64>();
293 row_writer.write_col(array.value(i))?;
294 }
295 DataType::Binary => {
296 let array = column.as_binary::<i32>();
297 let v = array.value(i);
298 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
299 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
300 row_writer.write_col(s)?;
301 } else {
302 row_writer.write_col(v)?;
303 }
304 }
305 DataType::BinaryView => {
306 let array = column.as_binary_view();
307 let v = array.value(i);
308 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
309 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
310 row_writer.write_col(s)?;
311 } else {
312 row_writer.write_col(v)?;
313 }
314 }
315 DataType::LargeBinary => {
316 let array = column.as_binary::<i64>();
317 let v = array.value(i);
318 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
319 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
320 row_writer.write_col(s)?;
321 } else {
322 row_writer.write_col(v)?;
323 }
324 }
325 DataType::Date32 => {
326 let array = column.as_primitive::<Date32Type>();
327 let v = Date::new(array.value(i));
328 row_writer.write_col(v.to_chrono_date())?;
329 }
330 DataType::Timestamp(_, _) => {
331 let v = datatypes::arrow_array::timestamp_array_value(column, i);
332 let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
333 row_writer.write_col(v)?;
334 }
335 DataType::Interval(interval_unit) => match interval_unit {
336 IntervalUnit::YearMonth => {
337 let array = column.as_primitive::<IntervalYearMonthType>();
338 let v: IntervalYearMonth = array.value(i).into();
339 row_writer.write_col(v.to_iso8601_string())?;
340 }
341 IntervalUnit::DayTime => {
342 let array = column.as_primitive::<IntervalDayTimeType>();
343 let v: IntervalDayTime = array.value(i).into();
344 row_writer.write_col(v.to_iso8601_string())?;
345 }
346 IntervalUnit::MonthDayNano => {
347 let array = column.as_primitive::<IntervalMonthDayNanoType>();
348 let v: IntervalMonthDayNano = array.value(i).into();
349 row_writer.write_col(v.to_iso8601_string())?;
350 }
351 },
352 DataType::Duration(_) => {
353 let v: Duration =
354 datatypes::arrow_array::duration_array_value(column, i).into();
355 row_writer.write_col(v)?;
356 }
357 DataType::List(_) => {
358 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
359 row_writer.write_col(v.to_string())?;
360 }
361 DataType::Struct(_) => {
362 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
363 row_writer.write_col(v.to_string())?;
364 }
365 DataType::Time32(_) | DataType::Time64(_) => {
366 let time = datatypes::arrow_array::time_array_value(column, i);
367 let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
368 row_writer.write_col(v)?;
369 }
370 DataType::Decimal128(precision, scale) => {
371 let array = column.as_primitive::<Decimal128Type>();
372 let v = Decimal128::new(array.value(i), *precision, *scale);
373 row_writer.write_col(v.to_string())?;
374 }
375 _ => {
376 return NotSupportedSnafu {
377 feat: format!("convert {} to MySQL value", column.data_type()),
378 }
379 .fail();
380 }
381 }
382 }
383 row_writer.end_row().await?;
384 }
385 Ok(())
386 }
387
388 async fn write_query_error(
389 error: impl ErrorExt,
390 w: QueryResultWriter<'a, W>,
391 query_context: QueryContextRef,
392 ) -> Result<()> {
393 METRIC_ERROR_COUNTER
394 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
395 .inc();
396
397 let (kind, err) = handle_err(error, query_context);
398 debug!("Write query error, kind: {:?}, err: {}", kind, err);
399 w.error(kind, err.as_bytes()).await?;
400 Ok(())
401 }
402}
403
404pub(crate) fn create_mysql_column(
405 data_type: &ConcreteDataType,
406 column_name: &str,
407) -> Result<Column> {
408 let column_type = match data_type {
409 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
410 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
411 Ok(ColumnType::MYSQL_TYPE_TINY)
412 }
413 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
414 Ok(ColumnType::MYSQL_TYPE_SHORT)
415 }
416 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
417 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
418 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
419 }
420 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
421 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
422 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
423 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
424 }
425 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
426 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
427 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
428 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
429 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
430 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
431 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
432 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
433 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
434 ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
435 _ => error::UnsupportedDataTypeSnafu {
436 data_type,
437 reason: "not implemented",
438 }
439 .fail(),
440 };
441 let mut colflags = ColumnFlags::empty();
442 match data_type {
443 ConcreteDataType::UInt16(_)
444 | ConcreteDataType::UInt8(_)
445 | ConcreteDataType::UInt32(_)
446 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
447 _ => {}
448 };
449 column_type.map(|column_type| Column {
450 column: column_name.to_string(),
451 coltype: column_type,
452 table: String::default(),
455 collen: 0, colflags,
457 })
458}
459
460pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
462 schema
463 .column_schemas()
464 .iter()
465 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
466 .collect()
467}
468
469fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
470 match status_code {
471 StatusCode::Success => ErrorKind::ER_YES,
472 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
473 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
474 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
475 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
476 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
477 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
478 ErrorKind::ER_TABLE_EXISTS_ERROR
479 }
480 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
481 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
482 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
483 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
484 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
485 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
486 ErrorKind::ER_ACCESS_DENIED_ERROR
487 }
488 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
489 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
490 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
491 }
492 StatusCode::Unexpected
493 | StatusCode::Internal
494 | StatusCode::IllegalState
495 | StatusCode::PlanQuery
496 | StatusCode::EngineExecuteQuery
497 | StatusCode::RegionNotReady
498 | StatusCode::RegionBusy
499 | StatusCode::TableUnavailable
500 | StatusCode::StorageUnavailable
501 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
502 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
503 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
504 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
505 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
506 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
507 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
508 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
509 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
510 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
511 }
512}