mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, SstInfoArray};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    bytes_written: Arc<AtomicUsize>,
}

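/// Factory that creates the underlying [AsyncWrite] destination for an SST file path.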
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

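/// [WriterFactory] that creates writers backed by an [ObjectStore].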
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
        )
        .await
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
        }
    }

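    /// Returns the current indexer, creating a new one (with a fresh file id) if none is active.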
    async fn get_or_create_indexer(&mut self) -> &mut Indexer {
        match self.current_indexer {
            None => {
                self.current_file = FileId::random();
                let indexer = self.indexer_builder.build(self.current_file).await;
                self.current_indexer = Some(indexer);
                // Safety: self.current_indexer is set above.
                self.current_indexer.as_mut().unwrap()
            }
            Some(ref mut indexer) => indexer,
        }
    }

    /// Iterates the source and writes all rows to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

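        // Drain the source batch by batch: update the row/time-range stats and feed each
        // batch to the current indexer. On error, abort the indexer before propagating it.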
        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    self.get_or_create_indexer().await.update(&mut batch).await;
                }
                Err(e) => {
                    self.get_or_create_indexer().await.abort().await;
                    return Err(e);
                }
            }
        }

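        // Finalize the current indexer regardless of whether any rows were written.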
        let index_output = self.get_or_create_indexer().await.finish().await;

        if stats.num_rows == 0 {
            return Ok(smallvec![]);
        }

        let Some(mut arrow_writer) = self.writer.take() else {
            // No batch actually written.
            return Ok(smallvec![]);
        };

        arrow_writer.flush().await.context(WriteParquetSnafu)?;

        let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
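        // The total file size is tracked by the SizeAwareWriter wrapper, which counts every
        // byte accepted by the underlying writer.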
        let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

        // Safety: num rows > 0 so we must have min/max.
        let time_range = stats.time_range.unwrap();

        // Convert FileMetaData to ParquetMetaData.
        let parquet_metadata = parse_parquet_metadata(file_meta)?;

        let file_id = self.current_file;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(smallvec![SstInfo {
            file_id,
            time_range,
            file_size,
            num_rows: stats.num_rows,
            num_row_groups: parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(Arc::new(parquet_metadata)),
            index_metadata: index_output,
        }])
    }

    /// Customizes per-column config according to the schema and possibly column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

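        // Sequence numbers and timestamps are integers that typically change slowly between
        // adjacent rows, so delta encoding without a dictionary tends to compress them well.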
        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };

        let arrow_batch = write_format.convert_batch(&batch)?;
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        Ok(Some(batch))
    }

    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
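            // Store the region metadata as JSON in the parquet key-value metadata so that
            // readers can recover it from the file itself.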
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);
            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }
}

/// Wrapper that tracks the number of bytes written, since [AsyncArrowWriter]
/// does not provide a method to get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

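        // Only count bytes the inner writer actually accepted (partial writes are possible).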
        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}