Skip to main content

mito2/sst/parquet/
writer.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Parquet writer.
16
17use std::collections::HashMap;
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::task::{Context, Poll};
24use std::time::Instant;
25
26use common_telemetry::debug;
27use common_time::Timestamp;
28use datatypes::arrow::array::{
29    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30    TimestampSecondArray,
31};
32use datatypes::arrow::compute::{max, min};
33use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::extension::json::is_structured_json_field;
36use object_store::{FuturesAsyncWriter, ObjectStore};
37use parquet::arrow::AsyncArrowWriter;
38use parquet::basic::{Compression, Encoding, ZstdLevel};
39use parquet::file::metadata::KeyValue;
40use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
41use parquet::schema::types::ColumnPath;
42use smallvec::smallvec;
43use snafu::ResultExt;
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
46use store_api::storage::{FileId, SequenceNumber};
47use tokio::io::AsyncWrite;
48use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
49
50use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
51use crate::config::{IndexBuildMode, IndexConfig};
52use crate::error::{
53    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
54};
55use crate::read::FlatSource;
56use crate::sst::file::RegionFileId;
57use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
58use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
59use crate::sst::parquet::format::PrimaryKeyWriteFormat;
60use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
61use crate::sst::{
62    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
63};
64
65/// Converts a flat RecordBatch for writing to parquet.
66enum FlatBatchConverter {
67    /// Write as-is in flat format.
68    Flat(FlatWriteFormat),
69    /// Convert flat batch to primary-key format by stripping tag columns.
70    PrimaryKey {
71        format: PrimaryKeyWriteFormat,
72        num_fields: usize,
73    },
74}
75
76impl FlatBatchConverter {
77    fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
78        match self {
79            FlatBatchConverter::Flat(f) => f.convert_batch(batch),
80            FlatBatchConverter::PrimaryKey { format, num_fields } => {
81                format.convert_flat_batch(batch, *num_fields)
82            }
83        }
84    }
85}
86
87/// Parquet SST writer.
88pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
89    /// Path provider that creates SST and index file paths according to file id.
90    path_provider: P,
91    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
92    /// Current active file id.
93    current_file: FileId,
94    writer_factory: F,
95    /// Region metadata of the source and the target SST.
96    metadata: RegionMetadataRef,
97    /// Global index config.
98    index_config: IndexConfig,
99    /// Indexer build that can create indexer for multiple files.
100    indexer_builder: I,
101    /// Current active indexer.
102    current_indexer: Option<Indexer>,
103    bytes_written: Arc<AtomicUsize>,
104    /// Cleaner to remove temp files on failure.
105    file_cleaner: Option<TempFileCleaner>,
106    /// Write metrics
107    metrics: &'a mut Metrics,
108}
109
110pub trait WriterFactory {
111    type Writer: AsyncWrite + Send + Unpin;
112    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
113}
114
115pub struct ObjectStoreWriterFactory {
116    object_store: ObjectStore,
117}
118
119impl WriterFactory for ObjectStoreWriterFactory {
120    type Writer = Compat<FuturesAsyncWriter>;
121
122    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
123        self.object_store
124            .writer_with(file_path)
125            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
126            .concurrent(DEFAULT_WRITE_CONCURRENCY)
127            .await
128            .map(|v| v.into_futures_async_write().compat_write())
129            .context(OpenDalSnafu)
130    }
131}
132
133impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
134where
135    P: FilePathProvider,
136    I: IndexerBuilder,
137{
138    pub async fn new_with_object_store(
139        object_store: ObjectStore,
140        metadata: RegionMetadataRef,
141        index_config: IndexConfig,
142        indexer_builder: I,
143        path_provider: P,
144        metrics: &'a mut Metrics,
145    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
146        ParquetWriter::new(
147            ObjectStoreWriterFactory { object_store },
148            metadata,
149            index_config,
150            indexer_builder,
151            path_provider,
152            metrics,
153        )
154        .await
155    }
156
157    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
158        self.file_cleaner = Some(cleaner);
159        self
160    }
161}
162
163impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
164where
165    F: WriterFactory,
166    I: IndexerBuilder,
167    P: FilePathProvider,
168{
169    /// Creates a new parquet SST writer.
170    pub async fn new(
171        factory: F,
172        metadata: RegionMetadataRef,
173        index_config: IndexConfig,
174        indexer_builder: I,
175        path_provider: P,
176        metrics: &'a mut Metrics,
177    ) -> ParquetWriter<'a, F, I, P> {
178        let init_file = FileId::random();
179        let indexer = indexer_builder.build(init_file, 0).await;
180
181        ParquetWriter {
182            path_provider,
183            writer: None,
184            current_file: init_file,
185            writer_factory: factory,
186            metadata,
187            index_config,
188            indexer_builder,
189            current_indexer: Some(indexer),
190            bytes_written: Arc::new(AtomicUsize::new(0)),
191            file_cleaner: None,
192            metrics,
193        }
194    }
195
196    /// Finishes current SST file and index file.
197    async fn finish_current_file(
198        &mut self,
199        ssts: &mut SstInfoArray,
200        stats: &mut SourceStats,
201    ) -> Result<()> {
202        // maybe_init_writer will re-create a new file.
203        if let Some(mut current_writer) = mem::take(&mut self.writer) {
204            let mut stats = mem::take(stats);
205            // At least one row has been written.
206            assert!(stats.num_rows > 0);
207
208            debug!(
209                "Finishing current file {}, file size: {}, num rows: {}",
210                self.current_file,
211                self.bytes_written.load(Ordering::Relaxed),
212                stats.num_rows
213            );
214
215            // Finish indexer and writer.
216            // safety: writer and index can only be both present or not.
217            let mut index_output = IndexOutput::default();
218            match self.index_config.build_mode {
219                IndexBuildMode::Sync => {
220                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
221                }
222                IndexBuildMode::Async => {
223                    debug!(
224                        "Index for file {} will be built asynchronously later",
225                        self.current_file
226                    );
227                }
228            }
229            current_writer.flush().await.context(WriteParquetSnafu)?;
230
231            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
232            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
233
234            // Safety: num rows > 0 so we must have min/max.
235            let time_range = stats.time_range.unwrap();
236
237            let max_row_group_uncompressed_size: u64 = parquet_metadata
238                .row_groups()
239                .iter()
240                .map(|rg| {
241                    rg.columns()
242                        .iter()
243                        .map(|c| c.uncompressed_size() as u64)
244                        .sum::<u64>()
245                })
246                .max()
247                .unwrap_or(0);
248            let num_series = stats.series_estimator.finish();
249            ssts.push(SstInfo {
250                file_id: self.current_file,
251                time_range,
252                file_size,
253                max_row_group_uncompressed_size,
254                num_rows: stats.num_rows,
255                num_row_groups: parquet_metadata.num_row_groups() as u64,
256                file_metadata: Some(Arc::new(parquet_metadata)),
257                index_metadata: index_output,
258                num_series,
259            });
260            self.current_file = FileId::random();
261            self.bytes_written.store(0, Ordering::Relaxed)
262        };
263
264        Ok(())
265    }
266
267    /// Iterates FlatSource and writes all RecordBatch in flat format to Parquet file.
268    ///
269    /// Returns the [SstInfo] if the SST is written.
270    pub async fn write_all_flat(
271        &mut self,
272        source: FlatSource,
273        override_sequence: Option<SequenceNumber>,
274        opts: &WriteOptions,
275    ) -> Result<SstInfoArray> {
276        let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
277
278        if source
279            .schema()
280            .fields()
281            .iter()
282            .any(is_structured_json_field)
283        {
284            options.concretized_json_types = source
285                .schema()
286                .fields()
287                .iter()
288                .filter(|&field| is_structured_json_field(field))
289                .map(|field| (field.name().clone(), field.data_type().clone()))
290                .collect::<HashMap<_, _>>();
291        }
292
293        let converter = FlatBatchConverter::Flat(
294            FlatWriteFormat::new(self.metadata.clone(), &options)
295                .with_override_sequence(override_sequence),
296        );
297        let res = self.write_all_flat_inner(source, &converter, opts).await;
298        if res.is_err() {
299            let file_id = self.current_file;
300            if let Some(cleaner) = &self.file_cleaner {
301                cleaner.clean_by_file_id(file_id).await;
302            }
303        }
304        res
305    }
306
307    /// Iterates FlatSource and writes all RecordBatch in primary-key format to Parquet file.
308    ///
309    /// Returns the [SstInfo] if the SST is written.
310    pub async fn write_all_flat_as_primary_key(
311        &mut self,
312        source: FlatSource,
313        override_sequence: Option<SequenceNumber>,
314        opts: &WriteOptions,
315    ) -> Result<SstInfoArray> {
316        let num_fields = self.metadata.field_columns().count();
317        let converter = FlatBatchConverter::PrimaryKey {
318            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
319                .with_override_sequence(override_sequence),
320            num_fields,
321        };
322        let res = self.write_all_flat_inner(source, &converter, opts).await;
323        if res.is_err() {
324            let file_id = self.current_file;
325            if let Some(cleaner) = &self.file_cleaner {
326                cleaner.clean_by_file_id(file_id).await;
327            }
328        }
329        res
330    }
331
332    async fn write_all_flat_inner(
333        &mut self,
334        mut source: FlatSource,
335        converter: &FlatBatchConverter,
336        opts: &WriteOptions,
337    ) -> Result<SstInfoArray> {
338        let mut results = smallvec![];
339        let mut stats = SourceStats::default();
340
341        while let Some(record_batch) = self
342            .write_next_flat_batch(&mut source, converter, opts)
343            .await
344            .transpose()
345        {
346            match record_batch {
347                Ok(batch) => {
348                    stats.update_flat(&batch)?;
349                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
350                        let start = Instant::now();
351                        // safety: self.current_indexer must be set when first batch has been written.
352                        self.current_indexer
353                            .as_mut()
354                            .unwrap()
355                            .update_flat(&batch)
356                            .await;
357                        self.metrics.update_index += start.elapsed();
358                    }
359                    if let Some(max_file_size) = opts.max_file_size
360                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
361                    {
362                        self.finish_current_file(&mut results, &mut stats).await?;
363                    }
364                }
365                Err(e) => {
366                    if let Some(indexer) = &mut self.current_indexer {
367                        indexer.abort().await;
368                    }
369                    return Err(e);
370                }
371            }
372        }
373
374        self.finish_current_file(&mut results, &mut stats).await?;
375
376        // object_store.write will make sure all bytes are written or an error is raised.
377        Ok(results)
378    }
379
380    /// Customizes per-column config according to schema and maybe column cardinality.
381    fn customize_column_config(
382        builder: WriterPropertiesBuilder,
383        region_metadata: &RegionMetadataRef,
384    ) -> WriterPropertiesBuilder {
385        let ts_col = ColumnPath::new(vec![
386            region_metadata
387                .time_index_column()
388                .column_schema
389                .name
390                .clone(),
391        ]);
392        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
393        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
394
395        builder
396            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
397            .set_column_dictionary_enabled(seq_col, false)
398            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
399            .set_column_dictionary_enabled(ts_col, false)
400            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
401    }
402
403    async fn write_next_flat_batch(
404        &mut self,
405        source: &mut FlatSource,
406        converter: &FlatBatchConverter,
407        opts: &WriteOptions,
408    ) -> Result<Option<RecordBatch>> {
409        let start = Instant::now();
410        let Some(record_batch) = source.next_batch().await? else {
411            return Ok(None);
412        };
413        self.metrics.iter_source += start.elapsed();
414
415        let arrow_batch = converter.convert_batch(&record_batch)?;
416
417        let start = Instant::now();
418        self.maybe_init_writer(arrow_batch.schema_ref(), opts)
419            .await?
420            .write(&arrow_batch)
421            .await
422            .context(WriteParquetSnafu)?;
423        self.metrics.write_batch += start.elapsed();
424        // Return original flat batch for stats/indexer which use flat layout.
425        Ok(Some(record_batch))
426    }
427
428    async fn maybe_init_writer(
429        &mut self,
430        schema: &SchemaRef,
431        opts: &WriteOptions,
432    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
433        if let Some(ref mut w) = self.writer {
434            Ok(w)
435        } else {
436            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
437            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
438
439            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
440            let props_builder = WriterProperties::builder()
441                .set_key_value_metadata(Some(vec![key_value_meta]))
442                .set_compression(Compression::ZSTD(ZstdLevel::default()))
443                .set_encoding(Encoding::PLAIN)
444                .set_max_row_group_row_count(Some(opts.row_group_size))
445                .set_column_index_truncate_length(None)
446                .set_statistics_truncate_length(None);
447
448            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
449            let writer_props = props_builder.build();
450
451            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
452                self.metadata.region_id,
453                self.current_file,
454            ));
455            let writer = SizeAwareWriter::new(
456                self.writer_factory.create(&sst_file_path).await?,
457                self.bytes_written.clone(),
458            );
459            let arrow_writer =
460                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
461                    .context(WriteParquetSnafu)?;
462            self.writer = Some(arrow_writer);
463
464            let indexer = self.indexer_builder.build(self.current_file, 0).await;
465            self.current_indexer = Some(indexer);
466
467            // safety: self.writer is assigned above
468            Ok(self.writer.as_mut().unwrap())
469        }
470    }
471}
472
473#[derive(Default)]
474struct SourceStats {
475    /// Number of rows fetched.
476    num_rows: usize,
477    /// Time range of fetched batches.
478    time_range: Option<(Timestamp, Timestamp)>,
479    /// Series estimator for computing num_series.
480    series_estimator: SeriesEstimator,
481}
482
483impl SourceStats {
484    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
485        if record_batch.num_rows() == 0 {
486            return Ok(());
487        }
488
489        self.num_rows += record_batch.num_rows();
490        self.series_estimator.update_flat(record_batch);
491
492        // Get the timestamp column by index
493        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
494        let timestamp_array = record_batch.column(time_index_col_idx);
495
496        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
497            if let Some(time_range) = &mut self.time_range {
498                time_range.0 = time_range.0.min(min_in_batch);
499                time_range.1 = time_range.1.max(max_in_batch);
500            } else {
501                self.time_range = Some((min_in_batch, max_in_batch));
502            }
503        }
504
505        Ok(())
506    }
507}
508
509/// Gets min and max timestamp from an timestamp array.
510fn timestamp_range_from_array(
511    timestamp_array: &ArrayRef,
512) -> Result<Option<(Timestamp, Timestamp)>> {
513    let (min_ts, max_ts) = match timestamp_array.data_type() {
514        DataType::Timestamp(TimeUnit::Second, _) => {
515            let array = timestamp_array
516                .as_any()
517                .downcast_ref::<TimestampSecondArray>()
518                .unwrap();
519            let min_val = min(array).map(Timestamp::new_second);
520            let max_val = max(array).map(Timestamp::new_second);
521            (min_val, max_val)
522        }
523        DataType::Timestamp(TimeUnit::Millisecond, _) => {
524            let array = timestamp_array
525                .as_any()
526                .downcast_ref::<TimestampMillisecondArray>()
527                .unwrap();
528            let min_val = min(array).map(Timestamp::new_millisecond);
529            let max_val = max(array).map(Timestamp::new_millisecond);
530            (min_val, max_val)
531        }
532        DataType::Timestamp(TimeUnit::Microsecond, _) => {
533            let array = timestamp_array
534                .as_any()
535                .downcast_ref::<TimestampMicrosecondArray>()
536                .unwrap();
537            let min_val = min(array).map(Timestamp::new_microsecond);
538            let max_val = max(array).map(Timestamp::new_microsecond);
539            (min_val, max_val)
540        }
541        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
542            let array = timestamp_array
543                .as_any()
544                .downcast_ref::<TimestampNanosecondArray>()
545                .unwrap();
546            let min_val = min(array).map(Timestamp::new_nanosecond);
547            let max_val = max(array).map(Timestamp::new_nanosecond);
548            (min_val, max_val)
549        }
550        _ => {
551            return UnexpectedSnafu {
552                reason: format!(
553                    "Unexpected data type of time index: {:?}",
554                    timestamp_array.data_type()
555                ),
556            }
557            .fail();
558        }
559    };
560
561    // If min timestamp exists, max timestamp should also exist.
562    Ok(min_ts.zip(max_ts))
563}
564
/// Async writer wrapper that counts the bytes accepted by the inner writer.
///
/// Workaround for [AsyncArrowWriter] not providing a method to get the total bytes
/// written after close. The count goes into a shared counter that the caller keeps
/// a handle to.
struct SizeAwareWriter<W> {
    /// Wrapped writer that receives all data.
    inner: W,
    /// Shared running total of bytes successfully written to `inner`.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    /// Wraps `inner`. Bytes written through the wrapper are added to `size`.
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        // `size` is already owned, so store it directly instead of cloning it again.
        Self { inner, size }
    }
}
580
581impl<W> AsyncWrite for SizeAwareWriter<W>
582where
583    W: AsyncWrite + Unpin,
584{
585    fn poll_write(
586        mut self: Pin<&mut Self>,
587        cx: &mut Context<'_>,
588        buf: &[u8],
589    ) -> Poll<std::result::Result<usize, std::io::Error>> {
590        let this = self.as_mut().get_mut();
591
592        match Pin::new(&mut this.inner).poll_write(cx, buf) {
593            Poll::Ready(Ok(bytes_written)) => {
594                this.size.fetch_add(bytes_written, Ordering::Relaxed);
595                Poll::Ready(Ok(bytes_written))
596            }
597            other => other,
598        }
599    }
600
601    fn poll_flush(
602        mut self: Pin<&mut Self>,
603        cx: &mut Context<'_>,
604    ) -> Poll<std::result::Result<(), std::io::Error>> {
605        Pin::new(&mut self.inner).poll_flush(cx)
606    }
607
608    fn poll_shutdown(
609        mut self: Pin<&mut Self>,
610        cx: &mut Context<'_>,
611    ) -> Poll<std::result::Result<(), std::io::Error>> {
612        Pin::new(&mut self.inner).poll_shutdown(cx)
613    }
614}