servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use arrow::array::{Array, AsArray};
21use arrow_pg::encoder::encode_value;
22use arrow_pg::list_encoder::encode_list;
23use arrow_schema::{DataType, TimeUnit};
24use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
25use common_recordbatch::RecordBatch;
26use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
27use datafusion_common::ScalarValue;
28use datafusion_expr::LogicalPlan;
29use datatypes::arrow::datatypes::DataType as ArrowDataType;
30use datatypes::json::JsonStructureSettings;
31use datatypes::prelude::{ConcreteDataType, Value};
32use datatypes::schema::{Schema, SchemaRef};
33use datatypes::types::{IntervalType, TimestampType, jsonb_to_string};
34use datatypes::value::StructValue;
35use pg_interval::Interval as PgInterval;
36use pgwire::api::Type;
37use pgwire::api::portal::{Format, Portal};
38use pgwire::api::results::{DataRowEncoder, FieldInfo};
39use pgwire::error::{PgWireError, PgWireResult};
40use pgwire::messages::data::DataRow;
41use pgwire::types::format::FormatOptions as PgFormatOptions;
42use session::context::QueryContextRef;
43use snafu::ResultExt;
44
45pub use self::error::{PgErrorCode, PgErrorSeverity};
46use crate::SqlPlan;
47use crate::error::{self as server_error, DataFusionSnafu, Result};
48use crate::postgres::utils::convert_err;
49
50pub(super) fn schema_to_pg(
51    origin: &Schema,
52    field_formats: &Format,
53    format_options: Option<Arc<PgFormatOptions>>,
54) -> Result<Vec<FieldInfo>> {
55    origin
56        .column_schemas()
57        .iter()
58        .enumerate()
59        .map(|(idx, col)| {
60            let mut field_info = FieldInfo::new(
61                col.name.clone(),
62                None,
63                None,
64                type_gt_to_pg(&col.data_type)?,
65                field_formats.format_for(idx),
66            );
67            if let Some(format_options) = &format_options {
68                field_info = field_info.with_format_options(format_options.clone());
69            }
70            Ok(field_info)
71        })
72        .collect::<Result<Vec<FieldInfo>>>()
73}
74
75/// this function will encode greptime's `StructValue` into PostgreSQL jsonb type
76///
77/// Note that greptimedb has different types of StructValue for storing json data,
78/// based on policy defined in `JsonStructureSettings`. But here the `StructValue`
79/// should be fully structured.
80///
81/// there are alternatives like records, arrays, etc. but there are also limitations:
82/// records: there is no support for include keys
83/// arrays: element in array must be the same type
84fn encode_struct(
85    _query_ctx: &QueryContextRef,
86    struct_value: StructValue,
87    builder: &mut DataRowEncoder,
88) -> PgWireResult<()> {
89    let encoding_setting = JsonStructureSettings::Structured(None);
90    let json_value = encoding_setting
91        .decode(Value::Struct(struct_value))
92        .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
93
94    builder.encode_field(&json_value)
95}
96
/// Iterator that turns the rows of one record batch into PostgreSQL
/// `DataRow` messages, one row per call to `next`.
pub(crate) struct RecordBatchRowIterator {
    /// Query context, forwarded to `encode_struct` when encoding struct columns.
    query_ctx: QueryContextRef,
    /// PostgreSQL field descriptions, indexed by column position.
    pg_schema: Arc<Vec<FieldInfo>>,
    /// Greptime schema of the batch; consulted to tell JSON columns apart
    /// from plain binary ones.
    schema: SchemaRef,
    /// The Arrow record batch whose rows are being encoded.
    record_batch: arrow::record_batch::RecordBatch,
    /// Index of the next row to encode.
    i: usize,
}
104
105impl Iterator for RecordBatchRowIterator {
106    type Item = PgWireResult<DataRow>;
107
108    fn next(&mut self) -> Option<Self::Item> {
109        let mut encoder = DataRowEncoder::new(self.pg_schema.clone());
110        if self.i < self.record_batch.num_rows() {
111            if let Err(e) = self.encode_row(self.i, &mut encoder) {
112                return Some(Err(e));
113            }
114            self.i += 1;
115            Some(Ok(encoder.take_row()))
116        } else {
117            None
118        }
119    }
120}
121
122impl RecordBatchRowIterator {
123    pub(crate) fn new(
124        query_ctx: QueryContextRef,
125        pg_schema: Arc<Vec<FieldInfo>>,
126        record_batch: RecordBatch,
127    ) -> Self {
128        let schema = record_batch.schema.clone();
129        let record_batch = record_batch.into_df_record_batch();
130        Self {
131            query_ctx,
132            pg_schema,
133            schema,
134            record_batch,
135            i: 0,
136        }
137    }
138
139    fn encode_row(&mut self, i: usize, encoder: &mut DataRowEncoder) -> PgWireResult<()> {
140        let arrow_schema = self.record_batch.schema();
141        for (j, column) in self.record_batch.columns().iter().enumerate() {
142            if column.is_null(i) {
143                encoder.encode_field(&None::<&i8>)?;
144                continue;
145            }
146
147            let pg_field = &self.pg_schema[j];
148            match column.data_type() {
149                // these types are greptimedb specific or custom
150                DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
151                    // jsonb
152                    if let ConcreteDataType::Json(_) = &self.schema.column_schemas()[j].data_type {
153                        let v = datatypes::arrow_array::binary_array_value(column, i);
154                        let s = jsonb_to_string(v).map_err(convert_err)?;
155                        encoder.encode_field(&s)?;
156                    } else {
157                        // bytea
158                        let arrow_field = arrow_schema.field(j);
159                        encode_value(encoder, column, i, arrow_field, pg_field)?;
160                    }
161                }
162
163                DataType::List(_) => {
164                    let array = column.as_list::<i32>();
165                    let items = array.value(i);
166
167                    encode_list(encoder, items, pg_field)?;
168                }
169                DataType::Struct(_) => {
170                    encode_struct(&self.query_ctx, Default::default(), encoder)?;
171                }
172                _ => {
173                    // Encode value using arrow-pg
174                    let arrow_field = arrow_schema.field(j);
175                    encode_value(encoder, column, i, arrow_field, pg_field)?;
176                }
177            }
178        }
179        Ok(())
180    }
181}
182
183pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
184    match origin {
185        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
186        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
187        &ConcreteDataType::Int8(_) => Ok(Type::CHAR),
188        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
189        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
190        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
191        &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
192        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
193        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
194        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
195        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
196        &ConcreteDataType::Date(_) => Ok(Type::DATE),
197        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
198        &ConcreteDataType::Time(_) => Ok(Type::TIME),
199        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
200        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
201        &ConcreteDataType::Json(_) => Ok(Type::JSON),
202        ConcreteDataType::List(list) => match list.item_type() {
203            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
204            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
205            &ConcreteDataType::Int8(_) => Ok(Type::CHAR_ARRAY),
206            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
207            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
208            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
209            &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
210            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
211            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
212            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
213            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
214            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
215            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
216            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
217            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
218            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
219            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
220            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
221            &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
222            &ConcreteDataType::Dictionary(_)
223            | &ConcreteDataType::Vector(_)
224            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
225                data_type: origin,
226                reason: "not implemented",
227            }
228            .fail(),
229        },
230        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
231            data_type: origin,
232            reason: "not implemented",
233        }
234        .fail(),
235        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
236        &ConcreteDataType::Struct(_) => Ok(Type::JSON),
237    }
238}
239
240#[allow(dead_code)]
241pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
242    // Note that we only support a small amount of pg data types
243    match origin {
244        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
245        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
246        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
247        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
248        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
249        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
250        &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
251            common_time::timestamp::TimeUnit::Millisecond,
252        )),
253        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
254        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
255            common_time::timestamp::TimeUnit::Microsecond,
256        )),
257        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
258            ConcreteDataType::int8_datatype(),
259        ))),
260        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
261            ConcreteDataType::int16_datatype(),
262        ))),
263        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
264            ConcreteDataType::int32_datatype(),
265        ))),
266        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
267            ConcreteDataType::int64_datatype(),
268        ))),
269        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
270            ConcreteDataType::string_datatype(),
271        ))),
272        _ => server_error::InternalSnafu {
273            err_msg: format!("unimplemented datatype {origin:?}"),
274        }
275        .fail(),
276    }
277}
278
279pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
280    // the index is managed from portal's parameters count so it's safe to
281    // unwrap here.
282    let param_type = portal
283        .statement
284        .parameter_types
285        .get(idx)
286        .unwrap()
287        .as_ref()
288        .unwrap_or(&Type::UNKNOWN);
289    match param_type {
290        &Type::VARCHAR | &Type::TEXT => Ok(format!(
291            "'{}'",
292            portal
293                .parameter::<String>(idx, param_type)?
294                .as_deref()
295                .unwrap_or("")
296        )),
297        &Type::BOOL => Ok(portal
298            .parameter::<bool>(idx, param_type)?
299            .map(|v| v.to_string())
300            .unwrap_or_else(|| "".to_owned())),
301        &Type::INT4 => Ok(portal
302            .parameter::<i32>(idx, param_type)?
303            .map(|v| v.to_string())
304            .unwrap_or_else(|| "".to_owned())),
305        &Type::INT8 => Ok(portal
306            .parameter::<i64>(idx, param_type)?
307            .map(|v| v.to_string())
308            .unwrap_or_else(|| "".to_owned())),
309        &Type::FLOAT4 => Ok(portal
310            .parameter::<f32>(idx, param_type)?
311            .map(|v| v.to_string())
312            .unwrap_or_else(|| "".to_owned())),
313        &Type::FLOAT8 => Ok(portal
314            .parameter::<f64>(idx, param_type)?
315            .map(|v| v.to_string())
316            .unwrap_or_else(|| "".to_owned())),
317        &Type::DATE => Ok(portal
318            .parameter::<NaiveDate>(idx, param_type)?
319            .map(|v| v.format("%Y-%m-%d").to_string())
320            .unwrap_or_else(|| "".to_owned())),
321        &Type::TIMESTAMP => Ok(portal
322            .parameter::<NaiveDateTime>(idx, param_type)?
323            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
324            .unwrap_or_else(|| "".to_owned())),
325        &Type::INTERVAL => Ok(portal
326            .parameter::<PgInterval>(idx, param_type)?
327            .map(|v| v.to_sql())
328            .unwrap_or_else(|| "".to_owned())),
329        _ => Err(invalid_parameter_error(
330            "unsupported_parameter_type",
331            Some(param_type.to_string()),
332        )),
333    }
334}
335
336pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
337    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
338    error_info.detail = detail;
339    PgWireError::UserError(Box::new(error_info))
340}
341
342fn to_timestamp_scalar_value<T>(
343    data: Option<T>,
344    unit: &TimestampType,
345    ctype: &ConcreteDataType,
346) -> PgWireResult<ScalarValue>
347where
348    T: Into<i64>,
349{
350    if let Some(n) = data {
351        Value::Timestamp(unit.create_timestamp(n.into()))
352            .try_to_scalar_value(ctype)
353            .map_err(convert_err)
354    } else {
355        Ok(ScalarValue::Null)
356    }
357}
358
359pub(super) fn parameters_to_scalar_values(
360    plan: &LogicalPlan,
361    portal: &Portal<SqlPlan>,
362) -> PgWireResult<Vec<ScalarValue>> {
363    let param_count = portal.parameter_len();
364    let mut results = Vec::with_capacity(param_count);
365
366    let client_param_types = &portal.statement.parameter_types;
367    let server_param_types = plan
368        .get_parameter_types()
369        .context(DataFusionSnafu)
370        .map_err(convert_err)?
371        .into_iter()
372        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
373        .collect::<HashMap<_, _>>();
374
375    for idx in 0..param_count {
376        let server_type = server_param_types
377            .get(&format!("${}", idx + 1))
378            .and_then(|t| t.as_ref());
379
380        let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
381            client_given_type.clone()
382        } else if let Some(server_provided_type) = &server_type {
383            type_gt_to_pg(server_provided_type).map_err(convert_err)?
384        } else {
385            return Err(invalid_parameter_error(
386                "unknown_parameter_type",
387                Some(format!(
388                    "Cannot get parameter type information for parameter {}",
389                    idx
390                )),
391            ));
392        };
393
394        let value = match &client_type {
395            &Type::VARCHAR | &Type::TEXT => {
396                let data = portal.parameter::<String>(idx, &client_type)?;
397                if let Some(server_type) = &server_type {
398                    match server_type {
399                        ConcreteDataType::String(t) => {
400                            if t.is_large() {
401                                ScalarValue::LargeUtf8(data)
402                            } else {
403                                ScalarValue::Utf8(data)
404                            }
405                        }
406                        _ => {
407                            return Err(invalid_parameter_error(
408                                "invalid_parameter_type",
409                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
410                            ));
411                        }
412                    }
413                } else {
414                    ScalarValue::Utf8(data)
415                }
416            }
417            &Type::BOOL => {
418                let data = portal.parameter::<bool>(idx, &client_type)?;
419                if let Some(server_type) = &server_type {
420                    match server_type {
421                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
422                        _ => {
423                            return Err(invalid_parameter_error(
424                                "invalid_parameter_type",
425                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
426                            ));
427                        }
428                    }
429                } else {
430                    ScalarValue::Boolean(data)
431                }
432            }
433            &Type::INT2 => {
434                let data = portal.parameter::<i16>(idx, &client_type)?;
435                if let Some(server_type) = &server_type {
436                    match server_type {
437                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
438                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
439                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
440                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
441                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
442                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
443                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
444                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
445                        ConcreteDataType::Timestamp(unit) => {
446                            to_timestamp_scalar_value(data, unit, server_type)?
447                        }
448                        _ => {
449                            return Err(invalid_parameter_error(
450                                "invalid_parameter_type",
451                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
452                            ));
453                        }
454                    }
455                } else {
456                    ScalarValue::Int16(data)
457                }
458            }
459            &Type::INT4 => {
460                let data = portal.parameter::<i32>(idx, &client_type)?;
461                if let Some(server_type) = &server_type {
462                    match server_type {
463                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
464                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
465                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
466                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
467                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
468                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
469                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
470                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
471                        ConcreteDataType::Timestamp(unit) => {
472                            to_timestamp_scalar_value(data, unit, server_type)?
473                        }
474                        _ => {
475                            return Err(invalid_parameter_error(
476                                "invalid_parameter_type",
477                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
478                            ));
479                        }
480                    }
481                } else {
482                    ScalarValue::Int32(data)
483                }
484            }
485            &Type::INT8 => {
486                let data = portal.parameter::<i64>(idx, &client_type)?;
487                if let Some(server_type) = &server_type {
488                    match server_type {
489                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
490                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
491                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
492                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
493                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
494                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
495                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
496                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
497                        ConcreteDataType::Timestamp(unit) => {
498                            to_timestamp_scalar_value(data, unit, server_type)?
499                        }
500                        _ => {
501                            return Err(invalid_parameter_error(
502                                "invalid_parameter_type",
503                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
504                            ));
505                        }
506                    }
507                } else {
508                    ScalarValue::Int64(data)
509                }
510            }
511            &Type::FLOAT4 => {
512                let data = portal.parameter::<f32>(idx, &client_type)?;
513                if let Some(server_type) = &server_type {
514                    match server_type {
515                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
516                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
517                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
518                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
519                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
520                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
521                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
522                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
523                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
524                        ConcreteDataType::Float64(_) => {
525                            ScalarValue::Float64(data.map(|n| n as f64))
526                        }
527                        _ => {
528                            return Err(invalid_parameter_error(
529                                "invalid_parameter_type",
530                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
531                            ));
532                        }
533                    }
534                } else {
535                    ScalarValue::Float32(data)
536                }
537            }
538            &Type::FLOAT8 => {
539                let data = portal.parameter::<f64>(idx, &client_type)?;
540                if let Some(server_type) = &server_type {
541                    match server_type {
542                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
543                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
544                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
545                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
546                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
547                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
548                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
549                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
550                        ConcreteDataType::Float32(_) => {
551                            ScalarValue::Float32(data.map(|n| n as f32))
552                        }
553                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
554                        _ => {
555                            return Err(invalid_parameter_error(
556                                "invalid_parameter_type",
557                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
558                            ));
559                        }
560                    }
561                } else {
562                    ScalarValue::Float64(data)
563                }
564            }
565            &Type::TIMESTAMP => {
566                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
567                if let Some(server_type) = &server_type {
568                    match server_type {
569                        ConcreteDataType::Timestamp(unit) => match *unit {
570                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
571                                data.map(|ts| ts.and_utc().timestamp()),
572                                None,
573                            ),
574                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
575                                data.map(|ts| ts.and_utc().timestamp_millis()),
576                                None,
577                            ),
578                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
579                                data.map(|ts| ts.and_utc().timestamp_micros()),
580                                None,
581                            ),
582                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
583                                data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
584                                None,
585                            ),
586                        },
587                        _ => {
588                            return Err(invalid_parameter_error(
589                                "invalid_parameter_type",
590                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
591                            ));
592                        }
593                    }
594                } else {
595                    ScalarValue::TimestampMillisecond(
596                        data.map(|ts| ts.and_utc().timestamp_millis()),
597                        None,
598                    )
599                }
600            }
601            &Type::TIMESTAMPTZ => {
602                let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
603                if let Some(server_type) = &server_type {
604                    match server_type {
605                        ConcreteDataType::Timestamp(unit) => match *unit {
606                            TimestampType::Second(_) => {
607                                ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
608                            }
609                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
610                                data.map(|ts| ts.timestamp_millis()),
611                                None,
612                            ),
613                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
614                                data.map(|ts| ts.timestamp_micros()),
615                                None,
616                            ),
617                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
618                                data.and_then(|ts| ts.timestamp_nanos_opt()),
619                                None,
620                            ),
621                        },
622                        _ => {
623                            return Err(invalid_parameter_error(
624                                "invalid_parameter_type",
625                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
626                            ));
627                        }
628                    }
629                } else {
630                    ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
631                }
632            }
633            &Type::DATE => {
634                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
635                if let Some(server_type) = &server_type {
636                    match server_type {
637                        ConcreteDataType::Date(_) => ScalarValue::Date32(
638                            data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
639                        ),
640                        _ => {
641                            return Err(invalid_parameter_error(
642                                "invalid_parameter_type",
643                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
644                            ));
645                        }
646                    }
647                } else {
648                    ScalarValue::Date32(
649                        data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
650                    )
651                }
652            }
653            &Type::INTERVAL => {
654                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
655                if let Some(server_type) = &server_type {
656                    match server_type {
657                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
658                            ScalarValue::IntervalYearMonth(
659                                data.map(|i| {
660                                    if i.days != 0 || i.microseconds != 0 {
661                                        Err(invalid_parameter_error(
662                                            "invalid_parameter_type",
663                                            Some(format!(
664                                                "Expected: {}, found: {}",
665                                                server_type, client_type
666                                            )),
667                                        ))
668                                    } else {
669                                        Ok(IntervalYearMonth::new(i.months).to_i32())
670                                    }
671                                })
672                                .transpose()?,
673                            )
674                        }
675                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
676                            ScalarValue::IntervalDayTime(
677                                data.map(|i| {
678                                    if i.months != 0 || i.microseconds % 1000 != 0 {
679                                        Err(invalid_parameter_error(
680                                            "invalid_parameter_type",
681                                            Some(format!(
682                                                "Expected: {}, found: {}",
683                                                server_type, client_type
684                                            )),
685                                        ))
686                                    } else {
687                                        Ok(IntervalDayTime::new(
688                                            i.days,
689                                            (i.microseconds / 1000) as i32,
690                                        )
691                                        .into())
692                                    }
693                                })
694                                .transpose()?,
695                            )
696                        }
697                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
698                            ScalarValue::IntervalMonthDayNano(data.map(|i| {
699                                IntervalMonthDayNano::new(
700                                    i.months,
701                                    i.days,
702                                    i.microseconds * 1_000i64,
703                                )
704                                .into()
705                            }))
706                        }
707                        _ => {
708                            return Err(invalid_parameter_error(
709                                "invalid_parameter_type",
710                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
711                            ));
712                        }
713                    }
714                } else {
715                    ScalarValue::IntervalMonthDayNano(data.map(|i| {
716                        IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
717                            .into()
718                    }))
719                }
720            }
721            &Type::BYTEA => {
722                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
723                if let Some(server_type) = &server_type {
724                    match server_type {
725                        ConcreteDataType::String(t) => {
726                            let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
727                            if t.is_large() {
728                                ScalarValue::LargeUtf8(s)
729                            } else {
730                                ScalarValue::Utf8(s)
731                            }
732                        }
733                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
734                        _ => {
735                            return Err(invalid_parameter_error(
736                                "invalid_parameter_type",
737                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
738                            ));
739                        }
740                    }
741                } else {
742                    ScalarValue::Binary(data)
743                }
744            }
745            &Type::JSONB => {
746                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
747                if let Some(server_type) = &server_type {
748                    match server_type {
749                        ConcreteDataType::Binary(_) => {
750                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
751                        }
752                        _ => {
753                            return Err(invalid_parameter_error(
754                                "invalid_parameter_type",
755                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
756                            ));
757                        }
758                    }
759                } else {
760                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
761                }
762            }
763            &Type::INT2_ARRAY => {
764                let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
765                if let Some(data) = data {
766                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
767                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
768                } else {
769                    ScalarValue::Null
770                }
771            }
772            &Type::INT4_ARRAY => {
773                let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
774                if let Some(data) = data {
775                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
776                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
777                } else {
778                    ScalarValue::Null
779                }
780            }
781            &Type::INT8_ARRAY => {
782                let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
783                if let Some(data) = data {
784                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
785                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
786                } else {
787                    ScalarValue::Null
788                }
789            }
790            &Type::VARCHAR_ARRAY => {
791                let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
792                if let Some(data) = data {
793                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
794                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
795                } else {
796                    ScalarValue::Null
797                }
798            }
799            &Type::TIMESTAMP_ARRAY => {
800                let data = portal.parameter::<Vec<NaiveDateTime>>(idx, &client_type)?;
801                if let Some(data) = data {
802                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
803                        match list_type.item_type() {
804                            ConcreteDataType::Timestamp(unit) => match *unit {
805                                TimestampType::Second(_) => {
806                                    let values = data
807                                        .into_iter()
808                                        .map(|ts| {
809                                            ScalarValue::TimestampSecond(
810                                                Some(ts.and_utc().timestamp()),
811                                                None,
812                                            )
813                                        })
814                                        .collect::<Vec<_>>();
815                                    ScalarValue::List(ScalarValue::new_list(
816                                        &values,
817                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
818                                        true,
819                                    ))
820                                }
821                                TimestampType::Millisecond(_) => {
822                                    let values = data
823                                        .into_iter()
824                                        .map(|ts| {
825                                            ScalarValue::TimestampMillisecond(
826                                                Some(ts.and_utc().timestamp_millis()),
827                                                None,
828                                            )
829                                        })
830                                        .collect::<Vec<_>>();
831                                    ScalarValue::List(ScalarValue::new_list(
832                                        &values,
833                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
834                                        true,
835                                    ))
836                                }
837                                TimestampType::Microsecond(_) => {
838                                    let values = data
839                                        .into_iter()
840                                        .map(|ts| {
841                                            ScalarValue::TimestampMicrosecond(
842                                                Some(ts.and_utc().timestamp_micros()),
843                                                None,
844                                            )
845                                        })
846                                        .collect::<Vec<_>>();
847                                    ScalarValue::List(ScalarValue::new_list(
848                                        &values,
849                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
850                                        true,
851                                    ))
852                                }
853                                TimestampType::Nanosecond(_) => {
854                                    let values = data
855                                        .into_iter()
856                                        .filter_map(|ts| {
857                                            ts.and_utc().timestamp_nanos_opt().map(|nanos| {
858                                                ScalarValue::TimestampNanosecond(Some(nanos), None)
859                                            })
860                                        })
861                                        .collect::<Vec<_>>();
862                                    ScalarValue::List(ScalarValue::new_list(
863                                        &values,
864                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
865                                        true,
866                                    ))
867                                }
868                            },
869                            _ => {
870                                return Err(invalid_parameter_error(
871                                    "invalid_parameter_type",
872                                    Some(format!(
873                                        "Expected: {}, found: {}",
874                                        list_type.item_type(),
875                                        client_type
876                                    )),
877                                ));
878                            }
879                        }
880                    } else {
881                        // Default to millisecond when no server type is specified
882                        let values = data
883                            .into_iter()
884                            .map(|ts| {
885                                ScalarValue::TimestampMillisecond(
886                                    Some(ts.and_utc().timestamp_millis()),
887                                    None,
888                                )
889                            })
890                            .collect::<Vec<_>>();
891                        ScalarValue::List(ScalarValue::new_list(
892                            &values,
893                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
894                            true,
895                        ))
896                    }
897                } else {
898                    ScalarValue::Null
899                }
900            }
901            &Type::TIMESTAMPTZ_ARRAY => {
902                let data = portal.parameter::<Vec<DateTime<FixedOffset>>>(idx, &client_type)?;
903                if let Some(data) = data {
904                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
905                        match list_type.item_type() {
906                            ConcreteDataType::Timestamp(unit) => match *unit {
907                                TimestampType::Second(_) => {
908                                    let values = data
909                                        .into_iter()
910                                        .map(|ts| {
911                                            ScalarValue::TimestampSecond(Some(ts.timestamp()), None)
912                                        })
913                                        .collect::<Vec<_>>();
914                                    ScalarValue::List(ScalarValue::new_list(
915                                        &values,
916                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
917                                        true,
918                                    ))
919                                }
920                                TimestampType::Millisecond(_) => {
921                                    let values = data
922                                        .into_iter()
923                                        .map(|ts| {
924                                            ScalarValue::TimestampMillisecond(
925                                                Some(ts.timestamp_millis()),
926                                                None,
927                                            )
928                                        })
929                                        .collect::<Vec<_>>();
930                                    ScalarValue::List(ScalarValue::new_list(
931                                        &values,
932                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
933                                        true,
934                                    ))
935                                }
936                                TimestampType::Microsecond(_) => {
937                                    let values = data
938                                        .into_iter()
939                                        .map(|ts| {
940                                            ScalarValue::TimestampMicrosecond(
941                                                Some(ts.timestamp_micros()),
942                                                None,
943                                            )
944                                        })
945                                        .collect::<Vec<_>>();
946                                    ScalarValue::List(ScalarValue::new_list(
947                                        &values,
948                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
949                                        true,
950                                    ))
951                                }
952                                TimestampType::Nanosecond(_) => {
953                                    let values = data
954                                        .into_iter()
955                                        .filter_map(|ts| {
956                                            ts.timestamp_nanos_opt().map(|nanos| {
957                                                ScalarValue::TimestampNanosecond(Some(nanos), None)
958                                            })
959                                        })
960                                        .collect::<Vec<_>>();
961                                    ScalarValue::List(ScalarValue::new_list(
962                                        &values,
963                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
964                                        true,
965                                    ))
966                                }
967                            },
968                            _ => {
969                                return Err(invalid_parameter_error(
970                                    "invalid_parameter_type",
971                                    Some(format!(
972                                        "Expected: {}, found: {}",
973                                        list_type.item_type(),
974                                        client_type
975                                    )),
976                                ));
977                            }
978                        }
979                    } else {
980                        // Default to millisecond when no server type is specified
981                        let values = data
982                            .into_iter()
983                            .map(|ts| {
984                                ScalarValue::TimestampMillisecond(Some(ts.timestamp_millis()), None)
985                            })
986                            .collect::<Vec<_>>();
987                        ScalarValue::List(ScalarValue::new_list(
988                            &values,
989                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
990                            true,
991                        ))
992                    }
993                } else {
994                    ScalarValue::Null
995                }
996            }
997            _ => Err(invalid_parameter_error(
998                "unsupported_parameter_value",
999                Some(format!("Found type: {}", client_type)),
1000            ))?,
1001        };
1002
1003        results.push(value);
1004    }
1005
1006    Ok(results)
1007}
1008
1009pub(super) fn param_types_to_pg_types(
1010    param_types: &HashMap<String, Option<ConcreteDataType>>,
1011) -> Result<Vec<Type>> {
1012    let param_count = param_types.len();
1013    let mut types = Vec::with_capacity(param_count);
1014    for i in 0..param_count {
1015        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1016            let pg_type = type_gt_to_pg(param_type)?;
1017            types.push(pg_type);
1018        } else {
1019            types.push(Type::UNKNOWN);
1020        }
1021    }
1022    Ok(types)
1023}
1024
1025pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1026    let config = query_ctx.configuration_parameter();
1027    let (date_style, date_order) = *config.pg_datetime_style();
1028
1029    let mut format_options = PgFormatOptions::default();
1030    format_options.date_style = format!("{}, {}", date_style, date_order);
1031    format_options.interval_style = config.pg_intervalstyle_format().to_string();
1032    format_options.bytea_output = config.postgres_bytea_output().to_string();
1033    format_options.time_zone = query_ctx.timezone().to_string();
1034
1035    Arc::new(format_options)
1036}
1037
1038#[cfg(test)]
1039mod test {
1040    use std::sync::Arc;
1041
1042    use arrow::array::{
1043        Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
1044    };
1045    use arrow_schema::{Field, IntervalUnit};
1046    use datatypes::schema::{ColumnSchema, Schema};
1047    use datatypes::vectors::{
1048        BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
1049        Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
1050        IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
1051        TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
1052    };
1053    use pgwire::api::Type;
1054    use pgwire::api::results::{FieldFormat, FieldInfo};
1055    use session::context::QueryContextBuilder;
1056
1057    use super::*;
1058
1059    #[test]
1060    fn test_schema_convert() {
1061        let column_schemas = vec![
1062            ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1063            ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1064            ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1065            ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1066            ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1067            ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1068            ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1069            ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1070            ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1071            ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1072            ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1073            ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1074            ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1075            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1076            ColumnSchema::new(
1077                "timestamps",
1078                ConcreteDataType::timestamp_millisecond_datatype(),
1079                true,
1080            ),
1081            ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1082            ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1083            ColumnSchema::new(
1084                "intervals",
1085                ConcreteDataType::interval_month_day_nano_datatype(),
1086                true,
1087            ),
1088        ];
1089        let pg_field_info = vec![
1090            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1091            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1092            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1093            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1094            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1095            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1096            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1097            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1098            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1099            FieldInfo::new(
1100                "uint64s".into(),
1101                None,
1102                None,
1103                Type::NUMERIC,
1104                FieldFormat::Text,
1105            ),
1106            FieldInfo::new(
1107                "float32s".into(),
1108                None,
1109                None,
1110                Type::FLOAT4,
1111                FieldFormat::Text,
1112            ),
1113            FieldInfo::new(
1114                "float64s".into(),
1115                None,
1116                None,
1117                Type::FLOAT8,
1118                FieldFormat::Text,
1119            ),
1120            FieldInfo::new(
1121                "binaries".into(),
1122                None,
1123                None,
1124                Type::BYTEA,
1125                FieldFormat::Text,
1126            ),
1127            FieldInfo::new(
1128                "strings".into(),
1129                None,
1130                None,
1131                Type::VARCHAR,
1132                FieldFormat::Text,
1133            ),
1134            FieldInfo::new(
1135                "timestamps".into(),
1136                None,
1137                None,
1138                Type::TIMESTAMP,
1139                FieldFormat::Text,
1140            ),
1141            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1142            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1143            FieldInfo::new(
1144                "intervals".into(),
1145                None,
1146                None,
1147                Type::INTERVAL,
1148                FieldFormat::Text,
1149            ),
1150        ];
1151        let schema = Schema::new(column_schemas);
1152        let fs = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
1153        assert_eq!(fs, pg_field_info);
1154    }
1155
1156    #[test]
1157    fn test_encode_text_format_data() {
1158        let schema = vec![
1159            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1160            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1161            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1162            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1163            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1164            FieldInfo::new(
1165                "uint64s".into(),
1166                None,
1167                None,
1168                Type::NUMERIC,
1169                FieldFormat::Text,
1170            ),
1171            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1172            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1173            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1174            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1175            FieldInfo::new(
1176                "float32s".into(),
1177                None,
1178                None,
1179                Type::FLOAT4,
1180                FieldFormat::Text,
1181            ),
1182            FieldInfo::new(
1183                "float64s".into(),
1184                None,
1185                None,
1186                Type::FLOAT8,
1187                FieldFormat::Text,
1188            ),
1189            FieldInfo::new(
1190                "strings".into(),
1191                None,
1192                None,
1193                Type::VARCHAR,
1194                FieldFormat::Text,
1195            ),
1196            FieldInfo::new(
1197                "binaries".into(),
1198                None,
1199                None,
1200                Type::BYTEA,
1201                FieldFormat::Text,
1202            ),
1203            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1204            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1205            FieldInfo::new(
1206                "timestamps".into(),
1207                None,
1208                None,
1209                Type::TIMESTAMP,
1210                FieldFormat::Text,
1211            ),
1212            FieldInfo::new(
1213                "interval_year_month".into(),
1214                None,
1215                None,
1216                Type::INTERVAL,
1217                FieldFormat::Text,
1218            ),
1219            FieldInfo::new(
1220                "interval_day_time".into(),
1221                None,
1222                None,
1223                Type::INTERVAL,
1224                FieldFormat::Text,
1225            ),
1226            FieldInfo::new(
1227                "interval_month_day_nano".into(),
1228                None,
1229                None,
1230                Type::INTERVAL,
1231                FieldFormat::Text,
1232            ),
1233            FieldInfo::new(
1234                "int_list".into(),
1235                None,
1236                None,
1237                Type::INT8_ARRAY,
1238                FieldFormat::Text,
1239            ),
1240            FieldInfo::new(
1241                "float_list".into(),
1242                None,
1243                None,
1244                Type::FLOAT8_ARRAY,
1245                FieldFormat::Text,
1246            ),
1247            FieldInfo::new(
1248                "string_list".into(),
1249                None,
1250                None,
1251                Type::VARCHAR_ARRAY,
1252                FieldFormat::Text,
1253            ),
1254            FieldInfo::new(
1255                "timestamp_list".into(),
1256                None,
1257                None,
1258                Type::TIMESTAMP_ARRAY,
1259                FieldFormat::Text,
1260            ),
1261        ];
1262
1263        let arrow_schema = arrow_schema::Schema::new(vec![
1264            Field::new("x", DataType::Null, true),
1265            Field::new("x", DataType::Boolean, true),
1266            Field::new("x", DataType::UInt8, true),
1267            Field::new("x", DataType::UInt16, true),
1268            Field::new("x", DataType::UInt32, true),
1269            Field::new("x", DataType::UInt64, true),
1270            Field::new("x", DataType::Int8, true),
1271            Field::new("x", DataType::Int16, true),
1272            Field::new("x", DataType::Int32, true),
1273            Field::new("x", DataType::Int64, true),
1274            Field::new("x", DataType::Float32, true),
1275            Field::new("x", DataType::Float64, true),
1276            Field::new("x", DataType::Utf8, true),
1277            Field::new("x", DataType::Binary, true),
1278            Field::new("x", DataType::Date32, true),
1279            Field::new("x", DataType::Time32(TimeUnit::Second), true),
1280            Field::new("x", DataType::Timestamp(TimeUnit::Second, None), true),
1281            Field::new("x", DataType::Interval(IntervalUnit::YearMonth), true),
1282            Field::new("x", DataType::Interval(IntervalUnit::DayTime), true),
1283            Field::new("x", DataType::Interval(IntervalUnit::MonthDayNano), true),
1284            Field::new(
1285                "x",
1286                DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
1287                true,
1288            ),
1289            Field::new(
1290                "x",
1291                DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
1292                true,
1293            ),
1294            Field::new(
1295                "x",
1296                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
1297                true,
1298            ),
1299            Field::new(
1300                "x",
1301                DataType::List(Arc::new(Field::new(
1302                    "item",
1303                    DataType::Timestamp(TimeUnit::Second, None),
1304                    true,
1305                ))),
1306                true,
1307            ),
1308        ]);
1309
1310        let mut builder = ListBuilder::new(Int64Builder::new());
1311        builder.append_value([Some(1i64), None, Some(2)]);
1312        builder.append_null();
1313        builder.append_value([Some(-1i64), None, Some(-2)]);
1314        let i64_list_array = builder.finish();
1315
1316        let mut builder = ListBuilder::new(Float64Builder::new());
1317        builder.append_value([Some(1.0f64), None, Some(2.0)]);
1318        builder.append_null();
1319        builder.append_value([Some(-1.0f64), None, Some(-2.0)]);
1320        let f64_list_array = builder.finish();
1321
1322        let mut builder = ListBuilder::new(StringBuilder::new());
1323        builder.append_value([Some("a"), None, Some("b")]);
1324        builder.append_null();
1325        builder.append_value([Some("c"), None, Some("d")]);
1326        let string_list_array = builder.finish();
1327
1328        let mut builder = ListBuilder::new(TimestampSecondBuilder::new());
1329        builder.append_value([Some(1i64), None, Some(2)]);
1330        builder.append_null();
1331        builder.append_value([Some(3i64), None, Some(4)]);
1332        let timestamp_list_array = builder.finish();
1333
1334        let values = vec![
1335            Arc::new(NullVector::new(3)) as VectorRef,
1336            Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
1337            Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
1338            Arc::new(UInt16Vector::from(vec![
1339                Some(u16::MAX),
1340                Some(u16::MIN),
1341                None,
1342            ])),
1343            Arc::new(UInt32Vector::from(vec![
1344                Some(u32::MAX),
1345                Some(u32::MIN),
1346                None,
1347            ])),
1348            Arc::new(UInt64Vector::from(vec![
1349                Some(u64::MAX),
1350                Some(u64::MIN),
1351                None,
1352            ])),
1353            Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
1354            Arc::new(Int16Vector::from(vec![
1355                Some(i16::MAX),
1356                Some(i16::MIN),
1357                None,
1358            ])),
1359            Arc::new(Int32Vector::from(vec![
1360                Some(i32::MAX),
1361                Some(i32::MIN),
1362                None,
1363            ])),
1364            Arc::new(Int64Vector::from(vec![
1365                Some(i64::MAX),
1366                Some(i64::MIN),
1367                None,
1368            ])),
1369            Arc::new(Float32Vector::from(vec![
1370                None,
1371                Some(f32::MAX),
1372                Some(f32::MIN),
1373            ])),
1374            Arc::new(Float64Vector::from(vec![
1375                None,
1376                Some(f64::MAX),
1377                Some(f64::MIN),
1378            ])),
1379            Arc::new(StringVector::from(vec![
1380                None,
1381                Some("hello"),
1382                Some("greptime"),
1383            ])),
1384            Arc::new(BinaryVector::from(vec![
1385                None,
1386                Some("hello".as_bytes().to_vec()),
1387                Some("world".as_bytes().to_vec()),
1388            ])),
1389            Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
1390            Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
1391            Arc::new(TimestampSecondVector::from(vec![
1392                Some(1000001),
1393                None,
1394                Some(1),
1395            ])),
1396            Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
1397            Arc::new(IntervalDayTimeVector::from(vec![
1398                Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
1399                None,
1400                Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
1401            ])),
1402            Arc::new(IntervalMonthDayNanoVector::from(vec![
1403                Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
1404                None,
1405                Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
1406            ])),
1407            Arc::new(ListVector::from(i64_list_array)),
1408            Arc::new(ListVector::from(f64_list_array)),
1409            Arc::new(ListVector::from(string_list_array)),
1410            Arc::new(ListVector::from(timestamp_list_array)),
1411        ];
1412        let record_batch =
1413            RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), values).unwrap();
1414
1415        let query_context = QueryContextBuilder::default()
1416            .configuration_parameter(Default::default())
1417            .build()
1418            .into();
1419        let schema = Arc::new(schema);
1420
1421        let rows = RecordBatchRowIterator::new(query_context, schema.clone(), record_batch)
1422            .filter_map(|x| x.ok())
1423            .collect::<Vec<_>>();
1424        assert_eq!(rows.len(), 3);
1425        for row in rows {
1426            assert_eq!(row.field_count, schema.len() as i16);
1427        }
1428    }
1429
1430    #[test]
1431    fn test_invalid_parameter() {
1432        // test for refactor with PgErrorCode
1433        let msg = "invalid_parameter_count";
1434        let error = invalid_parameter_error(msg, None);
1435        if let PgWireError::UserError(value) = error {
1436            assert_eq!("ERROR", value.severity);
1437            assert_eq!("22023", value.code);
1438            assert_eq!(msg, value.message);
1439        } else {
1440            panic!("test_invalid_parameter failed");
1441        }
1442    }
1443}