Struct BatchCoalescer
pub struct BatchCoalescer {
schema: Arc<Schema>,
target_batch_size: usize,
in_progress_arrays: Vec<Box<dyn InProgressArray>>,
buffered_rows: usize,
completed: VecDeque<RecordBatch>,
}
Expand description
Concatenate multiple [RecordBatch
]es
Implements the common pattern of incrementally creating output
[RecordBatch
]es of a specific size from an input stream of
[RecordBatch
]es.
This is useful after operations such as filter
and take
that produce
smaller batches, and we want to coalesce them into larger batches for
further processing.
§Motivation
If we use concat_batches
to implement the same functionality, there are 2 potential issues:
- At least 2x peak memory (holding the input and output of concat)
- 2 copies of the data (to create the output of filter and then create the output of concat)
See: https://github.com/apache/arrow-rs/issues/6692 for more discussions about the motivation.
§Example
use arrow_array::record_batch;
use arrow_select::coalesce::{BatchCoalescer};
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5])).unwrap();
// Create a `BatchCoalescer` that will produce batches with at least 4 rows
let target_batch_size = 4;
let mut coalescer = BatchCoalescer::new(batch1.schema(), 4);
// push the batches
coalescer.push_batch(batch1).unwrap();
// only pushed 3 rows (not yet 4, enough to produce a batch)
assert!(coalescer.next_completed_batch().is_none());
coalescer.push_batch(batch2).unwrap();
// now we have 5 rows, so we can produce a batch
let finished = coalescer.next_completed_batch().unwrap();
// 4 rows came out (target batch size is 4)
let expected = record_batch!(("a", Int32, [1, 2, 3, 4])).unwrap();
assert_eq!(finished, expected);
// Have no more input, but still have an in-progress batch
assert!(coalescer.next_completed_batch().is_none());
// We can finish the batch, which will produce the remaining rows
coalescer.finish_buffered_batch().unwrap();
let expected = record_batch!(("a", Int32, [5])).unwrap();
assert_eq!(coalescer.next_completed_batch().unwrap(), expected);
// The coalescer is now empty
assert!(coalescer.next_completed_batch().is_none());
§Background
Generally speaking, larger [RecordBatch
]es are more efficient to process
than smaller [RecordBatch
]es (until the CPU cache is exceeded) because
there is fixed processing overhead per batch. This coalescer builds up these
larger batches incrementally.
┌────────────────────┐
│ RecordBatch │
│ num_rows = 100 │
└────────────────────┘ ┌────────────────────┐
│ │
┌────────────────────┐ Coalesce │ │
│ │ Batches │ │
│ RecordBatch │ │ │
│ num_rows = 200 │ ─ ─ ─ ─ ─ ─ ▶ │ │
│ │ │ RecordBatch │
│ │ │ num_rows = 400 │
└────────────────────┘ │ │
│ │
┌────────────────────┐ │ │
│ │ │ │
│ RecordBatch │ │ │
│ num_rows = 100 │ └────────────────────┘
│ │
└────────────────────┘
§Notes:
-
Output rows are produced in the same order as the input rows
-
The output is a sequence of batches, with all but the last being at exactly
target_batch_size
rows.
Fields§
§schema: Arc<Schema>
§target_batch_size: usize
§in_progress_arrays: Vec<Box<dyn InProgressArray>>
§buffered_rows: usize
§completed: VecDeque<RecordBatch>
Implementations§
§impl BatchCoalescer
impl BatchCoalescer
pub fn new(schema: Arc<Schema>, target_batch_size: usize) -> BatchCoalescer
pub fn new(schema: Arc<Schema>, target_batch_size: usize) -> BatchCoalescer
Create a new BatchCoalescer
§Arguments
schema
- the schema of the output batchestarget_batch_size
- the number of rows in each output batch. Typical values are4096
or8192
rows.
pub fn push_batch_with_filter(
&mut self,
batch: RecordBatch,
filter: &BooleanArray,
) -> Result<(), ArrowError>
pub fn push_batch_with_filter( &mut self, batch: RecordBatch, filter: &BooleanArray, ) -> Result<(), ArrowError>
Push a batch into the Coalescer after applying a filter
This is semantically equivalent of calling Self::push_batch
with the results from filter_record_batch
§Example
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// Apply a filter to each batch to pick the first and last row
let filter = BooleanArray::from(vec![true, false, true]);
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch_with_filter(batch1, &filter);
coalescer.push_batch_with_filter(batch2, &filter);
// finsh and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
// filtered out 2 and 5:
let expected_batch = record_batch!(("a", Int32, [1, 3, 4, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
pub fn push_batch(&mut self, batch: RecordBatch) -> Result<(), ArrowError>
Push all the rows from batch
into the Coalescer
When buffered data plus incoming rows reach target_batch_size
,
completed batches are generated eagerly and can be retrieved via
Self::next_completed_batch()
.
Output batches contain exactly target_batch_size
rows, so the tail of
the input batch may remain buffered.
Remaining partial data either waits for future input batches or can be
materialized immediately by calling Self::finish_buffered_batch()
.
§Example
let batch1 = record_batch!(("a", Int32, [1, 2, 3])).unwrap();
let batch2 = record_batch!(("a", Int32, [4, 5, 6])).unwrap();
// create a new Coalescer that targets creating 1000 row batches
let mut coalescer = BatchCoalescer::new(batch1.schema(), 1000);
coalescer.push_batch(batch1);
coalescer.push_batch(batch2);
// finsh and retrieve the created batch
coalescer.finish_buffered_batch().unwrap();
let completed_batch = coalescer.next_completed_batch().unwrap();
let expected_batch = record_batch!(("a", Int32, [1, 2, 3, 4, 5, 6])).unwrap();
assert_eq!(completed_batch, expected_batch);
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>
pub fn finish_buffered_batch(&mut self) -> Result<(), ArrowError>
Concatenates any buffered batches into a single RecordBatch
and
clears any output buffers
Normally this is called when the input stream is exhausted, and we want to finalize the last batch of rows.
See Self::next_completed_batch()
for the completed batches.
pub fn has_completed_batch(&self) -> bool
pub fn has_completed_batch(&self) -> bool
Returns true if there are any completed batches
pub fn next_completed_batch(&mut self) -> Option<RecordBatch>
pub fn next_completed_batch(&mut self) -> Option<RecordBatch>
Removes and returns the next completed batch, if any.
Trait Implementations§
Auto Trait Implementations§
impl Freeze for BatchCoalescer
impl !RefUnwindSafe for BatchCoalescer
impl Send for BatchCoalescer
impl Sync for BatchCoalescer
impl Unpin for BatchCoalescer
impl !UnwindSafe for BatchCoalescer
Blanket Implementations§
Source§impl<T> BorrowMut<T> for Twhere
T: ?Sized,
impl<T> BorrowMut<T> for Twhere
T: ?Sized,
Source§fn borrow_mut(&mut self) -> &mut T
fn borrow_mut(&mut self) -> &mut T
§impl<T> Conv for T
impl<T> Conv for T
§impl<T, V> Convert<T> for Vwhere
V: Into<T>,
impl<T, V> Convert<T> for Vwhere
V: Into<T>,
fn convert(value: Self) -> T
fn convert_box(value: Box<Self>) -> Box<T>
fn convert_vec(value: Vec<Self>) -> Vec<T>
fn convert_vec_box(value: Vec<Box<Self>>) -> Vec<Box<T>>
fn convert_matrix(value: Vec<Vec<Self>>) -> Vec<Vec<T>>
fn convert_option(value: Option<Self>) -> Option<T>
fn convert_option_box(value: Option<Box<Self>>) -> Option<Box<T>>
fn convert_option_vec(value: Option<Vec<Self>>) -> Option<Vec<T>>
§impl<T> FmtForward for T
impl<T> FmtForward for T
§fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
fn fmt_binary(self) -> FmtBinary<Self>where
Self: Binary,
self
to use its Binary
implementation when Debug
-formatted.§fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
fn fmt_display(self) -> FmtDisplay<Self>where
Self: Display,
self
to use its Display
implementation when
Debug
-formatted.§fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
fn fmt_lower_exp(self) -> FmtLowerExp<Self>where
Self: LowerExp,
self
to use its LowerExp
implementation when
Debug
-formatted.§fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
fn fmt_lower_hex(self) -> FmtLowerHex<Self>where
Self: LowerHex,
self
to use its LowerHex
implementation when
Debug
-formatted.§fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
fn fmt_octal(self) -> FmtOctal<Self>where
Self: Octal,
self
to use its Octal
implementation when Debug
-formatted.§fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
fn fmt_pointer(self) -> FmtPointer<Self>where
Self: Pointer,
self
to use its Pointer
implementation when
Debug
-formatted.§fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
fn fmt_upper_exp(self) -> FmtUpperExp<Self>where
Self: UpperExp,
self
to use its UpperExp
implementation when
Debug
-formatted.§fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
fn fmt_upper_hex(self) -> FmtUpperHex<Self>where
Self: UpperHex,
self
to use its UpperHex
implementation when
Debug
-formatted.§fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
fn fmt_list(self) -> FmtList<Self>where
&'a Self: for<'a> IntoIterator,
§impl<T> FutureExt for T
impl<T> FutureExt for T
§fn with_context(self, otel_cx: Context) -> WithContext<Self>
fn with_context(self, otel_cx: Context) -> WithContext<Self>
§fn with_current_context(self) -> WithContext<Self>
fn with_current_context(self) -> WithContext<Self>
§impl<T> Instrument for T
impl<T> Instrument for T
§fn instrument(self, span: Span) -> Instrumented<Self>
fn instrument(self, span: Span) -> Instrumented<Self>
§fn in_current_span(self) -> Instrumented<Self>
fn in_current_span(self) -> Instrumented<Self>
Source§impl<T> IntoEither for T
impl<T> IntoEither for T
Source§fn into_either(self, into_left: bool) -> Either<Self, Self>
fn into_either(self, into_left: bool) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left
is true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read moreSource§fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
fn into_either_with<F>(self, into_left: F) -> Either<Self, Self>
self
into a Left
variant of Either<Self, Self>
if into_left(&self)
returns true
.
Converts self
into a Right
variant of Either<Self, Self>
otherwise. Read more§impl<T> IntoRequest<T> for T
impl<T> IntoRequest<T> for T
§fn into_request(self) -> Request<T>
fn into_request(self) -> Request<T>
T
in a tonic::Request
§impl<L> LayerExt<L> for L
impl<L> LayerExt<L> for L
§fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
fn named_layer<S>(&self, service: S) -> Layered<<L as Layer<S>>::Service, S>where
L: Layer<S>,
Layered
].§impl<T> Pipe for Twhere
T: ?Sized,
impl<T> Pipe for Twhere
T: ?Sized,
§fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
fn pipe<R>(self, func: impl FnOnce(Self) -> R) -> Rwhere
Self: Sized,
§fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref<'a, R>(&'a self, func: impl FnOnce(&'a Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
fn pipe_ref_mut<'a, R>(&'a mut self, func: impl FnOnce(&'a mut Self) -> R) -> Rwhere
R: 'a,
self
and passes that borrow into the pipe function. Read more§fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
fn pipe_borrow<'a, B, R>(&'a self, func: impl FnOnce(&'a B) -> R) -> R
§fn pipe_borrow_mut<'a, B, R>(
&'a mut self,
func: impl FnOnce(&'a mut B) -> R,
) -> R
fn pipe_borrow_mut<'a, B, R>( &'a mut self, func: impl FnOnce(&'a mut B) -> R, ) -> R
§fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
fn pipe_as_ref<'a, U, R>(&'a self, func: impl FnOnce(&'a U) -> R) -> R
self
, then passes self.as_ref()
into the pipe function.§fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
fn pipe_as_mut<'a, U, R>(&'a mut self, func: impl FnOnce(&'a mut U) -> R) -> R
self
, then passes self.as_mut()
into the pipe
function.§fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
fn pipe_deref<'a, T, R>(&'a self, func: impl FnOnce(&'a T) -> R) -> R
self
, then passes self.deref()
into the pipe function.§impl<T> PolicyExt for Twhere
T: ?Sized,
impl<T> PolicyExt for Twhere
T: ?Sized,
§impl<T> ServiceExt for T
impl<T> ServiceExt for T
§fn propagate_header(self, header: HeaderName) -> PropagateHeader<Self>where
Self: Sized,
fn propagate_header(self, header: HeaderName) -> PropagateHeader<Self>where
Self: Sized,
§fn add_extension<T>(self, value: T) -> AddExtension<Self, T>where
Self: Sized,
fn add_extension<T>(self, value: T) -> AddExtension<Self, T>where
Self: Sized,
§fn map_request_body<F>(self, f: F) -> MapRequestBody<Self, F>where
Self: Sized,
fn map_request_body<F>(self, f: F) -> MapRequestBody<Self, F>where
Self: Sized,
§fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
fn map_response_body<F>(self, f: F) -> MapResponseBody<Self, F>where
Self: Sized,
§fn compression(self) -> Compression<Self>where
Self: Sized,
fn compression(self) -> Compression<Self>where
Self: Sized,
§fn decompression(self) -> Decompression<Self>where
Self: Sized,
fn decompression(self) -> Decompression<Self>where
Self: Sized,
§fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
fn trace_for_http(self) -> Trace<Self, SharedClassifier<ServerErrorsAsFailures>>where
Self: Sized,
§fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
fn trace_for_grpc(self) -> Trace<Self, SharedClassifier<GrpcErrorsAsFailures>>where
Self: Sized,
§fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
fn follow_redirects(self) -> FollowRedirect<Self>where
Self: Sized,
§fn sensitive_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<SetSensitiveResponseHeaders<Self>>where
Self: Sized,
fn sensitive_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<SetSensitiveResponseHeaders<Self>>where
Self: Sized,
§fn sensitive_request_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<Self>where
Self: Sized,
fn sensitive_request_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveRequestHeaders<Self>where
Self: Sized,
§fn sensitive_response_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveResponseHeaders<Self>where
Self: Sized,
fn sensitive_response_headers(
self,
headers: impl IntoIterator<Item = HeaderName>,
) -> SetSensitiveResponseHeaders<Self>where
Self: Sized,
§fn override_request_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
fn override_request_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
§fn append_request_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
fn append_request_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
§fn insert_request_header_if_not_present<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
fn insert_request_header_if_not_present<M>(
self,
header_name: HeaderName,
make: M,
) -> SetRequestHeader<Self, M>where
Self: Sized,
§fn override_response_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
fn override_response_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
§fn append_response_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
fn append_response_header<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
§fn insert_response_header_if_not_present<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
fn insert_response_header_if_not_present<M>(
self,
header_name: HeaderName,
make: M,
) -> SetResponseHeader<Self, M>where
Self: Sized,
§fn set_request_id<M>(
self,
header_name: HeaderName,
make_request_id: M,
) -> SetRequestId<Self, M>where
Self: Sized,
M: MakeRequestId,
fn set_request_id<M>(
self,
header_name: HeaderName,
make_request_id: M,
) -> SetRequestId<Self, M>where
Self: Sized,
M: MakeRequestId,
§fn set_x_request_id<M>(self, make_request_id: M) -> SetRequestId<Self, M>where
Self: Sized,
M: MakeRequestId,
fn set_x_request_id<M>(self, make_request_id: M) -> SetRequestId<Self, M>where
Self: Sized,
M: MakeRequestId,
x-request-id
as the header name. Read more§fn propagate_request_id(
self,
header_name: HeaderName,
) -> PropagateRequestId<Self>where
Self: Sized,
fn propagate_request_id(
self,
header_name: HeaderName,
) -> PropagateRequestId<Self>where
Self: Sized,
§fn propagate_x_request_id(self) -> PropagateRequestId<Self>where
Self: Sized,
fn propagate_x_request_id(self) -> PropagateRequestId<Self>where
Self: Sized,
x-request-id
as the header name. Read more§fn catch_panic(self) -> CatchPanic<Self, DefaultResponseForPanic>where
Self: Sized,
fn catch_panic(self) -> CatchPanic<Self, DefaultResponseForPanic>where
Self: Sized,
500 Internal Server
responses. Read more§fn request_body_limit(self, limit: usize) -> RequestBodyLimit<Self>where
Self: Sized,
fn request_body_limit(self, limit: usize) -> RequestBodyLimit<Self>where
Self: Sized,
413 Payload Too Large
responses. Read more§fn trim_trailing_slash(self) -> NormalizePath<Self>where
Self: Sized,
fn trim_trailing_slash(self) -> NormalizePath<Self>where
Self: Sized,
§fn append_trailing_slash(self) -> NormalizePath<Self>where
Self: Sized,
fn append_trailing_slash(self) -> NormalizePath<Self>where
Self: Sized,
§impl<T> Tap for T
impl<T> Tap for T
§fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow<B>(self, func: impl FnOnce(&B)) -> Self
Borrow<B>
of a value. Read more§fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut<B>(self, func: impl FnOnce(&mut B)) -> Self
BorrowMut<B>
of a value. Read more§fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref<R>(self, func: impl FnOnce(&R)) -> Self
AsRef<R>
view of a value. Read more§fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut<R>(self, func: impl FnOnce(&mut R)) -> Self
AsMut<R>
view of a value. Read more§fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref<T>(self, func: impl FnOnce(&T)) -> Self
Deref::Target
of a value. Read more§fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
fn tap_deref_mut<T>(self, func: impl FnOnce(&mut T)) -> Self
Deref::Target
of a value. Read more§fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
fn tap_dbg(self, func: impl FnOnce(&Self)) -> Self
.tap()
only in debug builds, and is erased in release builds.§fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
fn tap_mut_dbg(self, func: impl FnOnce(&mut Self)) -> Self
.tap_mut()
only in debug builds, and is erased in release
builds.§fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
fn tap_borrow_dbg<B>(self, func: impl FnOnce(&B)) -> Self
.tap_borrow()
only in debug builds, and is erased in release
builds.§fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
fn tap_borrow_mut_dbg<B>(self, func: impl FnOnce(&mut B)) -> Self
.tap_borrow_mut()
only in debug builds, and is erased in release
builds.§fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
fn tap_ref_dbg<R>(self, func: impl FnOnce(&R)) -> Self
.tap_ref()
only in debug builds, and is erased in release
builds.§fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
fn tap_ref_mut_dbg<R>(self, func: impl FnOnce(&mut R)) -> Self
.tap_ref_mut()
only in debug builds, and is erased in release
builds.§fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
fn tap_deref_dbg<T>(self, func: impl FnOnce(&T)) -> Self
.tap_deref()
only in debug builds, and is erased in release
builds.