1mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
45use crate::postgres::utils::convert_err;
46use crate::SqlPlan;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49 origin
50 .column_schemas()
51 .iter()
52 .enumerate()
53 .map(|(idx, col)| {
54 Ok(FieldInfo::new(
55 col.name.clone(),
56 None,
57 None,
58 type_gt_to_pg(&col.data_type)?,
59 field_formats.format_for(idx),
60 ))
61 })
62 .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_array(
66 query_ctx: &QueryContextRef,
67 value_list: &ListValue,
68 builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70 match value_list.datatype() {
71 &ConcreteDataType::Boolean(_) => {
72 let array = value_list
73 .items()
74 .iter()
75 .map(|v| match v {
76 Value::Null => Ok(None),
77 Value::Boolean(v) => Ok(Some(*v)),
78 _ => Err(convert_err(Error::Internal {
79 err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
80 })),
81 })
82 .collect::<PgWireResult<Vec<Option<bool>>>>()?;
83 builder.encode_field(&array)
84 }
85 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
86 let array = value_list
87 .items()
88 .iter()
89 .map(|v| match v {
90 Value::Null => Ok(None),
91 Value::Int8(v) => Ok(Some(*v)),
92 Value::UInt8(v) => Ok(Some(*v as i8)),
93 _ => Err(convert_err(Error::Internal {
94 err_msg: format!(
95 "Invalid list item type, find {v:?}, expected int8 or uint8",
96 ),
97 })),
98 })
99 .collect::<PgWireResult<Vec<Option<i8>>>>()?;
100 builder.encode_field(&array)
101 }
102 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
103 let array = value_list
104 .items()
105 .iter()
106 .map(|v| match v {
107 Value::Null => Ok(None),
108 Value::Int16(v) => Ok(Some(*v)),
109 Value::UInt16(v) => Ok(Some(*v as i16)),
110 _ => Err(convert_err(Error::Internal {
111 err_msg: format!(
112 "Invalid list item type, find {v:?}, expected int16 or uint16",
113 ),
114 })),
115 })
116 .collect::<PgWireResult<Vec<Option<i16>>>>()?;
117 builder.encode_field(&array)
118 }
119 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
120 let array = value_list
121 .items()
122 .iter()
123 .map(|v| match v {
124 Value::Null => Ok(None),
125 Value::Int32(v) => Ok(Some(*v)),
126 Value::UInt32(v) => Ok(Some(*v as i32)),
127 _ => Err(convert_err(Error::Internal {
128 err_msg: format!(
129 "Invalid list item type, find {v:?}, expected int32 or uint32",
130 ),
131 })),
132 })
133 .collect::<PgWireResult<Vec<Option<i32>>>>()?;
134 builder.encode_field(&array)
135 }
136 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
137 let array = value_list
138 .items()
139 .iter()
140 .map(|v| match v {
141 Value::Null => Ok(None),
142 Value::Int64(v) => Ok(Some(*v)),
143 Value::UInt64(v) => Ok(Some(*v as i64)),
144 _ => Err(convert_err(Error::Internal {
145 err_msg: format!(
146 "Invalid list item type, find {v:?}, expected int64 or uint64",
147 ),
148 })),
149 })
150 .collect::<PgWireResult<Vec<Option<i64>>>>()?;
151 builder.encode_field(&array)
152 }
153 &ConcreteDataType::Float32(_) => {
154 let array = value_list
155 .items()
156 .iter()
157 .map(|v| match v {
158 Value::Null => Ok(None),
159 Value::Float32(v) => Ok(Some(v.0)),
160 _ => Err(convert_err(Error::Internal {
161 err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
162 })),
163 })
164 .collect::<PgWireResult<Vec<Option<f32>>>>()?;
165 builder.encode_field(&array)
166 }
167 &ConcreteDataType::Float64(_) => {
168 let array = value_list
169 .items()
170 .iter()
171 .map(|v| match v {
172 Value::Null => Ok(None),
173 Value::Float64(v) => Ok(Some(v.0)),
174 _ => Err(convert_err(Error::Internal {
175 err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
176 })),
177 })
178 .collect::<PgWireResult<Vec<Option<f64>>>>()?;
179 builder.encode_field(&array)
180 }
181 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
182 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
183
184 match *bytea_output {
185 PGByteaOutputValue::ESCAPE => {
186 let array = value_list
187 .items()
188 .iter()
189 .map(|v| match v {
190 Value::Null => Ok(None),
191 Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
192
193 _ => Err(convert_err(Error::Internal {
194 err_msg: format!(
195 "Invalid list item type, find {v:?}, expected binary",
196 ),
197 })),
198 })
199 .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
200 builder.encode_field(&array)
201 }
202 PGByteaOutputValue::HEX => {
203 let array = value_list
204 .items()
205 .iter()
206 .map(|v| match v {
207 Value::Null => Ok(None),
208 Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
209
210 _ => Err(convert_err(Error::Internal {
211 err_msg: format!(
212 "Invalid list item type, find {v:?}, expected binary",
213 ),
214 })),
215 })
216 .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
217 builder.encode_field(&array)
218 }
219 }
220 }
221 &ConcreteDataType::String(_) => {
222 let array = value_list
223 .items()
224 .iter()
225 .map(|v| match v {
226 Value::Null => Ok(None),
227 Value::String(v) => Ok(Some(v.as_utf8())),
228 _ => Err(convert_err(Error::Internal {
229 err_msg: format!("Invalid list item type, find {v:?}, expected string",),
230 })),
231 })
232 .collect::<PgWireResult<Vec<Option<&str>>>>()?;
233 builder.encode_field(&array)
234 }
235 &ConcreteDataType::Date(_) => {
236 let array = value_list
237 .items()
238 .iter()
239 .map(|v| match v {
240 Value::Null => Ok(None),
241 Value::Date(v) => {
242 if let Some(date) = v.to_chrono_date() {
243 let (style, order) =
244 *query_ctx.configuration_parameter().pg_datetime_style();
245 Ok(Some(StylingDate(date, style, order)))
246 } else {
247 Err(convert_err(Error::Internal {
248 err_msg: format!("Failed to convert date to postgres type {v:?}",),
249 }))
250 }
251 }
252 _ => Err(convert_err(Error::Internal {
253 err_msg: format!("Invalid list item type, find {v:?}, expected date",),
254 })),
255 })
256 .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
257 builder.encode_field(&array)
258 }
259 &ConcreteDataType::Timestamp(_) => {
260 let array = value_list
261 .items()
262 .iter()
263 .map(|v| match v {
264 Value::Null => Ok(None),
265 Value::Timestamp(v) => {
266 if let Some(datetime) =
267 v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
268 {
269 let (style, order) =
270 *query_ctx.configuration_parameter().pg_datetime_style();
271 Ok(Some(StylingDateTime(datetime, style, order)))
272 } else {
273 Err(convert_err(Error::Internal {
274 err_msg: format!("Failed to convert date to postgres type {v:?}",),
275 }))
276 }
277 }
278 _ => Err(convert_err(Error::Internal {
279 err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
280 })),
281 })
282 .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
283 builder.encode_field(&array)
284 }
285 &ConcreteDataType::Time(_) => {
286 let array = value_list
287 .items()
288 .iter()
289 .map(|v| match v {
290 Value::Null => Ok(None),
291 Value::Time(v) => Ok(v.to_chrono_time()),
292 _ => Err(convert_err(Error::Internal {
293 err_msg: format!("Invalid list item type, find {v:?}, expected time",),
294 })),
295 })
296 .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
297 builder.encode_field(&array)
298 }
299 &ConcreteDataType::Interval(_) => {
300 let array = value_list
301 .items()
302 .iter()
303 .map(|v| match v {
304 Value::Null => Ok(None),
305 Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
306 Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
307 Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
308 _ => Err(convert_err(Error::Internal {
309 err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
310 })),
311 })
312 .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
313 builder.encode_field(&array)
314 }
315 &ConcreteDataType::Decimal128(_) => {
316 let array = value_list
317 .items()
318 .iter()
319 .map(|v| match v {
320 Value::Null => Ok(None),
321 Value::Decimal128(v) => Ok(Some(v.to_string())),
322 _ => Err(convert_err(Error::Internal {
323 err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
324 })),
325 })
326 .collect::<PgWireResult<Vec<Option<String>>>>()?;
327 builder.encode_field(&array)
328 }
329 &ConcreteDataType::Json(j) => {
330 let array = value_list
331 .items()
332 .iter()
333 .map(|v| match v {
334 Value::Null => Ok(None),
335 Value::Binary(v) => {
336 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
337 Ok(Some(s))
338 }
339 _ => Err(convert_err(Error::Internal {
340 err_msg: format!("Invalid list item type, find {v:?}, expected json",),
341 })),
342 })
343 .collect::<PgWireResult<Vec<Option<String>>>>()?;
344 builder.encode_field(&array)
345 }
346 _ => Err(convert_err(Error::Internal {
347 err_msg: format!(
348 "cannot write array type {:?} in postgres protocol: unimplemented",
349 value_list.datatype()
350 ),
351 })),
352 }
353}
354
355pub(super) fn encode_value(
356 query_ctx: &QueryContextRef,
357 value: &Value,
358 builder: &mut DataRowEncoder,
359 datatype: &ConcreteDataType,
360) -> PgWireResult<()> {
361 match value {
362 Value::Null => builder.encode_field(&None::<&i8>),
363 Value::Boolean(v) => builder.encode_field(v),
364 Value::UInt8(v) => builder.encode_field(&(*v as i8)),
365 Value::UInt16(v) => builder.encode_field(&(*v as i16)),
366 Value::UInt32(v) => builder.encode_field(v),
367 Value::UInt64(v) => builder.encode_field(&(*v as i64)),
368 Value::Int8(v) => builder.encode_field(v),
369 Value::Int16(v) => builder.encode_field(v),
370 Value::Int32(v) => builder.encode_field(v),
371 Value::Int64(v) => builder.encode_field(v),
372 Value::Float32(v) => builder.encode_field(&v.0),
373 Value::Float64(v) => builder.encode_field(&v.0),
374 Value::String(v) => builder.encode_field(&v.as_utf8()),
375 Value::Binary(v) => match datatype {
376 ConcreteDataType::Json(j) => {
377 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
378 builder.encode_field(&s)
379 }
380 _ => {
381 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382 match *bytea_output {
383 PGByteaOutputValue::ESCAPE => {
384 builder.encode_field(&EscapeOutputBytea(v.deref()))
385 }
386 PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387 }
388 }
389 },
390 Value::Date(v) => {
391 if let Some(date) = v.to_chrono_date() {
392 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393 builder.encode_field(&StylingDate(date, style, order))
394 } else {
395 Err(convert_err(Error::Internal {
396 err_msg: format!("Failed to convert date to postgres type {v:?}",),
397 }))
398 }
399 }
400 Value::Timestamp(v) => {
401 if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402 {
403 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404 builder.encode_field(&StylingDateTime(datetime, style, order))
405 } else {
406 Err(convert_err(Error::Internal {
407 err_msg: format!("Failed to convert date to postgres type {v:?}",),
408 }))
409 }
410 }
411 Value::Time(v) => {
412 if let Some(time) = v.to_chrono_time() {
413 builder.encode_field(&time)
414 } else {
415 Err(convert_err(Error::Internal {
416 err_msg: format!("Failed to convert time to postgres type {v:?}",),
417 }))
418 }
419 }
420 Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421 Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422 Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423 Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424 Value::Duration(d) => match PgInterval::try_from(*d) {
425 Ok(i) => builder.encode_field(&i),
426 Err(e) => Err(convert_err(Error::Internal {
427 err_msg: e.to_string(),
428 })),
429 },
430 Value::List(values) => encode_array(query_ctx, values, builder),
431 }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435 match origin {
436 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446 &ConcreteDataType::Date(_) => Ok(Type::DATE),
447 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448 &ConcreteDataType::Time(_) => Ok(Type::TIME),
449 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451 &ConcreteDataType::Json(_) => Ok(Type::JSON),
452 ConcreteDataType::List(list) => match list.item_type() {
453 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470 &ConcreteDataType::Dictionary(_)
471 | &ConcreteDataType::Vector(_)
472 | &ConcreteDataType::List(_)
473 | &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
474 data_type: origin,
475 reason: "not implemented",
476 }
477 .fail(),
478 },
479 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
480 data_type: origin,
481 reason: "not implemented",
482 }
483 .fail(),
484 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
485 &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
486 data_type: origin,
487 reason: "not implemented",
488 }
489 .fail(),
490 }
491}
492
493#[allow(dead_code)]
494pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
495 match origin {
497 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
498 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
499 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
500 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
501 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
502 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
503 &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
504 common_time::timestamp::TimeUnit::Millisecond,
505 )),
506 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
507 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
508 common_time::timestamp::TimeUnit::Microsecond,
509 )),
510 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
511 ConcreteDataType::int8_datatype(),
512 )),
513 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
514 ConcreteDataType::int16_datatype(),
515 )),
516 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
517 ConcreteDataType::int32_datatype(),
518 )),
519 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
520 ConcreteDataType::int64_datatype(),
521 )),
522 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
523 ConcreteDataType::string_datatype(),
524 )),
525 _ => server_error::InternalSnafu {
526 err_msg: format!("unimplemented datatype {origin:?}"),
527 }
528 .fail(),
529 }
530}
531
532pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
533 let param_type = portal.statement.parameter_types.get(idx).unwrap();
536 match param_type {
537 &Type::VARCHAR | &Type::TEXT => Ok(format!(
538 "'{}'",
539 portal
540 .parameter::<String>(idx, param_type)?
541 .as_deref()
542 .unwrap_or("")
543 )),
544 &Type::BOOL => Ok(portal
545 .parameter::<bool>(idx, param_type)?
546 .map(|v| v.to_string())
547 .unwrap_or_else(|| "".to_owned())),
548 &Type::INT4 => Ok(portal
549 .parameter::<i32>(idx, param_type)?
550 .map(|v| v.to_string())
551 .unwrap_or_else(|| "".to_owned())),
552 &Type::INT8 => Ok(portal
553 .parameter::<i64>(idx, param_type)?
554 .map(|v| v.to_string())
555 .unwrap_or_else(|| "".to_owned())),
556 &Type::FLOAT4 => Ok(portal
557 .parameter::<f32>(idx, param_type)?
558 .map(|v| v.to_string())
559 .unwrap_or_else(|| "".to_owned())),
560 &Type::FLOAT8 => Ok(portal
561 .parameter::<f64>(idx, param_type)?
562 .map(|v| v.to_string())
563 .unwrap_or_else(|| "".to_owned())),
564 &Type::DATE => Ok(portal
565 .parameter::<NaiveDate>(idx, param_type)?
566 .map(|v| v.format("%Y-%m-%d").to_string())
567 .unwrap_or_else(|| "".to_owned())),
568 &Type::TIMESTAMP => Ok(portal
569 .parameter::<NaiveDateTime>(idx, param_type)?
570 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
571 .unwrap_or_else(|| "".to_owned())),
572 &Type::INTERVAL => Ok(portal
573 .parameter::<PgInterval>(idx, param_type)?
574 .map(|v| v.to_string())
575 .unwrap_or_else(|| "".to_owned())),
576 _ => Err(invalid_parameter_error(
577 "unsupported_parameter_type",
578 Some(param_type.to_string()),
579 )),
580 }
581}
582
583pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
584 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
585 error_info.detail = detail;
586 PgWireError::UserError(Box::new(error_info))
587}
588
589fn to_timestamp_scalar_value<T>(
590 data: Option<T>,
591 unit: &TimestampType,
592 ctype: &ConcreteDataType,
593) -> PgWireResult<ScalarValue>
594where
595 T: Into<i64>,
596{
597 if let Some(n) = data {
598 Value::Timestamp(unit.create_timestamp(n.into()))
599 .try_to_scalar_value(ctype)
600 .map_err(convert_err)
601 } else {
602 Ok(ScalarValue::Null)
603 }
604}
605
606pub(super) fn parameters_to_scalar_values(
607 plan: &LogicalPlan,
608 portal: &Portal<SqlPlan>,
609) -> PgWireResult<Vec<ScalarValue>> {
610 let param_count = portal.parameter_len();
611 let mut results = Vec::with_capacity(param_count);
612
613 let client_param_types = &portal.statement.parameter_types;
614 let param_types = plan
615 .get_parameter_types()
616 .context(DataFusionSnafu)
617 .map_err(convert_err)?
618 .into_iter()
619 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
620 .collect::<HashMap<_, _>>();
621
622 for idx in 0..param_count {
623 let server_type = param_types
624 .get(&format!("${}", idx + 1))
625 .and_then(|t| t.as_ref());
626
627 let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
628 client_given_type.clone()
629 } else if let Some(server_provided_type) = &server_type {
630 type_gt_to_pg(server_provided_type).map_err(convert_err)?
631 } else {
632 return Err(invalid_parameter_error(
633 "unknown_parameter_type",
634 Some(format!(
635 "Cannot get parameter type information for parameter {}",
636 idx
637 )),
638 ));
639 };
640
641 let value = match &client_type {
642 &Type::VARCHAR | &Type::TEXT => {
643 let data = portal.parameter::<String>(idx, &client_type)?;
644 if let Some(server_type) = &server_type {
645 match server_type {
646 ConcreteDataType::String(_) => ScalarValue::Utf8(data),
647 _ => {
648 return Err(invalid_parameter_error(
649 "invalid_parameter_type",
650 Some(format!("Expected: {}, found: {}", server_type, client_type)),
651 ))
652 }
653 }
654 } else {
655 ScalarValue::Utf8(data)
656 }
657 }
658 &Type::BOOL => {
659 let data = portal.parameter::<bool>(idx, &client_type)?;
660 if let Some(server_type) = &server_type {
661 match server_type {
662 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
663 _ => {
664 return Err(invalid_parameter_error(
665 "invalid_parameter_type",
666 Some(format!("Expected: {}, found: {}", server_type, client_type)),
667 ))
668 }
669 }
670 } else {
671 ScalarValue::Boolean(data)
672 }
673 }
674 &Type::INT2 => {
675 let data = portal.parameter::<i16>(idx, &client_type)?;
676 if let Some(server_type) = &server_type {
677 match server_type {
678 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
679 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
680 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
681 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
682 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
683 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
684 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
685 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
686 ConcreteDataType::Timestamp(unit) => {
687 to_timestamp_scalar_value(data, unit, server_type)?
688 }
689 _ => {
690 return Err(invalid_parameter_error(
691 "invalid_parameter_type",
692 Some(format!("Expected: {}, found: {}", server_type, client_type)),
693 ))
694 }
695 }
696 } else {
697 ScalarValue::Int16(data)
698 }
699 }
700 &Type::INT4 => {
701 let data = portal.parameter::<i32>(idx, &client_type)?;
702 if let Some(server_type) = &server_type {
703 match server_type {
704 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
705 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
706 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
707 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
708 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
709 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
710 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
711 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
712 ConcreteDataType::Timestamp(unit) => {
713 to_timestamp_scalar_value(data, unit, server_type)?
714 }
715 _ => {
716 return Err(invalid_parameter_error(
717 "invalid_parameter_type",
718 Some(format!("Expected: {}, found: {}", server_type, client_type)),
719 ))
720 }
721 }
722 } else {
723 ScalarValue::Int32(data)
724 }
725 }
726 &Type::INT8 => {
727 let data = portal.parameter::<i64>(idx, &client_type)?;
728 if let Some(server_type) = &server_type {
729 match server_type {
730 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
731 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
732 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
733 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
734 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
735 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
736 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
737 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
738 ConcreteDataType::Timestamp(unit) => {
739 to_timestamp_scalar_value(data, unit, server_type)?
740 }
741 _ => {
742 return Err(invalid_parameter_error(
743 "invalid_parameter_type",
744 Some(format!("Expected: {}, found: {}", server_type, client_type)),
745 ))
746 }
747 }
748 } else {
749 ScalarValue::Int64(data)
750 }
751 }
752 &Type::FLOAT4 => {
753 let data = portal.parameter::<f32>(idx, &client_type)?;
754 if let Some(server_type) = &server_type {
755 match server_type {
756 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
757 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
758 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
759 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
760 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
761 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
762 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
763 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
764 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
765 ConcreteDataType::Float64(_) => {
766 ScalarValue::Float64(data.map(|n| n as f64))
767 }
768 _ => {
769 return Err(invalid_parameter_error(
770 "invalid_parameter_type",
771 Some(format!("Expected: {}, found: {}", server_type, client_type)),
772 ))
773 }
774 }
775 } else {
776 ScalarValue::Float32(data)
777 }
778 }
779 &Type::FLOAT8 => {
780 let data = portal.parameter::<f64>(idx, &client_type)?;
781 if let Some(server_type) = &server_type {
782 match server_type {
783 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
784 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
785 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
786 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
787 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
788 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
789 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
790 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
791 ConcreteDataType::Float32(_) => {
792 ScalarValue::Float32(data.map(|n| n as f32))
793 }
794 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
795 _ => {
796 return Err(invalid_parameter_error(
797 "invalid_parameter_type",
798 Some(format!("Expected: {}, found: {}", server_type, client_type)),
799 ))
800 }
801 }
802 } else {
803 ScalarValue::Float64(data)
804 }
805 }
806 &Type::TIMESTAMP => {
807 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
808 if let Some(server_type) = &server_type {
809 match server_type {
810 ConcreteDataType::Timestamp(unit) => match *unit {
811 TimestampType::Second(_) => ScalarValue::TimestampSecond(
812 data.map(|ts| ts.and_utc().timestamp()),
813 None,
814 ),
815 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
816 data.map(|ts| ts.and_utc().timestamp_millis()),
817 None,
818 ),
819 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
820 data.map(|ts| ts.and_utc().timestamp_micros()),
821 None,
822 ),
823 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
824 data.map(|ts| ts.and_utc().timestamp_micros()),
825 None,
826 ),
827 },
828 _ => {
829 return Err(invalid_parameter_error(
830 "invalid_parameter_type",
831 Some(format!("Expected: {}, found: {}", server_type, client_type)),
832 ))
833 }
834 }
835 } else {
836 ScalarValue::TimestampMillisecond(
837 data.map(|ts| ts.and_utc().timestamp_millis()),
838 None,
839 )
840 }
841 }
842 &Type::DATE => {
843 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
844 if let Some(server_type) = &server_type {
845 match server_type {
846 ConcreteDataType::Date(_) => ScalarValue::Date32(
847 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
848 ),
849 _ => {
850 return Err(invalid_parameter_error(
851 "invalid_parameter_type",
852 Some(format!("Expected: {}, found: {}", server_type, client_type)),
853 ));
854 }
855 }
856 } else {
857 ScalarValue::Date32(
858 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
859 )
860 }
861 }
862 &Type::INTERVAL => {
863 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
864 if let Some(server_type) = &server_type {
865 match server_type {
866 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
867 ScalarValue::IntervalYearMonth(
868 data.map(|i| {
869 if i.days != 0 || i.microseconds != 0 {
870 Err(invalid_parameter_error(
871 "invalid_parameter_type",
872 Some(format!(
873 "Expected: {}, found: {}",
874 server_type, client_type
875 )),
876 ))
877 } else {
878 Ok(IntervalYearMonth::new(i.months).to_i32())
879 }
880 })
881 .transpose()?,
882 )
883 }
884 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
885 ScalarValue::IntervalDayTime(
886 data.map(|i| {
887 if i.months != 0 || i.microseconds % 1000 != 0 {
888 Err(invalid_parameter_error(
889 "invalid_parameter_type",
890 Some(format!(
891 "Expected: {}, found: {}",
892 server_type, client_type
893 )),
894 ))
895 } else {
896 Ok(IntervalDayTime::new(
897 i.days,
898 (i.microseconds / 1000) as i32,
899 )
900 .into())
901 }
902 })
903 .transpose()?,
904 )
905 }
906 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
907 ScalarValue::IntervalMonthDayNano(
908 data.map(|i| IntervalMonthDayNano::from(i).into()),
909 )
910 }
911 _ => {
912 return Err(invalid_parameter_error(
913 "invalid_parameter_type",
914 Some(format!("Expected: {}, found: {}", server_type, client_type)),
915 ));
916 }
917 }
918 } else {
919 ScalarValue::IntervalMonthDayNano(
920 data.map(|i| IntervalMonthDayNano::from(i).into()),
921 )
922 }
923 }
924 &Type::BYTEA => {
925 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
926 if let Some(server_type) = &server_type {
927 match server_type {
928 ConcreteDataType::String(_) => {
929 ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
930 }
931 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
932 _ => {
933 return Err(invalid_parameter_error(
934 "invalid_parameter_type",
935 Some(format!("Expected: {}, found: {}", server_type, client_type)),
936 ));
937 }
938 }
939 } else {
940 ScalarValue::Binary(data)
941 }
942 }
943 &Type::JSONB => {
944 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
945 if let Some(server_type) = &server_type {
946 match server_type {
947 ConcreteDataType::Binary(_) => {
948 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
949 }
950 _ => {
951 return Err(invalid_parameter_error(
952 "invalid_parameter_type",
953 Some(format!("Expected: {}, found: {}", server_type, client_type)),
954 ));
955 }
956 }
957 } else {
958 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
959 }
960 }
961 &Type::INT2_ARRAY => {
962 let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
963 if let Some(data) = data {
964 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
965 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
966 } else {
967 ScalarValue::Null
968 }
969 }
970 &Type::INT4_ARRAY => {
971 let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
972 if let Some(data) = data {
973 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
974 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
975 } else {
976 ScalarValue::Null
977 }
978 }
979 &Type::INT8_ARRAY => {
980 let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
981 if let Some(data) = data {
982 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
983 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
984 } else {
985 ScalarValue::Null
986 }
987 }
988 &Type::VARCHAR_ARRAY => {
989 let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
990 if let Some(data) = data {
991 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
992 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
993 } else {
994 ScalarValue::Null
995 }
996 }
997 _ => Err(invalid_parameter_error(
998 "unsupported_parameter_value",
999 Some(format!("Found type: {}", client_type)),
1000 ))?,
1001 };
1002
1003 results.push(value);
1004 }
1005
1006 Ok(results)
1007}
1008
1009pub(super) fn param_types_to_pg_types(
1010 param_types: &HashMap<String, Option<ConcreteDataType>>,
1011) -> Result<Vec<Type>> {
1012 let param_count = param_types.len();
1013 let mut types = Vec::with_capacity(param_count);
1014 for i in 0..param_count {
1015 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1016 let pg_type = type_gt_to_pg(param_type)?;
1017 types.push(pg_type);
1018 } else {
1019 types.push(Type::UNKNOWN);
1020 }
1021 }
1022 Ok(types)
1023}
1024
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::value::ListValue;
    use pgwire::api::results::{FieldFormat, FieldInfo};
    use pgwire::api::Type;
    use session::context::QueryContextBuilder;

    use super::*;

    /// Builds a text-format `FieldInfo` with no table id and no column id.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    /// Checks that `schema_to_pg` reports the expected Postgres type for each
    /// column data type when every field uses the text format.
    #[test]
    fn test_schema_convert() {
        // Each row: column name, column data type, expected Postgres type.
        let cases = vec![
            ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
            ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
            ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
            ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
            ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
            ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
            ("uint8s", ConcreteDataType::uint8_datatype(), Type::CHAR),
            ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT2),
            ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT4),
            ("uint64s", ConcreteDataType::uint64_datatype(), Type::INT8),
            (
                "float32s",
                ConcreteDataType::float32_datatype(),
                Type::FLOAT4,
            ),
            (
                "float64s",
                ConcreteDataType::float64_datatype(),
                Type::FLOAT8,
            ),
            ("binaries", ConcreteDataType::binary_datatype(), Type::BYTEA),
            ("strings", ConcreteDataType::string_datatype(), Type::VARCHAR),
            (
                "timestamps",
                ConcreteDataType::timestamp_millisecond_datatype(),
                Type::TIMESTAMP,
            ),
            ("dates", ConcreteDataType::date_datatype(), Type::DATE),
            ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
            (
                "intervals",
                ConcreteDataType::interval_month_day_nano_datatype(),
                Type::INTERVAL,
            ),
        ];

        // Split the table into the nullable input columns and the expected output.
        let (column_schemas, expected): (Vec<ColumnSchema>, Vec<FieldInfo>) = cases
            .into_iter()
            .map(|(name, data_type, pg_type)| {
                (
                    ColumnSchema::new(name, data_type, true),
                    text_field(name, pg_type),
                )
            })
            .unzip();

        let actual = schema_to_pg(&Schema::new(column_schemas), &Format::UnifiedText).unwrap();
        assert_eq!(actual, expected);
    }

    /// Checks that `encode_value` succeeds for one value of every listed data
    /// type against a text-format row schema.
    #[test]
    fn test_encode_text_format_data() {
        // Row schema; the order matches `rows` below entry for entry.
        let schema: Vec<FieldInfo> = vec![
            ("nulls", Type::UNKNOWN),
            ("bools", Type::BOOL),
            ("uint8s", Type::CHAR),
            ("uint16s", Type::INT2),
            ("uint32s", Type::INT4),
            ("uint64s", Type::INT8),
            ("int8s", Type::CHAR),
            ("int8s", Type::CHAR),
            ("int16s", Type::INT2),
            ("int16s", Type::INT2),
            ("int32s", Type::INT4),
            ("int32s", Type::INT4),
            ("int64s", Type::INT8),
            ("int64s", Type::INT8),
            ("float32s", Type::FLOAT4),
            ("float32s", Type::FLOAT4),
            ("float32s", Type::FLOAT4),
            ("float64s", Type::FLOAT8),
            ("float64s", Type::FLOAT8),
            ("float64s", Type::FLOAT8),
            ("strings", Type::VARCHAR),
            ("binaries", Type::BYTEA),
            ("dates", Type::DATE),
            ("times", Type::TIME),
            ("timestamps", Type::TIMESTAMP),
            ("interval_year_month", Type::INTERVAL),
            ("interval_day_time", Type::INTERVAL),
            ("interval_month_day_nano", Type::INTERVAL),
            ("int_list", Type::INT8_ARRAY),
            ("float_list", Type::FLOAT8_ARRAY),
            ("string_list", Type::VARCHAR_ARRAY),
            ("timestamp_list", Type::TIMESTAMP_ARRAY),
        ]
        .into_iter()
        .map(|(name, pg_type)| text_field(name, pg_type))
        .collect();

        // Each row: the data type handed to the encoder and the value to encode.
        let rows = vec![
            (ConcreteDataType::null_datatype(), Value::Null),
            (ConcreteDataType::boolean_datatype(), Value::Boolean(true)),
            (ConcreteDataType::uint8_datatype(), Value::UInt8(u8::MAX)),
            (ConcreteDataType::uint16_datatype(), Value::UInt16(u16::MAX)),
            (ConcreteDataType::uint32_datatype(), Value::UInt32(u32::MAX)),
            (ConcreteDataType::uint64_datatype(), Value::UInt64(u64::MAX)),
            (ConcreteDataType::int8_datatype(), Value::Int8(i8::MAX)),
            (ConcreteDataType::int8_datatype(), Value::Int8(i8::MIN)),
            (ConcreteDataType::int16_datatype(), Value::Int16(i16::MAX)),
            (ConcreteDataType::int16_datatype(), Value::Int16(i16::MIN)),
            (ConcreteDataType::int32_datatype(), Value::Int32(i32::MAX)),
            (ConcreteDataType::int32_datatype(), Value::Int32(i32::MIN)),
            (ConcreteDataType::int64_datatype(), Value::Int64(i64::MAX)),
            (ConcreteDataType::int64_datatype(), Value::Int64(i64::MIN)),
            (
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MAX.into()),
            ),
            (
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MIN.into()),
            ),
            (
                ConcreteDataType::float32_datatype(),
                Value::Float32(0f32.into()),
            ),
            (
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MAX.into()),
            ),
            (
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MIN.into()),
            ),
            (
                ConcreteDataType::float64_datatype(),
                Value::Float64(0f64.into()),
            ),
            (
                ConcreteDataType::string_datatype(),
                Value::String("greptime".into()),
            ),
            (
                ConcreteDataType::binary_datatype(),
                Value::Binary("greptime".as_bytes().into()),
            ),
            (
                ConcreteDataType::date_datatype(),
                Value::Date(1001i32.into()),
            ),
            (
                ConcreteDataType::time_datatype(TimeUnit::Second),
                Value::Time(1001i64.into()),
            ),
            (
                ConcreteDataType::timestamp_datatype(TimeUnit::Second),
                Value::Timestamp(1000001i64.into()),
            ),
            (
                ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            ),
            (
                ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
                Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
            ),
            (
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
            ),
            (
                ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Int64(1i64)],
                    ConcreteDataType::int64_datatype(),
                )),
            ),
            (
                ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Float64(1.0f64.into())],
                    ConcreteDataType::float64_datatype(),
                )),
            ),
            (
                ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
                Value::List(ListValue::new(
                    vec![Value::String("tom".into())],
                    ConcreteDataType::string_datatype(),
                )),
            ),
            (
                ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
                    ConcreteDataType::timestamp_second_datatype(),
                )),
            ),
        ];

        let query_context = QueryContextBuilder::default()
            .configuration_parameter(Default::default())
            .build()
            .into();
        let mut encoder = DataRowEncoder::new(Arc::new(schema));
        // The test only requires that every value encodes without error.
        for (datatype, value) in &rows {
            encode_value(&query_context, value, &mut encoder, datatype).unwrap();
        }
    }

    /// Checks the severity, code and message carried by the error that
    /// `invalid_parameter_error` builds.
    #[test]
    fn test_invalid_parameter() {
        let msg = "invalid_parameter_count";
        match invalid_parameter_error(msg, None) {
            PgWireError::UserError(info) => {
                assert_eq!("ERROR", info.severity);
                assert_eq!("22023", info.code);
                assert_eq!(msg, info.message);
            }
            _ => panic!("test_invalid_parameter failed"),
        }
    }
}