// mito2/sst/parquet/writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Parquet writer.
17use std::future::Future;
18use std::mem;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Instant;
24
25use common_telemetry::debug;
26use common_time::Timestamp;
27use datatypes::arrow::datatypes::SchemaRef;
28use object_store::{FuturesAsyncWriter, ObjectStore};
29use parquet::arrow::AsyncArrowWriter;
30use parquet::basic::{Compression, Encoding, ZstdLevel};
31use parquet::file::metadata::KeyValue;
32use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
33use parquet::schema::types::ColumnPath;
34use smallvec::smallvec;
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
38use store_api::storage::SequenceNumber;
39use tokio::io::AsyncWrite;
40use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
41
42use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
43use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
44use crate::read::{Batch, Source};
45use crate::sst::file::{FileId, RegionFileId};
46use crate::sst::index::{Indexer, IndexerBuilder};
47use crate::sst::parquet::format::WriteFormat;
48use crate::sst::parquet::helper::parse_parquet_metadata;
49use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
50use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
51
/// Parquet SST writer.
///
/// Writes batches from a [Source] into one or more parquet files, creating an
/// [Indexer] per output file alongside the parquet data.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    /// Active arrow writer; `None` until the first batch is written and again
    /// after a file is finished.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    /// Factory that opens the underlying byte sink for each new SST file.
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Indexer builder that can create indexers for multiple files.
    indexer_builder: I,
    /// Current active indexer.
    current_indexer: Option<Indexer>,
    /// Shared counter of bytes written to the current file; updated by
    /// [SizeAwareWriter] and reset when a file is finished.
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner to remove temp files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: Metrics,
}
72
/// Factory that asynchronously opens a byte sink for a given file path.
pub trait WriterFactory {
    /// Concrete writer type produced by [WriterFactory::create].
    type Writer: AsyncWrite + Send + Unpin;
    /// Opens a writer for `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
77
/// [WriterFactory] implementation backed by an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    /// Object store that hosts the SST files.
    object_store: ObjectStore,
}
81
82impl WriterFactory for ObjectStoreWriterFactory {
83    type Writer = Compat<FuturesAsyncWriter>;
84
85    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
86        self.object_store
87            .writer_with(file_path)
88            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
89            .concurrent(DEFAULT_WRITE_CONCURRENCY)
90            .await
91            .map(|v| v.into_futures_async_write().compat_write())
92            .context(OpenDalSnafu)
93    }
94}
95
96impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
97where
98    P: FilePathProvider,
99    I: IndexerBuilder,
100{
101    pub async fn new_with_object_store(
102        object_store: ObjectStore,
103        metadata: RegionMetadataRef,
104        indexer_builder: I,
105        path_provider: P,
106        metrics: Metrics,
107    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
108        ParquetWriter::new(
109            ObjectStoreWriterFactory { object_store },
110            metadata,
111            indexer_builder,
112            path_provider,
113            metrics,
114        )
115        .await
116    }
117
118    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
119        self.file_cleaner = Some(cleaner);
120        self
121    }
122}
123
impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    ///
    /// Picks a random id for the first output file and builds its indexer up
    /// front; the parquet writer itself is created lazily on first write (see
    /// `maybe_init_writer`).
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes current SST file and index file.
    ///
    /// Closes the active writer and indexer, pushes the finished file's
    /// [SstInfo] into `ssts`, takes `stats` (resetting the caller's
    /// accumulator), then picks a fresh file id and zeroes the byte counter.
    /// No-op when no writer is active.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // maybe_init_writer will re-create a new file.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Take the stats so the next file starts from a default value.
            let stats = mem::take(stats);
            // At least one row has been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish indexer and writer.
            // safety: writer and index can only be both present or not.
            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num rows > 0 so we must have min/max.
            let time_range = stats.time_range.unwrap();

            // convert FileMetaData to ParquetMetaData
            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            // Prepare a fresh file id and byte counter for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates source and writes all rows to Parquet file.
    ///
    /// Returns the [SstInfo] if the SST is written.
    ///
    /// On failure, asks the configured [TempFileCleaner] (if any) to remove
    /// files for the current file id before returning the error.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean tmp files explicitly on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Drains `source`, writing every batch to parquet and feeding it to the
    /// current indexer; finishes the current file and rolls over to a new one
    /// whenever `opts.max_file_size` is exceeded. Returns one [SstInfo] per
    /// finished file.
    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // override the `sequence` field from `Source`
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // safety: self.current_indexer must be set when first batch has been written.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file once the size limit is exceeded.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the indexer so it can release resources before bailing out.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        // object_store.write will make sure all bytes are written or an error is raised.
        Ok(results)
    }

    /// Customizes per-column config according to schema and maybe column cardinality.
    ///
    /// Uses delta binary packing and disables dictionary encoding for the
    /// time index and sequence columns.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    /// Fetches the next batch from `source`, converts it to arrow format and
    /// writes it through the (lazily created) parquet writer.
    ///
    /// Returns `Ok(None)` when the source is exhausted. Also accumulates the
    /// source-iteration and batch-write timings into `self.metrics`.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    /// Returns the active arrow writer, creating it — and a matching indexer
    /// for the current file id — when there is none.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Embed the region metadata JSON into the parquet key-value metadata.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // TODO(yingwen): Find and set proper column encoding for internal columns: op type and tsid.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            // Wrap the sink so bytes written are tracked in the shared counter.
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // safety: self.writer is assigned above
            Ok(self.writer.as_mut().unwrap())
        }
    }

    /// Consumes the writer and returns the collected metrics.
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}
363
/// Running statistics over the batches written to the current file.
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches; `None` until the first non-empty batch
    /// is recorded.
    time_range: Option<(Timestamp, Timestamp)>,
}
371
372impl SourceStats {
373    fn update(&mut self, batch: &Batch) {
374        if batch.is_empty() {
375            return;
376        }
377
378        self.num_rows += batch.num_rows();
379        // Safety: batch is not empty.
380        let (min_in_batch, max_in_batch) = (
381            batch.first_timestamp().unwrap(),
382            batch.last_timestamp().unwrap(),
383        );
384        if let Some(time_range) = &mut self.time_range {
385            time_range.0 = time_range.0.min(min_in_batch);
386            time_range.1 = time_range.1.max(max_in_batch);
387        } else {
388            self.time_range = Some((min_in_batch, max_in_batch));
389        }
390    }
391}
392
/// Workaround for [AsyncArrowWriter] does not provide a method to
/// get total bytes written after close.
///
/// Wraps an inner writer and adds every successfully written byte count to a
/// shared atomic counter.
struct SizeAwareWriter<W> {
    /// Underlying writer that receives all bytes.
    inner: W,
    /// Shared running total of bytes written.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    /// Wraps `inner`, recording written byte counts into `size`.
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        // `size` is already owned by value here, so move it directly instead
        // of the previous redundant `size.clone()` (clippy: redundant_clone).
        Self { inner, size }
    }
}
408
409impl<W> AsyncWrite for SizeAwareWriter<W>
410where
411    W: AsyncWrite + Unpin,
412{
413    fn poll_write(
414        mut self: Pin<&mut Self>,
415        cx: &mut Context<'_>,
416        buf: &[u8],
417    ) -> Poll<std::result::Result<usize, std::io::Error>> {
418        let this = self.as_mut().get_mut();
419
420        match Pin::new(&mut this.inner).poll_write(cx, buf) {
421            Poll::Ready(Ok(bytes_written)) => {
422                this.size.fetch_add(bytes_written, Ordering::Relaxed);
423                Poll::Ready(Ok(bytes_written))
424            }
425            other => other,
426        }
427    }
428
429    fn poll_flush(
430        mut self: Pin<&mut Self>,
431        cx: &mut Context<'_>,
432    ) -> Poll<std::result::Result<(), std::io::Error>> {
433        Pin::new(&mut self.inner).poll_flush(cx)
434    }
435
436    fn poll_shutdown(
437        mut self: Pin<&mut Self>,
438        cx: &mut Context<'_>,
439    ) -> Poll<std::result::Result<(), std::io::Error>> {
440        Pin::new(&mut self.inner).poll_shutdown(cx)
441    }
442}