use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

/// Parquet SST writer.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths from file ids.
    path_provider: P,
    /// Active writer, lazily initialized when the first batch arrives.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file the writer is currently writing to.
    current_file: FileId,
    /// Factory that opens the underlying writer for each new file.
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Index build configuration.
    index_config: IndexConfig,
    /// Builder that creates an [`Indexer`] for each new file.
    indexer_builder: I,
    /// Indexer of the file currently being written.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file, shared with the inner [`SizeAwareWriter`].
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner that removes temporary files when a write fails.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

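/// Factory that opens the underlying [`AsyncWrite`] sink for an SST file path.
///
/// A minimal sketch of a custom factory that writes to the local filesystem.
/// `FsWriterFactory` is hypothetical and shown for illustration only; this
/// crate ships [`ObjectStoreWriterFactory`] below:
///
/// ```ignore
/// struct FsWriterFactory;
///
/// impl WriterFactory for FsWriterFactory {
///     type Writer = tokio::fs::File;
///
///     async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
///         // Map the I/O error into this crate's error type as needed.
///         Ok(tokio::fs::File::create(file_path).await.expect("create file"))
///     }
/// }
/// ```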
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

/// [`WriterFactory`] that writes through an [`ObjectStore`].
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a writer that writes SSTs to the given object store.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Sets the cleaner that removes temporary files if the write fails.
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}
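
// A hedged usage sketch of the object-store constructor. `indexer_builder`,
// `path_provider`, `source`, and `metrics` stand in for values the caller
// already owns, and the `Default` impls are assumed for illustration:
//
//     let mut writer = ParquetWriter::new_with_object_store(
//         object_store.clone(),
//         region_metadata.clone(),
//         IndexConfig::default(),
//         indexer_builder,
//         path_provider,
//         &mut metrics,
//     )
//     .await;
//     let ssts = writer.write_all(source, None, &WriteOptions::default()).await?;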

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Seals the file being written, pushing its [`SstInfo`] to `ssts` and
    /// resetting the writer state for the next file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Takes the stats for the file being sealed and resets them for the next file.
            let mut stats = mem::take(stats);
            // The writer is only created once a batch has been written, so the
            // file must contain rows.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let mut index_output = IndexOutput::default();
            // Finishes the indexer now in sync mode; in async mode the index
            // is built later by a separate task.
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: the assertion above ensures rows were written, so the
            // time range has been recorded.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            // Prepares a fresh file id and byte counter for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed);
        }

        Ok(())
    }

    /// Writes the whole primary-key encoded `source` to parquet files.
    ///
    /// Cleans up the file being written if the write fails.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Updates the index synchronously; in async mode the index
                    // is built after the file is written.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }
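
    // File rotation is size-driven: once bytes written to the current file
    // exceed `opts.max_file_size`, the file is sealed and a fresh `FileId` is
    // drawn. A hedged configuration sketch (assuming the `Default` impl of
    // `WriteOptions`; other fields keep their defaults):
    //
    //     let opts = WriteOptions {
    //         max_file_size: Some(256 * 1024 * 1024), // seal files near 256 MiB
    //         ..Default::default()
    //     };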

    /// Writes the whole flat-format `source` to parquet files.
    ///
    /// Cleans up the file being written if the write fails.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Customizes per-column writer properties: the time index and sequence
    /// columns hold (mostly) monotonically increasing integers, so delta
    /// encoding compresses them better than dictionaries.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }
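
    // A hedged sketch of extending the overrides to another integer column
    // (the column name here is hypothetical):
    //
    //     let col = ColumnPath::new(vec!["my_counter".to_string()]);
    //     builder = builder.set_column_encoding(col, Encoding::DELTA_BINARY_PACKED);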

    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the writer for the current file, creating the writer, its
    /// properties, and the indexer on first use.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Stores the region metadata in the parquet key-value metadata so
            // readers can recover it from the file.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // Safety: the writer was just set above.
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

/// Statistics gathered from the batches written to a file.
#[derive(Default)]
struct SourceStats {
    /// Number of rows written.
    num_rows: usize,
    /// Time range of the rows written.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Estimator for the number of time series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        self.series_estimator.update(batch);
        // Rows in a batch are sorted by time, so the first and last timestamps
        // bound the batch.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // A flat batch is not globally sorted by time, so scan the whole time
        // index column for the min/max timestamps.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

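/// Computes the min/max timestamps of a time index array, returning `None`
/// when the array is empty or all-null.
///
/// A hedged doc sketch of the expected behavior (array construction shown for
/// illustration only):
///
/// ```ignore
/// let array: ArrayRef = Arc::new(TimestampMillisecondArray::from(vec![3, 1, 2]));
/// assert_eq!(
///     timestamp_range_from_array(&array)?,
///     Some((Timestamp::new_millisecond(1), Timestamp::new_millisecond(3)))
/// );
/// ```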
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    // `zip` collapses the pair to `None` when the array has no valid values.
    Ok(min_ts.zip(max_ts))
}

/// [`AsyncWrite`] wrapper that tracks the number of bytes written into a
/// shared counter.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Only bytes the inner writer actually accepted are counted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
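
// A hedged test sketch for `SizeAwareWriter` (an illustrative addition, not
// part of the original suite): it wraps `tokio::io::sink()` and checks that
// the shared counter tracks the bytes written.
#[cfg(test)]
mod size_aware_writer_tests {
    use tokio::io::AsyncWriteExt;

    use super::*;

    #[tokio::test]
    async fn size_aware_writer_counts_bytes() {
        let size = Arc::new(AtomicUsize::new(0));
        let mut writer = SizeAwareWriter::new(tokio::io::sink(), size.clone());
        writer.write_all(b"hello").await.unwrap();
        writer.write_all(b" world").await.unwrap();
        // 5 + 6 bytes have passed through the wrapper.
        assert_eq!(size.load(Ordering::Relaxed), 11);
    }
}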