1use std::ops::Deref;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_query::{Output, OutputData};
20use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
21use common_telemetry::{debug, error};
22use datatypes::prelude::{ConcreteDataType, Value};
23use datatypes::schema::SchemaRef;
24use datatypes::types::json_type_value_to_string;
25use futures::StreamExt;
26use opensrv_mysql::{
27 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
28};
29use session::context::QueryContextRef;
30use snafu::prelude::*;
31use tokio::io::AsyncWrite;
32
33use crate::error::{self, ConvertSqlValueSnafu, Error, Result};
34use crate::metrics::*;
35
36pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
38 w: QueryResultWriter<'_, W>,
39 query_context: QueryContextRef,
40 outputs: Vec<Result<Output>>,
41) -> Result<()> {
42 let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
43 for output in outputs {
44 let result_writer = writer.take().context(error::InternalSnafu {
45 err_msg: "Sending multiple result set is unsupported",
46 })?;
47 writer = result_writer.try_write_one(output).await?;
48 }
49
50 if let Some(result_writer) = writer {
51 result_writer.finish().await?;
52 }
53 Ok(())
54}
55
56pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
58 let status_code = e.status_code();
59 let kind = mysql_error_kind(&status_code);
60
61 if status_code.should_log_error() {
62 let root_error = e.root_cause().unwrap_or(&e);
63 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
64 } else {
65 debug!(
66 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
67 status_code,
68 query_ctx.get_db_string(),
69 e
70 );
71 };
72 let msg = e.output_msg();
73 let err_msg = format!("({status_code}): {msg}");
75
76 (kind, err_msg)
77}
78
79struct QueryResult {
80 schema: SchemaRef,
81 stream: SendableRecordBatchStream,
82}
83
84pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
85 writer: QueryResultWriter<'a, W>,
86 query_context: QueryContextRef,
87}
88
89impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
90 pub fn new(
91 writer: QueryResultWriter<'a, W>,
92 query_context: QueryContextRef,
93 ) -> MysqlResultWriter<'a, W> {
94 MysqlResultWriter::<'a, W> {
95 writer,
96 query_context,
97 }
98 }
99
100 pub async fn try_write_one(
102 self,
103 output: Result<Output>,
104 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
105 match output {
108 Ok(output) => match output.data {
109 OutputData::Stream(stream) => {
110 let query_result = QueryResult {
111 schema: stream.schema(),
112 stream,
113 };
114 Self::write_query_result(query_result, self.writer, self.query_context).await?;
115 }
116 OutputData::RecordBatches(recordbatches) => {
117 let query_result = QueryResult {
118 schema: recordbatches.schema(),
119 stream: recordbatches.as_stream(),
120 };
121 Self::write_query_result(query_result, self.writer, self.query_context).await?;
122 }
123 OutputData::AffectedRows(rows) => {
124 let next_writer = Self::write_affected_rows(self.writer, rows).await?;
125 return Ok(Some(MysqlResultWriter::new(
126 next_writer,
127 self.query_context,
128 )));
129 }
130 },
131 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
132 }
133 Ok(None)
134 }
135
136 pub async fn finish(self) -> Result<()> {
138 self.writer.no_more_results().await?;
139 Ok(())
140 }
141
142 async fn write_affected_rows(
143 w: QueryResultWriter<'a, W>,
144 rows: usize,
145 ) -> Result<QueryResultWriter<'a, W>> {
146 let next_writer = w
147 .complete_one(OkResponse {
148 affected_rows: rows as u64,
149 ..Default::default()
150 })
151 .await?;
152 Ok(next_writer)
153 }
154
155 async fn write_query_result(
156 mut query_result: QueryResult,
157 writer: QueryResultWriter<'a, W>,
158 query_context: QueryContextRef,
159 ) -> Result<()> {
160 match create_mysql_column_def(&query_result.schema) {
161 Ok(column_def) => {
162 let mut row_writer = writer.start(&column_def).await?;
165 while let Some(record_batch) = query_result.stream.next().await {
166 match record_batch {
167 Ok(record_batch) => {
168 Self::write_recordbatch(
169 &mut row_writer,
170 &record_batch,
171 query_context.clone(),
172 &query_result.schema,
173 )
174 .await?
175 }
176 Err(e) => {
177 let (kind, err) = handle_err(e, query_context);
178 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
179 row_writer.finish_error(kind, &err.as_bytes()).await?;
180
181 return Ok(());
182 }
183 }
184 }
185 row_writer.finish().await?;
186 Ok(())
187 }
188 Err(error) => Self::write_query_error(error, writer, query_context).await,
189 }
190 }
191
192 async fn write_recordbatch(
193 row_writer: &mut RowWriter<'_, W>,
194 recordbatch: &RecordBatch,
195 query_context: QueryContextRef,
196 schema: &SchemaRef,
197 ) -> Result<()> {
198 for row in recordbatch.rows() {
199 for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
200 match value {
201 Value::Null => row_writer.write_col(None::<u8>)?,
202 Value::Boolean(v) => row_writer.write_col(v as i8)?,
203 Value::UInt8(v) => row_writer.write_col(v)?,
204 Value::UInt16(v) => row_writer.write_col(v)?,
205 Value::UInt32(v) => row_writer.write_col(v)?,
206 Value::UInt64(v) => row_writer.write_col(v)?,
207 Value::Int8(v) => row_writer.write_col(v)?,
208 Value::Int16(v) => row_writer.write_col(v)?,
209 Value::Int32(v) => row_writer.write_col(v)?,
210 Value::Int64(v) => row_writer.write_col(v)?,
211 Value::Float32(v) => row_writer.write_col(v.0)?,
212 Value::Float64(v) => row_writer.write_col(v.0)?,
213 Value::String(v) => row_writer.write_col(v.as_utf8())?,
214 Value::Binary(v) => match column.data_type {
215 ConcreteDataType::Json(j) => {
216 let s = json_type_value_to_string(&v, &j.format)
217 .context(ConvertSqlValueSnafu)?;
218 row_writer.write_col(s)?;
219 }
220 _ => {
221 row_writer.write_col(v.deref())?;
222 }
223 },
224 Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
225 Value::Timestamp(v) => row_writer.write_col(
227 v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
228 )?,
229 Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
230 Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
231 Value::IntervalMonthDayNano(v) => {
232 row_writer.write_col(v.to_iso8601_string())?
233 }
234 Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
235 Value::List(_) => {
236 return Err(Error::Internal {
237 err_msg: format!(
238 "cannot write value {:?} in mysql protocol: unimplemented",
239 &value
240 ),
241 })
242 }
243 Value::Time(v) => row_writer
244 .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
245 Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
246 }
247 }
248 row_writer.end_row().await?;
249 }
250 Ok(())
251 }
252
253 async fn write_query_error(
254 error: impl ErrorExt,
255 w: QueryResultWriter<'a, W>,
256 query_context: QueryContextRef,
257 ) -> Result<()> {
258 METRIC_ERROR_COUNTER
259 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
260 .inc();
261
262 let (kind, err) = handle_err(error, query_context);
263 debug!("Write query error, kind: {:?}, err: {}", kind, err);
264 w.error(kind, err.as_bytes()).await?;
265 Ok(())
266 }
267}
268
269pub(crate) fn create_mysql_column(
270 data_type: &ConcreteDataType,
271 column_name: &str,
272) -> Result<Column> {
273 let column_type = match data_type {
274 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
275 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
276 Ok(ColumnType::MYSQL_TYPE_TINY)
277 }
278 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
279 Ok(ColumnType::MYSQL_TYPE_SHORT)
280 }
281 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
282 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
283 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
284 }
285 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
286 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
287 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
288 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
289 }
290 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
291 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
292 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
293 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
294 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
295 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
296 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
297 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
298 _ => error::UnsupportedDataTypeSnafu {
299 data_type,
300 reason: "not implemented",
301 }
302 .fail(),
303 };
304 let mut colflags = ColumnFlags::empty();
305 match data_type {
306 ConcreteDataType::UInt16(_)
307 | ConcreteDataType::UInt8(_)
308 | ConcreteDataType::UInt32(_)
309 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
310 _ => {}
311 };
312 column_type.map(|column_type| Column {
313 column: column_name.to_string(),
314 coltype: column_type,
315
316 table: String::default(),
319 colflags,
320 })
321}
322
323pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
325 schema
326 .column_schemas()
327 .iter()
328 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
329 .collect()
330}
331
332fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
333 match status_code {
334 StatusCode::Success => ErrorKind::ER_YES,
335 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
336 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
337 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
338 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
339 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
340 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
341 ErrorKind::ER_TABLE_EXISTS_ERROR
342 }
343 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
344 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
345 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
346 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
347 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
348 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
349 ErrorKind::ER_ACCESS_DENIED_ERROR
350 }
351 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
352 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
353 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
354 }
355 StatusCode::Unexpected
356 | StatusCode::Internal
357 | StatusCode::IllegalState
358 | StatusCode::PlanQuery
359 | StatusCode::EngineExecuteQuery
360 | StatusCode::RegionNotReady
361 | StatusCode::RegionBusy
362 | StatusCode::TableUnavailable
363 | StatusCode::StorageUnavailable
364 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
365 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
366 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
367 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
368 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
369 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
370 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
371 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
372 }
373}