mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

/// Parquet SST writer.
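///
/// A minimal usage sketch. The `store`, `metadata`, `index_config`, `indexer_builder`,
/// `path_provider`, `metrics`, `source` and `opts` values are assumed to be created by
/// the caller:
///
/// ```ignore
/// let mut writer = ParquetWriter::new_with_object_store(
///     store,
///     metadata,
///     index_config,
///     indexer_builder,
///     path_provider,
///     &mut metrics,
/// )
/// .await;
/// // Writes every batch from the source and returns the metadata of the SSTs produced.
/// let ssts = writer.write_all(source, None, &opts).await?;
/// ```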
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

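/// Abstracts how the writer opens the underlying async byte sink for an SST file path,
/// so [ParquetWriter] can target object storage or any other [AsyncWrite] sink (e.g. in tests).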
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer() will create a new file when the next batch is written.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let mut stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // safety: the writer and the indexer are either both present or both absent.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            // Convert FileMetaData to ParquetMetaData.
            let parquet_metadata = parse_parquet_metadata(file_meta)?;
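            // The largest total uncompressed column size among the row groups, which roughly
            // bounds the memory needed to decode any single row group of this file.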
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates the source and writes all rows to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // safety: self.current_indexer must be set when the first batch has been written.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
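                    // Roll over to a new SST file once the current one exceeds the configured max file size.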
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Iterates the FlatSource and writes all RecordBatches in flat format to the Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    // safety: self.current_indexer must be set when the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to the schema and, potentially, column cardinality.
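    ///
    /// Sequence and time index values are integer-like and usually close together within a
    /// file, so delta encoding with dictionary disabled tends to work well for them, while
    /// the tiny op type column skips general-purpose compression.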
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
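            // Embed the region metadata as JSON under PARQUET_METADATA_KEY in the file's
            // key-value metadata so readers can restore the RegionMetadata from the SST itself.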
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Series estimator for computing num_series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        self.series_estimator.update(batch);
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // Get the timestamp column by index.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets min and max timestamps from a timestamp array.
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
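    // Note: arrow's `min`/`max` kernels ignore nulls and return `None` for empty or
    // all-null arrays, in which case this function returns `Ok(None)`.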
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // If min timestamp exists, max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround because [AsyncArrowWriter] does not provide a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self {
            inner,
            size: size.clone(),
        }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

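        // Only count the bytes the inner writer actually accepted in this poll.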
        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}