1use std::ops::Deref;
16
17use common_error::ext::ErrorExt;
18use common_error::status_code::StatusCode;
19use common_query::{Output, OutputData};
20use common_recordbatch::{RecordBatch, SendableRecordBatchStream};
21use common_telemetry::{debug, error};
22use datatypes::prelude::{ConcreteDataType, Value};
23use datatypes::schema::SchemaRef;
24use datatypes::types::json_type_value_to_string;
25use futures::StreamExt;
26use itertools::Itertools;
27use opensrv_mysql::{
28 Column, ColumnFlags, ColumnType, ErrorKind, OkResponse, QueryResultWriter, RowWriter,
29};
30use session::context::QueryContextRef;
31use snafu::prelude::*;
32use tokio::io::AsyncWrite;
33
34use crate::error::{self, ConvertSqlValueSnafu, Result};
35use crate::metrics::*;
36
37pub async fn write_output<W: AsyncWrite + Send + Sync + Unpin>(
39 w: QueryResultWriter<'_, W>,
40 query_context: QueryContextRef,
41 outputs: Vec<Result<Output>>,
42) -> Result<()> {
43 let mut writer = Some(MysqlResultWriter::new(w, query_context.clone()));
44 for output in outputs {
45 let result_writer = writer.take().context(error::InternalSnafu {
46 err_msg: "Sending multiple result set is unsupported",
47 })?;
48 writer = result_writer.try_write_one(output).await?;
49 }
50
51 if let Some(result_writer) = writer {
52 result_writer.finish().await?;
53 }
54 Ok(())
55}
56
57pub fn handle_err(e: impl ErrorExt, query_ctx: QueryContextRef) -> (ErrorKind, String) {
59 let status_code = e.status_code();
60 let kind = mysql_error_kind(&status_code);
61
62 if status_code.should_log_error() {
63 let root_error = e.root_cause().unwrap_or(&e);
64 error!(e; "Failed to handle mysql query, code: {}, error: {}, db: {}", status_code, root_error.to_string(), query_ctx.get_db_string());
65 } else {
66 debug!(
67 "Failed to handle mysql query, code: {}, db: {}, error: {:?}",
68 status_code,
69 query_ctx.get_db_string(),
70 e
71 );
72 };
73 let msg = e.output_msg();
74 let err_msg = format!("({status_code}): {msg}");
76
77 (kind, err_msg)
78}
79
/// A query result ready to be sent to the client: the schema describing its
/// columns together with the stream that yields its record batches.
struct QueryResult {
    /// Schema used to build the MySQL column definitions and to interpret
    /// each value of the streamed rows.
    schema: SchemaRef,
    /// Stream producing the record batches of the result.
    stream: SendableRecordBatchStream,
}
84
/// Writes query outputs to a MySQL client through a [`QueryResultWriter`].
pub struct MysqlResultWriter<'a, W: AsyncWrite + Unpin> {
    /// The underlying protocol writer; consumed when a result is written.
    writer: QueryResultWriter<'a, W>,
    /// Context of the query being answered (used for the timezone of
    /// time-related values and for logging the database on errors).
    query_context: QueryContextRef,
}
89
90impl<'a, W: AsyncWrite + Unpin> MysqlResultWriter<'a, W> {
91 pub fn new(
92 writer: QueryResultWriter<'a, W>,
93 query_context: QueryContextRef,
94 ) -> MysqlResultWriter<'a, W> {
95 MysqlResultWriter::<'a, W> {
96 writer,
97 query_context,
98 }
99 }
100
101 pub async fn try_write_one(
103 self,
104 output: Result<Output>,
105 ) -> Result<Option<MysqlResultWriter<'a, W>>> {
106 match output {
109 Ok(output) => match output.data {
110 OutputData::Stream(stream) => {
111 let query_result = QueryResult {
112 schema: stream.schema(),
113 stream,
114 };
115 Self::write_query_result(query_result, self.writer, self.query_context).await?;
116 }
117 OutputData::RecordBatches(recordbatches) => {
118 let query_result = QueryResult {
119 schema: recordbatches.schema(),
120 stream: recordbatches.as_stream(),
121 };
122 Self::write_query_result(query_result, self.writer, self.query_context).await?;
123 }
124 OutputData::AffectedRows(rows) => {
125 let next_writer = Self::write_affected_rows(self.writer, rows).await?;
126 return Ok(Some(MysqlResultWriter::new(
127 next_writer,
128 self.query_context,
129 )));
130 }
131 },
132 Err(error) => Self::write_query_error(error, self.writer, self.query_context).await?,
133 }
134 Ok(None)
135 }
136
137 pub async fn finish(self) -> Result<()> {
139 self.writer.no_more_results().await?;
140 Ok(())
141 }
142
143 async fn write_affected_rows(
144 w: QueryResultWriter<'a, W>,
145 rows: usize,
146 ) -> Result<QueryResultWriter<'a, W>> {
147 let next_writer = w
148 .complete_one(OkResponse {
149 affected_rows: rows as u64,
150 ..Default::default()
151 })
152 .await?;
153 Ok(next_writer)
154 }
155
156 async fn write_query_result(
157 mut query_result: QueryResult,
158 writer: QueryResultWriter<'a, W>,
159 query_context: QueryContextRef,
160 ) -> Result<()> {
161 match create_mysql_column_def(&query_result.schema) {
162 Ok(column_def) => {
163 let mut row_writer = writer.start(&column_def).await?;
166 while let Some(record_batch) = query_result.stream.next().await {
167 match record_batch {
168 Ok(record_batch) => {
169 Self::write_recordbatch(
170 &mut row_writer,
171 &record_batch,
172 query_context.clone(),
173 &query_result.schema,
174 )
175 .await?
176 }
177 Err(e) => {
178 let (kind, err) = handle_err(e, query_context);
179 debug!("Failed to get result, kind: {:?}, err: {}", kind, err);
180 row_writer.finish_error(kind, &err.as_bytes()).await?;
181
182 return Ok(());
183 }
184 }
185 }
186 row_writer.finish().await?;
187 Ok(())
188 }
189 Err(error) => Self::write_query_error(error, writer, query_context).await,
190 }
191 }
192
193 async fn write_recordbatch(
194 row_writer: &mut RowWriter<'_, W>,
195 recordbatch: &RecordBatch,
196 query_context: QueryContextRef,
197 schema: &SchemaRef,
198 ) -> Result<()> {
199 for row in recordbatch.rows() {
200 for (value, column) in row.into_iter().zip(schema.column_schemas().iter()) {
201 match value {
202 Value::Null => row_writer.write_col(None::<u8>)?,
203 Value::Boolean(v) => row_writer.write_col(v as i8)?,
204 Value::UInt8(v) => row_writer.write_col(v)?,
205 Value::UInt16(v) => row_writer.write_col(v)?,
206 Value::UInt32(v) => row_writer.write_col(v)?,
207 Value::UInt64(v) => row_writer.write_col(v)?,
208 Value::Int8(v) => row_writer.write_col(v)?,
209 Value::Int16(v) => row_writer.write_col(v)?,
210 Value::Int32(v) => row_writer.write_col(v)?,
211 Value::Int64(v) => row_writer.write_col(v)?,
212 Value::Float32(v) => row_writer.write_col(v.0)?,
213 Value::Float64(v) => row_writer.write_col(v.0)?,
214 Value::String(v) => row_writer.write_col(v.as_utf8())?,
215 Value::Binary(v) => match column.data_type {
216 ConcreteDataType::Json(j) => {
217 let s = json_type_value_to_string(&v, &j.format)
218 .context(ConvertSqlValueSnafu)?;
219 row_writer.write_col(s)?;
220 }
221 _ => {
222 row_writer.write_col(v.deref())?;
223 }
224 },
225 Value::Date(v) => row_writer.write_col(v.to_chrono_date())?,
226 Value::Timestamp(v) => row_writer.write_col(
228 v.to_chrono_datetime_with_timezone(Some(&query_context.timezone())),
229 )?,
230 Value::IntervalYearMonth(v) => row_writer.write_col(v.to_iso8601_string())?,
231 Value::IntervalDayTime(v) => row_writer.write_col(v.to_iso8601_string())?,
232 Value::IntervalMonthDayNano(v) => {
233 row_writer.write_col(v.to_iso8601_string())?
234 }
235 Value::Duration(v) => row_writer.write_col(v.to_std_duration())?,
236 Value::List(v) => row_writer.write_col(format!(
237 "[{}]",
238 v.items().iter().map(|x| x.to_string()).join(", ")
239 ))?,
240 Value::Time(v) => row_writer
241 .write_col(v.to_timezone_aware_string(Some(&query_context.timezone())))?,
242 Value::Decimal128(v) => row_writer.write_col(v.to_string())?,
243 }
244 }
245 row_writer.end_row().await?;
246 }
247 Ok(())
248 }
249
250 async fn write_query_error(
251 error: impl ErrorExt,
252 w: QueryResultWriter<'a, W>,
253 query_context: QueryContextRef,
254 ) -> Result<()> {
255 METRIC_ERROR_COUNTER
256 .with_label_values(&[METRIC_ERROR_COUNTER_LABEL_MYSQL])
257 .inc();
258
259 let (kind, err) = handle_err(error, query_context);
260 debug!("Write query error, kind: {:?}, err: {}", kind, err);
261 w.error(kind, err.as_bytes()).await?;
262 Ok(())
263 }
264}
265
266pub(crate) fn create_mysql_column(
267 data_type: &ConcreteDataType,
268 column_name: &str,
269) -> Result<Column> {
270 let column_type = match data_type {
271 ConcreteDataType::Null(_) => Ok(ColumnType::MYSQL_TYPE_NULL),
272 ConcreteDataType::Boolean(_) | ConcreteDataType::Int8(_) | ConcreteDataType::UInt8(_) => {
273 Ok(ColumnType::MYSQL_TYPE_TINY)
274 }
275 ConcreteDataType::Int16(_) | ConcreteDataType::UInt16(_) => {
276 Ok(ColumnType::MYSQL_TYPE_SHORT)
277 }
278 ConcreteDataType::Int32(_) | ConcreteDataType::UInt32(_) => Ok(ColumnType::MYSQL_TYPE_LONG),
279 ConcreteDataType::Int64(_) | ConcreteDataType::UInt64(_) => {
280 Ok(ColumnType::MYSQL_TYPE_LONGLONG)
281 }
282 ConcreteDataType::Float32(_) => Ok(ColumnType::MYSQL_TYPE_FLOAT),
283 ConcreteDataType::Float64(_) => Ok(ColumnType::MYSQL_TYPE_DOUBLE),
284 ConcreteDataType::Binary(_) | ConcreteDataType::String(_) => {
285 Ok(ColumnType::MYSQL_TYPE_VARCHAR)
286 }
287 ConcreteDataType::Timestamp(_) => Ok(ColumnType::MYSQL_TYPE_TIMESTAMP),
288 ConcreteDataType::Time(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
289 ConcreteDataType::Date(_) => Ok(ColumnType::MYSQL_TYPE_DATE),
290 ConcreteDataType::Interval(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
291 ConcreteDataType::Duration(_) => Ok(ColumnType::MYSQL_TYPE_TIME),
292 ConcreteDataType::Decimal128(_) => Ok(ColumnType::MYSQL_TYPE_DECIMAL),
293 ConcreteDataType::Json(_) => Ok(ColumnType::MYSQL_TYPE_JSON),
294 ConcreteDataType::Vector(_) => Ok(ColumnType::MYSQL_TYPE_BLOB),
295 ConcreteDataType::List(_) => Ok(ColumnType::MYSQL_TYPE_VARCHAR),
296 _ => error::UnsupportedDataTypeSnafu {
297 data_type,
298 reason: "not implemented",
299 }
300 .fail(),
301 };
302 let mut colflags = ColumnFlags::empty();
303 match data_type {
304 ConcreteDataType::UInt16(_)
305 | ConcreteDataType::UInt8(_)
306 | ConcreteDataType::UInt32(_)
307 | ConcreteDataType::UInt64(_) => colflags |= ColumnFlags::UNSIGNED_FLAG,
308 _ => {}
309 };
310 column_type.map(|column_type| Column {
311 column: column_name.to_string(),
312 coltype: column_type,
313
314 table: String::default(),
317 colflags,
318 })
319}
320
321pub fn create_mysql_column_def(schema: &SchemaRef) -> Result<Vec<Column>> {
323 schema
324 .column_schemas()
325 .iter()
326 .map(|column_schema| create_mysql_column(&column_schema.data_type, &column_schema.name))
327 .collect()
328}
329
330fn mysql_error_kind(status_code: &StatusCode) -> ErrorKind {
331 match status_code {
332 StatusCode::Success => ErrorKind::ER_YES,
333 StatusCode::Unknown | StatusCode::External => ErrorKind::ER_UNKNOWN_ERROR,
334 StatusCode::Unsupported => ErrorKind::ER_NOT_SUPPORTED_YET,
335 StatusCode::Cancelled | StatusCode::DeadlineExceeded => ErrorKind::ER_QUERY_INTERRUPTED,
336 StatusCode::RuntimeResourcesExhausted => ErrorKind::ER_OUT_OF_RESOURCES,
337 StatusCode::InvalidSyntax => ErrorKind::ER_SYNTAX_ERROR,
338 StatusCode::RegionAlreadyExists | StatusCode::TableAlreadyExists => {
339 ErrorKind::ER_TABLE_EXISTS_ERROR
340 }
341 StatusCode::RegionNotFound | StatusCode::TableNotFound => ErrorKind::ER_NO_SUCH_TABLE,
342 StatusCode::RegionReadonly => ErrorKind::ER_READ_ONLY_MODE,
343 StatusCode::DatabaseNotFound => ErrorKind::ER_WRONG_DB_NAME,
344 StatusCode::UserNotFound => ErrorKind::ER_NO_SUCH_USER,
345 StatusCode::UnsupportedPasswordType => ErrorKind::ER_PASSWORD_FORMAT,
346 StatusCode::PermissionDenied | StatusCode::AccessDenied => {
347 ErrorKind::ER_ACCESS_DENIED_ERROR
348 }
349 StatusCode::UserPasswordMismatch => ErrorKind::ER_DBACCESS_DENIED_ERROR,
350 StatusCode::InvalidAuthHeader | StatusCode::AuthHeaderNotFound => {
351 ErrorKind::ER_NOT_SUPPORTED_AUTH_MODE
352 }
353 StatusCode::Unexpected
354 | StatusCode::Internal
355 | StatusCode::IllegalState
356 | StatusCode::PlanQuery
357 | StatusCode::EngineExecuteQuery
358 | StatusCode::RegionNotReady
359 | StatusCode::RegionBusy
360 | StatusCode::TableUnavailable
361 | StatusCode::StorageUnavailable
362 | StatusCode::RequestOutdated => ErrorKind::ER_INTERNAL_ERROR,
363 StatusCode::InvalidArguments => ErrorKind::ER_WRONG_ARGUMENTS,
364 StatusCode::TableColumnNotFound => ErrorKind::ER_BAD_FIELD_ERROR,
365 StatusCode::TableColumnExists => ErrorKind::ER_DUP_FIELDNAME,
366 StatusCode::DatabaseAlreadyExists => ErrorKind::ER_DB_CREATE_EXISTS,
367 StatusCode::RateLimited => ErrorKind::ER_TOO_MANY_CONCURRENT_TRXS,
368 StatusCode::FlowAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
369 StatusCode::FlowNotFound => ErrorKind::ER_NO_SUCH_TABLE,
370 StatusCode::TriggerAlreadyExists => ErrorKind::ER_TABLE_EXISTS_ERROR,
371 StatusCode::TriggerNotFound => ErrorKind::ER_NO_SUCH_TABLE,
372 }
373}