mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::FlatSource;
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

/// Converts a flat RecordBatch for writing to parquet.
enum FlatBatchConverter {
    /// Write as-is in flat format.
    Flat(FlatWriteFormat),
    /// Convert flat batch to primary-key format by stripping tag columns.
    PrimaryKey {
        format: PrimaryKeyWriteFormat,
        num_fields: usize,
    },
}

impl FlatBatchConverter {
    fn arrow_schema(&self) -> &SchemaRef {
        match self {
            FlatBatchConverter::Flat(f) => f.arrow_schema(),
            FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(),
        }
    }

    fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
        match self {
            FlatBatchConverter::Flat(f) => f.convert_batch(batch),
            FlatBatchConverter::PrimaryKey { format, num_fields } => {
                format.convert_flat_batch(batch, *num_fields)
            }
        }
    }
}

/// Parquet SST writer.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

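/// Factory that creates the [AsyncWrite] sink a SST file is written to.
///
/// A minimal sketch of a custom factory for tests (illustrative only;
/// `InMemoryWriterFactory` is hypothetical and not part of this module):
///
/// ```ignore
/// struct InMemoryWriterFactory;
///
/// impl WriterFactory for InMemoryWriterFactory {
///     // tokio implements `AsyncWrite` for `Vec<u8>`, which satisfies the bound.
///     type Writer = Vec<u8>;
///
///     async fn create(&mut self, _file_path: &str) -> Result<Self::Writer> {
///         Ok(Vec::new())
///     }
/// }
/// ```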
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer will create a new file on the next write.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let mut stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // Safety: the writer and the indexer are either both present or both absent.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

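            // Largest uncompressed row group: for each row group, sum the
            // uncompressed sizes of its column chunks, then take the maximum.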
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Iterates FlatSource and writes all RecordBatches in flat format to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let converter = FlatBatchConverter::Flat(
            FlatWriteFormat::new(
                self.metadata.clone(),
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
            .with_override_sequence(override_sequence),
        );
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Iterates FlatSource and writes all RecordBatches in primary-key format to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all_flat_as_primary_key(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let num_fields = self.metadata.field_columns().count();
        let converter = FlatBatchConverter::PrimaryKey {
            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
                .with_override_sequence(override_sequence),
            num_fields,
        };
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_inner(
        &mut self,
        mut source: FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, converter, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
                        let start = Instant::now();
                        // Safety: self.current_indexer must be set once the first batch has been written.
                        self.current_indexer
                            .as_mut()
                            .unwrap()
                            .update_flat(&batch)
                            .await;
                        self.metrics.update_index += start.elapsed();
                    }
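                    // Roll over to a new file once the configured size limit is
                    // exceeded; the next batch lazily re-initializes the writer.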
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to the schema and possibly column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

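        // Likely rationale: timestamps and sequence numbers are monotonic with
        // small deltas, so DELTA_BINARY_PACKED encodes them compactly while
        // dictionary encoding would mostly store distinct values; op type is
        // nearly constant, so the encoded column is already tiny and extra
        // compression mainly costs CPU.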
        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = converter.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(converter.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        // Return the original flat batch for stats/indexer, which use the flat layout.
        Ok(Some(record_batch))
    }

    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            // Safety: self.writer is assigned above.
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Series estimator for computing num_series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // Get the timestamp column by index.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets the min and max timestamps from a timestamp array.
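///
/// A hedged sketch of the expected behavior (illustrative; kept out of doc
/// tests with `ignore`):
///
/// ```ignore
/// use std::sync::Arc;
///
/// use common_time::Timestamp;
/// use datatypes::arrow::array::{ArrayRef, TimestampMillisecondArray};
///
/// let array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![3, 1, 2]));
/// let range = timestamp_range_from_array(&array).unwrap();
/// assert_eq!(
///     range,
///     Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)))
/// );
/// ```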
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // If min timestamp exists, max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround for [AsyncArrowWriter] not providing a method to get the
/// total bytes written after close.
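///
/// A hedged usage sketch (illustrative; kept out of doc tests with `ignore`):
///
/// ```ignore
/// use std::sync::Arc;
/// use std::sync::atomic::{AtomicUsize, Ordering};
///
/// use tokio::io::AsyncWriteExt;
///
/// let size = Arc::new(AtomicUsize::new(0));
/// // tokio implements `AsyncWrite` for `Vec<u8>`.
/// let mut writer = SizeAwareWriter::new(Vec::new(), size.clone());
/// writer.write_all(b"hello").await.unwrap();
/// assert_eq!(size.load(Ordering::Relaxed), 5);
/// ```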
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}