1mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{IntervalType, TimestampType, json_type_value_to_string};
31use datatypes::value::{ListValue, StructValue};
32use pgwire::api::Type;
33use pgwire::api::portal::{Format, Portal};
34use pgwire::api::results::{DataRowEncoder, FieldInfo};
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::SqlPlan;
45use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
46use crate::postgres::utils::convert_err;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49 origin
50 .column_schemas()
51 .iter()
52 .enumerate()
53 .map(|(idx, col)| {
54 Ok(FieldInfo::new(
55 col.name.clone(),
56 None,
57 None,
58 type_gt_to_pg(&col.data_type)?,
59 field_formats.format_for(idx),
60 ))
61 })
62 .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_struct(
66 _query_ctx: &QueryContextRef,
67 _struct_value: &StructValue,
68 _builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70 todo!("how to encode struct for postgres");
71}
72
73fn encode_array(
74 query_ctx: &QueryContextRef,
75 value_list: &ListValue,
76 builder: &mut DataRowEncoder,
77) -> PgWireResult<()> {
78 match value_list.datatype() {
79 &ConcreteDataType::Boolean(_) => {
80 let array = value_list
81 .items()
82 .iter()
83 .map(|v| match v {
84 Value::Null => Ok(None),
85 Value::Boolean(v) => Ok(Some(*v)),
86 _ => Err(convert_err(Error::Internal {
87 err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
88 })),
89 })
90 .collect::<PgWireResult<Vec<Option<bool>>>>()?;
91 builder.encode_field(&array)
92 }
93 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
94 let array = value_list
95 .items()
96 .iter()
97 .map(|v| match v {
98 Value::Null => Ok(None),
99 Value::Int8(v) => Ok(Some(*v)),
100 Value::UInt8(v) => Ok(Some(*v as i8)),
101 _ => Err(convert_err(Error::Internal {
102 err_msg: format!(
103 "Invalid list item type, find {v:?}, expected int8 or uint8",
104 ),
105 })),
106 })
107 .collect::<PgWireResult<Vec<Option<i8>>>>()?;
108 builder.encode_field(&array)
109 }
110 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
111 let array = value_list
112 .items()
113 .iter()
114 .map(|v| match v {
115 Value::Null => Ok(None),
116 Value::Int16(v) => Ok(Some(*v)),
117 Value::UInt16(v) => Ok(Some(*v as i16)),
118 _ => Err(convert_err(Error::Internal {
119 err_msg: format!(
120 "Invalid list item type, find {v:?}, expected int16 or uint16",
121 ),
122 })),
123 })
124 .collect::<PgWireResult<Vec<Option<i16>>>>()?;
125 builder.encode_field(&array)
126 }
127 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
128 let array = value_list
129 .items()
130 .iter()
131 .map(|v| match v {
132 Value::Null => Ok(None),
133 Value::Int32(v) => Ok(Some(*v)),
134 Value::UInt32(v) => Ok(Some(*v as i32)),
135 _ => Err(convert_err(Error::Internal {
136 err_msg: format!(
137 "Invalid list item type, find {v:?}, expected int32 or uint32",
138 ),
139 })),
140 })
141 .collect::<PgWireResult<Vec<Option<i32>>>>()?;
142 builder.encode_field(&array)
143 }
144 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
145 let array = value_list
146 .items()
147 .iter()
148 .map(|v| match v {
149 Value::Null => Ok(None),
150 Value::Int64(v) => Ok(Some(*v)),
151 Value::UInt64(v) => Ok(Some(*v as i64)),
152 _ => Err(convert_err(Error::Internal {
153 err_msg: format!(
154 "Invalid list item type, find {v:?}, expected int64 or uint64",
155 ),
156 })),
157 })
158 .collect::<PgWireResult<Vec<Option<i64>>>>()?;
159 builder.encode_field(&array)
160 }
161 &ConcreteDataType::Float32(_) => {
162 let array = value_list
163 .items()
164 .iter()
165 .map(|v| match v {
166 Value::Null => Ok(None),
167 Value::Float32(v) => Ok(Some(v.0)),
168 _ => Err(convert_err(Error::Internal {
169 err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
170 })),
171 })
172 .collect::<PgWireResult<Vec<Option<f32>>>>()?;
173 builder.encode_field(&array)
174 }
175 &ConcreteDataType::Float64(_) => {
176 let array = value_list
177 .items()
178 .iter()
179 .map(|v| match v {
180 Value::Null => Ok(None),
181 Value::Float64(v) => Ok(Some(v.0)),
182 _ => Err(convert_err(Error::Internal {
183 err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
184 })),
185 })
186 .collect::<PgWireResult<Vec<Option<f64>>>>()?;
187 builder.encode_field(&array)
188 }
189 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
190 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
191
192 match *bytea_output {
193 PGByteaOutputValue::ESCAPE => {
194 let array = value_list
195 .items()
196 .iter()
197 .map(|v| match v {
198 Value::Null => Ok(None),
199 Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
200
201 _ => Err(convert_err(Error::Internal {
202 err_msg: format!(
203 "Invalid list item type, find {v:?}, expected binary",
204 ),
205 })),
206 })
207 .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
208 builder.encode_field(&array)
209 }
210 PGByteaOutputValue::HEX => {
211 let array = value_list
212 .items()
213 .iter()
214 .map(|v| match v {
215 Value::Null => Ok(None),
216 Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
217
218 _ => Err(convert_err(Error::Internal {
219 err_msg: format!(
220 "Invalid list item type, find {v:?}, expected binary",
221 ),
222 })),
223 })
224 .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
225 builder.encode_field(&array)
226 }
227 }
228 }
229 &ConcreteDataType::String(_) => {
230 let array = value_list
231 .items()
232 .iter()
233 .map(|v| match v {
234 Value::Null => Ok(None),
235 Value::String(v) => Ok(Some(v.as_utf8())),
236 _ => Err(convert_err(Error::Internal {
237 err_msg: format!("Invalid list item type, find {v:?}, expected string",),
238 })),
239 })
240 .collect::<PgWireResult<Vec<Option<&str>>>>()?;
241 builder.encode_field(&array)
242 }
243 &ConcreteDataType::Date(_) => {
244 let array = value_list
245 .items()
246 .iter()
247 .map(|v| match v {
248 Value::Null => Ok(None),
249 Value::Date(v) => {
250 if let Some(date) = v.to_chrono_date() {
251 let (style, order) =
252 *query_ctx.configuration_parameter().pg_datetime_style();
253 Ok(Some(StylingDate(date, style, order)))
254 } else {
255 Err(convert_err(Error::Internal {
256 err_msg: format!("Failed to convert date to postgres type {v:?}",),
257 }))
258 }
259 }
260 _ => Err(convert_err(Error::Internal {
261 err_msg: format!("Invalid list item type, find {v:?}, expected date",),
262 })),
263 })
264 .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
265 builder.encode_field(&array)
266 }
267 &ConcreteDataType::Timestamp(_) => {
268 let array = value_list
269 .items()
270 .iter()
271 .map(|v| match v {
272 Value::Null => Ok(None),
273 Value::Timestamp(v) => {
274 if let Some(datetime) =
275 v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
276 {
277 let (style, order) =
278 *query_ctx.configuration_parameter().pg_datetime_style();
279 Ok(Some(StylingDateTime(datetime, style, order)))
280 } else {
281 Err(convert_err(Error::Internal {
282 err_msg: format!("Failed to convert date to postgres type {v:?}",),
283 }))
284 }
285 }
286 _ => Err(convert_err(Error::Internal {
287 err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
288 })),
289 })
290 .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
291 builder.encode_field(&array)
292 }
293 &ConcreteDataType::Time(_) => {
294 let array = value_list
295 .items()
296 .iter()
297 .map(|v| match v {
298 Value::Null => Ok(None),
299 Value::Time(v) => Ok(v.to_chrono_time()),
300 _ => Err(convert_err(Error::Internal {
301 err_msg: format!("Invalid list item type, find {v:?}, expected time",),
302 })),
303 })
304 .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
305 builder.encode_field(&array)
306 }
307 &ConcreteDataType::Interval(_) => {
308 let array = value_list
309 .items()
310 .iter()
311 .map(|v| match v {
312 Value::Null => Ok(None),
313 Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
314 Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
315 Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
316 _ => Err(convert_err(Error::Internal {
317 err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
318 })),
319 })
320 .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
321 builder.encode_field(&array)
322 }
323 &ConcreteDataType::Decimal128(_) => {
324 let array = value_list
325 .items()
326 .iter()
327 .map(|v| match v {
328 Value::Null => Ok(None),
329 Value::Decimal128(v) => Ok(Some(v.to_string())),
330 _ => Err(convert_err(Error::Internal {
331 err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
332 })),
333 })
334 .collect::<PgWireResult<Vec<Option<String>>>>()?;
335 builder.encode_field(&array)
336 }
337 &ConcreteDataType::Json(j) => {
338 let array = value_list
339 .items()
340 .iter()
341 .map(|v| match v {
342 Value::Null => Ok(None),
343 Value::Binary(v) => {
344 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
345 Ok(Some(s))
346 }
347 _ => Err(convert_err(Error::Internal {
348 err_msg: format!("Invalid list item type, find {v:?}, expected json",),
349 })),
350 })
351 .collect::<PgWireResult<Vec<Option<String>>>>()?;
352 builder.encode_field(&array)
353 }
354 _ => Err(convert_err(Error::Internal {
355 err_msg: format!(
356 "cannot write array type {:?} in postgres protocol: unimplemented",
357 value_list.datatype()
358 ),
359 })),
360 }
361}
362
363pub(super) fn encode_value(
364 query_ctx: &QueryContextRef,
365 value: &Value,
366 builder: &mut DataRowEncoder,
367 datatype: &ConcreteDataType,
368) -> PgWireResult<()> {
369 match value {
370 Value::Null => builder.encode_field(&None::<&i8>),
371 Value::Boolean(v) => builder.encode_field(v),
372 Value::UInt8(v) => builder.encode_field(&(*v as i8)),
373 Value::UInt16(v) => builder.encode_field(&(*v as i16)),
374 Value::UInt32(v) => builder.encode_field(v),
375 Value::UInt64(v) => builder.encode_field(&(*v as i64)),
376 Value::Int8(v) => builder.encode_field(v),
377 Value::Int16(v) => builder.encode_field(v),
378 Value::Int32(v) => builder.encode_field(v),
379 Value::Int64(v) => builder.encode_field(v),
380 Value::Float32(v) => builder.encode_field(&v.0),
381 Value::Float64(v) => builder.encode_field(&v.0),
382 Value::String(v) => builder.encode_field(&v.as_utf8()),
383 Value::Binary(v) => match datatype {
384 ConcreteDataType::Json(j) => {
385 let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
386 builder.encode_field(&s)
387 }
388 _ => {
389 let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
390 match *bytea_output {
391 PGByteaOutputValue::ESCAPE => {
392 builder.encode_field(&EscapeOutputBytea(v.deref()))
393 }
394 PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
395 }
396 }
397 },
398 Value::Date(v) => {
399 if let Some(date) = v.to_chrono_date() {
400 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
401 builder.encode_field(&StylingDate(date, style, order))
402 } else {
403 Err(convert_err(Error::Internal {
404 err_msg: format!("Failed to convert date to postgres type {v:?}",),
405 }))
406 }
407 }
408 Value::Timestamp(v) => {
409 if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
410 {
411 let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
412 builder.encode_field(&StylingDateTime(datetime, style, order))
413 } else {
414 Err(convert_err(Error::Internal {
415 err_msg: format!("Failed to convert date to postgres type {v:?}",),
416 }))
417 }
418 }
419 Value::Time(v) => {
420 if let Some(time) = v.to_chrono_time() {
421 builder.encode_field(&time)
422 } else {
423 Err(convert_err(Error::Internal {
424 err_msg: format!("Failed to convert time to postgres type {v:?}",),
425 }))
426 }
427 }
428 Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
429 Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
430 Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
431 Value::Decimal128(v) => builder.encode_field(&v.to_string()),
432 Value::Duration(d) => match PgInterval::try_from(*d) {
433 Ok(i) => builder.encode_field(&i),
434 Err(e) => Err(convert_err(Error::Internal {
435 err_msg: e.to_string(),
436 })),
437 },
438 Value::List(values) => encode_array(query_ctx, values, builder),
439 Value::Struct(values) => encode_struct(query_ctx, values, builder),
440 }
441}
442
443pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
444 match origin {
445 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
446 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
447 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
448 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
449 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
450 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
451 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
452 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
453 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
454 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
455 &ConcreteDataType::Date(_) => Ok(Type::DATE),
456 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
457 &ConcreteDataType::Time(_) => Ok(Type::TIME),
458 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
459 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
460 &ConcreteDataType::Json(_) => Ok(Type::JSON),
461 ConcreteDataType::List(list) => match list.item_type() {
462 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
463 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
464 &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
465 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
466 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
467 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
468 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
469 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
470 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
471 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
472 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
473 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
474 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
475 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
476 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
477 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
478 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
479 &ConcreteDataType::Struct(_) => Ok(Type::RECORD_ARRAY),
482 &ConcreteDataType::Dictionary(_)
483 | &ConcreteDataType::Vector(_)
484 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
485 data_type: origin,
486 reason: "not implemented",
487 }
488 .fail(),
489 },
490 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
491 data_type: origin,
492 reason: "not implemented",
493 }
494 .fail(),
495 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
496 &ConcreteDataType::Struct(_) => Ok(Type::RECORD),
497 }
498}
499
500#[allow(dead_code)]
501pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
502 match origin {
504 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
505 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
506 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
507 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
508 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
509 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
510 &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
511 common_time::timestamp::TimeUnit::Millisecond,
512 )),
513 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
514 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
515 common_time::timestamp::TimeUnit::Microsecond,
516 )),
517 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
518 ConcreteDataType::int8_datatype(),
519 )),
520 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
521 ConcreteDataType::int16_datatype(),
522 )),
523 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
524 ConcreteDataType::int32_datatype(),
525 )),
526 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
527 ConcreteDataType::int64_datatype(),
528 )),
529 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
530 ConcreteDataType::string_datatype(),
531 )),
532 _ => server_error::InternalSnafu {
533 err_msg: format!("unimplemented datatype {origin:?}"),
534 }
535 .fail(),
536 }
537}
538
539pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
540 let param_type = portal.statement.parameter_types.get(idx).unwrap();
543 match param_type {
544 &Type::VARCHAR | &Type::TEXT => Ok(format!(
545 "'{}'",
546 portal
547 .parameter::<String>(idx, param_type)?
548 .as_deref()
549 .unwrap_or("")
550 )),
551 &Type::BOOL => Ok(portal
552 .parameter::<bool>(idx, param_type)?
553 .map(|v| v.to_string())
554 .unwrap_or_else(|| "".to_owned())),
555 &Type::INT4 => Ok(portal
556 .parameter::<i32>(idx, param_type)?
557 .map(|v| v.to_string())
558 .unwrap_or_else(|| "".to_owned())),
559 &Type::INT8 => Ok(portal
560 .parameter::<i64>(idx, param_type)?
561 .map(|v| v.to_string())
562 .unwrap_or_else(|| "".to_owned())),
563 &Type::FLOAT4 => Ok(portal
564 .parameter::<f32>(idx, param_type)?
565 .map(|v| v.to_string())
566 .unwrap_or_else(|| "".to_owned())),
567 &Type::FLOAT8 => Ok(portal
568 .parameter::<f64>(idx, param_type)?
569 .map(|v| v.to_string())
570 .unwrap_or_else(|| "".to_owned())),
571 &Type::DATE => Ok(portal
572 .parameter::<NaiveDate>(idx, param_type)?
573 .map(|v| v.format("%Y-%m-%d").to_string())
574 .unwrap_or_else(|| "".to_owned())),
575 &Type::TIMESTAMP => Ok(portal
576 .parameter::<NaiveDateTime>(idx, param_type)?
577 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
578 .unwrap_or_else(|| "".to_owned())),
579 &Type::INTERVAL => Ok(portal
580 .parameter::<PgInterval>(idx, param_type)?
581 .map(|v| v.to_string())
582 .unwrap_or_else(|| "".to_owned())),
583 _ => Err(invalid_parameter_error(
584 "unsupported_parameter_type",
585 Some(param_type.to_string()),
586 )),
587 }
588}
589
590pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
591 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
592 error_info.detail = detail;
593 PgWireError::UserError(Box::new(error_info))
594}
595
596fn to_timestamp_scalar_value<T>(
597 data: Option<T>,
598 unit: &TimestampType,
599 ctype: &ConcreteDataType,
600) -> PgWireResult<ScalarValue>
601where
602 T: Into<i64>,
603{
604 if let Some(n) = data {
605 Value::Timestamp(unit.create_timestamp(n.into()))
606 .try_to_scalar_value(ctype)
607 .map_err(convert_err)
608 } else {
609 Ok(ScalarValue::Null)
610 }
611}
612
613pub(super) fn parameters_to_scalar_values(
614 plan: &LogicalPlan,
615 portal: &Portal<SqlPlan>,
616) -> PgWireResult<Vec<ScalarValue>> {
617 let param_count = portal.parameter_len();
618 let mut results = Vec::with_capacity(param_count);
619
620 let client_param_types = &portal.statement.parameter_types;
621 let param_types = plan
622 .get_parameter_types()
623 .context(DataFusionSnafu)
624 .map_err(convert_err)?
625 .into_iter()
626 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
627 .collect::<HashMap<_, _>>();
628
629 for idx in 0..param_count {
630 let server_type = param_types
631 .get(&format!("${}", idx + 1))
632 .and_then(|t| t.as_ref());
633
634 let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
635 client_given_type.clone()
636 } else if let Some(server_provided_type) = &server_type {
637 type_gt_to_pg(server_provided_type).map_err(convert_err)?
638 } else {
639 return Err(invalid_parameter_error(
640 "unknown_parameter_type",
641 Some(format!(
642 "Cannot get parameter type information for parameter {}",
643 idx
644 )),
645 ));
646 };
647
648 let value = match &client_type {
649 &Type::VARCHAR | &Type::TEXT => {
650 let data = portal.parameter::<String>(idx, &client_type)?;
651 if let Some(server_type) = &server_type {
652 match server_type {
653 ConcreteDataType::String(_) => ScalarValue::Utf8(data),
654 _ => {
655 return Err(invalid_parameter_error(
656 "invalid_parameter_type",
657 Some(format!("Expected: {}, found: {}", server_type, client_type)),
658 ));
659 }
660 }
661 } else {
662 ScalarValue::Utf8(data)
663 }
664 }
665 &Type::BOOL => {
666 let data = portal.parameter::<bool>(idx, &client_type)?;
667 if let Some(server_type) = &server_type {
668 match server_type {
669 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
670 _ => {
671 return Err(invalid_parameter_error(
672 "invalid_parameter_type",
673 Some(format!("Expected: {}, found: {}", server_type, client_type)),
674 ));
675 }
676 }
677 } else {
678 ScalarValue::Boolean(data)
679 }
680 }
681 &Type::INT2 => {
682 let data = portal.parameter::<i16>(idx, &client_type)?;
683 if let Some(server_type) = &server_type {
684 match server_type {
685 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
686 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
687 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
688 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
689 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
690 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
691 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
692 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
693 ConcreteDataType::Timestamp(unit) => {
694 to_timestamp_scalar_value(data, unit, server_type)?
695 }
696 _ => {
697 return Err(invalid_parameter_error(
698 "invalid_parameter_type",
699 Some(format!("Expected: {}, found: {}", server_type, client_type)),
700 ));
701 }
702 }
703 } else {
704 ScalarValue::Int16(data)
705 }
706 }
707 &Type::INT4 => {
708 let data = portal.parameter::<i32>(idx, &client_type)?;
709 if let Some(server_type) = &server_type {
710 match server_type {
711 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
712 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
713 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
714 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
715 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
716 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
717 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
718 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
719 ConcreteDataType::Timestamp(unit) => {
720 to_timestamp_scalar_value(data, unit, server_type)?
721 }
722 _ => {
723 return Err(invalid_parameter_error(
724 "invalid_parameter_type",
725 Some(format!("Expected: {}, found: {}", server_type, client_type)),
726 ));
727 }
728 }
729 } else {
730 ScalarValue::Int32(data)
731 }
732 }
733 &Type::INT8 => {
734 let data = portal.parameter::<i64>(idx, &client_type)?;
735 if let Some(server_type) = &server_type {
736 match server_type {
737 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
738 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
739 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
740 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
741 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
742 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
743 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
744 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
745 ConcreteDataType::Timestamp(unit) => {
746 to_timestamp_scalar_value(data, unit, server_type)?
747 }
748 _ => {
749 return Err(invalid_parameter_error(
750 "invalid_parameter_type",
751 Some(format!("Expected: {}, found: {}", server_type, client_type)),
752 ));
753 }
754 }
755 } else {
756 ScalarValue::Int64(data)
757 }
758 }
759 &Type::FLOAT4 => {
760 let data = portal.parameter::<f32>(idx, &client_type)?;
761 if let Some(server_type) = &server_type {
762 match server_type {
763 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
764 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
765 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
766 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
767 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
768 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
769 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
770 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
771 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
772 ConcreteDataType::Float64(_) => {
773 ScalarValue::Float64(data.map(|n| n as f64))
774 }
775 _ => {
776 return Err(invalid_parameter_error(
777 "invalid_parameter_type",
778 Some(format!("Expected: {}, found: {}", server_type, client_type)),
779 ));
780 }
781 }
782 } else {
783 ScalarValue::Float32(data)
784 }
785 }
786 &Type::FLOAT8 => {
787 let data = portal.parameter::<f64>(idx, &client_type)?;
788 if let Some(server_type) = &server_type {
789 match server_type {
790 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
791 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
792 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
793 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
794 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
795 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
796 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
797 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
798 ConcreteDataType::Float32(_) => {
799 ScalarValue::Float32(data.map(|n| n as f32))
800 }
801 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
802 _ => {
803 return Err(invalid_parameter_error(
804 "invalid_parameter_type",
805 Some(format!("Expected: {}, found: {}", server_type, client_type)),
806 ));
807 }
808 }
809 } else {
810 ScalarValue::Float64(data)
811 }
812 }
813 &Type::TIMESTAMP => {
814 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
815 if let Some(server_type) = &server_type {
816 match server_type {
817 ConcreteDataType::Timestamp(unit) => match *unit {
818 TimestampType::Second(_) => ScalarValue::TimestampSecond(
819 data.map(|ts| ts.and_utc().timestamp()),
820 None,
821 ),
822 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
823 data.map(|ts| ts.and_utc().timestamp_millis()),
824 None,
825 ),
826 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
827 data.map(|ts| ts.and_utc().timestamp_micros()),
828 None,
829 ),
830 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
831 data.map(|ts| ts.and_utc().timestamp_micros()),
832 None,
833 ),
834 },
835 _ => {
836 return Err(invalid_parameter_error(
837 "invalid_parameter_type",
838 Some(format!("Expected: {}, found: {}", server_type, client_type)),
839 ));
840 }
841 }
842 } else {
843 ScalarValue::TimestampMillisecond(
844 data.map(|ts| ts.and_utc().timestamp_millis()),
845 None,
846 )
847 }
848 }
849 &Type::DATE => {
850 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
851 if let Some(server_type) = &server_type {
852 match server_type {
853 ConcreteDataType::Date(_) => ScalarValue::Date32(
854 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
855 ),
856 _ => {
857 return Err(invalid_parameter_error(
858 "invalid_parameter_type",
859 Some(format!("Expected: {}, found: {}", server_type, client_type)),
860 ));
861 }
862 }
863 } else {
864 ScalarValue::Date32(
865 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
866 )
867 }
868 }
869 &Type::INTERVAL => {
870 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
871 if let Some(server_type) = &server_type {
872 match server_type {
873 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
874 ScalarValue::IntervalYearMonth(
875 data.map(|i| {
876 if i.days != 0 || i.microseconds != 0 {
877 Err(invalid_parameter_error(
878 "invalid_parameter_type",
879 Some(format!(
880 "Expected: {}, found: {}",
881 server_type, client_type
882 )),
883 ))
884 } else {
885 Ok(IntervalYearMonth::new(i.months).to_i32())
886 }
887 })
888 .transpose()?,
889 )
890 }
891 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
892 ScalarValue::IntervalDayTime(
893 data.map(|i| {
894 if i.months != 0 || i.microseconds % 1000 != 0 {
895 Err(invalid_parameter_error(
896 "invalid_parameter_type",
897 Some(format!(
898 "Expected: {}, found: {}",
899 server_type, client_type
900 )),
901 ))
902 } else {
903 Ok(IntervalDayTime::new(
904 i.days,
905 (i.microseconds / 1000) as i32,
906 )
907 .into())
908 }
909 })
910 .transpose()?,
911 )
912 }
913 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
914 ScalarValue::IntervalMonthDayNano(
915 data.map(|i| IntervalMonthDayNano::from(i).into()),
916 )
917 }
918 _ => {
919 return Err(invalid_parameter_error(
920 "invalid_parameter_type",
921 Some(format!("Expected: {}, found: {}", server_type, client_type)),
922 ));
923 }
924 }
925 } else {
926 ScalarValue::IntervalMonthDayNano(
927 data.map(|i| IntervalMonthDayNano::from(i).into()),
928 )
929 }
930 }
931 &Type::BYTEA => {
932 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
933 if let Some(server_type) = &server_type {
934 match server_type {
935 ConcreteDataType::String(_) => {
936 ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
937 }
938 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
939 _ => {
940 return Err(invalid_parameter_error(
941 "invalid_parameter_type",
942 Some(format!("Expected: {}, found: {}", server_type, client_type)),
943 ));
944 }
945 }
946 } else {
947 ScalarValue::Binary(data)
948 }
949 }
950 &Type::JSONB => {
951 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
952 if let Some(server_type) = &server_type {
953 match server_type {
954 ConcreteDataType::Binary(_) => {
955 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
956 }
957 _ => {
958 return Err(invalid_parameter_error(
959 "invalid_parameter_type",
960 Some(format!("Expected: {}, found: {}", server_type, client_type)),
961 ));
962 }
963 }
964 } else {
965 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
966 }
967 }
968 &Type::INT2_ARRAY => {
969 let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
970 if let Some(data) = data {
971 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
972 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
973 } else {
974 ScalarValue::Null
975 }
976 }
977 &Type::INT4_ARRAY => {
978 let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
979 if let Some(data) = data {
980 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
981 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
982 } else {
983 ScalarValue::Null
984 }
985 }
986 &Type::INT8_ARRAY => {
987 let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
988 if let Some(data) = data {
989 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
990 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
991 } else {
992 ScalarValue::Null
993 }
994 }
995 &Type::VARCHAR_ARRAY => {
996 let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
997 if let Some(data) = data {
998 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
999 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
1000 } else {
1001 ScalarValue::Null
1002 }
1003 }
1004 _ => Err(invalid_parameter_error(
1005 "unsupported_parameter_value",
1006 Some(format!("Found type: {}", client_type)),
1007 ))?,
1008 };
1009
1010 results.push(value);
1011 }
1012
1013 Ok(results)
1014}
1015
1016pub(super) fn param_types_to_pg_types(
1017 param_types: &HashMap<String, Option<ConcreteDataType>>,
1018) -> Result<Vec<Type>> {
1019 let param_count = param_types.len();
1020 let mut types = Vec::with_capacity(param_count);
1021 for i in 0..param_count {
1022 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1023 let pg_type = type_gt_to_pg(param_type)?;
1024 types.push(pg_type);
1025 } else {
1026 types.push(Type::UNKNOWN);
1027 }
1028 }
1029 Ok(types)
1030}
1031
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_time::Timestamp;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::value::ListValue;
    use pgwire::api::Type;
    use pgwire::api::results::{FieldFormat, FieldInfo};
    use session::context::QueryContextBuilder;

    use super::*;

    /// Builds a text-format [`FieldInfo`] for `name` with the given PostgreSQL
    /// type, leaving both optional identifier arguments as `None`.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    /// `schema_to_pg` must map every listed column type to the expected
    /// PostgreSQL type, with all fields in text format.
    #[test]
    fn test_schema_convert() {
        // (column name, source column type, expected PostgreSQL type)
        let cases = vec![
            ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
            ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
            ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
            ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
            ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
            ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
            ("uint8s", ConcreteDataType::uint8_datatype(), Type::CHAR),
            ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT2),
            ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT4),
            ("uint64s", ConcreteDataType::uint64_datatype(), Type::INT8),
            (
                "float32s",
                ConcreteDataType::float32_datatype(),
                Type::FLOAT4,
            ),
            (
                "float64s",
                ConcreteDataType::float64_datatype(),
                Type::FLOAT8,
            ),
            ("binaries", ConcreteDataType::binary_datatype(), Type::BYTEA),
            ("strings", ConcreteDataType::string_datatype(), Type::VARCHAR),
            (
                "timestamps",
                ConcreteDataType::timestamp_millisecond_datatype(),
                Type::TIMESTAMP,
            ),
            ("dates", ConcreteDataType::date_datatype(), Type::DATE),
            ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
            (
                "intervals",
                ConcreteDataType::interval_month_day_nano_datatype(),
                Type::INTERVAL,
            ),
        ];

        // Split the table into the nullable input columns and the expected output.
        let (column_schemas, pg_field_info): (Vec<ColumnSchema>, Vec<FieldInfo>) = cases
            .into_iter()
            .map(|(name, data_type, pg_type)| {
                (
                    ColumnSchema::new(name, data_type, true),
                    text_field(name, pg_type),
                )
            })
            .unzip();

        let schema = Schema::new(column_schemas);
        let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
        assert_eq!(fs, pg_field_info);
    }

    /// Feeds one value of each listed type through `encode_value` and expects
    /// every call to succeed; the encoded bytes themselves are not inspected.
    #[test]
    fn test_encode_text_format_data() {
        let schema = vec![
            text_field("nulls", Type::UNKNOWN),
            text_field("bools", Type::BOOL),
            text_field("uint8s", Type::CHAR),
            text_field("uint16s", Type::INT2),
            text_field("uint32s", Type::INT4),
            text_field("uint64s", Type::INT8),
            text_field("int8s", Type::CHAR),
            text_field("int8s", Type::CHAR),
            text_field("int16s", Type::INT2),
            text_field("int16s", Type::INT2),
            text_field("int32s", Type::INT4),
            text_field("int32s", Type::INT4),
            text_field("int64s", Type::INT8),
            text_field("int64s", Type::INT8),
            text_field("float32s", Type::FLOAT4),
            text_field("float32s", Type::FLOAT4),
            text_field("float32s", Type::FLOAT4),
            text_field("float64s", Type::FLOAT8),
            text_field("float64s", Type::FLOAT8),
            text_field("float64s", Type::FLOAT8),
            text_field("strings", Type::VARCHAR),
            text_field("binaries", Type::BYTEA),
            text_field("dates", Type::DATE),
            text_field("times", Type::TIME),
            text_field("timestamps", Type::TIMESTAMP),
            text_field("interval_year_month", Type::INTERVAL),
            text_field("interval_day_time", Type::INTERVAL),
            text_field("interval_month_day_nano", Type::INTERVAL),
            text_field("int_list", Type::INT8_ARRAY),
            text_field("float_list", Type::FLOAT8_ARRAY),
            text_field("string_list", Type::VARCHAR_ARRAY),
            text_field("timestamp_list", Type::TIMESTAMP_ARRAY),
        ];

        // Each value is paired with the datatype it is encoded as; the order
        // follows the field order of `schema` above.
        let cases = vec![
            (Value::Null, ConcreteDataType::null_datatype()),
            (Value::Boolean(true), ConcreteDataType::boolean_datatype()),
            (Value::UInt8(u8::MAX), ConcreteDataType::uint8_datatype()),
            (Value::UInt16(u16::MAX), ConcreteDataType::uint16_datatype()),
            (Value::UInt32(u32::MAX), ConcreteDataType::uint32_datatype()),
            (Value::UInt64(u64::MAX), ConcreteDataType::uint64_datatype()),
            (Value::Int8(i8::MAX), ConcreteDataType::int8_datatype()),
            (Value::Int8(i8::MIN), ConcreteDataType::int8_datatype()),
            (Value::Int16(i16::MAX), ConcreteDataType::int16_datatype()),
            (Value::Int16(i16::MIN), ConcreteDataType::int16_datatype()),
            (Value::Int32(i32::MAX), ConcreteDataType::int32_datatype()),
            (Value::Int32(i32::MIN), ConcreteDataType::int32_datatype()),
            (Value::Int64(i64::MAX), ConcreteDataType::int64_datatype()),
            (Value::Int64(i64::MIN), ConcreteDataType::int64_datatype()),
            (
                Value::Float32(f32::MAX.into()),
                ConcreteDataType::float32_datatype(),
            ),
            (
                Value::Float32(f32::MIN.into()),
                ConcreteDataType::float32_datatype(),
            ),
            (
                Value::Float32(0f32.into()),
                ConcreteDataType::float32_datatype(),
            ),
            (
                Value::Float64(f64::MAX.into()),
                ConcreteDataType::float64_datatype(),
            ),
            (
                Value::Float64(f64::MIN.into()),
                ConcreteDataType::float64_datatype(),
            ),
            (
                Value::Float64(0f64.into()),
                ConcreteDataType::float64_datatype(),
            ),
            (
                Value::String("greptime".into()),
                ConcreteDataType::string_datatype(),
            ),
            (
                Value::Binary("greptime".as_bytes().into()),
                ConcreteDataType::binary_datatype(),
            ),
            (Value::Date(1001i32.into()), ConcreteDataType::date_datatype()),
            (
                Value::Time(1001i64.into()),
                ConcreteDataType::time_datatype(TimeUnit::Second),
            ),
            (
                Value::Timestamp(1000001i64.into()),
                ConcreteDataType::timestamp_datatype(TimeUnit::Second),
            ),
            (
                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
                ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
            ),
            (
                Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
                ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
            ),
            (
                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
            ),
            (
                Value::List(ListValue::new(
                    vec![Value::Int64(1i64)],
                    ConcreteDataType::int64_datatype(),
                )),
                ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
            ),
            (
                Value::List(ListValue::new(
                    vec![Value::Float64(1.0f64.into())],
                    ConcreteDataType::float64_datatype(),
                )),
                ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
            ),
            (
                Value::List(ListValue::new(
                    vec![Value::String("tom".into())],
                    ConcreteDataType::string_datatype(),
                )),
                ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
            ),
            (
                Value::List(ListValue::new(
                    vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
                    ConcreteDataType::timestamp_second_datatype(),
                )),
                ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
            ),
        ];

        let query_context = QueryContextBuilder::default()
            .configuration_parameter(Default::default())
            .build()
            .into();
        let mut builder = DataRowEncoder::new(Arc::new(schema));
        for (value, datatype) in &cases {
            encode_value(&query_context, value, &mut builder, datatype).unwrap();
        }
    }

    /// `invalid_parameter_error` must yield a `PgWireError::UserError` carrying
    /// severity `ERROR`, code `22023` and the supplied message.
    #[test]
    fn test_invalid_parameter() {
        let msg = "invalid_parameter_count";

        match invalid_parameter_error(msg, None) {
            PgWireError::UserError(value) => {
                assert_eq!("ERROR", value.severity);
                assert_eq!("22023", value.code);
                assert_eq!(msg, value.message);
            }
            _ => panic!("test_invalid_parameter failed"),
        }
    }
}