1use std::ops::Deref;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_query::{Output, OutputData};
20use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
21use common_telemetry::{debug, error};
22use datatypes::prelude::{ConcreteDataType, Value};
23use datatypes::schema::SchemaRef;
24use datatypes::types::json_type_value_to_string;
25use futures::StreamExt;
26use itertools::Itertools;
27use opensrv_mysql::{
28 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
29};
30use session::context::QueryContextRef;
31use snafu::prelude::*;
32use tokio::io::AsyncWrite;
33
34use crate::error::{self, ConvertSqlValueSnafu, Result};
35use crate::metrics::*;
36
37pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
39 w: QueryResultWriter<'_, W>,
40 query_context: QueryContextRef,
41 outputs: Vec<Result<Output>>,
42) -> Result<()> {
43 let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
44 for output in outputs {
45 let result_writer = writer.take().context(error::InternalSnafu {
46 err_msg: "Sending multiple result set is unsupported",
47 })?;
48 writer = result_writer.try_write_one(output).await?;
49 }
50
51 if let Some(result_writer) = writer {
52 result_writer.finish().await?;
53 }
54 Ok(())
55}
56
57pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
59 let status_code = e.status_code();
60 let kind = mysql_error_kind(&status_code);
61
62 if status_code.should_log_error() {
63 let root_error = e.root_cause().unwrap_or(&e);
64 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
65 } else {
66 debug!(
67 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
68 status_code,
69 query_ctx.get_db_string(),
70 e
71 );
72 };
73 let msg = e.output_msg();
74 let err_msg = format!("({status_code}): {msg}");
76
77 (kind, err_msg)
78}
79
80struct QueryResult {
81 schema: SchemaRef,
82 stream: SendableRecordBatchStream,
83}
84
85pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
86 writer: QueryResultWriter<'a, W>,
87 query_context: QueryContextRef,
88}
89
90impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
91 pub fn new(
92 writer: QueryResultWriter<'a, W>,
93 query_context: QueryContextRef,
94 ) -> MysqlResultWriter<'a, W> {
95 MysqlResultWriter::<'a, W> {
96 writer,
97 query_context,
98 }
99 }
100
101 pub async fn try_write_one(
103 self,
104 output: Result<Output>,
105 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
106 match output {
109 Ok(output) => match output.data {
110 OutputData::Stream(stream) => {
111 let query_result = QueryResult {
112 schema: stream.schema(),
113 stream,
114 };
115 Self::write_query_result(query_result, self.writer, self.query_context).await?;
116 }
117 OutputData::RecordBatches(recordbatches) => {
118 let query_result = QueryResult {
119 schema: recordbatches.schema(),
120 stream: recordbatches.as_stream(),
121 };
122 Self::write_query_result(query_result, self.writer, self.query_context).await?;
123 }
124 OutputData::AffectedRows(rows) => {
125 let next_writer = Self::write_affected_rows(self.writer, rows).await?;
126 return Ok(Some(MysqlResultWriter::new(
127 next_writer,
128 self.query_context,
129 )));
130 }
131 },
132 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
133 }
134 Ok(None)
135 }
136
137 pub async fn finish(self) -> Result<()> {
139 self.writer.no_more_results().await?;
140 Ok(())
141 }
142
143 async fn write_affected_rows(
144 w: QueryResultWriter<'a, W>,
145 rows: usize,
146 ) -> Result<QueryResultWriter<'a, W>> {
147 let next_writer = w
148 .complete_one(OkResponse {
149 affected_rows: rows as u64,
150 ..Default::default()
151 })
152 .await?;
153 Ok(next_writer)
154 }
155
156 async fn write_query_result(
157 mut query_result: QueryResult,
158 writer: QueryResultWriter<'a, W>,
159 query_context: QueryContextRef,
160 ) -> Result<()> {
161 match create_mysql_column_def(&query_result.schema) {
162 Ok(column_def) => {
163 let mut row_writer = writer.start(&column_def).await?;
166 while let Some(record_batch) = query_result.stream.next().await {
167 match record_batch {
168 Ok(record_batch) => {
169 Self::write_recordbatch(
170 &mut row_writer,
171 &record_batch,
172 query_context.clone(),
173 &query_result.schema,
174 )
175 .await?
176 }
177 Err(e) => {
178 let (kind, err) = handle_err(e, query_context);
179 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
180 row_writer.finish_error(kind, &err.as_bytes()).await?;
181
182 return Ok(());
183 }
184 }
185 }
186 row_writer.finish().await?;
187 Ok(())
188 }
189 Err(error) => Self::write_query_error(error, writer, query_context).await,
190 }
191 }
192
193 async fn write_recordbatch(
194 row_writer: &mut RowWriter<'_, W>,
195 recordbatch: &RecordBatch,
196 query_context: QueryContextRef,
197 schema: &SchemaRef,
198 ) -> Result<()> {
199 for row in recordbatch.rows() {
200 for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
201 match value {
202 Value::Null => row_writer.write_col(None::<u8>)?,
203 Value::Boolean(v) => row_writer.write_col(v as i8)?,
204 Value::UInt8(v) => row_writer.write_col(v)?,
205 Value::UInt16(v) => row_writer.write_col(v)?,
206 Value::UInt32(v) => row_writer.write_col(v)?,
207 Value::UInt64(v) => row_writer.write_col(v)?,
208 Value::Int8(v) => row_writer.write_col(v)?,
209 Value::Int16(v) => row_writer.write_col(v)?,
210 Value::Int32(v) => row_writer.write_col(v)?,
211 Value::Int64(v) => row_writer.write_col(v)?,
212 Value::Float32(v) => row_writer.write_col(v.0)?,
213 Value::Float64(v) => row_writer.write_col(v.0)?,
214 Value::String(v) => row_writer.write_col(v.as_utf8())?,
215 Value::Binary(v) => match column.data_type {
216 ConcreteDataType::Json(j) => {
217 let s = json_type_value_to_string(&v, &j.format)
218 .context(ConvertSqlValueSnafu)?;
219 row_writer.write_col(s)?;
220 }
221 _ => {
222 row_writer.write_col(v.deref())?;
223 }
224 },
225 Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
226 Value::Timestamp(v) => row_writer.write_col(
228 v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
229 )?,
230 Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
231 Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
232 Value::IntervalMonthDayNano(v) => {
233 row_writer.write_col(v.to_iso8601_string())?
234 }
235 Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
236 Value::List(v) => row_writer.write_col(format!(
237 "[{}]",
238 v.items().iter().map(|x| x.to_string()).join(", ")
239 ))?,
240 Value::Struct(struct_value) => row_writer.write_col(format!(
241 "{{{}}}",
242 struct_value
243 .struct_type()
244 .fields()
245 .iter()
246 .map(|f| f.name())
247 .zip(struct_value.items().iter())
248 .map(|(k, v)| format!("{k}: {v}"))
249 .join(", ")
250 ))?,
251 Value::Time(v) => row_writer
252 .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
253 Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
254 }
255 }
256 row_writer.end_row().await?;
257 }
258 Ok(())
259 }
260
261 async fn write_query_error(
262 error: impl ErrorExt,
263 w: QueryResultWriter<'a, W>,
264 query_context: QueryContextRef,
265 ) -> Result<()> {
266 METRIC_ERROR_COUNTER
267 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
268 .inc();
269
270 let (kind, err) = handle_err(error, query_context);
271 debug!("Write query error, kind: {:?}, err: {}", kind, err);
272 w.error(kind, err.as_bytes()).await?;
273 Ok(())
274 }
275}
276
277pub(crate) fn create_mysql_column(
278 data_type: &ConcreteDataType,
279 column_name: &str,
280) -> Result<Column> {
281 let column_type = match data_type {
282 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
283 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
284 Ok(ColumnType::MYSQL_TYPE_TINY)
285 }
286 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
287 Ok(ColumnType::MYSQL_TYPE_SHORT)
288 }
289 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
290 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
291 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
292 }
293 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
294 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
295 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
296 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
297 }
298 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
299 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
300 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
301 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
302 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
303 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
304 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
305 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
306 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
307 ConcreteDataType::Struct(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
308 _ => error::UnsupportedDataTypeSnafu {
309 data_type,
310 reason: "not implemented",
311 }
312 .fail(),
313 };
314 let mut colflags = ColumnFlags::empty();
315 match data_type {
316 ConcreteDataType::UInt16(_)
317 | ConcreteDataType::UInt8(_)
318 | ConcreteDataType::UInt32(_)
319 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
320 _ => {}
321 };
322 column_type.map(|column_type| Column {
323 column: column_name.to_string(),
324 coltype: column_type,
325
326 table: String::default(),
329 colflags,
330 })
331}
332
333pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
335 schema
336 .column_schemas()
337 .iter()
338 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
339 .collect()
340}
341
342fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
343 match status_code {
344 StatusCode::Success => ErrorKind::ER_YES,
345 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
346 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
347 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
348 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
349 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
350 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
351 ErrorKind::ER_TABLE_EXISTS_ERROR
352 }
353 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
354 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
355 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
356 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
357 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
358 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
359 ErrorKind::ER_ACCESS_DENIED_ERROR
360 }
361 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
362 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
363 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
364 }
365 StatusCode::Unexpected
366 | StatusCode::Internal
367 | StatusCode::IllegalState
368 | StatusCode::PlanQuery
369 | StatusCode::EngineExecuteQuery
370 | StatusCode::RegionNotReady
371 | StatusCode::RegionBusy
372 | StatusCode::TableUnavailable
373 | StatusCode::StorageUnavailable
374 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
375 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
376 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
377 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
378 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
379 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
380 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
381 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
382 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
383 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
384 }
385}