servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{DateTime, NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{IntervalType, TimestampType, json_type_value_to_string};
31use datatypes::value::{ListValue, StructValue};
32use pgwire::api::Type;
33use pgwire::api::portal::{Format, Portal};
34use pgwire::api::results::{DataRowEncoder, FieldInfo};
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38use snafu::ResultExt;
39
40use self::bytea::{EscapeOutputBytea, HexOutputBytea};
41use self::datetime::{StylingDate, StylingDateTime};
42pub use self::error::{PgErrorCode, PgErrorSeverity};
43use self::interval::PgInterval;
44use crate::SqlPlan;
45use crate::error::{self as server_error, DataFusionSnafu, Error, Result};
46use crate::postgres::utils::convert_err;
47
48pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
49    origin
50        .column_schemas()
51        .iter()
52        .enumerate()
53        .map(|(idx, col)| {
54            Ok(FieldInfo::new(
55                col.name.clone(),
56                None,
57                None,
58                type_gt_to_pg(&col.data_type)?,
59                field_formats.format_for(idx),
60            ))
61        })
62        .collect::<Result<Vec<FieldInfo>>>()
63}
64
65fn encode_struct(
66    _query_ctx: &QueryContextRef,
67    _struct_value: &StructValue,
68    _builder: &mut DataRowEncoder,
69) -> PgWireResult<()> {
70    todo!("how to encode struct for postgres");
71}
72
73fn encode_array(
74    query_ctx: &QueryContextRef,
75    value_list: &ListValue,
76    builder: &mut DataRowEncoder,
77) -> PgWireResult<()> {
78    match value_list.datatype() {
79        &ConcreteDataType::Boolean(_) => {
80            let array = value_list
81                .items()
82                .iter()
83                .map(|v| match v {
84                    Value::Null => Ok(None),
85                    Value::Boolean(v) => Ok(Some(*v)),
86                    _ => Err(convert_err(Error::Internal {
87                        err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
88                    })),
89                })
90                .collect::<PgWireResult<Vec<Option<bool>>>>()?;
91            builder.encode_field(&array)
92        }
93        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
94            let array = value_list
95                .items()
96                .iter()
97                .map(|v| match v {
98                    Value::Null => Ok(None),
99                    Value::Int8(v) => Ok(Some(*v)),
100                    Value::UInt8(v) => Ok(Some(*v as i8)),
101                    _ => Err(convert_err(Error::Internal {
102                        err_msg: format!(
103                            "Invalid list item type, find {v:?}, expected int8 or uint8",
104                        ),
105                    })),
106                })
107                .collect::<PgWireResult<Vec<Option<i8>>>>()?;
108            builder.encode_field(&array)
109        }
110        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
111            let array = value_list
112                .items()
113                .iter()
114                .map(|v| match v {
115                    Value::Null => Ok(None),
116                    Value::Int16(v) => Ok(Some(*v)),
117                    Value::UInt16(v) => Ok(Some(*v as i16)),
118                    _ => Err(convert_err(Error::Internal {
119                        err_msg: format!(
120                            "Invalid list item type, find {v:?}, expected int16 or uint16",
121                        ),
122                    })),
123                })
124                .collect::<PgWireResult<Vec<Option<i16>>>>()?;
125            builder.encode_field(&array)
126        }
127        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
128            let array = value_list
129                .items()
130                .iter()
131                .map(|v| match v {
132                    Value::Null => Ok(None),
133                    Value::Int32(v) => Ok(Some(*v)),
134                    Value::UInt32(v) => Ok(Some(*v as i32)),
135                    _ => Err(convert_err(Error::Internal {
136                        err_msg: format!(
137                            "Invalid list item type, find {v:?}, expected int32 or uint32",
138                        ),
139                    })),
140                })
141                .collect::<PgWireResult<Vec<Option<i32>>>>()?;
142            builder.encode_field(&array)
143        }
144        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
145            let array = value_list
146                .items()
147                .iter()
148                .map(|v| match v {
149                    Value::Null => Ok(None),
150                    Value::Int64(v) => Ok(Some(*v)),
151                    Value::UInt64(v) => Ok(Some(*v as i64)),
152                    _ => Err(convert_err(Error::Internal {
153                        err_msg: format!(
154                            "Invalid list item type, find {v:?}, expected int64 or uint64",
155                        ),
156                    })),
157                })
158                .collect::<PgWireResult<Vec<Option<i64>>>>()?;
159            builder.encode_field(&array)
160        }
161        &ConcreteDataType::Float32(_) => {
162            let array = value_list
163                .items()
164                .iter()
165                .map(|v| match v {
166                    Value::Null => Ok(None),
167                    Value::Float32(v) => Ok(Some(v.0)),
168                    _ => Err(convert_err(Error::Internal {
169                        err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
170                    })),
171                })
172                .collect::<PgWireResult<Vec<Option<f32>>>>()?;
173            builder.encode_field(&array)
174        }
175        &ConcreteDataType::Float64(_) => {
176            let array = value_list
177                .items()
178                .iter()
179                .map(|v| match v {
180                    Value::Null => Ok(None),
181                    Value::Float64(v) => Ok(Some(v.0)),
182                    _ => Err(convert_err(Error::Internal {
183                        err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
184                    })),
185                })
186                .collect::<PgWireResult<Vec<Option<f64>>>>()?;
187            builder.encode_field(&array)
188        }
189        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
190            let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
191
192            match *bytea_output {
193                PGByteaOutputValue::ESCAPE => {
194                    let array = value_list
195                        .items()
196                        .iter()
197                        .map(|v| match v {
198                            Value::Null => Ok(None),
199                            Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
200
201                            _ => Err(convert_err(Error::Internal {
202                                err_msg: format!(
203                                    "Invalid list item type, find {v:?}, expected binary",
204                                ),
205                            })),
206                        })
207                        .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
208                    builder.encode_field(&array)
209                }
210                PGByteaOutputValue::HEX => {
211                    let array = value_list
212                        .items()
213                        .iter()
214                        .map(|v| match v {
215                            Value::Null => Ok(None),
216                            Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
217
218                            _ => Err(convert_err(Error::Internal {
219                                err_msg: format!(
220                                    "Invalid list item type, find {v:?}, expected binary",
221                                ),
222                            })),
223                        })
224                        .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
225                    builder.encode_field(&array)
226                }
227            }
228        }
229        &ConcreteDataType::String(_) => {
230            let array = value_list
231                .items()
232                .iter()
233                .map(|v| match v {
234                    Value::Null => Ok(None),
235                    Value::String(v) => Ok(Some(v.as_utf8())),
236                    _ => Err(convert_err(Error::Internal {
237                        err_msg: format!("Invalid list item type, find {v:?}, expected string",),
238                    })),
239                })
240                .collect::<PgWireResult<Vec<Option<&str>>>>()?;
241            builder.encode_field(&array)
242        }
243        &ConcreteDataType::Date(_) => {
244            let array = value_list
245                .items()
246                .iter()
247                .map(|v| match v {
248                    Value::Null => Ok(None),
249                    Value::Date(v) => {
250                        if let Some(date) = v.to_chrono_date() {
251                            let (style, order) =
252                                *query_ctx.configuration_parameter().pg_datetime_style();
253                            Ok(Some(StylingDate(date, style, order)))
254                        } else {
255                            Err(convert_err(Error::Internal {
256                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
257                            }))
258                        }
259                    }
260                    _ => Err(convert_err(Error::Internal {
261                        err_msg: format!("Invalid list item type, find {v:?}, expected date",),
262                    })),
263                })
264                .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
265            builder.encode_field(&array)
266        }
267        &ConcreteDataType::Timestamp(_) => {
268            let array = value_list
269                .items()
270                .iter()
271                .map(|v| match v {
272                    Value::Null => Ok(None),
273                    Value::Timestamp(v) => {
274                        if let Some(datetime) =
275                            v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
276                        {
277                            let (style, order) =
278                                *query_ctx.configuration_parameter().pg_datetime_style();
279                            Ok(Some(StylingDateTime(datetime, style, order)))
280                        } else {
281                            Err(convert_err(Error::Internal {
282                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
283                            }))
284                        }
285                    }
286                    _ => Err(convert_err(Error::Internal {
287                        err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
288                    })),
289                })
290                .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
291            builder.encode_field(&array)
292        }
293        &ConcreteDataType::Time(_) => {
294            let array = value_list
295                .items()
296                .iter()
297                .map(|v| match v {
298                    Value::Null => Ok(None),
299                    Value::Time(v) => Ok(v.to_chrono_time()),
300                    _ => Err(convert_err(Error::Internal {
301                        err_msg: format!("Invalid list item type, find {v:?}, expected time",),
302                    })),
303                })
304                .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
305            builder.encode_field(&array)
306        }
307        &ConcreteDataType::Interval(_) => {
308            let array = value_list
309                .items()
310                .iter()
311                .map(|v| match v {
312                    Value::Null => Ok(None),
313                    Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
314                    Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
315                    Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
316                    _ => Err(convert_err(Error::Internal {
317                        err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
318                    })),
319                })
320                .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
321            builder.encode_field(&array)
322        }
323        &ConcreteDataType::Decimal128(_) => {
324            let array = value_list
325                .items()
326                .iter()
327                .map(|v| match v {
328                    Value::Null => Ok(None),
329                    Value::Decimal128(v) => Ok(Some(v.to_string())),
330                    _ => Err(convert_err(Error::Internal {
331                        err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
332                    })),
333                })
334                .collect::<PgWireResult<Vec<Option<String>>>>()?;
335            builder.encode_field(&array)
336        }
337        &ConcreteDataType::Json(j) => {
338            let array = value_list
339                .items()
340                .iter()
341                .map(|v| match v {
342                    Value::Null => Ok(None),
343                    Value::Binary(v) => {
344                        let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
345                        Ok(Some(s))
346                    }
347                    _ => Err(convert_err(Error::Internal {
348                        err_msg: format!("Invalid list item type, find {v:?}, expected json",),
349                    })),
350                })
351                .collect::<PgWireResult<Vec<Option<String>>>>()?;
352            builder.encode_field(&array)
353        }
354        _ => Err(convert_err(Error::Internal {
355            err_msg: format!(
356                "cannot write array type {:?} in postgres protocol: unimplemented",
357                value_list.datatype()
358            ),
359        })),
360    }
361}
362
363pub(super) fn encode_value(
364    query_ctx: &QueryContextRef,
365    value: &Value,
366    builder: &mut DataRowEncoder,
367    datatype: &ConcreteDataType,
368) -> PgWireResult<()> {
369    match value {
370        Value::Null => builder.encode_field(&None::<&i8>),
371        Value::Boolean(v) => builder.encode_field(v),
372        Value::UInt8(v) => builder.encode_field(&(*v as i8)),
373        Value::UInt16(v) => builder.encode_field(&(*v as i16)),
374        Value::UInt32(v) => builder.encode_field(v),
375        Value::UInt64(v) => builder.encode_field(&(*v as i64)),
376        Value::Int8(v) => builder.encode_field(v),
377        Value::Int16(v) => builder.encode_field(v),
378        Value::Int32(v) => builder.encode_field(v),
379        Value::Int64(v) => builder.encode_field(v),
380        Value::Float32(v) => builder.encode_field(&v.0),
381        Value::Float64(v) => builder.encode_field(&v.0),
382        Value::String(v) => builder.encode_field(&v.as_utf8()),
383        Value::Binary(v) => match datatype {
384            ConcreteDataType::Json(j) => {
385                let s = json_type_value_to_string(v, &j.format).map_err(convert_err)?;
386                builder.encode_field(&s)
387            }
388            _ => {
389                let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
390                match *bytea_output {
391                    PGByteaOutputValue::ESCAPE => {
392                        builder.encode_field(&EscapeOutputBytea(v.deref()))
393                    }
394                    PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
395                }
396            }
397        },
398        Value::Date(v) => {
399            if let Some(date) = v.to_chrono_date() {
400                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
401                builder.encode_field(&StylingDate(date, style, order))
402            } else {
403                Err(convert_err(Error::Internal {
404                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
405                }))
406            }
407        }
408        Value::Timestamp(v) => {
409            if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
410            {
411                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
412                builder.encode_field(&StylingDateTime(datetime, style, order))
413            } else {
414                Err(convert_err(Error::Internal {
415                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
416                }))
417            }
418        }
419        Value::Time(v) => {
420            if let Some(time) = v.to_chrono_time() {
421                builder.encode_field(&time)
422            } else {
423                Err(convert_err(Error::Internal {
424                    err_msg: format!("Failed to convert time to postgres type {v:?}",),
425                }))
426            }
427        }
428        Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
429        Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
430        Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
431        Value::Decimal128(v) => builder.encode_field(&v.to_string()),
432        Value::Duration(d) => match PgInterval::try_from(*d) {
433            Ok(i) => builder.encode_field(&i),
434            Err(e) => Err(convert_err(Error::Internal {
435                err_msg: e.to_string(),
436            })),
437        },
438        Value::List(values) => encode_array(query_ctx, values, builder),
439        Value::Struct(values) => encode_struct(query_ctx, values, builder),
440    }
441}
442
443pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
444    match origin {
445        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
446        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
447        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
448        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
449        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
450        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
451        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
452        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
453        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
454        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
455        &ConcreteDataType::Date(_) => Ok(Type::DATE),
456        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
457        &ConcreteDataType::Time(_) => Ok(Type::TIME),
458        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
459        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
460        &ConcreteDataType::Json(_) => Ok(Type::JSON),
461        ConcreteDataType::List(list) => match list.item_type() {
462            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
463            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
464            &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
465            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
466            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
467            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
468            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
469            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
470            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
471            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
472            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
473            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
474            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
475            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
476            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
477            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
478            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
479            // TODO(sunng87) we may treat list/array as json directly so we can
480            // support deeply nested data structures
481            &ConcreteDataType::Struct(_) => Ok(Type::RECORD_ARRAY),
482            &ConcreteDataType::Dictionary(_)
483            | &ConcreteDataType::Vector(_)
484            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
485                data_type: origin,
486                reason: "not implemented",
487            }
488            .fail(),
489        },
490        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
491            data_type: origin,
492            reason: "not implemented",
493        }
494        .fail(),
495        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
496        &ConcreteDataType::Struct(_) => Ok(Type::RECORD),
497    }
498}
499
500#[allow(dead_code)]
501pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
502    // Note that we only support a small amount of pg data types
503    match origin {
504        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
505        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
506        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
507        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
508        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
509        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
510        &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
511            common_time::timestamp::TimeUnit::Millisecond,
512        )),
513        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
514        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
515            common_time::timestamp::TimeUnit::Microsecond,
516        )),
517        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
518            ConcreteDataType::int8_datatype(),
519        )),
520        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
521            ConcreteDataType::int16_datatype(),
522        )),
523        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
524            ConcreteDataType::int32_datatype(),
525        )),
526        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
527            ConcreteDataType::int64_datatype(),
528        )),
529        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
530            ConcreteDataType::string_datatype(),
531        )),
532        _ => server_error::InternalSnafu {
533            err_msg: format!("unimplemented datatype {origin:?}"),
534        }
535        .fail(),
536    }
537}
538
539pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
540    // the index is managed from portal's parameters count so it's safe to
541    // unwrap here.
542    let param_type = portal.statement.parameter_types.get(idx).unwrap();
543    match param_type {
544        &Type::VARCHAR | &Type::TEXT => Ok(format!(
545            "'{}'",
546            portal
547                .parameter::<String>(idx, param_type)?
548                .as_deref()
549                .unwrap_or("")
550        )),
551        &Type::BOOL => Ok(portal
552            .parameter::<bool>(idx, param_type)?
553            .map(|v| v.to_string())
554            .unwrap_or_else(|| "".to_owned())),
555        &Type::INT4 => Ok(portal
556            .parameter::<i32>(idx, param_type)?
557            .map(|v| v.to_string())
558            .unwrap_or_else(|| "".to_owned())),
559        &Type::INT8 => Ok(portal
560            .parameter::<i64>(idx, param_type)?
561            .map(|v| v.to_string())
562            .unwrap_or_else(|| "".to_owned())),
563        &Type::FLOAT4 => Ok(portal
564            .parameter::<f32>(idx, param_type)?
565            .map(|v| v.to_string())
566            .unwrap_or_else(|| "".to_owned())),
567        &Type::FLOAT8 => Ok(portal
568            .parameter::<f64>(idx, param_type)?
569            .map(|v| v.to_string())
570            .unwrap_or_else(|| "".to_owned())),
571        &Type::DATE => Ok(portal
572            .parameter::<NaiveDate>(idx, param_type)?
573            .map(|v| v.format("%Y-%m-%d").to_string())
574            .unwrap_or_else(|| "".to_owned())),
575        &Type::TIMESTAMP => Ok(portal
576            .parameter::<NaiveDateTime>(idx, param_type)?
577            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
578            .unwrap_or_else(|| "".to_owned())),
579        &Type::INTERVAL => Ok(portal
580            .parameter::<PgInterval>(idx, param_type)?
581            .map(|v| v.to_string())
582            .unwrap_or_else(|| "".to_owned())),
583        _ => Err(invalid_parameter_error(
584            "unsupported_parameter_type",
585            Some(param_type.to_string()),
586        )),
587    }
588}
589
590pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
591    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
592    error_info.detail = detail;
593    PgWireError::UserError(Box::new(error_info))
594}
595
596fn to_timestamp_scalar_value<T>(
597    data: Option<T>,
598    unit: &TimestampType,
599    ctype: &ConcreteDataType,
600) -> PgWireResult<ScalarValue>
601where
602    T: Into<i64>,
603{
604    if let Some(n) = data {
605        Value::Timestamp(unit.create_timestamp(n.into()))
606            .try_to_scalar_value(ctype)
607            .map_err(convert_err)
608    } else {
609        Ok(ScalarValue::Null)
610    }
611}
612
613pub(super) fn parameters_to_scalar_values(
614    plan: &LogicalPlan,
615    portal: &Portal<SqlPlan>,
616) -> PgWireResult<Vec<ScalarValue>> {
617    let param_count = portal.parameter_len();
618    let mut results = Vec::with_capacity(param_count);
619
620    let client_param_types = &portal.statement.parameter_types;
621    let param_types = plan
622        .get_parameter_types()
623        .context(DataFusionSnafu)
624        .map_err(convert_err)?
625        .into_iter()
626        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
627        .collect::<HashMap<_, _>>();
628
629    for idx in 0..param_count {
630        let server_type = param_types
631            .get(&format!("${}", idx + 1))
632            .and_then(|t| t.as_ref());
633
634        let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
635            client_given_type.clone()
636        } else if let Some(server_provided_type) = &server_type {
637            type_gt_to_pg(server_provided_type).map_err(convert_err)?
638        } else {
639            return Err(invalid_parameter_error(
640                "unknown_parameter_type",
641                Some(format!(
642                    "Cannot get parameter type information for parameter {}",
643                    idx
644                )),
645            ));
646        };
647
648        let value = match &client_type {
649            &Type::VARCHAR | &Type::TEXT => {
650                let data = portal.parameter::<String>(idx, &client_type)?;
651                if let Some(server_type) = &server_type {
652                    match server_type {
653                        ConcreteDataType::String(_) => ScalarValue::Utf8(data),
654                        _ => {
655                            return Err(invalid_parameter_error(
656                                "invalid_parameter_type",
657                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
658                            ));
659                        }
660                    }
661                } else {
662                    ScalarValue::Utf8(data)
663                }
664            }
665            &Type::BOOL => {
666                let data = portal.parameter::<bool>(idx, &client_type)?;
667                if let Some(server_type) = &server_type {
668                    match server_type {
669                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
670                        _ => {
671                            return Err(invalid_parameter_error(
672                                "invalid_parameter_type",
673                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
674                            ));
675                        }
676                    }
677                } else {
678                    ScalarValue::Boolean(data)
679                }
680            }
681            &Type::INT2 => {
682                let data = portal.parameter::<i16>(idx, &client_type)?;
683                if let Some(server_type) = &server_type {
684                    match server_type {
685                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
686                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
687                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
688                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
689                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
690                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
691                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
692                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
693                        ConcreteDataType::Timestamp(unit) => {
694                            to_timestamp_scalar_value(data, unit, server_type)?
695                        }
696                        _ => {
697                            return Err(invalid_parameter_error(
698                                "invalid_parameter_type",
699                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
700                            ));
701                        }
702                    }
703                } else {
704                    ScalarValue::Int16(data)
705                }
706            }
707            &Type::INT4 => {
708                let data = portal.parameter::<i32>(idx, &client_type)?;
709                if let Some(server_type) = &server_type {
710                    match server_type {
711                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
712                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
713                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
714                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
715                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
716                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
717                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
718                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
719                        ConcreteDataType::Timestamp(unit) => {
720                            to_timestamp_scalar_value(data, unit, server_type)?
721                        }
722                        _ => {
723                            return Err(invalid_parameter_error(
724                                "invalid_parameter_type",
725                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
726                            ));
727                        }
728                    }
729                } else {
730                    ScalarValue::Int32(data)
731                }
732            }
733            &Type::INT8 => {
734                let data = portal.parameter::<i64>(idx, &client_type)?;
735                if let Some(server_type) = &server_type {
736                    match server_type {
737                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
738                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
739                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
740                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
741                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
742                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
743                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
744                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
745                        ConcreteDataType::Timestamp(unit) => {
746                            to_timestamp_scalar_value(data, unit, server_type)?
747                        }
748                        _ => {
749                            return Err(invalid_parameter_error(
750                                "invalid_parameter_type",
751                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
752                            ));
753                        }
754                    }
755                } else {
756                    ScalarValue::Int64(data)
757                }
758            }
759            &Type::FLOAT4 => {
760                let data = portal.parameter::<f32>(idx, &client_type)?;
761                if let Some(server_type) = &server_type {
762                    match server_type {
763                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
764                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
765                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
766                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
767                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
768                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
769                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
770                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
771                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
772                        ConcreteDataType::Float64(_) => {
773                            ScalarValue::Float64(data.map(|n| n as f64))
774                        }
775                        _ => {
776                            return Err(invalid_parameter_error(
777                                "invalid_parameter_type",
778                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
779                            ));
780                        }
781                    }
782                } else {
783                    ScalarValue::Float32(data)
784                }
785            }
786            &Type::FLOAT8 => {
787                let data = portal.parameter::<f64>(idx, &client_type)?;
788                if let Some(server_type) = &server_type {
789                    match server_type {
790                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
791                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
792                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
793                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
794                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
795                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
796                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
797                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
798                        ConcreteDataType::Float32(_) => {
799                            ScalarValue::Float32(data.map(|n| n as f32))
800                        }
801                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
802                        _ => {
803                            return Err(invalid_parameter_error(
804                                "invalid_parameter_type",
805                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
806                            ));
807                        }
808                    }
809                } else {
810                    ScalarValue::Float64(data)
811                }
812            }
813            &Type::TIMESTAMP => {
814                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
815                if let Some(server_type) = &server_type {
816                    match server_type {
817                        ConcreteDataType::Timestamp(unit) => match *unit {
818                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
819                                data.map(|ts| ts.and_utc().timestamp()),
820                                None,
821                            ),
822                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
823                                data.map(|ts| ts.and_utc().timestamp_millis()),
824                                None,
825                            ),
826                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
827                                data.map(|ts| ts.and_utc().timestamp_micros()),
828                                None,
829                            ),
830                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
831                                data.map(|ts| ts.and_utc().timestamp_micros()),
832                                None,
833                            ),
834                        },
835                        _ => {
836                            return Err(invalid_parameter_error(
837                                "invalid_parameter_type",
838                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
839                            ));
840                        }
841                    }
842                } else {
843                    ScalarValue::TimestampMillisecond(
844                        data.map(|ts| ts.and_utc().timestamp_millis()),
845                        None,
846                    )
847                }
848            }
849            &Type::DATE => {
850                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
851                if let Some(server_type) = &server_type {
852                    match server_type {
853                        ConcreteDataType::Date(_) => ScalarValue::Date32(
854                            data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
855                        ),
856                        _ => {
857                            return Err(invalid_parameter_error(
858                                "invalid_parameter_type",
859                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
860                            ));
861                        }
862                    }
863                } else {
864                    ScalarValue::Date32(
865                        data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
866                    )
867                }
868            }
869            &Type::INTERVAL => {
870                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
871                if let Some(server_type) = &server_type {
872                    match server_type {
873                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
874                            ScalarValue::IntervalYearMonth(
875                                data.map(|i| {
876                                    if i.days != 0 || i.microseconds != 0 {
877                                        Err(invalid_parameter_error(
878                                            "invalid_parameter_type",
879                                            Some(format!(
880                                                "Expected: {}, found: {}",
881                                                server_type, client_type
882                                            )),
883                                        ))
884                                    } else {
885                                        Ok(IntervalYearMonth::new(i.months).to_i32())
886                                    }
887                                })
888                                .transpose()?,
889                            )
890                        }
891                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
892                            ScalarValue::IntervalDayTime(
893                                data.map(|i| {
894                                    if i.months != 0 || i.microseconds % 1000 != 0 {
895                                        Err(invalid_parameter_error(
896                                            "invalid_parameter_type",
897                                            Some(format!(
898                                                "Expected: {}, found: {}",
899                                                server_type, client_type
900                                            )),
901                                        ))
902                                    } else {
903                                        Ok(IntervalDayTime::new(
904                                            i.days,
905                                            (i.microseconds / 1000) as i32,
906                                        )
907                                        .into())
908                                    }
909                                })
910                                .transpose()?,
911                            )
912                        }
913                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
914                            ScalarValue::IntervalMonthDayNano(
915                                data.map(|i| IntervalMonthDayNano::from(i).into()),
916                            )
917                        }
918                        _ => {
919                            return Err(invalid_parameter_error(
920                                "invalid_parameter_type",
921                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
922                            ));
923                        }
924                    }
925                } else {
926                    ScalarValue::IntervalMonthDayNano(
927                        data.map(|i| IntervalMonthDayNano::from(i).into()),
928                    )
929                }
930            }
931            &Type::BYTEA => {
932                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
933                if let Some(server_type) = &server_type {
934                    match server_type {
935                        ConcreteDataType::String(_) => {
936                            ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
937                        }
938                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
939                        _ => {
940                            return Err(invalid_parameter_error(
941                                "invalid_parameter_type",
942                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
943                            ));
944                        }
945                    }
946                } else {
947                    ScalarValue::Binary(data)
948                }
949            }
950            &Type::JSONB => {
951                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
952                if let Some(server_type) = &server_type {
953                    match server_type {
954                        ConcreteDataType::Binary(_) => {
955                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
956                        }
957                        _ => {
958                            return Err(invalid_parameter_error(
959                                "invalid_parameter_type",
960                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
961                            ));
962                        }
963                    }
964                } else {
965                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
966                }
967            }
968            &Type::INT2_ARRAY => {
969                let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
970                if let Some(data) = data {
971                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
972                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
973                } else {
974                    ScalarValue::Null
975                }
976            }
977            &Type::INT4_ARRAY => {
978                let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
979                if let Some(data) = data {
980                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
981                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
982                } else {
983                    ScalarValue::Null
984                }
985            }
986            &Type::INT8_ARRAY => {
987                let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
988                if let Some(data) = data {
989                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
990                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
991                } else {
992                    ScalarValue::Null
993                }
994            }
995            &Type::VARCHAR_ARRAY => {
996                let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
997                if let Some(data) = data {
998                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
999                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
1000                } else {
1001                    ScalarValue::Null
1002                }
1003            }
1004            _ => Err(invalid_parameter_error(
1005                "unsupported_parameter_value",
1006                Some(format!("Found type: {}", client_type)),
1007            ))?,
1008        };
1009
1010        results.push(value);
1011    }
1012
1013    Ok(results)
1014}
1015
1016pub(super) fn param_types_to_pg_types(
1017    param_types: &HashMap<String, Option<ConcreteDataType>>,
1018) -> Result<Vec<Type>> {
1019    let param_count = param_types.len();
1020    let mut types = Vec::with_capacity(param_count);
1021    for i in 0..param_count {
1022        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1023            let pg_type = type_gt_to_pg(param_type)?;
1024            types.push(pg_type);
1025        } else {
1026            types.push(Type::UNKNOWN);
1027        }
1028    }
1029    Ok(types)
1030}
1031
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use common_time::Timestamp;
    use common_time::interval::IntervalUnit;
    use common_time::timestamp::TimeUnit;
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::value::ListValue;
    use pgwire::api::Type;
    use pgwire::api::results::{FieldFormat, FieldInfo};
    use session::context::QueryContextBuilder;

    use super::*;

    /// Builds a text-format [`FieldInfo`] with no table id and no column id.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    /// Checks that every nullable column of a schema is mapped to the
    /// expected PostgreSQL field type when all fields use the text format.
    #[test]
    fn test_schema_convert() {
        // One row per column: name, server-side type, expected wire type.
        let cases = [
            ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
            ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
            ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
            ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
            ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
            ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
            ("uint8s", ConcreteDataType::uint8_datatype(), Type::CHAR),
            ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT2),
            ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT4),
            ("uint64s", ConcreteDataType::uint64_datatype(), Type::INT8),
            ("float32s", ConcreteDataType::float32_datatype(), Type::FLOAT4),
            ("float64s", ConcreteDataType::float64_datatype(), Type::FLOAT8),
            ("binaries", ConcreteDataType::binary_datatype(), Type::BYTEA),
            ("strings", ConcreteDataType::string_datatype(), Type::VARCHAR),
            (
                "timestamps",
                ConcreteDataType::timestamp_millisecond_datatype(),
                Type::TIMESTAMP,
            ),
            ("dates", ConcreteDataType::date_datatype(), Type::DATE),
            ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
            (
                "intervals",
                ConcreteDataType::interval_month_day_nano_datatype(),
                Type::INTERVAL,
            ),
        ];

        let (column_schemas, expected_fields): (Vec<ColumnSchema>, Vec<FieldInfo>) = cases
            .into_iter()
            .map(|(name, datatype, pg_type)| {
                (
                    ColumnSchema::new(name, datatype, true),
                    text_field(name, pg_type),
                )
            })
            .unzip();

        let schema = Schema::new(column_schemas);
        let actual_fields = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
        assert_eq!(actual_fields, expected_fields);
    }

    /// Encodes one value of every supported kind into a single text-format
    /// row and expects each `encode_value` call to succeed.
    #[test]
    fn test_encode_text_format_data() {
        // One row per encoded cell: field name, wire type, server-side type
        // and the value to encode.
        let cases = [
            (
                "nulls",
                Type::UNKNOWN,
                ConcreteDataType::null_datatype(),
                Value::Null,
            ),
            (
                "bools",
                Type::BOOL,
                ConcreteDataType::boolean_datatype(),
                Value::Boolean(true),
            ),
            (
                "uint8s",
                Type::CHAR,
                ConcreteDataType::uint8_datatype(),
                Value::UInt8(u8::MAX),
            ),
            (
                "uint16s",
                Type::INT2,
                ConcreteDataType::uint16_datatype(),
                Value::UInt16(u16::MAX),
            ),
            (
                "uint32s",
                Type::INT4,
                ConcreteDataType::uint32_datatype(),
                Value::UInt32(u32::MAX),
            ),
            (
                "uint64s",
                Type::INT8,
                ConcreteDataType::uint64_datatype(),
                Value::UInt64(u64::MAX),
            ),
            (
                "int8s",
                Type::CHAR,
                ConcreteDataType::int8_datatype(),
                Value::Int8(i8::MAX),
            ),
            (
                "int8s",
                Type::CHAR,
                ConcreteDataType::int8_datatype(),
                Value::Int8(i8::MIN),
            ),
            (
                "int16s",
                Type::INT2,
                ConcreteDataType::int16_datatype(),
                Value::Int16(i16::MAX),
            ),
            (
                "int16s",
                Type::INT2,
                ConcreteDataType::int16_datatype(),
                Value::Int16(i16::MIN),
            ),
            (
                "int32s",
                Type::INT4,
                ConcreteDataType::int32_datatype(),
                Value::Int32(i32::MAX),
            ),
            (
                "int32s",
                Type::INT4,
                ConcreteDataType::int32_datatype(),
                Value::Int32(i32::MIN),
            ),
            (
                "int64s",
                Type::INT8,
                ConcreteDataType::int64_datatype(),
                Value::Int64(i64::MAX),
            ),
            (
                "int64s",
                Type::INT8,
                ConcreteDataType::int64_datatype(),
                Value::Int64(i64::MIN),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MAX.into()),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(f32::MIN.into()),
            ),
            (
                "float32s",
                Type::FLOAT4,
                ConcreteDataType::float32_datatype(),
                Value::Float32(0f32.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MAX.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(f64::MIN.into()),
            ),
            (
                "float64s",
                Type::FLOAT8,
                ConcreteDataType::float64_datatype(),
                Value::Float64(0f64.into()),
            ),
            (
                "strings",
                Type::VARCHAR,
                ConcreteDataType::string_datatype(),
                Value::String("greptime".into()),
            ),
            (
                "binaries",
                Type::BYTEA,
                ConcreteDataType::binary_datatype(),
                Value::Binary("greptime".as_bytes().into()),
            ),
            (
                "dates",
                Type::DATE,
                ConcreteDataType::date_datatype(),
                Value::Date(1001i32.into()),
            ),
            (
                "times",
                Type::TIME,
                ConcreteDataType::time_datatype(TimeUnit::Second),
                Value::Time(1001i64.into()),
            ),
            (
                "timestamps",
                Type::TIMESTAMP,
                ConcreteDataType::timestamp_datatype(TimeUnit::Second),
                Value::Timestamp(1000001i64.into()),
            ),
            (
                "interval_year_month",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
                Value::IntervalYearMonth(IntervalYearMonth::new(1)),
            ),
            (
                "interval_day_time",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
                Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
            ),
            (
                "interval_month_day_nano",
                Type::INTERVAL,
                ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
                Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
            ),
            (
                "int_list",
                Type::INT8_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Int64(1i64)],
                    ConcreteDataType::int64_datatype(),
                )),
            ),
            (
                "float_list",
                Type::FLOAT8_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Float64(1.0f64.into())],
                    ConcreteDataType::float64_datatype(),
                )),
            ),
            (
                "string_list",
                Type::VARCHAR_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
                Value::List(ListValue::new(
                    vec![Value::String("tom".into())],
                    ConcreteDataType::string_datatype(),
                )),
            ),
            (
                "timestamp_list",
                Type::TIMESTAMP_ARRAY,
                ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
                Value::List(ListValue::new(
                    vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
                    ConcreteDataType::timestamp_second_datatype(),
                )),
            ),
        ];

        // Split the table into the row schema and the cells to encode,
        // keeping the declaration order.
        let mut fields = Vec::with_capacity(cases.len());
        let mut cells = Vec::with_capacity(cases.len());
        for (name, pg_type, datatype, value) in cases {
            fields.push(text_field(name, pg_type));
            cells.push((value, datatype));
        }

        let query_context = QueryContextBuilder::default()
            .configuration_parameter(Default::default())
            .build()
            .into();
        let mut builder = DataRowEncoder::new(Arc::new(fields));
        for (value, datatype) in &cells {
            encode_value(&query_context, value, &mut builder, datatype).unwrap();
        }
    }

    /// Checks the severity, SQLSTATE code and message carried by the error
    /// returned from `invalid_parameter_error`.
    #[test]
    fn test_invalid_parameter() {
        // test for refactor with PgErrorCode
        let msg = "invalid_parameter_count";
        let PgWireError::UserError(value) = invalid_parameter_error(msg, None) else {
            panic!("test_invalid_parameter failed");
        };
        assert_eq!("ERROR", value.severity);
        assert_eq!("22023", value.code);
        assert_eq!(msg, value.message);
    }
}