servers/postgres/
types.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
45use crate::postgres::utils::convert_err;
46use crate::SqlPlan;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49    origin
50        .column_schemas()
51        .iter()
52        .enumerate()
53        .map(|(idx, col)| {
54            Ok(FieldInfo::new(
55                col.name.clone(),
56                None,
57                None,
58                type_gt_to_pg(&col.data_type)?,
59                field_formats.format_for(idx),
60            ))
61        })
62        .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_array(
66    query_ctx: &QueryContextRef,
67    value_list: &ListValue,
68    builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70    match value_list.datatype() {
71        &ConcreteDataType::Boolean(_) => {
72            let array = value_list
73                .items()
74                .iter()
75                .map(|v| match v {
76                    Value::Null => Ok(None),
77                    Value::Boolean(v) => Ok(Some(*v)),
78                    _ => Err(convert_err(Error::Internal {
79                        err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
80                    })),
81                })
82                .collect::<PgWireResult<Vec<Option<bool>>>>()?;
83            builder.encode_field(&array)
84        }
85        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
86            let array = value_list
87                .items()
88                .iter()
89                .map(|v| match v {
90                    Value::Null => Ok(None),
91                    Value::Int8(v) => Ok(Some(*v)),
92                    Value::UInt8(v) => Ok(Some(*v as i8)),
93                    _ => Err(convert_err(Error::Internal {
94                        err_msg: format!(
95                            "Invalid list item type, find {v:?}, expected int8 or uint8",
96                        ),
97                    })),
98                })
99                .collect::<PgWireResult<Vec<Option<i8>>>>()?;
100            builder.encode_field(&array)
101        }
102        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
103            let array = value_list
104                .items()
105                .iter()
106                .map(|v| match v {
107                    Value::Null => Ok(None),
108                    Value::Int16(v) => Ok(Some(*v)),
109                    Value::UInt16(v) => Ok(Some(*v as i16)),
110                    _ => Err(convert_err(Error::Internal {
111                        err_msg: format!(
112                            "Invalid list item type, find {v:?}, expected int16 or uint16",
113                        ),
114                    })),
115                })
116                .collect::<PgWireResult<Vec<Option<i16>>>>()?;
117            builder.encode_field(&array)
118        }
119        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
120            let array = value_list
121                .items()
122                .iter()
123                .map(|v| match v {
124                    Value::Null => Ok(None),
125                    Value::Int32(v) => Ok(Some(*v)),
126                    Value::UInt32(v) => Ok(Some(*v as i32)),
127                    _ => Err(convert_err(Error::Internal {
128                        err_msg: format!(
129                            "Invalid list item type, find {v:?}, expected int32 or uint32",
130                        ),
131                    })),
132                })
133                .collect::<PgWireResult<Vec<Option<i32>>>>()?;
134            builder.encode_field(&array)
135        }
136        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
137            let array = value_list
138                .items()
139                .iter()
140                .map(|v| match v {
141                    Value::Null => Ok(None),
142                    Value::Int64(v) => Ok(Some(*v)),
143                    Value::UInt64(v) => Ok(Some(*v as i64)),
144                    _ => Err(convert_err(Error::Internal {
145                        err_msg: format!(
146                            "Invalid list item type, find {v:?}, expected int64 or uint64",
147                        ),
148                    })),
149                })
150                .collect::<PgWireResult<Vec<Option<i64>>>>()?;
151            builder.encode_field(&array)
152        }
153        &ConcreteDataType::Float32(_) => {
154            let array = value_list
155                .items()
156                .iter()
157                .map(|v| match v {
158                    Value::Null => Ok(None),
159                    Value::Float32(v) => Ok(Some(v.0)),
160                    _ => Err(convert_err(Error::Internal {
161                        err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
162                    })),
163                })
164                .collect::<PgWireResult<Vec<Option<f32>>>>()?;
165            builder.encode_field(&array)
166        }
167        &ConcreteDataType::Float64(_) => {
168            let array = value_list
169                .items()
170                .iter()
171                .map(|v| match v {
172                    Value::Null => Ok(None),
173                    Value::Float64(v) => Ok(Some(v.0)),
174                    _ => Err(convert_err(Error::Internal {
175                        err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
176                    })),
177                })
178                .collect::<PgWireResult<Vec<Option<f64>>>>()?;
179            builder.encode_field(&array)
180        }
181        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
182            let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
183
184            match *bytea_output {
185                PGByteaOutputValue::ESCAPE => {
186                    let array = value_list
187                        .items()
188                        .iter()
189                        .map(|v| match v {
190                            Value::Null => Ok(None),
191                            Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
192
193                            _ => Err(convert_err(Error::Internal {
194                                err_msg: format!(
195                                    "Invalid list item type, find {v:?}, expected binary",
196                                ),
197                            })),
198                        })
199                        .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
200                    builder.encode_field(&array)
201                }
202                PGByteaOutputValue::HEX => {
203                    let array = value_list
204                        .items()
205                        .iter()
206                        .map(|v| match v {
207                            Value::Null => Ok(None),
208                            Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
209
210                            _ => Err(convert_err(Error::Internal {
211                                err_msg: format!(
212                                    "Invalid list item type, find {v:?}, expected binary",
213                                ),
214                            })),
215                        })
216                        .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
217                    builder.encode_field(&array)
218                }
219            }
220        }
221        &ConcreteDataType::String(_) => {
222            let array = value_list
223                .items()
224                .iter()
225                .map(|v| match v {
226                    Value::Null => Ok(None),
227                    Value::String(v) => Ok(Some(v.as_utf8())),
228                    _ => Err(convert_err(Error::Internal {
229                        err_msg: format!("Invalid list item type, find {v:?}, expected string",),
230                    })),
231                })
232                .collect::<PgWireResult<Vec<Option<&str>>>>()?;
233            builder.encode_field(&array)
234        }
235        &ConcreteDataType::Date(_) => {
236            let array = value_list
237                .items()
238                .iter()
239                .map(|v| match v {
240                    Value::Null => Ok(None),
241                    Value::Date(v) => {
242                        if let Some(date) = v.to_chrono_date() {
243                            let (style, order) =
244                                *query_ctx.configuration_parameter().pg_datetime_style();
245                            Ok(Some(StylingDate(date, style, order)))
246                        } else {
247                            Err(convert_err(Error::Internal {
248                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
249                            }))
250                        }
251                    }
252                    _ => Err(convert_err(Error::Internal {
253                        err_msg: format!("Invalid list item type, find {v:?}, expected date",),
254                    })),
255                })
256                .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
257            builder.encode_field(&array)
258        }
259        &ConcreteDataType::Timestamp(_) => {
260            let array = value_list
261                .items()
262                .iter()
263                .map(|v| match v {
264                    Value::Null => Ok(None),
265                    Value::Timestamp(v) => {
266                        if let Some(datetime) =
267                            v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
268                        {
269                            let (style, order) =
270                                *query_ctx.configuration_parameter().pg_datetime_style();
271                            Ok(Some(StylingDateTime(datetime, style, order)))
272                        } else {
273                            Err(convert_err(Error::Internal {
274                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
275                            }))
276                        }
277                    }
278                    _ => Err(convert_err(Error::Internal {
279                        err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
280                    })),
281                })
282                .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
283            builder.encode_field(&array)
284        }
285        &ConcreteDataType::Time(_) => {
286            let array = value_list
287                .items()
288                .iter()
289                .map(|v| match v {
290                    Value::Null => Ok(None),
291                    Value::Time(v) => Ok(v.to_chrono_time()),
292                    _ => Err(convert_err(Error::Internal {
293                        err_msg: format!("Invalid list item type, find {v:?}, expected time",),
294                    })),
295                })
296                .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
297            builder.encode_field(&array)
298        }
299        &ConcreteDataType::Interval(_) => {
300            let array = value_list
301                .items()
302                .iter()
303                .map(|v| match v {
304                    Value::Null => Ok(None),
305                    Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
306                    Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
307                    Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
308                    _ => Err(convert_err(Error::Internal {
309                        err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
310                    })),
311                })
312                .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
313            builder.encode_field(&array)
314        }
315        &ConcreteDataType::Decimal128(_) => {
316            let array = value_list
317                .items()
318                .iter()
319                .map(|v| match v {
320                    Value::Null => Ok(None),
321                    Value::Decimal128(v) => Ok(Some(v.to_string())),
322                    _ => Err(convert_err(Error::Internal {
323                        err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
324                    })),
325                })
326                .collect::<PgWireResult<Vec<Option<String>>>>()?;
327            builder.encode_field(&array)
328        }
329        &ConcreteDataType::Json(j) => {
330            let array = value_list
331                .items()
332                .iter()
333                .map(|v| match v {
334                    Value::Null => Ok(None),
335                    Value::Binary(v) => {
336                        let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
337                        Ok(Some(s))
338                    }
339                    _ => Err(convert_err(Error::Internal {
340                        err_msg: format!("Invalid list item type, find {v:?}, expected json",),
341                    })),
342                })
343                .collect::<PgWireResult<Vec<Option<String>>>>()?;
344            builder.encode_field(&array)
345        }
346        _ => Err(convert_err(Error::Internal {
347            err_msg: format!(
348                "cannot write array type {:?} in postgres protocol: unimplemented",
349                value_list.datatype()
350            ),
351        })),
352    }
353}
354
355pub(super) fn encode_value(
356    query_ctx: &QueryContextRef,
357    value: &Value,
358    builder: &mut DataRowEncoder,
359    datatype: &ConcreteDataType,
360) -> PgWireResult<()> {
361    match value {
362        Value::Null => builder.encode_field(&None::<&i8>),
363        Value::Boolean(v) => builder.encode_field(v),
364        Value::UInt8(v) => builder.encode_field(&(*v as i8)),
365        Value::UInt16(v) => builder.encode_field(&(*v as i16)),
366        Value::UInt32(v) => builder.encode_field(v),
367        Value::UInt64(v) => builder.encode_field(&(*v as i64)),
368        Value::Int8(v) => builder.encode_field(v),
369        Value::Int16(v) => builder.encode_field(v),
370        Value::Int32(v) => builder.encode_field(v),
371        Value::Int64(v) => builder.encode_field(v),
372        Value::Float32(v) => builder.encode_field(&v.0),
373        Value::Float64(v) => builder.encode_field(&v.0),
374        Value::String(v) => builder.encode_field(&v.as_utf8()),
375        Value::Binary(v) => match datatype {
376            ConcreteDataType::Json(j) => {
377                let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
378                builder.encode_field(&s)
379            }
380            _ => {
381                let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382                match *bytea_output {
383                    PGByteaOutputValue::ESCAPE => {
384                        builder.encode_field(&EscapeOutputBytea(v.deref()))
385                    }
386                    PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387                }
388            }
389        },
390        Value::Date(v) => {
391            if let Some(date) = v.to_chrono_date() {
392                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393                builder.encode_field(&StylingDate(date, style, order))
394            } else {
395                Err(convert_err(Error::Internal {
396                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
397                }))
398            }
399        }
400        Value::Timestamp(v) => {
401            if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402            {
403                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404                builder.encode_field(&StylingDateTime(datetime, style, order))
405            } else {
406                Err(convert_err(Error::Internal {
407                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
408                }))
409            }
410        }
411        Value::Time(v) => {
412            if let Some(time) = v.to_chrono_time() {
413                builder.encode_field(&time)
414            } else {
415                Err(convert_err(Error::Internal {
416                    err_msg: format!("Failed to convert time to postgres type {v:?}",),
417                }))
418            }
419        }
420        Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421        Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422        Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423        Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424        Value::Duration(d) => match PgInterval::try_from(*d) {
425            Ok(i) => builder.encode_field(&i),
426            Err(e) => Err(convert_err(Error::Internal {
427                err_msg: e.to_string(),
428            })),
429        },
430        Value::List(values) => encode_array(query_ctx, values, builder),
431    }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435    match origin {
436        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446        &ConcreteDataType::Date(_) => Ok(Type::DATE),
447        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448        &ConcreteDataType::Time(_) => Ok(Type::TIME),
449        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451        &ConcreteDataType::Json(_) => Ok(Type::JSON),
452        ConcreteDataType::List(list) => match list.item_type() {
453            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455            &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470            &ConcreteDataType::Dictionary(_)
471            | &ConcreteDataType::Vector(_)
472            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
473                data_type: origin,
474                reason: "not implemented",
475            }
476            .fail(),
477        },
478        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
479            data_type: origin,
480            reason: "not implemented",
481        }
482        .fail(),
483        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
484    }
485}
486
487#[allow(dead_code)]
488pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
489    // Note that we only support a small amount of pg data types
490    match origin {
491        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
492        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
493        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
494        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
495        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
496        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
497        &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
498            common_time::timestamp::TimeUnit::Millisecond,
499        )),
500        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
501        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
502            common_time::timestamp::TimeUnit::Microsecond,
503        )),
504        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
505            ConcreteDataType::int8_datatype(),
506        )),
507        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
508            ConcreteDataType::int16_datatype(),
509        )),
510        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
511            ConcreteDataType::int32_datatype(),
512        )),
513        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
514            ConcreteDataType::int64_datatype(),
515        )),
516        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
517            ConcreteDataType::string_datatype(),
518        )),
519        _ => server_error::InternalSnafu {
520            err_msg: format!("unimplemented datatype {origin:?}"),
521        }
522        .fail(),
523    }
524}
525
526pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
527    // the index is managed from portal's parameters count so it's safe to
528    // unwrap here.
529    let param_type = portal.statement.parameter_types.get(idx).unwrap();
530    match param_type {
531        &Type::VARCHAR | &Type::TEXT => Ok(format!(
532            "'{}'",
533            portal
534                .parameter::<String>(idx, param_type)?
535                .as_deref()
536                .unwrap_or("")
537        )),
538        &Type::BOOL => Ok(portal
539            .parameter::<bool>(idx, param_type)?
540            .map(|v| v.to_string())
541            .unwrap_or_else(|| "".to_owned())),
542        &Type::INT4 => Ok(portal
543            .parameter::<i32>(idx, param_type)?
544            .map(|v| v.to_string())
545            .unwrap_or_else(|| "".to_owned())),
546        &Type::INT8 => Ok(portal
547            .parameter::<i64>(idx, param_type)?
548            .map(|v| v.to_string())
549            .unwrap_or_else(|| "".to_owned())),
550        &Type::FLOAT4 => Ok(portal
551            .parameter::<f32>(idx, param_type)?
552            .map(|v| v.to_string())
553            .unwrap_or_else(|| "".to_owned())),
554        &Type::FLOAT8 => Ok(portal
555            .parameter::<f64>(idx, param_type)?
556            .map(|v| v.to_string())
557            .unwrap_or_else(|| "".to_owned())),
558        &Type::DATE => Ok(portal
559            .parameter::<NaiveDate>(idx, param_type)?
560            .map(|v| v.format("%Y-%m-%d").to_string())
561            .unwrap_or_else(|| "".to_owned())),
562        &Type::TIMESTAMP => Ok(portal
563            .parameter::<NaiveDateTime>(idx, param_type)?
564            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
565            .unwrap_or_else(|| "".to_owned())),
566        &Type::INTERVAL => Ok(portal
567            .parameter::<PgInterval>(idx, param_type)?
568            .map(|v| v.to_string())
569            .unwrap_or_else(|| "".to_owned())),
570        _ => Err(invalid_parameter_error(
571            "unsupported_parameter_type",
572            Some(param_type.to_string()),
573        )),
574    }
575}
576
577pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
578    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
579    error_info.detail = detail;
580    PgWireError::UserError(Box::new(error_info))
581}
582
583fn to_timestamp_scalar_value<T>(
584    data: Option<T>,
585    unit: &TimestampType,
586    ctype: &ConcreteDataType,
587) -> PgWireResult<ScalarValue>
588where
589    T: Into<i64>,
590{
591    if let Some(n) = data {
592        Value::Timestamp(unit.create_timestamp(n.into()))
593            .try_to_scalar_value(ctype)
594            .map_err(convert_err)
595    } else {
596        Ok(ScalarValue::Null)
597    }
598}
599
600pub(super) fn parameters_to_scalar_values(
601    plan: &LogicalPlan,
602    portal: &Portal<SqlPlan>,
603) -> PgWireResult<Vec<ScalarValue>> {
604    let param_count = portal.parameter_len();
605    let mut results = Vec::with_capacity(param_count);
606
607    let client_param_types = &portal.statement.parameter_types;
608    let param_types = plan
609        .get_parameter_types()
610        .context(DataFusionSnafu)
611        .map_err(convert_err)?
612        .into_iter()
613        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
614        .collect::<HashMap<_, _>>();
615
616    for idx in 0..param_count {
617        let server_type = param_types
618            .get(&format!("${}", idx + 1))
619            .and_then(|t| t.as_ref());
620
621        let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
622            client_given_type.clone()
623        } else if let Some(server_provided_type) = &server_type {
624            type_gt_to_pg(server_provided_type).map_err(convert_err)?
625        } else {
626            return Err(invalid_parameter_error(
627                "unknown_parameter_type",
628                Some(format!(
629                    "Cannot get parameter type information for parameter {}",
630                    idx
631                )),
632            ));
633        };
634
635        let value = match &client_type {
636            &Type::VARCHAR | &Type::TEXT => {
637                let data = portal.parameter::<String>(idx, &client_type)?;
638                if let Some(server_type) = &server_type {
639                    match server_type {
640                        ConcreteDataType::String(_) => ScalarValue::Utf8(data),
641                        _ => {
642                            return Err(invalid_parameter_error(
643                                "invalid_parameter_type",
644                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
645                            ))
646                        }
647                    }
648                } else {
649                    ScalarValue::Utf8(data)
650                }
651            }
652            &Type::BOOL => {
653                let data = portal.parameter::<bool>(idx, &client_type)?;
654                if let Some(server_type) = &server_type {
655                    match server_type {
656                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
657                        _ => {
658                            return Err(invalid_parameter_error(
659                                "invalid_parameter_type",
660                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
661                            ))
662                        }
663                    }
664                } else {
665                    ScalarValue::Boolean(data)
666                }
667            }
668            &Type::INT2 => {
669                let data = portal.parameter::<i16>(idx, &client_type)?;
670                if let Some(server_type) = &server_type {
671                    match server_type {
672                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
673                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
674                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
675                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
676                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
677                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
678                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
679                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
680                        ConcreteDataType::Timestamp(unit) => {
681                            to_timestamp_scalar_value(data, unit, server_type)?
682                        }
683                        _ => {
684                            return Err(invalid_parameter_error(
685                                "invalid_parameter_type",
686                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
687                            ))
688                        }
689                    }
690                } else {
691                    ScalarValue::Int16(data)
692                }
693            }
694            &Type::INT4 => {
695                let data = portal.parameter::<i32>(idx, &client_type)?;
696                if let Some(server_type) = &server_type {
697                    match server_type {
698                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
699                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
700                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
701                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
702                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
703                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
704                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
705                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
706                        ConcreteDataType::Timestamp(unit) => {
707                            to_timestamp_scalar_value(data, unit, server_type)?
708                        }
709                        _ => {
710                            return Err(invalid_parameter_error(
711                                "invalid_parameter_type",
712                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
713                            ))
714                        }
715                    }
716                } else {
717                    ScalarValue::Int32(data)
718                }
719            }
720            &Type::INT8 => {
721                let data = portal.parameter::<i64>(idx, &client_type)?;
722                if let Some(server_type) = &server_type {
723                    match server_type {
724                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
725                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
726                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
727                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
728                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
729                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
730                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
731                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
732                        ConcreteDataType::Timestamp(unit) => {
733                            to_timestamp_scalar_value(data, unit, server_type)?
734                        }
735                        _ => {
736                            return Err(invalid_parameter_error(
737                                "invalid_parameter_type",
738                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
739                            ))
740                        }
741                    }
742                } else {
743                    ScalarValue::Int64(data)
744                }
745            }
746            &Type::FLOAT4 => {
747                let data = portal.parameter::<f32>(idx, &client_type)?;
748                if let Some(server_type) = &server_type {
749                    match server_type {
750                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
751                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
752                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
753                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
754                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
755                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
756                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
757                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
758                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
759                        ConcreteDataType::Float64(_) => {
760                            ScalarValue::Float64(data.map(|n| n as f64))
761                        }
762                        _ => {
763                            return Err(invalid_parameter_error(
764                                "invalid_parameter_type",
765                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
766                            ))
767                        }
768                    }
769                } else {
770                    ScalarValue::Float32(data)
771                }
772            }
773            &Type::FLOAT8 => {
774                let data = portal.parameter::<f64>(idx, &client_type)?;
775                if let Some(server_type) = &server_type {
776                    match server_type {
777                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
778                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
779                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
780                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
781                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
782                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
783                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
784                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
785                        ConcreteDataType::Float32(_) => {
786                            ScalarValue::Float32(data.map(|n| n as f32))
787                        }
788                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
789                        _ => {
790                            return Err(invalid_parameter_error(
791                                "invalid_parameter_type",
792                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
793                            ))
794                        }
795                    }
796                } else {
797                    ScalarValue::Float64(data)
798                }
799            }
800            &Type::TIMESTAMP => {
801                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
802                if let Some(server_type) = &server_type {
803                    match server_type {
804                        ConcreteDataType::Timestamp(unit) => match *unit {
805                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
806                                data.map(|ts| ts.and_utc().timestamp()),
807                                None,
808                            ),
809                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
810                                data.map(|ts| ts.and_utc().timestamp_millis()),
811                                None,
812                            ),
813                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
814                                data.map(|ts| ts.and_utc().timestamp_micros()),
815                                None,
816                            ),
817                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
818                                data.map(|ts| ts.and_utc().timestamp_micros()),
819                                None,
820                            ),
821                        },
822                        _ => {
823                            return Err(invalid_parameter_error(
824                                "invalid_parameter_type",
825                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
826                            ))
827                        }
828                    }
829                } else {
830                    ScalarValue::TimestampMillisecond(
831                        data.map(|ts| ts.and_utc().timestamp_millis()),
832                        None,
833                    )
834                }
835            }
836            &Type::DATE => {
837                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
838                if let Some(server_type) = &server_type {
839                    match server_type {
840                        ConcreteDataType::Date(_) => ScalarValue::Date32(data.map(|d| {
841                            (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
842                        })),
843                        _ => {
844                            return Err(invalid_parameter_error(
845                                "invalid_parameter_type",
846                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
847                            ));
848                        }
849                    }
850                } else {
851                    ScalarValue::Date32(data.map(|d| {
852                        (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
853                    }))
854                }
855            }
856            &Type::INTERVAL => {
857                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
858                if let Some(server_type) = &server_type {
859                    match server_type {
860                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
861                            ScalarValue::IntervalYearMonth(
862                                data.map(|i| {
863                                    if i.days != 0 || i.microseconds != 0 {
864                                        Err(invalid_parameter_error(
865                                            "invalid_parameter_type",
866                                            Some(format!(
867                                                "Expected: {}, found: {}",
868                                                server_type, client_type
869                                            )),
870                                        ))
871                                    } else {
872                                        Ok(IntervalYearMonth::new(i.months).to_i32())
873                                    }
874                                })
875                                .transpose()?,
876                            )
877                        }
878                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
879                            ScalarValue::IntervalDayTime(
880                                data.map(|i| {
881                                    if i.months != 0 || i.microseconds % 1000 != 0 {
882                                        Err(invalid_parameter_error(
883                                            "invalid_parameter_type",
884                                            Some(format!(
885                                                "Expected: {}, found: {}",
886                                                server_type, client_type
887                                            )),
888                                        ))
889                                    } else {
890                                        Ok(IntervalDayTime::new(
891                                            i.days,
892                                            (i.microseconds / 1000) as i32,
893                                        )
894                                        .into())
895                                    }
896                                })
897                                .transpose()?,
898                            )
899                        }
900                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
901                            ScalarValue::IntervalMonthDayNano(
902                                data.map(|i| IntervalMonthDayNano::from(i).into()),
903                            )
904                        }
905                        _ => {
906                            return Err(invalid_parameter_error(
907                                "invalid_parameter_type",
908                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
909                            ));
910                        }
911                    }
912                } else {
913                    ScalarValue::IntervalMonthDayNano(
914                        data.map(|i| IntervalMonthDayNano::from(i).into()),
915                    )
916                }
917            }
918            &Type::BYTEA => {
919                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
920                if let Some(server_type) = &server_type {
921                    match server_type {
922                        ConcreteDataType::String(_) => {
923                            ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
924                        }
925                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
926                        _ => {
927                            return Err(invalid_parameter_error(
928                                "invalid_parameter_type",
929                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
930                            ));
931                        }
932                    }
933                } else {
934                    ScalarValue::Binary(data)
935                }
936            }
937            &Type::JSONB => {
938                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
939                if let Some(server_type) = &server_type {
940                    match server_type {
941                        ConcreteDataType::Binary(_) => {
942                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
943                        }
944                        _ => {
945                            return Err(invalid_parameter_error(
946                                "invalid_parameter_type",
947                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
948                            ));
949                        }
950                    }
951                } else {
952                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
953                }
954            }
955            &Type::INT2_ARRAY => {
956                let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
957                if let Some(data) = data {
958                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
959                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
960                } else {
961                    ScalarValue::Null
962                }
963            }
964            &Type::INT4_ARRAY => {
965                let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
966                if let Some(data) = data {
967                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
968                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
969                } else {
970                    ScalarValue::Null
971                }
972            }
973            &Type::INT8_ARRAY => {
974                let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
975                if let Some(data) = data {
976                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
977                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
978                } else {
979                    ScalarValue::Null
980                }
981            }
982            &Type::VARCHAR_ARRAY => {
983                let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
984                if let Some(data) = data {
985                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
986                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
987                } else {
988                    ScalarValue::Null
989                }
990            }
991            _ => Err(invalid_parameter_error(
992                "unsupported_parameter_value",
993                Some(format!("Found type: {}", client_type)),
994            ))?,
995        };
996
997        results.push(value);
998    }
999
1000    Ok(results)
1001}
1002
1003pub(super) fn param_types_to_pg_types(
1004    param_types: &HashMap<String, Option<ConcreteDataType>>,
1005) -> Result<Vec<Type>> {
1006    let param_count = param_types.len();
1007    let mut types = Vec::with_capacity(param_count);
1008    for i in 0..param_count {
1009        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1010            let pg_type = type_gt_to_pg(param_type)?;
1011            types.push(pg_type);
1012        } else {
1013            types.push(Type::UNKNOWN);
1014        }
1015    }
1016    Ok(types)
1017}
1018
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::value::ListValue;
    use pgwire::api::results::{FieldFormat, FieldInfo};
    use pgwire::api::Type;
    use session::context::QueryContextBuilder;

    use super::*;

    /// `schema_to_pg` must map every nullable column below to a text-format `FieldInfo`
    /// carrying the listed Postgres type.
    #[test]
    fn test_schema_convert() {
        // One row per column: (column name, column data type, expected Postgres type).
        let cases = vec![
            ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
            ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
            ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
            ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
            ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
            ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
            ("uint8s", ConcreteDataType::uint8_datatype(), Type::CHAR),
            ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT2),
            ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT4),
            ("uint64s", ConcreteDataType::uint64_datatype(), Type::INT8),
            ("float32s", ConcreteDataType::float32_datatype(), Type::FLOAT4),
            ("float64s", ConcreteDataType::float64_datatype(), Type::FLOAT8),
            ("binaries", ConcreteDataType::binary_datatype(), Type::BYTEA),
            ("strings", ConcreteDataType::string_datatype(), Type::VARCHAR),
            (
                "timestamps",
                ConcreteDataType::timestamp_millisecond_datatype(),
                Type::TIMESTAMP,
            ),
            ("dates", ConcreteDataType::date_datatype(), Type::DATE),
            ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
            (
                "intervals",
                ConcreteDataType::interval_month_day_nano_datatype(),
                Type::INTERVAL,
            ),
        ];

        // Split the table into the input columns and the expected field descriptions.
        let (column_schemas, pg_field_info): (Vec<ColumnSchema>, Vec<FieldInfo>) = cases
            .into_iter()
            .map(|(name, data_type, pg_type)| {
                (
                    ColumnSchema::new(name, data_type, true),
                    FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text),
                )
            })
            .unzip();

        let schema = Schema::new(column_schemas);
        let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
        assert_eq!(fs, pg_field_info);
    }

    /// `encode_value` must succeed for every value below when paired with its data type.
    ///
    /// Only the success of each call is checked; the encoded row is not inspected.
    #[test]
    fn test_encode_text_format_data() {
        // One row per encoded field:
        // (field name, Postgres field type, value data type, value to encode).
        let cases: Vec<(&str, Type, ConcreteDataType, Value)> = vec![
            (
                "nulls",
                Type::UNKNOWN,
                ConcreteDataType::null_datatype(),
                Value::Null,
            ),
            (
                "bools",
                Type::BOOL,
                ConcreteDataType::boolean_datatype(),
                Value::Boolean(true),
            ),
            (
                "uint8s",
                Type::CHAR,
                ConcreteDataType::uint8_datatype(),
                Value::UInt8(u8::MAX),
            ),
            (
                "uint16s",
                Type::INT2,
                ConcreteDataType::uint16_datatype(),
                Value::UInt16(u16::MAX),
            ),
            (
                "uint32s",
                Type::INT4,
                ConcreteDataType::uint32_datatype(),
                Value::UInt32(u32::MAX),
            ),
            (
                "uint64s",
                Type::INT8,
                ConcreteDataType::uint64_datatype(),
                Value::UInt64(u64::MAX),
            ),
            (
                "int8s",
                Type::CHAR,
                ConcreteDataType::int8_datatype(),
                Value::Int8(i8::MAX),
            ),
            (
                "int8s",
                Type::CHAR,
                ConcreteDataType::int8_datatype(),
                Value::Int8(i8::MIN),
            ),
            (
                "int16s",
                Type::INT2,
                ConcreteDataType::int16_datatype(),
                Value::Int16(i16::MAX),
            ),
            (
                "int16s",
                Type::INT2,
                ConcreteDataType::int16_datatype(),
                Value::Int16(i16::MIN),
            ),
            (
                "int32s",
                Type::INT4,
                ConcreteDataType::int32_datatype(),
                Value::Int32(i32::MAX),
            ),
            (
                "int32s",
                Type::INT4,
                ConcreteDataType::int32_datatype(),
                Value::Int32(i32::MIN),
            ),
            (
                "int64s",
                Type::INT8,
                ConcreteDataType::int64_datatype(),
                Value::Int64(i64::MAX),
            ),
            (
                "int64s",
                Type::INT8,
                ConcreteDataType::int64_datatype(),
                Value::Int64(i64::MIN),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MAX.into()),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MIN.into()),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(0f32.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MAX.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MIN.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(0f64.into()),
            ),
            (
                "strings",
                Type::VARCHAR,
                ConcreteDataType::string_datatype(),
                Value::String("greptime".into()),
            ),
            (
                "binaries",
                Type::BYTEA,
                ConcreteDataType::binary_datatype(),
                Value::Binary("greptime".as_bytes().into()),
            ),
            (
                "dates",
                Type::DATE,
                ConcreteDataType::date_datatype(),
                Value::Date(1001i32.into()),
            ),
            (
                "times",
                Type::TIME,
                ConcreteDataType::time_datatype(TimeUnit::Second),
                Value::Time(1001i64.into()),
            ),
            (
                "timestamps",
                Type::TIMESTAMP,
                ConcreteDataType::timestamp_datatype(TimeUnit::Second),
                Value::Timestamp(1000001i64.into()),
            ),
            (
                "interval_year_month",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            ),
            (
                "interval_day_time",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
                Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
            ),
            (
                "interval_month_day_nano",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
            ),
            (
                "int_list",
                Type::INT8_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Int64(1i64)],
                    ConcreteDataType::int64_datatype(),
                )),
            ),
            (
                "float_list",
                Type::FLOAT8_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Float64(1.0f64.into())],
                    ConcreteDataType::float64_datatype(),
                )),
            ),
            (
                "string_list",
                Type::VARCHAR_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
                Value::List(ListValue::new(
                    vec![Value::String("tom".into())],
                    ConcreteDataType::string_datatype(),
                )),
            ),
            (
                "timestamp_list",
                Type::TIMESTAMP_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
                    ConcreteDataType::timestamp_second_datatype(),
                )),
            ),
        ];

        // The encoder is built over the text-format description of every field in the table.
        let schema = cases
            .iter()
            .map(|(name, pg_type, _, _)| {
                FieldInfo::new((*name).into(), None, None, pg_type.clone(), FieldFormat::Text)
            })
            .collect::<Vec<_>>();
        let mut builder = DataRowEncoder::new(Arc::new(schema));

        let query_context = QueryContextBuilder::default()
            .configuration_parameter(Default::default())
            .build()
            .into();

        // Values are encoded in table order, each with its own data type.
        for (_, _, datatype, value) in &cases {
            encode_value(&query_context, value, &mut builder, datatype).unwrap();
        }
    }

    /// `invalid_parameter_error` must produce a `UserError` with severity `ERROR`,
    /// code `22023`, and the message it was given.
    #[test]
    fn test_invalid_parameter() {
        let msg = "invalid_parameter_count";
        match invalid_parameter_error(msg, None) {
            PgWireError::UserError(value) => {
                assert_eq!("ERROR", value.severity);
                assert_eq!("22023", value.code);
                assert_eq!(msg, value.message);
            }
            _ => panic!("test_invalid_parameter failed"),
        }
    }
}