servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
45use crate::postgres::utils::convert_err;
46use crate::SqlPlan;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49    origin
50        .column_schemas()
51        .iter()
52        .enumerate()
53        .map(|(idx, col)| {
54            Ok(FieldInfo::new(
55                col.name.clone(),
56                None,
57                None,
58                type_gt_to_pg(&col.data_type)?,
59                field_formats.format_for(idx),
60            ))
61        })
62        .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_array(
66    query_ctx: &QueryContextRef,
67    value_list: &ListValue,
68    builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70    match value_list.datatype() {
71        &ConcreteDataType::Boolean(_) => {
72            let array = value_list
73                .items()
74                .iter()
75                .map(|v| match v {
76                    Value::Null => Ok(None),
77                    Value::Boolean(v) => Ok(Some(*v)),
78                    _ => Err(convert_err(Error::Internal {
79                        err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
80                    })),
81                })
82                .collect::<PgWireResult<Vec<Option<bool>>>>()?;
83            builder.encode_field(&array)
84        }
85        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
86            let array = value_list
87                .items()
88                .iter()
89                .map(|v| match v {
90                    Value::Null => Ok(None),
91                    Value::Int8(v) => Ok(Some(*v)),
92                    Value::UInt8(v) => Ok(Some(*v as i8)),
93                    _ => Err(convert_err(Error::Internal {
94                        err_msg: format!(
95                            "Invalid list item type, find {v:?}, expected int8 or uint8",
96                        ),
97                    })),
98                })
99                .collect::<PgWireResult<Vec<Option<i8>>>>()?;
100            builder.encode_field(&array)
101        }
102        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
103            let array = value_list
104                .items()
105                .iter()
106                .map(|v| match v {
107                    Value::Null => Ok(None),
108                    Value::Int16(v) => Ok(Some(*v)),
109                    Value::UInt16(v) => Ok(Some(*v as i16)),
110                    _ => Err(convert_err(Error::Internal {
111                        err_msg: format!(
112                            "Invalid list item type, find {v:?}, expected int16 or uint16",
113                        ),
114                    })),
115                })
116                .collect::<PgWireResult<Vec<Option<i16>>>>()?;
117            builder.encode_field(&array)
118        }
119        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
120            let array = value_list
121                .items()
122                .iter()
123                .map(|v| match v {
124                    Value::Null => Ok(None),
125                    Value::Int32(v) => Ok(Some(*v)),
126                    Value::UInt32(v) => Ok(Some(*v as i32)),
127                    _ => Err(convert_err(Error::Internal {
128                        err_msg: format!(
129                            "Invalid list item type, find {v:?}, expected int32 or uint32",
130                        ),
131                    })),
132                })
133                .collect::<PgWireResult<Vec<Option<i32>>>>()?;
134            builder.encode_field(&array)
135        }
136        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
137            let array = value_list
138                .items()
139                .iter()
140                .map(|v| match v {
141                    Value::Null => Ok(None),
142                    Value::Int64(v) => Ok(Some(*v)),
143                    Value::UInt64(v) => Ok(Some(*v as i64)),
144                    _ => Err(convert_err(Error::Internal {
145                        err_msg: format!(
146                            "Invalid list item type, find {v:?}, expected int64 or uint64",
147                        ),
148                    })),
149                })
150                .collect::<PgWireResult<Vec<Option<i64>>>>()?;
151            builder.encode_field(&array)
152        }
153        &ConcreteDataType::Float32(_) => {
154            let array = value_list
155                .items()
156                .iter()
157                .map(|v| match v {
158                    Value::Null => Ok(None),
159                    Value::Float32(v) => Ok(Some(v.0)),
160                    _ => Err(convert_err(Error::Internal {
161                        err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
162                    })),
163                })
164                .collect::<PgWireResult<Vec<Option<f32>>>>()?;
165            builder.encode_field(&array)
166        }
167        &ConcreteDataType::Float64(_) => {
168            let array = value_list
169                .items()
170                .iter()
171                .map(|v| match v {
172                    Value::Null => Ok(None),
173                    Value::Float64(v) => Ok(Some(v.0)),
174                    _ => Err(convert_err(Error::Internal {
175                        err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
176                    })),
177                })
178                .collect::<PgWireResult<Vec<Option<f64>>>>()?;
179            builder.encode_field(&array)
180        }
181        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
182            let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
183
184            match *bytea_output {
185                PGByteaOutputValue::ESCAPE => {
186                    let array = value_list
187                        .items()
188                        .iter()
189                        .map(|v| match v {
190                            Value::Null => Ok(None),
191                            Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
192
193                            _ => Err(convert_err(Error::Internal {
194                                err_msg: format!(
195                                    "Invalid list item type, find {v:?}, expected binary",
196                                ),
197                            })),
198                        })
199                        .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
200                    builder.encode_field(&array)
201                }
202                PGByteaOutputValue::HEX => {
203                    let array = value_list
204                        .items()
205                        .iter()
206                        .map(|v| match v {
207                            Value::Null => Ok(None),
208                            Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
209
210                            _ => Err(convert_err(Error::Internal {
211                                err_msg: format!(
212                                    "Invalid list item type, find {v:?}, expected binary",
213                                ),
214                            })),
215                        })
216                        .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
217                    builder.encode_field(&array)
218                }
219            }
220        }
221        &ConcreteDataType::String(_) => {
222            let array = value_list
223                .items()
224                .iter()
225                .map(|v| match v {
226                    Value::Null => Ok(None),
227                    Value::String(v) => Ok(Some(v.as_utf8())),
228                    _ => Err(convert_err(Error::Internal {
229                        err_msg: format!("Invalid list item type, find {v:?}, expected string",),
230                    })),
231                })
232                .collect::<PgWireResult<Vec<Option<&str>>>>()?;
233            builder.encode_field(&array)
234        }
235        &ConcreteDataType::Date(_) => {
236            let array = value_list
237                .items()
238                .iter()
239                .map(|v| match v {
240                    Value::Null => Ok(None),
241                    Value::Date(v) => {
242                        if let Some(date) = v.to_chrono_date() {
243                            let (style, order) =
244                                *query_ctx.configuration_parameter().pg_datetime_style();
245                            Ok(Some(StylingDate(date, style, order)))
246                        } else {
247                            Err(convert_err(Error::Internal {
248                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
249                            }))
250                        }
251                    }
252                    _ => Err(convert_err(Error::Internal {
253                        err_msg: format!("Invalid list item type, find {v:?}, expected date",),
254                    })),
255                })
256                .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
257            builder.encode_field(&array)
258        }
259        &ConcreteDataType::Timestamp(_) => {
260            let array = value_list
261                .items()
262                .iter()
263                .map(|v| match v {
264                    Value::Null => Ok(None),
265                    Value::Timestamp(v) => {
266                        if let Some(datetime) =
267                            v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
268                        {
269                            let (style, order) =
270                                *query_ctx.configuration_parameter().pg_datetime_style();
271                            Ok(Some(StylingDateTime(datetime, style, order)))
272                        } else {
273                            Err(convert_err(Error::Internal {
274                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
275                            }))
276                        }
277                    }
278                    _ => Err(convert_err(Error::Internal {
279                        err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
280                    })),
281                })
282                .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
283            builder.encode_field(&array)
284        }
285        &ConcreteDataType::Time(_) => {
286            let array = value_list
287                .items()
288                .iter()
289                .map(|v| match v {
290                    Value::Null => Ok(None),
291                    Value::Time(v) => Ok(v.to_chrono_time()),
292                    _ => Err(convert_err(Error::Internal {
293                        err_msg: format!("Invalid list item type, find {v:?}, expected time",),
294                    })),
295                })
296                .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
297            builder.encode_field(&array)
298        }
299        &ConcreteDataType::Interval(_) => {
300            let array = value_list
301                .items()
302                .iter()
303                .map(|v| match v {
304                    Value::Null => Ok(None),
305                    Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
306                    Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
307                    Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
308                    _ => Err(convert_err(Error::Internal {
309                        err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
310                    })),
311                })
312                .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
313            builder.encode_field(&array)
314        }
315        &ConcreteDataType::Decimal128(_) => {
316            let array = value_list
317                .items()
318                .iter()
319                .map(|v| match v {
320                    Value::Null => Ok(None),
321                    Value::Decimal128(v) => Ok(Some(v.to_string())),
322                    _ => Err(convert_err(Error::Internal {
323                        err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
324                    })),
325                })
326                .collect::<PgWireResult<Vec<Option<String>>>>()?;
327            builder.encode_field(&array)
328        }
329        &ConcreteDataType::Json(j) => {
330            let array = value_list
331                .items()
332                .iter()
333                .map(|v| match v {
334                    Value::Null => Ok(None),
335                    Value::Binary(v) => {
336                        let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
337                        Ok(Some(s))
338                    }
339                    _ => Err(convert_err(Error::Internal {
340                        err_msg: format!("Invalid list item type, find {v:?}, expected json",),
341                    })),
342                })
343                .collect::<PgWireResult<Vec<Option<String>>>>()?;
344            builder.encode_field(&array)
345        }
346        _ => Err(convert_err(Error::Internal {
347            err_msg: format!(
348                "cannot write array type {:?} in postgres protocol: unimplemented",
349                value_list.datatype()
350            ),
351        })),
352    }
353}
354
355pub(super) fn encode_value(
356    query_ctx: &QueryContextRef,
357    value: &Value,
358    builder: &mut DataRowEncoder,
359    datatype: &ConcreteDataType,
360) -> PgWireResult<()> {
361    match value {
362        Value::Null => builder.encode_field(&None::<&i8>),
363        Value::Boolean(v) => builder.encode_field(v),
364        Value::UInt8(v) => builder.encode_field(&(*v as i8)),
365        Value::UInt16(v) => builder.encode_field(&(*v as i16)),
366        Value::UInt32(v) => builder.encode_field(v),
367        Value::UInt64(v) => builder.encode_field(&(*v as i64)),
368        Value::Int8(v) => builder.encode_field(v),
369        Value::Int16(v) => builder.encode_field(v),
370        Value::Int32(v) => builder.encode_field(v),
371        Value::Int64(v) => builder.encode_field(v),
372        Value::Float32(v) => builder.encode_field(&v.0),
373        Value::Float64(v) => builder.encode_field(&v.0),
374        Value::String(v) => builder.encode_field(&v.as_utf8()),
375        Value::Binary(v) => match datatype {
376            ConcreteDataType::Json(j) => {
377                let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
378                builder.encode_field(&s)
379            }
380            _ => {
381                let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382                match *bytea_output {
383                    PGByteaOutputValue::ESCAPE => {
384                        builder.encode_field(&EscapeOutputBytea(v.deref()))
385                    }
386                    PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387                }
388            }
389        },
390        Value::Date(v) => {
391            if let Some(date) = v.to_chrono_date() {
392                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393                builder.encode_field(&StylingDate(date, style, order))
394            } else {
395                Err(convert_err(Error::Internal {
396                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
397                }))
398            }
399        }
400        Value::Timestamp(v) => {
401            if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402            {
403                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404                builder.encode_field(&StylingDateTime(datetime, style, order))
405            } else {
406                Err(convert_err(Error::Internal {
407                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
408                }))
409            }
410        }
411        Value::Time(v) => {
412            if let Some(time) = v.to_chrono_time() {
413                builder.encode_field(&time)
414            } else {
415                Err(convert_err(Error::Internal {
416                    err_msg: format!("Failed to convert time to postgres type {v:?}",),
417                }))
418            }
419        }
420        Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421        Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422        Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423        Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424        Value::Duration(d) => match PgInterval::try_from(*d) {
425            Ok(i) => builder.encode_field(&i),
426            Err(e) => Err(convert_err(Error::Internal {
427                err_msg: e.to_string(),
428            })),
429        },
430        Value::List(values) => encode_array(query_ctx, values, builder),
431    }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435    match origin {
436        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446        &ConcreteDataType::Date(_) => Ok(Type::DATE),
447        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448        &ConcreteDataType::Time(_) => Ok(Type::TIME),
449        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451        &ConcreteDataType::Json(_) => Ok(Type::JSON),
452        ConcreteDataType::List(list) => match list.item_type() {
453            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455            &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470            &ConcreteDataType::Dictionary(_)
471            | &ConcreteDataType::Vector(_)
472            | &ConcreteDataType::List(_)
473            | &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
474                data_type: origin,
475                reason: "not implemented",
476            }
477            .fail(),
478        },
479        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
480            data_type: origin,
481            reason: "not implemented",
482        }
483        .fail(),
484        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
485        &ConcreteDataType::Struct(_) => server_error::UnsupportedDataTypeSnafu {
486            data_type: origin,
487            reason: "not implemented",
488        }
489        .fail(),
490    }
491}
492
493#[allow(dead_code)]
494pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
495    // Note that we only support a small amount of pg data types
496    match origin {
497        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
498        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
499        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
500        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
501        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
502        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
503        &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
504            common_time::timestamp::TimeUnit::Millisecond,
505        )),
506        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
507        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
508            common_time::timestamp::TimeUnit::Microsecond,
509        )),
510        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
511            ConcreteDataType::int8_datatype(),
512        )),
513        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
514            ConcreteDataType::int16_datatype(),
515        )),
516        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
517            ConcreteDataType::int32_datatype(),
518        )),
519        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
520            ConcreteDataType::int64_datatype(),
521        )),
522        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
523            ConcreteDataType::string_datatype(),
524        )),
525        _ => server_error::InternalSnafu {
526            err_msg: format!("unimplemented datatype {origin:?}"),
527        }
528        .fail(),
529    }
530}
531
532pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
533    // the index is managed from portal's parameters count so it's safe to
534    // unwrap here.
535    let param_type = portal.statement.parameter_types.get(idx).unwrap();
536    match param_type {
537        &Type::VARCHAR | &Type::TEXT => Ok(format!(
538            "'{}'",
539            portal
540                .parameter::<String>(idx, param_type)?
541                .as_deref()
542                .unwrap_or("")
543        )),
544        &Type::BOOL => Ok(portal
545            .parameter::<bool>(idx, param_type)?
546            .map(|v| v.to_string())
547            .unwrap_or_else(|| "".to_owned())),
548        &Type::INT4 => Ok(portal
549            .parameter::<i32>(idx, param_type)?
550            .map(|v| v.to_string())
551            .unwrap_or_else(|| "".to_owned())),
552        &Type::INT8 => Ok(portal
553            .parameter::<i64>(idx, param_type)?
554            .map(|v| v.to_string())
555            .unwrap_or_else(|| "".to_owned())),
556        &Type::FLOAT4 => Ok(portal
557            .parameter::<f32>(idx, param_type)?
558            .map(|v| v.to_string())
559            .unwrap_or_else(|| "".to_owned())),
560        &Type::FLOAT8 => Ok(portal
561            .parameter::<f64>(idx, param_type)?
562            .map(|v| v.to_string())
563            .unwrap_or_else(|| "".to_owned())),
564        &Type::DATE => Ok(portal
565            .parameter::<NaiveDate>(idx, param_type)?
566            .map(|v| v.format("%Y-%m-%d").to_string())
567            .unwrap_or_else(|| "".to_owned())),
568        &Type::TIMESTAMP => Ok(portal
569            .parameter::<NaiveDateTime>(idx, param_type)?
570            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
571            .unwrap_or_else(|| "".to_owned())),
572        &Type::INTERVAL => Ok(portal
573            .parameter::<PgInterval>(idx, param_type)?
574            .map(|v| v.to_string())
575            .unwrap_or_else(|| "".to_owned())),
576        _ => Err(invalid_parameter_error(
577            "unsupported_parameter_type",
578            Some(param_type.to_string()),
579        )),
580    }
581}
582
583pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
584    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
585    error_info.detail = detail;
586    PgWireError::UserError(Box::new(error_info))
587}
588
589fn to_timestamp_scalar_value<T>(
590    data: Option<T>,
591    unit: &TimestampType,
592    ctype: &ConcreteDataType,
593) -> PgWireResult<ScalarValue>
594where
595    T: Into<i64>,
596{
597    if let Some(n) = data {
598        Value::Timestamp(unit.create_timestamp(n.into()))
599            .try_to_scalar_value(ctype)
600            .map_err(convert_err)
601    } else {
602        Ok(ScalarValue::Null)
603    }
604}
605
606pub(super) fn parameters_to_scalar_values(
607    plan: &LogicalPlan,
608    portal: &Portal<SqlPlan>,
609) -> PgWireResult<Vec<ScalarValue>> {
610    let param_count = portal.parameter_len();
611    let mut results = Vec::with_capacity(param_count);
612
613    let client_param_types = &portal.statement.parameter_types;
614    let param_types = plan
615        .get_parameter_types()
616        .context(DataFusionSnafu)
617        .map_err(convert_err)?
618        .into_iter()
619        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
620        .collect::<HashMap<_, _>>();
621
622    for idx in 0..param_count {
623        let server_type = param_types
624            .get(&format!("${}", idx + 1))
625            .and_then(|t| t.as_ref());
626
627        let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
628            client_given_type.clone()
629        } else if let Some(server_provided_type) = &server_type {
630            type_gt_to_pg(server_provided_type).map_err(convert_err)?
631        } else {
632            return Err(invalid_parameter_error(
633                "unknown_parameter_type",
634                Some(format!(
635                    "Cannot get parameter type information for parameter {}",
636                    idx
637                )),
638            ));
639        };
640
641        let value = match &client_type {
642            &Type::VARCHAR | &Type::TEXT => {
643                let data = portal.parameter::<String>(idx, &client_type)?;
644                if let Some(server_type) = &server_type {
645                    match server_type {
646                        ConcreteDataType::String(_) => ScalarValue::Utf8(data),
647                        _ => {
648                            return Err(invalid_parameter_error(
649                                "invalid_parameter_type",
650                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
651                            ))
652                        }
653                    }
654                } else {
655                    ScalarValue::Utf8(data)
656                }
657            }
658            &Type::BOOL => {
659                let data = portal.parameter::<bool>(idx, &client_type)?;
660                if let Some(server_type) = &server_type {
661                    match server_type {
662                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
663                        _ => {
664                            return Err(invalid_parameter_error(
665                                "invalid_parameter_type",
666                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
667                            ))
668                        }
669                    }
670                } else {
671                    ScalarValue::Boolean(data)
672                }
673            }
674            &Type::INT2 => {
675                let data = portal.parameter::<i16>(idx, &client_type)?;
676                if let Some(server_type) = &server_type {
677                    match server_type {
678                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
679                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
680                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
681                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
682                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
683                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
684                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
685                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
686                        ConcreteDataType::Timestamp(unit) => {
687                            to_timestamp_scalar_value(data, unit, server_type)?
688                        }
689                        _ => {
690                            return Err(invalid_parameter_error(
691                                "invalid_parameter_type",
692                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
693                            ))
694                        }
695                    }
696                } else {
697                    ScalarValue::Int16(data)
698                }
699            }
700            &Type::INT4 => {
701                let data = portal.parameter::<i32>(idx, &client_type)?;
702                if let Some(server_type) = &server_type {
703                    match server_type {
704                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
705                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
706                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
707                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
708                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
709                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
710                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
711                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
712                        ConcreteDataType::Timestamp(unit) => {
713                            to_timestamp_scalar_value(data, unit, server_type)?
714                        }
715                        _ => {
716                            return Err(invalid_parameter_error(
717                                "invalid_parameter_type",
718                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
719                            ))
720                        }
721                    }
722                } else {
723                    ScalarValue::Int32(data)
724                }
725            }
726            &Type::INT8 => {
727                let data = portal.parameter::<i64>(idx, &client_type)?;
728                if let Some(server_type) = &server_type {
729                    match server_type {
730                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
731                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
732                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
733                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
734                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
735                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
736                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
737                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
738                        ConcreteDataType::Timestamp(unit) => {
739                            to_timestamp_scalar_value(data, unit, server_type)?
740                        }
741                        _ => {
742                            return Err(invalid_parameter_error(
743                                "invalid_parameter_type",
744                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
745                            ))
746                        }
747                    }
748                } else {
749                    ScalarValue::Int64(data)
750                }
751            }
752            &Type::FLOAT4 => {
753                let data = portal.parameter::<f32>(idx, &client_type)?;
754                if let Some(server_type) = &server_type {
755                    match server_type {
756                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
757                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
758                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
759                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
760                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
761                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
762                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
763                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
764                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
765                        ConcreteDataType::Float64(_) => {
766                            ScalarValue::Float64(data.map(|n| n as f64))
767                        }
768                        _ => {
769                            return Err(invalid_parameter_error(
770                                "invalid_parameter_type",
771                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
772                            ))
773                        }
774                    }
775                } else {
776                    ScalarValue::Float32(data)
777                }
778            }
779            &Type::FLOAT8 => {
780                let data = portal.parameter::<f64>(idx, &client_type)?;
781                if let Some(server_type) = &server_type {
782                    match server_type {
783                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
784                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
785                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
786                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
787                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
788                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
789                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
790                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
791                        ConcreteDataType::Float32(_) => {
792                            ScalarValue::Float32(data.map(|n| n as f32))
793                        }
794                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
795                        _ => {
796                            return Err(invalid_parameter_error(
797                                "invalid_parameter_type",
798                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
799                            ))
800                        }
801                    }
802                } else {
803                    ScalarValue::Float64(data)
804                }
805            }
806            &Type::TIMESTAMP => {
807                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
808                if let Some(server_type) = &server_type {
809                    match server_type {
810                        ConcreteDataType::Timestamp(unit) => match *unit {
811                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
812                                data.map(|ts| ts.and_utc().timestamp()),
813                                None,
814                            ),
815                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
816                                data.map(|ts| ts.and_utc().timestamp_millis()),
817                                None,
818                            ),
819                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
820                                data.map(|ts| ts.and_utc().timestamp_micros()),
821                                None,
822                            ),
823                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
824                                data.map(|ts| ts.and_utc().timestamp_micros()),
825                                None,
826                            ),
827                        },
828                        _ => {
829                            return Err(invalid_parameter_error(
830                                "invalid_parameter_type",
831                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
832                            ))
833                        }
834                    }
835                } else {
836                    ScalarValue::TimestampMillisecond(
837                        data.map(|ts| ts.and_utc().timestamp_millis()),
838                        None,
839                    )
840                }
841            }
842            &Type::DATE => {
843                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
844                if let Some(server_type) = &server_type {
845                    match server_type {
846                        ConcreteDataType::Date(_) => ScalarValue::Date32(
847                            data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
848                        ),
849                        _ => {
850                            return Err(invalid_parameter_error(
851                                "invalid_parameter_type",
852                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
853                            ));
854                        }
855                    }
856                } else {
857                    ScalarValue::Date32(
858                        data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
859                    )
860                }
861            }
862            &Type::INTERVAL => {
863                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
864                if let Some(server_type) = &server_type {
865                    match server_type {
866                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
867                            ScalarValue::IntervalYearMonth(
868                                data.map(|i| {
869                                    if i.days != 0 || i.microseconds != 0 {
870                                        Err(invalid_parameter_error(
871                                            "invalid_parameter_type",
872                                            Some(format!(
873                                                "Expected: {}, found: {}",
874                                                server_type, client_type
875                                            )),
876                                        ))
877                                    } else {
878                                        Ok(IntervalYearMonth::new(i.months).to_i32())
879                                    }
880                                })
881                                .transpose()?,
882                            )
883                        }
884                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
885                            ScalarValue::IntervalDayTime(
886                                data.map(|i| {
887                                    if i.months != 0 || i.microseconds % 1000 != 0 {
888                                        Err(invalid_parameter_error(
889                                            "invalid_parameter_type",
890                                            Some(format!(
891                                                "Expected: {}, found: {}",
892                                                server_type, client_type
893                                            )),
894                                        ))
895                                    } else {
896                                        Ok(IntervalDayTime::new(
897                                            i.days,
898                                            (i.microseconds / 1000) as i32,
899                                        )
900                                        .into())
901                                    }
902                                })
903                                .transpose()?,
904                            )
905                        }
906                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
907                            ScalarValue::IntervalMonthDayNano(
908                                data.map(|i| IntervalMonthDayNano::from(i).into()),
909                            )
910                        }
911                        _ => {
912                            return Err(invalid_parameter_error(
913                                "invalid_parameter_type",
914                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
915                            ));
916                        }
917                    }
918                } else {
919                    ScalarValue::IntervalMonthDayNano(
920                        data.map(|i| IntervalMonthDayNano::from(i).into()),
921                    )
922                }
923            }
924            &Type::BYTEA => {
925                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
926                if let Some(server_type) = &server_type {
927                    match server_type {
928                        ConcreteDataType::String(_) => {
929                            ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
930                        }
931                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
932                        _ => {
933                            return Err(invalid_parameter_error(
934                                "invalid_parameter_type",
935                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
936                            ));
937                        }
938                    }
939                } else {
940                    ScalarValue::Binary(data)
941                }
942            }
943            &Type::JSONB => {
944                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
945                if let Some(server_type) = &server_type {
946                    match server_type {
947                        ConcreteDataType::Binary(_) => {
948                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
949                        }
950                        _ => {
951                            return Err(invalid_parameter_error(
952                                "invalid_parameter_type",
953                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
954                            ));
955                        }
956                    }
957                } else {
958                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
959                }
960            }
961            &Type::INT2_ARRAY => {
962                let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
963                if let Some(data) = data {
964                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
965                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
966                } else {
967                    ScalarValue::Null
968                }
969            }
970            &Type::INT4_ARRAY => {
971                let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
972                if let Some(data) = data {
973                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
974                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
975                } else {
976                    ScalarValue::Null
977                }
978            }
979            &Type::INT8_ARRAY => {
980                let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
981                if let Some(data) = data {
982                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
983                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
984                } else {
985                    ScalarValue::Null
986                }
987            }
988            &Type::VARCHAR_ARRAY => {
989                let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
990                if let Some(data) = data {
991                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
992                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
993                } else {
994                    ScalarValue::Null
995                }
996            }
997            _ => Err(invalid_parameter_error(
998                "unsupported_parameter_value",
999                Some(format!("Found type: {}", client_type)),
1000            ))?,
1001        };
1002
1003        results.push(value);
1004    }
1005
1006    Ok(results)
1007}
1008
1009pub(super) fn param_types_to_pg_types(
1010    param_types: &HashMap<String, Option<ConcreteDataType>>,
1011) -> Result<Vec<Type>> {
1012    let param_count = param_types.len();
1013    let mut types = Vec::with_capacity(param_count);
1014    for i in 0..param_count {
1015        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1016            let pg_type = type_gt_to_pg(param_type)?;
1017            types.push(pg_type);
1018        } else {
1019            types.push(Type::UNKNOWN);
1020        }
1021    }
1022    Ok(types)
1023}
1024
1025#[cfg(test)]
1026mod test {
1027    use std::sync::Arc;
1028
1029    use common_time::interval::IntervalUnit;
1030    use common_time::timestamp::TimeUnit;
1031    use common_time::Timestamp;
1032    use datatypes::schema::{ColumnSchema, Schema};
1033    use datatypes::value::ListValue;
1034    use pgwire::api::results::{FieldFormat, FieldInfo};
1035    use pgwire::api::Type;
1036    use session::context::QueryContextBuilder;
1037
1038    use super::*;
1039
1040    #[test]
1041    fn test_schema_convert() {
1042        let column_schemas = vec![
1043            ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1044            ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1045            ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1046            ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1047            ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1048            ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1049            ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1050            ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1051            ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1052            ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1053            ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1054            ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1055            ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1056            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1057            ColumnSchema::new(
1058                "timestamps",
1059                ConcreteDataType::timestamp_millisecond_datatype(),
1060                true,
1061            ),
1062            ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1063            ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1064            ColumnSchema::new(
1065                "intervals",
1066                ConcreteDataType::interval_month_day_nano_datatype(),
1067                true,
1068            ),
1069        ];
1070        let pg_field_info = vec![
1071            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1072            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1073            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1074            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1075            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1076            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1077            FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1078            FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1079            FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1080            FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1081            FieldInfo::new(
1082                "float32s".into(),
1083                None,
1084                None,
1085                Type::FLOAT4,
1086                FieldFormat::Text,
1087            ),
1088            FieldInfo::new(
1089                "float64s".into(),
1090                None,
1091                None,
1092                Type::FLOAT8,
1093                FieldFormat::Text,
1094            ),
1095            FieldInfo::new(
1096                "binaries".into(),
1097                None,
1098                None,
1099                Type::BYTEA,
1100                FieldFormat::Text,
1101            ),
1102            FieldInfo::new(
1103                "strings".into(),
1104                None,
1105                None,
1106                Type::VARCHAR,
1107                FieldFormat::Text,
1108            ),
1109            FieldInfo::new(
1110                "timestamps".into(),
1111                None,
1112                None,
1113                Type::TIMESTAMP,
1114                FieldFormat::Text,
1115            ),
1116            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1117            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1118            FieldInfo::new(
1119                "intervals".into(),
1120                None,
1121                None,
1122                Type::INTERVAL,
1123                FieldFormat::Text,
1124            ),
1125        ];
1126        let schema = Schema::new(column_schemas);
1127        let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
1128        assert_eq!(fs, pg_field_info);
1129    }
1130
1131    #[test]
1132    fn test_encode_text_format_data() {
1133        let schema = vec![
1134            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1135            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1136            FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1137            FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1138            FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1139            FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1140            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1141            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1142            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1143            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1144            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1145            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1146            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1147            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1148            FieldInfo::new(
1149                "float32s".into(),
1150                None,
1151                None,
1152                Type::FLOAT4,
1153                FieldFormat::Text,
1154            ),
1155            FieldInfo::new(
1156                "float32s".into(),
1157                None,
1158                None,
1159                Type::FLOAT4,
1160                FieldFormat::Text,
1161            ),
1162            FieldInfo::new(
1163                "float32s".into(),
1164                None,
1165                None,
1166                Type::FLOAT4,
1167                FieldFormat::Text,
1168            ),
1169            FieldInfo::new(
1170                "float64s".into(),
1171                None,
1172                None,
1173                Type::FLOAT8,
1174                FieldFormat::Text,
1175            ),
1176            FieldInfo::new(
1177                "float64s".into(),
1178                None,
1179                None,
1180                Type::FLOAT8,
1181                FieldFormat::Text,
1182            ),
1183            FieldInfo::new(
1184                "float64s".into(),
1185                None,
1186                None,
1187                Type::FLOAT8,
1188                FieldFormat::Text,
1189            ),
1190            FieldInfo::new(
1191                "strings".into(),
1192                None,
1193                None,
1194                Type::VARCHAR,
1195                FieldFormat::Text,
1196            ),
1197            FieldInfo::new(
1198                "binaries".into(),
1199                None,
1200                None,
1201                Type::BYTEA,
1202                FieldFormat::Text,
1203            ),
1204            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1205            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1206            FieldInfo::new(
1207                "timestamps".into(),
1208                None,
1209                None,
1210                Type::TIMESTAMP,
1211                FieldFormat::Text,
1212            ),
1213            FieldInfo::new(
1214                "interval_year_month".into(),
1215                None,
1216                None,
1217                Type::INTERVAL,
1218                FieldFormat::Text,
1219            ),
1220            FieldInfo::new(
1221                "interval_day_time".into(),
1222                None,
1223                None,
1224                Type::INTERVAL,
1225                FieldFormat::Text,
1226            ),
1227            FieldInfo::new(
1228                "interval_month_day_nano".into(),
1229                None,
1230                None,
1231                Type::INTERVAL,
1232                FieldFormat::Text,
1233            ),
1234            FieldInfo::new(
1235                "int_list".into(),
1236                None,
1237                None,
1238                Type::INT8_ARRAY,
1239                FieldFormat::Text,
1240            ),
1241            FieldInfo::new(
1242                "float_list".into(),
1243                None,
1244                None,
1245                Type::FLOAT8_ARRAY,
1246                FieldFormat::Text,
1247            ),
1248            FieldInfo::new(
1249                "string_list".into(),
1250                None,
1251                None,
1252                Type::VARCHAR_ARRAY,
1253                FieldFormat::Text,
1254            ),
1255            FieldInfo::new(
1256                "timestamp_list".into(),
1257                None,
1258                None,
1259                Type::TIMESTAMP_ARRAY,
1260                FieldFormat::Text,
1261            ),
1262        ];
1263
1264        let datatypes = vec![
1265            ConcreteDataType::null_datatype(),
1266            ConcreteDataType::boolean_datatype(),
1267            ConcreteDataType::uint8_datatype(),
1268            ConcreteDataType::uint16_datatype(),
1269            ConcreteDataType::uint32_datatype(),
1270            ConcreteDataType::uint64_datatype(),
1271            ConcreteDataType::int8_datatype(),
1272            ConcreteDataType::int8_datatype(),
1273            ConcreteDataType::int16_datatype(),
1274            ConcreteDataType::int16_datatype(),
1275            ConcreteDataType::int32_datatype(),
1276            ConcreteDataType::int32_datatype(),
1277            ConcreteDataType::int64_datatype(),
1278            ConcreteDataType::int64_datatype(),
1279            ConcreteDataType::float32_datatype(),
1280            ConcreteDataType::float32_datatype(),
1281            ConcreteDataType::float32_datatype(),
1282            ConcreteDataType::float64_datatype(),
1283            ConcreteDataType::float64_datatype(),
1284            ConcreteDataType::float64_datatype(),
1285            ConcreteDataType::string_datatype(),
1286            ConcreteDataType::binary_datatype(),
1287            ConcreteDataType::date_datatype(),
1288            ConcreteDataType::time_datatype(TimeUnit::Second),
1289            ConcreteDataType::timestamp_datatype(TimeUnit::Second),
1290            ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
1291            ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
1292            ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
1293            ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
1294            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
1295            ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
1296            ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
1297        ];
1298        let values = vec![
1299            Value::Null,
1300            Value::Boolean(true),
1301            Value::UInt8(u8::MAX),
1302            Value::UInt16(u16::MAX),
1303            Value::UInt32(u32::MAX),
1304            Value::UInt64(u64::MAX),
1305            Value::Int8(i8::MAX),
1306            Value::Int8(i8::MIN),
1307            Value::Int16(i16::MAX),
1308            Value::Int16(i16::MIN),
1309            Value::Int32(i32::MAX),
1310            Value::Int32(i32::MIN),
1311            Value::Int64(i64::MAX),
1312            Value::Int64(i64::MIN),
1313            Value::Float32(f32::MAX.into()),
1314            Value::Float32(f32::MIN.into()),
1315            Value::Float32(0f32.into()),
1316            Value::Float64(f64::MAX.into()),
1317            Value::Float64(f64::MIN.into()),
1318            Value::Float64(0f64.into()),
1319            Value::String("greptime".into()),
1320            Value::Binary("greptime".as_bytes().into()),
1321            Value::Date(1001i32.into()),
1322            Value::Time(1001i64.into()),
1323            Value::Timestamp(1000001i64.into()),
1324            Value::IntervalYearMonth(IntervalYearMonth::new(1)),
1325            Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
1326            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
1327            Value::List(ListValue::new(
1328                vec![Value::Int64(1i64)],
1329                ConcreteDataType::int64_datatype(),
1330            )),
1331            Value::List(ListValue::new(
1332                vec![Value::Float64(1.0f64.into())],
1333                ConcreteDataType::float64_datatype(),
1334            )),
1335            Value::List(ListValue::new(
1336                vec![Value::String("tom".into())],
1337                ConcreteDataType::string_datatype(),
1338            )),
1339            Value::List(ListValue::new(
1340                vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
1341                ConcreteDataType::timestamp_second_datatype(),
1342            )),
1343        ];
1344        let query_context = QueryContextBuilder::default()
1345            .configuration_parameter(Default::default())
1346            .build()
1347            .into();
1348        let mut builder = DataRowEncoder::new(Arc::new(schema));
1349        for (value, datatype) in values.iter().zip(datatypes) {
1350            encode_value(&query_context, value, &mut builder, &datatype).unwrap();
1351        }
1352    }
1353
1354    #[test]
1355    fn test_invalid_parameter() {
1356        // test for refactor with PgErrorCode
1357        let msg = "invalid_parameter_count";
1358        let error = invalid_parameter_error(msg, None);
1359        if let PgWireError::UserError(value) = error {
1360            assert_eq!("ERROR", value.severity);
1361            assert_eq!("22023", value.code);
1362            assert_eq!(msg, value.message);
1363        } else {
1364            panic!("test_invalid_parameter failed");
1365        }
1366    }
1367}