mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions};

/// Parquet SST writer.
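///
/// The writer lazily creates the underlying Parquet file when the first batch is
/// written and rolls over to a new file id once `WriteOptions::max_file_size` is
/// exceeded, so a single writer may produce multiple SSTs.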
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: Metrics,
}

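/// Factory that creates the underlying [AsyncWrite] sinks for SST files.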
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

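/// [WriterFactory] implementation that writes SST files through an [ObjectStore].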
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // `maybe_init_writer` will create a new file for the next write.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // Safety: the writer and the indexer are either both present or both absent.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            // Converts FileMetaData to ParquetMetaData.
            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates source and writes all rows to Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

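    /// Writes all batches from `source` in primary key format, finishing the current
    /// file and starting a new one whenever `max_file_size` is exceeded.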
    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Safety: `self.current_indexer` is set once the first batch has been written.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

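    /// Writes all record batches from `source` in flat format, finishing the current
    /// file and starting a new one whenever `max_file_size` is exceeded.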
    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    // Safety: `self.current_indexer` is set once the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to schema and maybe column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

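    /// Fetches the next [Batch] from the source, converts it to an Arrow record batch
    /// and writes it to the current Parquet file, initializing the writer on first use.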
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

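    /// Fetches the next flat [RecordBatch] from the source, converts it to the write
    /// schema and writes it to the current Parquet file, initializing the writer on first use.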
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

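    /// Returns the current [AsyncArrowWriter], creating the SST file, writer properties
    /// and indexer for `current_file` if the writer is not initialized yet.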
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // Safety: self.writer is assigned above.
            Ok(self.writer.as_mut().unwrap())
        }
    }

    /// Consumes the writer and returns the collected metrics.
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}

#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
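    /// Updates the stats with a primary key format [Batch].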
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

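    /// Updates the stats with a flat format [RecordBatch], deriving the time range
    /// from the time index column.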
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();

        // Get the timestamp column by index
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets the min and max timestamps from a timestamp array.
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // If min timestamp exists, max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround because [AsyncArrowWriter] does not provide a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

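// Forwards all operations to the inner writer while accumulating the number of
// bytes reported as written in `size`.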
impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}