1use std::io;
16use std::time::Duration;
17
18use arrow::array::{Array, AsArray};
19use arrow::datatypes::{
20 Date32Type, Decimal128Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
21 Int64Type, IntervalDayTimeType, IntervalMonthDayNanoType, IntervalYearMonthType, UInt8Type,
22 UInt16Type, UInt32Type, UInt64Type,
23};
24use arrow_schema::{DataType, IntervalUnit};
25use common_decimal::Decimal128;
26use common_error::ext::ErrorExt;
27use common_error::status_code::StatusCode;
28use common_query::{Output, OutputData};
29use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
30use common_telemetry::{debug, error};
31use common_time::{Date, IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
32use datafusion_common::ScalarValue;
33use datatypes::prelude::ConcreteDataType;
34use datatypes::schema::SchemaRef;
35use datatypes::types::jsonb_to_string;
36use futures::StreamExt;
37use opensrv_mysql::{
38 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
39};
40use session::SessionRef;
41use session::context::QueryContextRef;
42use snafu::prelude::*;
43use tokio::io::AsyncWrite;
44
45use crate::error::{self, ConvertSqlValueSnafu, DataFusionSnafu, NotSupportedSnafu, Result};
46use crate::metrics::*;
47
48pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
50 w: QueryResultWriter<'_, W>,
51 query_context: QueryContextRef,
52 session: SessionRef,
53 outputs: Vec<Result<Output>>,
54) -> Result<()> {
55 if let Some(warning) = query_context.warning() {
56 session.add_warning(warning);
57 }
58
59 let mut writer = Some(MysqlResultWriter::new(
60 w,
61 query_context.clone(),
62 session.clone(),
63 ));
64 for output in outputs {
65 let result_writer = writer.take().context(error::InternalSnafu {
66 err_msg: "Sending multiple result set is unsupported",
67 })?;
68 writer = result_writer.try_write_one(output).await?;
69 }
70
71 if let Some(result_writer) = writer {
72 result_writer.finish().await?;
73 }
74 Ok(())
75}
76
77pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
79 let status_code = e.status_code();
80 let kind = mysql_error_kind(&status_code);
81
82 if status_code.should_log_error() {
83 let root_error = e.root_cause().unwrap_or(&e);
84 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
85 } else {
86 debug!(
87 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
88 status_code,
89 query_ctx.get_db_string(),
90 e
91 );
92 };
93 let msg = e.output_msg();
94 let err_msg = format!("({status_code}): {msg}");
96
97 (kind, err_msg)
98}
99
100struct QueryResult {
101 schema: SchemaRef,
102 stream: SendableRecordBatchStream,
103}
104
105pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
106 writer: QueryResultWriter<'a, W>,
107 query_context: QueryContextRef,
108 session: SessionRef,
109}
110
111impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
112 pub fn new(
113 writer: QueryResultWriter<'a, W>,
114 query_context: QueryContextRef,
115 session: SessionRef,
116 ) -> MysqlResultWriter<'a, W> {
117 MysqlResultWriter::<'a, W> {
118 writer,
119 query_context,
120 session,
121 }
122 }
123
124 pub async fn try_write_one(
126 self,
127 output: Result<Output>,
128 ) -> io::Result<Option<MysqlResultWriter<'a, W>>> {
129 match output {
132 Ok(output) => match output.data {
133 OutputData::Stream(stream) => {
134 let query_result = QueryResult {
135 schema: stream.schema(),
136 stream,
137 };
138 Self::write_query_result(query_result, self.writer, self.query_context).await?;
139 }
140 OutputData::RecordBatches(recordbatches) => {
141 let query_result = QueryResult {
142 schema: recordbatches.schema(),
143 stream: recordbatches.as_stream(),
144 };
145 Self::write_query_result(query_result, self.writer, self.query_context).await?;
146 }
147 OutputData::AffectedRows(rows) => {
148 let next_writer =
149 Self::write_affected_rows(self.writer, rows, &self.session).await?;
150 return Ok(Some(MysqlResultWriter::new(
151 next_writer,
152 self.query_context,
153 self.session,
154 )));
155 }
156 },
157 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
158 }
159 Ok(None)
160 }
161
162 pub async fn finish(self) -> Result<()> {
164 self.writer.no_more_results().await?;
165 Ok(())
166 }
167
168 async fn write_affected_rows(
169 w: QueryResultWriter<'a, W>,
170 rows: usize,
171 session: &SessionRef,
172 ) -> io::Result<QueryResultWriter<'a, W>> {
173 let warnings = session.warnings_count() as u16;
174
175 let next_writer = w
176 .complete_one(OkResponse {
177 affected_rows: rows as u64,
178 warnings,
179 ..Default::default()
180 })
181 .await?;
182 Ok(next_writer)
183 }
184
185 async fn write_query_result(
186 mut query_result: QueryResult,
187 writer: QueryResultWriter<'a, W>,
188 query_context: QueryContextRef,
189 ) -> io::Result<()> {
190 match create_mysql_column_def(&query_result.schema) {
191 Ok(column_def) => {
192 let mut row_writer = writer.start(&column_def).await?;
195 while let Some(record_batch) = query_result.stream.next().await {
196 match record_batch {
197 Ok(record_batch) => {
198 if let Err(e) = Self::write_recordbatch(
199 &mut row_writer,
200 record_batch,
201 query_context.clone(),
202 &query_result.schema,
203 )
204 .await
205 {
206 let (kind, err) = handle_err(e, query_context);
207 row_writer.finish_error(kind, &err.as_bytes()).await?;
208 return Ok(());
209 }
210 }
211 Err(e) => {
212 let (kind, err) = handle_err(e, query_context);
213 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
214 row_writer.finish_error(kind, &err.as_bytes()).await?;
215
216 return Ok(());
217 }
218 }
219 }
220 row_writer.finish().await?;
221 Ok(())
222 }
223 Err(error) => Self::write_query_error(error, writer, query_context).await,
224 }
225 }
226
227 async fn write_recordbatch(
228 row_writer: &mut RowWriter<'_, W>,
229 record_batch: RecordBatch,
230 query_context: QueryContextRef,
231 schema: &SchemaRef,
232 ) -> Result<()> {
233 let record_batch = record_batch.into_df_record_batch();
234 for i in 0..record_batch.num_rows() {
235 for (j, column) in record_batch.columns().iter().enumerate() {
236 if column.is_null(i) {
237 row_writer.write_col(None::<u8>)?;
238 continue;
239 }
240
241 match column.data_type() {
242 DataType::Null => {
243 row_writer.write_col(None::<u8>)?;
244 }
245 DataType::Boolean => {
246 let array = column.as_boolean();
247 row_writer.write_col(array.value(i) as i8)?;
248 }
249 DataType::UInt8 => {
250 let array = column.as_primitive::<UInt8Type>();
251 row_writer.write_col(array.value(i))?;
252 }
253 DataType::UInt16 => {
254 let array = column.as_primitive::<UInt16Type>();
255 row_writer.write_col(array.value(i))?;
256 }
257 DataType::UInt32 => {
258 let array = column.as_primitive::<UInt32Type>();
259 row_writer.write_col(array.value(i))?;
260 }
261 DataType::UInt64 => {
262 let array = column.as_primitive::<UInt64Type>();
263 row_writer.write_col(array.value(i))?;
264 }
265 DataType::Int8 => {
266 let array = column.as_primitive::<Int8Type>();
267 row_writer.write_col(array.value(i))?;
268 }
269 DataType::Int16 => {
270 let array = column.as_primitive::<Int16Type>();
271 row_writer.write_col(array.value(i))?;
272 }
273 DataType::Int32 => {
274 let array = column.as_primitive::<Int32Type>();
275 row_writer.write_col(array.value(i))?;
276 }
277 DataType::Int64 => {
278 let array = column.as_primitive::<Int64Type>();
279 row_writer.write_col(array.value(i))?;
280 }
281 DataType::Float32 => {
282 let array = column.as_primitive::<Float32Type>();
283 row_writer.write_col(array.value(i))?;
284 }
285 DataType::Float64 => {
286 let array = column.as_primitive::<Float64Type>();
287 row_writer.write_col(array.value(i))?;
288 }
289 DataType::Utf8 | DataType::Utf8View | DataType::LargeUtf8 => {
290 let v = datatypes::arrow_array::string_array_value(column, i);
291 row_writer.write_col(v)?;
292 }
293 DataType::Binary | DataType::BinaryView | DataType::LargeBinary => {
294 let v = datatypes::arrow_array::binary_array_value(column, i);
295 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
296 let s = jsonb_to_string(v).context(ConvertSqlValueSnafu)?;
297 row_writer.write_col(s)?;
298 } else {
299 row_writer.write_col(v)?;
300 }
301 }
302 DataType::Date32 => {
303 let array = column.as_primitive::<Date32Type>();
304 let v = Date::new(array.value(i));
305 row_writer.write_col(v.to_chrono_date())?;
306 }
307 DataType::Timestamp(_, _) => {
308 let v = datatypes::arrow_array::timestamp_array_value(column, i);
309 let v = v.to_chrono_datetime_with_timezone(Some(&query_context.timezone()));
310 row_writer.write_col(v)?;
311 }
312 DataType::Interval(interval_unit) => match interval_unit {
313 IntervalUnit::YearMonth => {
314 let array = column.as_primitive::<IntervalYearMonthType>();
315 let v: IntervalYearMonth = array.value(i).into();
316 row_writer.write_col(v.to_iso8601_string())?;
317 }
318 IntervalUnit::DayTime => {
319 let array = column.as_primitive::<IntervalDayTimeType>();
320 let v: IntervalDayTime = array.value(i).into();
321 row_writer.write_col(v.to_iso8601_string())?;
322 }
323 IntervalUnit::MonthDayNano => {
324 let array = column.as_primitive::<IntervalMonthDayNanoType>();
325 let v: IntervalMonthDayNano = array.value(i).into();
326 row_writer.write_col(v.to_iso8601_string())?;
327 }
328 },
329 DataType::Duration(_) => {
330 let v: Duration =
331 datatypes::arrow_array::duration_array_value(column, i).into();
332 row_writer.write_col(v)?;
333 }
334 DataType::List(_) | DataType::Struct(_) => {
335 let v = ScalarValue::try_from_array(column, i).context(DataFusionSnafu)?;
336 row_writer.write_col(v.to_string())?;
337 }
338 DataType::Time32(_) | DataType::Time64(_) => {
339 let time = datatypes::arrow_array::time_array_value(column, i);
340 let v = time.to_timezone_aware_string(Some(&query_context.timezone()));
341 row_writer.write_col(v)?;
342 }
343 DataType::Decimal128(precision, scale) => {
344 let array = column.as_primitive::<Decimal128Type>();
345 let v = Decimal128::new(array.value(i), *precision, *scale);
346 row_writer.write_col(v.to_string())?;
347 }
348 _ => {
349 return NotSupportedSnafu {
350 feat: format!("convert {} to MySQL value", column.data_type()),
351 }
352 .fail();
353 }
354 }
355 }
356 row_writer.end_row().await?;
357 }
358 Ok(())
359 }
360
361 async fn write_query_error(
362 error: impl ErrorExt,
363 w: QueryResultWriter<'a, W>,
364 query_context: QueryContextRef,
365 ) -> io::Result<()> {
366 METRIC_ERROR_COUNTER
367 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
368 .inc();
369
370 let (kind, err) = handle_err(error, query_context);
371 debug!("Write query error, kind: {:?}, err: {}", kind, err);
372 w.error(kind, err.as_bytes()).await?;
373 Ok(())
374 }
375}
376
377pub(crate) fn create_mysql_column(
378 data_type: &ConcreteDataType,
379 column_name: &str,
380) -> Result<Column> {
381 let column_type = match data_type {
382 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
383 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
384 Ok(ColumnType::MYSQL_TYPE_TINY)
385 }
386 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
387 Ok(ColumnType::MYSQL_TYPE_SHORT)
388 }
389 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
390 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
391 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
392 }
393 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
394 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
395 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
396 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
397 }
398 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
399 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
400 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
401 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
402 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
403 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
404 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
405 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
406 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
407 ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
408 _ => error::UnsupportedDataTypeSnafu {
409 data_type,
410 reason: "not implemented",
411 }
412 .fail(),
413 };
414 let mut colflags = ColumnFlags::empty();
415 match data_type {
416 ConcreteDataType::UInt16(_)
417 | ConcreteDataType::UInt8(_)
418 | ConcreteDataType::UInt32(_)
419 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
420 _ => {}
421 };
422 column_type.map(|column_type| Column {
423 column: column_name.to_string(),
424 coltype: column_type,
425 table: String::default(),
428 collen: 0, colflags,
430 })
431}
432
433pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
435 schema
436 .column_schemas()
437 .iter()
438 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
439 .collect()
440}
441
442fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
443 match status_code {
444 StatusCode::Success => ErrorKind::ER_YES,
445 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
446 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
447 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
448 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
449 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
450 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
451 ErrorKind::ER_TABLE_EXISTS_ERROR
452 }
453 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
454 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
455 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
456 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
457 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
458 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
459 ErrorKind::ER_ACCESS_DENIED_ERROR
460 }
461 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
462 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
463 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
464 }
465 StatusCode::Unexpected
466 | StatusCode::Internal
467 | StatusCode::IllegalState
468 | StatusCode::PlanQuery
469 | StatusCode::EngineExecuteQuery
470 | StatusCode::RegionNotReady
471 | StatusCode::RegionBusy
472 | StatusCode::TableUnavailable
473 | StatusCode::StorageUnavailable
474 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
475 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
476 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
477 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
478 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
479 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
480 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
481 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
482 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
483 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
484 StatusCode::Suspended => ErrorKind::ER_SERVER_SHUTDOWN,
485 }
486}