1mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38
39use self::bytea::{EscapeOutputBytea, HexOutputBytea};
40use self::datetime::{StylingDate, StylingDateTime};
41pub use self::error::{PgErrorCode, PgErrorSeverity};
42use self::interval::PgInterval;
43use crate::error::{self as server_error, Error, Result};
44use crate::SqlPlan;
45
46pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
47 origin
48 .column_schemas()
49 .iter()
50 .enumerate()
51 .map(|(idx, col)| {
52 Ok(FieldInfo::new(
53 col.name.clone(),
54 None,
55 None,
56 type_gt_to_pg(&col.data_type)?,
57 field_formats.format_for(idx),
58 ))
59 })
60 .collect::<Result<Vec<FieldInfo>>>()
61}
62
63fn encode_array(
64 query_ctx: &QueryContextRef,
65 value_list: &ListValue,
66 builder: &mut DataRowEncoder,
67) -> PgWireResult<()> {
68 match value_list.datatype() {
69 &ConcreteDataType::Boolean(_) => {
70 let array = value_list
71 .items()
72 .iter()
73 .map(|v| match v {
74 Value::Null => Ok(None),
75 Value::Boolean(v) => Ok(Some(*v)),
76 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
77 err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
78 }))),
79 })
80 .collect::<PgWireResult<Vec<Option<bool>>>>()?;
81 builder.encode_field(&array)
82 }
83 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
84 let array = value_list
85 .items()
86 .iter()
87 .map(|v| match v {
88 Value::Null => Ok(None),
89 Value::Int8(v) => Ok(Some(*v)),
90 Value::UInt8(v) => Ok(Some(*v as i8)),
91 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
92 err_msg: format!(
93 "Invalid list item type, find {v:?}, expected int8 or uint8",
94 ),
95 }))),
96 })
97 .collect::<PgWireResult<Vec<Option<i8>>>>()?;
98 builder.encode_field(&array)
99 }
100 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
101 let array = value_list
102 .items()
103 .iter()
104 .map(|v| match v {
105 Value::Null => Ok(None),
106 Value::Int16(v) => Ok(Some(*v)),
107 Value::UInt16(v) => Ok(Some(*v as i16)),
108 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
109 err_msg: format!(
110 "Invalid list item type, find {v:?}, expected int16 or uint16",
111 ),
112 }))),
113 })
114 .collect::<PgWireResult<Vec<Option<i16>>>>()?;
115 builder.encode_field(&array)
116 }
117 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
118 let array = value_list
119 .items()
120 .iter()
121 .map(|v| match v {
122 Value::Null => Ok(None),
123 Value::Int32(v) => Ok(Some(*v)),
124 Value::UInt32(v) => Ok(Some(*v as i32)),
125 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
126 err_msg: format!(
127 "Invalid list item type, find {v:?}, expected int32 or uint32",
128 ),
129 }))),
130 })
131 .collect::<PgWireResult<Vec<Option<i32>>>>()?;
132 builder.encode_field(&array)
133 }
134 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
135 let array = value_list
136 .items()
137 .iter()
138 .map(|v| match v {
139 Value::Null => Ok(None),
140 Value::Int64(v) => Ok(Some(*v)),
141 Value::UInt64(v) => Ok(Some(*v as i64)),
142 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
143 err_msg: format!(
144 "Invalid list item type, find {v:?}, expected int64 or uint64",
145 ),
146 }))),
147 })
148 .collect::<PgWireResult<Vec<Option<i64>>>>()?;
149 builder.encode_field(&array)
150 }
151 &ConcreteDataType::Float32(_) => {
152 let array = value_list
153 .items()
154 .iter()
155 .map(|v| match v {
156 Value::Null => Ok(None),
157 Value::Float32(v) => Ok(Some(v.0)),
158 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
159 err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
160 }))),
161 })
162 .collect::<PgWireResult<Vec<Option<f32>>>>()?;
163 builder.encode_field(&array)
164 }
165 &ConcreteDataType::Float64(_) => {
166 let array = value_list
167 .items()
168 .iter()
169 .map(|v| match v {
170 Value::Null => Ok(None),
171 Value::Float64(v) => Ok(Some(v.0)),
172 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
173 err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
174 }))),
175 })
176 .collect::<PgWireResult<Vec<Option<f64>>>>()?;
177 builder.encode_field(&array)
178 }
179 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
180 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
181
182 match *bytea_output {
183 PGByteaOutputValue::ESCAPE => {
184 let array = value_list
185 .items()
186 .iter()
187 .map(|v| match v {
188 Value::Null => Ok(None),
189 Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
190
191 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
192 err_msg: format!(
193 "Invalid list item type, find {v:?}, expected binary",
194 ),
195 }))),
196 })
197 .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
198 builder.encode_field(&array)
199 }
200 PGByteaOutputValue::HEX => {
201 let array = value_list
202 .items()
203 .iter()
204 .map(|v| match v {
205 Value::Null => Ok(None),
206 Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
207
208 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
209 err_msg: format!(
210 "Invalid list item type, find {v:?}, expected binary",
211 ),
212 }))),
213 })
214 .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
215 builder.encode_field(&array)
216 }
217 }
218 }
219 &ConcreteDataType::String(_) => {
220 let array = value_list
221 .items()
222 .iter()
223 .map(|v| match v {
224 Value::Null => Ok(None),
225 Value::String(v) => Ok(Some(v.as_utf8())),
226 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
227 err_msg: format!("Invalid list item type, find {v:?}, expected string",),
228 }))),
229 })
230 .collect::<PgWireResult<Vec<Option<&str>>>>()?;
231 builder.encode_field(&array)
232 }
233 &ConcreteDataType::Date(_) => {
234 let array = value_list
235 .items()
236 .iter()
237 .map(|v| match v {
238 Value::Null => Ok(None),
239 Value::Date(v) => {
240 if let Some(date) = v.to_chrono_date() {
241 let (style, order) =
242 *query_ctx.configuration_parameter().pg_datetime_style();
243 Ok(Some(StylingDate(date, style, order)))
244 } else {
245 Err(PgWireError::ApiError(Box::new(Error::Internal {
246 err_msg: format!("Failed to convert date to postgres type {v:?}",),
247 })))
248 }
249 }
250 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
251 err_msg: format!("Invalid list item type, find {v:?}, expected date",),
252 }))),
253 })
254 .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
255 builder.encode_field(&array)
256 }
257 &ConcreteDataType::Timestamp(_) => {
258 let array = value_list
259 .items()
260 .iter()
261 .map(|v| match v {
262 Value::Null => Ok(None),
263 Value::Timestamp(v) => {
264 if let Some(datetime) =
265 v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
266 {
267 let (style, order) =
268 *query_ctx.configuration_parameter().pg_datetime_style();
269 Ok(Some(StylingDateTime(datetime, style, order)))
270 } else {
271 Err(PgWireError::ApiError(Box::new(Error::Internal {
272 err_msg: format!("Failed to convert date to postgres type {v:?}",),
273 })))
274 }
275 }
276 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
277 err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
278 }))),
279 })
280 .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
281 builder.encode_field(&array)
282 }
283 &ConcreteDataType::Time(_) => {
284 let array = value_list
285 .items()
286 .iter()
287 .map(|v| match v {
288 Value::Null => Ok(None),
289 Value::Time(v) => Ok(v.to_chrono_time()),
290 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
291 err_msg: format!("Invalid list item type, find {v:?}, expected time",),
292 }))),
293 })
294 .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
295 builder.encode_field(&array)
296 }
297 &ConcreteDataType::Interval(_) => {
298 let array = value_list
299 .items()
300 .iter()
301 .map(|v| match v {
302 Value::Null => Ok(None),
303 Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
304 Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
305 Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
306 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
307 err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
308 }))),
309 })
310 .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
311 builder.encode_field(&array)
312 }
313 &ConcreteDataType::Decimal128(_) => {
314 let array = value_list
315 .items()
316 .iter()
317 .map(|v| match v {
318 Value::Null => Ok(None),
319 Value::Decimal128(v) => Ok(Some(v.to_string())),
320 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
321 err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
322 }))),
323 })
324 .collect::<PgWireResult<Vec<Option<String>>>>()?;
325 builder.encode_field(&array)
326 }
327 &ConcreteDataType::Json(j) => {
328 let array = value_list
329 .items()
330 .iter()
331 .map(|v| match v {
332 Value::Null => Ok(None),
333 Value::Binary(v) => {
334 let s = json_type_value_to_string(v, &j.format)
335 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
336 Ok(Some(s))
337 }
338 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
339 err_msg: format!("Invalid list item type, find {v:?}, expected json",),
340 }))),
341 })
342 .collect::<PgWireResult<Vec<Option<String>>>>()?;
343 builder.encode_field(&array)
344 }
345 _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
346 err_msg: format!(
347 "cannot write array type {:?} in postgres protocol: unimplemented",
348 value_list.datatype()
349 ),
350 }))),
351 }
352}
353
354pub(super) fn encode_value(
355 query_ctx: &QueryContextRef,
356 value: &Value,
357 builder: &mut DataRowEncoder,
358 datatype: &ConcreteDataType,
359) -> PgWireResult<()> {
360 match value {
361 Value::Null => builder.encode_field(&None::<&i8>),
362 Value::Boolean(v) => builder.encode_field(v),
363 Value::UInt8(v) => builder.encode_field(&(*v as i8)),
364 Value::UInt16(v) => builder.encode_field(&(*v as i16)),
365 Value::UInt32(v) => builder.encode_field(v),
366 Value::UInt64(v) => builder.encode_field(&(*v as i64)),
367 Value::Int8(v) => builder.encode_field(v),
368 Value::Int16(v) => builder.encode_field(v),
369 Value::Int32(v) => builder.encode_field(v),
370 Value::Int64(v) => builder.encode_field(v),
371 Value::Float32(v) => builder.encode_field(&v.0),
372 Value::Float64(v) => builder.encode_field(&v.0),
373 Value::String(v) => builder.encode_field(&v.as_utf8()),
374 Value::Binary(v) => match datatype {
375 ConcreteDataType::Json(j) => {
376 let s = json_type_value_to_string(v, &j.format)
377 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
378 builder.encode_field(&s)
379 }
380 _ => {
381 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382 match *bytea_output {
383 PGByteaOutputValue::ESCAPE => {
384 builder.encode_field(&EscapeOutputBytea(v.deref()))
385 }
386 PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387 }
388 }
389 },
390 Value::Date(v) => {
391 if let Some(date) = v.to_chrono_date() {
392 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393 builder.encode_field(&StylingDate(date, style, order))
394 } else {
395 Err(PgWireError::ApiError(Box::new(Error::Internal {
396 err_msg: format!("Failed to convert date to postgres type {v:?}",),
397 })))
398 }
399 }
400 Value::Timestamp(v) => {
401 if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402 {
403 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404 builder.encode_field(&StylingDateTime(datetime, style, order))
405 } else {
406 Err(PgWireError::ApiError(Box::new(Error::Internal {
407 err_msg: format!("Failed to convert date to postgres type {v:?}",),
408 })))
409 }
410 }
411 Value::Time(v) => {
412 if let Some(time) = v.to_chrono_time() {
413 builder.encode_field(&time)
414 } else {
415 Err(PgWireError::ApiError(Box::new(Error::Internal {
416 err_msg: format!("Failed to convert time to postgres type {v:?}",),
417 })))
418 }
419 }
420 Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421 Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422 Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423 Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424 Value::Duration(d) => match PgInterval::try_from(*d) {
425 Ok(i) => builder.encode_field(&i),
426 Err(e) => Err(PgWireError::ApiError(Box::new(Error::Internal {
427 err_msg: e.to_string(),
428 }))),
429 },
430 Value::List(values) => encode_array(query_ctx, values, builder),
431 }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435 match origin {
436 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446 &ConcreteDataType::Date(_) => Ok(Type::DATE),
447 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448 &ConcreteDataType::Time(_) => Ok(Type::TIME),
449 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451 &ConcreteDataType::Json(_) => Ok(Type::JSON),
452 ConcreteDataType::List(list) => match list.item_type() {
453 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470 &ConcreteDataType::Dictionary(_)
471 | &ConcreteDataType::Vector(_)
472 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
473 data_type: origin,
474 reason: "not implemented",
475 }
476 .fail(),
477 },
478 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
479 data_type: origin,
480 reason: "not implemented",
481 }
482 .fail(),
483 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
484 }
485}
486
487#[allow(dead_code)]
488pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
489 match origin {
491 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
492 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
493 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
494 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
495 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
496 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
497 &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
498 common_time::timestamp::TimeUnit::Millisecond,
499 )),
500 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
501 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
502 common_time::timestamp::TimeUnit::Microsecond,
503 )),
504 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
505 ConcreteDataType::int8_datatype(),
506 )),
507 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
508 ConcreteDataType::int16_datatype(),
509 )),
510 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
511 ConcreteDataType::int32_datatype(),
512 )),
513 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
514 ConcreteDataType::int64_datatype(),
515 )),
516 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
517 ConcreteDataType::string_datatype(),
518 )),
519 _ => server_error::InternalSnafu {
520 err_msg: format!("unimplemented datatype {origin:?}"),
521 }
522 .fail(),
523 }
524}
525
526pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
527 let param_type = portal.statement.parameter_types.get(idx).unwrap();
530 match param_type {
531 &Type::VARCHAR | &Type::TEXT => Ok(format!(
532 "'{}'",
533 portal
534 .parameter::<String>(idx, param_type)?
535 .as_deref()
536 .unwrap_or("")
537 )),
538 &Type::BOOL => Ok(portal
539 .parameter::<bool>(idx, param_type)?
540 .map(|v| v.to_string())
541 .unwrap_or_else(|| "".to_owned())),
542 &Type::INT4 => Ok(portal
543 .parameter::<i32>(idx, param_type)?
544 .map(|v| v.to_string())
545 .unwrap_or_else(|| "".to_owned())),
546 &Type::INT8 => Ok(portal
547 .parameter::<i64>(idx, param_type)?
548 .map(|v| v.to_string())
549 .unwrap_or_else(|| "".to_owned())),
550 &Type::FLOAT4 => Ok(portal
551 .parameter::<f32>(idx, param_type)?
552 .map(|v| v.to_string())
553 .unwrap_or_else(|| "".to_owned())),
554 &Type::FLOAT8 => Ok(portal
555 .parameter::<f64>(idx, param_type)?
556 .map(|v| v.to_string())
557 .unwrap_or_else(|| "".to_owned())),
558 &Type::DATE => Ok(portal
559 .parameter::<NaiveDate>(idx, param_type)?
560 .map(|v| v.format("%Y-%m-%d").to_string())
561 .unwrap_or_else(|| "".to_owned())),
562 &Type::TIMESTAMP => Ok(portal
563 .parameter::<NaiveDateTime>(idx, param_type)?
564 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
565 .unwrap_or_else(|| "".to_owned())),
566 &Type::INTERVAL => Ok(portal
567 .parameter::<PgInterval>(idx, param_type)?
568 .map(|v| v.to_string())
569 .unwrap_or_else(|| "".to_owned())),
570 _ => Err(invalid_parameter_error(
571 "unsupported_parameter_type",
572 Some(param_type.to_string()),
573 )),
574 }
575}
576
577pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
578 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
579 error_info.detail = detail;
580 PgWireError::UserError(Box::new(error_info))
581}
582
583fn to_timestamp_scalar_value<T>(
584 data: Option<T>,
585 unit: &TimestampType,
586 ctype: &ConcreteDataType,
587) -> PgWireResult<ScalarValue>
588where
589 T: Into<i64>,
590{
591 if let Some(n) = data {
592 Value::Timestamp(unit.create_timestamp(n.into()))
593 .try_to_scalar_value(ctype)
594 .map_err(|e| PgWireError::ApiError(Box::new(e)))
595 } else {
596 Ok(ScalarValue::Null)
597 }
598}
599
600pub(super) fn parameters_to_scalar_values(
601 plan: &LogicalPlan,
602 portal: &Portal<SqlPlan>,
603) -> PgWireResult<Vec<ScalarValue>> {
604 let param_count = portal.parameter_len();
605 let mut results = Vec::with_capacity(param_count);
606
607 let client_param_types = &portal.statement.parameter_types;
608 let param_types = plan
609 .get_parameter_types()
610 .map_err(|e| PgWireError::ApiError(Box::new(e)))?
611 .into_iter()
612 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
613 .collect::<HashMap<_, _>>();
614
615 for idx in 0..param_count {
616 let server_type = param_types
617 .get(&format!("${}", idx + 1))
618 .and_then(|t| t.as_ref());
619
620 let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
621 client_given_type.clone()
622 } else if let Some(server_provided_type) = &server_type {
623 type_gt_to_pg(server_provided_type).map_err(|e| PgWireError::ApiError(Box::new(e)))?
624 } else {
625 return Err(invalid_parameter_error(
626 "unknown_parameter_type",
627 Some(format!(
628 "Cannot get parameter type information for parameter {}",
629 idx
630 )),
631 ));
632 };
633
634 let value = match &client_type {
635 &Type::VARCHAR | &Type::TEXT => {
636 let data = portal.parameter::<String>(idx, &client_type)?;
637 if let Some(server_type) = &server_type {
638 match server_type {
639 ConcreteDataType::String(_) => ScalarValue::Utf8(data),
640 _ => {
641 return Err(invalid_parameter_error(
642 "invalid_parameter_type",
643 Some(format!("Expected: {}, found: {}", server_type, client_type)),
644 ))
645 }
646 }
647 } else {
648 ScalarValue::Utf8(data)
649 }
650 }
651 &Type::BOOL => {
652 let data = portal.parameter::<bool>(idx, &client_type)?;
653 if let Some(server_type) = &server_type {
654 match server_type {
655 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
656 _ => {
657 return Err(invalid_parameter_error(
658 "invalid_parameter_type",
659 Some(format!("Expected: {}, found: {}", server_type, client_type)),
660 ))
661 }
662 }
663 } else {
664 ScalarValue::Boolean(data)
665 }
666 }
667 &Type::INT2 => {
668 let data = portal.parameter::<i16>(idx, &client_type)?;
669 if let Some(server_type) = &server_type {
670 match server_type {
671 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
672 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
673 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
674 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
675 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
676 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
677 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
678 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
679 ConcreteDataType::Timestamp(unit) => {
680 to_timestamp_scalar_value(data, unit, server_type)?
681 }
682 _ => {
683 return Err(invalid_parameter_error(
684 "invalid_parameter_type",
685 Some(format!("Expected: {}, found: {}", server_type, client_type)),
686 ))
687 }
688 }
689 } else {
690 ScalarValue::Int16(data)
691 }
692 }
693 &Type::INT4 => {
694 let data = portal.parameter::<i32>(idx, &client_type)?;
695 if let Some(server_type) = &server_type {
696 match server_type {
697 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
698 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
699 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
700 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
701 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
702 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
703 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
704 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
705 ConcreteDataType::Timestamp(unit) => {
706 to_timestamp_scalar_value(data, unit, server_type)?
707 }
708 _ => {
709 return Err(invalid_parameter_error(
710 "invalid_parameter_type",
711 Some(format!("Expected: {}, found: {}", server_type, client_type)),
712 ))
713 }
714 }
715 } else {
716 ScalarValue::Int32(data)
717 }
718 }
719 &Type::INT8 => {
720 let data = portal.parameter::<i64>(idx, &client_type)?;
721 if let Some(server_type) = &server_type {
722 match server_type {
723 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
724 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
725 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
726 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
727 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
728 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
729 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
730 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
731 ConcreteDataType::Timestamp(unit) => {
732 to_timestamp_scalar_value(data, unit, server_type)?
733 }
734 _ => {
735 return Err(invalid_parameter_error(
736 "invalid_parameter_type",
737 Some(format!("Expected: {}, found: {}", server_type, client_type)),
738 ))
739 }
740 }
741 } else {
742 ScalarValue::Int64(data)
743 }
744 }
745 &Type::FLOAT4 => {
746 let data = portal.parameter::<f32>(idx, &client_type)?;
747 if let Some(server_type) = &server_type {
748 match server_type {
749 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
750 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
751 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
752 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
753 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
754 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
755 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
756 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
757 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
758 ConcreteDataType::Float64(_) => {
759 ScalarValue::Float64(data.map(|n| n as f64))
760 }
761 _ => {
762 return Err(invalid_parameter_error(
763 "invalid_parameter_type",
764 Some(format!("Expected: {}, found: {}", server_type, client_type)),
765 ))
766 }
767 }
768 } else {
769 ScalarValue::Float32(data)
770 }
771 }
772 &Type::FLOAT8 => {
773 let data = portal.parameter::<f64>(idx, &client_type)?;
774 if let Some(server_type) = &server_type {
775 match server_type {
776 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
777 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
778 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
779 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
780 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
781 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
782 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
783 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
784 ConcreteDataType::Float32(_) => {
785 ScalarValue::Float32(data.map(|n| n as f32))
786 }
787 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
788 _ => {
789 return Err(invalid_parameter_error(
790 "invalid_parameter_type",
791 Some(format!("Expected: {}, found: {}", server_type, client_type)),
792 ))
793 }
794 }
795 } else {
796 ScalarValue::Float64(data)
797 }
798 }
799 &Type::TIMESTAMP => {
800 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
801 if let Some(server_type) = &server_type {
802 match server_type {
803 ConcreteDataType::Timestamp(unit) => match *unit {
804 TimestampType::Second(_) => ScalarValue::TimestampSecond(
805 data.map(|ts| ts.and_utc().timestamp()),
806 None,
807 ),
808 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
809 data.map(|ts| ts.and_utc().timestamp_millis()),
810 None,
811 ),
812 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
813 data.map(|ts| ts.and_utc().timestamp_micros()),
814 None,
815 ),
816 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
817 data.map(|ts| ts.and_utc().timestamp_micros()),
818 None,
819 ),
820 },
821 _ => {
822 return Err(invalid_parameter_error(
823 "invalid_parameter_type",
824 Some(format!("Expected: {}, found: {}", server_type, client_type)),
825 ))
826 }
827 }
828 } else {
829 ScalarValue::TimestampMillisecond(
830 data.map(|ts| ts.and_utc().timestamp_millis()),
831 None,
832 )
833 }
834 }
835 &Type::DATE => {
836 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
837 if let Some(server_type) = &server_type {
838 match server_type {
839 ConcreteDataType::Date(_) => ScalarValue::Date32(data.map(|d| {
840 (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
841 })),
842 _ => {
843 return Err(invalid_parameter_error(
844 "invalid_parameter_type",
845 Some(format!("Expected: {}, found: {}", server_type, client_type)),
846 ));
847 }
848 }
849 } else {
850 ScalarValue::Date32(data.map(|d| {
851 (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
852 }))
853 }
854 }
855 &Type::INTERVAL => {
856 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
857 if let Some(server_type) = &server_type {
858 match server_type {
859 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
860 ScalarValue::IntervalYearMonth(
861 data.map(|i| {
862 if i.days != 0 || i.microseconds != 0 {
863 Err(invalid_parameter_error(
864 "invalid_parameter_type",
865 Some(format!(
866 "Expected: {}, found: {}",
867 server_type, client_type
868 )),
869 ))
870 } else {
871 Ok(IntervalYearMonth::new(i.months).to_i32())
872 }
873 })
874 .transpose()?,
875 )
876 }
877 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
878 ScalarValue::IntervalDayTime(
879 data.map(|i| {
880 if i.months != 0 || i.microseconds % 1000 != 0 {
881 Err(invalid_parameter_error(
882 "invalid_parameter_type",
883 Some(format!(
884 "Expected: {}, found: {}",
885 server_type, client_type
886 )),
887 ))
888 } else {
889 Ok(IntervalDayTime::new(
890 i.days,
891 (i.microseconds / 1000) as i32,
892 )
893 .into())
894 }
895 })
896 .transpose()?,
897 )
898 }
899 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
900 ScalarValue::IntervalMonthDayNano(
901 data.map(|i| IntervalMonthDayNano::from(i).into()),
902 )
903 }
904 _ => {
905 return Err(invalid_parameter_error(
906 "invalid_parameter_type",
907 Some(format!("Expected: {}, found: {}", server_type, client_type)),
908 ));
909 }
910 }
911 } else {
912 ScalarValue::IntervalMonthDayNano(
913 data.map(|i| IntervalMonthDayNano::from(i).into()),
914 )
915 }
916 }
917 &Type::BYTEA => {
918 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
919 if let Some(server_type) = &server_type {
920 match server_type {
921 ConcreteDataType::String(_) => {
922 ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
923 }
924 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
925 _ => {
926 return Err(invalid_parameter_error(
927 "invalid_parameter_type",
928 Some(format!("Expected: {}, found: {}", server_type, client_type)),
929 ));
930 }
931 }
932 } else {
933 ScalarValue::Binary(data)
934 }
935 }
936 &Type::JSONB => {
937 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
938 if let Some(server_type) = &server_type {
939 match server_type {
940 ConcreteDataType::Binary(_) => {
941 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
942 }
943 _ => {
944 return Err(invalid_parameter_error(
945 "invalid_parameter_type",
946 Some(format!("Expected: {}, found: {}", server_type, client_type)),
947 ));
948 }
949 }
950 } else {
951 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
952 }
953 }
954 &Type::INT2_ARRAY => {
955 let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
956 if let Some(data) = data {
957 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
958 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
959 } else {
960 ScalarValue::Null
961 }
962 }
963 &Type::INT4_ARRAY => {
964 let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
965 if let Some(data) = data {
966 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
967 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
968 } else {
969 ScalarValue::Null
970 }
971 }
972 &Type::INT8_ARRAY => {
973 let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
974 if let Some(data) = data {
975 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
976 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
977 } else {
978 ScalarValue::Null
979 }
980 }
981 &Type::VARCHAR_ARRAY => {
982 let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
983 if let Some(data) = data {
984 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
985 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
986 } else {
987 ScalarValue::Null
988 }
989 }
990 _ => Err(invalid_parameter_error(
991 "unsupported_parameter_value",
992 Some(format!("Found type: {}", client_type)),
993 ))?,
994 };
995
996 results.push(value);
997 }
998
999 Ok(results)
1000}
1001
1002pub(super) fn param_types_to_pg_types(
1003 param_types: &HashMap<String, Option<ConcreteDataType>>,
1004) -> Result<Vec<Type>> {
1005 let param_count = param_types.len();
1006 let mut types = Vec::with_capacity(param_count);
1007 for i in 0..param_count {
1008 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1009 let pg_type = type_gt_to_pg(param_type)?;
1010 types.push(pg_type);
1011 } else {
1012 types.push(Type::UNKNOWN);
1013 }
1014 }
1015 Ok(types)
1016}
1017
1018#[cfg(test)]
1019mod test {
1020 use std::sync::Arc;
1021
1022 use common_time::interval::IntervalUnit;
1023 use common_time::timestamp::TimeUnit;
1024 use common_time::Timestamp;
1025 use datatypes::schema::{ColumnSchema, Schema};
1026 use datatypes::value::ListValue;
1027 use pgwire::api::results::{FieldFormat, FieldInfo};
1028 use pgwire::api::Type;
1029 use session::context::QueryContextBuilder;
1030
1031 use super::*;
1032
1033 #[test]
1034 fn test_schema_convert() {
1035 let column_schemas = vec![
1036 ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1037 ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1038 ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1039 ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1040 ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1041 ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1042 ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1043 ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1044 ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1045 ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1046 ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1047 ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1048 ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1049 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1050 ColumnSchema::new(
1051 "timestamps",
1052 ConcreteDataType::timestamp_millisecond_datatype(),
1053 true,
1054 ),
1055 ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1056 ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1057 ColumnSchema::new(
1058 "intervals",
1059 ConcreteDataType::interval_month_day_nano_datatype(),
1060 true,
1061 ),
1062 ];
1063 let pg_field_info = vec![
1064 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1065 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1066 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1067 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1068 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1069 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1070 FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1071 FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1072 FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1073 FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1074 FieldInfo::new(
1075 "float32s".into(),
1076 None,
1077 None,
1078 Type::FLOAT4,
1079 FieldFormat::Text,
1080 ),
1081 FieldInfo::new(
1082 "float64s".into(),
1083 None,
1084 None,
1085 Type::FLOAT8,
1086 FieldFormat::Text,
1087 ),
1088 FieldInfo::new(
1089 "binaries".into(),
1090 None,
1091 None,
1092 Type::BYTEA,
1093 FieldFormat::Text,
1094 ),
1095 FieldInfo::new(
1096 "strings".into(),
1097 None,
1098 None,
1099 Type::VARCHAR,
1100 FieldFormat::Text,
1101 ),
1102 FieldInfo::new(
1103 "timestamps".into(),
1104 None,
1105 None,
1106 Type::TIMESTAMP,
1107 FieldFormat::Text,
1108 ),
1109 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1110 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1111 FieldInfo::new(
1112 "intervals".into(),
1113 None,
1114 None,
1115 Type::INTERVAL,
1116 FieldFormat::Text,
1117 ),
1118 ];
1119 let schema = Schema::new(column_schemas);
1120 let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
1121 assert_eq!(fs, pg_field_info);
1122 }
1123
1124 #[test]
1125 fn test_encode_text_format_data() {
1126 let schema = vec![
1127 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1128 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1129 FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1130 FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1131 FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1132 FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1133 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1134 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1135 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1136 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1137 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1138 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1139 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1140 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1141 FieldInfo::new(
1142 "float32s".into(),
1143 None,
1144 None,
1145 Type::FLOAT4,
1146 FieldFormat::Text,
1147 ),
1148 FieldInfo::new(
1149 "float32s".into(),
1150 None,
1151 None,
1152 Type::FLOAT4,
1153 FieldFormat::Text,
1154 ),
1155 FieldInfo::new(
1156 "float32s".into(),
1157 None,
1158 None,
1159 Type::FLOAT4,
1160 FieldFormat::Text,
1161 ),
1162 FieldInfo::new(
1163 "float64s".into(),
1164 None,
1165 None,
1166 Type::FLOAT8,
1167 FieldFormat::Text,
1168 ),
1169 FieldInfo::new(
1170 "float64s".into(),
1171 None,
1172 None,
1173 Type::FLOAT8,
1174 FieldFormat::Text,
1175 ),
1176 FieldInfo::new(
1177 "float64s".into(),
1178 None,
1179 None,
1180 Type::FLOAT8,
1181 FieldFormat::Text,
1182 ),
1183 FieldInfo::new(
1184 "strings".into(),
1185 None,
1186 None,
1187 Type::VARCHAR,
1188 FieldFormat::Text,
1189 ),
1190 FieldInfo::new(
1191 "binaries".into(),
1192 None,
1193 None,
1194 Type::BYTEA,
1195 FieldFormat::Text,
1196 ),
1197 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1198 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1199 FieldInfo::new(
1200 "timestamps".into(),
1201 None,
1202 None,
1203 Type::TIMESTAMP,
1204 FieldFormat::Text,
1205 ),
1206 FieldInfo::new(
1207 "interval_year_month".into(),
1208 None,
1209 None,
1210 Type::INTERVAL,
1211 FieldFormat::Text,
1212 ),
1213 FieldInfo::new(
1214 "interval_day_time".into(),
1215 None,
1216 None,
1217 Type::INTERVAL,
1218 FieldFormat::Text,
1219 ),
1220 FieldInfo::new(
1221 "interval_month_day_nano".into(),
1222 None,
1223 None,
1224 Type::INTERVAL,
1225 FieldFormat::Text,
1226 ),
1227 FieldInfo::new(
1228 "int_list".into(),
1229 None,
1230 None,
1231 Type::INT8_ARRAY,
1232 FieldFormat::Text,
1233 ),
1234 FieldInfo::new(
1235 "float_list".into(),
1236 None,
1237 None,
1238 Type::FLOAT8_ARRAY,
1239 FieldFormat::Text,
1240 ),
1241 FieldInfo::new(
1242 "string_list".into(),
1243 None,
1244 None,
1245 Type::VARCHAR_ARRAY,
1246 FieldFormat::Text,
1247 ),
1248 FieldInfo::new(
1249 "timestamp_list".into(),
1250 None,
1251 None,
1252 Type::TIMESTAMP_ARRAY,
1253 FieldFormat::Text,
1254 ),
1255 ];
1256
1257 let datatypes = vec![
1258 ConcreteDataType::null_datatype(),
1259 ConcreteDataType::boolean_datatype(),
1260 ConcreteDataType::uint8_datatype(),
1261 ConcreteDataType::uint16_datatype(),
1262 ConcreteDataType::uint32_datatype(),
1263 ConcreteDataType::uint64_datatype(),
1264 ConcreteDataType::int8_datatype(),
1265 ConcreteDataType::int8_datatype(),
1266 ConcreteDataType::int16_datatype(),
1267 ConcreteDataType::int16_datatype(),
1268 ConcreteDataType::int32_datatype(),
1269 ConcreteDataType::int32_datatype(),
1270 ConcreteDataType::int64_datatype(),
1271 ConcreteDataType::int64_datatype(),
1272 ConcreteDataType::float32_datatype(),
1273 ConcreteDataType::float32_datatype(),
1274 ConcreteDataType::float32_datatype(),
1275 ConcreteDataType::float64_datatype(),
1276 ConcreteDataType::float64_datatype(),
1277 ConcreteDataType::float64_datatype(),
1278 ConcreteDataType::string_datatype(),
1279 ConcreteDataType::binary_datatype(),
1280 ConcreteDataType::date_datatype(),
1281 ConcreteDataType::time_datatype(TimeUnit::Second),
1282 ConcreteDataType::timestamp_datatype(TimeUnit::Second),
1283 ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
1284 ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
1285 ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
1286 ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
1287 ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
1288 ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
1289 ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
1290 ];
1291 let values = vec![
1292 Value::Null,
1293 Value::Boolean(true),
1294 Value::UInt8(u8::MAX),
1295 Value::UInt16(u16::MAX),
1296 Value::UInt32(u32::MAX),
1297 Value::UInt64(u64::MAX),
1298 Value::Int8(i8::MAX),
1299 Value::Int8(i8::MIN),
1300 Value::Int16(i16::MAX),
1301 Value::Int16(i16::MIN),
1302 Value::Int32(i32::MAX),
1303 Value::Int32(i32::MIN),
1304 Value::Int64(i64::MAX),
1305 Value::Int64(i64::MIN),
1306 Value::Float32(f32::MAX.into()),
1307 Value::Float32(f32::MIN.into()),
1308 Value::Float32(0f32.into()),
1309 Value::Float64(f64::MAX.into()),
1310 Value::Float64(f64::MIN.into()),
1311 Value::Float64(0f64.into()),
1312 Value::String("greptime".into()),
1313 Value::Binary("greptime".as_bytes().into()),
1314 Value::Date(1001i32.into()),
1315 Value::Time(1001i64.into()),
1316 Value::Timestamp(1000001i64.into()),
1317 Value::IntervalYearMonth(IntervalYearMonth::new(1)),
1318 Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
1319 Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
1320 Value::List(ListValue::new(
1321 vec![Value::Int64(1i64)],
1322 ConcreteDataType::int64_datatype(),
1323 )),
1324 Value::List(ListValue::new(
1325 vec![Value::Float64(1.0f64.into())],
1326 ConcreteDataType::float64_datatype(),
1327 )),
1328 Value::List(ListValue::new(
1329 vec![Value::String("tom".into())],
1330 ConcreteDataType::string_datatype(),
1331 )),
1332 Value::List(ListValue::new(
1333 vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
1334 ConcreteDataType::timestamp_second_datatype(),
1335 )),
1336 ];
1337 let query_context = QueryContextBuilder::default()
1338 .configuration_parameter(Default::default())
1339 .build()
1340 .into();
1341 let mut builder = DataRowEncoder::new(Arc::new(schema));
1342 for (value, datatype) in values.iter().zip(datatypes) {
1343 encode_value(&query_context, value, &mut builder, &datatype).unwrap();
1344 }
1345 }
1346
1347 #[test]
1348 fn test_invalid_parameter() {
1349 let msg = "invalid_parameter_count";
1351 let error = invalid_parameter_error(msg, None);
1352 if let PgWireError::UserError(value) = error {
1353 assert_eq!("ERROR", value.severity);
1354 assert_eq!("22023", value.code);
1355 assert_eq!(msg, value.message);
1356 } else {
1357 panic!("test_invalid_parameter failed");
1358 }
1359 }
1360}