mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
}

pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}
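
// `WriterFactory` only needs to hand out an `AsyncWrite` sink per file path, so the
// Parquet writing logic can be exercised without an object store. The factory below is
// a minimal, test-only sketch (not part of the original module) that relies on tokio's
// `AsyncWrite` implementation for `Vec<u8>` to buffer everything in memory.
#[cfg(test)]
struct InMemoryWriterFactory;

#[cfg(test)]
impl WriterFactory for InMemoryWriterFactory {
    type Writer = Vec<u8>;

    async fn create(&mut self, _file_path: &str) -> Result<Self::Writer> {
        // Hand back a fresh buffer per file; a real test would keep a handle to the
        // buffer so the written bytes could be inspected afterwards.
        Ok(Vec::new())
    }
}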

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
        )
        .await
    }

    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
        }
    }

    /// Finishes current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // After this, `maybe_init_writer` will create a new file for subsequent batches.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // Safety: the writer and the indexer are either both present or both absent.
            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            // convert FileMetaData to ParquetMetaData
            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates the source and writes all rows to Parquet files.
    ///
    /// Returns the [SstInfo] for each SST written; more than one file may be produced
    /// when `max_file_size` is set and exceeded.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    // Safety: `self.current_indexer` is always set once the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
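                    // Once the configured max file size is exceeded, seal the current
                    // file and roll over: `finish_current_file` picks a fresh `FileId`,
                    // and the next batch re-creates the writer and indexer through
                    // `maybe_init_writer`.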
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes the per-column configuration according to the schema and, potentially, column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

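        // Timestamps and sequence numbers are typically monotonic or slowly varying
        // integers, so DELTA_BINARY_PACKED usually encodes them compactly, while
        // dictionary encoding rarely pays off for such high-cardinality columns and is
        // therefore disabled for them.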
        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };

        let arrow_batch = write_format.convert_batch(&batch)?;
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        Ok(Some(batch))
    }

    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
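            // Embed the region metadata as Parquet key-value metadata under
            // PARQUET_METADATA_KEY so the SST stays self-describing and the region
            // metadata can be recovered from the file itself.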
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }
}

/// Workaround for [AsyncArrowWriter] not providing a method to get the
/// total number of bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self {
            inner,
            size: size.clone(),
        }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
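
#[cfg(test)]
mod size_aware_writer_tests {
    use super::*;
    use tokio::io::AsyncWriteExt;

    // A minimal sketch (not part of the original module) checking that the wrapper
    // accumulates the byte counts reported by the inner writer. It relies on tokio's
    // `AsyncWrite` implementation for `Vec<u8>` and on the `#[tokio::test]` macro
    // being available as a dev-dependency.
    #[tokio::test]
    async fn tracks_bytes_written() {
        let size = Arc::new(AtomicUsize::new(0));
        let mut writer = SizeAwareWriter::new(Vec::new(), size.clone());
        writer.write_all(b"hello parquet").await.unwrap();
        writer.shutdown().await.unwrap();
        assert_eq!(size.load(Ordering::Relaxed), b"hello parquet".len());
    }
}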