mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

/// Parquet SST writer.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
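    /// Number of bytes written to the current file.
    /// Shared with the [SizeAwareWriter] and reset after a file is finished.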
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

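/// Factory that creates [AsyncWrite] writers for SST file paths.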
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
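    /// Creates a writer that writes to the given file path.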
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

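/// [WriterFactory] implementation backed by an [ObjectStore].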
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
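    /// Creates a parquet SST writer that writes files to the given object store.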
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

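    /// Sets the cleaner that removes temporary files when the write fails.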
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes the current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer will create a new file afterwards.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let mut stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // Safety: the writer and the indexer are either both present or both absent.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed);
        };

        Ok(())
    }

    /// Iterates the source and writes all rows to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Safety: self.current_indexer must be set once the first batch has been written.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Iterates the [FlatSource] and writes all record batches in flat format to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    // Safety: self.current_indexer must be set once the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to schema and maybe column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

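    /// Fetches the next [Batch] from the source, converts it and writes it to the Parquet file.
    ///
    /// Returns the written batch, or `None` when the source is exhausted.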
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

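    /// Fetches the next [RecordBatch] from the flat source, converts it and writes it to the Parquet file.
    ///
    /// Returns the written record batch, or `None` when the source is exhausted.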
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

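    /// Returns the current arrow writer, initializing the Parquet writer and the indexer
    /// for the current file if they are not created yet.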
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            // Safety: self.writer is assigned above.
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

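/// Statistics of the batches fetched from the source.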
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Series estimator for computing num_series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
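    /// Updates the stats with a [Batch] in primary key format.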
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        self.series_estimator.update(batch);
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

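    /// Updates the stats with a [RecordBatch] in flat format.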
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // Get the timestamp column by index.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets the min and max timestamp from a timestamp array.
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // If min timestamp exists, max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround for [AsyncArrowWriter] not providing a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
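    /// Wraps `inner` and accumulates the number of bytes written into `size`.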
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}