1mod error;
16
17use std::collections::HashMap;
18use std::sync::Arc;
19
20use arrow::array::{Array, AsArray};
21use arrow_pg::encoder::encode_value;
22use arrow_pg::list_encoder::encode_list;
23use arrow_schema::{DataType, TimeUnit};
24use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
25use common_recordbatch::RecordBatch;
26use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
27use datafusion_common::ScalarValue;
28use datafusion_expr::LogicalPlan;
29use datatypes::arrow::datatypes::DataType as ArrowDataType;
30use datatypes::json::JsonStructureSettings;
31use datatypes::prelude::{ConcreteDataType, Value};
32use datatypes::schema::{Schema, SchemaRef};
33use datatypes::types::{IntervalType, TimestampType, jsonb_to_string};
34use datatypes::value::StructValue;
35use pg_interval::Interval as PgInterval;
36use pgwire::api::Type;
37use pgwire::api::portal::{Format, Portal};
38use pgwire::api::results::{DataRowEncoder, FieldInfo};
39use pgwire::error::{PgWireError, PgWireResult};
40use pgwire::messages::data::DataRow;
41use pgwire::types::format::FormatOptions as PgFormatOptions;
42use session::context::QueryContextRef;
43use snafu::ResultExt;
44
45pub use self::error::{PgErrorCode, PgErrorSeverity};
46use crate::SqlPlan;
47use crate::error::{self as server_error, DataFusionSnafu, Result};
48use crate::postgres::utils::convert_err;
49
50pub(super) fn schema_to_pg(
51 origin: &Schema,
52 field_formats: &Format,
53 format_options: Option<Arc<PgFormatOptions>>,
54) -> Result<Vec<FieldInfo>> {
55 origin
56 .column_schemas()
57 .iter()
58 .enumerate()
59 .map(|(idx, col)| {
60 let mut field_info = FieldInfo::new(
61 col.name.clone(),
62 None,
63 None,
64 type_gt_to_pg(&col.data_type)?,
65 field_formats.format_for(idx),
66 );
67 if let Some(format_options) = &format_options {
68 field_info = field_info.with_format_options(format_options.clone());
69 }
70 Ok(field_info)
71 })
72 .collect::<Result<Vec<FieldInfo>>>()
73}
74
75fn encode_struct(
85 _query_ctx: &QueryContextRef,
86 struct_value: StructValue,
87 builder: &mut DataRowEncoder,
88) -> PgWireResult<()> {
89 let encoding_setting = JsonStructureSettings::Structured(None);
90 let json_value = encoding_setting
91 .decode(Value::Struct(struct_value))
92 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
93
94 builder.encode_field(&json_value)
95}
96
/// Iterator that encodes the rows of one record batch into PostgreSQL
/// `DataRow` messages, one row per call to `next()`.
pub(crate) struct RecordBatchRowIterator {
    /// Query context handed to the struct encoder.
    query_ctx: QueryContextRef,
    /// PostgreSQL field descriptions, indexed by column position.
    pg_schema: Arc<Vec<FieldInfo>>,
    /// GreptimeDB schema of the batch; consulted to tell JSON columns apart
    /// from plain binary ones.
    schema: SchemaRef,
    /// The Arrow record batch whose rows are being encoded.
    record_batch: arrow::record_batch::RecordBatch,
    /// Index of the next row to encode.
    i: usize,
}
104
105impl Iterator for RecordBatchRowIterator {
106 type Item = PgWireResult<DataRow>;
107
108 fn next(&mut self) -> Option<Self::Item> {
109 let mut encoder = DataRowEncoder::new(self.pg_schema.clone());
110 if self.i < self.record_batch.num_rows() {
111 if let Err(e) = self.encode_row(self.i, &mut encoder) {
112 return Some(Err(e));
113 }
114 self.i += 1;
115 Some(Ok(encoder.take_row()))
116 } else {
117 None
118 }
119 }
120}
121
122impl RecordBatchRowIterator {
123 pub(crate) fn new(
124 query_ctx: QueryContextRef,
125 pg_schema: Arc<Vec<FieldInfo>>,
126 record_batch: RecordBatch,
127 ) -> Self {
128 let schema = record_batch.schema.clone();
129 let record_batch = record_batch.into_df_record_batch();
130 Self {
131 query_ctx,
132 pg_schema,
133 schema,
134 record_batch,
135 i: 0,
136 }
137 }
138
139 fn encode_row(&mut self, i: usize, encoder: &mut DataRowEncoder) -> PgWireResult<()> {
140 let arrow_schema = self.record_batch.schema();
141 for (j, column) in self.record_batch.columns().iter().enumerate() {
142 if column.is_null(i) {
143 encoder.encode_field(&None::<&i8>)?;
144 continue;
145 }
146
147 let pg_field = &self.pg_schema[j];
148 match column.data_type() {
149 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
151 if let ConcreteDataType::Json(_) = &self.schema.column_schemas()[j].data_type {
153 let v = datatypes::arrow_array::binary_array_value(column, i);
154 let s = jsonb_to_string(v).map_err(convert_err)?;
155 encoder.encode_field(&s)?;
156 } else {
157 let arrow_field = arrow_schema.field(j);
159 encode_value(encoder, column, i, arrow_field, pg_field)?;
160 }
161 }
162
163 DataType::List(_) => {
164 let array = column.as_list::<i32>();
165 let items = array.value(i);
166
167 encode_list(encoder, items, pg_field)?;
168 }
169 DataType::Struct(_) => {
170 encode_struct(&self.query_ctx, Default::default(), encoder)?;
171 }
172 _ => {
173 let arrow_field = arrow_schema.field(j);
175 encode_value(encoder, column, i, arrow_field, pg_field)?;
176 }
177 }
178 }
179 Ok(())
180 }
181}
182
183pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
184 match origin {
185 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
186 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
187 &ConcreteDataType::Int8(_) => Ok(Type::CHAR),
188 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
189 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
190 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
191 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
192 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
193 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
194 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
195 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
196 &ConcreteDataType::Date(_) => Ok(Type::DATE),
197 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
198 &ConcreteDataType::Time(_) => Ok(Type::TIME),
199 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
200 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
201 &ConcreteDataType::Json(_) => Ok(Type::JSON),
202 ConcreteDataType::List(list) => match list.item_type() {
203 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
204 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
205 &ConcreteDataType::Int8(_) => Ok(Type::CHAR_ARRAY),
206 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
207 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
208 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
209 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
210 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
211 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
212 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
213 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
214 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
215 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
216 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
217 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
218 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
219 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
220 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
221 &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
222 &ConcreteDataType::Dictionary(_)
223 | &ConcreteDataType::Vector(_)
224 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
225 data_type: origin,
226 reason: "not implemented",
227 }
228 .fail(),
229 },
230 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
231 data_type: origin,
232 reason: "not implemented",
233 }
234 .fail(),
235 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
236 &ConcreteDataType::Struct(_) => Ok(Type::JSON),
237 }
238}
239
240#[allow(dead_code)]
241pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
242 match origin {
244 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
245 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
246 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
247 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
248 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
249 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
250 &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
251 common_time::timestamp::TimeUnit::Millisecond,
252 )),
253 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
254 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
255 common_time::timestamp::TimeUnit::Microsecond,
256 )),
257 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
258 ConcreteDataType::int8_datatype(),
259 ))),
260 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
261 ConcreteDataType::int16_datatype(),
262 ))),
263 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
264 ConcreteDataType::int32_datatype(),
265 ))),
266 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
267 ConcreteDataType::int64_datatype(),
268 ))),
269 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
270 ConcreteDataType::string_datatype(),
271 ))),
272 _ => server_error::InternalSnafu {
273 err_msg: format!("unimplemented datatype {origin:?}"),
274 }
275 .fail(),
276 }
277}
278
279pub(super) fn parameter_to_string(portal: &Portal<SqlPlan>, idx: usize) -> PgWireResult<String> {
280 let param_type = portal
283 .statement
284 .parameter_types
285 .get(idx)
286 .unwrap()
287 .as_ref()
288 .unwrap_or(&Type::UNKNOWN);
289 match param_type {
290 &Type::VARCHAR | &Type::TEXT => Ok(format!(
291 "'{}'",
292 portal
293 .parameter::<String>(idx, param_type)?
294 .as_deref()
295 .unwrap_or("")
296 )),
297 &Type::BOOL => Ok(portal
298 .parameter::<bool>(idx, param_type)?
299 .map(|v| v.to_string())
300 .unwrap_or_else(|| "".to_owned())),
301 &Type::INT4 => Ok(portal
302 .parameter::<i32>(idx, param_type)?
303 .map(|v| v.to_string())
304 .unwrap_or_else(|| "".to_owned())),
305 &Type::INT8 => Ok(portal
306 .parameter::<i64>(idx, param_type)?
307 .map(|v| v.to_string())
308 .unwrap_or_else(|| "".to_owned())),
309 &Type::FLOAT4 => Ok(portal
310 .parameter::<f32>(idx, param_type)?
311 .map(|v| v.to_string())
312 .unwrap_or_else(|| "".to_owned())),
313 &Type::FLOAT8 => Ok(portal
314 .parameter::<f64>(idx, param_type)?
315 .map(|v| v.to_string())
316 .unwrap_or_else(|| "".to_owned())),
317 &Type::DATE => Ok(portal
318 .parameter::<NaiveDate>(idx, param_type)?
319 .map(|v| v.format("%Y-%m-%d").to_string())
320 .unwrap_or_else(|| "".to_owned())),
321 &Type::TIMESTAMP => Ok(portal
322 .parameter::<NaiveDateTime>(idx, param_type)?
323 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
324 .unwrap_or_else(|| "".to_owned())),
325 &Type::INTERVAL => Ok(portal
326 .parameter::<PgInterval>(idx, param_type)?
327 .map(|v| v.to_sql())
328 .unwrap_or_else(|| "".to_owned())),
329 _ => Err(invalid_parameter_error(
330 "unsupported_parameter_type",
331 Some(param_type.to_string()),
332 )),
333 }
334}
335
336pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
337 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
338 error_info.detail = detail;
339 PgWireError::UserError(Box::new(error_info))
340}
341
342fn to_timestamp_scalar_value<T>(
343 data: Option<T>,
344 unit: &TimestampType,
345 ctype: &ConcreteDataType,
346) -> PgWireResult<ScalarValue>
347where
348 T: Into<i64>,
349{
350 if let Some(n) = data {
351 Value::Timestamp(unit.create_timestamp(n.into()))
352 .try_to_scalar_value(ctype)
353 .map_err(convert_err)
354 } else {
355 Ok(ScalarValue::Null)
356 }
357}
358
359pub(super) fn parameters_to_scalar_values(
360 plan: &LogicalPlan,
361 portal: &Portal<SqlPlan>,
362) -> PgWireResult<Vec<ScalarValue>> {
363 let param_count = portal.parameter_len();
364 let mut results = Vec::with_capacity(param_count);
365
366 let client_param_types = &portal.statement.parameter_types;
367 let server_param_types = plan
368 .get_parameter_types()
369 .context(DataFusionSnafu)
370 .map_err(convert_err)?
371 .into_iter()
372 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
373 .collect::<HashMap<_, _>>();
374
375 for idx in 0..param_count {
376 let server_type = server_param_types
377 .get(&format!("${}", idx + 1))
378 .and_then(|t| t.as_ref());
379
380 let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
381 client_given_type.clone()
382 } else if let Some(server_provided_type) = &server_type {
383 type_gt_to_pg(server_provided_type).map_err(convert_err)?
384 } else {
385 return Err(invalid_parameter_error(
386 "unknown_parameter_type",
387 Some(format!(
388 "Cannot get parameter type information for parameter {}",
389 idx
390 )),
391 ));
392 };
393
394 let value = match &client_type {
395 &Type::VARCHAR | &Type::TEXT => {
396 let data = portal.parameter::<String>(idx, &client_type)?;
397 if let Some(server_type) = &server_type {
398 match server_type {
399 ConcreteDataType::String(t) => {
400 if t.is_large() {
401 ScalarValue::LargeUtf8(data)
402 } else {
403 ScalarValue::Utf8(data)
404 }
405 }
406 _ => {
407 return Err(invalid_parameter_error(
408 "invalid_parameter_type",
409 Some(format!("Expected: {}, found: {}", server_type, client_type)),
410 ));
411 }
412 }
413 } else {
414 ScalarValue::Utf8(data)
415 }
416 }
417 &Type::BOOL => {
418 let data = portal.parameter::<bool>(idx, &client_type)?;
419 if let Some(server_type) = &server_type {
420 match server_type {
421 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
422 _ => {
423 return Err(invalid_parameter_error(
424 "invalid_parameter_type",
425 Some(format!("Expected: {}, found: {}", server_type, client_type)),
426 ));
427 }
428 }
429 } else {
430 ScalarValue::Boolean(data)
431 }
432 }
433 &Type::INT2 => {
434 let data = portal.parameter::<i16>(idx, &client_type)?;
435 if let Some(server_type) = &server_type {
436 match server_type {
437 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
438 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
439 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
440 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
441 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
442 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
443 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
444 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
445 ConcreteDataType::Timestamp(unit) => {
446 to_timestamp_scalar_value(data, unit, server_type)?
447 }
448 _ => {
449 return Err(invalid_parameter_error(
450 "invalid_parameter_type",
451 Some(format!("Expected: {}, found: {}", server_type, client_type)),
452 ));
453 }
454 }
455 } else {
456 ScalarValue::Int16(data)
457 }
458 }
459 &Type::INT4 => {
460 let data = portal.parameter::<i32>(idx, &client_type)?;
461 if let Some(server_type) = &server_type {
462 match server_type {
463 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
464 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
465 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
466 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
467 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
468 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
469 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
470 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
471 ConcreteDataType::Timestamp(unit) => {
472 to_timestamp_scalar_value(data, unit, server_type)?
473 }
474 _ => {
475 return Err(invalid_parameter_error(
476 "invalid_parameter_type",
477 Some(format!("Expected: {}, found: {}", server_type, client_type)),
478 ));
479 }
480 }
481 } else {
482 ScalarValue::Int32(data)
483 }
484 }
485 &Type::INT8 => {
486 let data = portal.parameter::<i64>(idx, &client_type)?;
487 if let Some(server_type) = &server_type {
488 match server_type {
489 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
490 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
491 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
492 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
493 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
494 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
495 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
496 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
497 ConcreteDataType::Timestamp(unit) => {
498 to_timestamp_scalar_value(data, unit, server_type)?
499 }
500 _ => {
501 return Err(invalid_parameter_error(
502 "invalid_parameter_type",
503 Some(format!("Expected: {}, found: {}", server_type, client_type)),
504 ));
505 }
506 }
507 } else {
508 ScalarValue::Int64(data)
509 }
510 }
511 &Type::FLOAT4 => {
512 let data = portal.parameter::<f32>(idx, &client_type)?;
513 if let Some(server_type) = &server_type {
514 match server_type {
515 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
516 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
517 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
518 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
519 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
520 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
521 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
522 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
523 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
524 ConcreteDataType::Float64(_) => {
525 ScalarValue::Float64(data.map(|n| n as f64))
526 }
527 _ => {
528 return Err(invalid_parameter_error(
529 "invalid_parameter_type",
530 Some(format!("Expected: {}, found: {}", server_type, client_type)),
531 ));
532 }
533 }
534 } else {
535 ScalarValue::Float32(data)
536 }
537 }
538 &Type::FLOAT8 => {
539 let data = portal.parameter::<f64>(idx, &client_type)?;
540 if let Some(server_type) = &server_type {
541 match server_type {
542 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
543 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
544 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
545 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
546 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
547 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
548 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
549 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
550 ConcreteDataType::Float32(_) => {
551 ScalarValue::Float32(data.map(|n| n as f32))
552 }
553 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
554 _ => {
555 return Err(invalid_parameter_error(
556 "invalid_parameter_type",
557 Some(format!("Expected: {}, found: {}", server_type, client_type)),
558 ));
559 }
560 }
561 } else {
562 ScalarValue::Float64(data)
563 }
564 }
565 &Type::TIMESTAMP => {
566 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
567 if let Some(server_type) = &server_type {
568 match server_type {
569 ConcreteDataType::Timestamp(unit) => match *unit {
570 TimestampType::Second(_) => ScalarValue::TimestampSecond(
571 data.map(|ts| ts.and_utc().timestamp()),
572 None,
573 ),
574 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
575 data.map(|ts| ts.and_utc().timestamp_millis()),
576 None,
577 ),
578 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
579 data.map(|ts| ts.and_utc().timestamp_micros()),
580 None,
581 ),
582 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
583 data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
584 None,
585 ),
586 },
587 _ => {
588 return Err(invalid_parameter_error(
589 "invalid_parameter_type",
590 Some(format!("Expected: {}, found: {}", server_type, client_type)),
591 ));
592 }
593 }
594 } else {
595 ScalarValue::TimestampMillisecond(
596 data.map(|ts| ts.and_utc().timestamp_millis()),
597 None,
598 )
599 }
600 }
601 &Type::TIMESTAMPTZ => {
602 let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
603 if let Some(server_type) = &server_type {
604 match server_type {
605 ConcreteDataType::Timestamp(unit) => match *unit {
606 TimestampType::Second(_) => {
607 ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
608 }
609 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
610 data.map(|ts| ts.timestamp_millis()),
611 None,
612 ),
613 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
614 data.map(|ts| ts.timestamp_micros()),
615 None,
616 ),
617 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
618 data.and_then(|ts| ts.timestamp_nanos_opt()),
619 None,
620 ),
621 },
622 _ => {
623 return Err(invalid_parameter_error(
624 "invalid_parameter_type",
625 Some(format!("Expected: {}, found: {}", server_type, client_type)),
626 ));
627 }
628 }
629 } else {
630 ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
631 }
632 }
633 &Type::DATE => {
634 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
635 if let Some(server_type) = &server_type {
636 match server_type {
637 ConcreteDataType::Date(_) => ScalarValue::Date32(
638 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
639 ),
640 _ => {
641 return Err(invalid_parameter_error(
642 "invalid_parameter_type",
643 Some(format!("Expected: {}, found: {}", server_type, client_type)),
644 ));
645 }
646 }
647 } else {
648 ScalarValue::Date32(
649 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
650 )
651 }
652 }
653 &Type::INTERVAL => {
654 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
655 if let Some(server_type) = &server_type {
656 match server_type {
657 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
658 ScalarValue::IntervalYearMonth(
659 data.map(|i| {
660 if i.days != 0 || i.microseconds != 0 {
661 Err(invalid_parameter_error(
662 "invalid_parameter_type",
663 Some(format!(
664 "Expected: {}, found: {}",
665 server_type, client_type
666 )),
667 ))
668 } else {
669 Ok(IntervalYearMonth::new(i.months).to_i32())
670 }
671 })
672 .transpose()?,
673 )
674 }
675 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
676 ScalarValue::IntervalDayTime(
677 data.map(|i| {
678 if i.months != 0 || i.microseconds % 1000 != 0 {
679 Err(invalid_parameter_error(
680 "invalid_parameter_type",
681 Some(format!(
682 "Expected: {}, found: {}",
683 server_type, client_type
684 )),
685 ))
686 } else {
687 Ok(IntervalDayTime::new(
688 i.days,
689 (i.microseconds / 1000) as i32,
690 )
691 .into())
692 }
693 })
694 .transpose()?,
695 )
696 }
697 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
698 ScalarValue::IntervalMonthDayNano(data.map(|i| {
699 IntervalMonthDayNano::new(
700 i.months,
701 i.days,
702 i.microseconds * 1_000i64,
703 )
704 .into()
705 }))
706 }
707 _ => {
708 return Err(invalid_parameter_error(
709 "invalid_parameter_type",
710 Some(format!("Expected: {}, found: {}", server_type, client_type)),
711 ));
712 }
713 }
714 } else {
715 ScalarValue::IntervalMonthDayNano(data.map(|i| {
716 IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
717 .into()
718 }))
719 }
720 }
721 &Type::BYTEA => {
722 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
723 if let Some(server_type) = &server_type {
724 match server_type {
725 ConcreteDataType::String(t) => {
726 let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
727 if t.is_large() {
728 ScalarValue::LargeUtf8(s)
729 } else {
730 ScalarValue::Utf8(s)
731 }
732 }
733 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
734 _ => {
735 return Err(invalid_parameter_error(
736 "invalid_parameter_type",
737 Some(format!("Expected: {}, found: {}", server_type, client_type)),
738 ));
739 }
740 }
741 } else {
742 ScalarValue::Binary(data)
743 }
744 }
745 &Type::JSONB => {
746 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
747 if let Some(server_type) = &server_type {
748 match server_type {
749 ConcreteDataType::Binary(_) => {
750 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
751 }
752 _ => {
753 return Err(invalid_parameter_error(
754 "invalid_parameter_type",
755 Some(format!("Expected: {}, found: {}", server_type, client_type)),
756 ));
757 }
758 }
759 } else {
760 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
761 }
762 }
763 &Type::INT2_ARRAY => {
764 let data = portal.parameter::<Vec<i16>>(idx, &client_type)?;
765 if let Some(data) = data {
766 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
767 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
768 } else {
769 ScalarValue::Null
770 }
771 }
772 &Type::INT4_ARRAY => {
773 let data = portal.parameter::<Vec<i32>>(idx, &client_type)?;
774 if let Some(data) = data {
775 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
776 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
777 } else {
778 ScalarValue::Null
779 }
780 }
781 &Type::INT8_ARRAY => {
782 let data = portal.parameter::<Vec<i64>>(idx, &client_type)?;
783 if let Some(data) = data {
784 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
785 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
786 } else {
787 ScalarValue::Null
788 }
789 }
790 &Type::VARCHAR_ARRAY => {
791 let data = portal.parameter::<Vec<String>>(idx, &client_type)?;
792 if let Some(data) = data {
793 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
794 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
795 } else {
796 ScalarValue::Null
797 }
798 }
799 &Type::TIMESTAMP_ARRAY => {
800 let data = portal.parameter::<Vec<NaiveDateTime>>(idx, &client_type)?;
801 if let Some(data) = data {
802 if let Some(ConcreteDataType::List(list_type)) = &server_type {
803 match list_type.item_type() {
804 ConcreteDataType::Timestamp(unit) => match *unit {
805 TimestampType::Second(_) => {
806 let values = data
807 .into_iter()
808 .map(|ts| {
809 ScalarValue::TimestampSecond(
810 Some(ts.and_utc().timestamp()),
811 None,
812 )
813 })
814 .collect::<Vec<_>>();
815 ScalarValue::List(ScalarValue::new_list(
816 &values,
817 &ArrowDataType::Timestamp(TimeUnit::Second, None),
818 true,
819 ))
820 }
821 TimestampType::Millisecond(_) => {
822 let values = data
823 .into_iter()
824 .map(|ts| {
825 ScalarValue::TimestampMillisecond(
826 Some(ts.and_utc().timestamp_millis()),
827 None,
828 )
829 })
830 .collect::<Vec<_>>();
831 ScalarValue::List(ScalarValue::new_list(
832 &values,
833 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
834 true,
835 ))
836 }
837 TimestampType::Microsecond(_) => {
838 let values = data
839 .into_iter()
840 .map(|ts| {
841 ScalarValue::TimestampMicrosecond(
842 Some(ts.and_utc().timestamp_micros()),
843 None,
844 )
845 })
846 .collect::<Vec<_>>();
847 ScalarValue::List(ScalarValue::new_list(
848 &values,
849 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
850 true,
851 ))
852 }
853 TimestampType::Nanosecond(_) => {
854 let values = data
855 .into_iter()
856 .filter_map(|ts| {
857 ts.and_utc().timestamp_nanos_opt().map(|nanos| {
858 ScalarValue::TimestampNanosecond(Some(nanos), None)
859 })
860 })
861 .collect::<Vec<_>>();
862 ScalarValue::List(ScalarValue::new_list(
863 &values,
864 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
865 true,
866 ))
867 }
868 },
869 _ => {
870 return Err(invalid_parameter_error(
871 "invalid_parameter_type",
872 Some(format!(
873 "Expected: {}, found: {}",
874 list_type.item_type(),
875 client_type
876 )),
877 ));
878 }
879 }
880 } else {
881 let values = data
883 .into_iter()
884 .map(|ts| {
885 ScalarValue::TimestampMillisecond(
886 Some(ts.and_utc().timestamp_millis()),
887 None,
888 )
889 })
890 .collect::<Vec<_>>();
891 ScalarValue::List(ScalarValue::new_list(
892 &values,
893 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
894 true,
895 ))
896 }
897 } else {
898 ScalarValue::Null
899 }
900 }
901 &Type::TIMESTAMPTZ_ARRAY => {
902 let data = portal.parameter::<Vec<DateTime<FixedOffset>>>(idx, &client_type)?;
903 if let Some(data) = data {
904 if let Some(ConcreteDataType::List(list_type)) = &server_type {
905 match list_type.item_type() {
906 ConcreteDataType::Timestamp(unit) => match *unit {
907 TimestampType::Second(_) => {
908 let values = data
909 .into_iter()
910 .map(|ts| {
911 ScalarValue::TimestampSecond(Some(ts.timestamp()), None)
912 })
913 .collect::<Vec<_>>();
914 ScalarValue::List(ScalarValue::new_list(
915 &values,
916 &ArrowDataType::Timestamp(TimeUnit::Second, None),
917 true,
918 ))
919 }
920 TimestampType::Millisecond(_) => {
921 let values = data
922 .into_iter()
923 .map(|ts| {
924 ScalarValue::TimestampMillisecond(
925 Some(ts.timestamp_millis()),
926 None,
927 )
928 })
929 .collect::<Vec<_>>();
930 ScalarValue::List(ScalarValue::new_list(
931 &values,
932 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
933 true,
934 ))
935 }
936 TimestampType::Microsecond(_) => {
937 let values = data
938 .into_iter()
939 .map(|ts| {
940 ScalarValue::TimestampMicrosecond(
941 Some(ts.timestamp_micros()),
942 None,
943 )
944 })
945 .collect::<Vec<_>>();
946 ScalarValue::List(ScalarValue::new_list(
947 &values,
948 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
949 true,
950 ))
951 }
952 TimestampType::Nanosecond(_) => {
953 let values = data
954 .into_iter()
955 .filter_map(|ts| {
956 ts.timestamp_nanos_opt().map(|nanos| {
957 ScalarValue::TimestampNanosecond(Some(nanos), None)
958 })
959 })
960 .collect::<Vec<_>>();
961 ScalarValue::List(ScalarValue::new_list(
962 &values,
963 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
964 true,
965 ))
966 }
967 },
968 _ => {
969 return Err(invalid_parameter_error(
970 "invalid_parameter_type",
971 Some(format!(
972 "Expected: {}, found: {}",
973 list_type.item_type(),
974 client_type
975 )),
976 ));
977 }
978 }
979 } else {
980 let values = data
982 .into_iter()
983 .map(|ts| {
984 ScalarValue::TimestampMillisecond(Some(ts.timestamp_millis()), None)
985 })
986 .collect::<Vec<_>>();
987 ScalarValue::List(ScalarValue::new_list(
988 &values,
989 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
990 true,
991 ))
992 }
993 } else {
994 ScalarValue::Null
995 }
996 }
997 _ => Err(invalid_parameter_error(
998 "unsupported_parameter_value",
999 Some(format!("Found type: {}", client_type)),
1000 ))?,
1001 };
1002
1003 results.push(value);
1004 }
1005
1006 Ok(results)
1007}
1008
1009pub(super) fn param_types_to_pg_types(
1010 param_types: &HashMap<String, Option<ConcreteDataType>>,
1011) -> Result<Vec<Type>> {
1012 let param_count = param_types.len();
1013 let mut types = Vec::with_capacity(param_count);
1014 for i in 0..param_count {
1015 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1016 let pg_type = type_gt_to_pg(param_type)?;
1017 types.push(pg_type);
1018 } else {
1019 types.push(Type::UNKNOWN);
1020 }
1021 }
1022 Ok(types)
1023}
1024
1025pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1026 let config = query_ctx.configuration_parameter();
1027 let (date_style, date_order) = *config.pg_datetime_style();
1028
1029 let mut format_options = PgFormatOptions::default();
1030 format_options.date_style = format!("{}, {}", date_style, date_order);
1031 format_options.interval_style = config.pg_intervalstyle_format().to_string();
1032 format_options.bytea_output = config.postgres_bytea_output().to_string();
1033 format_options.time_zone = query_ctx.timezone().to_string();
1034
1035 Arc::new(format_options)
1036}
1037
#[cfg(test)]
mod test {
    use std::sync::Arc;

