Skip to main content

mito2/sst/parquet/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet writer.
16
17use std::collections::HashMap;
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::task::{Context, Poll};
24use std::time::Instant;
25
26use common_telemetry::debug;
27use common_time::Timestamp;
28use datatypes::arrow::array::{
29    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray,
31};
32use datatypes::arrow::compute::{max, min};
33use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::extension::json::is_json_extension_type;
36use datatypes::schema::ext::ArrowSchemaExt;
37use object_store::{FuturesAsyncWriter, ObjectStore};
38use parquet::arrow::AsyncArrowWriter;
39use parquet::basic::{Compression, Encoding, ZstdLevel};
40use parquet::file::metadata::KeyValue;
41use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
42use parquet::schema::types::ColumnPath;
43use smallvec::smallvec;
44use snafu::ResultExt;
45use store_api::metadata::RegionMetadataRef;
46use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
47use store_api::storage::{FileId, SequenceNumber};
48use tokio::io::AsyncWrite;
49use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
50
51use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
52use crate::config::{IndexBuildMode, IndexConfig};
53use crate::error::{
54    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
55};
56use crate::read::FlatSource;
57use crate::sst::file::RegionFileId;
58use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
59use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
60use crate::sst::parquet::format::PrimaryKeyWriteFormat;
61use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
62use crate::sst::{
63    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
64};
65
/// Converts a flat RecordBatch for writing to parquet.
///
/// The variant decides which on-disk layout the writer produces; see
/// [FlatBatchConverter::convert_batch] for how each variant is applied.
enum FlatBatchConverter {
    /// Write as-is in flat format.
    Flat(FlatWriteFormat),
    /// Convert flat batch to primary-key format by stripping tag columns.
    PrimaryKey {
        /// Write format used to perform the flat -> primary-key conversion.
        format: PrimaryKeyWriteFormat,
        /// Number of field columns, forwarded to the conversion on every batch.
        num_fields: usize,
    },
}
76
77impl FlatBatchConverter {
78    fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
79        match self {
80            FlatBatchConverter::Flat(f) => f.convert_batch(batch),
81            FlatBatchConverter::PrimaryKey { format, num_fields } => {
82                format.convert_flat_batch(batch, *num_fields)
83            }
84        }
85    }
86}
87
/// Parquet SST writer.
///
/// A single writer may produce several SST files: whenever the current file is
/// finished a new random file id is picked and the next batch starts a new file.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    /// Arrow writer of the current SST file. `None` until the first batch of a
    /// file is written and again after the file has been finished.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    /// Factory that creates the underlying writer for each SST file path.
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Global index config.
    index_config: IndexConfig,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current SST file. Shared with the
    /// [SizeAwareWriter] wrapping the underlying writer and reset to zero
    /// once a file is finished.
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics
    metrics: &'a mut Metrics,
}
110
/// Factory that creates the underlying asynchronous writer of an SST file.
pub trait WriterFactory {
    /// Type of the writer produced by this factory.
    type Writer: AsyncWrite + Send + Unpin;
    /// Creates a writer for the file located at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
115
/// [WriterFactory] that creates writers on top of an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    /// Object store that files are written to.
    object_store: ObjectStore,
}
119
120impl WriterFactory for ObjectStoreWriterFactory {
121    type Writer = Compat<FuturesAsyncWriter>;
122
123    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
124        self.object_store
125            .writer_with(file_path)
126            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
127            .concurrent(DEFAULT_WRITE_CONCURRENCY)
128            .await
129            .map(|v| v.into_futures_async_write().compat_write())
130            .context(OpenDalSnafu)
131    }
132}
133
134impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
135where
136    P: FilePathProvider,
137    I: IndexerBuilder,
138{
139    pub async fn new_with_object_store(
140        object_store: ObjectStore,
141        metadata: RegionMetadataRef,
142        index_config: IndexConfig,
143        indexer_builder: I,
144        path_provider: P,
145        metrics: &'a mut Metrics,
146    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
147        ParquetWriter::new(
148            ObjectStoreWriterFactory { object_store },
149            metadata,
150            index_config,
151            indexer_builder,
152            path_provider,
153            metrics,
154        )
155        .await
156    }
157
158    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
159        self.file_cleaner = Some(cleaner);
160        self
161    }
162}
163
164impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
165where
166    F: WriterFactory,
167    I: IndexerBuilder,
168    P: FilePathProvider,
169{
170    /// Creates a new parquet SST writer.
171    pub async fn new(
172        factory: F,
173        metadata: RegionMetadataRef,
174        index_config: IndexConfig,
175        indexer_builder: I,
176        path_provider: P,
177        metrics: &'a mut Metrics,
178    ) -> ParquetWriter<'a, F, I, P> {
179        let init_file = FileId::random();
180        let indexer = indexer_builder.build(init_file, 0).await;
181
182        ParquetWriter {
183            path_provider,
184            writer: None,
185            current_file: init_file,
186            writer_factory: factory,
187            metadata,
188            index_config,
189            indexer_builder,
190            current_indexer: Some(indexer),
191            bytes_written: Arc::new(AtomicUsize::new(0)),
192            file_cleaner: None,
193            metrics,
194        }
195    }
196
197    /// Finishes current SST file and index file.
198    async fn finish_current_file(
199        &mut self,
200        ssts: &mut SstInfoArray,
201        stats: &mut SourceStats,
202    ) -> Result<()> {
203        // maybe_init_writer will re-create a new file.
204        if let Some(mut current_writer) = mem::take(&mut self.writer) {
205            let mut stats = mem::take(stats);
206            // At least one row has been written.
207            assert!(stats.num_rows > 0);
208
209            debug!(
210                "Finishing current file {}, file size: {}, num rows: {}",
211                self.current_file,
212                self.bytes_written.load(Ordering::Relaxed),
213                stats.num_rows
214            );
215
216            // Finish indexer and writer.
217            // safety: writer and index can only be both present or not.
218            let mut index_output = IndexOutput::default();
219            match self.index_config.build_mode {
220                IndexBuildMode::Sync => {
221                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
222                }
223                IndexBuildMode::Async => {
224                    debug!(
225                        "Index for file {} will be built asynchronously later",
226                        self.current_file
227                    );
228                }
229            }
230            current_writer.flush().await.context(WriteParquetSnafu)?;
231
232            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
233            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
234
235            // Safety: num rows > 0 so we must have min/max.
236            let time_range = stats.time_range.unwrap();
237
238            let max_row_group_uncompressed_size: u64 = parquet_metadata
239                .row_groups()
240                .iter()
241                .map(|rg| {
242                    rg.columns()
243                        .iter()
244                        .map(|c| c.uncompressed_size() as u64)
245                        .sum::<u64>()
246                })
247                .max()
248                .unwrap_or(0);
249            let num_series = stats.series_estimator.finish();
250            ssts.push(SstInfo {
251                file_id: self.current_file,
252                time_range,
253                file_size,
254                max_row_group_uncompressed_size,
255                num_rows: stats.num_rows,
256                num_row_groups: parquet_metadata.num_row_groups() as u64,
257                file_metadata: Some(Arc::new(parquet_metadata)),
258                index_metadata: index_output,
259                num_series,
260            });
261            self.current_file = FileId::random();
262            self.bytes_written.store(0, Ordering::Relaxed)
263        };
264
265        Ok(())
266    }
267
268    /// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
269    ///
270    /// Returns the [SstInfo] if the SST is written.
271    pub async fn write_all_flat(
272        &mut self,
273        source: FlatSource,
274        override_sequence: Option<SequenceNumber>,
275        opts: &WriteOptions,
276    ) -> Result<SstInfoArray> {
277        let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
278
279        if source.schema().has_json_extension_field() {
280            options.concretized_json_types = source
281                .schema()
282                .fields()
283                .iter()
284                .filter(|&field| is_json_extension_type(field))
285                .map(|field| (field.name().clone(), field.data_type().clone()))
286                .collect::<HashMap<_, _>>();
287        }
288
289        let converter = FlatBatchConverter::Flat(
290            FlatWriteFormat::new(self.metadata.clone(), &options)
291                .with_override_sequence(override_sequence),
292        );
293        let res = self.write_all_flat_inner(source, &converter, opts).await;
294        if res.is_err() {
295            let file_id = self.current_file;
296            if let Some(cleaner) = &self.file_cleaner {
297                cleaner.clean_by_file_id(file_id).await;
298            }
299        }
300        res
301    }
302
303    /// Iterates FlatSource and writes all RecordBatch in primary-key format to Parquet file.
304    ///
305    /// Returns the [SstInfo] if the SST is written.
306    pub async fn write_all_flat_as_primary_key(
307        &mut self,
308        source: FlatSource,
309        override_sequence: Option<SequenceNumber>,
310        opts: &WriteOptions,
311    ) -> Result<SstInfoArray> {
312        let num_fields = self.metadata.field_columns().count();
313        let converter = FlatBatchConverter::PrimaryKey {
314            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
315                .with_override_sequence(override_sequence),
316            num_fields,
317        };
318        let res = self.write_all_flat_inner(source, &converter, opts).await;
319        if res.is_err() {
320            let file_id = self.current_file;
321            if let Some(cleaner) = &self.file_cleaner {
322                cleaner.clean_by_file_id(file_id).await;
323            }
324        }
325        res
326    }
327
328    async fn write_all_flat_inner(
329        &mut self,
330        mut source: FlatSource,
331        converter: &FlatBatchConverter,
332        opts: &WriteOptions,
333    ) -> Result<SstInfoArray> {
334        let mut results = smallvec![];
335        let mut stats = SourceStats::default();
336
337        while let Some(record_batch) = self
338            .write_next_flat_batch(&mut source, converter, opts)
339            .await
340            .transpose()
341        {
342            match record_batch {
343                Ok(batch) => {
344                    stats.update_flat(&batch)?;
345                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
346                        let start = Instant::now();
347                        // safety: self.current_indexer must be set when first batch has been written.
348                        self.current_indexer
349                            .as_mut()
350                            .unwrap()
351                            .update_flat(&batch)
352                            .await;
353                        self.metrics.update_index += start.elapsed();
354                    }
355                    if let Some(max_file_size) = opts.max_file_size
356                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
357                    {
358                        self.finish_current_file(&mut results, &mut stats).await?;
359                    }
360                }
361                Err(e) => {
362                    if let Some(indexer) = &mut self.current_indexer {
363                        indexer.abort().await;
364                    }
365                    return Err(e);
366                }
367            }
368        }
369
370        self.finish_current_file(&mut results, &mut stats).await?;
371
372        // object_store.write will make sure all bytes are written or an error is raised.
373        Ok(results)
374    }
375
376    /// Customizes per-column config according to schema and maybe column cardinality.
377    fn customize_column_config(
378        builder: WriterPropertiesBuilder,
379        region_metadata: &RegionMetadataRef,
380    ) -> WriterPropertiesBuilder {
381        let ts_col = ColumnPath::new(vec![
382            region_metadata
383                .time_index_column()
384                .column_schema
385                .name
386                .clone(),
387        ]);
388        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
389        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
390
391        builder
392            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
393            .set_column_dictionary_enabled(seq_col, false)
394            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
395            .set_column_dictionary_enabled(ts_col, false)
396            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
397    }
398
399    async fn write_next_flat_batch(
400        &mut self,
401        source: &mut FlatSource,
402        converter: &FlatBatchConverter,
403        opts: &WriteOptions,
404    ) -> Result<Option<RecordBatch>> {
405        let start = Instant::now();
406        let Some(record_batch) = source.next_batch().await? else {
407            return Ok(None);
408        };
409        self.metrics.iter_source += start.elapsed();
410
411        let arrow_batch = converter.convert_batch(&record_batch)?;
412
413        let start = Instant::now();
414        self.maybe_init_writer(arrow_batch.schema_ref(), opts)
415            .await?
416            .write(&arrow_batch)
417            .await
418            .context(WriteParquetSnafu)?;
419        self.metrics.write_batch += start.elapsed();
420        // Return original flat batch for stats/indexer which use flat layout.
421        Ok(Some(record_batch))
422    }
423
424    async fn maybe_init_writer(
425        &mut self,
426        schema: &SchemaRef,
427        opts: &WriteOptions,
428    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
429        if let Some(ref mut w) = self.writer {
430            Ok(w)
431        } else {
432            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
433            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
434
435            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
436            let props_builder = WriterProperties::builder()
437                .set_key_value_metadata(Some(vec![key_value_meta]))
438                .set_compression(Compression::ZSTD(ZstdLevel::default()))
439                .set_encoding(Encoding::PLAIN)
440                .set_max_row_group_row_count(Some(opts.row_group_size))
441                .set_column_index_truncate_length(None)
442                .set_statistics_truncate_length(None);
443
444            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
445            let writer_props = props_builder.build();
446
447            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
448                self.metadata.region_id,
449                self.current_file,
450            ));
451            let writer = SizeAwareWriter::new(
452                self.writer_factory.create(&sst_file_path).await?,
453                self.bytes_written.clone(),
454            );
455            let arrow_writer =
456                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
457                    .context(WriteParquetSnafu)?;
458            self.writer = Some(arrow_writer);
459
460            let indexer = self.indexer_builder.build(self.current_file, 0).await;
461            self.current_indexer = Some(indexer);
462
463            // safety: self.writer is assigned above
464            Ok(self.writer.as_mut().unwrap())
465        }
466    }
467}
468
469#[derive(Default)]
470struct SourceStats {
471    /// Number of rows fetched.
472    num_rows: usize,
473    /// Time range of fetched batches.
474    time_range: Option<(Timestamp, Timestamp)>,
475    /// Series estimator for computing num_series.
476    series_estimator: SeriesEstimator,
477}
478
479impl SourceStats {
480    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
481        if record_batch.num_rows() == 0 {
482            return Ok(());
483        }
484
485        self.num_rows += record_batch.num_rows();
486        self.series_estimator.update_flat(record_batch);
487
488        // Get the timestamp column by index
489        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
490        let timestamp_array = record_batch.column(time_index_col_idx);
491
492        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
493            if let Some(time_range) = &mut self.time_range {
494                time_range.0 = time_range.0.min(min_in_batch);
495                time_range.1 = time_range.1.max(max_in_batch);
496            } else {
497                self.time_range = Some((min_in_batch, max_in_batch));
498            }
499        }
500
501        Ok(())
502    }
503}
504
505/// Gets min and max timestamp from an timestamp array.
506fn timestamp_range_from_array(
507    timestamp_array: &ArrayRef,
508) -> Result<Option<(Timestamp, Timestamp)>> {
509    let (min_ts, max_ts) = match timestamp_array.data_type() {
510        DataType::Timestamp(TimeUnit::Second, _) => {
511            let array = timestamp_array
512                .as_any()
513                .downcast_ref::<TimestampSecondArray>()
514                .unwrap();
515            let min_val = min(array).map(Timestamp::new_second);
516            let max_val = max(array).map(Timestamp::new_second);
517            (min_val, max_val)
518        }
519        DataType::Timestamp(TimeUnit::Millisecond, _) => {
520            let array = timestamp_array
521                .as_any()
522                .downcast_ref::<TimestampMillisecondArray>()
523                .unwrap();
524            let min_val = min(array).map(Timestamp::new_millisecond);
525            let max_val = max(array).map(Timestamp::new_millisecond);
526            (min_val, max_val)
527        }
528        DataType::Timestamp(TimeUnit::Microsecond, _) => {
529            let array = timestamp_array
530                .as_any()
531                .downcast_ref::<TimestampMicrosecondArray>()
532                .unwrap();
533            let min_val = min(array).map(Timestamp::new_microsecond);
534            let max_val = max(array).map(Timestamp::new_microsecond);
535            (min_val, max_val)
536        }
537        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
538            let array = timestamp_array
539                .as_any()
540                .downcast_ref::<TimestampNanosecondArray>()
541                .unwrap();
542            let min_val = min(array).map(Timestamp::new_nanosecond);
543            let max_val = max(array).map(Timestamp::new_nanosecond);
544            (min_val, max_val)
545        }
546        _ => {
547            return UnexpectedSnafu {
548                reason: format!(
549                    "Unexpected data type of time index: {:?}",
550                    timestamp_array.data_type()
551                ),
552            }
553            .fail();
554        }
555    };
556
557    // If min timestamp exists, max timestamp should also exist.
558    Ok(min_ts.zip(max_ts))
559}
560
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
///
/// Wraps a writer and keeps a counter shared with the creator of the wrapper,
/// so the creator can still read the counter after the writer has been consumed.
struct SizeAwareWriter<W> {
    /// The wrapped writer.
    inner: W,
    /// Counter shared with the creator of this writer.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    /// Wraps `inner`, using `size` as the shared counter.
    ///
    /// The given `Arc` is moved into the wrapper as-is; callers that want to
    /// keep observing the counter pass a clone of their own handle.
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}
576
577impl<W> AsyncWrite for SizeAwareWriter<W>
578where
579    W: AsyncWrite + Unpin,
580{
581    fn poll_write(
582        mut self: Pin<&mut Self>,
583        cx: &mut Context<'_>,
584        buf: &[u8],
585    ) -> Poll<std::result::Result<usize, std::io::Error>> {
586        let this = self.as_mut().get_mut();
587
588        match Pin::new(&mut this.inner).poll_write(cx, buf) {
589            Poll::Ready(Ok(bytes_written)) => {
590                this.size.fetch_add(bytes_written, Ordering::Relaxed);
591                Poll::Ready(Ok(bytes_written))
592            }
593            other => other,
594        }
595    }
596
597    fn poll_flush(
598        mut self: Pin<&mut Self>,
599        cx: &mut Context<'_>,
600    ) -> Poll<std::result::Result<(), std::io::Error>> {
601        Pin::new(&mut self.inner).poll_flush(cx)
602    }
603
604    fn poll_shutdown(
605        mut self: Pin<&mut Self>,
606        cx: &mut Context<'_>,
607    ) -> Poll<std::result::Result<(), std::io::Error>> {
608        Pin::new(&mut self.inner).poll_shutdown(cx)
609    }
610}