1mod error;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use arrow::array::{Array, AsArray};
23use arrow_pg::encoder::{Encoder, encode_value};
24use arrow_pg::list_encoder::encode_list;
25use arrow_schema::{DataType, TimeUnit};
26use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
27use common_recordbatch::RecordBatch;
28use common_recordbatch::error::Result as RecordBatchResult;
29use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::arrow::datatypes::DataType as ArrowDataType;
33use datatypes::json::JsonStructureSettings;
34use datatypes::prelude::{ConcreteDataType, Value};
35use datatypes::schema::{Schema, SchemaRef};
36use datatypes::types::{Decimal128Type, IntervalType, TimestampType, jsonb_to_string};
37use datatypes::value::StructValue;
38use futures::Stream;
39use pg_interval::Interval as PgInterval;
40use pgwire::api::Type;
41use pgwire::api::portal::{Format, Portal};
42use pgwire::api::results::FieldInfo;
43use pgwire::error::{PgWireError, PgWireResult};
44use pgwire::types::format::FormatOptions as PgFormatOptions;
45use query::planner::DfLogicalPlanner;
46use rust_decimal::Decimal;
47use rust_decimal::prelude::ToPrimitive;
48use session::context::QueryContextRef;
49use snafu::ResultExt;
50
51pub use self::error::{PgErrorCode, PgErrorSeverity};
52use crate::error::{self as server_error, InferParameterTypesSnafu, Result};
53use crate::postgres::handler::PgSqlPlan;
54use crate::postgres::utils::convert_err;
55
56pub(super) fn schema_to_pg(
57 origin: &Schema,
58 field_formats: &Format,
59 format_options: Option<Arc<PgFormatOptions>>,
60) -> Result<Vec<FieldInfo>> {
61 origin
62 .column_schemas()
63 .iter()
64 .enumerate()
65 .map(|(idx, col)| {
66 let mut field_info = FieldInfo::new(
67 col.name.clone(),
68 None,
69 None,
70 type_gt_to_pg(&col.data_type)?,
71 field_formats.format_for(idx),
72 );
73 if let Some(format_options) = &format_options {
74 field_info = field_info.with_format_options(format_options.clone());
75 }
76 Ok(field_info)
77 })
78 .collect::<Result<Vec<FieldInfo>>>()
79}
80
81fn encode_struct<S: Encoder>(
91 _query_ctx: &QueryContextRef,
92 struct_value: StructValue,
93 builder: &mut S,
94 pg_field: &FieldInfo,
95) -> PgWireResult<()> {
96 let encoding_setting = JsonStructureSettings::Structured(None);
97 let json_value = encoding_setting
98 .decode(Value::Struct(struct_value))
99 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
100
101 builder.encode_field(&json_value, pg_field)
102}
103
104pub(crate) struct RecordBatchRowStream<S, B>
105where
106 S: Encoder,
107 B: Stream<Item = RecordBatchResult<RecordBatch>>,
108{
109 query_ctx: QueryContextRef,
110 pg_schema: Arc<Vec<FieldInfo>>,
111 schema: SchemaRef,
112 record_batches: Pin<Box<B>>,
113 encoder: S,
114}
115
116impl<S, B> Stream for RecordBatchRowStream<S, B>
117where
118 S: Encoder + Unpin,
119 B: Stream<Item = RecordBatchResult<RecordBatch>>,
120{
121 type Item = PgWireResult<Vec<S::Item>>;
122
123 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124 match self.record_batches.as_mut().poll_next(cx) {
125 Poll::Ready(Some(Ok(batch))) => {
126 let record_batch = batch.into_df_record_batch();
127 let num_rows = record_batch.num_rows();
128
129 if num_rows == 0 {
130 return Poll::Ready(Some(Ok(vec![])));
131 }
132
133 let arrow_schema = record_batch.schema();
134 let query_ctx = self.query_ctx.clone();
135 let pg_schema = self.pg_schema.clone();
136 let schema = self.schema.clone();
137 let mut results = Vec::with_capacity(num_rows);
138
139 for i in 0..num_rows {
140 if let Err(e) = Self::encode_row(
141 &query_ctx,
142 &pg_schema,
143 &schema,
144 arrow_schema.as_ref(),
145 &mut self.encoder,
146 &record_batch,
147 i,
148 ) {
149 return Poll::Ready(Some(Err(e)));
150 }
151 results.push(self.encoder.take_row());
152 }
153
154 Poll::Ready(Some(Ok(results)))
155 }
156 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(convert_err(e)))),
157 Poll::Ready(None) => Poll::Ready(None),
158 Poll::Pending => Poll::Pending,
159 }
160 }
161}
162
163impl<S, B> RecordBatchRowStream<S, B>
164where
165 S: Encoder,
166 B: Stream<Item = RecordBatchResult<RecordBatch>>,
167{
168 pub(crate) fn new(
169 query_ctx: QueryContextRef,
170 pg_schema: Arc<Vec<FieldInfo>>,
171 schema: SchemaRef,
172 record_batches: B,
173 encoder: S,
174 ) -> Self {
175 Self {
176 query_ctx,
177 pg_schema,
178 schema,
179 record_batches: Box::pin(record_batches),
180 encoder,
181 }
182 }
183
184 fn encode_row(
185 query_ctx: &QueryContextRef,
186 pg_schema: &Arc<Vec<FieldInfo>>,
187 schema: &SchemaRef,
188 arrow_schema: &arrow::datatypes::Schema,
189 encoder: &mut S,
190 record_batch: &arrow::record_batch::RecordBatch,
191 i: usize,
192 ) -> PgWireResult<()> {
193 for (j, column) in record_batch.columns().iter().enumerate() {
194 let pg_field = &pg_schema[j];
195
196 if column.is_null(i) {
197 encoder.encode_field(&None::<&i8>, pg_field)?;
198 continue;
199 }
200
201 match column.data_type() {
202 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
204 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
206 let v = datatypes::arrow_array::binary_array_value(column, i);
207 let s = jsonb_to_string(v).map_err(convert_err)?;
208 encoder.encode_field(&s, pg_field)?;
209 } else {
210 let arrow_field = arrow_schema.field(j);
212 encode_value(encoder, column, i, arrow_field, pg_field)?;
213 }
214 }
215
216 DataType::List(_) => {
217 let array = column.as_list::<i32>();
218 let items = array.value(i);
219
220 encode_list(encoder, items, pg_field)?;
221 }
222 DataType::Struct(_) => {
223 encode_struct(query_ctx, Default::default(), encoder, pg_field)?;
224 }
225 _ => {
226 let arrow_field = arrow_schema.field(j);
228 encode_value(encoder, column, i, arrow_field, pg_field)?;
229 }
230 }
231 }
232 Ok(())
233 }
234}
235
236pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
237 match origin {
238 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
239 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
240 &ConcreteDataType::Int8(_) => Ok(Type::INT2),
241 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
242 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
243 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
244 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
245 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
246 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
247 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
248 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
249 &ConcreteDataType::Date(_) => Ok(Type::DATE),
250 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
251 &ConcreteDataType::Time(_) => Ok(Type::TIME),
252 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
253 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
254 &ConcreteDataType::Json(_) => Ok(Type::JSON),
255 ConcreteDataType::List(list) => match list.item_type() {
256 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
257 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
258 &ConcreteDataType::Int8(_) => Ok(Type::INT2_ARRAY),
259 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
260 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
261 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
262 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
263 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
264 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
265 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
266 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
267 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
268 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
269 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
270 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
271 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
272 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
273 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
274 &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
275 &ConcreteDataType::Dictionary(_)
276 | &ConcreteDataType::Vector(_)
277 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
278 data_type: origin,
279 reason: "not implemented",
280 }
281 .fail(),
282 },
283 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
284 data_type: origin,
285 reason: "not implemented",
286 }
287 .fail(),
288 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
289 &ConcreteDataType::Struct(_) => Ok(Type::JSON),
290 }
291}
292
293#[allow(dead_code)]
294pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
295 match origin {
297 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
298 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
299 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
300 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
301 &Type::NUMERIC => Ok(ConcreteDataType::uint64_datatype()),
302 &Type::VARCHAR | &Type::CHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
303 &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
304 common_time::timestamp::TimeUnit::Millisecond,
305 )),
306 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
307 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
308 common_time::timestamp::TimeUnit::Microsecond,
309 )),
310 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
311 ConcreteDataType::int16_datatype(),
312 ))),
313 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
314 ConcreteDataType::int32_datatype(),
315 ))),
316 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
317 ConcreteDataType::int64_datatype(),
318 ))),
319 &Type::NUMERIC_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
320 ConcreteDataType::uint64_datatype(),
321 ))),
322 &Type::VARCHAR_ARRAY | &Type::CHAR_ARRAY | &Type::TEXT_ARRAY => Ok(
323 ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::string_datatype())),
324 ),
325 _ => server_error::InternalSnafu {
326 err_msg: format!("unimplemented datatype {origin:?}"),
327 }
328 .fail(),
329 }
330}
331
332pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
333 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
334 error_info.detail = detail;
335 PgWireError::UserError(Box::new(error_info))
336}
337
338fn to_timestamp_scalar_value<T>(
339 data: Option<T>,
340 unit: &TimestampType,
341 ctype: &ConcreteDataType,
342) -> PgWireResult<ScalarValue>
343where
344 T: Into<i64>,
345{
346 if let Some(n) = data {
347 Value::Timestamp(unit.create_timestamp(n.into()))
348 .try_to_scalar_value(ctype)
349 .map_err(convert_err)
350 } else {
351 Ok(ScalarValue::Null)
352 }
353}
354
355fn to_decimal_scalar_value(data: Option<Decimal>, ctype: &Decimal128Type) -> ScalarValue {
356 if let Some(data) = data {
357 let mut value = data;
358 value.rescale(ctype.scale() as u32);
359
360 ScalarValue::Decimal128(Some(value.mantissa()), ctype.precision(), ctype.scale())
361 } else {
362 ScalarValue::Decimal128(None, ctype.precision(), ctype.scale())
363 }
364}
365
366fn numeric_out_of_range_error(value: impl std::fmt::Display) -> PgWireError {
367 invalid_parameter_error(
368 "numeric_value_out_of_range",
369 Some(format!("value {} is out of range for target type", value)),
370 )
371}
372
373pub(super) fn parameters_to_scalar_values(
374 plan: &LogicalPlan,
375 portal: &Portal<PgSqlPlan>,
376) -> PgWireResult<Vec<ScalarValue>> {
377 let param_count = portal.parameter_len();
378 let mut results = Vec::with_capacity(param_count);
379
380 let client_param_types = &portal.statement.parameter_types;
381 let server_param_types = DfLogicalPlanner::get_inferred_parameter_types(plan)
382 .context(InferParameterTypesSnafu)
383 .map_err(convert_err)?
384 .into_iter()
385 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
386 .collect::<HashMap<_, _>>();
387
388 for idx in 0..param_count {
389 let server_type = server_param_types
390 .get(&format!("${}", idx + 1))
391 .and_then(|t| t.as_ref());
392
393 let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
394 client_given_type.clone()
395 } else if let Some(server_provided_type) = &server_type {
396 type_gt_to_pg(server_provided_type).map_err(convert_err)?
397 } else {
398 return Err(invalid_parameter_error(
399 "unknown_parameter_type",
400 Some(format!(
401 "Cannot get parameter type information for parameter {}",
402 idx
403 )),
404 ));
405 };
406
407 let value = match &client_type {
408 &Type::VARCHAR | &Type::TEXT | &Type::CHAR => {
409 let data = portal.parameter::<String>(idx, &client_type)?;
410 if let Some(server_type) = &server_type {
411 match server_type {
412 ConcreteDataType::String(t) => {
413 if t.is_large() {
414 ScalarValue::LargeUtf8(data)
415 } else {
416 ScalarValue::Utf8(data)
417 }
418 }
419 _ => {
420 return Err(invalid_parameter_error(
421 "invalid_parameter_type",
422 Some(format!("Expected: {}, found: {}", server_type, client_type)),
423 ));
424 }
425 }
426 } else {
427 ScalarValue::Utf8(data)
428 }
429 }
430 &Type::BOOL => {
431 let data = portal.parameter::<bool>(idx, &client_type)?;
432 if let Some(server_type) = &server_type {
433 match server_type {
434 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
435 _ => {
436 return Err(invalid_parameter_error(
437 "invalid_parameter_type",
438 Some(format!("Expected: {}, found: {}", server_type, client_type)),
439 ));
440 }
441 }
442 } else {
443 ScalarValue::Boolean(data)
444 }
445 }
446 &Type::INT2 => {
447 let data = portal.parameter::<i16>(idx, &client_type)?;
448 if let Some(server_type) = &server_type {
449 match server_type {
450 ConcreteDataType::Int8(_) => ScalarValue::Int8(
451 data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
452 .transpose()?,
453 ),
454 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
455 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
456 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
457 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
458 data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
459 .transpose()?,
460 ),
461 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
462 data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
463 .transpose()?,
464 ),
465 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
466 data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
467 .transpose()?,
468 ),
469 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
470 data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
471 .transpose()?,
472 ),
473 ConcreteDataType::Timestamp(unit) => {
474 to_timestamp_scalar_value(data, unit, server_type)?
475 }
476 _ => {
477 return Err(invalid_parameter_error(
478 "invalid_parameter_type",
479 Some(format!("Expected: {}, found: {}", server_type, client_type)),
480 ));
481 }
482 }
483 } else {
484 ScalarValue::Int16(data)
485 }
486 }
487 &Type::INT4 => {
488 let data = portal.parameter::<i32>(idx, &client_type)?;
489 if let Some(server_type) = &server_type {
490 match server_type {
491 ConcreteDataType::Int8(_) => ScalarValue::Int8(
492 data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
493 .transpose()?,
494 ),
495 ConcreteDataType::Int16(_) => ScalarValue::Int16(
496 data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
497 .transpose()?,
498 ),
499 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
500 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
501 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
502 data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
503 .transpose()?,
504 ),
505 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
506 data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
507 .transpose()?,
508 ),
509 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
510 data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
511 .transpose()?,
512 ),
513 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
514 data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
515 .transpose()?,
516 ),
517 ConcreteDataType::Timestamp(unit) => {
518 to_timestamp_scalar_value(data, unit, server_type)?
519 }
520 _ => {
521 return Err(invalid_parameter_error(
522 "invalid_parameter_type",
523 Some(format!("Expected: {}, found: {}", server_type, client_type)),
524 ));
525 }
526 }
527 } else {
528 ScalarValue::Int32(data)
529 }
530 }
531 &Type::INT8 => {
532 let data = portal.parameter::<i64>(idx, &client_type)?;
533 if let Some(server_type) = &server_type {
534 match server_type {
535 ConcreteDataType::Int8(_) => ScalarValue::Int8(
536 data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
537 .transpose()?,
538 ),
539 ConcreteDataType::Int16(_) => ScalarValue::Int16(
540 data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
541 .transpose()?,
542 ),
543 ConcreteDataType::Int32(_) => ScalarValue::Int32(
544 data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
545 .transpose()?,
546 ),
547 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
548 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
549 data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
550 .transpose()?,
551 ),
552 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
553 data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
554 .transpose()?,
555 ),
556 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
557 data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
558 .transpose()?,
559 ),
560 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
561 data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
562 .transpose()?,
563 ),
564 ConcreteDataType::Timestamp(unit) => {
565 to_timestamp_scalar_value(data, unit, server_type)?
566 }
567 _ => {
568 return Err(invalid_parameter_error(
569 "invalid_parameter_type",
570 Some(format!("Expected: {}, found: {}", server_type, client_type)),
571 ));
572 }
573 }
574 } else {
575 ScalarValue::Int64(data)
576 }
577 }
578 &Type::NUMERIC => {
579 let data = portal.parameter::<Decimal>(idx, &client_type)?;
580 match &server_type {
581 Some(ConcreteDataType::Decimal128(dt)) => to_decimal_scalar_value(data, dt),
582 Some(st @ ConcreteDataType::Timestamp(unit)) => {
583 to_timestamp_scalar_value(data.and_then(|n| n.to_i64()), unit, st)?
584 }
585 Some(ConcreteDataType::UInt64(_)) | None => {
586 ScalarValue::UInt64(data.and_then(|n| n.to_u64()))
587 }
588 Some(st) => {
589 return Err(invalid_parameter_error(
590 "invalid_parameter_type",
591 Some(format!("Expected: {}, found: {}", st, client_type)),
592 ));
593 }
594 }
595 }
596 &Type::FLOAT4 => {
597 let data = portal.parameter::<f32>(idx, &client_type)?;
598 if let Some(server_type) = &server_type {
599 match server_type {
600 ConcreteDataType::Int8(_) => ScalarValue::Int8(
601 data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
602 .transpose()?,
603 ),
604 ConcreteDataType::Int16(_) => ScalarValue::Int16(
605 data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
606 .transpose()?,
607 ),
608 ConcreteDataType::Int32(_) => ScalarValue::Int32(
609 data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
610 .transpose()?,
611 ),
612 ConcreteDataType::Int64(_) => ScalarValue::Int64(
613 data.map(|n| n.to_i64().ok_or_else(|| numeric_out_of_range_error(n)))
614 .transpose()?,
615 ),
616 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
617 data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
618 .transpose()?,
619 ),
620 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
621 data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
622 .transpose()?,
623 ),
624 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
625 data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
626 .transpose()?,
627 ),
628 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
629 data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
630 .transpose()?,
631 ),
632 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
633 ConcreteDataType::Float64(_) => {
634 ScalarValue::Float64(data.map(|n| n as f64))
635 }
636 _ => {
637 return Err(invalid_parameter_error(
638 "invalid_parameter_type",
639 Some(format!("Expected: {}, found: {}", server_type, client_type)),
640 ));
641 }
642 }
643 } else {
644 ScalarValue::Float32(data)
645 }
646 }
647 &Type::FLOAT8 => {
648 let data = portal.parameter::<f64>(idx, &client_type)?;
649 if let Some(server_type) = &server_type {
650 match server_type {
651 ConcreteDataType::Int8(_) => ScalarValue::Int8(
652 data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
653 .transpose()?,
654 ),
655 ConcreteDataType::Int16(_) => ScalarValue::Int16(
656 data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
657 .transpose()?,
658 ),
659 ConcreteDataType::Int32(_) => ScalarValue::Int32(
660 data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
661 .transpose()?,
662 ),
663 ConcreteDataType::Int64(_) => ScalarValue::Int64(
664 data.map(|n| n.to_i64().ok_or_else(|| numeric_out_of_range_error(n)))
665 .transpose()?,
666 ),
667 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
668 data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
669 .transpose()?,
670 ),
671 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
672 data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
673 .transpose()?,
674 ),
675 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
676 data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
677 .transpose()?,
678 ),
679 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
680 data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
681 .transpose()?,
682 ),
683 ConcreteDataType::Float32(_) => ScalarValue::Float32(
684 data.map(|n| n.to_f32().ok_or_else(|| numeric_out_of_range_error(n)))
685 .transpose()?,
686 ),
687 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
688 _ => {
689 return Err(invalid_parameter_error(
690 "invalid_parameter_type",
691 Some(format!("Expected: {}, found: {}", server_type, client_type)),
692 ));
693 }
694 }
695 } else {
696 ScalarValue::Float64(data)
697 }
698 }
699 &Type::TIMESTAMP => {
700 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
701 if let Some(server_type) = &server_type {
702 match server_type {
703 ConcreteDataType::Timestamp(unit) => match *unit {
704 TimestampType::Second(_) => ScalarValue::TimestampSecond(
705 data.map(|ts| ts.and_utc().timestamp()),
706 None,
707 ),
708 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
709 data.map(|ts| ts.and_utc().timestamp_millis()),
710 None,
711 ),
712 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
713 data.map(|ts| ts.and_utc().timestamp_micros()),
714 None,
715 ),
716 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
717 data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
718 None,
719 ),
720 },
721 _ => {
722 return Err(invalid_parameter_error(
723 "invalid_parameter_type",
724 Some(format!("Expected: {}, found: {}", server_type, client_type)),
725 ));
726 }
727 }
728 } else {
729 ScalarValue::TimestampMillisecond(
730 data.map(|ts| ts.and_utc().timestamp_millis()),
731 None,
732 )
733 }
734 }
735 &Type::TIMESTAMPTZ => {
736 let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
737 if let Some(server_type) = &server_type {
738 match server_type {
739 ConcreteDataType::Timestamp(unit) => match *unit {
740 TimestampType::Second(_) => {
741 ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
742 }
743 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
744 data.map(|ts| ts.timestamp_millis()),
745 None,
746 ),
747 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
748 data.map(|ts| ts.timestamp_micros()),
749 None,
750 ),
751 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
752 data.and_then(|ts| ts.timestamp_nanos_opt()),
753 None,
754 ),
755 },
756 _ => {
757 return Err(invalid_parameter_error(
758 "invalid_parameter_type",
759 Some(format!("Expected: {}, found: {}", server_type, client_type)),
760 ));
761 }
762 }
763 } else {
764 ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
765 }
766 }
767 &Type::DATE => {
768 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
769 if let Some(server_type) = &server_type {
770 match server_type {
771 ConcreteDataType::Date(_) => ScalarValue::Date32(
772 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
773 ),
774 _ => {
775 return Err(invalid_parameter_error(
776 "invalid_parameter_type",
777 Some(format!("Expected: {}, found: {}", server_type, client_type)),
778 ));
779 }
780 }
781 } else {
782 ScalarValue::Date32(
783 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
784 )
785 }
786 }
787 &Type::INTERVAL => {
788 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
789 if let Some(server_type) = &server_type {
790 match server_type {
791 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
792 ScalarValue::IntervalYearMonth(
793 data.map(|i| {
794 if i.days != 0 || i.microseconds != 0 {
795 Err(invalid_parameter_error(
796 "invalid_parameter_type",
797 Some(format!(
798 "Expected: {}, found: {}",
799 server_type, client_type
800 )),
801 ))
802 } else {
803 Ok(IntervalYearMonth::new(i.months).to_i32())
804 }
805 })
806 .transpose()?,
807 )
808 }
809 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
810 ScalarValue::IntervalDayTime(
811 data.map(|i| {
812 if i.months != 0 || i.microseconds % 1000 != 0 {
813 Err(invalid_parameter_error(
814 "invalid_parameter_type",
815 Some(format!(
816 "Expected: {}, found: {}",
817 server_type, client_type
818 )),
819 ))
820 } else {
821 Ok(IntervalDayTime::new(
822 i.days,
823 (i.microseconds / 1000) as i32,
824 )
825 .into())
826 }
827 })
828 .transpose()?,
829 )
830 }
831 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
832 ScalarValue::IntervalMonthDayNano(data.map(|i| {
833 IntervalMonthDayNano::new(
834 i.months,
835 i.days,
836 i.microseconds * 1_000i64,
837 )
838 .into()
839 }))
840 }
841 _ => {
842 return Err(invalid_parameter_error(
843 "invalid_parameter_type",
844 Some(format!("Expected: {}, found: {}", server_type, client_type)),
845 ));
846 }
847 }
848 } else {
849 ScalarValue::IntervalMonthDayNano(data.map(|i| {
850 IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
851 .into()
852 }))
853 }
854 }
855 &Type::BYTEA => {
856 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
857 if let Some(server_type) = &server_type {
858 match server_type {
859 ConcreteDataType::String(t) => {
860 let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
861 if t.is_large() {
862 ScalarValue::LargeUtf8(s)
863 } else {
864 ScalarValue::Utf8(s)
865 }
866 }
867 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
868 _ => {
869 return Err(invalid_parameter_error(
870 "invalid_parameter_type",
871 Some(format!("Expected: {}, found: {}", server_type, client_type)),
872 ));
873 }
874 }
875 } else {
876 ScalarValue::Binary(data)
877 }
878 }
879 &Type::JSONB => {
880 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
881 if let Some(server_type) = &server_type {
882 match server_type {
883 ConcreteDataType::Binary(_) => {
884 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
885 }
886 _ => {
887 return Err(invalid_parameter_error(
888 "invalid_parameter_type",
889 Some(format!("Expected: {}, found: {}", server_type, client_type)),
890 ));
891 }
892 }
893 } else {
894 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
895 }
896 }
897 &Type::INT2_ARRAY => {
898 let data = portal.parameter::<Vec<Option<i16>>>(idx, &client_type)?;
899 if let Some(data) = data {
900 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
901 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
902 } else {
903 ScalarValue::Null
904 }
905 }
906 &Type::INT4_ARRAY => {
907 let data = portal.parameter::<Vec<Option<i32>>>(idx, &client_type)?;
908 if let Some(data) = data {
909 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
910 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
911 } else {
912 ScalarValue::Null
913 }
914 }
915 &Type::INT8_ARRAY => {
916 let data = portal.parameter::<Vec<Option<i64>>>(idx, &client_type)?;
917 if let Some(data) = data {
918 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
919 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
920 } else {
921 ScalarValue::Null
922 }
923 }
924 &Type::NUMERIC_ARRAY => {
925 let data = portal.parameter::<Vec<Option<Decimal>>>(idx, &client_type)?;
926 if let Some(data) = data {
927 let build_u64_list = |data: Vec<Option<Decimal>>| {
928 let values = data
929 .into_iter()
930 .map(|n| ScalarValue::UInt64(n.and_then(|n| n.to_u64())))
931 .collect::<Vec<_>>();
932 ScalarValue::List(ScalarValue::new_list(
933 &values,
934 &ArrowDataType::UInt64,
935 true,
936 ))
937 };
938 if let Some(server_type) = &server_type {
939 match server_type {
940 ConcreteDataType::List(list_type) => match list_type.item_type() {
941 ConcreteDataType::UInt64(_) => build_u64_list(data),
942 ConcreteDataType::Decimal128(dt) => {
943 let values = data
944 .into_iter()
945 .map(|n| to_decimal_scalar_value(n, dt))
946 .collect::<Vec<_>>();
947 ScalarValue::List(ScalarValue::new_list(
948 &values,
949 &ArrowDataType::Decimal128(dt.precision(), dt.scale()),
950 true,
951 ))
952 }
953 _ => {
954 return Err(invalid_parameter_error(
956 "invalid_parameter_type",
957 Some(format!(
958 "Expected: {}, found: {}",
959 list_type.item_type(),
960 client_type
961 )),
962 ));
963 }
964 },
965 _ => {
966 return Err(invalid_parameter_error(
968 "invalid_parameter_type",
969 Some(format!(
970 "Expected: {}, found: {}",
971 server_type, client_type
972 )),
973 ));
974 }
975 }
976 } else {
977 build_u64_list(data)
979 }
980 } else {
981 ScalarValue::Null
982 }
983 }
984 &Type::VARCHAR_ARRAY | &Type::TEXT_ARRAY | &Type::CHAR_ARRAY => {
985 let data = portal.parameter::<Vec<Option<String>>>(idx, &client_type)?;
986 if let Some(data) = data {
987 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
988 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
989 } else {
990 ScalarValue::Null
991 }
992 }
993 &Type::TIMESTAMP_ARRAY => {
994 let data = portal.parameter::<Vec<Option<NaiveDateTime>>>(idx, &client_type)?;
995 if let Some(data) = data {
996 if let Some(ConcreteDataType::List(list_type)) = &server_type {
997 match list_type.item_type() {
998 ConcreteDataType::Timestamp(unit) => match *unit {
999 TimestampType::Second(_) => {
1000 let values = data
1001 .into_iter()
1002 .map(|ts| {
1003 ScalarValue::TimestampSecond(
1004 ts.map(|ts| ts.and_utc().timestamp()),
1005 None,
1006 )
1007 })
1008 .collect::<Vec<_>>();
1009 ScalarValue::List(ScalarValue::new_list(
1010 &values,
1011 &ArrowDataType::Timestamp(TimeUnit::Second, None),
1012 true,
1013 ))
1014 }
1015 TimestampType::Millisecond(_) => {
1016 let values = data
1017 .into_iter()
1018 .map(|ts| {
1019 ScalarValue::TimestampMillisecond(
1020 ts.map(|ts| ts.and_utc().timestamp_millis()),
1021 None,
1022 )
1023 })
1024 .collect::<Vec<_>>();
1025 ScalarValue::List(ScalarValue::new_list(
1026 &values,
1027 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1028 true,
1029 ))
1030 }
1031 TimestampType::Microsecond(_) => {
1032 let values = data
1033 .into_iter()
1034 .map(|ts| {
1035 ScalarValue::TimestampMicrosecond(
1036 ts.map(|ts| ts.and_utc().timestamp_micros()),
1037 None,
1038 )
1039 })
1040 .collect::<Vec<_>>();
1041 ScalarValue::List(ScalarValue::new_list(
1042 &values,
1043 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1044 true,
1045 ))
1046 }
1047 TimestampType::Nanosecond(_) => {
1048 let values = data
1049 .into_iter()
1050 .filter_map(|ts| {
1051 ts.and_then(|ts| {
1052 ts.and_utc().timestamp_nanos_opt().map(|nanos| {
1053 ScalarValue::TimestampNanosecond(
1054 Some(nanos),
1055 None,
1056 )
1057 })
1058 })
1059 })
1060 .collect::<Vec<_>>();
1061 ScalarValue::List(ScalarValue::new_list(
1062 &values,
1063 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1064 true,
1065 ))
1066 }
1067 },
1068 _ => {
1069 return Err(invalid_parameter_error(
1070 "invalid_parameter_type",
1071 Some(format!(
1072 "Expected: {}, found: {}",
1073 list_type.item_type(),
1074 client_type
1075 )),
1076 ));
1077 }
1078 }
1079 } else {
1080 let values = data
1081 .into_iter()
1082 .map(|ts| {
1083 ScalarValue::TimestampMillisecond(
1084 ts.map(|ts| ts.and_utc().timestamp_millis()),
1085 None,
1086 )
1087 })
1088 .collect::<Vec<_>>();
1089 ScalarValue::List(ScalarValue::new_list(
1090 &values,
1091 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1092 true,
1093 ))
1094 }
1095 } else {
1096 ScalarValue::Null
1097 }
1098 }
1099 &Type::TIMESTAMPTZ_ARRAY => {
1100 let data =
1101 portal.parameter::<Vec<Option<DateTime<FixedOffset>>>>(idx, &client_type)?;
1102 if let Some(data) = data {
1103 if let Some(ConcreteDataType::List(list_type)) = &server_type {
1104 match list_type.item_type() {
1105 ConcreteDataType::Timestamp(unit) => match *unit {
1106 TimestampType::Second(_) => {
1107 let values = data
1108 .into_iter()
1109 .map(|ts| {
1110 ScalarValue::TimestampSecond(
1111 ts.map(|ts| ts.timestamp()),
1112 None,
1113 )
1114 })
1115 .collect::<Vec<_>>();
1116 ScalarValue::List(ScalarValue::new_list(
1117 &values,
1118 &ArrowDataType::Timestamp(TimeUnit::Second, None),
1119 true,
1120 ))
1121 }
1122 TimestampType::Millisecond(_) => {
1123 let values = data
1124 .into_iter()
1125 .map(|ts| {
1126 ScalarValue::TimestampMillisecond(
1127 ts.map(|ts| ts.timestamp_millis()),
1128 None,
1129 )
1130 })
1131 .collect::<Vec<_>>();
1132 ScalarValue::List(ScalarValue::new_list(
1133 &values,
1134 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1135 true,
1136 ))
1137 }
1138 TimestampType::Microsecond(_) => {
1139 let values = data
1140 .into_iter()
1141 .map(|ts| {
1142 ScalarValue::TimestampMicrosecond(
1143 ts.map(|ts| ts.timestamp_micros()),
1144 None,
1145 )
1146 })
1147 .collect::<Vec<_>>();
1148 ScalarValue::List(ScalarValue::new_list(
1149 &values,
1150 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1151 true,
1152 ))
1153 }
1154 TimestampType::Nanosecond(_) => {
1155 let values = data
1156 .into_iter()
1157 .map(|ts| {
1158 ScalarValue::TimestampNanosecond(
1159 ts.and_then(|ts| ts.timestamp_nanos_opt()),
1160 None,
1161 )
1162 })
1163 .collect::<Vec<_>>();
1164 ScalarValue::List(ScalarValue::new_list(
1165 &values,
1166 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1167 true,
1168 ))
1169 }
1170 },
1171 _ => {
1172 return Err(invalid_parameter_error(
1173 "invalid_parameter_type",
1174 Some(format!(
1175 "Expected: {}, found: {}",
1176 list_type.item_type(),
1177 client_type
1178 )),
1179 ));
1180 }
1181 }
1182 } else {
1183 let values = data
1184 .into_iter()
1185 .map(|ts| {
1186 ScalarValue::TimestampMillisecond(
1187 ts.map(|ts| ts.timestamp_millis()),
1188 None,
1189 )
1190 })
1191 .collect::<Vec<_>>();
1192 ScalarValue::List(ScalarValue::new_list(
1193 &values,
1194 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1195 true,
1196 ))
1197 }
1198 } else {
1199 ScalarValue::Null
1200 }
1201 }
1202 _ => Err(invalid_parameter_error(
1203 "unsupported_parameter_value",
1204 Some(format!("Found type: {}", client_type)),
1205 ))?,
1206 };
1207
1208 results.push(value);
1209 }
1210
1211 Ok(results)
1212}
1213
1214pub(super) fn param_types_to_pg_types(
1215 param_types: &HashMap<String, Option<ConcreteDataType>>,
1216) -> Result<Vec<Type>> {
1217 let param_count = param_types.len();
1218 let mut types = Vec::with_capacity(param_count);
1219 for i in 0..param_count {
1220 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1221 let pg_type = type_gt_to_pg(param_type)?;
1222 types.push(pg_type);
1223 } else {
1224 types.push(Type::UNKNOWN);
1225 }
1226 }
1227 Ok(types)
1228}
1229
1230pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1231 let config = query_ctx.configuration_parameter();
1232 let (date_style, date_order) = *config.pg_datetime_style();
1233
1234 let mut format_options = PgFormatOptions::default();
1235 format_options.date_style = format!("{}, {}", date_style, date_order);
1236 format_options.interval_style = config.pg_intervalstyle_format().to_string();
1237 format_options.bytea_output = config.postgres_bytea_output().to_string();
1238 format_options.time_zone = query_ctx.timezone().to_string();
1239
1240 Arc::new(format_options)
1241}
1242
1243#[cfg(test)]
1244mod test {
1245 use std::str::FromStr;
1246 use std::sync::Arc;
1247
1248 use arrow::array::{
1249 Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
1250 };
1251 use arrow_schema::{Field, IntervalUnit};
1252 use bytes::Bytes;
1253 use datafusion_expr::expr::Placeholder;
1254 use datafusion_expr::{Expr, LogicalPlanBuilder};
1255 use datatypes::schema::{ColumnSchema, Schema};
1256 use datatypes::vectors::{
1257 BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
1258 Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
1259 IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
1260 TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
1261 };
1262 use futures::{StreamExt as FuturesStreamExt, stream};
1263 use pgwire::api::Type;
1264 use pgwire::api::portal::{Format, Portal};
1265 use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo};
1266 use pgwire::api::stmt::StoredStatement;
1267 use pgwire::messages::extendedquery::Bind;
1268 use session::context::QueryContextBuilder;
1269
1270 use super::*;
1271 use crate::SqlPlan;
1272 use crate::postgres::handler::PgSqlPlan;
1273
1274 #[test]
1275 fn test_schema_convert() {
1276 let column_schemas = vec![
1277 ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1278 ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1279 ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1280 ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1281 ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1282 ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1283 ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1284 ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1285 ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1286 ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1287 ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1288 ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1289 ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1290 ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1291 ColumnSchema::new(
1292 "timestamps",
1293 ConcreteDataType::timestamp_millisecond_datatype(),
1294 true,
1295 ),
1296 ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1297 ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1298 ColumnSchema::new(
1299 "intervals",
1300 ConcreteDataType::interval_month_day_nano_datatype(),
1301 true,
1302 ),
1303 ];
1304 let pg_field_info = vec![
1305 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1306 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1307 FieldInfo::new("int8s".into(), None, None, Type::INT2, FieldFormat::Text),
1308 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1309 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1310 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1311 FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1312 FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1313 FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1314 FieldInfo::new(
1315 "uint64s".into(),
1316 None,
1317 None,
1318 Type::NUMERIC,
1319 FieldFormat::Text,
1320 ),
1321 FieldInfo::new(
1322 "float32s".into(),
1323 None,
1324 None,
1325 Type::FLOAT4,
1326 FieldFormat::Text,
1327 ),
1328 FieldInfo::new(
1329 "float64s".into(),
1330 None,
1331 None,
1332 Type::FLOAT8,
1333 FieldFormat::Text,
1334 ),
1335 FieldInfo::new(
1336 "binaries".into(),
1337 None,
1338 None,
1339 Type::BYTEA,
1340 FieldFormat::Text,
1341 ),
1342 FieldInfo::new(
1343 "strings".into(),
1344 None,
1345 None,
1346 Type::VARCHAR,
1347 FieldFormat::Text,
1348 ),
1349 FieldInfo::new(
1350 "timestamps".into(),
1351 None,
1352 None,
1353 Type::TIMESTAMP,
1354 FieldFormat::Text,
1355 ),
1356 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1357 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1358 FieldInfo::new(
1359 "intervals".into(),
1360 None,
1361 None,
1362 Type::INTERVAL,
1363 FieldFormat::Text,
1364 ),
1365 ];
1366 let schema = Schema::new(column_schemas);
1367 let fs = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
1368 assert_eq!(fs, pg_field_info);
1369 }
1370
1371 #[test]
1372 fn test_encode_text_format_data() {
1373 let pg_schema = vec![
1374 FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1375 FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1376 FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1377 FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1378 FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1379 FieldInfo::new(
1380 "uint64s".into(),
1381 None,
1382 None,
1383 Type::NUMERIC,
1384 FieldFormat::Text,
1385 ),
1386 FieldInfo::new("int8s".into(), None, None, Type::INT2, FieldFormat::Text),
1387 FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1388 FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1389 FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1390 FieldInfo::new(
1391 "float32s".into(),
1392 None,
1393 None,
1394 Type::FLOAT4,
1395 FieldFormat::Text,
1396 ),
1397 FieldInfo::new(
1398 "float64s".into(),
1399 None,
1400 None,
1401 Type::FLOAT8,
1402 FieldFormat::Text,
1403 ),
1404 FieldInfo::new(
1405 "strings".into(),
1406 None,
1407 None,
1408 Type::VARCHAR,
1409 FieldFormat::Text,
1410 ),
1411 FieldInfo::new(
1412 "binaries".into(),
1413 None,
1414 None,
1415 Type::BYTEA,
1416 FieldFormat::Text,
1417 ),
1418 FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1419 FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1420 FieldInfo::new(
1421 "timestamps".into(),
1422 None,
1423 None,
1424 Type::TIMESTAMP,
1425 FieldFormat::Text,
1426 ),
1427 FieldInfo::new(
1428 "interval_year_month".into(),
1429 None,
1430 None,
1431 Type::INTERVAL,
1432 FieldFormat::Text,
1433 ),
1434 FieldInfo::new(
1435 "interval_day_time".into(),
1436 None,
1437 None,
1438 Type::INTERVAL,
1439 FieldFormat::Text,
1440 ),
1441 FieldInfo::new(
1442 "interval_month_day_nano".into(),
1443 None,
1444 None,
1445 Type::INTERVAL,
1446 FieldFormat::Text,
1447 ),
1448 FieldInfo::new(
1449 "int_list".into(),
1450 None,
1451 None,
1452 Type::INT8_ARRAY,
1453 FieldFormat::Text,
1454 ),
1455 FieldInfo::new(
1456 "float_list".into(),
1457 None,
1458 None,
1459 Type::FLOAT8_ARRAY,
1460 FieldFormat::Text,
1461 ),
1462 FieldInfo::new(
1463 "string_list".into(),
1464 None,
1465 None,
1466 Type::VARCHAR_ARRAY,
1467 FieldFormat::Text,
1468 ),
1469 FieldInfo::new(
1470 "timestamp_list".into(),
1471 None,
1472 None,
1473 Type::TIMESTAMP_ARRAY,
1474 FieldFormat::Text,
1475 ),
1476 ];
1477
1478 let arrow_schema = arrow_schema::Schema::new(vec![
1479 Field::new("x", DataType::Null, true),
1480 Field::new("x", DataType::Boolean, true),
1481 Field::new("x", DataType::UInt8, true),
1482 Field::new("x", DataType::UInt16, true),
1483 Field::new("x", DataType::UInt32, true),
1484 Field::new("x", DataType::UInt64, true),
1485 Field::new("x", DataType::Int8, true),
1486 Field::new("x", DataType::Int16, true),
1487 Field::new("x", DataType::Int32, true),
1488 Field::new("x", DataType::Int64, true),
1489 Field::new("x", DataType::Float32, true),
1490 Field::new("x", DataType::Float64, true),
1491 Field::new("x", DataType::Utf8, true),
1492 Field::new("x", DataType::Binary, true),
1493 Field::new("x", DataType::Date32, true),
1494 Field::new("x", DataType::Time32(TimeUnit::Second), true),
1495 Field::new("x", DataType::Timestamp(TimeUnit::Second, None), true),
1496 Field::new("x", DataType::Interval(IntervalUnit::YearMonth), true),
1497 Field::new("x", DataType::Interval(IntervalUnit::DayTime), true),
1498 Field::new("x", DataType::Interval(IntervalUnit::MonthDayNano), true),
1499 Field::new(
1500 "x",
1501 DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
1502 true,
1503 ),
1504 Field::new(
1505 "x",
1506 DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
1507 true,
1508 ),
1509 Field::new(
1510 "x",
1511 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
1512 true,
1513 ),
1514 Field::new(
1515 "x",
1516 DataType::List(Arc::new(Field::new(
1517 "item",
1518 DataType::Timestamp(TimeUnit::Second, None),
1519 true,
1520 ))),
1521 true,
1522 ),
1523 ]);
1524
1525 let mut builder = ListBuilder::new(Int64Builder::new());
1526 builder.append_value([Some(1i64), None, Some(2)]);
1527 builder.append_null();
1528 builder.append_value([Some(-1i64), None, Some(-2)]);
1529 let i64_list_array = builder.finish();
1530
1531 let mut builder = ListBuilder::new(Float64Builder::new());
1532 builder.append_value([Some(1.0f64), None, Some(2.0)]);
1533 builder.append_null();
1534 builder.append_value([Some(-1.0f64), None, Some(-2.0)]);
1535 let f64_list_array = builder.finish();
1536
1537 let mut builder = ListBuilder::new(StringBuilder::new());
1538 builder.append_value([Some("a"), None, Some("b")]);
1539 builder.append_null();
1540 builder.append_value([Some("c"), None, Some("d")]);
1541 let string_list_array = builder.finish();
1542
1543 let mut builder = ListBuilder::new(TimestampSecondBuilder::new());
1544 builder.append_value([Some(1i64), None, Some(2)]);
1545 builder.append_null();
1546 builder.append_value([Some(3i64), None, Some(4)]);
1547 let timestamp_list_array = builder.finish();
1548
1549 let values = vec![
1550 Arc::new(NullVector::new(3)) as VectorRef,
1551 Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
1552 Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
1553 Arc::new(UInt16Vector::from(vec![
1554 Some(u16::MAX),
1555 Some(u16::MIN),
1556 None,
1557 ])),
1558 Arc::new(UInt32Vector::from(vec![
1559 Some(u32::MAX),
1560 Some(u32::MIN),
1561 None,
1562 ])),
1563 Arc::new(UInt64Vector::from(vec![
1564 Some(u64::MAX),
1565 Some(u64::MIN),
1566 None,
1567 ])),
1568 Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
1569 Arc::new(Int16Vector::from(vec![
1570 Some(i16::MAX),
1571 Some(i16::MIN),
1572 None,
1573 ])),
1574 Arc::new(Int32Vector::from(vec![
1575 Some(i32::MAX),
1576 Some(i32::MIN),
1577 None,
1578 ])),
1579 Arc::new(Int64Vector::from(vec![
1580 Some(i64::MAX),
1581 Some(i64::MIN),
1582 None,
1583 ])),
1584 Arc::new(Float32Vector::from(vec![
1585 None,
1586 Some(f32::MAX),
1587 Some(f32::MIN),
1588 ])),
1589 Arc::new(Float64Vector::from(vec![
1590 None,
1591 Some(f64::MAX),
1592 Some(f64::MIN),
1593 ])),
1594 Arc::new(StringVector::from(vec![
1595 None,
1596 Some("hello"),
1597 Some("greptime"),
1598 ])),
1599 Arc::new(BinaryVector::from(vec![
1600 None,
1601 Some("hello".as_bytes().to_vec()),
1602 Some("world".as_bytes().to_vec()),
1603 ])),
1604 Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
1605 Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
1606 Arc::new(TimestampSecondVector::from(vec![
1607 Some(1000001),
1608 None,
1609 Some(1),
1610 ])),
1611 Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
1612 Arc::new(IntervalDayTimeVector::from(vec![
1613 Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
1614 None,
1615 Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
1616 ])),
1617 Arc::new(IntervalMonthDayNanoVector::from(vec![
1618 Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
1619 None,
1620 Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
1621 ])),
1622 Arc::new(ListVector::from(i64_list_array)),
1623 Arc::new(ListVector::from(f64_list_array)),
1624 Arc::new(ListVector::from(string_list_array)),
1625 Arc::new(ListVector::from(timestamp_list_array)),
1626 ];
1627 let record_batch =
1628 RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), values).unwrap();
1629
1630 let query_context = QueryContextBuilder::default()
1631 .configuration_parameter(Default::default())
1632 .build()
1633 .into();
1634 let schema = record_batch.schema.clone();
1635 let pg_schema_ref = Arc::new(pg_schema);
1636
1637 let encoder = DataRowEncoder::new(pg_schema_ref.clone());
1638
1639 let row_stream = RecordBatchRowStream::new(
1640 query_context,
1641 pg_schema_ref.clone(),
1642 schema,
1643 stream::once(async { Ok(record_batch) }),
1644 encoder,
1645 );
1646
1647 let rows: Vec<_> = futures::executor::block_on(
1648 row_stream
1649 .filter_map(|x: PgWireResult<_>| async move { x.ok() })
1650 .flat_map(stream::iter)
1651 .collect::<Vec<_>>(),
1652 );
1653 assert_eq!(rows.len(), 3);
1654 for row in rows {
1655 assert_eq!(row.field_count, pg_schema_ref.len() as i16);
1656 }
1657 }
1658
1659 #[test]
1660 fn test_invalid_parameter() {
1661 let msg = "invalid_parameter_count";
1663 let error = invalid_parameter_error(msg, None);
1664 if let PgWireError::UserError(value) = error {
1665 assert_eq!("ERROR", value.severity);
1666 assert_eq!("22023", value.code);
1667 assert_eq!(msg, value.message);
1668 } else {
1669 panic!("test_invalid_parameter failed");
1670 }
1671 }
1672
1673 #[test]
1674 fn test_to_decimal_scalar_value() {
1675 let dt = Decimal128Type::new(18, 4);
1676
1677 let d = Decimal::from_str("12345.6789").unwrap();
1678 assert_eq!(d.mantissa(), 123456789i128);
1679 let scalar = to_decimal_scalar_value(Some(d), &dt);
1680 assert_eq!(scalar, ScalarValue::Decimal128(Some(123456789), 18, 4));
1681
1682 let d = Decimal::from_str("100.5").unwrap();
1683 assert_eq!(d.mantissa(), 1005);
1684 let scalar = to_decimal_scalar_value(Some(d), &dt);
1685 assert_eq!(scalar, ScalarValue::Decimal128(Some(1005000), 18, 4));
1686
1687 let d = Decimal::from_str("-9876.5432").unwrap();
1688 let scalar = to_decimal_scalar_value(Some(d), &dt);
1689 assert_eq!(scalar, ScalarValue::Decimal128(Some(-98765432), 18, 4));
1690
1691 let scalar = to_decimal_scalar_value(None, &dt);
1692 assert_eq!(scalar, ScalarValue::Decimal128(None, 18, 4));
1693 }
1694
1695 fn s(v: &str) -> Option<String> {
1696 Some(v.to_string())
1697 }
1698
1699 fn typed_param(id: &str, dt: DataType) -> Expr {
1700 Expr::Placeholder(Placeholder::new_with_field(
1701 id.to_string(),
1702 Some(Arc::new(arrow_schema::Field::new(id, dt, true))),
1703 ))
1704 }
1705
1706 fn build_plan_with_params(params: Vec<(&str, DataType)>) -> LogicalPlan {
1707 let exprs: Vec<Expr> = params
1708 .into_iter()
1709 .map(|(id, dt)| typed_param(id, dt))
1710 .collect();
1711 LogicalPlanBuilder::empty(true)
1712 .project(exprs)
1713 .unwrap()
1714 .build()
1715 .unwrap()
1716 }
1717
1718 fn make_portal(
1719 client_param_types: Vec<Option<Type>>,
1720 param_data: Vec<Option<String>>,
1721 ) -> Portal<PgSqlPlan> {
1722 let bind = Bind::new(
1723 None,
1724 None,
1725 vec![],
1726 param_data
1727 .into_iter()
1728 .map(|opt| opt.map(Bytes::from))
1729 .collect(),
1730 vec![],
1731 );
1732 let statement = Arc::new(StoredStatement::new(
1733 String::new(),
1734 PgSqlPlan {
1735 plan: SqlPlan::Empty,
1736 copy_to_stdout_format: None,
1737 },
1738 client_param_types,
1739 ));
1740 Portal::try_new(&bind, statement).unwrap()
1741 }
1742
1743 #[test]
1744 fn test_int2_coerce_in_range() {
1745 let plan = build_plan_with_params(vec![
1746 ("$1", DataType::Int8),
1747 ("$2", DataType::Int16),
1748 ("$3", DataType::Int32),
1749 ("$4", DataType::Int64),
1750 ("$5", DataType::UInt8),
1751 ("$6", DataType::UInt16),
1752 ("$7", DataType::UInt32),
1753 ("$8", DataType::UInt64),
1754 ]);
1755 let portal = make_portal(
1756 vec![
1757 Some(Type::INT2),
1758 Some(Type::INT2),
1759 Some(Type::INT2),
1760 Some(Type::INT2),
1761 Some(Type::INT2),
1762 Some(Type::INT2),
1763 Some(Type::INT2),
1764 Some(Type::INT2),
1765 ],
1766 vec![
1767 s("100"),
1768 s("100"),
1769 s("100"),
1770 s("100"),
1771 s("100"),
1772 s("100"),
1773 s("100"),
1774 s("100"),
1775 ],
1776 );
1777
1778 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1779 assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1780 assert_eq!(values[1], ScalarValue::Int16(Some(100)));
1781 assert_eq!(values[2], ScalarValue::Int32(Some(100)));
1782 assert_eq!(values[3], ScalarValue::Int64(Some(100)));
1783 assert_eq!(values[4], ScalarValue::UInt8(Some(100)));
1784 assert_eq!(values[5], ScalarValue::UInt16(Some(100)));
1785 assert_eq!(values[6], ScalarValue::UInt32(Some(100)));
1786 assert_eq!(values[7], ScalarValue::UInt64(Some(100)));
1787 }
1788
1789 #[test]
1790 fn test_int2_coerce_out_of_range() {
1791 let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1792 let portal = make_portal(vec![Some(Type::INT2)], vec![s("200")]);
1793 let result = parameters_to_scalar_values(&plan, &portal);
1794 assert!(result.is_err());
1795 }
1796
1797 #[test]
1798 fn test_int2_coerce_negative_to_unsigned_out_of_range() {
1799 let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
1800 let portal = make_portal(vec![Some(Type::INT2)], vec![s("-1")]);
1801 let result = parameters_to_scalar_values(&plan, &portal);
1802 assert!(result.is_err());
1803 }
1804
1805 #[test]
1806 fn test_int4_coerce_in_range() {
1807 let plan = build_plan_with_params(vec![
1808 ("$1", DataType::Int8),
1809 ("$2", DataType::Int16),
1810 ("$3", DataType::Int32),
1811 ("$4", DataType::Int64),
1812 ("$5", DataType::UInt8),
1813 ("$6", DataType::UInt16),
1814 ("$7", DataType::UInt32),
1815 ("$8", DataType::UInt64),
1816 ]);
1817 let portal = make_portal(
1818 vec![
1819 Some(Type::INT4),
1820 Some(Type::INT4),
1821 Some(Type::INT4),
1822 Some(Type::INT4),
1823 Some(Type::INT4),
1824 Some(Type::INT4),
1825 Some(Type::INT4),
1826 Some(Type::INT4),
1827 ],
1828 vec![
1829 s("100"),
1830 s("1000"),
1831 s("100000"),
1832 s("100000"),
1833 s("200"),
1834 s("1000"),
1835 s("100000"),
1836 s("100000"),
1837 ],
1838 );
1839
1840 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1841 assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1842 assert_eq!(values[1], ScalarValue::Int16(Some(1000)));
1843 assert_eq!(values[2], ScalarValue::Int32(Some(100000)));
1844 assert_eq!(values[3], ScalarValue::Int64(Some(100000)));
1845 assert_eq!(values[4], ScalarValue::UInt8(Some(200)));
1846 assert_eq!(values[5], ScalarValue::UInt16(Some(1000)));
1847 assert_eq!(values[6], ScalarValue::UInt32(Some(100000)));
1848 assert_eq!(values[7], ScalarValue::UInt64(Some(100000)));
1849 }
1850
1851 #[test]
1852 fn test_int4_coerce_out_of_range() {
1853 let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1854 let portal = make_portal(vec![Some(Type::INT4)], vec![s("200")]);
1855 let result = parameters_to_scalar_values(&plan, &portal);
1856 assert!(result.is_err());
1857 }
1858
1859 #[test]
1860 fn test_int4_coerce_i32_max_to_i16_out_of_range() {
1861 let plan = build_plan_with_params(vec![("$1", DataType::Int16)]);
1862 let portal = make_portal(vec![Some(Type::INT4)], vec![Some(i32::MAX.to_string())]);
1863 let result = parameters_to_scalar_values(&plan, &portal);
1864 assert!(result.is_err());
1865 }
1866
1867 #[test]
1868 fn test_int8_coerce_in_range() {
1869 let plan = build_plan_with_params(vec![
1870 ("$1", DataType::Int8),
1871 ("$2", DataType::Int16),
1872 ("$3", DataType::Int32),
1873 ("$4", DataType::Int64),
1874 ("$5", DataType::UInt8),
1875 ("$6", DataType::UInt16),
1876 ("$7", DataType::UInt32),
1877 ("$8", DataType::UInt64),
1878 ]);
1879 let portal = make_portal(
1880 vec![
1881 Some(Type::INT8),
1882 Some(Type::INT8),
1883 Some(Type::INT8),
1884 Some(Type::INT8),
1885 Some(Type::INT8),
1886 Some(Type::INT8),
1887 Some(Type::INT8),
1888 Some(Type::INT8),
1889 ],
1890 vec![
1891 s("100"),
1892 s("1000"),
1893 s("100000"),
1894 s("100000"),
1895 s("200"),
1896 s("1000"),
1897 s("3000000000"),
1898 s("3000000000"),
1899 ],
1900 );
1901
1902 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1903 assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1904 assert_eq!(values[1], ScalarValue::Int16(Some(1000)));
1905 assert_eq!(values[2], ScalarValue::Int32(Some(100000)));
1906 assert_eq!(values[3], ScalarValue::Int64(Some(100000)));
1907 assert_eq!(values[4], ScalarValue::UInt8(Some(200)));
1908 assert_eq!(values[5], ScalarValue::UInt16(Some(1000)));
1909 assert_eq!(values[6], ScalarValue::UInt32(Some(3000000000)));
1910 assert_eq!(values[7], ScalarValue::UInt64(Some(3000000000)));
1911 }
1912
1913 #[test]
1914 fn test_int8_coerce_out_of_range() {
1915 let plan = build_plan_with_params(vec![("$1", DataType::Int32)]);
1916 let portal = make_portal(
1917 vec![Some(Type::INT8)],
1918 vec![Some((i32::MAX as i64 + 1).to_string())],
1919 );
1920 let result = parameters_to_scalar_values(&plan, &portal);
1921 assert!(result.is_err());
1922 }
1923
1924 #[test]
1925 fn test_int8_coerce_negative_to_unsigned_out_of_range() {
1926 let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
1927 let portal = make_portal(vec![Some(Type::INT8)], vec![s("-1")]);
1928 let result = parameters_to_scalar_values(&plan, &portal);
1929 assert!(result.is_err());
1930 }
1931
1932 #[test]
1933 fn test_float4_coerce_in_range() {
1934 let plan =
1935 build_plan_with_params(vec![("$1", DataType::Float32), ("$2", DataType::Float64)]);
1936 let portal = make_portal(
1937 vec![Some(Type::FLOAT4), Some(Type::FLOAT4)],
1938 vec![s("1.5"), s("2.5")],
1939 );
1940
1941 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1942 assert_eq!(values[0], ScalarValue::Float32(Some(1.5)));
1943 assert_eq!(values[1], ScalarValue::Float64(Some(2.5)));
1944 }
1945
1946 #[test]
1947 fn test_float4_coerce_to_int_in_range() {
1948 let plan = build_plan_with_params(vec![
1949 ("$1", DataType::Int8),
1950 ("$2", DataType::Int32),
1951 ("$3", DataType::UInt64),
1952 ]);
1953 let portal = make_portal(
1954 vec![Some(Type::FLOAT4), Some(Type::FLOAT4), Some(Type::FLOAT4)],
1955 vec![s("100"), s("1000"), s("200")],
1956 );
1957
1958 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1959 assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1960 assert_eq!(values[1], ScalarValue::Int32(Some(1000)));
1961 assert_eq!(values[2], ScalarValue::UInt64(Some(200)));
1962 }
1963
1964 #[test]
1965 fn test_float4_coerce_to_int_out_of_range() {
1966 let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1967 let portal = make_portal(vec![Some(Type::FLOAT4)], vec![s("200")]);
1968 let result = parameters_to_scalar_values(&plan, &portal);
1969 assert!(result.is_err());
1970 }
1971
1972 #[test]
1973 fn test_float8_coerce_in_range() {
1974 let plan =
1975 build_plan_with_params(vec![("$1", DataType::Float32), ("$2", DataType::Float64)]);
1976 let portal = make_portal(
1977 vec![Some(Type::FLOAT8), Some(Type::FLOAT8)],
1978 vec![s("1.5"), s("2.5")],
1979 );
1980
1981 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1982 assert_eq!(values[0], ScalarValue::Float32(Some(1.5)));
1983 assert_eq!(values[1], ScalarValue::Float64(Some(2.5)));
1984 }
1985
1986 #[test]
1987 fn test_float8_coerce_to_int_in_range() {
1988 let plan = build_plan_with_params(vec![
1989 ("$1", DataType::Int8),
1990 ("$2", DataType::Int64),
1991 ("$3", DataType::UInt64),
1992 ]);
1993 let portal = make_portal(
1994 vec![Some(Type::FLOAT8), Some(Type::FLOAT8), Some(Type::FLOAT8)],
1995 vec![s("100"), s("1000000"), s("200")],
1996 );
1997
1998 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1999 assert_eq!(values[0], ScalarValue::Int8(Some(100)));
2000 assert_eq!(values[1], ScalarValue::Int64(Some(1000000)));
2001 assert_eq!(values[2], ScalarValue::UInt64(Some(200)));
2002 }
2003
2004 #[test]
2005 fn test_float8_coerce_to_int_out_of_range() {
2006 let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
2007 let portal = make_portal(vec![Some(Type::FLOAT8)], vec![s("200")]);
2008 let result = parameters_to_scalar_values(&plan, &portal);
2009 assert!(result.is_err());
2010 }
2011
2012 #[test]
2013 fn test_float8_coerce_negative_to_unsigned_out_of_range() {
2014 let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
2015 let portal = make_portal(vec![Some(Type::FLOAT8)], vec![s("-1")]);
2016 let result = parameters_to_scalar_values(&plan, &portal);
2017 assert!(result.is_err());
2018 }
2019
2020 #[test]
2021 fn test_null_parameter() {
2022 let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
2023 let portal = make_portal(vec![Some(Type::INT2)], vec![None]);
2024
2025 let values = parameters_to_scalar_values(&plan, &portal).unwrap();
2026 assert_eq!(values[0], ScalarValue::Int8(None));
2027 }
2028}