Skip to main content

servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use arrow::array::{Array, AsArray};
23use arrow_pg::encoder::{Encoder, encode_value};
24use arrow_pg::list_encoder::encode_list;
25use arrow_schema::{DataType, TimeUnit};
26use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
27use common_recordbatch::RecordBatch;
28use common_recordbatch::error::Result as RecordBatchResult;
29use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::arrow::datatypes::DataType as ArrowDataType;
33use datatypes::json::JsonStructureSettings;
34use datatypes::prelude::{ConcreteDataType, Value};
35use datatypes::schema::{Schema, SchemaRef};
36use datatypes::types::{Decimal128Type, IntervalType, TimestampType, jsonb_to_string};
37use datatypes::value::StructValue;
38use futures::Stream;
39use pg_interval::Interval as PgInterval;
40use pgwire::api::Type;
41use pgwire::api::portal::{Format, Portal};
42use pgwire::api::results::FieldInfo;
43use pgwire::error::{PgWireError, PgWireResult};
44use pgwire::types::format::FormatOptions as PgFormatOptions;
45use query::planner::DfLogicalPlanner;
46use rust_decimal::Decimal;
47use rust_decimal::prelude::ToPrimitive;
48use session::context::QueryContextRef;
49use snafu::ResultExt;
50
51pub use self::error::{PgErrorCode, PgErrorSeverity};
52use crate::error::{self as server_error, InferParameterTypesSnafu, Result};
53use crate::postgres::handler::PgSqlPlan;
54use crate::postgres::utils::convert_err;
55
56pub(super) fn schema_to_pg(
57    origin: &Schema,
58    field_formats: &Format,
59    format_options: Option<Arc<PgFormatOptions>>,
60) -> Result<Vec<FieldInfo>> {
61    origin
62        .column_schemas()
63        .iter()
64        .enumerate()
65        .map(|(idx, col)| {
66            let mut field_info = FieldInfo::new(
67                col.name.clone(),
68                None,
69                None,
70                type_gt_to_pg(&col.data_type)?,
71                field_formats.format_for(idx),
72            );
73            if let Some(format_options) = &format_options {
74                field_info = field_info.with_format_options(format_options.clone());
75            }
76            Ok(field_info)
77        })
78        .collect::<Result<Vec<FieldInfo>>>()
79}
80
81/// this function will encode greptime's `StructValue` into PostgreSQL jsonb type
82///
83/// Note that greptimedb has different types of StructValue for storing json data,
84/// based on policy defined in `JsonStructureSettings`. But here the `StructValue`
85/// should be fully structured.
86///
87/// there are alternatives like records, arrays, etc. but there are also limitations:
88/// records: there is no support for include keys
89/// arrays: element in array must be the same type
90fn encode_struct<S: Encoder>(
91    _query_ctx: &QueryContextRef,
92    struct_value: StructValue,
93    builder: &mut S,
94    pg_field: &FieldInfo,
95) -> PgWireResult<()> {
96    let encoding_setting = JsonStructureSettings::Structured(None);
97    let json_value = encoding_setting
98        .decode(Value::Struct(struct_value))
99        .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
100
101    builder.encode_field(&json_value, pg_field)
102}
103
104pub(crate) struct RecordBatchRowStream<S, B>
105where
106    S: Encoder,
107    B: Stream<Item = RecordBatchResult<RecordBatch>>,
108{
109    query_ctx: QueryContextRef,
110    pg_schema: Arc<Vec<FieldInfo>>,
111    schema: SchemaRef,
112    record_batches: Pin<Box<B>>,
113    encoder: S,
114}
115
116impl<S, B> Stream for RecordBatchRowStream<S, B>
117where
118    S: Encoder + Unpin,
119    B: Stream<Item = RecordBatchResult<RecordBatch>>,
120{
121    type Item = PgWireResult<Vec<S::Item>>;
122
123    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
124        match self.record_batches.as_mut().poll_next(cx) {
125            Poll::Ready(Some(Ok(batch))) => {
126                let record_batch = batch.into_df_record_batch();
127                let num_rows = record_batch.num_rows();
128
129                if num_rows == 0 {
130                    return Poll::Ready(Some(Ok(vec![])));
131                }
132
133                let arrow_schema = record_batch.schema();
134                let query_ctx = self.query_ctx.clone();
135                let pg_schema = self.pg_schema.clone();
136                let schema = self.schema.clone();
137                let mut results = Vec::with_capacity(num_rows);
138
139                for i in 0..num_rows {
140                    if let Err(e) = Self::encode_row(
141                        &query_ctx,
142                        &pg_schema,
143                        &schema,
144                        arrow_schema.as_ref(),
145                        &mut self.encoder,
146                        &record_batch,
147                        i,
148                    ) {
149                        return Poll::Ready(Some(Err(e)));
150                    }
151                    results.push(self.encoder.take_row());
152                }
153
154                Poll::Ready(Some(Ok(results)))
155            }
156            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(convert_err(e)))),
157            Poll::Ready(None) => Poll::Ready(None),
158            Poll::Pending => Poll::Pending,
159        }
160    }
161}
162
163impl<S, B> RecordBatchRowStream<S, B>
164where
165    S: Encoder,
166    B: Stream<Item = RecordBatchResult<RecordBatch>>,
167{
168    pub(crate) fn new(
169        query_ctx: QueryContextRef,
170        pg_schema: Arc<Vec<FieldInfo>>,
171        schema: SchemaRef,
172        record_batches: B,
173        encoder: S,
174    ) -> Self {
175        Self {
176            query_ctx,
177            pg_schema,
178            schema,
179            record_batches: Box::pin(record_batches),
180            encoder,
181        }
182    }
183
184    fn encode_row(
185        query_ctx: &QueryContextRef,
186        pg_schema: &Arc<Vec<FieldInfo>>,
187        schema: &SchemaRef,
188        arrow_schema: &arrow::datatypes::Schema,
189        encoder: &mut S,
190        record_batch: &arrow::record_batch::RecordBatch,
191        i: usize,
192    ) -> PgWireResult<()> {
193        for (j, column) in record_batch.columns().iter().enumerate() {
194            let pg_field = &pg_schema[j];
195
196            if column.is_null(i) {
197                encoder.encode_field(&None::<&i8>, pg_field)?;
198                continue;
199            }
200
201            match column.data_type() {
202                // these types are greptimedb specific or custom
203                DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
204                    // jsonb
205                    if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
206                        let v = datatypes::arrow_array::binary_array_value(column, i);
207                        let s = jsonb_to_string(v).map_err(convert_err)?;
208                        encoder.encode_field(&s, pg_field)?;
209                    } else {
210                        // bytea
211                        let arrow_field = arrow_schema.field(j);
212                        encode_value(encoder, column, i, arrow_field, pg_field)?;
213                    }
214                }
215
216                DataType::List(_) => {
217                    let array = column.as_list::<i32>();
218                    let items = array.value(i);
219
220                    encode_list(encoder, items, pg_field)?;
221                }
222                DataType::Struct(_) => {
223                    encode_struct(query_ctx, Default::default(), encoder, pg_field)?;
224                }
225                _ => {
226                    // Encode value using arrow-pg
227                    let arrow_field = arrow_schema.field(j);
228                    encode_value(encoder, column, i, arrow_field, pg_field)?;
229                }
230            }
231        }
232        Ok(())
233    }
234}
235
236pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
237    match origin {
238        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
239        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
240        &ConcreteDataType::Int8(_) => Ok(Type::INT2),
241        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
242        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
243        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
244        &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
245        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
246        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
247        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
248        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
249        &ConcreteDataType::Date(_) => Ok(Type::DATE),
250        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
251        &ConcreteDataType::Time(_) => Ok(Type::TIME),
252        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
253        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
254        &ConcreteDataType::Json(_) => Ok(Type::JSON),
255        ConcreteDataType::List(list) => match list.item_type() {
256            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
257            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
258            &ConcreteDataType::Int8(_) => Ok(Type::INT2_ARRAY),
259            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
260            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
261            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
262            &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
263            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
264            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
265            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
266            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
267            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
268            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
269            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
270            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
271            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
272            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
273            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
274            &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
275            &ConcreteDataType::Dictionary(_)
276            | &ConcreteDataType::Vector(_)
277            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
278                data_type: origin,
279                reason: "not implemented",
280            }
281            .fail(),
282        },
283        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
284            data_type: origin,
285            reason: "not implemented",
286        }
287        .fail(),
288        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
289        &ConcreteDataType::Struct(_) => Ok(Type::JSON),
290    }
291}
292
293#[allow(dead_code)]
294pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
295    // Note that we only support a small amount of pg data types
296    match origin {
297        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
298        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
299        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
300        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
301        &Type::NUMERIC => Ok(ConcreteDataType::uint64_datatype()),
302        &Type::VARCHAR | &Type::CHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
303        &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
304            common_time::timestamp::TimeUnit::Millisecond,
305        )),
306        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
307        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
308            common_time::timestamp::TimeUnit::Microsecond,
309        )),
310        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
311            ConcreteDataType::int16_datatype(),
312        ))),
313        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
314            ConcreteDataType::int32_datatype(),
315        ))),
316        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
317            ConcreteDataType::int64_datatype(),
318        ))),
319        &Type::NUMERIC_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
320            ConcreteDataType::uint64_datatype(),
321        ))),
322        &Type::VARCHAR_ARRAY | &Type::CHAR_ARRAY | &Type::TEXT_ARRAY => Ok(
323            ConcreteDataType::list_datatype(Arc::new(ConcreteDataType::string_datatype())),
324        ),
325        _ => server_error::InternalSnafu {
326            err_msg: format!("unimplemented datatype {origin:?}"),
327        }
328        .fail(),
329    }
330}
331
332pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
333    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
334    error_info.detail = detail;
335    PgWireError::UserError(Box::new(error_info))
336}
337
338fn to_timestamp_scalar_value<T>(
339    data: Option<T>,
340    unit: &TimestampType,
341    ctype: &ConcreteDataType,
342) -> PgWireResult<ScalarValue>
343where
344    T: Into<i64>,
345{
346    if let Some(n) = data {
347        Value::Timestamp(unit.create_timestamp(n.into()))
348            .try_to_scalar_value(ctype)
349            .map_err(convert_err)
350    } else {
351        Ok(ScalarValue::Null)
352    }
353}
354
355fn to_decimal_scalar_value(data: Option<Decimal>, ctype: &Decimal128Type) -> ScalarValue {
356    if let Some(data) = data {
357        let mut value = data;
358        value.rescale(ctype.scale() as u32);
359
360        ScalarValue::Decimal128(Some(value.mantissa()), ctype.precision(), ctype.scale())
361    } else {
362        ScalarValue::Decimal128(None, ctype.precision(), ctype.scale())
363    }
364}
365
366fn numeric_out_of_range_error(value: impl std::fmt::Display) -> PgWireError {
367    invalid_parameter_error(
368        "numeric_value_out_of_range",
369        Some(format!("value {} is out of range for target type", value)),
370    )
371}
372
373pub(super) fn parameters_to_scalar_values(
374    plan: &LogicalPlan,
375    portal: &Portal<PgSqlPlan>,
376) -> PgWireResult<Vec<ScalarValue>> {
377    let param_count = portal.parameter_len();
378    let mut results = Vec::with_capacity(param_count);
379
380    let client_param_types = &portal.statement.parameter_types;
381    let server_param_types = DfLogicalPlanner::get_inferred_parameter_types(plan)
382        .context(InferParameterTypesSnafu)
383        .map_err(convert_err)?
384        .into_iter()
385        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
386        .collect::<HashMap<_, _>>();
387
388    for idx in 0..param_count {
389        let server_type = server_param_types
390            .get(&format!("${}", idx + 1))
391            .and_then(|t| t.as_ref());
392
393        let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
394            client_given_type.clone()
395        } else if let Some(server_provided_type) = &server_type {
396            type_gt_to_pg(server_provided_type).map_err(convert_err)?
397        } else {
398            return Err(invalid_parameter_error(
399                "unknown_parameter_type",
400                Some(format!(
401                    "Cannot get parameter type information for parameter {}",
402                    idx
403                )),
404            ));
405        };
406
407        let value = match &client_type {
408            &Type::VARCHAR | &Type::TEXT | &Type::CHAR => {
409                let data = portal.parameter::<String>(idx, &client_type)?;
410                if let Some(server_type) = &server_type {
411                    match server_type {
412                        ConcreteDataType::String(t) => {
413                            if t.is_large() {
414                                ScalarValue::LargeUtf8(data)
415                            } else {
416                                ScalarValue::Utf8(data)
417                            }
418                        }
419                        _ => {
420                            return Err(invalid_parameter_error(
421                                "invalid_parameter_type",
422                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
423                            ));
424                        }
425                    }
426                } else {
427                    ScalarValue::Utf8(data)
428                }
429            }
430            &Type::BOOL => {
431                let data = portal.parameter::<bool>(idx, &client_type)?;
432                if let Some(server_type) = &server_type {
433                    match server_type {
434                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
435                        _ => {
436                            return Err(invalid_parameter_error(
437                                "invalid_parameter_type",
438                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
439                            ));
440                        }
441                    }
442                } else {
443                    ScalarValue::Boolean(data)
444                }
445            }
446            &Type::INT2 => {
447                let data = portal.parameter::<i16>(idx, &client_type)?;
448                if let Some(server_type) = &server_type {
449                    match server_type {
450                        ConcreteDataType::Int8(_) => ScalarValue::Int8(
451                            data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
452                                .transpose()?,
453                        ),
454                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
455                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
456                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
457                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
458                            data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
459                                .transpose()?,
460                        ),
461                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
462                            data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
463                                .transpose()?,
464                        ),
465                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
466                            data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
467                                .transpose()?,
468                        ),
469                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
470                            data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
471                                .transpose()?,
472                        ),
473                        ConcreteDataType::Timestamp(unit) => {
474                            to_timestamp_scalar_value(data, unit, server_type)?
475                        }
476                        _ => {
477                            return Err(invalid_parameter_error(
478                                "invalid_parameter_type",
479                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
480                            ));
481                        }
482                    }
483                } else {
484                    ScalarValue::Int16(data)
485                }
486            }
487            &Type::INT4 => {
488                let data = portal.parameter::<i32>(idx, &client_type)?;
489                if let Some(server_type) = &server_type {
490                    match server_type {
491                        ConcreteDataType::Int8(_) => ScalarValue::Int8(
492                            data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
493                                .transpose()?,
494                        ),
495                        ConcreteDataType::Int16(_) => ScalarValue::Int16(
496                            data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
497                                .transpose()?,
498                        ),
499                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
500                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
501                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
502                            data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
503                                .transpose()?,
504                        ),
505                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
506                            data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
507                                .transpose()?,
508                        ),
509                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
510                            data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
511                                .transpose()?,
512                        ),
513                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
514                            data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
515                                .transpose()?,
516                        ),
517                        ConcreteDataType::Timestamp(unit) => {
518                            to_timestamp_scalar_value(data, unit, server_type)?
519                        }
520                        _ => {
521                            return Err(invalid_parameter_error(
522                                "invalid_parameter_type",
523                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
524                            ));
525                        }
526                    }
527                } else {
528                    ScalarValue::Int32(data)
529                }
530            }
531            &Type::INT8 => {
532                let data = portal.parameter::<i64>(idx, &client_type)?;
533                if let Some(server_type) = &server_type {
534                    match server_type {
535                        ConcreteDataType::Int8(_) => ScalarValue::Int8(
536                            data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
537                                .transpose()?,
538                        ),
539                        ConcreteDataType::Int16(_) => ScalarValue::Int16(
540                            data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
541                                .transpose()?,
542                        ),
543                        ConcreteDataType::Int32(_) => ScalarValue::Int32(
544                            data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
545                                .transpose()?,
546                        ),
547                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
548                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
549                            data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
550                                .transpose()?,
551                        ),
552                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
553                            data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
554                                .transpose()?,
555                        ),
556                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
557                            data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
558                                .transpose()?,
559                        ),
560                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
561                            data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
562                                .transpose()?,
563                        ),
564                        ConcreteDataType::Timestamp(unit) => {
565                            to_timestamp_scalar_value(data, unit, server_type)?
566                        }
567                        _ => {
568                            return Err(invalid_parameter_error(
569                                "invalid_parameter_type",
570                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
571                            ));
572                        }
573                    }
574                } else {
575                    ScalarValue::Int64(data)
576                }
577            }
578            &Type::NUMERIC => {
579                let data = portal.parameter::<Decimal>(idx, &client_type)?;
580                match &server_type {
581                    Some(ConcreteDataType::Decimal128(dt)) => to_decimal_scalar_value(data, dt),
582                    Some(st @ ConcreteDataType::Timestamp(unit)) => {
583                        to_timestamp_scalar_value(data.and_then(|n| n.to_i64()), unit, st)?
584                    }
585                    Some(ConcreteDataType::UInt64(_)) | None => {
586                        ScalarValue::UInt64(data.and_then(|n| n.to_u64()))
587                    }
588                    Some(st) => {
589                        return Err(invalid_parameter_error(
590                            "invalid_parameter_type",
591                            Some(format!("Expected: {}, found: {}", st, client_type)),
592                        ));
593                    }
594                }
595            }
596            &Type::FLOAT4 => {
597                let data = portal.parameter::<f32>(idx, &client_type)?;
598                if let Some(server_type) = &server_type {
599                    match server_type {
600                        ConcreteDataType::Int8(_) => ScalarValue::Int8(
601                            data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
602                                .transpose()?,
603                        ),
604                        ConcreteDataType::Int16(_) => ScalarValue::Int16(
605                            data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
606                                .transpose()?,
607                        ),
608                        ConcreteDataType::Int32(_) => ScalarValue::Int32(
609                            data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
610                                .transpose()?,
611                        ),
612                        ConcreteDataType::Int64(_) => ScalarValue::Int64(
613                            data.map(|n| n.to_i64().ok_or_else(|| numeric_out_of_range_error(n)))
614                                .transpose()?,
615                        ),
616                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
617                            data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
618                                .transpose()?,
619                        ),
620                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
621                            data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
622                                .transpose()?,
623                        ),
624                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
625                            data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
626                                .transpose()?,
627                        ),
628                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
629                            data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
630                                .transpose()?,
631                        ),
632                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
633                        ConcreteDataType::Float64(_) => {
634                            ScalarValue::Float64(data.map(|n| n as f64))
635                        }
636                        _ => {
637                            return Err(invalid_parameter_error(
638                                "invalid_parameter_type",
639                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
640                            ));
641                        }
642                    }
643                } else {
644                    ScalarValue::Float32(data)
645                }
646            }
647            &Type::FLOAT8 => {
648                let data = portal.parameter::<f64>(idx, &client_type)?;
649                if let Some(server_type) = &server_type {
650                    match server_type {
651                        ConcreteDataType::Int8(_) => ScalarValue::Int8(
652                            data.map(|n| n.to_i8().ok_or_else(|| numeric_out_of_range_error(n)))
653                                .transpose()?,
654                        ),
655                        ConcreteDataType::Int16(_) => ScalarValue::Int16(
656                            data.map(|n| n.to_i16().ok_or_else(|| numeric_out_of_range_error(n)))
657                                .transpose()?,
658                        ),
659                        ConcreteDataType::Int32(_) => ScalarValue::Int32(
660                            data.map(|n| n.to_i32().ok_or_else(|| numeric_out_of_range_error(n)))
661                                .transpose()?,
662                        ),
663                        ConcreteDataType::Int64(_) => ScalarValue::Int64(
664                            data.map(|n| n.to_i64().ok_or_else(|| numeric_out_of_range_error(n)))
665                                .transpose()?,
666                        ),
667                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(
668                            data.map(|n| n.to_u8().ok_or_else(|| numeric_out_of_range_error(n)))
669                                .transpose()?,
670                        ),
671                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(
672                            data.map(|n| n.to_u16().ok_or_else(|| numeric_out_of_range_error(n)))
673                                .transpose()?,
674                        ),
675                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(
676                            data.map(|n| n.to_u32().ok_or_else(|| numeric_out_of_range_error(n)))
677                                .transpose()?,
678                        ),
679                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(
680                            data.map(|n| n.to_u64().ok_or_else(|| numeric_out_of_range_error(n)))
681                                .transpose()?,
682                        ),
683                        ConcreteDataType::Float32(_) => ScalarValue::Float32(
684                            data.map(|n| n.to_f32().ok_or_else(|| numeric_out_of_range_error(n)))
685                                .transpose()?,
686                        ),
687                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
688                        _ => {
689                            return Err(invalid_parameter_error(
690                                "invalid_parameter_type",
691                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
692                            ));
693                        }
694                    }
695                } else {
696                    ScalarValue::Float64(data)
697                }
698            }
699            &Type::TIMESTAMP => {
700                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
701                if let Some(server_type) = &server_type {
702                    match server_type {
703                        ConcreteDataType::Timestamp(unit) => match *unit {
704                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
705                                data.map(|ts| ts.and_utc().timestamp()),
706                                None,
707                            ),
708                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
709                                data.map(|ts| ts.and_utc().timestamp_millis()),
710                                None,
711                            ),
712                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
713                                data.map(|ts| ts.and_utc().timestamp_micros()),
714                                None,
715                            ),
716                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
717                                data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
718                                None,
719                            ),
720                        },
721                        _ => {
722                            return Err(invalid_parameter_error(
723                                "invalid_parameter_type",
724                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
725                            ));
726                        }
727                    }
728                } else {
729                    ScalarValue::TimestampMillisecond(
730                        data.map(|ts| ts.and_utc().timestamp_millis()),
731                        None,
732                    )
733                }
734            }
735            &Type::TIMESTAMPTZ => {
736                let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
737                if let Some(server_type) = &server_type {
738                    match server_type {
739                        ConcreteDataType::Timestamp(unit) => match *unit {
740                            TimestampType::Second(_) => {
741                                ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
742                            }
743                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
744                                data.map(|ts| ts.timestamp_millis()),
745                                None,
746                            ),
747                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
748                                data.map(|ts| ts.timestamp_micros()),
749                                None,
750                            ),
751                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
752                                data.and_then(|ts| ts.timestamp_nanos_opt()),
753                                None,
754                            ),
755                        },
756                        _ => {
757                            return Err(invalid_parameter_error(
758                                "invalid_parameter_type",
759                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
760                            ));
761                        }
762                    }
763                } else {
764                    ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
765                }
766            }
767            &Type::DATE => {
768                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
769                if let Some(server_type) = &server_type {
770                    match server_type {
771                        ConcreteDataType::Date(_) => ScalarValue::Date32(
772                            data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
773                        ),
774                        _ => {
775                            return Err(invalid_parameter_error(
776                                "invalid_parameter_type",
777                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
778                            ));
779                        }
780                    }
781                } else {
782                    ScalarValue::Date32(
783                        data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
784                    )
785                }
786            }
787            &Type::INTERVAL => {
788                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
789                if let Some(server_type) = &server_type {
790                    match server_type {
791                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
792                            ScalarValue::IntervalYearMonth(
793                                data.map(|i| {
794                                    if i.days != 0 || i.microseconds != 0 {
795                                        Err(invalid_parameter_error(
796                                            "invalid_parameter_type",
797                                            Some(format!(
798                                                "Expected: {}, found: {}",
799                                                server_type, client_type
800                                            )),
801                                        ))
802                                    } else {
803                                        Ok(IntervalYearMonth::new(i.months).to_i32())
804                                    }
805                                })
806                                .transpose()?,
807                            )
808                        }
809                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
810                            ScalarValue::IntervalDayTime(
811                                data.map(|i| {
812                                    if i.months != 0 || i.microseconds % 1000 != 0 {
813                                        Err(invalid_parameter_error(
814                                            "invalid_parameter_type",
815                                            Some(format!(
816                                                "Expected: {}, found: {}",
817                                                server_type, client_type
818                                            )),
819                                        ))
820                                    } else {
821                                        Ok(IntervalDayTime::new(
822                                            i.days,
823                                            (i.microseconds / 1000) as i32,
824                                        )
825                                        .into())
826                                    }
827                                })
828                                .transpose()?,
829                            )
830                        }
831                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
832                            ScalarValue::IntervalMonthDayNano(data.map(|i| {
833                                IntervalMonthDayNano::new(
834                                    i.months,
835                                    i.days,
836                                    i.microseconds * 1_000i64,
837                                )
838                                .into()
839                            }))
840                        }
841                        _ => {
842                            return Err(invalid_parameter_error(
843                                "invalid_parameter_type",
844                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
845                            ));
846                        }
847                    }
848                } else {
849                    ScalarValue::IntervalMonthDayNano(data.map(|i| {
850                        IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
851                            .into()
852                    }))
853                }
854            }
855            &Type::BYTEA => {
856                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
857                if let Some(server_type) = &server_type {
858                    match server_type {
859                        ConcreteDataType::String(t) => {
860                            let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
861                            if t.is_large() {
862                                ScalarValue::LargeUtf8(s)
863                            } else {
864                                ScalarValue::Utf8(s)
865                            }
866                        }
867                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
868                        _ => {
869                            return Err(invalid_parameter_error(
870                                "invalid_parameter_type",
871                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
872                            ));
873                        }
874                    }
875                } else {
876                    ScalarValue::Binary(data)
877                }
878            }
879            &Type::JSONB => {
880                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
881                if let Some(server_type) = &server_type {
882                    match server_type {
883                        ConcreteDataType::Binary(_) => {
884                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
885                        }
886                        _ => {
887                            return Err(invalid_parameter_error(
888                                "invalid_parameter_type",
889                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
890                            ));
891                        }
892                    }
893                } else {
894                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
895                }
896            }
897            &Type::INT2_ARRAY => {
898                let data = portal.parameter::<Vec<Option<i16>>>(idx, &client_type)?;
899                if let Some(data) = data {
900                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
901                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
902                } else {
903                    ScalarValue::Null
904                }
905            }
906            &Type::INT4_ARRAY => {
907                let data = portal.parameter::<Vec<Option<i32>>>(idx, &client_type)?;
908                if let Some(data) = data {
909                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
910                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
911                } else {
912                    ScalarValue::Null
913                }
914            }
915            &Type::INT8_ARRAY => {
916                let data = portal.parameter::<Vec<Option<i64>>>(idx, &client_type)?;
917                if let Some(data) = data {
918                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
919                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
920                } else {
921                    ScalarValue::Null
922                }
923            }
924            &Type::NUMERIC_ARRAY => {
925                let data = portal.parameter::<Vec<Option<Decimal>>>(idx, &client_type)?;
926                if let Some(data) = data {
927                    let build_u64_list = |data: Vec<Option<Decimal>>| {
928                        let values = data
929                            .into_iter()
930                            .map(|n| ScalarValue::UInt64(n.and_then(|n| n.to_u64())))
931                            .collect::<Vec<_>>();
932                        ScalarValue::List(ScalarValue::new_list(
933                            &values,
934                            &ArrowDataType::UInt64,
935                            true,
936                        ))
937                    };
938                    if let Some(server_type) = &server_type {
939                        match server_type {
940                            ConcreteDataType::List(list_type) => match list_type.item_type() {
941                                ConcreteDataType::UInt64(_) => build_u64_list(data),
942                                ConcreteDataType::Decimal128(dt) => {
943                                    let values = data
944                                        .into_iter()
945                                        .map(|n| to_decimal_scalar_value(n, dt))
946                                        .collect::<Vec<_>>();
947                                    ScalarValue::List(ScalarValue::new_list(
948                                        &values,
949                                        &ArrowDataType::Decimal128(dt.precision(), dt.scale()),
950                                        true,
951                                    ))
952                                }
953                                _ => {
954                                    // the server type is not a list of decimal or uint64
955                                    return Err(invalid_parameter_error(
956                                        "invalid_parameter_type",
957                                        Some(format!(
958                                            "Expected: {}, found: {}",
959                                            list_type.item_type(),
960                                            client_type
961                                        )),
962                                    ));
963                                }
964                            },
965                            _ => {
966                                // the server type is not a list
967                                return Err(invalid_parameter_error(
968                                    "invalid_parameter_type",
969                                    Some(format!(
970                                        "Expected: {}, found: {}",
971                                        server_type, client_type
972                                    )),
973                                ));
974                            }
975                        }
976                    } else {
977                        // server type not provided
978                        build_u64_list(data)
979                    }
980                } else {
981                    ScalarValue::Null
982                }
983            }
984            &Type::VARCHAR_ARRAY | &Type::TEXT_ARRAY | &Type::CHAR_ARRAY => {
985                let data = portal.parameter::<Vec<Option<String>>>(idx, &client_type)?;
986                if let Some(data) = data {
987                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
988                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
989                } else {
990                    ScalarValue::Null
991                }
992            }
993            &Type::TIMESTAMP_ARRAY => {
994                let data = portal.parameter::<Vec<Option<NaiveDateTime>>>(idx, &client_type)?;
995                if let Some(data) = data {
996                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
997                        match list_type.item_type() {
998                            ConcreteDataType::Timestamp(unit) => match *unit {
999                                TimestampType::Second(_) => {
1000                                    let values = data
1001                                        .into_iter()
1002                                        .map(|ts| {
1003                                            ScalarValue::TimestampSecond(
1004                                                ts.map(|ts| ts.and_utc().timestamp()),
1005                                                None,
1006                                            )
1007                                        })
1008                                        .collect::<Vec<_>>();
1009                                    ScalarValue::List(ScalarValue::new_list(
1010                                        &values,
1011                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
1012                                        true,
1013                                    ))
1014                                }
1015                                TimestampType::Millisecond(_) => {
1016                                    let values = data
1017                                        .into_iter()
1018                                        .map(|ts| {
1019                                            ScalarValue::TimestampMillisecond(
1020                                                ts.map(|ts| ts.and_utc().timestamp_millis()),
1021                                                None,
1022                                            )
1023                                        })
1024                                        .collect::<Vec<_>>();
1025                                    ScalarValue::List(ScalarValue::new_list(
1026                                        &values,
1027                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1028                                        true,
1029                                    ))
1030                                }
1031                                TimestampType::Microsecond(_) => {
1032                                    let values = data
1033                                        .into_iter()
1034                                        .map(|ts| {
1035                                            ScalarValue::TimestampMicrosecond(
1036                                                ts.map(|ts| ts.and_utc().timestamp_micros()),
1037                                                None,
1038                                            )
1039                                        })
1040                                        .collect::<Vec<_>>();
1041                                    ScalarValue::List(ScalarValue::new_list(
1042                                        &values,
1043                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1044                                        true,
1045                                    ))
1046                                }
1047                                TimestampType::Nanosecond(_) => {
1048                                    let values = data
1049                                        .into_iter()
1050                                        .filter_map(|ts| {
1051                                            ts.and_then(|ts| {
1052                                                ts.and_utc().timestamp_nanos_opt().map(|nanos| {
1053                                                    ScalarValue::TimestampNanosecond(
1054                                                        Some(nanos),
1055                                                        None,
1056                                                    )
1057                                                })
1058                                            })
1059                                        })
1060                                        .collect::<Vec<_>>();
1061                                    ScalarValue::List(ScalarValue::new_list(
1062                                        &values,
1063                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1064                                        true,
1065                                    ))
1066                                }
1067                            },
1068                            _ => {
1069                                return Err(invalid_parameter_error(
1070                                    "invalid_parameter_type",
1071                                    Some(format!(
1072                                        "Expected: {}, found: {}",
1073                                        list_type.item_type(),
1074                                        client_type
1075                                    )),
1076                                ));
1077                            }
1078                        }
1079                    } else {
1080                        let values = data
1081                            .into_iter()
1082                            .map(|ts| {
1083                                ScalarValue::TimestampMillisecond(
1084                                    ts.map(|ts| ts.and_utc().timestamp_millis()),
1085                                    None,
1086                                )
1087                            })
1088                            .collect::<Vec<_>>();
1089                        ScalarValue::List(ScalarValue::new_list(
1090                            &values,
1091                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1092                            true,
1093                        ))
1094                    }
1095                } else {
1096                    ScalarValue::Null
1097                }
1098            }
1099            &Type::TIMESTAMPTZ_ARRAY => {
1100                let data =
1101                    portal.parameter::<Vec<Option<DateTime<FixedOffset>>>>(idx, &client_type)?;
1102                if let Some(data) = data {
1103                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
1104                        match list_type.item_type() {
1105                            ConcreteDataType::Timestamp(unit) => match *unit {
1106                                TimestampType::Second(_) => {
1107                                    let values = data
1108                                        .into_iter()
1109                                        .map(|ts| {
1110                                            ScalarValue::TimestampSecond(
1111                                                ts.map(|ts| ts.timestamp()),
1112                                                None,
1113                                            )
1114                                        })
1115                                        .collect::<Vec<_>>();
1116                                    ScalarValue::List(ScalarValue::new_list(
1117                                        &values,
1118                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
1119                                        true,
1120                                    ))
1121                                }
1122                                TimestampType::Millisecond(_) => {
1123                                    let values = data
1124                                        .into_iter()
1125                                        .map(|ts| {
1126                                            ScalarValue::TimestampMillisecond(
1127                                                ts.map(|ts| ts.timestamp_millis()),
1128                                                None,
1129                                            )
1130                                        })
1131                                        .collect::<Vec<_>>();
1132                                    ScalarValue::List(ScalarValue::new_list(
1133                                        &values,
1134                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1135                                        true,
1136                                    ))
1137                                }
1138                                TimestampType::Microsecond(_) => {
1139                                    let values = data
1140                                        .into_iter()
1141                                        .map(|ts| {
1142                                            ScalarValue::TimestampMicrosecond(
1143                                                ts.map(|ts| ts.timestamp_micros()),
1144                                                None,
1145                                            )
1146                                        })
1147                                        .collect::<Vec<_>>();
1148                                    ScalarValue::List(ScalarValue::new_list(
1149                                        &values,
1150                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1151                                        true,
1152                                    ))
1153                                }
1154                                TimestampType::Nanosecond(_) => {
1155                                    let values = data
1156                                        .into_iter()
1157                                        .map(|ts| {
1158                                            ScalarValue::TimestampNanosecond(
1159                                                ts.and_then(|ts| ts.timestamp_nanos_opt()),
1160                                                None,
1161                                            )
1162                                        })
1163                                        .collect::<Vec<_>>();
1164                                    ScalarValue::List(ScalarValue::new_list(
1165                                        &values,
1166                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1167                                        true,
1168                                    ))
1169                                }
1170                            },
1171                            _ => {
1172                                return Err(invalid_parameter_error(
1173                                    "invalid_parameter_type",
1174                                    Some(format!(
1175                                        "Expected: {}, found: {}",
1176                                        list_type.item_type(),
1177                                        client_type
1178                                    )),
1179                                ));
1180                            }
1181                        }
1182                    } else {
1183                        let values = data
1184                            .into_iter()
1185                            .map(|ts| {
1186                                ScalarValue::TimestampMillisecond(
1187                                    ts.map(|ts| ts.timestamp_millis()),
1188                                    None,
1189                                )
1190                            })
1191                            .collect::<Vec<_>>();
1192                        ScalarValue::List(ScalarValue::new_list(
1193                            &values,
1194                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1195                            true,
1196                        ))
1197                    }
1198                } else {
1199                    ScalarValue::Null
1200                }
1201            }
1202            _ => Err(invalid_parameter_error(
1203                "unsupported_parameter_value",
1204                Some(format!("Found type: {}", client_type)),
1205            ))?,
1206        };
1207
1208        results.push(value);
1209    }
1210
1211    Ok(results)
1212}
1213
1214pub(super) fn param_types_to_pg_types(
1215    param_types: &HashMap<String, Option<ConcreteDataType>>,
1216) -> Result<Vec<Type>> {
1217    let param_count = param_types.len();
1218    let mut types = Vec::with_capacity(param_count);
1219    for i in 0..param_count {
1220        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1221            let pg_type = type_gt_to_pg(param_type)?;
1222            types.push(pg_type);
1223        } else {
1224            types.push(Type::UNKNOWN);
1225        }
1226    }
1227    Ok(types)
1228}
1229
1230pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1231    let config = query_ctx.configuration_parameter();
1232    let (date_style, date_order) = *config.pg_datetime_style();
1233
1234    let mut format_options = PgFormatOptions::default();
1235    format_options.date_style = format!("{}, {}", date_style, date_order);
1236    format_options.interval_style = config.pg_intervalstyle_format().to_string();
1237    format_options.bytea_output = config.postgres_bytea_output().to_string();
1238    format_options.time_zone = query_ctx.timezone().to_string();
1239
1240    Arc::new(format_options)
1241}
1242
1243#[cfg(test)]
1244mod test {
1245    use std::str::FromStr;
1246    use std::sync::Arc;
1247
1248    use arrow::array::{
1249        Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
1250    };
1251    use arrow_schema::{Field, IntervalUnit};
1252    use bytes::Bytes;
1253    use datafusion_expr::expr::Placeholder;
1254    use datafusion_expr::{Expr, LogicalPlanBuilder};
1255    use datatypes::schema::{ColumnSchema, Schema};
1256    use datatypes::vectors::{
1257        BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
1258        Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
1259        IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
1260        TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
1261    };
1262    use futures::{StreamExt as FuturesStreamExt, stream};
1263    use pgwire::api::Type;
1264    use pgwire::api::portal::{Format, Portal};
1265    use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo};
1266    use pgwire::api::stmt::StoredStatement;
1267    use pgwire::messages::extendedquery::Bind;
1268    use session::context::QueryContextBuilder;
1269
1270    use super::*;
1271    use crate::SqlPlan;
1272    use crate::postgres::handler::PgSqlPlan;
1273
1274    #[test]
1275    fn test_schema_convert() {
1276        let column_schemas = vec![
1277            ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1278            ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1279            ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1280            ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1281            ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1282            ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1283            ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1284            ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1285            ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1286            ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1287            ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1288            ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1289            ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1290            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1291            ColumnSchema::new(
1292                "timestamps",
1293                ConcreteDataType::timestamp_millisecond_datatype(),
1294                true,
1295            ),
1296            ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1297            ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1298            ColumnSchema::new(
1299                "intervals",
1300                ConcreteDataType::interval_month_day_nano_datatype(),
1301                true,
1302            ),
1303        ];
1304        let pg_field_info = vec![
1305            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1306            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1307            FieldInfo::new("int8s".into(), None, None, Type::INT2, FieldFormat::Text),
1308            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1309            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1310            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1311            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1312            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1313            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1314            FieldInfo::new(
1315                "uint64s".into(),
1316                None,
1317                None,
1318                Type::NUMERIC,
1319                FieldFormat::Text,
1320            ),
1321            FieldInfo::new(
1322                "float32s".into(),
1323                None,
1324                None,
1325                Type::FLOAT4,
1326                FieldFormat::Text,
1327            ),
1328            FieldInfo::new(
1329                "float64s".into(),
1330                None,
1331                None,
1332                Type::FLOAT8,
1333                FieldFormat::Text,
1334            ),
1335            FieldInfo::new(
1336                "binaries".into(),
1337                None,
1338                None,
1339                Type::BYTEA,
1340                FieldFormat::Text,
1341            ),
1342            FieldInfo::new(
1343                "strings".into(),
1344                None,
1345                None,
1346                Type::VARCHAR,
1347                FieldFormat::Text,
1348            ),
1349            FieldInfo::new(
1350                "timestamps".into(),
1351                None,
1352                None,
1353                Type::TIMESTAMP,
1354                FieldFormat::Text,
1355            ),
1356            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1357            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1358            FieldInfo::new(
1359                "intervals".into(),
1360                None,
1361                None,
1362                Type::INTERVAL,
1363                FieldFormat::Text,
1364            ),
1365        ];
1366        let schema = Schema::new(column_schemas);
1367        let fs = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
1368        assert_eq!(fs, pg_field_info);
1369    }
1370
1371    #[test]
1372    fn test_encode_text_format_data() {
1373        let pg_schema = vec![
1374            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1375            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1376            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1377            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1378            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1379            FieldInfo::new(
1380                "uint64s".into(),
1381                None,
1382                None,
1383                Type::NUMERIC,
1384                FieldFormat::Text,
1385            ),
1386            FieldInfo::new("int8s".into(), None, None, Type::INT2, FieldFormat::Text),
1387            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1388            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1389            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1390            FieldInfo::new(
1391                "float32s".into(),
1392                None,
1393                None,
1394                Type::FLOAT4,
1395                FieldFormat::Text,
1396            ),
1397            FieldInfo::new(
1398                "float64s".into(),
1399                None,
1400                None,
1401                Type::FLOAT8,
1402                FieldFormat::Text,
1403            ),
1404            FieldInfo::new(
1405                "strings".into(),
1406                None,
1407                None,
1408                Type::VARCHAR,
1409                FieldFormat::Text,
1410            ),
1411            FieldInfo::new(
1412                "binaries".into(),
1413                None,
1414                None,
1415                Type::BYTEA,
1416                FieldFormat::Text,
1417            ),
1418            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1419            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1420            FieldInfo::new(
1421                "timestamps".into(),
1422                None,
1423                None,
1424                Type::TIMESTAMP,
1425                FieldFormat::Text,
1426            ),
1427            FieldInfo::new(
1428                "interval_year_month".into(),
1429                None,
1430                None,
1431                Type::INTERVAL,
1432                FieldFormat::Text,
1433            ),
1434            FieldInfo::new(
1435                "interval_day_time".into(),
1436                None,
1437                None,
1438                Type::INTERVAL,
1439                FieldFormat::Text,
1440            ),
1441            FieldInfo::new(
1442                "interval_month_day_nano".into(),
1443                None,
1444                None,
1445                Type::INTERVAL,
1446                FieldFormat::Text,
1447            ),
1448            FieldInfo::new(
1449                "int_list".into(),
1450                None,
1451                None,
1452                Type::INT8_ARRAY,
1453                FieldFormat::Text,
1454            ),
1455            FieldInfo::new(
1456                "float_list".into(),
1457                None,
1458                None,
1459                Type::FLOAT8_ARRAY,
1460                FieldFormat::Text,
1461            ),
1462            FieldInfo::new(
1463                "string_list".into(),
1464                None,
1465                None,
1466                Type::VARCHAR_ARRAY,
1467                FieldFormat::Text,
1468            ),
1469            FieldInfo::new(
1470                "timestamp_list".into(),
1471                None,
1472                None,
1473                Type::TIMESTAMP_ARRAY,
1474                FieldFormat::Text,
1475            ),
1476        ];
1477
1478        let arrow_schema = arrow_schema::Schema::new(vec![
1479            Field::new("x", DataType::Null, true),
1480            Field::new("x", DataType::Boolean, true),
1481            Field::new("x", DataType::UInt8, true),
1482            Field::new("x", DataType::UInt16, true),
1483            Field::new("x", DataType::UInt32, true),
1484            Field::new("x", DataType::UInt64, true),
1485            Field::new("x", DataType::Int8, true),
1486            Field::new("x", DataType::Int16, true),
1487            Field::new("x", DataType::Int32, true),
1488            Field::new("x", DataType::Int64, true),
1489            Field::new("x", DataType::Float32, true),
1490            Field::new("x", DataType::Float64, true),
1491            Field::new("x", DataType::Utf8, true),
1492            Field::new("x", DataType::Binary, true),
1493            Field::new("x", DataType::Date32, true),
1494            Field::new("x", DataType::Time32(TimeUnit::Second), true),
1495            Field::new("x", DataType::Timestamp(TimeUnit::Second, None), true),
1496            Field::new("x", DataType::Interval(IntervalUnit::YearMonth), true),
1497            Field::new("x", DataType::Interval(IntervalUnit::DayTime), true),
1498            Field::new("x", DataType::Interval(IntervalUnit::MonthDayNano), true),
1499            Field::new(
1500                "x",
1501                DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
1502                true,
1503            ),
1504            Field::new(
1505                "x",
1506                DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
1507                true,
1508            ),
1509            Field::new(
1510                "x",
1511                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
1512                true,
1513            ),
1514            Field::new(
1515                "x",
1516                DataType::List(Arc::new(Field::new(
1517                    "item",
1518                    DataType::Timestamp(TimeUnit::Second, None),
1519                    true,
1520                ))),
1521                true,
1522            ),
1523        ]);
1524
1525        let mut builder = ListBuilder::new(Int64Builder::new());
1526        builder.append_value([Some(1i64), None, Some(2)]);
1527        builder.append_null();
1528        builder.append_value([Some(-1i64), None, Some(-2)]);
1529        let i64_list_array = builder.finish();
1530
1531        let mut builder = ListBuilder::new(Float64Builder::new());
1532        builder.append_value([Some(1.0f64), None, Some(2.0)]);
1533        builder.append_null();
1534        builder.append_value([Some(-1.0f64), None, Some(-2.0)]);
1535        let f64_list_array = builder.finish();
1536
1537        let mut builder = ListBuilder::new(StringBuilder::new());
1538        builder.append_value([Some("a"), None, Some("b")]);
1539        builder.append_null();
1540        builder.append_value([Some("c"), None, Some("d")]);
1541        let string_list_array = builder.finish();
1542
1543        let mut builder = ListBuilder::new(TimestampSecondBuilder::new());
1544        builder.append_value([Some(1i64), None, Some(2)]);
1545        builder.append_null();
1546        builder.append_value([Some(3i64), None, Some(4)]);
1547        let timestamp_list_array = builder.finish();
1548
1549        let values = vec![
1550            Arc::new(NullVector::new(3)) as VectorRef,
1551            Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
1552            Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
1553            Arc::new(UInt16Vector::from(vec![
1554                Some(u16::MAX),
1555                Some(u16::MIN),
1556                None,
1557            ])),
1558            Arc::new(UInt32Vector::from(vec![
1559                Some(u32::MAX),
1560                Some(u32::MIN),
1561                None,
1562            ])),
1563            Arc::new(UInt64Vector::from(vec![
1564                Some(u64::MAX),
1565                Some(u64::MIN),
1566                None,
1567            ])),
1568            Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
1569            Arc::new(Int16Vector::from(vec![
1570                Some(i16::MAX),
1571                Some(i16::MIN),
1572                None,
1573            ])),
1574            Arc::new(Int32Vector::from(vec![
1575                Some(i32::MAX),
1576                Some(i32::MIN),
1577                None,
1578            ])),
1579            Arc::new(Int64Vector::from(vec![
1580                Some(i64::MAX),
1581                Some(i64::MIN),
1582                None,
1583            ])),
1584            Arc::new(Float32Vector::from(vec![
1585                None,
1586                Some(f32::MAX),
1587                Some(f32::MIN),
1588            ])),
1589            Arc::new(Float64Vector::from(vec![
1590                None,
1591                Some(f64::MAX),
1592                Some(f64::MIN),
1593            ])),
1594            Arc::new(StringVector::from(vec![
1595                None,
1596                Some("hello"),
1597                Some("greptime"),
1598            ])),
1599            Arc::new(BinaryVector::from(vec![
1600                None,
1601                Some("hello".as_bytes().to_vec()),
1602                Some("world".as_bytes().to_vec()),
1603            ])),
1604            Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
1605            Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
1606            Arc::new(TimestampSecondVector::from(vec![
1607                Some(1000001),
1608                None,
1609                Some(1),
1610            ])),
1611            Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
1612            Arc::new(IntervalDayTimeVector::from(vec![
1613                Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
1614                None,
1615                Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
1616            ])),
1617            Arc::new(IntervalMonthDayNanoVector::from(vec![
1618                Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
1619                None,
1620                Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
1621            ])),
1622            Arc::new(ListVector::from(i64_list_array)),
1623            Arc::new(ListVector::from(f64_list_array)),
1624            Arc::new(ListVector::from(string_list_array)),
1625            Arc::new(ListVector::from(timestamp_list_array)),
1626        ];
1627        let record_batch =
1628            RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), values).unwrap();
1629
1630        let query_context = QueryContextBuilder::default()
1631            .configuration_parameter(Default::default())
1632            .build()
1633            .into();
1634        let schema = record_batch.schema.clone();
1635        let pg_schema_ref = Arc::new(pg_schema);
1636
1637        let encoder = DataRowEncoder::new(pg_schema_ref.clone());
1638
1639        let row_stream = RecordBatchRowStream::new(
1640            query_context,
1641            pg_schema_ref.clone(),
1642            schema,
1643            stream::once(async { Ok(record_batch) }),
1644            encoder,
1645        );
1646
1647        let rows: Vec<_> = futures::executor::block_on(
1648            row_stream
1649                .filter_map(|x: PgWireResult<_>| async move { x.ok() })
1650                .flat_map(stream::iter)
1651                .collect::<Vec<_>>(),
1652        );
1653        assert_eq!(rows.len(), 3);
1654        for row in rows {
1655            assert_eq!(row.field_count, pg_schema_ref.len() as i16);
1656        }
1657    }
1658
1659    #[test]
1660    fn test_invalid_parameter() {
1661        // test for refactor with PgErrorCode
1662        let msg = "invalid_parameter_count";
1663        let error = invalid_parameter_error(msg, None);
1664        if let PgWireError::UserError(value) = error {
1665            assert_eq!("ERROR", value.severity);
1666            assert_eq!("22023", value.code);
1667            assert_eq!(msg, value.message);
1668        } else {
1669            panic!("test_invalid_parameter failed");
1670        }
1671    }
1672
1673    #[test]
1674    fn test_to_decimal_scalar_value() {
1675        let dt = Decimal128Type::new(18, 4);
1676
1677        let d = Decimal::from_str("12345.6789").unwrap();
1678        assert_eq!(d.mantissa(), 123456789i128);
1679        let scalar = to_decimal_scalar_value(Some(d), &dt);
1680        assert_eq!(scalar, ScalarValue::Decimal128(Some(123456789), 18, 4));
1681
1682        let d = Decimal::from_str("100.5").unwrap();
1683        assert_eq!(d.mantissa(), 1005);
1684        let scalar = to_decimal_scalar_value(Some(d), &dt);
1685        assert_eq!(scalar, ScalarValue::Decimal128(Some(1005000), 18, 4));
1686
1687        let d = Decimal::from_str("-9876.5432").unwrap();
1688        let scalar = to_decimal_scalar_value(Some(d), &dt);
1689        assert_eq!(scalar, ScalarValue::Decimal128(Some(-98765432), 18, 4));
1690
1691        let scalar = to_decimal_scalar_value(None, &dt);
1692        assert_eq!(scalar, ScalarValue::Decimal128(None, 18, 4));
1693    }
1694
1695    fn s(v: &str) -> Option<String> {
1696        Some(v.to_string())
1697    }
1698
1699    fn typed_param(id: &str, dt: DataType) -> Expr {
1700        Expr::Placeholder(Placeholder::new_with_field(
1701            id.to_string(),
1702            Some(Arc::new(arrow_schema::Field::new(id, dt, true))),
1703        ))
1704    }
1705
1706    fn build_plan_with_params(params: Vec<(&str, DataType)>) -> LogicalPlan {
1707        let exprs: Vec<Expr> = params
1708            .into_iter()
1709            .map(|(id, dt)| typed_param(id, dt))
1710            .collect();
1711        LogicalPlanBuilder::empty(true)
1712            .project(exprs)
1713            .unwrap()
1714            .build()
1715            .unwrap()
1716    }
1717
1718    fn make_portal(
1719        client_param_types: Vec<Option<Type>>,
1720        param_data: Vec<Option<String>>,
1721    ) -> Portal<PgSqlPlan> {
1722        let bind = Bind::new(
1723            None,
1724            None,
1725            vec![],
1726            param_data
1727                .into_iter()
1728                .map(|opt| opt.map(Bytes::from))
1729                .collect(),
1730            vec![],
1731        );
1732        let statement = Arc::new(StoredStatement::new(
1733            String::new(),
1734            PgSqlPlan {
1735                plan: SqlPlan::Empty,
1736                copy_to_stdout_format: None,
1737            },
1738            client_param_types,
1739        ));
1740        Portal::try_new(&bind, statement).unwrap()
1741    }
1742
1743    #[test]
1744    fn test_int2_coerce_in_range() {
1745        let plan = build_plan_with_params(vec![
1746            ("$1", DataType::Int8),
1747            ("$2", DataType::Int16),
1748            ("$3", DataType::Int32),
1749            ("$4", DataType::Int64),
1750            ("$5", DataType::UInt8),
1751            ("$6", DataType::UInt16),
1752            ("$7", DataType::UInt32),
1753            ("$8", DataType::UInt64),
1754        ]);
1755        let portal = make_portal(
1756            vec![
1757                Some(Type::INT2),
1758                Some(Type::INT2),
1759                Some(Type::INT2),
1760                Some(Type::INT2),
1761                Some(Type::INT2),
1762                Some(Type::INT2),
1763                Some(Type::INT2),
1764                Some(Type::INT2),
1765            ],
1766            vec![
1767                s("100"),
1768                s("100"),
1769                s("100"),
1770                s("100"),
1771                s("100"),
1772                s("100"),
1773                s("100"),
1774                s("100"),
1775            ],
1776        );
1777
1778        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1779        assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1780        assert_eq!(values[1], ScalarValue::Int16(Some(100)));
1781        assert_eq!(values[2], ScalarValue::Int32(Some(100)));
1782        assert_eq!(values[3], ScalarValue::Int64(Some(100)));
1783        assert_eq!(values[4], ScalarValue::UInt8(Some(100)));
1784        assert_eq!(values[5], ScalarValue::UInt16(Some(100)));
1785        assert_eq!(values[6], ScalarValue::UInt32(Some(100)));
1786        assert_eq!(values[7], ScalarValue::UInt64(Some(100)));
1787    }
1788
1789    #[test]
1790    fn test_int2_coerce_out_of_range() {
1791        let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1792        let portal = make_portal(vec![Some(Type::INT2)], vec![s("200")]);
1793        let result = parameters_to_scalar_values(&plan, &portal);
1794        assert!(result.is_err());
1795    }
1796
1797    #[test]
1798    fn test_int2_coerce_negative_to_unsigned_out_of_range() {
1799        let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
1800        let portal = make_portal(vec![Some(Type::INT2)], vec![s("-1")]);
1801        let result = parameters_to_scalar_values(&plan, &portal);
1802        assert!(result.is_err());
1803    }
1804
1805    #[test]
1806    fn test_int4_coerce_in_range() {
1807        let plan = build_plan_with_params(vec![
1808            ("$1", DataType::Int8),
1809            ("$2", DataType::Int16),
1810            ("$3", DataType::Int32),
1811            ("$4", DataType::Int64),
1812            ("$5", DataType::UInt8),
1813            ("$6", DataType::UInt16),
1814            ("$7", DataType::UInt32),
1815            ("$8", DataType::UInt64),
1816        ]);
1817        let portal = make_portal(
1818            vec![
1819                Some(Type::INT4),
1820                Some(Type::INT4),
1821                Some(Type::INT4),
1822                Some(Type::INT4),
1823                Some(Type::INT4),
1824                Some(Type::INT4),
1825                Some(Type::INT4),
1826                Some(Type::INT4),
1827            ],
1828            vec![
1829                s("100"),
1830                s("1000"),
1831                s("100000"),
1832                s("100000"),
1833                s("200"),
1834                s("1000"),
1835                s("100000"),
1836                s("100000"),
1837            ],
1838        );
1839
1840        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1841        assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1842        assert_eq!(values[1], ScalarValue::Int16(Some(1000)));
1843        assert_eq!(values[2], ScalarValue::Int32(Some(100000)));
1844        assert_eq!(values[3], ScalarValue::Int64(Some(100000)));
1845        assert_eq!(values[4], ScalarValue::UInt8(Some(200)));
1846        assert_eq!(values[5], ScalarValue::UInt16(Some(1000)));
1847        assert_eq!(values[6], ScalarValue::UInt32(Some(100000)));
1848        assert_eq!(values[7], ScalarValue::UInt64(Some(100000)));
1849    }
1850
1851    #[test]
1852    fn test_int4_coerce_out_of_range() {
1853        let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1854        let portal = make_portal(vec![Some(Type::INT4)], vec![s("200")]);
1855        let result = parameters_to_scalar_values(&plan, &portal);
1856        assert!(result.is_err());
1857    }
1858
1859    #[test]
1860    fn test_int4_coerce_i32_max_to_i16_out_of_range() {
1861        let plan = build_plan_with_params(vec![("$1", DataType::Int16)]);
1862        let portal = make_portal(vec![Some(Type::INT4)], vec![Some(i32::MAX.to_string())]);
1863        let result = parameters_to_scalar_values(&plan, &portal);
1864        assert!(result.is_err());
1865    }
1866
1867    #[test]
1868    fn test_int8_coerce_in_range() {
1869        let plan = build_plan_with_params(vec![
1870            ("$1", DataType::Int8),
1871            ("$2", DataType::Int16),
1872            ("$3", DataType::Int32),
1873            ("$4", DataType::Int64),
1874            ("$5", DataType::UInt8),
1875            ("$6", DataType::UInt16),
1876            ("$7", DataType::UInt32),
1877            ("$8", DataType::UInt64),
1878        ]);
1879        let portal = make_portal(
1880            vec![
1881                Some(Type::INT8),
1882                Some(Type::INT8),
1883                Some(Type::INT8),
1884                Some(Type::INT8),
1885                Some(Type::INT8),
1886                Some(Type::INT8),
1887                Some(Type::INT8),
1888                Some(Type::INT8),
1889            ],
1890            vec![
1891                s("100"),
1892                s("1000"),
1893                s("100000"),
1894                s("100000"),
1895                s("200"),
1896                s("1000"),
1897                s("3000000000"),
1898                s("3000000000"),
1899            ],
1900        );
1901
1902        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1903        assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1904        assert_eq!(values[1], ScalarValue::Int16(Some(1000)));
1905        assert_eq!(values[2], ScalarValue::Int32(Some(100000)));
1906        assert_eq!(values[3], ScalarValue::Int64(Some(100000)));
1907        assert_eq!(values[4], ScalarValue::UInt8(Some(200)));
1908        assert_eq!(values[5], ScalarValue::UInt16(Some(1000)));
1909        assert_eq!(values[6], ScalarValue::UInt32(Some(3000000000)));
1910        assert_eq!(values[7], ScalarValue::UInt64(Some(3000000000)));
1911    }
1912
1913    #[test]
1914    fn test_int8_coerce_out_of_range() {
1915        let plan = build_plan_with_params(vec![("$1", DataType::Int32)]);
1916        let portal = make_portal(
1917            vec![Some(Type::INT8)],
1918            vec![Some((i32::MAX as i64 + 1).to_string())],
1919        );
1920        let result = parameters_to_scalar_values(&plan, &portal);
1921        assert!(result.is_err());
1922    }
1923
1924    #[test]
1925    fn test_int8_coerce_negative_to_unsigned_out_of_range() {
1926        let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
1927        let portal = make_portal(vec![Some(Type::INT8)], vec![s("-1")]);
1928        let result = parameters_to_scalar_values(&plan, &portal);
1929        assert!(result.is_err());
1930    }
1931
1932    #[test]
1933    fn test_float4_coerce_in_range() {
1934        let plan =
1935            build_plan_with_params(vec![("$1", DataType::Float32), ("$2", DataType::Float64)]);
1936        let portal = make_portal(
1937            vec![Some(Type::FLOAT4), Some(Type::FLOAT4)],
1938            vec![s("1.5"), s("2.5")],
1939        );
1940
1941        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1942        assert_eq!(values[0], ScalarValue::Float32(Some(1.5)));
1943        assert_eq!(values[1], ScalarValue::Float64(Some(2.5)));
1944    }
1945
1946    #[test]
1947    fn test_float4_coerce_to_int_in_range() {
1948        let plan = build_plan_with_params(vec![
1949            ("$1", DataType::Int8),
1950            ("$2", DataType::Int32),
1951            ("$3", DataType::UInt64),
1952        ]);
1953        let portal = make_portal(
1954            vec![Some(Type::FLOAT4), Some(Type::FLOAT4), Some(Type::FLOAT4)],
1955            vec![s("100"), s("1000"), s("200")],
1956        );
1957
1958        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1959        assert_eq!(values[0], ScalarValue::Int8(Some(100)));
1960        assert_eq!(values[1], ScalarValue::Int32(Some(1000)));
1961        assert_eq!(values[2], ScalarValue::UInt64(Some(200)));
1962    }
1963
1964    #[test]
1965    fn test_float4_coerce_to_int_out_of_range() {
1966        let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
1967        let portal = make_portal(vec![Some(Type::FLOAT4)], vec![s("200")]);
1968        let result = parameters_to_scalar_values(&plan, &portal);
1969        assert!(result.is_err());
1970    }
1971
1972    #[test]
1973    fn test_float8_coerce_in_range() {
1974        let plan =
1975            build_plan_with_params(vec![("$1", DataType::Float32), ("$2", DataType::Float64)]);
1976        let portal = make_portal(
1977            vec![Some(Type::FLOAT8), Some(Type::FLOAT8)],
1978            vec![s("1.5"), s("2.5")],
1979        );
1980
1981        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1982        assert_eq!(values[0], ScalarValue::Float32(Some(1.5)));
1983        assert_eq!(values[1], ScalarValue::Float64(Some(2.5)));
1984    }
1985
1986    #[test]
1987    fn test_float8_coerce_to_int_in_range() {
1988        let plan = build_plan_with_params(vec![
1989            ("$1", DataType::Int8),
1990            ("$2", DataType::Int64),
1991            ("$3", DataType::UInt64),
1992        ]);
1993        let portal = make_portal(
1994            vec![Some(Type::FLOAT8), Some(Type::FLOAT8), Some(Type::FLOAT8)],
1995            vec![s("100"), s("1000000"), s("200")],
1996        );
1997
1998        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
1999        assert_eq!(values[0], ScalarValue::Int8(Some(100)));
2000        assert_eq!(values[1], ScalarValue::Int64(Some(1000000)));
2001        assert_eq!(values[2], ScalarValue::UInt64(Some(200)));
2002    }
2003
2004    #[test]
2005    fn test_float8_coerce_to_int_out_of_range() {
2006        let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
2007        let portal = make_portal(vec![Some(Type::FLOAT8)], vec![s("200")]);
2008        let result = parameters_to_scalar_values(&plan, &portal);
2009        assert!(result.is_err());
2010    }
2011
2012    #[test]
2013    fn test_float8_coerce_negative_to_unsigned_out_of_range() {
2014        let plan = build_plan_with_params(vec![("$1", DataType::UInt64)]);
2015        let portal = make_portal(vec![Some(Type::FLOAT8)], vec![s("-1")]);
2016        let result = parameters_to_scalar_values(&plan, &portal);
2017        assert!(result.is_err());
2018    }
2019
2020    #[test]
2021    fn test_null_parameter() {
2022        let plan = build_plan_with_params(vec![("$1", DataType::Int8)]);
2023        let portal = make_portal(vec![Some(Type::INT2)], vec![None]);
2024
2025        let values = parameters_to_scalar_values(&plan, &portal).unwrap();
2026        assert_eq!(values[0], ScalarValue::Int8(None));
2027    }
2028}