1mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
45use crate::postgres::utils::convert_err;
46use crate::SqlPlan;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49 origin
50 .column_schemas()
51 .iter()
52 .enumerate()
53 .map(|(idx, col)| {
54 Ok(FieldInfo::new(
55 col.name.clone(),
56 None,
57 None,
58 type_gt_to_pg(&col.data_type)?,
59 field_formats.format_for(idx),
60 ))
61 })
62 .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_array(
66 query_ctx: &QueryContextRef,
67 value_list: &ListValue,
68 builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70 match value_list.datatype() {
71 &ConcreteDataType::Boolean(_) => {
72 let array = value_list
73 .items()
74 .iter()
75 .map(|v| match v {
76 Value::Null => Ok(None),
77 Value::Boolean(v) => Ok(Some(*v)),
78 _ => Err(convert_err(Error::Internal {
79 err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
80 })),
81 })
82 .collect::<PgWireResult<Vec<Option<bool>>>>()?;
83 builder.encode_field(&array)
84 }
85 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
86 let array = value_list
87 .items()
88 .iter()
89 .map(|v| match v {
90 Value::Null => Ok(None),
91 Value::Int8(v) => Ok(Some(*v)),
92 Value::UInt8(v) => Ok(Some(*v as i8)),
93 _ => Err(convert_err(Error::Internal {
94 err_msg: format!(
95 "Invalid list item type, find {v:?}, expected int8 or uint8",
96 ),
97 })),
98 })
99 .collect::<PgWireResult<Vec<Option<i8>>>>()?;
100 builder.encode_field(&array)
101 }
102 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
103 let array = value_list
104 .items()
105 .iter()
106 .map(|v| match v {
107 Value::Null => Ok(None),
108 Value::Int16(v) => Ok(Some(*v)),
109 Value::UInt16(v) => Ok(Some(*v as i16)),
110 _ => Err(convert_err(Error::Internal {
111 err_msg: format!(
112 "Invalid list item type, find {v:?}, expected int16 or uint16",
113 ),
114 })),
115 })
116 .collect::<PgWireResult<Vec<Option<i16>>>>()?;
117 builder.encode_field(&array)
118 }
119 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
120 let array = value_list
121 .items()
122 .iter()
123 .map(|v| match v {
124 Value::Null => Ok(None),
125 Value::Int32(v) => Ok(Some(*v)),
126 Value::UInt32(v) => Ok(Some(*v as i32)),
127 _ => Err(convert_err(Error::Internal {
128 err_msg: format!(
129 "Invalid list item type, find {v:?}, expected int32 or uint32",
130 ),
131 })),
132 })
133 .collect::<PgWireResult<Vec<Option<i32>>>>()?;
134 builder.encode_field(&array)
135 }
136 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
137 let array = value_list
138 .items()
139 .iter()
140 .map(|v| match v {
141 Value::Null => Ok(None),
142 Value::Int64(v) => Ok(Some(*v)),
143 Value::UInt64(v) => Ok(Some(*v as i64)),
144 _ => Err(convert_err(Error::Internal {
145 err_msg: format!(
146 "Invalid list item type, find {v:?}, expected int64 or uint64",
147 ),
148 })),
149 })
150 .collect::<PgWireResult<Vec<Option<i64>>>>()?;
151 builder.encode_field(&array)
152 }
153 &ConcreteDataType::Float32(_) => {
154 let array = value_list
155 .items()
156 .iter()
157 .map(|v| match v {
158 Value::Null => Ok(None),
159 Value::Float32(v) => Ok(Some(v.0)),
160 _ => Err(convert_err(Error::Internal {
161 err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
162 })),
163 })
164 .collect::<PgWireResult<Vec<Option<f32>>>>()?;
165 builder.encode_field(&array)
166 }
167 &ConcreteDataType::Float64(_) => {
168 let array = value_list
169 .items()
170 .iter()
171 .map(|v| match v {
172 Value::Null => Ok(None),
173 Value::Float64(v) => Ok(Some(v.0)),
174 _ => Err(convert_err(Error::Internal {
175 err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
176 })),
177 })
178 .collect::<PgWireResult<Vec<Option<f64>>>>()?;
179 builder.encode_field(&array)
180 }
181 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
182 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
183
184 match *bytea_output {
185 PGByteaOutputValue::ESCAPE => {
186 let array = value_list
187 .items()
188 .iter()
189 .map(|v| match v {
190 Value::Null => Ok(None),
191 Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
192
193 _ => Err(convert_err(Error::Internal {
194 err_msg: format!(
195 "Invalid list item type, find {v:?}, expected binary",
196 ),
197 })),
198 })
199 .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
200 builder.encode_field(&array)
201 }
202 PGByteaOutputValue::HEX => {
203 let array = value_list
204 .items()
205 .iter()
206 .map(|v| match v {
207 Value::Null => Ok(None),
208 Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
209
210 _ => Err(convert_err(Error::Internal {
211 err_msg: format!(
212 "Invalid list item type, find {v:?}, expected binary",
213 ),
214 })),
215 })
216 .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
217 builder.encode_field(&array)
218 }
219 }
220 }
221 &ConcreteDataType::String(_) => {
222 let array = value_list
223 .items()
224 .iter()
225 .map(|v| match v {
226 Value::Null => Ok(None),
227 Value::String(v) => Ok(Some(v.as_utf8())),
228 _ => Err(convert_err(Error::Internal {
229 err_msg: format!("Invalid list item type, find {v:?}, expected string",),
230 })),
231 })
232 .collect::<PgWireResult<Vec<Option<&str>>>>()?;
233 builder.encode_field(&array)
234 }
235 &ConcreteDataType::Date(_) => {
236 let array = value_list
237 .items()
238 .iter()
239 .map(|v| match v {
240 Value::Null => Ok(None),
241 Value::Date(v) => {
242 if let Some(date) = v.to_chrono_date() {
243 let (style, order) =
244 *query_ctx.configuration_parameter().pg_datetime_style();
245 Ok(Some(StylingDate(date, style, order)))
246 } else {
247 Err(convert_err(Error::Internal {
248 err_msg: format!("Failed to convert date to postgres type {v:?}",),
249 }))
250 }
251 }
252 _ => Err(convert_err(Error::Internal {
253 err_msg: format!("Invalid list item type, find {v:?}, expected date",),
254 })),
255 })
256 .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
257 builder.encode_field(&array)
258 }
259 &ConcreteDataType::Timestamp(_) => {
260 let array = value_list
261 .items()
262 .iter()
263 .map(|v| match v {
264 Value::Null => Ok(None),
265 Value::Timestamp(v) => {
266 if let Some(datetime) =
267 v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
268 {
269 let (style, order) =
270 *query_ctx.configuration_parameter().pg_datetime_style();
271 Ok(Some(StylingDateTime(datetime, style, order)))
272 } else {
273 Err(convert_err(Error::Internal {
274 err_msg: format!("Failed to convert date to postgres type {v:?}",),
275 }))
276 }
277 }
278 _ => Err(convert_err(Error::Internal {
279 err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
280 })),
281 })
282 .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
283 builder.encode_field(&array)
284 }
285 &ConcreteDataType::Time(_) => {
286 let array = value_list
287 .items()
288 .iter()
289 .map(|v| match v {
290 Value::Null => Ok(None),
291 Value::Time(v) => Ok(v.to_chrono_time()),
292 _ => Err(convert_err(Error::Internal {
293 err_msg: format!("Invalid list item type, find {v:?}, expected time",),
294 })),
295 })
296 .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
297 builder.encode_field(&array)
298 }
299 &ConcreteDataType::Interval(_) => {
300 let array = value_list
301 .items()
302 .iter()
303 .map(|v| match v {
304 Value::Null => Ok(None),
305 Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
306 Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
307 Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
308 _ => Err(convert_err(Error::Internal {
309 err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
310 })),
311 })
312 .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
313 builder.encode_field(&array)
314 }
315 &ConcreteDataType::Decimal128(_) => {
316 let array = value_list
317 .items()
318 .iter()
319 .map(|v| match v {
320 Value::Null => Ok(None),
321 Value::Decimal128(v) => Ok(Some(v.to_string())),
322 _ => Err(convert_err(Error::Internal {
323 err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
324 })),
325 })
326 .collect::<PgWireResult<Vec<Option<String>>>>()?;
327 builder.encode_field(&array)
328 }
329 &ConcreteDataType::Json(j) => {
330 let array = value_list
331 .items()
332 .iter()
333 .map(|v| match v {
334 Value::Null => Ok(None),
335 Value::Binary(v) => {
336 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
337 Ok(Some(s))
338 }
339 _ => Err(convert_err(Error::Internal {
340 err_msg: format!("Invalid list item type, find {v:?}, expected json",),
341 })),
342 })
343 .collect::<PgWireResult<Vec<Option<String>>>>()?;
344 builder.encode_field(&array)
345 }
346 _ => Err(convert_err(Error::Internal {
347 err_msg: format!(
348 "cannot write array type {:?} in postgres protocol: unimplemented",
349 value_list.datatype()
350 ),
351 })),
352 }
353}
354
355pub(super) fn encode_value(
356 query_ctx: &QueryContextRef,
357 value: &Value,
358 builder: &mut DataRowEncoder,
359 datatype: &ConcreteDataType,
360) -> PgWireResult<()> {
361 match value {
362 Value::Null => builder.encode_field(&None::<&i8>),
363 Value::Boolean(v) => builder.encode_field(v),
364 Value::UInt8(v) => builder.encode_field(&(*v as i8)),
365 Value::UInt16(v) => builder.encode_field(&(*v as i16)),
366 Value::UInt32(v) => builder.encode_field(v),
367 Value::UInt64(v) => builder.encode_field(&(*v as i64)),
368 Value::Int8(v) => builder.encode_field(v),
369 Value::Int16(v) => builder.encode_field(v),
370 Value::Int32(v) => builder.encode_field(v),
371 Value::Int64(v) => builder.encode_field(v),
372 Value::Float32(v) => builder.encode_field(&v.0),
373 Value::Float64(v) => builder.encode_field(&v.0),
374 Value::String(v) => builder.encode_field(&v.as_utf8()),
375 Value::Binary(v) => match datatype {
376 ConcreteDataType::Json(j) => {
377 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
378 builder.encode_field(&s)
379 }
380 _ => {
381 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382 match *bytea_output {
383 PGByteaOutputValue::ESCAPE => {
384 builder.encode_field(&EscapeOutputBytea(v.deref()))
385 }
386 PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387 }
388 }
389 },
390 Value::Date(v) => {
391 if let Some(date) = v.to_chrono_date() {
392 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393 builder.encode_field(&StylingDate(date, style, order))
394 } else {
395 Err(convert_err(Error::Internal {
396 err_msg: format!("Failed to convert date to postgres type {v:?}",),
397 }))
398 }
399 }
400 Value::Timestamp(v) => {
401 if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402 {
403 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404 builder.encode_field(&StylingDateTime(datetime, style, order))
405 } else {
406 Err(convert_err(Error::Internal {
407 err_msg: format!("Failed to convert date to postgres type {v:?}",),
408 }))
409 }
410 }
411 Value::Time(v) => {
412 if let Some(time) = v.to_chrono_time() {
413 builder.encode_field(&time)
414 } else {
415 Err(convert_err(Error::Internal {
416 err_msg: format!("Failed to convert time to postgres type {v:?}",),
417 }))
418 }
419 }
420 Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421 Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422 Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423 Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424 Value::Duration(d) => match PgInterval::try_from(*d) {
425 Ok(i) => builder.encode_field(&i),
426 Err(e) => Err(convert_err(Error::Internal {
427 err_msg: e.to_string(),
428 })),
429 },
430 Value::List(values) => encode_array(query_ctx, values, builder),
431 }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435 match origin {
436 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446 &ConcreteDataType::Date(_) => Ok(Type::DATE),
447 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448 &ConcreteDataType::Time(_) => Ok(Type::TIME),
449 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451 &ConcreteDataType::Json(_) => Ok(Type::JSON),
452 ConcreteDataType::List(list) => match list.item_type() {
453 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470 &ConcreteDataType::Dictionary(_)
471 | &ConcreteDataType::Vector(_)
472 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
473 data_type: origin,
474 reason: "not implemented",
475 }
476 .fail(),
477 },
478 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
479 data_type: origin,
480 reason: "not implemented",
481 }
482 .fail(),
483 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
484 }
485}
486
487#[allow(dead_code)]
488pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
489 match origin {
491 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
492 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
493 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
494 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
495 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
496 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
497 &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
498 common_time::timestamp::TimeUnit::Millisecond,
499 )),
500 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
501 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
502 common_time::timestamp::TimeUnit::Microsecond,
503 )),
504 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
505 ConcreteDataType::int8_datatype(),
506 )),
507 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
508 ConcreteDataType::int16_datatype(),
509 )),
510 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
511 ConcreteDataType::int32_datatype(),
512 )),
513 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
514 ConcreteDataType::int64_datatype(),
515 )),
516 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
517 ConcreteDataType::string_datatype(),
518 )),
519 _ => server_error::InternalSnafu {
520 err_msg: format!("unimplemented datatype {origin:?}"),
521 }
522 .fail(),
523 }
524}
525
526pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
527 let param_type = portal.statement.parameter_types.get(idx).unwrap();
530 match param_type {
531 &Type::VARCHAR | &Type::TEXT => Ok(format!(
532 "'{}'",
533 portal
534 .parameter::<String>(idx, param_type)?
535 .as_deref()
536 .unwrap_or("")
537 )),
538 &Type::BOOL => Ok(portal
539 .parameter::<bool>(idx, param_type)?
540 .map(|v| v.to_string())
541 .unwrap_or_else(|| "".to_owned())),
542 &Type::INT4 => Ok(portal
543 .parameter::<i32>(idx, param_type)?
544 .map(|v| v.to_string())
545 .unwrap_or_else(|| "".to_owned())),
546 &Type::INT8 => Ok(portal
547 .parameter::<i64>(idx, param_type)?
548 .map(|v| v.to_string())
549 .unwrap_or_else(|| "".to_owned())),
550 &Type::FLOAT4 => Ok(portal
551 .parameter::<f32>(idx, param_type)?
552 .map(|v| v.to_string())
553 .unwrap_or_else(|| "".to_owned())),
554 &Type::FLOAT8 => Ok(portal
555 .parameter::<f64>(idx, param_type)?
556 .map(|v| v.to_string())
557 .unwrap_or_else(|| "".to_owned())),
558 &Type::DATE => Ok(portal
559 .parameter::<NaiveDate>(idx, param_type)?
560 .map(|v| v.format("%Y-%m-%d").to_string())
561 .unwrap_or_else(|| "".to_owned())),
562 &Type::TIMESTAMP => Ok(portal
563 .parameter::<NaiveDateTime>(idx, param_type)?
564 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
565 .unwrap_or_else(|| "".to_owned())),
566 &Type::INTERVAL => Ok(portal
567 .parameter::<PgInterval>(idx, param_type)?
568 .map(|v| v.to_string())
569 .unwrap_or_else(|| "".to_owned())),
570 _ => Err(invalid_parameter_error(
571 "unsupported_parameter_type",
572 Some(param_type.to_string()),
573 )),
574 }
575}
576
577pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
578 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
579 error_info.detail = detail;
580 PgWireError::UserError(Box::new(error_info))
581}
582
583fn to_timestamp_scalar_value<T>(
584 data: Option<T>,
585 unit: &TimestampType,
586 ctype: &ConcreteDataType,
587) -> PgWireResult<ScalarValue>
588where
589 T: Into<i64>,
590{
591 if let Some(n) = data {
592 Value::Timestamp(unit.create_timestamp(n.into()))
593 .try_to_scalar_value(ctype)
594 .map_err(convert_err)
595 } else {
596 Ok(ScalarValue::Null)
597 }
598}
599
600pub(super) fn parameters_to_scalar_values(
601 plan: &LogicalPlan,
602 portal: &Portal<SqlPlan>,
603) -> PgWireResult<Vec<ScalarValue>> {
604 let param_count = portal.parameter_len();
605 let mut results = Vec::with_capacity(param_count);
606
607 let client_param_types = &portal.statement.parameter_types;
608 let param_types = plan
609 .get_parameter_types()
610 .context(DataFusionSnafu)
611 .map_err(convert_err)?
612 .into_iter()
613 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
614 .collect::<HashMap<_, _>>();
615
616 for idx in 0..param_count {
617 let server_type = param_types
618 .get(&format!("${}", idx + 1))
619 .and_then(|t| t.as_ref());
620
621 let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
622 client_given_type.clone()
623 } else if let Some(server_provided_type) = &server_type {
624 type_gt_to_pg(server_provided_type).map_err(convert_err)?
625 } else {
626 return Err(invalid_parameter_error(
627 "unknown_parameter_type",
628 Some(format!(
629 "Cannot get parameter type information for parameter {}",
630 idx
631 )),
632 ));
633 };
634
635 let value = match &client_type {
636 &Type::VARCHAR | &Type::TEXT => {
637 let data = portal.parameter::<String>(idx, &client_type)?;
638 if let Some(server_type) = &server_type {
639 match server_type {
640 ConcreteDataType::String(_) => ScalarValue::Utf8(data),
641 _ => {
642 return Err(invalid_parameter_error(
643 "invalid_parameter_type",
644 Some(format!("Expected: {}, found: {}", server_type, client_type)),
645 ))
646 }
647 }
648 } else {
649 ScalarValue::Utf8(data)
650 }
651 }
652 &Type::BOOL => {
653 let data = portal.parameter::<bool>(idx, &client_type)?;
654 if let Some(server_type) = &server_type {
655 match server_type {
656 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
657 _ => {
658 return Err(invalid_parameter_error(
659 "invalid_parameter_type",
660 Some(format!("Expected: {}, found: {}", server_type, client_type)),
661 ))
662 }
663 }
664 } else {
665 ScalarValue::Boolean(data)
666 }
667 }
668 &Type::INT2 => {
669 let data = portal.parameter::<i16>(idx, &client_type)?;
670 if let Some(server_type) = &server_type {
671 match server_type {
672 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
673 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
674 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
675 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
676 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
677 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
678 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
679 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
680 ConcreteDataType::Timestamp(unit) => {
681 to_timestamp_scalar_value(data, unit, server_type)?
682 }
683 _ => {
684 return Err(invalid_parameter_error(
685 "invalid_parameter_type",
686 Some(format!("Expected: {}, found: {}", server_type, client_type)),
687 ))
688 }
689 }
690 } else {
691 ScalarValue::Int16(data)
692 }
693 }
694 &Type::INT4 => {
695 let data = portal.parameter::<i32>(idx, &client_type)?;
696 if let Some(server_type) = &server_type {
697 match server_type {
698 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
699 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
700 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
701 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
702 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
703 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
704 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
705 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
706 ConcreteDataType::Timestamp(unit) => {
707 to_timestamp_scalar_value(data, unit, server_type)?
708 }
709 _ => {
710 return Err(invalid_parameter_error(
711 "invalid_parameter_type",
712 Some(format!("Expected: {}, found: {}", server_type, client_type)),
713 ))
714 }
715 }
716 } else {
717 ScalarValue::Int32(data)
718 }
719 }
720 &Type::INT8 => {
721 let data = portal.parameter::<i64>(idx, &client_type)?;
722 if let Some(server_type) = &server_type {
723 match server_type {
724 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
725 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
726 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
727 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
728 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
729 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
730 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
731 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
732 ConcreteDataType::Timestamp(unit) => {
733 to_timestamp_scalar_value(data, unit, server_type)?
734 }
735 _ => {
736 return Err(invalid_parameter_error(
737 "invalid_parameter_type",
738 Some(format!("Expected: {}, found: {}", server_type, client_type)),
739 ))
740 }
741 }
742 } else {
743 ScalarValue::Int64(data)
744 }
745 }
746 &Type::FLOAT4 => {
747 let data = portal.parameter::<f32>(idx, &client_type)?;
748 if let Some(server_type) = &server_type {
749 match server_type {
750 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
751 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
752 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
753 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
754 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
755 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
756 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
757 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
758 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
759 ConcreteDataType::Float64(_) => {
760 ScalarValue::Float64(data.map(|n| n as f64))
761 }
762 _ => {
763 return Err(invalid_parameter_error(
764 "invalid_parameter_type",
765 Some(format!("Expected: {}, found: {}", server_type, client_type)),
766 ))
767 }
768 }
769 } else {
770 ScalarValue::Float32(data)
771 }
772 }
773 &Type::FLOAT8 => {
774 let data = portal.parameter::<f64>(idx, &client_type)?;
775 if let Some(server_type) = &server_type {
776 match server_type {
777 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
778 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
779 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
780 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
781 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
782 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
783 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
784 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
785 ConcreteDataType::Float32(_) => {
786 ScalarValue::Float32(data.map(|n| n as f32))
787 }
788 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
789 _ => {
790 return Err(invalid_parameter_error(
791 "invalid_parameter_type",
792 Some(format!("Expected: {}, found: {}", server_type, client_type)),
793 ))
794 }
795 }
796 } else {
797 ScalarValue::Float64(data)
798 }
799 }
800 &Type::TIMESTAMP => {
801 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
802 if let Some(server_type) = &server_type {
803 match server_type {
804 ConcreteDataType::Timestamp(unit) => match *unit {
805 TimestampType::Second(_) => ScalarValue::TimestampSecond(
806 data.map(|ts| ts.and_utc().timestamp()),
807 None,
808 ),
809 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
810 data.map(|ts| ts.and_utc().timestamp_millis()),
811 None,
812 ),
813 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
814 data.map(|ts| ts.and_utc().timestamp_micros()),
815 None,
816 ),
817 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
818 data.map(|ts| ts.and_utc().timestamp_micros()),
819 None,
820 ),
821 },
822 _ => {
823 return Err(invalid_parameter_error(
824 "invalid_parameter_type",
825 Some(format!("Expected: {}, found: {}", server_type, client_type)),
826 ))
827 }
828 }
829 } else {
830 ScalarValue::TimestampMillisecond(
831 data.map(|ts| ts.and_utc().timestamp_millis()),
832 None,
833 )
834 }
835 }
836 &Type::DATE => {
837 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
838 if let Some(server_type) = &server_type {
839 match server_type {
840 ConcreteDataType::Date(_) => ScalarValue::Date32(data.map(|d| {
841 (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
842 })),
843 _ => {
844 return Err(invalid_parameter_error(
845 "invalid_parameter_type",
846 Some(format!("Expected: {}, found: {}", server_type, client_type)),
847 ));
848 }
849 }
850 } else {
851 ScalarValue::Date32(data.map(|d| {
852 (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
853 }))
854 }
855 }
856 &Type::INTERVAL => {
857 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
858 if let Some(server_type) = &server_type {
859 match server_type {
860 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
861 ScalarValue::IntervalYearMonth(
862 data.map(|i| {
863 if i.days != 0 || i.microseconds != 0 {
864 Err(invalid_parameter_error(
865 "invalid_parameter_type",
866 Some(format!(
867 "Expected: {}, found: {}",
868 server_type, client_type
869 )),
870 ))
871 } else {
872 Ok(IntervalYearMonth::new(i.months).to_i32())
873 }
874 })
875 .transpose()?,
876 )
877 }
878 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
879 ScalarValue::IntervalDayTime(
880 data.map(|i| {
881 if i.months != 0 || i.microseconds % 1000 != 0 {
882 Err(invalid_parameter_error(
883 "invalid_parameter_type",
884 Some(format!(
885 "Expected: {}, found: {}",
886 server_type, client_type
887 )),
888 ))
889 } else {
890 Ok(IntervalDayTime::new(
891 i.days,
892 (i.microseconds / 1000) as i32,
893 )
894 .into())
895 }
896 })
897 .transpose()?,
898 )
899 }
900 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
901 ScalarValue::IntervalMonthDayNano(
902 data.map(|i| IntervalMonthDayNano::from(i).into()),
903 )
904 }
905 _ => {
906 return Err(invalid_parameter_error(
907 "invalid_parameter_type",
908 Some(format!("Expected: {}, found: {}", server_type, client_type)),
909 ));
910 }
911 }
912 } else {
913 ScalarValue::IntervalMonthDayNano(
914 data.map(|i| IntervalMonthDayNano::from(i).into()),
915 )
916 }
917 }
918 &Type::BYTEA => {
919 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
920 if let Some(server_type) = &server_type {
921 match server_type {
922 ConcreteDataType::String(_) => {
923 ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
924 }
925 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
926 _ => {
927 return Err(invalid_parameter_error(
928 "invalid_parameter_type",
929 Some(format!("Expected: {}, found: {}", server_type, client_type)),
930 ));
931 }
932 }
933 } else {
934 ScalarValue::Binary(data)
935 }
936 }
937 &Type::JSONB => {
938 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
939 if let Some(server_type) = &server_type {
940 match server_type {
941 ConcreteDataType::Binary(_) => {
942 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
943 }
944 _ => {
945 return Err(invalid_parameter_error(
946 "invalid_parameter_type",
947 Some(format!("Expected: {}, found: {}", server_type, client_type)),
948 ));
949 }
950 }
951 } else {
952 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
953 }
954 }
955 &Type::INT2_ARRAY => {
956 let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
957 if let Some(data) = data {
958 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
959 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
960 } else {
961 ScalarValue::Null
962 }
963 }
964 &Type::INT4_ARRAY => {
965 let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
966 if let Some(data) = data {
967 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
968 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
969 } else {
970 ScalarValue::Null
971 }
972 }
973 &Type::INT8_ARRAY => {
974 let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
975 if let Some(data) = data {
976 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
977 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
978 } else {
979 ScalarValue::Null
980 }
981 }
982 &Type::VARCHAR_ARRAY => {
983 let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
984 if let Some(data) = data {
985 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
986 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
987 } else {
988 ScalarValue::Null
989 }
990 }
991 _ => Err(invalid_parameter_error(
992 "unsupported_parameter_value",
993 Some(format!("Found type: {}", client_type)),
994 ))?,
995 };
996
997 results.push(value);
998 }
999
1000 Ok(results)
1001}
1002
1003pub(super) fn param_types_to_pg_types(
1004 param_types: &HashMap<String, Option<ConcreteDataType>>,
1005) -> Result<Vec<Type>> {
1006 let param_count = param_types.len();
1007 let mut types = Vec::with_capacity(param_count);
1008 for i in 0..param_count {
1009 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1010 let pg_type = type_gt_to_pg(param_type)?;
1011 types.push(pg_type);
1012 } else {
1013 types.push(Type::UNKNOWN);
1014 }
1015 }
1016 Ok(types)
1017}
1018
1019#[cfg(test)]
1020mod test {
1021 use std::sync::Arc;
1022
1023 use common_time::interval::IntervalUnit;
1024 use common_time::timestamp::TimeUnit;
1025 use common_time::Timestamp;
1026 use datatypes::schema::{ColumnSchema, Schema};
1027 use datatypes::value::ListValue;
1028 use pgwire::api::results::{FieldFormat, FieldInfo};
1029 use pgwire::api::Type;
1030 use session::context::QueryContextBuilder;
1031
1032 use super::*;
1033
1034 #[test]
1035 fn test_schema_convert() {
1036 let column_schemas = vec![
1037 ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1038 ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1039 ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1040 ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1041 ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1042 ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1043 ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1044 ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1045 ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1046 ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1047 ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1048 ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1049 ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1050 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1051 ColumnSchema::new(
1052 "timestamps",
1053 ConcreteDataType::timestamp_millisecond_datatype(),
1054 true,
1055 ),
1056 ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1057 ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1058 ColumnSchema::new(
1059 "intervals",
1060 ConcreteDataType::interval_month_day_nano_datatype(),
1061 true,
1062 ),
1063 ];
1064 let pg_field_info = vec![
1065 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1066 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1067 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1068 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1069 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1070 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1071 FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1072 FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1073 FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1074 FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1075 FieldInfo::new(
1076 "float32s".into(),
1077 None,
1078 None,
1079 Type::FLOAT4,
1080 FieldFormat::Text,
1081 ),
1082 FieldInfo::new(
1083 "float64s".into(),
1084 None,
1085 None,
1086 Type::FLOAT8,
1087 FieldFormat::Text,
1088 ),
1089 FieldInfo::new(
1090 "binaries".into(),
1091 None,
1092 None,
1093 Type::BYTEA,
1094 FieldFormat::Text,
1095 ),
1096 FieldInfo::new(
1097 "strings".into(),
1098 None,
1099 None,
1100 Type::VARCHAR,
1101 FieldFormat::Text,
1102 ),
1103 FieldInfo::new(
1104 "timestamps".into(),
1105 None,
1106 None,
1107 Type::TIMESTAMP,
1108 FieldFormat::Text,
1109 ),
1110 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1111 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1112 FieldInfo::new(
1113 "intervals".into(),
1114 None,
1115 None,
1116 Type::INTERVAL,
1117 FieldFormat::Text,
1118 ),
1119 ];
1120 let schema = Schema::new(column_schemas);
1121 let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
1122 assert_eq!(fs, pg_field_info);
1123 }
1124
1125 #[test]
1126 fn test_encode_text_format_data() {
1127 let schema = vec![
1128 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1129 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1130 FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1131 FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1132 FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1133 FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1134 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1135 FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1136 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1137 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1138 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1139 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1140 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1141 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1142 FieldInfo::new(
1143 "float32s".into(),
1144 None,
1145 None,
1146 Type::FLOAT4,
1147 FieldFormat::Text,
1148 ),
1149 FieldInfo::new(
1150 "float32s".into(),
1151 None,
1152 None,
1153 Type::FLOAT4,
1154 FieldFormat::Text,
1155 ),
1156 FieldInfo::new(
1157 "float32s".into(),
1158 None,
1159 None,
1160 Type::FLOAT4,
1161 FieldFormat::Text,
1162 ),
1163 FieldInfo::new(
1164 "float64s".into(),
1165 None,
1166 None,
1167 Type::FLOAT8,
1168 FieldFormat::Text,
1169 ),
1170 FieldInfo::new(
1171 "float64s".into(),
1172 None,
1173 None,
1174 Type::FLOAT8,
1175 FieldFormat::Text,
1176 ),
1177 FieldInfo::new(
1178 "float64s".into(),
1179 None,
1180 None,
1181 Type::FLOAT8,
1182 FieldFormat::Text,
1183 ),
1184 FieldInfo::new(
1185 "strings".into(),
1186 None,
1187 None,
1188 Type::VARCHAR,
1189 FieldFormat::Text,
1190 ),
1191 FieldInfo::new(
1192 "binaries".into(),
1193 None,
1194 None,
1195 Type::BYTEA,
1196 FieldFormat::Text,
1197 ),
1198 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1199 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1200 FieldInfo::new(
1201 "timestamps".into(),
1202 None,
1203 None,
1204 Type::TIMESTAMP,
1205 FieldFormat::Text,
1206 ),
1207 FieldInfo::new(
1208 "interval_year_month".into(),
1209 None,
1210 None,
1211 Type::INTERVAL,
1212 FieldFormat::Text,
1213 ),
1214 FieldInfo::new(
1215 "interval_day_time".into(),
1216 None,
1217 None,
1218 Type::INTERVAL,
1219 FieldFormat::Text,
1220 ),
1221 FieldInfo::new(
1222 "interval_month_day_nano".into(),
1223 None,
1224 None,
1225 Type::INTERVAL,
1226 FieldFormat::Text,
1227 ),
1228 FieldInfo::new(
1229 "int_list".into(),
1230 None,
1231 None,
1232 Type::INT8_ARRAY,
1233 FieldFormat::Text,
1234 ),
1235 FieldInfo::new(
1236 "float_list".into(),
1237 None,
1238 None,
1239 Type::FLOAT8_ARRAY,
1240 FieldFormat::Text,
1241 ),
1242 FieldInfo::new(
1243 "string_list".into(),
1244 None,
1245 None,
1246 Type::VARCHAR_ARRAY,
1247 FieldFormat::Text,
1248 ),
1249 FieldInfo::new(
1250 "timestamp_list".into(),
1251 None,
1252 None,
1253 Type::TIMESTAMP_ARRAY,
1254 FieldFormat::Text,
1255 ),
1256 ];
1257
1258 let datatypes = vec![
1259 ConcreteDataType::null_datatype(),
1260 ConcreteDataType::boolean_datatype(),
1261 ConcreteDataType::uint8_datatype(),
1262 ConcreteDataType::uint16_datatype(),
1263 ConcreteDataType::uint32_datatype(),
1264 ConcreteDataType::uint64_datatype(),
1265 ConcreteDataType::int8_datatype(),
1266 ConcreteDataType::int8_datatype(),
1267 ConcreteDataType::int16_datatype(),
1268 ConcreteDataType::int16_datatype(),
1269 ConcreteDataType::int32_datatype(),
1270 ConcreteDataType::int32_datatype(),
1271 ConcreteDataType::int64_datatype(),
1272 ConcreteDataType::int64_datatype(),
1273 ConcreteDataType::float32_datatype(),
1274 ConcreteDataType::float32_datatype(),
1275 ConcreteDataType::float32_datatype(),
1276 ConcreteDataType::float64_datatype(),
1277 ConcreteDataType::float64_datatype(),
1278 ConcreteDataType::float64_datatype(),
1279 ConcreteDataType::string_datatype(),
1280 ConcreteDataType::binary_datatype(),
1281 ConcreteDataType::date_datatype(),
1282 ConcreteDataType::time_datatype(TimeUnit::Second),
1283 ConcreteDataType::timestamp_datatype(TimeUnit::Second),
1284 ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
1285 ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
1286 ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
1287 ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
1288 ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
1289 ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
1290 ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
1291 ];
1292 let values = vec![
1293 Value::Null,
1294 Value::Boolean(true),
1295 Value::UInt8(u8::MAX),
1296 Value::UInt16(u16::MAX),
1297 Value::UInt32(u32::MAX),
1298 Value::UInt64(u64::MAX),
1299 Value::Int8(i8::MAX),
1300 Value::Int8(i8::MIN),
1301 Value::Int16(i16::MAX),
1302 Value::Int16(i16::MIN),
1303 Value::Int32(i32::MAX),
1304 Value::Int32(i32::MIN),
1305 Value::Int64(i64::MAX),
1306 Value::Int64(i64::MIN),
1307 Value::Float32(f32::MAX.into()),
1308 Value::Float32(f32::MIN.into()),
1309 Value::Float32(0f32.into()),
1310 Value::Float64(f64::MAX.into()),
1311 Value::Float64(f64::MIN.into()),
1312 Value::Float64(0f64.into()),
1313 Value::String("greptime".into()),
1314 Value::Binary("greptime".as_bytes().into()),
1315 Value::Date(1001i32.into()),
1316 Value::Time(1001i64.into()),
1317 Value::Timestamp(1000001i64.into()),
1318 Value::IntervalYearMonth(IntervalYearMonth::new(1)),
1319 Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
1320 Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
1321 Value::List(ListValue::new(
1322 vec![Value::Int64(1i64)],
1323 ConcreteDataType::int64_datatype(),
1324 )),
1325 Value::List(ListValue::new(
1326 vec![Value::Float64(1.0f64.into())],
1327 ConcreteDataType::float64_datatype(),
1328 )),
1329 Value::List(ListValue::new(
1330 vec![Value::String("tom".into())],
1331 ConcreteDataType::string_datatype(),
1332 )),
1333 Value::List(ListValue::new(
1334 vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
1335 ConcreteDataType::timestamp_second_datatype(),
1336 )),
1337 ];
1338 let query_context = QueryContextBuilder::default()
1339 .configuration_parameter(Default::default())
1340 .build()
1341 .into();
1342 let mut builder = DataRowEncoder::new(Arc::new(schema));
1343 for (value, datatype) in values.iter().zip(datatypes) {
1344 encode_value(&query_context, value, &mut builder, &datatype).unwrap();
1345 }
1346 }
1347
1348 #[test]
1349 fn test_invalid_parameter() {
1350 let msg = "invalid_parameter_count";
1352 let error = invalid_parameter_error(msg, None);
1353 if let PgWireError::UserError(value) = error {
1354 assert_eq!("ERROR", value.severity);
1355 assert_eq!("22023", value.code);
1356 assert_eq!(msg, value.message);
1357 } else {
1358 panic!("test_invalid_parameter failed");
1359 }
1360 }
1361}