mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.
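//!
//! A minimal usage sketch (hypothetical setup; see [`ParquetWriter`] for the
//! actual signatures):
//!
//! ```ignore
//! let mut metrics = Metrics::default();
//! let mut writer = ParquetWriter::new_with_object_store(
//!     object_store,
//!     region_metadata,
//!     index_config,
//!     indexer_builder,
//!     path_provider,
//!     &mut metrics,
//! )
//! .await;
//! // Drains the source, rolling over to a new SST file whenever
//! // `opts.max_file_size` is exceeded.
//! let ssts = writer.write_all_flat(source, None, &opts).await?;
//! ```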

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::FlatSource;
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

/// Converts a flat RecordBatch for writing to parquet.
enum FlatBatchConverter {
    /// Write as-is in flat format.
    Flat(FlatWriteFormat),
    /// Convert flat batch to primary-key format by stripping tag columns.
    PrimaryKey {
        format: PrimaryKeyWriteFormat,
        num_fields: usize,
    },
}

impl FlatBatchConverter {
    fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        match self {
            FlatBatchConverter::Flat(f) => f.convert_batch(batch),
            FlatBatchConverter::PrimaryKey { format, num_fields } => {
                format.convert_flat_batch(batch, *num_fields)
            }
        }
    }
}

/// Parquet SST writer.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    /// Total bytes written to the current file, shared with [SizeAwareWriter].
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

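/// Creates [AsyncWrite] sinks that SST bytes are written to.
///
/// A minimal sketch of a custom factory (hypothetical, e.g. for tests); the
/// production implementation is [ObjectStoreWriterFactory] below:
///
/// ```ignore
/// struct VecWriterFactory;
///
/// impl WriterFactory for VecWriterFactory {
///     // tokio implements `AsyncWrite` for `Vec<u8>`.
///     type Writer = Vec<u8>;
///
///     async fn create(&mut self, _file_path: &str) -> Result<Self::Writer> {
///         Ok(Vec::new())
///     }
/// }
/// ```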
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer will create a new file for the next batch.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let mut stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // safety: the writer and the indexer are either both present or both absent.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

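            // Largest uncompressed row group in this file, computed from the
            // per-column uncompressed sizes recorded in the parquet metadata.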
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates the FlatSource and writes all RecordBatches in flat format to Parquet files.
    ///
    /// Returns the [SstInfo] for each SST written.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let converter = FlatBatchConverter::Flat(
            FlatWriteFormat::new(
                self.metadata.clone(),
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
            .with_override_sequence(override_sequence),
        );
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Iterates the FlatSource and writes all RecordBatches in primary-key format to Parquet files.
    ///
    /// Returns the [SstInfo] for each SST written.
    pub async fn write_all_flat_as_primary_key(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let num_fields = self.metadata.field_columns().count();
        let converter = FlatBatchConverter::PrimaryKey {
            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
                .with_override_sequence(override_sequence),
            num_fields,
        };
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_inner(
        &mut self,
        mut source: FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, converter, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
                        let start = Instant::now();
                        // safety: self.current_indexer must be set once the first batch has been written.
                        self.current_indexer
                            .as_mut()
                            .unwrap()
                            .update_flat(&batch)
                            .await;
                        self.metrics.update_index += start.elapsed();
                    }
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to the schema and maybe column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

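        // Assumption: the time index and sequence columns hold (nearly) sorted
        // integers, so delta encoding tends to beat dictionary encoding for them,
        // while the small, repetitive op type column gains little from compression.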
        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = converter.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(arrow_batch.schema_ref(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        // Return the original flat batch for stats/indexer, which use the flat layout.
        Ok(Some(record_batch))
    }

    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Series estimator for computing num_series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // Get the timestamp column by index
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets the min and max timestamps from a timestamp array.
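///
/// A minimal sketch of the expected result (hypothetical values):
///
/// ```ignore
/// let array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![3, 1, 2]));
/// assert_eq!(
///     timestamp_range_from_array(&array)?,
///     Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)))
/// );
/// ```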
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // If min timestamp exists, max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround for the fact that [AsyncArrowWriter] does not provide a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        // `size` is already an owned `Arc`; no extra clone is needed.
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}