servers/postgres/
types.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15mod bytea;
16mod datetime;
17mod error;
18mod interval;
19
20use std::collections::HashMap;
21use std::ops::Deref;
22
23use chrono::{NaiveDate, NaiveDateTime, NaiveTime};
24use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
25use datafusion_common::ScalarValue;
26use datafusion_expr::LogicalPlan;
27use datatypes::arrow::datatypes::DataType as ArrowDataType;
28use datatypes::prelude::{ConcreteDataType, Value};
29use datatypes::schema::Schema;
30use datatypes::types::{json_type_value_to_string, IntervalType, TimestampType};
31use datatypes::value::ListValue;
32use pgwire::api::portal::{Format, Portal};
33use pgwire::api::results::{DataRowEncoder, FieldInfo};
34use pgwire::api::Type;
35use pgwire::error::{PgWireError, PgWireResult};
36use session::context::QueryContextRef;
37use session::session_config::PGByteaOutputValue;
38
39use self::bytea::{EscapeOutputBytea, HexOutputBytea};
40use self::datetime::{StylingDate, StylingDateTime};
41pub use self::error::{PgErrorCode, PgErrorSeverity};
42use self::interval::PgInterval;
43use crate::error::{self as server_error, Error, Result};
44use crate::SqlPlan;
45
46pub(super) fn schema_to_pg(origin: &Schema, field_formats: &Format) -> Result<Vec<FieldInfo>> {
47    origin
48        .column_schemas()
49        .iter()
50        .enumerate()
51        .map(|(idx, col)| {
52            Ok(FieldInfo::new(
53                col.name.clone(),
54                None,
55                None,
56                type_gt_to_pg(&col.data_type)?,
57                field_formats.format_for(idx),
58            ))
59        })
60        .collect::<Result<Vec<FieldInfo>>>()
61}
62
63fn encode_array(
64    query_ctx: &QueryContextRef,
65    value_list: &ListValue,
66    builder: &mut DataRowEncoder,
67) -> PgWireResult<()> {
68    match value_list.datatype() {
69        &ConcreteDataType::Boolean(_) => {
70            let array = value_list
71                .items()
72                .iter()
73                .map(|v| match v {
74                    Value::Null => Ok(None),
75                    Value::Boolean(v) => Ok(Some(*v)),
76                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
77                        err_msg: format!("Invalid list item type, find {v:?}, expected bool",),
78                    }))),
79                })
80                .collect::<PgWireResult<Vec<Option<bool>>>>()?;
81            builder.encode_field(&array)
82        }
83        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => {
84            let array = value_list
85                .items()
86                .iter()
87                .map(|v| match v {
88                    Value::Null => Ok(None),
89                    Value::Int8(v) => Ok(Some(*v)),
90                    Value::UInt8(v) => Ok(Some(*v as i8)),
91                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
92                        err_msg: format!(
93                            "Invalid list item type, find {v:?}, expected int8 or uint8",
94                        ),
95                    }))),
96                })
97                .collect::<PgWireResult<Vec<Option<i8>>>>()?;
98            builder.encode_field(&array)
99        }
100        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => {
101            let array = value_list
102                .items()
103                .iter()
104                .map(|v| match v {
105                    Value::Null => Ok(None),
106                    Value::Int16(v) => Ok(Some(*v)),
107                    Value::UInt16(v) => Ok(Some(*v as i16)),
108                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
109                        err_msg: format!(
110                            "Invalid list item type, find {v:?}, expected int16 or uint16",
111                        ),
112                    }))),
113                })
114                .collect::<PgWireResult<Vec<Option<i16>>>>()?;
115            builder.encode_field(&array)
116        }
117        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => {
118            let array = value_list
119                .items()
120                .iter()
121                .map(|v| match v {
122                    Value::Null => Ok(None),
123                    Value::Int32(v) => Ok(Some(*v)),
124                    Value::UInt32(v) => Ok(Some(*v as i32)),
125                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
126                        err_msg: format!(
127                            "Invalid list item type, find {v:?}, expected int32 or uint32",
128                        ),
129                    }))),
130                })
131                .collect::<PgWireResult<Vec<Option<i32>>>>()?;
132            builder.encode_field(&array)
133        }
134        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => {
135            let array = value_list
136                .items()
137                .iter()
138                .map(|v| match v {
139                    Value::Null => Ok(None),
140                    Value::Int64(v) => Ok(Some(*v)),
141                    Value::UInt64(v) => Ok(Some(*v as i64)),
142                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
143                        err_msg: format!(
144                            "Invalid list item type, find {v:?}, expected int64 or uint64",
145                        ),
146                    }))),
147                })
148                .collect::<PgWireResult<Vec<Option<i64>>>>()?;
149            builder.encode_field(&array)
150        }
151        &ConcreteDataType::Float32(_) => {
152            let array = value_list
153                .items()
154                .iter()
155                .map(|v| match v {
156                    Value::Null => Ok(None),
157                    Value::Float32(v) => Ok(Some(v.0)),
158                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
159                        err_msg: format!("Invalid list item type, find {v:?}, expected float32",),
160                    }))),
161                })
162                .collect::<PgWireResult<Vec<Option<f32>>>>()?;
163            builder.encode_field(&array)
164        }
165        &ConcreteDataType::Float64(_) => {
166            let array = value_list
167                .items()
168                .iter()
169                .map(|v| match v {
170                    Value::Null => Ok(None),
171                    Value::Float64(v) => Ok(Some(v.0)),
172                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
173                        err_msg: format!("Invalid list item type, find {v:?}, expected float64",),
174                    }))),
175                })
176                .collect::<PgWireResult<Vec<Option<f64>>>>()?;
177            builder.encode_field(&array)
178        }
179        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => {
180            let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
181
182            match *bytea_output {
183                PGByteaOutputValue::ESCAPE => {
184                    let array = value_list
185                        .items()
186                        .iter()
187                        .map(|v| match v {
188                            Value::Null => Ok(None),
189                            Value::Binary(v) => Ok(Some(EscapeOutputBytea(v.deref()))),
190
191                            _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
192                                err_msg: format!(
193                                    "Invalid list item type, find {v:?}, expected binary",
194                                ),
195                            }))),
196                        })
197                        .collect::<PgWireResult<Vec<Option<EscapeOutputBytea>>>>()?;
198                    builder.encode_field(&array)
199                }
200                PGByteaOutputValue::HEX => {
201                    let array = value_list
202                        .items()
203                        .iter()
204                        .map(|v| match v {
205                            Value::Null => Ok(None),
206                            Value::Binary(v) => Ok(Some(HexOutputBytea(v.deref()))),
207
208                            _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
209                                err_msg: format!(
210                                    "Invalid list item type, find {v:?}, expected binary",
211                                ),
212                            }))),
213                        })
214                        .collect::<PgWireResult<Vec<Option<HexOutputBytea>>>>()?;
215                    builder.encode_field(&array)
216                }
217            }
218        }
219        &ConcreteDataType::String(_) => {
220            let array = value_list
221                .items()
222                .iter()
223                .map(|v| match v {
224                    Value::Null => Ok(None),
225                    Value::String(v) => Ok(Some(v.as_utf8())),
226                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
227                        err_msg: format!("Invalid list item type, find {v:?}, expected string",),
228                    }))),
229                })
230                .collect::<PgWireResult<Vec<Option<&str>>>>()?;
231            builder.encode_field(&array)
232        }
233        &ConcreteDataType::Date(_) => {
234            let array = value_list
235                .items()
236                .iter()
237                .map(|v| match v {
238                    Value::Null => Ok(None),
239                    Value::Date(v) => {
240                        if let Some(date) = v.to_chrono_date() {
241                            let (style, order) =
242                                *query_ctx.configuration_parameter().pg_datetime_style();
243                            Ok(Some(StylingDate(date, style, order)))
244                        } else {
245                            Err(PgWireError::ApiError(Box::new(Error::Internal {
246                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
247                            })))
248                        }
249                    }
250                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
251                        err_msg: format!("Invalid list item type, find {v:?}, expected date",),
252                    }))),
253                })
254                .collect::<PgWireResult<Vec<Option<StylingDate>>>>()?;
255            builder.encode_field(&array)
256        }
257        &ConcreteDataType::Timestamp(_) => {
258            let array = value_list
259                .items()
260                .iter()
261                .map(|v| match v {
262                    Value::Null => Ok(None),
263                    Value::Timestamp(v) => {
264                        if let Some(datetime) =
265                            v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
266                        {
267                            let (style, order) =
268                                *query_ctx.configuration_parameter().pg_datetime_style();
269                            Ok(Some(StylingDateTime(datetime, style, order)))
270                        } else {
271                            Err(PgWireError::ApiError(Box::new(Error::Internal {
272                                err_msg: format!("Failed to convert date to postgres type {v:?}",),
273                            })))
274                        }
275                    }
276                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
277                        err_msg: format!("Invalid list item type, find {v:?}, expected timestamp",),
278                    }))),
279                })
280                .collect::<PgWireResult<Vec<Option<StylingDateTime>>>>()?;
281            builder.encode_field(&array)
282        }
283        &ConcreteDataType::Time(_) => {
284            let array = value_list
285                .items()
286                .iter()
287                .map(|v| match v {
288                    Value::Null => Ok(None),
289                    Value::Time(v) => Ok(v.to_chrono_time()),
290                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
291                        err_msg: format!("Invalid list item type, find {v:?}, expected time",),
292                    }))),
293                })
294                .collect::<PgWireResult<Vec<Option<NaiveTime>>>>()?;
295            builder.encode_field(&array)
296        }
297        &ConcreteDataType::Interval(_) => {
298            let array = value_list
299                .items()
300                .iter()
301                .map(|v| match v {
302                    Value::Null => Ok(None),
303                    Value::IntervalYearMonth(v) => Ok(Some(PgInterval::from(*v))),
304                    Value::IntervalDayTime(v) => Ok(Some(PgInterval::from(*v))),
305                    Value::IntervalMonthDayNano(v) => Ok(Some(PgInterval::from(*v))),
306                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
307                        err_msg: format!("Invalid list item type, find {v:?}, expected interval",),
308                    }))),
309                })
310                .collect::<PgWireResult<Vec<Option<PgInterval>>>>()?;
311            builder.encode_field(&array)
312        }
313        &ConcreteDataType::Decimal128(_) => {
314            let array = value_list
315                .items()
316                .iter()
317                .map(|v| match v {
318                    Value::Null => Ok(None),
319                    Value::Decimal128(v) => Ok(Some(v.to_string())),
320                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
321                        err_msg: format!("Invalid list item type, find {v:?}, expected decimal",),
322                    }))),
323                })
324                .collect::<PgWireResult<Vec<Option<String>>>>()?;
325            builder.encode_field(&array)
326        }
327        &ConcreteDataType::Json(j) => {
328            let array = value_list
329                .items()
330                .iter()
331                .map(|v| match v {
332                    Value::Null => Ok(None),
333                    Value::Binary(v) => {
334                        let s = json_type_value_to_string(v, &j.format)
335                            .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
336                        Ok(Some(s))
337                    }
338                    _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
339                        err_msg: format!("Invalid list item type, find {v:?}, expected json",),
340                    }))),
341                })
342                .collect::<PgWireResult<Vec<Option<String>>>>()?;
343            builder.encode_field(&array)
344        }
345        _ => Err(PgWireError::ApiError(Box::new(Error::Internal {
346            err_msg: format!(
347                "cannot write array type {:?} in postgres protocol: unimplemented",
348                value_list.datatype()
349            ),
350        }))),
351    }
352}
353
354pub(super) fn encode_value(
355    query_ctx: &QueryContextRef,
356    value: &Value,
357    builder: &mut DataRowEncoder,
358    datatype: &ConcreteDataType,
359) -> PgWireResult<()> {
360    match value {
361        Value::Null => builder.encode_field(&None::<&i8>),
362        Value::Boolean(v) => builder.encode_field(v),
363        Value::UInt8(v) => builder.encode_field(&(*v as i8)),
364        Value::UInt16(v) => builder.encode_field(&(*v as i16)),
365        Value::UInt32(v) => builder.encode_field(v),
366        Value::UInt64(v) => builder.encode_field(&(*v as i64)),
367        Value::Int8(v) => builder.encode_field(v),
368        Value::Int16(v) => builder.encode_field(v),
369        Value::Int32(v) => builder.encode_field(v),
370        Value::Int64(v) => builder.encode_field(v),
371        Value::Float32(v) => builder.encode_field(&v.0),
372        Value::Float64(v) => builder.encode_field(&v.0),
373        Value::String(v) => builder.encode_field(&v.as_utf8()),
374        Value::Binary(v) => match datatype {
375            ConcreteDataType::Json(j) => {
376                let s = json_type_value_to_string(v, &j.format)
377                    .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
378                builder.encode_field(&s)
379            }
380            _ => {
381                let bytea_output = query_ctx.configuration_parameter().postgres_bytea_output();
382                match *bytea_output {
383                    PGByteaOutputValue::ESCAPE => {
384                        builder.encode_field(&EscapeOutputBytea(v.deref()))
385                    }
386                    PGByteaOutputValue::HEX => builder.encode_field(&HexOutputBytea(v.deref())),
387                }
388            }
389        },
390        Value::Date(v) => {
391            if let Some(date) = v.to_chrono_date() {
392                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
393                builder.encode_field(&StylingDate(date, style, order))
394            } else {
395                Err(PgWireError::ApiError(Box::new(Error::Internal {
396                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
397                })))
398            }
399        }
400        Value::Timestamp(v) => {
401            if let Some(datetime) = v.to_chrono_datetime_with_timezone(Some(&query_ctx.timezone()))
402            {
403                let (style, order) = *query_ctx.configuration_parameter().pg_datetime_style();
404                builder.encode_field(&StylingDateTime(datetime, style, order))
405            } else {
406                Err(PgWireError::ApiError(Box::new(Error::Internal {
407                    err_msg: format!("Failed to convert date to postgres type {v:?}",),
408                })))
409            }
410        }
411        Value::Time(v) => {
412            if let Some(time) = v.to_chrono_time() {
413                builder.encode_field(&time)
414            } else {
415                Err(PgWireError::ApiError(Box::new(Error::Internal {
416                    err_msg: format!("Failed to convert time to postgres type {v:?}",),
417                })))
418            }
419        }
420        Value::IntervalYearMonth(v) => builder.encode_field(&PgInterval::from(*v)),
421        Value::IntervalDayTime(v) => builder.encode_field(&PgInterval::from(*v)),
422        Value::IntervalMonthDayNano(v) => builder.encode_field(&PgInterval::from(*v)),
423        Value::Decimal128(v) => builder.encode_field(&v.to_string()),
424        Value::Duration(d) => match PgInterval::try_from(*d) {
425            Ok(i) => builder.encode_field(&i),
426            Err(e) => Err(PgWireError::ApiError(Box::new(Error::Internal {
427                err_msg: e.to_string(),
428            }))),
429        },
430        Value::List(values) => encode_array(query_ctx, values, builder),
431    }
432}
433
434pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
435    match origin {
436        &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
437        &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
438        &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR),
439        &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2),
440        &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4),
441        &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8),
442        &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
443        &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
444        &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
445        &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
446        &ConcreteDataType::Date(_) => Ok(Type::DATE),
447        &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
448        &ConcreteDataType::Time(_) => Ok(Type::TIME),
449        &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
450        &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
451        &ConcreteDataType::Json(_) => Ok(Type::JSON),
452        ConcreteDataType::List(list) => match list.item_type() {
453            &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
454            &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
455            &ConcreteDataType::Int8(_) | &ConcreteDataType::UInt8(_) => Ok(Type::CHAR_ARRAY),
456            &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT2_ARRAY),
457            &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT4_ARRAY),
458            &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt64(_) => Ok(Type::INT8_ARRAY),
459            &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
460            &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
461            &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
462            &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
463            &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
464            &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
465            &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
466            &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
467            &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
468            &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
469            &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
470            &ConcreteDataType::Dictionary(_)
471            | &ConcreteDataType::Vector(_)
472            | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
473                data_type: origin,
474                reason: "not implemented",
475            }
476            .fail(),
477        },
478        &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
479            data_type: origin,
480            reason: "not implemented",
481        }
482        .fail(),
483        &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
484    }
485}
486
487#[allow(dead_code)]
488pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
489    // Note that we only support a small amount of pg data types
490    match origin {
491        &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
492        &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
493        &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
494        &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
495        &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
496        &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
497        &Type::TIMESTAMP => Ok(ConcreteDataType::timestamp_datatype(
498            common_time::timestamp::TimeUnit::Millisecond,
499        )),
500        &Type::DATE => Ok(ConcreteDataType::date_datatype()),
501        &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
502            common_time::timestamp::TimeUnit::Microsecond,
503        )),
504        &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
505            ConcreteDataType::int8_datatype(),
506        )),
507        &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(
508            ConcreteDataType::int16_datatype(),
509        )),
510        &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(
511            ConcreteDataType::int32_datatype(),
512        )),
513        &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(
514            ConcreteDataType::int64_datatype(),
515        )),
516        &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(
517            ConcreteDataType::string_datatype(),
518        )),
519        _ => server_error::InternalSnafu {
520            err_msg: format!("unimplemented datatype {origin:?}"),
521        }
522        .fail(),
523    }
524}
525
526pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
527    // the index is managed from portal's parameters count so it's safe to
528    // unwrap here.
529    let param_type = portal.statement.parameter_types.get(idx).unwrap();
530    match param_type {
531        &Type::VARCHAR | &Type::TEXT => Ok(format!(
532            "'{}'",
533            portal
534                .parameter::<String>(idx, param_type)?
535                .as_deref()
536                .unwrap_or("")
537        )),
538        &Type::BOOL => Ok(portal
539            .parameter::<bool>(idx, param_type)?
540            .map(|v| v.to_string())
541            .unwrap_or_else(|| "".to_owned())),
542        &Type::INT4 => Ok(portal
543            .parameter::<i32>(idx, param_type)?
544            .map(|v| v.to_string())
545            .unwrap_or_else(|| "".to_owned())),
546        &Type::INT8 => Ok(portal
547            .parameter::<i64>(idx, param_type)?
548            .map(|v| v.to_string())
549            .unwrap_or_else(|| "".to_owned())),
550        &Type::FLOAT4 => Ok(portal
551            .parameter::<f32>(idx, param_type)?
552            .map(|v| v.to_string())
553            .unwrap_or_else(|| "".to_owned())),
554        &Type::FLOAT8 => Ok(portal
555            .parameter::<f64>(idx, param_type)?
556            .map(|v| v.to_string())
557            .unwrap_or_else(|| "".to_owned())),
558        &Type::DATE => Ok(portal
559            .parameter::<NaiveDate>(idx, param_type)?
560            .map(|v| v.format("%Y-%m-%d").to_string())
561            .unwrap_or_else(|| "".to_owned())),
562        &Type::TIMESTAMP => Ok(portal
563            .parameter::<NaiveDateTime>(idx, param_type)?
564            .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
565            .unwrap_or_else(|| "".to_owned())),
566        &Type::INTERVAL => Ok(portal
567            .parameter::<PgInterval>(idx, param_type)?
568            .map(|v| v.to_string())
569            .unwrap_or_else(|| "".to_owned())),
570        _ => Err(invalid_parameter_error(
571            "unsupported_parameter_type",
572            Some(param_type.to_string()),
573        )),
574    }
575}
576
577pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
578    let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
579    error_info.detail = detail;
580    PgWireError::UserError(Box::new(error_info))
581}
582
583fn to_timestamp_scalar_value<T>(
584    data: Option<T>,
585    unit: &TimestampType,
586    ctype: &ConcreteDataType,
587) -> PgWireResult<ScalarValue>
588where
589    T: Into<i64>,
590{
591    if let Some(n) = data {
592        Value::Timestamp(unit.create_timestamp(n.into()))
593            .try_to_scalar_value(ctype)
594            .map_err(|e| PgWireError::ApiError(Box::new(e)))
595    } else {
596        Ok(ScalarValue::Null)
597    }
598}
599
600pub(super) fn parameters_to_scalar_values(
601    plan: &LogicalPlan,
602    portal: &Portal<SqlPlan>,
603) -> PgWireResult<Vec<ScalarValue>> {
604    let param_count = portal.parameter_len();
605    let mut results = Vec::with_capacity(param_count);
606
607    let client_param_types = &portal.statement.parameter_types;
608    let param_types = plan
609        .get_parameter_types()
610        .map_err(|e| PgWireError::ApiError(Box::new(e)))?
611        .into_iter()
612        .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
613        .collect::<HashMap<_, _>>();
614
615    for idx in 0..param_count {
616        let server_type = param_types
617            .get(&format!("${}", idx + 1))
618            .and_then(|t| t.as_ref());
619
620        let client_type = if let Some(client_given_type) = client_param_types.get(idx) {
621            client_given_type.clone()
622        } else if let Some(server_provided_type) = &server_type {
623            type_gt_to_pg(server_provided_type).map_err(|e| PgWireError::ApiError(Box::new(e)))?
624        } else {
625            return Err(invalid_parameter_error(
626                "unknown_parameter_type",
627                Some(format!(
628                    "Cannot get parameter type information for parameter {}",
629                    idx
630                )),
631            ));
632        };
633
634        let value = match &client_type {
635            &Type::VARCHAR | &Type::TEXT => {
636                let data = portal.parameter::<String>(idx, &client_type)?;
637                if let Some(server_type) = &server_type {
638                    match server_type {
639                        ConcreteDataType::String(_) => ScalarValue::Utf8(data),
640                        _ => {
641                            return Err(invalid_parameter_error(
642                                "invalid_parameter_type",
643                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
644                            ))
645                        }
646                    }
647                } else {
648                    ScalarValue::Utf8(data)
649                }
650            }
651            &Type::BOOL => {
652                let data = portal.parameter::<bool>(idx, &client_type)?;
653                if let Some(server_type) = &server_type {
654                    match server_type {
655                        ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
656                        _ => {
657                            return Err(invalid_parameter_error(
658                                "invalid_parameter_type",
659                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
660                            ))
661                        }
662                    }
663                } else {
664                    ScalarValue::Boolean(data)
665                }
666            }
667            &Type::INT2 => {
668                let data = portal.parameter::<i16>(idx, &client_type)?;
669                if let Some(server_type) = &server_type {
670                    match server_type {
671                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
672                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
673                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
674                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
675                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
676                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
677                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
678                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
679                        ConcreteDataType::Timestamp(unit) => {
680                            to_timestamp_scalar_value(data, unit, server_type)?
681                        }
682                        _ => {
683                            return Err(invalid_parameter_error(
684                                "invalid_parameter_type",
685                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
686                            ))
687                        }
688                    }
689                } else {
690                    ScalarValue::Int16(data)
691                }
692            }
693            &Type::INT4 => {
694                let data = portal.parameter::<i32>(idx, &client_type)?;
695                if let Some(server_type) = &server_type {
696                    match server_type {
697                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
698                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
699                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
700                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
701                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
702                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
703                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
704                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
705                        ConcreteDataType::Timestamp(unit) => {
706                            to_timestamp_scalar_value(data, unit, server_type)?
707                        }
708                        _ => {
709                            return Err(invalid_parameter_error(
710                                "invalid_parameter_type",
711                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
712                            ))
713                        }
714                    }
715                } else {
716                    ScalarValue::Int32(data)
717                }
718            }
719            &Type::INT8 => {
720                let data = portal.parameter::<i64>(idx, &client_type)?;
721                if let Some(server_type) = &server_type {
722                    match server_type {
723                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
724                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
725                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
726                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
727                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
728                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
729                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
730                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
731                        ConcreteDataType::Timestamp(unit) => {
732                            to_timestamp_scalar_value(data, unit, server_type)?
733                        }
734                        _ => {
735                            return Err(invalid_parameter_error(
736                                "invalid_parameter_type",
737                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
738                            ))
739                        }
740                    }
741                } else {
742                    ScalarValue::Int64(data)
743                }
744            }
745            &Type::FLOAT4 => {
746                let data = portal.parameter::<f32>(idx, &client_type)?;
747                if let Some(server_type) = &server_type {
748                    match server_type {
749                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
750                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
751                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
752                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
753                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
754                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
755                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
756                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
757                        ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
758                        ConcreteDataType::Float64(_) => {
759                            ScalarValue::Float64(data.map(|n| n as f64))
760                        }
761                        _ => {
762                            return Err(invalid_parameter_error(
763                                "invalid_parameter_type",
764                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
765                            ))
766                        }
767                    }
768                } else {
769                    ScalarValue::Float32(data)
770                }
771            }
772            &Type::FLOAT8 => {
773                let data = portal.parameter::<f64>(idx, &client_type)?;
774                if let Some(server_type) = &server_type {
775                    match server_type {
776                        ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
777                        ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
778                        ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
779                        ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
780                        ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
781                        ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
782                        ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
783                        ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
784                        ConcreteDataType::Float32(_) => {
785                            ScalarValue::Float32(data.map(|n| n as f32))
786                        }
787                        ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
788                        _ => {
789                            return Err(invalid_parameter_error(
790                                "invalid_parameter_type",
791                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
792                            ))
793                        }
794                    }
795                } else {
796                    ScalarValue::Float64(data)
797                }
798            }
799            &Type::TIMESTAMP => {
800                let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
801                if let Some(server_type) = &server_type {
802                    match server_type {
803                        ConcreteDataType::Timestamp(unit) => match *unit {
804                            TimestampType::Second(_) => ScalarValue::TimestampSecond(
805                                data.map(|ts| ts.and_utc().timestamp()),
806                                None,
807                            ),
808                            TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
809                                data.map(|ts| ts.and_utc().timestamp_millis()),
810                                None,
811                            ),
812                            TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
813                                data.map(|ts| ts.and_utc().timestamp_micros()),
814                                None,
815                            ),
816                            TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
817                                data.map(|ts| ts.and_utc().timestamp_micros()),
818                                None,
819                            ),
820                        },
821                        _ => {
822                            return Err(invalid_parameter_error(
823                                "invalid_parameter_type",
824                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
825                            ))
826                        }
827                    }
828                } else {
829                    ScalarValue::TimestampMillisecond(
830                        data.map(|ts| ts.and_utc().timestamp_millis()),
831                        None,
832                    )
833                }
834            }
835            &Type::DATE => {
836                let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
837                if let Some(server_type) = &server_type {
838                    match server_type {
839                        ConcreteDataType::Date(_) => ScalarValue::Date32(data.map(|d| {
840                            (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
841                        })),
842                        _ => {
843                            return Err(invalid_parameter_error(
844                                "invalid_parameter_type",
845                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
846                            ));
847                        }
848                    }
849                } else {
850                    ScalarValue::Date32(data.map(|d| {
851                        (d - NaiveDate::from(NaiveDateTime::UNIX_EPOCH)).num_days() as i32
852                    }))
853                }
854            }
855            &Type::INTERVAL => {
856                let data = portal.parameter::<PgInterval>(idx, &client_type)?;
857                if let Some(server_type) = &server_type {
858                    match server_type {
859                        ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
860                            ScalarValue::IntervalYearMonth(
861                                data.map(|i| {
862                                    if i.days != 0 || i.microseconds != 0 {
863                                        Err(invalid_parameter_error(
864                                            "invalid_parameter_type",
865                                            Some(format!(
866                                                "Expected: {}, found: {}",
867                                                server_type, client_type
868                                            )),
869                                        ))
870                                    } else {
871                                        Ok(IntervalYearMonth::new(i.months).to_i32())
872                                    }
873                                })
874                                .transpose()?,
875                            )
876                        }
877                        ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
878                            ScalarValue::IntervalDayTime(
879                                data.map(|i| {
880                                    if i.months != 0 || i.microseconds % 1000 != 0 {
881                                        Err(invalid_parameter_error(
882                                            "invalid_parameter_type",
883                                            Some(format!(
884                                                "Expected: {}, found: {}",
885                                                server_type, client_type
886                                            )),
887                                        ))
888                                    } else {
889                                        Ok(IntervalDayTime::new(
890                                            i.days,
891                                            (i.microseconds / 1000) as i32,
892                                        )
893                                        .into())
894                                    }
895                                })
896                                .transpose()?,
897                            )
898                        }
899                        ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
900                            ScalarValue::IntervalMonthDayNano(
901                                data.map(|i| IntervalMonthDayNano::from(i).into()),
902                            )
903                        }
904                        _ => {
905                            return Err(invalid_parameter_error(
906                                "invalid_parameter_type",
907                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
908                            ));
909                        }
910                    }
911                } else {
912                    ScalarValue::IntervalMonthDayNano(
913                        data.map(|i| IntervalMonthDayNano::from(i).into()),
914                    )
915                }
916            }
917            &Type::BYTEA => {
918                let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
919                if let Some(server_type) = &server_type {
920                    match server_type {
921                        ConcreteDataType::String(_) => {
922                            ScalarValue::Utf8(data.map(|d| String::from_utf8_lossy(&d).to_string()))
923                        }
924                        ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
925                        _ => {
926                            return Err(invalid_parameter_error(
927                                "invalid_parameter_type",
928                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
929                            ));
930                        }
931                    }
932                } else {
933                    ScalarValue::Binary(data)
934                }
935            }
936            &Type::JSONB => {
937                let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
938                if let Some(server_type) = &server_type {
939                    match server_type {
940                        ConcreteDataType::Binary(_) => {
941                            ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
942                        }
943                        _ => {
944                            return Err(invalid_parameter_error(
945                                "invalid_parameter_type",
946                                Some(format!("Expected: {}, found: {}", server_type, client_type)),
947                            ));
948                        }
949                    }
950                } else {
951                    ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
952                }
953            }
954            &Type::INT2_ARRAY => {
955                let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
956                if let Some(data) = data {
957                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
958                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
959                } else {
960                    ScalarValue::Null
961                }
962            }
963            &Type::INT4_ARRAY => {
964                let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
965                if let Some(data) = data {
966                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
967                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
968                } else {
969                    ScalarValue::Null
970                }
971            }
972            &Type::INT8_ARRAY => {
973                let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
974                if let Some(data) = data {
975                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
976                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
977                } else {
978                    ScalarValue::Null
979                }
980            }
981            &Type::VARCHAR_ARRAY => {
982                let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
983                if let Some(data) = data {
984                    let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
985                    ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
986                } else {
987                    ScalarValue::Null
988                }
989            }
990            _ => Err(invalid_parameter_error(
991                "unsupported_parameter_value",
992                Some(format!("Found type: {}", client_type)),
993            ))?,
994        };
995
996        results.push(value);
997    }
998
999    Ok(results)
1000}
1001
1002pub(super) fn param_types_to_pg_types(
1003    param_types: &HashMap<String, Option<ConcreteDataType>>,
1004) -> Result<Vec<Type>> {
1005    let param_count = param_types.len();
1006    let mut types = Vec::with_capacity(param_count);
1007    for i in 0..param_count {
1008        if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1009            let pg_type = type_gt_to_pg(param_type)?;
1010            types.push(pg_type);
1011        } else {
1012            types.push(Type::UNKNOWN);
1013        }
1014    }
1015    Ok(types)
1016}
1017
1018#[cfg(test)]
1019mod test {
1020    use std::sync::Arc;
1021
1022    use common_time::interval::IntervalUnit;
1023    use common_time::timestamp::TimeUnit;
1024    use common_time::Timestamp;
1025    use datatypes::schema::{ColumnSchema, Schema};
1026    use datatypes::value::ListValue;
1027    use pgwire::api::results::{FieldFormat, FieldInfo};
1028    use pgwire::api::Type;
1029    use session::context::QueryContextBuilder;
1030
1031    use super::*;
1032
1033    #[test]
1034    fn test_schema_convert() {
1035        let column_schemas = vec![
1036            ColumnSchema::new("nulls", ConcreteDataType::null_datatype(), true),
1037            ColumnSchema::new("bools", ConcreteDataType::boolean_datatype(), true),
1038            ColumnSchema::new("int8s", ConcreteDataType::int8_datatype(), true),
1039            ColumnSchema::new("int16s", ConcreteDataType::int16_datatype(), true),
1040            ColumnSchema::new("int32s", ConcreteDataType::int32_datatype(), true),
1041            ColumnSchema::new("int64s", ConcreteDataType::int64_datatype(), true),
1042            ColumnSchema::new("uint8s", ConcreteDataType::uint8_datatype(), true),
1043            ColumnSchema::new("uint16s", ConcreteDataType::uint16_datatype(), true),
1044            ColumnSchema::new("uint32s", ConcreteDataType::uint32_datatype(), true),
1045            ColumnSchema::new("uint64s", ConcreteDataType::uint64_datatype(), true),
1046            ColumnSchema::new("float32s", ConcreteDataType::float32_datatype(), true),
1047            ColumnSchema::new("float64s", ConcreteDataType::float64_datatype(), true),
1048            ColumnSchema::new("binaries", ConcreteDataType::binary_datatype(), true),
1049            ColumnSchema::new("strings", ConcreteDataType::string_datatype(), true),
1050            ColumnSchema::new(
1051                "timestamps",
1052                ConcreteDataType::timestamp_millisecond_datatype(),
1053                true,
1054            ),
1055            ColumnSchema::new("dates", ConcreteDataType::date_datatype(), true),
1056            ColumnSchema::new("times", ConcreteDataType::time_second_datatype(), true),
1057            ColumnSchema::new(
1058                "intervals",
1059                ConcreteDataType::interval_month_day_nano_datatype(),
1060                true,
1061            ),
1062        ];
1063        let pg_field_info = vec![
1064            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1065            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1066            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1067            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1068            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1069            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1070            FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1071            FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1072            FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1073            FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1074            FieldInfo::new(
1075                "float32s".into(),
1076                None,
1077                None,
1078                Type::FLOAT4,
1079                FieldFormat::Text,
1080            ),
1081            FieldInfo::new(
1082                "float64s".into(),
1083                None,
1084                None,
1085                Type::FLOAT8,
1086                FieldFormat::Text,
1087            ),
1088            FieldInfo::new(
1089                "binaries".into(),
1090                None,
1091                None,
1092                Type::BYTEA,
1093                FieldFormat::Text,
1094            ),
1095            FieldInfo::new(
1096                "strings".into(),
1097                None,
1098                None,
1099                Type::VARCHAR,
1100                FieldFormat::Text,
1101            ),
1102            FieldInfo::new(
1103                "timestamps".into(),
1104                None,
1105                None,
1106                Type::TIMESTAMP,
1107                FieldFormat::Text,
1108            ),
1109            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1110            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1111            FieldInfo::new(
1112                "intervals".into(),
1113                None,
1114                None,
1115                Type::INTERVAL,
1116                FieldFormat::Text,
1117            ),
1118        ];
1119        let schema = Schema::new(column_schemas);
1120        let fs = schema_to_pg(&schema, &Format::UnifiedText).unwrap();
1121        assert_eq!(fs, pg_field_info);
1122    }
1123
1124    #[test]
1125    fn test_encode_text_format_data() {
1126        let schema = vec![
1127            FieldInfo::new("nulls".into(), None, None, Type::UNKNOWN, FieldFormat::Text),
1128            FieldInfo::new("bools".into(), None, None, Type::BOOL, FieldFormat::Text),
1129            FieldInfo::new("uint8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1130            FieldInfo::new("uint16s".into(), None, None, Type::INT2, FieldFormat::Text),
1131            FieldInfo::new("uint32s".into(), None, None, Type::INT4, FieldFormat::Text),
1132            FieldInfo::new("uint64s".into(), None, None, Type::INT8, FieldFormat::Text),
1133            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1134            FieldInfo::new("int8s".into(), None, None, Type::CHAR, FieldFormat::Text),
1135            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1136            FieldInfo::new("int16s".into(), None, None, Type::INT2, FieldFormat::Text),
1137            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1138            FieldInfo::new("int32s".into(), None, None, Type::INT4, FieldFormat::Text),
1139            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1140            FieldInfo::new("int64s".into(), None, None, Type::INT8, FieldFormat::Text),
1141            FieldInfo::new(
1142                "float32s".into(),
1143                None,
1144                None,
1145                Type::FLOAT4,
1146                FieldFormat::Text,
1147            ),
1148            FieldInfo::new(
1149                "float32s".into(),
1150                None,
1151                None,
1152                Type::FLOAT4,
1153                FieldFormat::Text,
1154            ),
1155            FieldInfo::new(
1156                "float32s".into(),
1157                None,
1158                None,
1159                Type::FLOAT4,
1160                FieldFormat::Text,
1161            ),
1162            FieldInfo::new(
1163                "float64s".into(),
1164                None,
1165                None,
1166                Type::FLOAT8,
1167                FieldFormat::Text,
1168            ),
1169            FieldInfo::new(
1170                "float64s".into(),
1171                None,
1172                None,
1173                Type::FLOAT8,
1174                FieldFormat::Text,
1175            ),
1176            FieldInfo::new(
1177                "float64s".into(),
1178                None,
1179                None,
1180                Type::FLOAT8,
1181                FieldFormat::Text,
1182            ),
1183            FieldInfo::new(
1184                "strings".into(),
1185                None,
1186                None,
1187                Type::VARCHAR,
1188                FieldFormat::Text,
1189            ),
1190            FieldInfo::new(
1191                "binaries".into(),
1192                None,
1193                None,
1194                Type::BYTEA,
1195                FieldFormat::Text,
1196            ),
1197            FieldInfo::new("dates".into(), None, None, Type::DATE, FieldFormat::Text),
1198            FieldInfo::new("times".into(), None, None, Type::TIME, FieldFormat::Text),
1199            FieldInfo::new(
1200                "timestamps".into(),
1201                None,
1202                None,
1203                Type::TIMESTAMP,
1204                FieldFormat::Text,
1205            ),
1206            FieldInfo::new(
1207                "interval_year_month".into(),
1208                None,
1209                None,
1210                Type::INTERVAL,
1211                FieldFormat::Text,
1212            ),
1213            FieldInfo::new(
1214                "interval_day_time".into(),
1215                None,
1216                None,
1217                Type::INTERVAL,
1218                FieldFormat::Text,
1219            ),
1220            FieldInfo::new(
1221                "interval_month_day_nano".into(),
1222                None,
1223                None,
1224                Type::INTERVAL,
1225                FieldFormat::Text,
1226            ),
1227            FieldInfo::new(
1228                "int_list".into(),
1229                None,
1230                None,
1231                Type::INT8_ARRAY,
1232                FieldFormat::Text,
1233            ),
1234            FieldInfo::new(
1235                "float_list".into(),
1236                None,
1237                None,
1238                Type::FLOAT8_ARRAY,
1239                FieldFormat::Text,
1240            ),
1241            FieldInfo::new(
1242                "string_list".into(),
1243                None,
1244                None,
1245                Type::VARCHAR_ARRAY,
1246                FieldFormat::Text,
1247            ),
1248            FieldInfo::new(
1249                "timestamp_list".into(),
1250                None,
1251                None,
1252                Type::TIMESTAMP_ARRAY,
1253                FieldFormat::Text,
1254            ),
1255        ];
1256
1257        let datatypes = vec![
1258            ConcreteDataType::null_datatype(),
1259            ConcreteDataType::boolean_datatype(),
1260            ConcreteDataType::uint8_datatype(),
1261            ConcreteDataType::uint16_datatype(),
1262            ConcreteDataType::uint32_datatype(),
1263            ConcreteDataType::uint64_datatype(),
1264            ConcreteDataType::int8_datatype(),
1265            ConcreteDataType::int8_datatype(),
1266            ConcreteDataType::int16_datatype(),
1267            ConcreteDataType::int16_datatype(),
1268            ConcreteDataType::int32_datatype(),
1269            ConcreteDataType::int32_datatype(),
1270            ConcreteDataType::int64_datatype(),
1271            ConcreteDataType::int64_datatype(),
1272            ConcreteDataType::float32_datatype(),
1273            ConcreteDataType::float32_datatype(),
1274            ConcreteDataType::float32_datatype(),
1275            ConcreteDataType::float64_datatype(),
1276            ConcreteDataType::float64_datatype(),
1277            ConcreteDataType::float64_datatype(),
1278            ConcreteDataType::string_datatype(),
1279            ConcreteDataType::binary_datatype(),
1280            ConcreteDataType::date_datatype(),
1281            ConcreteDataType::time_datatype(TimeUnit::Second),
1282            ConcreteDataType::timestamp_datatype(TimeUnit::Second),
1283            ConcreteDataType::interval_datatype(IntervalUnit::YearMonth),
1284            ConcreteDataType::interval_datatype(IntervalUnit::DayTime),
1285            ConcreteDataType::interval_datatype(IntervalUnit::MonthDayNano),
1286            ConcreteDataType::list_datatype(ConcreteDataType::int64_datatype()),
1287            ConcreteDataType::list_datatype(ConcreteDataType::float64_datatype()),
1288            ConcreteDataType::list_datatype(ConcreteDataType::string_datatype()),
1289            ConcreteDataType::list_datatype(ConcreteDataType::timestamp_second_datatype()),
1290        ];
1291        let values = vec![
1292            Value::Null,
1293            Value::Boolean(true),
1294            Value::UInt8(u8::MAX),
1295            Value::UInt16(u16::MAX),
1296            Value::UInt32(u32::MAX),
1297            Value::UInt64(u64::MAX),
1298            Value::Int8(i8::MAX),
1299            Value::Int8(i8::MIN),
1300            Value::Int16(i16::MAX),
1301            Value::Int16(i16::MIN),
1302            Value::Int32(i32::MAX),
1303            Value::Int32(i32::MIN),
1304            Value::Int64(i64::MAX),
1305            Value::Int64(i64::MIN),
1306            Value::Float32(f32::MAX.into()),
1307            Value::Float32(f32::MIN.into()),
1308            Value::Float32(0f32.into()),
1309            Value::Float64(f64::MAX.into()),
1310            Value::Float64(f64::MIN.into()),
1311            Value::Float64(0f64.into()),
1312            Value::String("greptime".into()),
1313            Value::Binary("greptime".as_bytes().into()),
1314            Value::Date(1001i32.into()),
1315            Value::Time(1001i64.into()),
1316            Value::Timestamp(1000001i64.into()),
1317            Value::IntervalYearMonth(IntervalYearMonth::new(1)),
1318            Value::IntervalDayTime(IntervalDayTime::new(1, 10)),
1319            Value::IntervalMonthDayNano(IntervalMonthDayNano::new(1, 1, 10)),
1320            Value::List(ListValue::new(
1321                vec![Value::Int64(1i64)],
1322                ConcreteDataType::int64_datatype(),
1323            )),
1324            Value::List(ListValue::new(
1325                vec![Value::Float64(1.0f64.into())],
1326                ConcreteDataType::float64_datatype(),
1327            )),
1328            Value::List(ListValue::new(
1329                vec![Value::String("tom".into())],
1330                ConcreteDataType::string_datatype(),
1331            )),
1332            Value::List(ListValue::new(
1333                vec![Value::Timestamp(Timestamp::new(1i64, TimeUnit::Second))],
1334                ConcreteDataType::timestamp_second_datatype(),
1335            )),
1336        ];
1337        let query_context = QueryContextBuilder::default()
1338            .configuration_parameter(Default::default())
1339            .build()
1340            .into();
1341        let mut builder = DataRowEncoder::new(Arc::new(schema));
1342        for (value, datatype) in values.iter().zip(datatypes) {
1343            encode_value(&query_context, value, &mut builder, &datatype).unwrap();
1344        }
1345    }
1346
1347    #[test]
1348    fn test_invalid_parameter() {
1349        // test for refactor with PgErrorCode
1350        let msg = "invalid_parameter_count";
1351        let error = invalid_parameter_error(msg, None);
1352        if let PgWireError::UserError(value) = error {
1353            assert_eq!("ERROR", value.severity);
1354            assert_eq!("22023", value.code);
1355            assert_eq!(msg, value.message);
1356        } else {
1357            panic!("test_invalid_parameter failed");
1358        }
1359    }
1360}