servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod error;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use arrow::array::{Array, AsArray};
23use arrow_pg::encoder::{Encoder, encode_value};
24use arrow_pg::list_encoder::encode_list;
25use arrow_schema::{DataType, TimeUnit};
26use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
27use common_recordbatch::RecordBatch;
28use common_recordbatch::error::Result as RecordBatchResult;
29use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::arrow::datatypes::DataType as ArrowDataType;
33use datatypes::json::JsonStructureSettings;
34use datatypes::prelude::{ConcreteDataType, Value};
35use datatypes::schema::{Schema, SchemaRef};
36use datatypes::types::{IntervalType, TimestampType, jsonb_to_string};
37use datatypes::value::StructValue;
38use futures::Stream;
39use pg_interval::Interval as PgInterval;
40use pgwire::api::Type;
41use pgwire::api::portal::{Format, Portal};
42use pgwire::api::results::FieldInfo;
43use pgwire::error::{PgWireError, PgWireResult};
44use pgwire::types::format::FormatOptions as PgFormatOptions;
45use query::planner::DfLogicalPlanner;
46use session::context::QueryContextRef;
47use snafu::ResultExt;
48
49pub use self::error::{PgErrorCode, PgErrorSeverity};
50use crate::error::{self as server_error, InferParameterTypesSnafu, Result};
51use crate::postgres::handler::PgSqlPlan;
52use crate::postgres::utils::convert_err;
53
54pub(super) fn schema_to_pg(
55    origin: &Schema,
56    field_formats: &Format,
57    format_options: Option<Arc<PgFormatOptions>>,
58) -> Result<Vec<FieldInfo>> {
59    origin
60        .column_schemas()
61        .iter()
62        .enumerate()
63        .map(|(idx, col)| {
64            let mut field_info = FieldInfo::new(
65                col.name.clone(),
66                None,
67                None,
68                type_gt_to_pg(&col.data_type)?,
69                field_formats.format_for(idx),
70            );
71            if let Some(format_options) = &format_options {
72                field_info = field_info.with_format_options(format_options.clone());
73            }
74            Ok(field_info)
75        })
76        .collect::<Result<Vec<FieldInfo>>>()
77}
78
79/// this function will encode greptime's `StructValue` into PostgreSQL jsonb type
80///
81/// Note that greptimedb has different types of StructValue for storing json data,
82/// based on policy defined in `JsonStructureSettings`. But here the `StructValue`
83/// should be fully structured.
84///
85/// there are alternatives like records, arrays, etc. but there are also limitations:
86/// records: there is no support for include keys
87/// arrays: element in array must be the same type
88fn encode_struct<S: Encoder>(
89    _query_ctx: &QueryContextRef,
90    struct_value: StructValue,
91    builder: &mut S,
92    pg_field: &FieldInfo,
93) -> PgWireResult<()> {
94    let encoding_setting = JsonStructureSettings::Structured(None);
95    let json_value = encoding_setting
96        .decode(Value::Struct(struct_value))
97        .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
98
99    builder.encode_field(&json_value, pg_field)
100}
101
102pub(crate) struct RecordBatchRowStream<S, B>
103where
104    S: Encoder,
105    B: Stream<Item = RecordBatchResult<RecordBatch>>,
106{
107    query_ctx: QueryContextRef,
108    pg_schema: Arc<Vec<FieldInfo>>,
109    schema: SchemaRef,
110    record_batches: Pin<Box<B>>,
111    encoder: S,
112}
113
114impl<S, B> Stream for RecordBatchRowStream<S, B>
115where
116    S: Encoder + Unpin,
117    B: Stream<Item = RecordBatchResult<RecordBatch>>,
118{
119    type Item = PgWireResult<Vec<S::Item>>;
120
121    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122        match self.record_batches.as_mut().poll_next(cx) {
123            Poll::Ready(Some(Ok(batch))) => {
124                let record_batch = batch.into_df_record_batch();
125                let num_rows = record_batch.num_rows();
126
127                if num_rows == 0 {
128                    return Poll::Ready(Some(Ok(vec![])));
129                }
130
131                let arrow_schema = record_batch.schema();
132                let query_ctx = self.query_ctx.clone();
133                let pg_schema = self.pg_schema.clone();
134                let schema = self.schema.clone();
135                let mut results = Vec::with_capacity(num_rows);
136
137                for i in 0..num_rows {
138                    if let Err(e) = Self::encode_row(
139                        &query_ctx,
140                        &pg_schema,
141                        &schema,
142                        arrow_schema.as_ref(),
143                        &mut self.encoder,
144                        &record_batch,
145                        i,
146                    ) {
147                        return Poll::Ready(Some(Err(e)));
148                    }
149                    results.push(self.encoder.take_row());
150                }
151
152                Poll::Ready(Some(Ok(results)))
153            }
154            Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(convert_err(e)))),
155            Poll::Ready(None) => Poll::Ready(None),
156            Poll::Pending => Poll::Pending,
157        }
158    }
159}
160
161impl<S, B> RecordBatchRowStream<S, B>
162where
163    S: Encoder,
164    B: Stream<Item = RecordBatchResult<RecordBatch>>,
165{
166    pub(crate) fn new(
167        query_ctx: QueryContextRef,
168        pg_schema: Arc<Vec<FieldInfo>>,
169        schema: SchemaRef,
170        record_batches: B,
171        encoder: S,
172    ) -> Self {
173        Self {
174            query_ctx,
175            pg_schema,
176            schema,
177            record_batches: Box::pin(record_batches),
178            encoder,
179        }
180    }
181
182    fn encode_row(
183        query_ctx: &QueryContextRef,
184        pg_schema: &Arc<Vec<FieldInfo>>,
185        schema: &SchemaRef,
186        arrow_schema: &arrow::datatypes::Schema,
187        encoder: &mut S,
188        record_batch: &arrow::record_batch::RecordBatch,
189        i: usize,
190    ) -> PgWireResult<()> {
191        for (j, column) in record_batch.columns().iter().enumerate() {
192            let pg_field = &pg_schema[j];
193
194            if column.is_null(i) {
195                encoder.encode_field(&None::<&i8>, pg_field)?;
196                continue;
197            }
198
199            match column.data_type() {
200                // these types are greptimedb specific or custom
201                DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
202                    // jsonb
203                    if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
204                        let v = datatypes::arrow_array::binary_array_value(column, i);
205                        let s = jsonb_to_string(v).map_err(convert_err)?;
206                        encoder.encode_field(&s, pg_field)?;
207                    } else {
208                        // bytea
209                        let arrow_field = arrow_schema.field(j);
210                        encode_value(encoder, column, i, arrow_field, pg_field)?;
211                    }
212                }
213
214                DataType::List(_) => {
215                    let array = column.as_list::<i32>();
216                    let items = array.value(i);
217
218                    encode_list(encoder, items, pg_field)?;
219                }
220                DataType::Struct(_) => {
221                    encode_struct(query_ctx, Default::default(), encoder, pg_field)?;
222                }
223                _ => {
224                    // Encode value using arrow-pg
225                    let arrow_field = arrow_schema.field(j);
226                    encode_value(encoder, column, i, arrow_field, pg_field)?;
227                }
228            }
229        }
230        Ok(())
231    }
232}
233
234pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
235    match origin {
236        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
237        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
238        &ConcreteDataType::Int8(_) => Ok(Type::CHAR),
239        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
240        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
241        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
242        &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
243        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
244        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
245        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
246        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
247        &ConcreteDataType::Date(_) => Ok(Type::DATE),
248        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
249        &ConcreteDataType::Time(_) => Ok(Type::TIME),
250        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
251        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
252        &ConcreteDataType::Json(_) => Ok(Type::JSON),
253        ConcreteDataType::List(list) => match list.item_type() {
254            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
255            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
256            &ConcreteDataType::Int8(_) => Ok(Type::CHAR_ARRAY),
257            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
258            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
259            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
260            &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
261            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
262            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
263            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
264            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
265            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
266            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
267            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
268            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
269            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
270            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
271            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
272            &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
273            &ConcreteDataType::Dictionary(_)
274            | &ConcreteDataType::Vector(_)
275            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
276                data_type: origin,
277                reason: "not implemented",
278            }
279            .fail(),
280        },
281        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
282            data_type: origin,
283            reason: "not implemented",
284        }
285        .fail(),
286        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
287        &ConcreteDataType::Struct(_) => Ok(Type::JSON),
288    }
289}
290
291#[allow(dead_code)]
292pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
293    // Note that we only support a small amount of pg data types
294    match origin {
295        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
296        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
297        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
298        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
299        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
300        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
301        &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
302            common_time::timestamp::TimeUnit::Millisecond,
303        )),
304        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
305        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
306            common_time::timestamp::TimeUnit::Microsecond,
307        )),
308        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
309            ConcreteDataType::int8_datatype(),
310        ))),
311        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
312            ConcreteDataType::int16_datatype(),
313        ))),
314        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
315            ConcreteDataType::int32_datatype(),
316        ))),
317        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
318            ConcreteDataType::int64_datatype(),
319        ))),
320        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
321            ConcreteDataType::string_datatype(),
322        ))),
323        _ => server_error::InternalSnafu {
324            err_msg: format!("unimplemented datatype {origin:?}"),
325        }
326        .fail(),
327    }
328}
329
330pub(super) fn parameter_to_string(portal: &Portal<PgSqlPlan>, idx: usize) -> PgWireResult<String> {
331    // the index is managed from portal's parameters count so it's safe to
332    // unwrap here.
333    let param_type = portal
334        .statement
335        .parameter_types
336        .get(idx)
337        .unwrap()
338        .as_ref()
339        .unwrap_or(&Type::UNKNOWN);
340    match param_type {
341        &Type::VARCHAR | &Type::TEXT => Ok(format!(
342            "'{}'",
343            portal
344                .parameter::<String>(idx, param_type)?
345                .as_deref()
346                .unwrap_or("")
347        )),
348        &Type::BOOL => Ok(portal
349            .parameter::<bool>(idx, param_type)?
350            .map(|v| v.to_string())
351            .unwrap_or_else(|| "".to_owned())),
352        &Type::INT4 => Ok(portal
353            .parameter::<i32>(idx, param_type)?
354            .map(|v| v.to_string())
355            .unwrap_or_else(|| "".to_owned())),
356        &Type::INT8 => Ok(portal
357            .parameter::<i64>(idx, param_type)?
358            .map(|v| v.to_string())
359            .unwrap_or_else(|| "".to_owned())),
360        &Type::FLOAT4 => Ok(portal
361            .parameter::<f32>(idx, param_type)?
362            .map(|v| v.to_string())
363            .unwrap_or_else(|| "".to_owned())),
364        &Type::FLOAT8 => Ok(portal
365            .parameter::<f64>(idx, param_type)?
366            .map(|v| v.to_string())
367            .unwrap_or_else(|| "".to_owned())),
368        &Type::DATE => Ok(portal
369            .parameter::<NaiveDate>(idx, param_type)?
370            .map(|v| v.format("%Y-%m-%d").to_string())
371            .unwrap_or_else(|| "".to_owned())),
372        &Type::TIMESTAMP => Ok(portal
373            .parameter::<NaiveDateTime>(idx, param_type)?
374            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
375            .unwrap_or_else(|| "".to_owned())),
376        &Type::INTERVAL => Ok(portal
377            .parameter::<PgInterval>(idx, param_type)?
378            .map(|v| v.to_sql())
379            .unwrap_or_else(|| "".to_owned())),
380        _ => Err(invalid_parameter_error(
381            "unsupported_parameter_type",
382            Some(param_type.to_string()),
383        )),
384    }
385}
386
387pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
388    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
389    error_info.detail = detail;
390    PgWireError::UserError(Box::new(error_info))
391}
392
393fn to_timestamp_scalar_value<T>(
394    data: Option<T>,
395    unit: &TimestampType,
396    ctype: &ConcreteDataType,
397) -> PgWireResult<ScalarValue>
398where
399    T: Into<i64>,
400{
401    if let Some(n) = data {
402        Value::Timestamp(unit.create_timestamp(n.into()))
403            .try_to_scalar_value(ctype)
404            .map_err(convert_err)
405    } else {
406        Ok(ScalarValue::Null)
407    }
408}
409
410pub(super) fn parameters_to_scalar_values(
411    plan: &LogicalPlan,
412    portal: &Portal<PgSqlPlan>,
413) -> PgWireResult<Vec<ScalarValue>> {
414    let param_count = portal.parameter_len();
415    let mut results = Vec::with_capacity(param_count);
416
417    let client_param_types = &portal.statement.parameter_types;
418    let server_param_types = DfLogicalPlanner::get_inferred_parameter_types(plan)
419        .context(InferParameterTypesSnafu)
420        .map_err(convert_err)?
421        .into_iter()
422        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
423        .collect::<HashMap<_, _>>();
424
425    for idx in 0..param_count {
426        let server_type = server_param_types
427            .get(&format!("${}", idx + 1))
428            .and_then(|t| t.as_ref());
429
430        let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
431            client_given_type.clone()
432        } else if let Some(server_provided_type) = &server_type {
433            type_gt_to_pg(server_provided_type).map_err(convert_err)?
434        } else {
435            return Err(invalid_parameter_error(
436                "unknown_parameter_type",
437                Some(format!(
438                    "Cannot get parameter type information for parameter {}",
439                    idx
440                )),
441            ));
442        };
443
444        let value = match &client_type {
445            &Type::VARCHAR | &Type::TEXT => {
446                let data = portal.parameter::<String>(idx, &client_type)?;
447                if let Some(server_type) = &server_type {
448                    match server_type {
449                        ConcreteDataType::String(t) => {
450                            if t.is_large() {
451                                ScalarValue::LargeUtf8(data)
452                            } else {
453                                ScalarValue::Utf8(data)
454                            }
455                        }
456                        _ => {
457                            return Err(invalid_parameter_error(
458                                "invalid_parameter_type",
459                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
460                            ));
461                        }
462                    }
463                } else {
464                    ScalarValue::Utf8(data)
465                }
466            }
467            &Type::BOOL => {
468                let data = portal.parameter::<bool>(idx, &client_type)?;
469                if let Some(server_type) = &server_type {
470                    match server_type {
471                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
472                        _ => {
473                            return Err(invalid_parameter_error(
474                                "invalid_parameter_type",
475                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
476                            ));
477                        }
478                    }
479                } else {
480                    ScalarValue::Boolean(data)
481                }
482            }
483            &Type::INT2 => {
484                let data = portal.parameter::<i16>(idx, &client_type)?;
485                if let Some(server_type) = &server_type {
486                    match server_type {
487                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
488                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
489                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
490                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
491                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
492                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
493                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
494                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
495                        ConcreteDataType::Timestamp(unit) => {
496                            to_timestamp_scalar_value(data, unit, server_type)?
497                        }
498                        _ => {
499                            return Err(invalid_parameter_error(
500                                "invalid_parameter_type",
501                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
502                            ));
503                        }
504                    }
505                } else {
506                    ScalarValue::Int16(data)
507                }
508            }
509            &Type::INT4 => {
510                let data = portal.parameter::<i32>(idx, &client_type)?;
511                if let Some(server_type) = &server_type {
512                    match server_type {
513                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
514                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
515                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
516                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
517                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
518                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
519                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
520                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
521                        ConcreteDataType::Timestamp(unit) => {
522                            to_timestamp_scalar_value(data, unit, server_type)?
523                        }
524                        _ => {
525                            return Err(invalid_parameter_error(
526                                "invalid_parameter_type",
527                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
528                            ));
529                        }
530                    }
531                } else {
532                    ScalarValue::Int32(data)
533                }
534            }
535            &Type::INT8 => {
536                let data = portal.parameter::<i64>(idx, &client_type)?;
537                if let Some(server_type) = &server_type {
538                    match server_type {
539                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
540                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
541                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
542                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
543                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
544                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
545                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
546                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
547                        ConcreteDataType::Timestamp(unit) => {
548                            to_timestamp_scalar_value(data, unit, server_type)?
549                        }
550                        _ => {
551                            return Err(invalid_parameter_error(
552                                "invalid_parameter_type",
553                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
554                            ));
555                        }
556                    }
557                } else {
558                    ScalarValue::Int64(data)
559                }
560            }
561            &Type::FLOAT4 => {
562                let data = portal.parameter::<f32>(idx, &client_type)?;
563                if let Some(server_type) = &server_type {
564                    match server_type {
565                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
566                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
567                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
568                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
569                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
570                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
571                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
572                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
573                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
574                        ConcreteDataType::Float64(_) => {
575                            ScalarValue::Float64(data.map(|n| n as f64))
576                        }
577                        _ => {
578                            return Err(invalid_parameter_error(
579                                "invalid_parameter_type",
580                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
581                            ));
582                        }
583                    }
584                } else {
585                    ScalarValue::Float32(data)
586                }
587            }
588            &Type::FLOAT8 => {
589                let data = portal.parameter::<f64>(idx, &client_type)?;
590                if let Some(server_type) = &server_type {
591                    match server_type {
592                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
593                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
594                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
595                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
596                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
597                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
598                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
599                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
600                        ConcreteDataType::Float32(_) => {
601                            ScalarValue::Float32(data.map(|n| n as f32))
602                        }
603                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
604                        _ => {
605                            return Err(invalid_parameter_error(
606                                "invalid_parameter_type",
607                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
608                            ));
609                        }
610                    }
611                } else {
612                    ScalarValue::Float64(data)
613                }
614            }
615            &Type::TIMESTAMP => {
616                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
617                if let Some(server_type) = &server_type {
618                    match server_type {
619                        ConcreteDataType::Timestamp(unit) => match *unit {
620                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
621                                data.map(|ts| ts.and_utc().timestamp()),
622                                None,
623                            ),
624                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
625                                data.map(|ts| ts.and_utc().timestamp_millis()),
626                                None,
627                            ),
628                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
629                                data.map(|ts| ts.and_utc().timestamp_micros()),
630                                None,
631                            ),
632                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
633                                data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
634                                None,
635                            ),
636                        },
637                        _ => {
638                            return Err(invalid_parameter_error(
639                                "invalid_parameter_type",
640                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
641                            ));
642                        }
643                    }
644                } else {
645                    ScalarValue::TimestampMillisecond(
646                        data.map(|ts| ts.and_utc().timestamp_millis()),
647                        None,
648                    )
649                }
650            }
651            &Type::TIMESTAMPTZ => {
652                let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
653                if let Some(server_type) = &server_type {
654                    match server_type {
655                        ConcreteDataType::Timestamp(unit) => match *unit {
656                            TimestampType::Second(_) => {
657                                ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
658                            }
659                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
660                                data.map(|ts| ts.timestamp_millis()),
661                                None,
662                            ),
663                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
664                                data.map(|ts| ts.timestamp_micros()),
665                                None,
666                            ),
667                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
668                                data.and_then(|ts| ts.timestamp_nanos_opt()),
669                                None,
670                            ),
671                        },
672                        _ => {
673                            return Err(invalid_parameter_error(
674                                "invalid_parameter_type",
675                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
676                            ));
677                        }
678                    }
679                } else {
680                    ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
681                }
682            }
683            &Type::DATE => {
684                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
685                if let Some(server_type) = &server_type {
686                    match server_type {
687                        ConcreteDataType::Date(_) => ScalarValue::Date32(
688                            data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
689                        ),
690                        _ => {
691                            return Err(invalid_parameter_error(
692                                "invalid_parameter_type",
693                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
694                            ));
695                        }
696                    }
697                } else {
698                    ScalarValue::Date32(
699                        data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
700                    )
701                }
702            }
703            &Type::INTERVAL => {
704                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
705                if let Some(server_type) = &server_type {
706                    match server_type {
707                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
708                            ScalarValue::IntervalYearMonth(
709                                data.map(|i| {
710                                    if i.days != 0 || i.microseconds != 0 {
711                                        Err(invalid_parameter_error(
712                                            "invalid_parameter_type",
713                                            Some(format!(
714                                                "Expected: {}, found: {}",
715                                                server_type, client_type
716                                            )),
717                                        ))
718                                    } else {
719                                        Ok(IntervalYearMonth::new(i.months).to_i32())
720                                    }
721                                })
722                                .transpose()?,
723                            )
724                        }
725                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
726                            ScalarValue::IntervalDayTime(
727                                data.map(|i| {
728                                    if i.months != 0 || i.microseconds % 1000 != 0 {
729                                        Err(invalid_parameter_error(
730                                            "invalid_parameter_type",
731                                            Some(format!(
732                                                "Expected: {}, found: {}",
733                                                server_type, client_type
734                                            )),
735                                        ))
736                                    } else {
737                                        Ok(IntervalDayTime::new(
738                                            i.days,
739                                            (i.microseconds / 1000) as i32,
740                                        )
741                                        .into())
742                                    }
743                                })
744                                .transpose()?,
745                            )
746                        }
747                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
748                            ScalarValue::IntervalMonthDayNano(data.map(|i| {
749                                IntervalMonthDayNano::new(
750                                    i.months,
751                                    i.days,
752                                    i.microseconds * 1_000i64,
753                                )
754                                .into()
755                            }))
756                        }
757                        _ => {
758                            return Err(invalid_parameter_error(
759                                "invalid_parameter_type",
760                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
761                            ));
762                        }
763                    }
764                } else {
765                    ScalarValue::IntervalMonthDayNano(data.map(|i| {
766                        IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
767                            .into()
768                    }))
769                }
770            }
771            &Type::BYTEA => {
772                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
773                if let Some(server_type) = &server_type {
774                    match server_type {
775                        ConcreteDataType::String(t) => {
776                            let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
777                            if t.is_large() {
778                                ScalarValue::LargeUtf8(s)
779                            } else {
780                                ScalarValue::Utf8(s)
781                            }
782                        }
783                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
784                        _ => {
785                            return Err(invalid_parameter_error(
786                                "invalid_parameter_type",
787                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
788                            ));
789                        }
790                    }
791                } else {
792                    ScalarValue::Binary(data)
793                }
794            }
795            &Type::JSONB => {
796                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
797                if let Some(server_type) = &server_type {
798                    match server_type {
799                        ConcreteDataType::Binary(_) => {
800                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
801                        }
802                        _ => {
803                            return Err(invalid_parameter_error(
804                                "invalid_parameter_type",
805                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
806                            ));
807                        }
808                    }
809                } else {
810                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
811                }
812            }
813            &Type::INT2_ARRAY => {
814                let data = portal.parameter::<Vec<Option<i16>>>(idx, &client_type)?;
815                if let Some(data) = data {
816                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
817                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
818                } else {
819                    ScalarValue::Null
820                }
821            }
822            &Type::INT4_ARRAY => {
823                let data = portal.parameter::<Vec<Option<i32>>>(idx, &client_type)?;
824                if let Some(data) = data {
825                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
826                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
827                } else {
828                    ScalarValue::Null
829                }
830            }
831            &Type::INT8_ARRAY => {
832                let data = portal.parameter::<Vec<Option<i64>>>(idx, &client_type)?;
833                if let Some(data) = data {
834                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
835                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
836                } else {
837                    ScalarValue::Null
838                }
839            }
840            &Type::VARCHAR_ARRAY => {
841                let data = portal.parameter::<Vec<Option<String>>>(idx, &client_type)?;
842                if let Some(data) = data {
843                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
844                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
845                } else {
846                    ScalarValue::Null
847                }
848            }
849            &Type::TIMESTAMP_ARRAY => {
850                let data = portal.parameter::<Vec<Option<NaiveDateTime>>>(idx, &client_type)?;
851                if let Some(data) = data {
852                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
853                        match list_type.item_type() {
854                            ConcreteDataType::Timestamp(unit) => match *unit {
855                                TimestampType::Second(_) => {
856                                    let values = data
857                                        .into_iter()
858                                        .map(|ts| {
859                                            ScalarValue::TimestampSecond(
860                                                ts.map(|ts| ts.and_utc().timestamp()),
861                                                None,
862                                            )
863                                        })
864                                        .collect::<Vec<_>>();
865                                    ScalarValue::List(ScalarValue::new_list(
866                                        &values,
867                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
868                                        true,
869                                    ))
870                                }
871                                TimestampType::Millisecond(_) => {
872                                    let values = data
873                                        .into_iter()
874                                        .map(|ts| {
875                                            ScalarValue::TimestampMillisecond(
876                                                ts.map(|ts| ts.and_utc().timestamp_millis()),
877                                                None,
878                                            )
879                                        })
880                                        .collect::<Vec<_>>();
881                                    ScalarValue::List(ScalarValue::new_list(
882                                        &values,
883                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
884                                        true,
885                                    ))
886                                }
887                                TimestampType::Microsecond(_) => {
888                                    let values = data
889                                        .into_iter()
890                                        .map(|ts| {
891                                            ScalarValue::TimestampMicrosecond(
892                                                ts.map(|ts| ts.and_utc().timestamp_micros()),
893                                                None,
894                                            )
895                                        })
896                                        .collect::<Vec<_>>();
897                                    ScalarValue::List(ScalarValue::new_list(
898                                        &values,
899                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
900                                        true,
901                                    ))
902                                }
903                                TimestampType::Nanosecond(_) => {
904                                    let values = data
905                                        .into_iter()
906                                        .filter_map(|ts| {
907                                            ts.and_then(|ts| {
908                                                ts.and_utc().timestamp_nanos_opt().map(|nanos| {
909                                                    ScalarValue::TimestampNanosecond(
910                                                        Some(nanos),
911                                                        None,
912                                                    )
913                                                })
914                                            })
915                                        })
916                                        .collect::<Vec<_>>();
917                                    ScalarValue::List(ScalarValue::new_list(
918                                        &values,
919                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
920                                        true,
921                                    ))
922                                }
923                            },
924                            _ => {
925                                return Err(invalid_parameter_error(
926                                    "invalid_parameter_type",
927                                    Some(format!(
928                                        "Expected: {}, found: {}",
929                                        list_type.item_type(),
930                                        client_type
931                                    )),
932                                ));
933                            }
934                        }
935                    } else {
936                        let values = data
937                            .into_iter()
938                            .map(|ts| {
939                                ScalarValue::TimestampMillisecond(
940                                    ts.map(|ts| ts.and_utc().timestamp_millis()),
941                                    None,
942                                )
943                            })
944                            .collect::<Vec<_>>();
945                        ScalarValue::List(ScalarValue::new_list(
946                            &values,
947                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
948                            true,
949                        ))
950                    }
951                } else {
952                    ScalarValue::Null
953                }
954            }
955            &Type::TIMESTAMPTZ_ARRAY => {
956                let data =
957                    portal.parameter::<Vec<Option<DateTime<FixedOffset>>>>(idx, &client_type)?;
958                if let Some(data) = data {
959                    if let Some(ConcreteDataType::List(list_type)) = &server_type {
960                        match list_type.item_type() {
961                            ConcreteDataType::Timestamp(unit) => match *unit {
962                                TimestampType::Second(_) => {
963                                    let values = data
964                                        .into_iter()
965                                        .map(|ts| {
966                                            ScalarValue::TimestampSecond(
967                                                ts.map(|ts| ts.timestamp()),
968                                                None,
969                                            )
970                                        })
971                                        .collect::<Vec<_>>();
972                                    ScalarValue::List(ScalarValue::new_list(
973                                        &values,
974                                        &ArrowDataType::Timestamp(TimeUnit::Second, None),
975                                        true,
976                                    ))
977                                }
978                                TimestampType::Millisecond(_) => {
979                                    let values = data
980                                        .into_iter()
981                                        .map(|ts| {
982                                            ScalarValue::TimestampMillisecond(
983                                                ts.map(|ts| ts.timestamp_millis()),
984                                                None,
985                                            )
986                                        })
987                                        .collect::<Vec<_>>();
988                                    ScalarValue::List(ScalarValue::new_list(
989                                        &values,
990                                        &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
991                                        true,
992                                    ))
993                                }
994                                TimestampType::Microsecond(_) => {
995                                    let values = data
996                                        .into_iter()
997                                        .map(|ts| {
998                                            ScalarValue::TimestampMicrosecond(
999                                                ts.map(|ts| ts.timestamp_micros()),
1000                                                None,
1001                                            )
1002                                        })
1003                                        .collect::<Vec<_>>();
1004                                    ScalarValue::List(ScalarValue::new_list(
1005                                        &values,
1006                                        &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1007                                        true,
1008                                    ))
1009                                }
1010                                TimestampType::Nanosecond(_) => {
1011                                    let values = data
1012                                        .into_iter()
1013                                        .map(|ts| {
1014                                            ScalarValue::TimestampNanosecond(
1015                                                ts.and_then(|ts| ts.timestamp_nanos_opt()),
1016                                                None,
1017                                            )
1018                                        })
1019                                        .collect::<Vec<_>>();
1020                                    ScalarValue::List(ScalarValue::new_list(
1021                                        &values,
1022                                        &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1023                                        true,
1024                                    ))
1025                                }
1026                            },
1027                            _ => {
1028                                return Err(invalid_parameter_error(
1029                                    "invalid_parameter_type",
1030                                    Some(format!(
1031                                        "Expected: {}, found: {}",
1032                                        list_type.item_type(),
1033                                        client_type
1034                                    )),
1035                                ));
1036                            }
1037                        }
1038                    } else {
1039                        let values = data
1040                            .into_iter()
1041                            .map(|ts| {
1042                                ScalarValue::TimestampMillisecond(
1043                                    ts.map(|ts| ts.timestamp_millis()),
1044                                    None,
1045                                )
1046                            })
1047                            .collect::<Vec<_>>();
1048                        ScalarValue::List(ScalarValue::new_list(
1049                            &values,
1050                            &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1051                            true,
1052                        ))
1053                    }
1054                } else {
1055                    ScalarValue::Null
1056                }
1057            }
1058            _ => Err(invalid_parameter_error(
1059                "unsupported_parameter_value",
1060                Some(format!("Found type: {}", client_type)),
1061            ))?,
1062        };
1063
1064        results.push(value);
1065    }
1066
1067    Ok(results)
1068}
1069
1070pub(super) fn param_types_to_pg_types(
1071    param_types: &HashMap<String, Option<ConcreteDataType>>,
1072) -> Result<Vec<Type>> {
1073    let param_count = param_types.len();
1074    let mut types = Vec::with_capacity(param_count);
1075    for i in 0..param_count {
1076        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1077            let pg_type = type_gt_to_pg(param_type)?;
1078            types.push(pg_type);
1079        } else {
1080            types.push(Type::UNKNOWN);
1081        }
1082    }
1083    Ok(types)
1084}
1085
1086pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1087    let config = query_ctx.configuration_parameter();
1088    let (date_style, date_order) = *config.pg_datetime_style();
1089
1090    let mut format_options = PgFormatOptions::default();
1091    format_options.date_style = format!("{}, {}", date_style, date_order);
1092    format_options.interval_style = config.pg_intervalstyle_format().to_string();
1093    format_options.bytea_output = config.postgres_bytea_output().to_string();
1094    format_options.time_zone = query_ctx.timezone().to_string();
1095
1096    Arc::new(format_options)
1097}
1098
1099#[cfg(test)]
1100mod test {
1101    use std::sync::Arc;
1102
1103    use arrow::array::{
1104        Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
1105    };
1106    use arrow_schema::{Field, IntervalUnit};
1107    use datatypes::schema::{ColumnSchema, Schema};
1108    use datatypes::vectors::{
1109        BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
1110        Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
1111        IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
1112        TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
1113    };
1114    use futures::{StreamExt as FuturesStreamExt, stream};
1115    use pgwire::api::Type;
1116    use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo};
1117    use session::context::QueryContextBuilder;
1118
1119    use super::*;
1120
1121    #[test]
1122    fn test_schema_convert() {
1123        let column_schemas = vec![
1124            ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1125            ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1126            ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1127            ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1128            ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1129            ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1130            ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1131            ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1132            ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1133            ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1134            ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1135            ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1136            ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1137            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1138            ColumnSchema::new(
1139                "timestamps",
1140                ConcreteDataType::timestamp_millisecond_datatype(),
1141                true,
1142            ),
1143            ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1144            ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1145            ColumnSchema::new(
1146                "intervals",
1147                ConcreteDataType::interval_month_day_nano_datatype(),
1148                true,
1149            ),
1150        ];
1151        let pg_field_info = vec![
1152            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1153            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1154            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1155            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1156            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1157            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1158            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1159            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1160            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1161            FieldInfo::new(
1162                "uint64s".into(),
1163                None,
1164                None,
1165                Type::NUMERIC,
1166                FieldFormat::Text,
1167            ),
1168            FieldInfo::new(
1169                "float32s".into(),
1170                None,
1171                None,
1172                Type::FLOAT4,
1173                FieldFormat::Text,
1174            ),
1175            FieldInfo::new(
1176                "float64s".into(),
1177                None,
1178                None,
1179                Type::FLOAT8,
1180                FieldFormat::Text,
1181            ),
1182            FieldInfo::new(
1183                "binaries".into(),
1184                None,
1185                None,
1186                Type::BYTEA,
1187                FieldFormat::Text,
1188            ),
1189            FieldInfo::new(
1190                "strings".into(),
1191                None,
1192                None,
1193                Type::VARCHAR,
1194                FieldFormat::Text,
1195            ),
1196            FieldInfo::new(
1197                "timestamps".into(),
1198                None,
1199                None,
1200                Type::TIMESTAMP,
1201                FieldFormat::Text,
1202            ),
1203            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1204            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1205            FieldInfo::new(
1206                "intervals".into(),
1207                None,
1208                None,
1209                Type::INTERVAL,
1210                FieldFormat::Text,
1211            ),
1212        ];
1213        let schema = Schema::new(column_schemas);
1214        let fs = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
1215        assert_eq!(fs, pg_field_info);
1216    }
1217
1218    #[test]
1219    fn test_encode_text_format_data() {
1220        let pg_schema = vec![
1221            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1222            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1223            FieldInfo::new("uint8s".into(), None, None, Type::INT2, FieldFormat::Text),
1224            FieldInfo::new("uint16s".into(), None, None, Type::INT4, FieldFormat::Text),
1225            FieldInfo::new("uint32s".into(), None, None, Type::INT8, FieldFormat::Text),
1226            FieldInfo::new(
1227                "uint64s".into(),
1228                None,
1229                None,
1230                Type::NUMERIC,
1231                FieldFormat::Text,
1232            ),
1233            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1234            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1235            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1236            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1237            FieldInfo::new(
1238                "float32s".into(),
1239                None,
1240                None,
1241                Type::FLOAT4,
1242                FieldFormat::Text,
1243            ),
1244            FieldInfo::new(
1245                "float64s".into(),
1246                None,
1247                None,
1248                Type::FLOAT8,
1249                FieldFormat::Text,
1250            ),
1251            FieldInfo::new(
1252                "strings".into(),
1253                None,
1254                None,
1255                Type::VARCHAR,
1256                FieldFormat::Text,
1257            ),
1258            FieldInfo::new(
1259                "binaries".into(),
1260                None,
1261                None,
1262                Type::BYTEA,
1263                FieldFormat::Text,
1264            ),
1265            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1266            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1267            FieldInfo::new(
1268                "timestamps".into(),
1269                None,
1270                None,
1271                Type::TIMESTAMP,
1272                FieldFormat::Text,
1273            ),
1274            FieldInfo::new(
1275                "interval_year_month".into(),
1276                None,
1277                None,
1278                Type::INTERVAL,
1279                FieldFormat::Text,
1280            ),
1281            FieldInfo::new(
1282                "interval_day_time".into(),
1283                None,
1284                None,
1285                Type::INTERVAL,
1286                FieldFormat::Text,
1287            ),
1288            FieldInfo::new(
1289                "interval_month_day_nano".into(),
1290                None,
1291                None,
1292                Type::INTERVAL,
1293                FieldFormat::Text,
1294            ),
1295            FieldInfo::new(
1296                "int_list".into(),
1297                None,
1298                None,
1299                Type::INT8_ARRAY,
1300                FieldFormat::Text,
1301            ),
1302            FieldInfo::new(
1303                "float_list".into(),
1304                None,
1305                None,
1306                Type::FLOAT8_ARRAY,
1307                FieldFormat::Text,
1308            ),
1309            FieldInfo::new(
1310                "string_list".into(),
1311                None,
1312                None,
1313                Type::VARCHAR_ARRAY,
1314                FieldFormat::Text,
1315            ),
1316            FieldInfo::new(
1317                "timestamp_list".into(),
1318                None,
1319                None,
1320                Type::TIMESTAMP_ARRAY,
1321                FieldFormat::Text,
1322            ),
1323        ];
1324
1325        let arrow_schema = arrow_schema::Schema::new(vec![
1326            Field::new("x", DataType::Null, true),
1327            Field::new("x", DataType::Boolean, true),
1328            Field::new("x", DataType::UInt8, true),
1329            Field::new("x", DataType::UInt16, true),
1330            Field::new("x", DataType::UInt32, true),
1331            Field::new("x", DataType::UInt64, true),
1332            Field::new("x", DataType::Int8, true),
1333            Field::new("x", DataType::Int16, true),
1334            Field::new("x", DataType::Int32, true),
1335            Field::new("x", DataType::Int64, true),
1336            Field::new("x", DataType::Float32, true),
1337            Field::new("x", DataType::Float64, true),
1338            Field::new("x", DataType::Utf8, true),
1339            Field::new("x", DataType::Binary, true),
1340            Field::new("x", DataType::Date32, true),
1341            Field::new("x", DataType::Time32(TimeUnit::Second), true),
1342            Field::new("x", DataType::Timestamp(TimeUnit::Second, None), true),
1343            Field::new("x", DataType::Interval(IntervalUnit::YearMonth), true),
1344            Field::new("x", DataType::Interval(IntervalUnit::DayTime), true),
1345            Field::new("x", DataType::Interval(IntervalUnit::MonthDayNano), true),
1346            Field::new(
1347                "x",
1348                DataType::List(Arc::new(Field::new("item", DataType::Int64, true))),
1349                true,
1350            ),
1351            Field::new(
1352                "x",
1353                DataType::List(Arc::new(Field::new("item", DataType::Float64, true))),
1354                true,
1355            ),
1356            Field::new(
1357                "x",
1358                DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
1359                true,
1360            ),
1361            Field::new(
1362                "x",
1363                DataType::List(Arc::new(Field::new(
1364                    "item",
1365                    DataType::Timestamp(TimeUnit::Second, None),
1366                    true,
1367                ))),
1368                true,
1369            ),
1370        ]);
1371
1372        let mut builder = ListBuilder::new(Int64Builder::new());
1373        builder.append_value([Some(1i64), None, Some(2)]);
1374        builder.append_null();
1375        builder.append_value([Some(-1i64), None, Some(-2)]);
1376        let i64_list_array = builder.finish();
1377
1378        let mut builder = ListBuilder::new(Float64Builder::new());
1379        builder.append_value([Some(1.0f64), None, Some(2.0)]);
1380        builder.append_null();
1381        builder.append_value([Some(-1.0f64), None, Some(-2.0)]);
1382        let f64_list_array = builder.finish();
1383
1384        let mut builder = ListBuilder::new(StringBuilder::new());
1385        builder.append_value([Some("a"), None, Some("b")]);
1386        builder.append_null();
1387        builder.append_value([Some("c"), None, Some("d")]);
1388        let string_list_array = builder.finish();
1389
1390        let mut builder = ListBuilder::new(TimestampSecondBuilder::new());
1391        builder.append_value([Some(1i64), None, Some(2)]);
1392        builder.append_null();
1393        builder.append_value([Some(3i64), None, Some(4)]);
1394        let timestamp_list_array = builder.finish();
1395
1396        let values = vec![
1397            Arc::new(NullVector::new(3)) as VectorRef,
1398            Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
1399            Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
1400            Arc::new(UInt16Vector::from(vec![
1401                Some(u16::MAX),
1402                Some(u16::MIN),
1403                None,
1404            ])),
1405            Arc::new(UInt32Vector::from(vec![
1406                Some(u32::MAX),
1407                Some(u32::MIN),
1408                None,
1409            ])),
1410            Arc::new(UInt64Vector::from(vec![
1411                Some(u64::MAX),
1412                Some(u64::MIN),
1413                None,
1414            ])),
1415            Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
1416            Arc::new(Int16Vector::from(vec![
1417                Some(i16::MAX),
1418                Some(i16::MIN),
1419                None,
1420            ])),
1421            Arc::new(Int32Vector::from(vec![
1422                Some(i32::MAX),
1423                Some(i32::MIN),
1424                None,
1425            ])),
1426            Arc::new(Int64Vector::from(vec![
1427                Some(i64::MAX),
1428                Some(i64::MIN),
1429                None,
1430            ])),
1431            Arc::new(Float32Vector::from(vec![
1432                None,
1433                Some(f32::MAX),
1434                Some(f32::MIN),
1435            ])),
1436            Arc::new(Float64Vector::from(vec![
1437                None,
1438                Some(f64::MAX),
1439                Some(f64::MIN),
1440            ])),
1441            Arc::new(StringVector::from(vec![
1442                None,
1443                Some("hello"),
1444                Some("greptime"),
1445            ])),
1446            Arc::new(BinaryVector::from(vec![
1447                None,
1448                Some("hello".as_bytes().to_vec()),
1449                Some("world".as_bytes().to_vec()),
1450            ])),
1451            Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
1452            Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
1453            Arc::new(TimestampSecondVector::from(vec![
1454                Some(1000001),
1455                None,
1456                Some(1),
1457            ])),
1458            Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
1459            Arc::new(IntervalDayTimeVector::from(vec![
1460                Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
1461                None,
1462                Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
1463            ])),
1464            Arc::new(IntervalMonthDayNanoVector::from(vec![
1465                Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
1466                None,
1467                Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
1468            ])),
1469            Arc::new(ListVector::from(i64_list_array)),
1470            Arc::new(ListVector::from(f64_list_array)),
1471            Arc::new(ListVector::from(string_list_array)),
1472            Arc::new(ListVector::from(timestamp_list_array)),
1473        ];
1474        let record_batch =
1475            RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), values).unwrap();
1476
1477        let query_context = QueryContextBuilder::default()
1478            .configuration_parameter(Default::default())
1479            .build()
1480            .into();
1481        let schema = record_batch.schema.clone();
1482        let pg_schema_ref = Arc::new(pg_schema);
1483
1484        let encoder = DataRowEncoder::new(pg_schema_ref.clone());
1485
1486        let row_stream = RecordBatchRowStream::new(
1487            query_context,
1488            pg_schema_ref.clone(),
1489            schema,
1490            stream::once(async { Ok(record_batch) }),
1491            encoder,
1492        );
1493
1494        let rows: Vec<_> = futures::executor::block_on(
1495            row_stream
1496                .filter_map(|x: PgWireResult<_>| async move { x.ok() })
1497                .flat_map(stream::iter)
1498                .collect::<Vec<_>>(),
1499        );
1500        assert_eq!(rows.len(), 3);
1501        for row in rows {
1502            assert_eq!(row.field_count, pg_schema_ref.len() as i16);
1503        }
1504    }
1505
1506    #[test]
1507    fn test_invalid_parameter() {
1508        // test for refactor with PgErrorCode
1509        let msg = "invalid_parameter_count";
1510        let error = invalid_parameter_error(msg, None);
1511        if let PgWireError::UserError(value) = error {
1512            assert_eq!("ERROR", value.severity);
1513            assert_eq!("22023", value.code);
1514            assert_eq!(msg, value.message);
1515        } else {
1516            panic!("test_invalid_parameter failed");
1517        }
1518    }
1519}