    use arrow::array::{
        Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
    };
    use arrow_schema::{Field, IntervalUnit};
    use datatypes::schema::{ColumnSchema, Schema};
    use datatypes::vectors::{
        BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
        Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
        IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
        TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
    };
    use pgwire::api::Type;
    use pgwire::api::results::{FieldFormat, FieldInfo};
    use session::context::QueryContextBuilder;

    use super::*;

    /// Builds a text-format [`FieldInfo`] with no table id and no column id.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    /// Checks that `schema_to_pg` maps every listed column type to the expected PostgreSQL
    /// type when all fields use the text format.
    #[test]
    fn test_schema_convert() {
        // One row per column: name, source data type, expected PostgreSQL type.
        let cases = vec![
            ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
            ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
            ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
            ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
            ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
            ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
            ("uint8s", ConcreteDataType::uint8_datatype(), Type::INT2),
            ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT4),
            ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT8),
            (
                "uint64s",
                ConcreteDataType::uint64_datatype(),
                Type::NUMERIC,
            ),
            (
                "float32s",
                ConcreteDataType::float32_datatype(),
                Type::FLOAT4,
            ),
            (
                "float64s",
                ConcreteDataType::float64_datatype(),
                Type::FLOAT8,
            ),
            (
                "binaries",
                ConcreteDataType::binary_datatype(),
                Type::BYTEA,
            ),
            (
                "strings",
                ConcreteDataType::string_datatype(),
                Type::VARCHAR,
            ),
            (
                "timestamps",
                ConcreteDataType::timestamp_millisecond_datatype(),
                Type::TIMESTAMP,
            ),
            ("dates", ConcreteDataType::date_datatype(), Type::DATE),
            ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
            (
                "intervals",
                ConcreteDataType::interval_month_day_nano_datatype(),
                Type::INTERVAL,
            ),
        ];

        let (column_schemas, pg_field_info): (Vec<_>, Vec<_>) = cases
            .into_iter()
            .map(|(name, data_type, pg_type)| {
                (
                    ColumnSchema::new(name, data_type, true),
                    text_field(name, pg_type),
                )
            })
            .unzip();

        let schema = Schema::new(column_schemas);
        let fs = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
        assert_eq!(fs, pg_field_info);
    }

    /// Encodes a three-row record batch covering every listed column type in text format
    /// and checks that each produced row carries one field per schema column.
    #[test]
    fn test_encode_text_format_data() {
        let pg_schema = Arc::new(vec![
            text_field("nulls", Type::UNKNOWN),
            text_field("bools", Type::BOOL),
            text_field("uint8s", Type::INT2),
            text_field("uint16s", Type::INT4),
            text_field("uint32s", Type::INT8),
            text_field("uint64s", Type::NUMERIC),
            text_field("int8s", Type::CHAR),
            text_field("int16s", Type::INT2),
            text_field("int32s", Type::INT4),
            text_field("int64s", Type::INT8),
            text_field("float32s", Type::FLOAT4),
            text_field("float64s", Type::FLOAT8),
            text_field("strings", Type::VARCHAR),
            text_field("binaries", Type::BYTEA),
            text_field("dates", Type::DATE),
            text_field("times", Type::TIME),
            text_field("timestamps", Type::TIMESTAMP),
            text_field("interval_year_month", Type::INTERVAL),
            text_field("interval_day_time", Type::INTERVAL),
            text_field("interval_month_day_nano", Type::INTERVAL),
            text_field("int_list", Type::INT8_ARRAY),
            text_field("float_list", Type::FLOAT8_ARRAY),
            text_field("string_list", Type::VARCHAR_ARRAY),
            text_field("timestamp_list", Type::TIMESTAMP_ARRAY),
        ]);

        // Every column is a nullable field named "x"; list columns wrap a nullable "item".
        let nullable = |data_type: DataType| Field::new("x", data_type, true);
        let list_of =
            |item_type: DataType| DataType::List(Arc::new(Field::new("item", item_type, true)));

        let arrow_schema = arrow_schema::Schema::new(vec![
            nullable(DataType::Null),
            nullable(DataType::Boolean),
            nullable(DataType::UInt8),
            nullable(DataType::UInt16),
            nullable(DataType::UInt32),
            nullable(DataType::UInt64),
            nullable(DataType::Int8),
            nullable(DataType::Int16),
            nullable(DataType::Int32),
            nullable(DataType::Int64),
            nullable(DataType::Float32),
            nullable(DataType::Float64),
            nullable(DataType::Utf8),
            nullable(DataType::Binary),
            nullable(DataType::Date32),
            nullable(DataType::Time32(TimeUnit::Second)),
            nullable(DataType::Timestamp(TimeUnit::Second, None)),
            nullable(DataType::Interval(IntervalUnit::YearMonth)),
            nullable(DataType::Interval(IntervalUnit::DayTime)),
            nullable(DataType::Interval(IntervalUnit::MonthDayNano)),
            nullable(list_of(DataType::Int64)),
            nullable(list_of(DataType::Float64)),
            nullable(list_of(DataType::Utf8)),
            nullable(list_of(DataType::Timestamp(TimeUnit::Second, None))),
        ]);

        // Each list column holds: a list with an inner null, a null list, another list.
        let mut int_builder = ListBuilder::new(Int64Builder::new());
        int_builder.append_value([Some(1i64), None, Some(2)]);
        int_builder.append_null();
        int_builder.append_value([Some(-1i64), None, Some(-2)]);
        let i64_list_array = int_builder.finish();

        let mut float_builder = ListBuilder::new(Float64Builder::new());
        float_builder.append_value([Some(1.0f64), None, Some(2.0)]);
        float_builder.append_null();
        float_builder.append_value([Some(-1.0f64), None, Some(-2.0)]);
        let f64_list_array = float_builder.finish();

        let mut string_builder = ListBuilder::new(StringBuilder::new());
        string_builder.append_value([Some("a"), None, Some("b")]);
        string_builder.append_null();
        string_builder.append_value([Some("c"), None, Some("d")]);
        let string_list_array = string_builder.finish();

        let mut timestamp_builder = ListBuilder::new(TimestampSecondBuilder::new());
        timestamp_builder.append_value([Some(1i64), None, Some(2)]);
        timestamp_builder.append_null();
        timestamp_builder.append_value([Some(3i64), None, Some(4)]);
        let timestamp_list_array = timestamp_builder.finish();

        let columns = vec![
            Arc::new(NullVector::new(3)) as VectorRef,
            Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
            Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
            Arc::new(UInt16Vector::from(vec![
                Some(u16::MAX),
                Some(u16::MIN),
                None,
            ])),
            Arc::new(UInt32Vector::from(vec![
                Some(u32::MAX),
                Some(u32::MIN),
                None,
            ])),
            Arc::new(UInt64Vector::from(vec![
                Some(u64::MAX),
                Some(u64::MIN),
                None,
            ])),
            Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
            Arc::new(Int16Vector::from(vec![
                Some(i16::MAX),
                Some(i16::MIN),
                None,
            ])),
            Arc::new(Int32Vector::from(vec![
                Some(i32::MAX),
                Some(i32::MIN),
                None,
            ])),
            Arc::new(Int64Vector::from(vec![
                Some(i64::MAX),
                Some(i64::MIN),
                None,
            ])),
            Arc::new(Float32Vector::from(vec![
                None,
                Some(f32::MAX),
                Some(f32::MIN),
            ])),
            Arc::new(Float64Vector::from(vec![
                None,
                Some(f64::MAX),
                Some(f64::MIN),
            ])),
            Arc::new(StringVector::from(vec![
                None,
                Some("hello"),
                Some("greptime"),
            ])),
            Arc::new(BinaryVector::from(vec![
                None,
                Some("hello".as_bytes().to_vec()),
                Some("world".as_bytes().to_vec()),
            ])),
            Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
            Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
            Arc::new(TimestampSecondVector::from(vec![
                Some(1000001),
                None,
                Some(1),
            ])),
            Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
            Arc::new(IntervalDayTimeVector::from(vec![
                Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
                None,
                Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
            ])),
            Arc::new(IntervalMonthDayNanoVector::from(vec![
                Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
                None,
                Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
            ])),
            Arc::new(ListVector::from(i64_list_array)),
            Arc::new(ListVector::from(f64_list_array)),
            Arc::new(ListVector::from(string_list_array)),
            Arc::new(ListVector::from(timestamp_list_array)),
        ];
        let record_batch =
            RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), columns).unwrap();

        let query_context = QueryContextBuilder::default()
            .configuration_parameter(Default::default())
            .build()
            .into();

        // Collect into a `Result` so that an encoding failure aborts the test with its
        // error, instead of being dropped and surfacing only as a wrong row count.
        let rows = RecordBatchRowIterator::new(query_context, pg_schema.clone(), record_batch)
            .collect::<std::result::Result<Vec<_>, _>>()
            .expect("every row should be encoded without error");
        assert_eq!(rows.len(), 3);
        for row in rows {
            assert_eq!(row.field_count, pg_schema.len() as i16);
        }
    }

    /// Checks that `invalid_parameter_error` yields a user error with severity `ERROR`,
    /// SQLSTATE `22023` and the given message.
    #[test]
    fn test_invalid_parameter() {
        let msg = "invalid_parameter_count";
        let PgWireError::UserError(value) = invalid_parameter_error(msg, None) else {
            panic!("test_invalid_parameter failed");
        };
        assert_eq!("ERROR", value.severity);
        assert_eq!("22023", value.code);
        assert_eq!(msg, value.message);
    }
}