mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.

use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::{FileId, RegionFileId};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{time_index_column_index, FlatWriteFormat};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{FlatSchemaOptions, DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    /// Current active writer, created lazily when the first batch is written.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file.
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: Metrics,
}

/// Factory that creates the underlying writer for a file path.
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

/// [WriterFactory] that writes files to an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        // Buffers writes into chunks and uploads them concurrently.
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a parquet SST writer that writes to the given object store.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Sets the cleaner that removes temp files on failure.
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes the current SST file and index file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer will create a new file on the next write.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // Safety: the writer and the indexer are either both present or both absent.
            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            // Converts FileMetaData to ParquetMetaData.
            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed);
        };

        Ok(())
    }

    /// Iterates the source and writes all rows to Parquet files.
    ///
    /// Returns the [SstInfo] of each written SST.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Safety: self.current_indexer must be set when the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
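                    // Rolls over to a new file once the flushed bytes exceed `max_file_size`.
                    // `bytes_written` only counts bytes already handed to the underlying writer,
                    // so the final file may exceed the limit by whatever the arrow writer still buffers.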
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Iterates the [FlatSource] and writes all [RecordBatch]es in flat format to Parquet files.
    ///
    /// Returns the [SstInfo] of each written SST.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format =
            FlatWriteFormat::new(self.metadata.clone(), &FlatSchemaOptions::default())
                .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    // Safety: self.current_indexer must be set when the first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
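                    // Same rollover policy as the primary key path: finish the current file
                    // once the flushed bytes exceed `max_file_size`.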
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to the schema and possibly column cardinality.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
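        // Note: timestamps and sequence numbers are integer columns whose adjacent values are
        // usually close together in a sorted SST, so DELTA_BINARY_PACKED tends to compress them
        // well, while dictionary encoding adds little for such high-cardinality values.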
        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    /// Fetches the next [Batch] from the source, converts it and writes it to the current file.
    ///
    /// Returns the fetched batch so the caller can update stats and the indexer.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    /// Fetches the next [RecordBatch] from the flat source, converts it and writes it to the current file.
    ///
    /// Returns the fetched batch so the caller can update stats and the indexer.
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the current writer, lazily creating the Parquet writer and indexer
    /// for the current file if the writer does not exist yet.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
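            // The region metadata is embedded as JSON in the parquet key-value metadata,
            // so readers can recover it from the SST file itself.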

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);
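            // These are file-wide defaults; customize_column_config below applies
            // column-specific overrides for the time index and sequence columns.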

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }

    /// Consumes the writer and returns the collected metrics.
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}

/// Statistics of the source batches written to the SST.
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    /// Updates the stats with a [Batch] in the primary key format.
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    /// Updates the stats with a [RecordBatch] in the flat format.
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();

        // Gets the timestamp column by its position in the flat format.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

/// Gets the min and max timestamps from a timestamp array.
///
/// Returns `None` if the array is empty or contains only null values.
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail()
        }
    };

    // If the min timestamp exists, the max timestamp should also exist.
    Ok(min_ts.zip(max_ts))
}

/// Workaround for [AsyncArrowWriter] not providing a method to
/// get the total bytes written after close.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Only counts bytes the inner writer actually accepted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}