1mod error;
16
17use std::collections::HashMap;
18use std::pin::Pin;
19use std::sync::Arc;
20use std::task::{Context, Poll};
21
22use arrow::array::{Array, AsArray};
23use arrow_pg::encoder::{Encoder, encode_value};
24use arrow_pg::list_encoder::encode_list;
25use arrow_schema::{DataType, TimeUnit};
26use chrono::{DateTime, FixedOffset, NaiveDate, NaiveDateTime};
27use common_recordbatch::RecordBatch;
28use common_recordbatch::error::Result as RecordBatchResult;
29use common_time::{IntervalDayTime, IntervalMonthDayNano, IntervalYearMonth};
30use datafusion_common::ScalarValue;
31use datafusion_expr::LogicalPlan;
32use datatypes::arrow::datatypes::DataType as ArrowDataType;
33use datatypes::json::JsonStructureSettings;
34use datatypes::prelude::{ConcreteDataType, Value};
35use datatypes::schema::{Schema, SchemaRef};
36use datatypes::types::{IntervalType, TimestampType, jsonb_to_string};
37use datatypes::value::StructValue;
38use futures::Stream;
39use pg_interval::Interval as PgInterval;
40use pgwire::api::Type;
41use pgwire::api::portal::{Format, Portal};
42use pgwire::api::results::FieldInfo;
43use pgwire::error::{PgWireError, PgWireResult};
44use pgwire::types::format::FormatOptions as PgFormatOptions;
45use query::planner::DfLogicalPlanner;
46use session::context::QueryContextRef;
47use snafu::ResultExt;
48
49pub use self::error::{PgErrorCode, PgErrorSeverity};
50use crate::error::{self as server_error, InferParameterTypesSnafu, Result};
51use crate::postgres::handler::PgSqlPlan;
52use crate::postgres::utils::convert_err;
53
54pub(super) fn schema_to_pg(
55 origin: &Schema,
56 field_formats: &Format,
57 format_options: Option<Arc<PgFormatOptions>>,
58) -> Result<Vec<FieldInfo>> {
59 origin
60 .column_schemas()
61 .iter()
62 .enumerate()
63 .map(|(idx, col)| {
64 let mut field_info = FieldInfo::new(
65 col.name.clone(),
66 None,
67 None,
68 type_gt_to_pg(&col.data_type)?,
69 field_formats.format_for(idx),
70 );
71 if let Some(format_options) = &format_options {
72 field_info = field_info.with_format_options(format_options.clone());
73 }
74 Ok(field_info)
75 })
76 .collect::<Result<Vec<FieldInfo>>>()
77}
78
79fn encode_struct<S: Encoder>(
89 _query_ctx: &QueryContextRef,
90 struct_value: StructValue,
91 builder: &mut S,
92 pg_field: &FieldInfo,
93) -> PgWireResult<()> {
94 let encoding_setting = JsonStructureSettings::Structured(None);
95 let json_value = encoding_setting
96 .decode(Value::Struct(struct_value))
97 .map_err(|e| PgWireError::ApiError(Box::new(e)))?;
98
99 builder.encode_field(&json_value, pg_field)
100}
101
102pub(crate) struct RecordBatchRowStream<S, B>
103where
104 S: Encoder,
105 B: Stream<Item = RecordBatchResult<RecordBatch>>,
106{
107 query_ctx: QueryContextRef,
108 pg_schema: Arc<Vec<FieldInfo>>,
109 schema: SchemaRef,
110 record_batches: Pin<Box<B>>,
111 encoder: S,
112}
113
114impl<S, B> Stream for RecordBatchRowStream<S, B>
115where
116 S: Encoder + Unpin,
117 B: Stream<Item = RecordBatchResult<RecordBatch>>,
118{
119 type Item = PgWireResult<Vec<S::Item>>;
120
121 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
122 match self.record_batches.as_mut().poll_next(cx) {
123 Poll::Ready(Some(Ok(batch))) => {
124 let record_batch = batch.into_df_record_batch();
125 let num_rows = record_batch.num_rows();
126
127 if num_rows == 0 {
128 return Poll::Ready(Some(Ok(vec![])));
129 }
130
131 let arrow_schema = record_batch.schema();
132 let query_ctx = self.query_ctx.clone();
133 let pg_schema = self.pg_schema.clone();
134 let schema = self.schema.clone();
135 let mut results = Vec::with_capacity(num_rows);
136
137 for i in 0..num_rows {
138 if let Err(e) = Self::encode_row(
139 &query_ctx,
140 &pg_schema,
141 &schema,
142 arrow_schema.as_ref(),
143 &mut self.encoder,
144 &record_batch,
145 i,
146 ) {
147 return Poll::Ready(Some(Err(e)));
148 }
149 results.push(self.encoder.take_row());
150 }
151
152 Poll::Ready(Some(Ok(results)))
153 }
154 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(convert_err(e)))),
155 Poll::Ready(None) => Poll::Ready(None),
156 Poll::Pending => Poll::Pending,
157 }
158 }
159}
160
161impl<S, B> RecordBatchRowStream<S, B>
162where
163 S: Encoder,
164 B: Stream<Item = RecordBatchResult<RecordBatch>>,
165{
166 pub(crate) fn new(
167 query_ctx: QueryContextRef,
168 pg_schema: Arc<Vec<FieldInfo>>,
169 schema: SchemaRef,
170 record_batches: B,
171 encoder: S,
172 ) -> Self {
173 Self {
174 query_ctx,
175 pg_schema,
176 schema,
177 record_batches: Box::pin(record_batches),
178 encoder,
179 }
180 }
181
182 fn encode_row(
183 query_ctx: &QueryContextRef,
184 pg_schema: &Arc<Vec<FieldInfo>>,
185 schema: &SchemaRef,
186 arrow_schema: &arrow::datatypes::Schema,
187 encoder: &mut S,
188 record_batch: &arrow::record_batch::RecordBatch,
189 i: usize,
190 ) -> PgWireResult<()> {
191 for (j, column) in record_batch.columns().iter().enumerate() {
192 let pg_field = &pg_schema[j];
193
194 if column.is_null(i) {
195 encoder.encode_field(&None::<&i8>, pg_field)?;
196 continue;
197 }
198
199 match column.data_type() {
200 DataType::Binary | DataType::LargeBinary | DataType::BinaryView => {
202 if let ConcreteDataType::Json(_) = &schema.column_schemas()[j].data_type {
204 let v = datatypes::arrow_array::binary_array_value(column, i);
205 let s = jsonb_to_string(v).map_err(convert_err)?;
206 encoder.encode_field(&s, pg_field)?;
207 } else {
208 let arrow_field = arrow_schema.field(j);
210 encode_value(encoder, column, i, arrow_field, pg_field)?;
211 }
212 }
213
214 DataType::List(_) => {
215 let array = column.as_list::<i32>();
216 let items = array.value(i);
217
218 encode_list(encoder, items, pg_field)?;
219 }
220 DataType::Struct(_) => {
221 encode_struct(query_ctx, Default::default(), encoder, pg_field)?;
222 }
223 _ => {
224 let arrow_field = arrow_schema.field(j);
226 encode_value(encoder, column, i, arrow_field, pg_field)?;
227 }
228 }
229 }
230 Ok(())
231 }
232}
233
234pub(super) fn type_gt_to_pg(origin: &ConcreteDataType) -> Result<Type> {
235 match origin {
236 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
237 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL),
238 &ConcreteDataType::Int8(_) => Ok(Type::CHAR),
239 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2),
240 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4),
241 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8),
242 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC),
243 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4),
244 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8),
245 &ConcreteDataType::Binary(_) | &ConcreteDataType::Vector(_) => Ok(Type::BYTEA),
246 &ConcreteDataType::String(_) => Ok(Type::VARCHAR),
247 &ConcreteDataType::Date(_) => Ok(Type::DATE),
248 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP),
249 &ConcreteDataType::Time(_) => Ok(Type::TIME),
250 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL),
251 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC),
252 &ConcreteDataType::Json(_) => Ok(Type::JSON),
253 ConcreteDataType::List(list) => match list.item_type() {
254 &ConcreteDataType::Null(_) => Ok(Type::UNKNOWN),
255 &ConcreteDataType::Boolean(_) => Ok(Type::BOOL_ARRAY),
256 &ConcreteDataType::Int8(_) => Ok(Type::CHAR_ARRAY),
257 &ConcreteDataType::Int16(_) | &ConcreteDataType::UInt8(_) => Ok(Type::INT2_ARRAY),
258 &ConcreteDataType::Int32(_) | &ConcreteDataType::UInt16(_) => Ok(Type::INT4_ARRAY),
259 &ConcreteDataType::Int64(_) | &ConcreteDataType::UInt32(_) => Ok(Type::INT8_ARRAY),
260 &ConcreteDataType::UInt64(_) => Ok(Type::NUMERIC_ARRAY),
261 &ConcreteDataType::Float32(_) => Ok(Type::FLOAT4_ARRAY),
262 &ConcreteDataType::Float64(_) => Ok(Type::FLOAT8_ARRAY),
263 &ConcreteDataType::Binary(_) => Ok(Type::BYTEA_ARRAY),
264 &ConcreteDataType::String(_) => Ok(Type::VARCHAR_ARRAY),
265 &ConcreteDataType::Date(_) => Ok(Type::DATE_ARRAY),
266 &ConcreteDataType::Timestamp(_) => Ok(Type::TIMESTAMP_ARRAY),
267 &ConcreteDataType::Time(_) => Ok(Type::TIME_ARRAY),
268 &ConcreteDataType::Interval(_) => Ok(Type::INTERVAL_ARRAY),
269 &ConcreteDataType::Decimal128(_) => Ok(Type::NUMERIC_ARRAY),
270 &ConcreteDataType::Json(_) => Ok(Type::JSON_ARRAY),
271 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL_ARRAY),
272 &ConcreteDataType::Struct(_) => Ok(Type::JSON_ARRAY),
273 &ConcreteDataType::Dictionary(_)
274 | &ConcreteDataType::Vector(_)
275 | &ConcreteDataType::List(_) => server_error::UnsupportedDataTypeSnafu {
276 data_type: origin,
277 reason: "not implemented",
278 }
279 .fail(),
280 },
281 &ConcreteDataType::Dictionary(_) => server_error::UnsupportedDataTypeSnafu {
282 data_type: origin,
283 reason: "not implemented",
284 }
285 .fail(),
286 &ConcreteDataType::Duration(_) => Ok(Type::INTERVAL),
287 &ConcreteDataType::Struct(_) => Ok(Type::JSON),
288 }
289}
290
291#[allow(dead_code)]
292pub(super) fn type_pg_to_gt(origin: &Type) -> Result<ConcreteDataType> {
293 match origin {
295 &Type::BOOL => Ok(ConcreteDataType::boolean_datatype()),
296 &Type::CHAR => Ok(ConcreteDataType::int8_datatype()),
297 &Type::INT2 => Ok(ConcreteDataType::int16_datatype()),
298 &Type::INT4 => Ok(ConcreteDataType::int32_datatype()),
299 &Type::INT8 => Ok(ConcreteDataType::int64_datatype()),
300 &Type::VARCHAR | &Type::TEXT => Ok(ConcreteDataType::string_datatype()),
301 &Type::TIMESTAMP | &Type::TIMESTAMPTZ => Ok(ConcreteDataType::timestamp_datatype(
302 common_time::timestamp::TimeUnit::Millisecond,
303 )),
304 &Type::DATE => Ok(ConcreteDataType::date_datatype()),
305 &Type::TIME => Ok(ConcreteDataType::timestamp_datatype(
306 common_time::timestamp::TimeUnit::Microsecond,
307 )),
308 &Type::CHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
309 ConcreteDataType::int8_datatype(),
310 ))),
311 &Type::INT2_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
312 ConcreteDataType::int16_datatype(),
313 ))),
314 &Type::INT4_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
315 ConcreteDataType::int32_datatype(),
316 ))),
317 &Type::INT8_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
318 ConcreteDataType::int64_datatype(),
319 ))),
320 &Type::VARCHAR_ARRAY => Ok(ConcreteDataType::list_datatype(Arc::new(
321 ConcreteDataType::string_datatype(),
322 ))),
323 _ => server_error::InternalSnafu {
324 err_msg: format!("unimplemented datatype {origin:?}"),
325 }
326 .fail(),
327 }
328}
329
330pub(super) fn parameter_to_string(portal: &Portal<PgSqlPlan>, idx: usize) -> PgWireResult<String> {
331 let param_type = portal
334 .statement
335 .parameter_types
336 .get(idx)
337 .unwrap()
338 .as_ref()
339 .unwrap_or(&Type::UNKNOWN);
340 match param_type {
341 &Type::VARCHAR | &Type::TEXT => Ok(format!(
342 "'{}'",
343 portal
344 .parameter::<String>(idx, param_type)?
345 .as_deref()
346 .unwrap_or("")
347 )),
348 &Type::BOOL => Ok(portal
349 .parameter::<bool>(idx, param_type)?
350 .map(|v| v.to_string())
351 .unwrap_or_else(|| "".to_owned())),
352 &Type::INT4 => Ok(portal
353 .parameter::<i32>(idx, param_type)?
354 .map(|v| v.to_string())
355 .unwrap_or_else(|| "".to_owned())),
356 &Type::INT8 => Ok(portal
357 .parameter::<i64>(idx, param_type)?
358 .map(|v| v.to_string())
359 .unwrap_or_else(|| "".to_owned())),
360 &Type::FLOAT4 => Ok(portal
361 .parameter::<f32>(idx, param_type)?
362 .map(|v| v.to_string())
363 .unwrap_or_else(|| "".to_owned())),
364 &Type::FLOAT8 => Ok(portal
365 .parameter::<f64>(idx, param_type)?
366 .map(|v| v.to_string())
367 .unwrap_or_else(|| "".to_owned())),
368 &Type::DATE => Ok(portal
369 .parameter::<NaiveDate>(idx, param_type)?
370 .map(|v| v.format("%Y-%m-%d").to_string())
371 .unwrap_or_else(|| "".to_owned())),
372 &Type::TIMESTAMP => Ok(portal
373 .parameter::<NaiveDateTime>(idx, param_type)?
374 .map(|v| v.format("%Y-%m-%d %H:%M:%S%.6f").to_string())
375 .unwrap_or_else(|| "".to_owned())),
376 &Type::INTERVAL => Ok(portal
377 .parameter::<PgInterval>(idx, param_type)?
378 .map(|v| v.to_sql())
379 .unwrap_or_else(|| "".to_owned())),
380 _ => Err(invalid_parameter_error(
381 "unsupported_parameter_type",
382 Some(param_type.to_string()),
383 )),
384 }
385}
386
387pub(super) fn invalid_parameter_error(msg: &str, detail: Option<String>) -> PgWireError {
388 let mut error_info = PgErrorCode::Ec22023.to_err_info(msg.to_string());
389 error_info.detail = detail;
390 PgWireError::UserError(Box::new(error_info))
391}
392
393fn to_timestamp_scalar_value<T>(
394 data: Option<T>,
395 unit: &TimestampType,
396 ctype: &ConcreteDataType,
397) -> PgWireResult<ScalarValue>
398where
399 T: Into<i64>,
400{
401 if let Some(n) = data {
402 Value::Timestamp(unit.create_timestamp(n.into()))
403 .try_to_scalar_value(ctype)
404 .map_err(convert_err)
405 } else {
406 Ok(ScalarValue::Null)
407 }
408}
409
410pub(super) fn parameters_to_scalar_values(
411 plan: &LogicalPlan,
412 portal: &Portal<PgSqlPlan>,
413) -> PgWireResult<Vec<ScalarValue>> {
414 let param_count = portal.parameter_len();
415 let mut results = Vec::with_capacity(param_count);
416
417 let client_param_types = &portal.statement.parameter_types;
418 let server_param_types = DfLogicalPlanner::get_inferred_parameter_types(plan)
419 .context(InferParameterTypesSnafu)
420 .map_err(convert_err)?
421 .into_iter()
422 .map(|(k, v)| (k, v.map(|v| ConcreteDataType::from_arrow_type(&v))))
423 .collect::<HashMap<_, _>>();
424
425 for idx in 0..param_count {
426 let server_type = server_param_types
427 .get(&format!("${}", idx + 1))
428 .and_then(|t| t.as_ref());
429
430 let client_type = if let Some(Some(client_given_type)) = client_param_types.get(idx) {
431 client_given_type.clone()
432 } else if let Some(server_provided_type) = &server_type {
433 type_gt_to_pg(server_provided_type).map_err(convert_err)?
434 } else {
435 return Err(invalid_parameter_error(
436 "unknown_parameter_type",
437 Some(format!(
438 "Cannot get parameter type information for parameter {}",
439 idx
440 )),
441 ));
442 };
443
444 let value = match &client_type {
445 &Type::VARCHAR | &Type::TEXT => {
446 let data = portal.parameter::<String>(idx, &client_type)?;
447 if let Some(server_type) = &server_type {
448 match server_type {
449 ConcreteDataType::String(t) => {
450 if t.is_large() {
451 ScalarValue::LargeUtf8(data)
452 } else {
453 ScalarValue::Utf8(data)
454 }
455 }
456 _ => {
457 return Err(invalid_parameter_error(
458 "invalid_parameter_type",
459 Some(format!("Expected: {}, found: {}", server_type, client_type)),
460 ));
461 }
462 }
463 } else {
464 ScalarValue::Utf8(data)
465 }
466 }
467 &Type::BOOL => {
468 let data = portal.parameter::<bool>(idx, &client_type)?;
469 if let Some(server_type) = &server_type {
470 match server_type {
471 ConcreteDataType::Boolean(_) => ScalarValue::Boolean(data),
472 _ => {
473 return Err(invalid_parameter_error(
474 "invalid_parameter_type",
475 Some(format!("Expected: {}, found: {}", server_type, client_type)),
476 ));
477 }
478 }
479 } else {
480 ScalarValue::Boolean(data)
481 }
482 }
483 &Type::INT2 => {
484 let data = portal.parameter::<i16>(idx, &client_type)?;
485 if let Some(server_type) = &server_type {
486 match server_type {
487 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
488 ConcreteDataType::Int16(_) => ScalarValue::Int16(data),
489 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
490 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
491 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
492 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
493 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
494 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
495 ConcreteDataType::Timestamp(unit) => {
496 to_timestamp_scalar_value(data, unit, server_type)?
497 }
498 _ => {
499 return Err(invalid_parameter_error(
500 "invalid_parameter_type",
501 Some(format!("Expected: {}, found: {}", server_type, client_type)),
502 ));
503 }
504 }
505 } else {
506 ScalarValue::Int16(data)
507 }
508 }
509 &Type::INT4 => {
510 let data = portal.parameter::<i32>(idx, &client_type)?;
511 if let Some(server_type) = &server_type {
512 match server_type {
513 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
514 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
515 ConcreteDataType::Int32(_) => ScalarValue::Int32(data),
516 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
517 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
518 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
519 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
520 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
521 ConcreteDataType::Timestamp(unit) => {
522 to_timestamp_scalar_value(data, unit, server_type)?
523 }
524 _ => {
525 return Err(invalid_parameter_error(
526 "invalid_parameter_type",
527 Some(format!("Expected: {}, found: {}", server_type, client_type)),
528 ));
529 }
530 }
531 } else {
532 ScalarValue::Int32(data)
533 }
534 }
535 &Type::INT8 => {
536 let data = portal.parameter::<i64>(idx, &client_type)?;
537 if let Some(server_type) = &server_type {
538 match server_type {
539 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
540 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
541 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
542 ConcreteDataType::Int64(_) => ScalarValue::Int64(data),
543 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
544 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
545 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
546 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
547 ConcreteDataType::Timestamp(unit) => {
548 to_timestamp_scalar_value(data, unit, server_type)?
549 }
550 _ => {
551 return Err(invalid_parameter_error(
552 "invalid_parameter_type",
553 Some(format!("Expected: {}, found: {}", server_type, client_type)),
554 ));
555 }
556 }
557 } else {
558 ScalarValue::Int64(data)
559 }
560 }
561 &Type::FLOAT4 => {
562 let data = portal.parameter::<f32>(idx, &client_type)?;
563 if let Some(server_type) = &server_type {
564 match server_type {
565 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
566 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
567 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
568 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
569 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
570 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
571 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
572 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
573 ConcreteDataType::Float32(_) => ScalarValue::Float32(data),
574 ConcreteDataType::Float64(_) => {
575 ScalarValue::Float64(data.map(|n| n as f64))
576 }
577 _ => {
578 return Err(invalid_parameter_error(
579 "invalid_parameter_type",
580 Some(format!("Expected: {}, found: {}", server_type, client_type)),
581 ));
582 }
583 }
584 } else {
585 ScalarValue::Float32(data)
586 }
587 }
588 &Type::FLOAT8 => {
589 let data = portal.parameter::<f64>(idx, &client_type)?;
590 if let Some(server_type) = &server_type {
591 match server_type {
592 ConcreteDataType::Int8(_) => ScalarValue::Int8(data.map(|n| n as i8)),
593 ConcreteDataType::Int16(_) => ScalarValue::Int16(data.map(|n| n as i16)),
594 ConcreteDataType::Int32(_) => ScalarValue::Int32(data.map(|n| n as i32)),
595 ConcreteDataType::Int64(_) => ScalarValue::Int64(data.map(|n| n as i64)),
596 ConcreteDataType::UInt8(_) => ScalarValue::UInt8(data.map(|n| n as u8)),
597 ConcreteDataType::UInt16(_) => ScalarValue::UInt16(data.map(|n| n as u16)),
598 ConcreteDataType::UInt32(_) => ScalarValue::UInt32(data.map(|n| n as u32)),
599 ConcreteDataType::UInt64(_) => ScalarValue::UInt64(data.map(|n| n as u64)),
600 ConcreteDataType::Float32(_) => {
601 ScalarValue::Float32(data.map(|n| n as f32))
602 }
603 ConcreteDataType::Float64(_) => ScalarValue::Float64(data),
604 _ => {
605 return Err(invalid_parameter_error(
606 "invalid_parameter_type",
607 Some(format!("Expected: {}, found: {}", server_type, client_type)),
608 ));
609 }
610 }
611 } else {
612 ScalarValue::Float64(data)
613 }
614 }
615 &Type::TIMESTAMP => {
616 let data = portal.parameter::<NaiveDateTime>(idx, &client_type)?;
617 if let Some(server_type) = &server_type {
618 match server_type {
619 ConcreteDataType::Timestamp(unit) => match *unit {
620 TimestampType::Second(_) => ScalarValue::TimestampSecond(
621 data.map(|ts| ts.and_utc().timestamp()),
622 None,
623 ),
624 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
625 data.map(|ts| ts.and_utc().timestamp_millis()),
626 None,
627 ),
628 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
629 data.map(|ts| ts.and_utc().timestamp_micros()),
630 None,
631 ),
632 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
633 data.and_then(|ts| ts.and_utc().timestamp_nanos_opt()),
634 None,
635 ),
636 },
637 _ => {
638 return Err(invalid_parameter_error(
639 "invalid_parameter_type",
640 Some(format!("Expected: {}, found: {}", server_type, client_type)),
641 ));
642 }
643 }
644 } else {
645 ScalarValue::TimestampMillisecond(
646 data.map(|ts| ts.and_utc().timestamp_millis()),
647 None,
648 )
649 }
650 }
651 &Type::TIMESTAMPTZ => {
652 let data = portal.parameter::<DateTime<FixedOffset>>(idx, &client_type)?;
653 if let Some(server_type) = &server_type {
654 match server_type {
655 ConcreteDataType::Timestamp(unit) => match *unit {
656 TimestampType::Second(_) => {
657 ScalarValue::TimestampSecond(data.map(|ts| ts.timestamp()), None)
658 }
659 TimestampType::Millisecond(_) => ScalarValue::TimestampMillisecond(
660 data.map(|ts| ts.timestamp_millis()),
661 None,
662 ),
663 TimestampType::Microsecond(_) => ScalarValue::TimestampMicrosecond(
664 data.map(|ts| ts.timestamp_micros()),
665 None,
666 ),
667 TimestampType::Nanosecond(_) => ScalarValue::TimestampNanosecond(
668 data.and_then(|ts| ts.timestamp_nanos_opt()),
669 None,
670 ),
671 },
672 _ => {
673 return Err(invalid_parameter_error(
674 "invalid_parameter_type",
675 Some(format!("Expected: {}, found: {}", server_type, client_type)),
676 ));
677 }
678 }
679 } else {
680 ScalarValue::TimestampMillisecond(data.map(|ts| ts.timestamp_millis()), None)
681 }
682 }
683 &Type::DATE => {
684 let data = portal.parameter::<NaiveDate>(idx, &client_type)?;
685 if let Some(server_type) = &server_type {
686 match server_type {
687 ConcreteDataType::Date(_) => ScalarValue::Date32(
688 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
689 ),
690 _ => {
691 return Err(invalid_parameter_error(
692 "invalid_parameter_type",
693 Some(format!("Expected: {}, found: {}", server_type, client_type)),
694 ));
695 }
696 }
697 } else {
698 ScalarValue::Date32(
699 data.map(|d| (d - DateTime::UNIX_EPOCH.date_naive()).num_days() as i32),
700 )
701 }
702 }
703 &Type::INTERVAL => {
704 let data = portal.parameter::<PgInterval>(idx, &client_type)?;
705 if let Some(server_type) = &server_type {
706 match server_type {
707 ConcreteDataType::Interval(IntervalType::YearMonth(_)) => {
708 ScalarValue::IntervalYearMonth(
709 data.map(|i| {
710 if i.days != 0 || i.microseconds != 0 {
711 Err(invalid_parameter_error(
712 "invalid_parameter_type",
713 Some(format!(
714 "Expected: {}, found: {}",
715 server_type, client_type
716 )),
717 ))
718 } else {
719 Ok(IntervalYearMonth::new(i.months).to_i32())
720 }
721 })
722 .transpose()?,
723 )
724 }
725 ConcreteDataType::Interval(IntervalType::DayTime(_)) => {
726 ScalarValue::IntervalDayTime(
727 data.map(|i| {
728 if i.months != 0 || i.microseconds % 1000 != 0 {
729 Err(invalid_parameter_error(
730 "invalid_parameter_type",
731 Some(format!(
732 "Expected: {}, found: {}",
733 server_type, client_type
734 )),
735 ))
736 } else {
737 Ok(IntervalDayTime::new(
738 i.days,
739 (i.microseconds / 1000) as i32,
740 )
741 .into())
742 }
743 })
744 .transpose()?,
745 )
746 }
747 ConcreteDataType::Interval(IntervalType::MonthDayNano(_)) => {
748 ScalarValue::IntervalMonthDayNano(data.map(|i| {
749 IntervalMonthDayNano::new(
750 i.months,
751 i.days,
752 i.microseconds * 1_000i64,
753 )
754 .into()
755 }))
756 }
757 _ => {
758 return Err(invalid_parameter_error(
759 "invalid_parameter_type",
760 Some(format!("Expected: {}, found: {}", server_type, client_type)),
761 ));
762 }
763 }
764 } else {
765 ScalarValue::IntervalMonthDayNano(data.map(|i| {
766 IntervalMonthDayNano::new(i.months, i.days, i.microseconds * 1_000i64)
767 .into()
768 }))
769 }
770 }
771 &Type::BYTEA => {
772 let data = portal.parameter::<Vec<u8>>(idx, &client_type)?;
773 if let Some(server_type) = &server_type {
774 match server_type {
775 ConcreteDataType::String(t) => {
776 let s = data.map(|d| String::from_utf8_lossy(&d).to_string());
777 if t.is_large() {
778 ScalarValue::LargeUtf8(s)
779 } else {
780 ScalarValue::Utf8(s)
781 }
782 }
783 ConcreteDataType::Binary(_) => ScalarValue::Binary(data),
784 _ => {
785 return Err(invalid_parameter_error(
786 "invalid_parameter_type",
787 Some(format!("Expected: {}, found: {}", server_type, client_type)),
788 ));
789 }
790 }
791 } else {
792 ScalarValue::Binary(data)
793 }
794 }
795 &Type::JSONB => {
796 let data = portal.parameter::<serde_json::Value>(idx, &client_type)?;
797 if let Some(server_type) = &server_type {
798 match server_type {
799 ConcreteDataType::Binary(_) => {
800 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
801 }
802 _ => {
803 return Err(invalid_parameter_error(
804 "invalid_parameter_type",
805 Some(format!("Expected: {}, found: {}", server_type, client_type)),
806 ));
807 }
808 }
809 } else {
810 ScalarValue::Binary(data.map(|d| d.to_string().into_bytes()))
811 }
812 }
813 &Type::INT2_ARRAY => {
814 let data = portal.parameter::<Vec<Option<i16>>>(idx, &client_type)?;
815 if let Some(data) = data {
816 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
817 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int16, true))
818 } else {
819 ScalarValue::Null
820 }
821 }
822 &Type::INT4_ARRAY => {
823 let data = portal.parameter::<Vec<Option<i32>>>(idx, &client_type)?;
824 if let Some(data) = data {
825 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
826 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int32, true))
827 } else {
828 ScalarValue::Null
829 }
830 }
831 &Type::INT8_ARRAY => {
832 let data = portal.parameter::<Vec<Option<i64>>>(idx, &client_type)?;
833 if let Some(data) = data {
834 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
835 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Int64, true))
836 } else {
837 ScalarValue::Null
838 }
839 }
840 &Type::VARCHAR_ARRAY => {
841 let data = portal.parameter::<Vec<Option<String>>>(idx, &client_type)?;
842 if let Some(data) = data {
843 let values = data.into_iter().map(|i| i.into()).collect::<Vec<_>>();
844 ScalarValue::List(ScalarValue::new_list(&values, &ArrowDataType::Utf8, true))
845 } else {
846 ScalarValue::Null
847 }
848 }
849 &Type::TIMESTAMP_ARRAY => {
850 let data = portal.parameter::<Vec<Option<NaiveDateTime>>>(idx, &client_type)?;
851 if let Some(data) = data {
852 if let Some(ConcreteDataType::List(list_type)) = &server_type {
853 match list_type.item_type() {
854 ConcreteDataType::Timestamp(unit) => match *unit {
855 TimestampType::Second(_) => {
856 let values = data
857 .into_iter()
858 .map(|ts| {
859 ScalarValue::TimestampSecond(
860 ts.map(|ts| ts.and_utc().timestamp()),
861 None,
862 )
863 })
864 .collect::<Vec<_>>();
865 ScalarValue::List(ScalarValue::new_list(
866 &values,
867 &ArrowDataType::Timestamp(TimeUnit::Second, None),
868 true,
869 ))
870 }
871 TimestampType::Millisecond(_) => {
872 let values = data
873 .into_iter()
874 .map(|ts| {
875 ScalarValue::TimestampMillisecond(
876 ts.map(|ts| ts.and_utc().timestamp_millis()),
877 None,
878 )
879 })
880 .collect::<Vec<_>>();
881 ScalarValue::List(ScalarValue::new_list(
882 &values,
883 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
884 true,
885 ))
886 }
887 TimestampType::Microsecond(_) => {
888 let values = data
889 .into_iter()
890 .map(|ts| {
891 ScalarValue::TimestampMicrosecond(
892 ts.map(|ts| ts.and_utc().timestamp_micros()),
893 None,
894 )
895 })
896 .collect::<Vec<_>>();
897 ScalarValue::List(ScalarValue::new_list(
898 &values,
899 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
900 true,
901 ))
902 }
903 TimestampType::Nanosecond(_) => {
904 let values = data
905 .into_iter()
906 .filter_map(|ts| {
907 ts.and_then(|ts| {
908 ts.and_utc().timestamp_nanos_opt().map(|nanos| {
909 ScalarValue::TimestampNanosecond(
910 Some(nanos),
911 None,
912 )
913 })
914 })
915 })
916 .collect::<Vec<_>>();
917 ScalarValue::List(ScalarValue::new_list(
918 &values,
919 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
920 true,
921 ))
922 }
923 },
924 _ => {
925 return Err(invalid_parameter_error(
926 "invalid_parameter_type",
927 Some(format!(
928 "Expected: {}, found: {}",
929 list_type.item_type(),
930 client_type
931 )),
932 ));
933 }
934 }
935 } else {
936 let values = data
937 .into_iter()
938 .map(|ts| {
939 ScalarValue::TimestampMillisecond(
940 ts.map(|ts| ts.and_utc().timestamp_millis()),
941 None,
942 )
943 })
944 .collect::<Vec<_>>();
945 ScalarValue::List(ScalarValue::new_list(
946 &values,
947 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
948 true,
949 ))
950 }
951 } else {
952 ScalarValue::Null
953 }
954 }
955 &Type::TIMESTAMPTZ_ARRAY => {
956 let data =
957 portal.parameter::<Vec<Option<DateTime<FixedOffset>>>>(idx, &client_type)?;
958 if let Some(data) = data {
959 if let Some(ConcreteDataType::List(list_type)) = &server_type {
960 match list_type.item_type() {
961 ConcreteDataType::Timestamp(unit) => match *unit {
962 TimestampType::Second(_) => {
963 let values = data
964 .into_iter()
965 .map(|ts| {
966 ScalarValue::TimestampSecond(
967 ts.map(|ts| ts.timestamp()),
968 None,
969 )
970 })
971 .collect::<Vec<_>>();
972 ScalarValue::List(ScalarValue::new_list(
973 &values,
974 &ArrowDataType::Timestamp(TimeUnit::Second, None),
975 true,
976 ))
977 }
978 TimestampType::Millisecond(_) => {
979 let values = data
980 .into_iter()
981 .map(|ts| {
982 ScalarValue::TimestampMillisecond(
983 ts.map(|ts| ts.timestamp_millis()),
984 None,
985 )
986 })
987 .collect::<Vec<_>>();
988 ScalarValue::List(ScalarValue::new_list(
989 &values,
990 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
991 true,
992 ))
993 }
994 TimestampType::Microsecond(_) => {
995 let values = data
996 .into_iter()
997 .map(|ts| {
998 ScalarValue::TimestampMicrosecond(
999 ts.map(|ts| ts.timestamp_micros()),
1000 None,
1001 )
1002 })
1003 .collect::<Vec<_>>();
1004 ScalarValue::List(ScalarValue::new_list(
1005 &values,
1006 &ArrowDataType::Timestamp(TimeUnit::Microsecond, None),
1007 true,
1008 ))
1009 }
1010 TimestampType::Nanosecond(_) => {
1011 let values = data
1012 .into_iter()
1013 .map(|ts| {
1014 ScalarValue::TimestampNanosecond(
1015 ts.and_then(|ts| ts.timestamp_nanos_opt()),
1016 None,
1017 )
1018 })
1019 .collect::<Vec<_>>();
1020 ScalarValue::List(ScalarValue::new_list(
1021 &values,
1022 &ArrowDataType::Timestamp(TimeUnit::Nanosecond, None),
1023 true,
1024 ))
1025 }
1026 },
1027 _ => {
1028 return Err(invalid_parameter_error(
1029 "invalid_parameter_type",
1030 Some(format!(
1031 "Expected: {}, found: {}",
1032 list_type.item_type(),
1033 client_type
1034 )),
1035 ));
1036 }
1037 }
1038 } else {
1039 let values = data
1040 .into_iter()
1041 .map(|ts| {
1042 ScalarValue::TimestampMillisecond(
1043 ts.map(|ts| ts.timestamp_millis()),
1044 None,
1045 )
1046 })
1047 .collect::<Vec<_>>();
1048 ScalarValue::List(ScalarValue::new_list(
1049 &values,
1050 &ArrowDataType::Timestamp(TimeUnit::Millisecond, None),
1051 true,
1052 ))
1053 }
1054 } else {
1055 ScalarValue::Null
1056 }
1057 }
1058 _ => Err(invalid_parameter_error(
1059 "unsupported_parameter_value",
1060 Some(format!("Found type: {}", client_type)),
1061 ))?,
1062 };
1063
1064 results.push(value);
1065 }
1066
1067 Ok(results)
1068}
1069
1070pub(super) fn param_types_to_pg_types(
1071 param_types: &HashMap<String, Option<ConcreteDataType>>,
1072) -> Result<Vec<Type>> {
1073 let param_count = param_types.len();
1074 let mut types = Vec::with_capacity(param_count);
1075 for i in 0..param_count {
1076 if let Some(Some(param_type)) = param_types.get(&format!("${}", i + 1)) {
1077 let pg_type = type_gt_to_pg(param_type)?;
1078 types.push(pg_type);
1079 } else {
1080 types.push(Type::UNKNOWN);
1081 }
1082 }
1083 Ok(types)
1084}
1085
1086pub fn format_options_from_query_ctx(query_ctx: &QueryContextRef) -> Arc<PgFormatOptions> {
1087 let config = query_ctx.configuration_parameter();
1088 let (date_style, date_order) = *config.pg_datetime_style();
1089
1090 let mut format_options = PgFormatOptions::default();
1091 format_options.date_style = format!("{}, {}", date_style, date_order);
1092 format_options.interval_style = config.pg_intervalstyle_format().to_string();
1093 format_options.bytea_output = config.postgres_bytea_output().to_string();
1094 format_options.time_zone = query_ctx.timezone().to_string();
1095
1096 Arc::new(format_options)
1097}
1098
1099#[cfg(test)]
1100mod test {
1101 use std::sync::Arc;
1102
1103 use arrow::array::{
1104 Float64Builder, Int64Builder, ListBuilder, StringBuilder, TimestampSecondBuilder,
1105 };
1106 use arrow_schema::{Field, IntervalUnit};
1107 use datatypes::schema::{ColumnSchema, Schema};
1108 use datatypes::vectors::{
1109 BinaryVector, BooleanVector, DateVector, Float32Vector, Float64Vector, Int8Vector,
1110 Int16Vector, Int32Vector, Int64Vector, IntervalDayTimeVector, IntervalMonthDayNanoVector,
1111 IntervalYearMonthVector, ListVector, NullVector, StringVector, TimeSecondVector,
1112 TimestampSecondVector, UInt8Vector, UInt16Vector, UInt32Vector, UInt64Vector, VectorRef,
1113 };
1114 use futures::{StreamExt as FuturesStreamExt, stream};
1115 use pgwire::api::Type;
1116 use pgwire::api::results::{DataRowEncoder, FieldFormat, FieldInfo};
1117 use session::context::QueryContextBuilder;
1118
1119 use super::*;
1120
#[test]
fn test_schema_convert() {
    // Builds a text-format field descriptor without table or column ids.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    // Each case is (column name, column data type, expected PostgreSQL type).
    let cases = vec![
        ("nulls", ConcreteDataType::null_datatype(), Type::UNKNOWN),
        ("bools", ConcreteDataType::boolean_datatype(), Type::BOOL),
        ("int8s", ConcreteDataType::int8_datatype(), Type::CHAR),
        ("int16s", ConcreteDataType::int16_datatype(), Type::INT2),
        ("int32s", ConcreteDataType::int32_datatype(), Type::INT4),
        ("int64s", ConcreteDataType::int64_datatype(), Type::INT8),
        ("uint8s", ConcreteDataType::uint8_datatype(), Type::INT2),
        ("uint16s", ConcreteDataType::uint16_datatype(), Type::INT4),
        ("uint32s", ConcreteDataType::uint32_datatype(), Type::INT8),
        ("uint64s", ConcreteDataType::uint64_datatype(), Type::NUMERIC),
        ("float32s", ConcreteDataType::float32_datatype(), Type::FLOAT4),
        ("float64s", ConcreteDataType::float64_datatype(), Type::FLOAT8),
        ("binaries", ConcreteDataType::binary_datatype(), Type::BYTEA),
        ("strings", ConcreteDataType::string_datatype(), Type::VARCHAR),
        (
            "timestamps",
            ConcreteDataType::timestamp_millisecond_datatype(),
            Type::TIMESTAMP,
        ),
        ("dates", ConcreteDataType::date_datatype(), Type::DATE),
        ("times", ConcreteDataType::time_second_datatype(), Type::TIME),
        (
            "intervals",
            ConcreteDataType::interval_month_day_nano_datatype(),
            Type::INTERVAL,
        ),
    ];

    // Split the table into the nullable input columns and the expected output.
    let (column_schemas, expected): (Vec<_>, Vec<_>) = cases
        .into_iter()
        .map(|(name, data_type, pg_type)| {
            (
                ColumnSchema::new(name, data_type, true),
                text_field(name, pg_type),
            )
        })
        .unzip();

    let schema = Schema::new(column_schemas);
    let actual = schema_to_pg(&schema, &Format::UnifiedText, None).unwrap();
    assert_eq!(actual, expected);
}
1217
#[test]
fn test_encode_text_format_data() {
    // Builds a text-format field descriptor without table or column ids.
    fn text_field(name: &str, pg_type: Type) -> FieldInfo {
        FieldInfo::new(name.into(), None, None, pg_type, FieldFormat::Text)
    }

    // Builds a nullable Arrow field; every column in this test is named "x".
    fn nullable(data_type: DataType) -> Field {
        Field::new("x", data_type, true)
    }

    // Builds an Arrow list type whose nullable child field is named "item".
    fn list_of(item_type: DataType) -> DataType {
        DataType::List(Arc::new(Field::new("item", item_type, true)))
    }

    // PostgreSQL-side description of the 24 columns, all in text format.
    let pg_schema = vec![
        text_field("nulls", Type::UNKNOWN),
        text_field("bools", Type::BOOL),
        text_field("uint8s", Type::INT2),
        text_field("uint16s", Type::INT4),
        text_field("uint32s", Type::INT8),
        text_field("uint64s", Type::NUMERIC),
        text_field("int8s", Type::CHAR),
        text_field("int16s", Type::INT2),
        text_field("int32s", Type::INT4),
        text_field("int64s", Type::INT8),
        text_field("float32s", Type::FLOAT4),
        text_field("float64s", Type::FLOAT8),
        text_field("strings", Type::VARCHAR),
        text_field("binaries", Type::BYTEA),
        text_field("dates", Type::DATE),
        text_field("times", Type::TIME),
        text_field("timestamps", Type::TIMESTAMP),
        text_field("interval_year_month", Type::INTERVAL),
        text_field("interval_day_time", Type::INTERVAL),
        text_field("interval_month_day_nano", Type::INTERVAL),
        text_field("int_list", Type::INT8_ARRAY),
        text_field("float_list", Type::FLOAT8_ARRAY),
        text_field("string_list", Type::VARCHAR_ARRAY),
        text_field("timestamp_list", Type::TIMESTAMP_ARRAY),
    ];

    // Arrow-side description of the same columns, in the same order.
    let arrow_schema = arrow_schema::Schema::new(vec![
        nullable(DataType::Null),
        nullable(DataType::Boolean),
        nullable(DataType::UInt8),
        nullable(DataType::UInt16),
        nullable(DataType::UInt32),
        nullable(DataType::UInt64),
        nullable(DataType::Int8),
        nullable(DataType::Int16),
        nullable(DataType::Int32),
        nullable(DataType::Int64),
        nullable(DataType::Float32),
        nullable(DataType::Float64),
        nullable(DataType::Utf8),
        nullable(DataType::Binary),
        nullable(DataType::Date32),
        nullable(DataType::Time32(TimeUnit::Second)),
        nullable(DataType::Timestamp(TimeUnit::Second, None)),
        nullable(DataType::Interval(IntervalUnit::YearMonth)),
        nullable(DataType::Interval(IntervalUnit::DayTime)),
        nullable(DataType::Interval(IntervalUnit::MonthDayNano)),
        nullable(list_of(DataType::Int64)),
        nullable(list_of(DataType::Float64)),
        nullable(list_of(DataType::Utf8)),
        nullable(list_of(DataType::Timestamp(TimeUnit::Second, None))),
    ]);

    // Each list column has three rows: a list with a null element, a null
    // list, and another list with a null element.
    let i64_list_array = {
        let mut lists = ListBuilder::new(Int64Builder::new());
        lists.append_value([Some(1i64), None, Some(2)]);
        lists.append_null();
        lists.append_value([Some(-1i64), None, Some(-2)]);
        lists.finish()
    };

    let f64_list_array = {
        let mut lists = ListBuilder::new(Float64Builder::new());
        lists.append_value([Some(1.0f64), None, Some(2.0)]);
        lists.append_null();
        lists.append_value([Some(-1.0f64), None, Some(-2.0)]);
        lists.finish()
    };

    let string_list_array = {
        let mut lists = ListBuilder::new(StringBuilder::new());
        lists.append_value([Some("a"), None, Some("b")]);
        lists.append_null();
        lists.append_value([Some("c"), None, Some("d")]);
        lists.finish()
    };

    let timestamp_list_array = {
        let mut lists = ListBuilder::new(TimestampSecondBuilder::new());
        lists.append_value([Some(1i64), None, Some(2)]);
        lists.append_null();
        lists.append_value([Some(3i64), None, Some(4)]);
        lists.finish()
    };

    // Three rows per column, mixing extreme values and nulls.
    let columns = vec![
        Arc::new(NullVector::new(3)) as VectorRef,
        Arc::new(BooleanVector::from(vec![Some(true), Some(false), None])),
        Arc::new(UInt8Vector::from(vec![Some(u8::MAX), Some(u8::MIN), None])),
        Arc::new(UInt16Vector::from(vec![
            Some(u16::MAX),
            Some(u16::MIN),
            None,
        ])),
        Arc::new(UInt32Vector::from(vec![
            Some(u32::MAX),
            Some(u32::MIN),
            None,
        ])),
        Arc::new(UInt64Vector::from(vec![
            Some(u64::MAX),
            Some(u64::MIN),
            None,
        ])),
        Arc::new(Int8Vector::from(vec![Some(i8::MAX), Some(i8::MIN), None])),
        Arc::new(Int16Vector::from(vec![
            Some(i16::MAX),
            Some(i16::MIN),
            None,
        ])),
        Arc::new(Int32Vector::from(vec![
            Some(i32::MAX),
            Some(i32::MIN),
            None,
        ])),
        Arc::new(Int64Vector::from(vec![
            Some(i64::MAX),
            Some(i64::MIN),
            None,
        ])),
        Arc::new(Float32Vector::from(vec![
            None,
            Some(f32::MAX),
            Some(f32::MIN),
        ])),
        Arc::new(Float64Vector::from(vec![
            None,
            Some(f64::MAX),
            Some(f64::MIN),
        ])),
        Arc::new(StringVector::from(vec![
            None,
            Some("hello"),
            Some("greptime"),
        ])),
        Arc::new(BinaryVector::from(vec![
            None,
            Some(b"hello".to_vec()),
            Some(b"world".to_vec()),
        ])),
        Arc::new(DateVector::from(vec![Some(1001), None, Some(1)])),
        Arc::new(TimeSecondVector::from(vec![Some(1001), None, Some(1)])),
        Arc::new(TimestampSecondVector::from(vec![
            Some(1000001),
            None,
            Some(1),
        ])),
        Arc::new(IntervalYearMonthVector::from(vec![Some(1), None, Some(2)])),
        Arc::new(IntervalDayTimeVector::from(vec![
            Some(arrow::datatypes::IntervalDayTime::new(1, 1)),
            None,
            Some(arrow::datatypes::IntervalDayTime::new(2, 2)),
        ])),
        Arc::new(IntervalMonthDayNanoVector::from(vec![
            Some(arrow::datatypes::IntervalMonthDayNano::new(1, 1, 10)),
            None,
            Some(arrow::datatypes::IntervalMonthDayNano::new(2, 2, 20)),
        ])),
        Arc::new(ListVector::from(i64_list_array)),
        Arc::new(ListVector::from(f64_list_array)),
        Arc::new(ListVector::from(string_list_array)),
        Arc::new(ListVector::from(timestamp_list_array)),
    ];
    let record_batch =
        RecordBatch::new(Arc::new(arrow_schema.try_into().unwrap()), columns).unwrap();

    let query_context = QueryContextBuilder::default()
        .configuration_parameter(Default::default())
        .build()
        .into();
    // Grab the schema before the batch is moved into the stream below.
    let schema = record_batch.schema.clone();
    let pg_schema_ref = Arc::new(pg_schema);

    let encoder = DataRowEncoder::new(Arc::clone(&pg_schema_ref));

    let row_stream = RecordBatchRowStream::new(
        query_context,
        Arc::clone(&pg_schema_ref),
        schema,
        stream::once(async { Ok(record_batch) }),
        encoder,
    );

    // Failed items are dropped here; a failure therefore shows up as a wrong
    // row count in the assertion below.
    let encoded_rows = row_stream
        .filter_map(|x: PgWireResult<_>| async move { x.ok() })
        .flat_map(stream::iter)
        .collect::<Vec<_>>();
    let rows: Vec<_> = futures::executor::block_on(encoded_rows);

    assert_eq!(rows.len(), 3);
    let expected_field_count = pg_schema_ref.len() as i16;
    for row in &rows {
        assert_eq!(row.field_count, expected_field_count);
    }
}
1505
#[test]
fn test_invalid_parameter() {
    let expected_message = "invalid_parameter_count";

    // The helper must produce a user error carrying the given message with
    // severity "ERROR" and code "22023".
    match invalid_parameter_error(expected_message, None) {
        PgWireError::UserError(info) => {
            assert_eq!("ERROR", info.severity);
            assert_eq!("22023", info.code);
            assert_eq!(expected_message, info.message);
        }
        _ => panic!("test_invalid_parameter failed"),
    }
}
1519}