use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{
    DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
};

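/// Parquet SST writer.
///
/// Writes [Batch]es or flat [RecordBatch]es from a source into one or more parquet
/// files, rolling over to a new file when the configured size limit is exceeded.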
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Provides file paths for SSTs to write.
    path_provider: P,
    /// Current active writer, lazily initialized on the first batch.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Factory that creates the underlying async writers.
    writer_factory: F,
    /// Metadata of the region the SSTs belong to.
    metadata: RegionMetadataRef,
    /// Index configuration.
    index_config: IndexConfig,
    /// Builder for indexers of new files.
    indexer_builder: I,
    /// Indexer for the file currently being written.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file.
    bytes_written: Arc<AtomicUsize>,
    /// Cleaner that removes partially written files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: &'a mut Metrics,
}

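/// A factory that creates the underlying [AsyncWrite] writer for a file path.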
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;

    /// Creates a new writer for the given file path.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

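/// [WriterFactory] that writes files to an [ObjectStore].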
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
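    /// Creates a [ParquetWriter] that writes SST files to the given object store.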
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

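    /// Sets the cleaner used to remove partially written files when a write fails.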
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
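    /// Creates a new [ParquetWriter] and builds the indexer for its first file.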
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

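    /// Finishes the current file if any data has been written: finalizes the index
    /// (in sync build mode), closes the parquet writer, pushes the resulting
    /// [SstInfo] into `ssts`, and resets the state for the next file.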
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Takes the stats and resets them for the next file.
            let mut stats = mem::take(stats);
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let mut index_output = IndexOutput::default();
            // Only finishes the index here in sync build mode.
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num_rows > 0 so the time range must be present.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            // Prepares for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

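    /// Writes and indexes all batches from the `source`, cleaning up partially
    /// written files on failure.
    ///
    /// Returns the [SstInfo] of every file written. The output is split into
    /// multiple files when `opts.max_file_size` is exceeded.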
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Cleans up the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Only updates the index here in sync build mode.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

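    /// Writes and indexes all record batches from the flat `source`, cleaning up
    /// partially written files on failure.
    ///
    /// Returns the [SstInfo] of every file written.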
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Cleans up the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

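    /// Customizes per-column properties of the internal columns: delta encoding
    /// without dictionaries for the time index and sequence columns, and no
    /// compression for the op type column.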
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

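    /// Fetches the next [Batch] from the `source`, converts it and writes it to the
    /// current parquet file, initializing the writer if necessary.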
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

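    /// Fetches the next [RecordBatch] from the flat `source`, converts it and writes
    /// it to the current parquet file, initializing the writer if necessary.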
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

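    /// Returns the writer of the current file, creating it (and the indexer for the
    /// current file) on first use.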
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            // Stores the region metadata in the parquet key-value metadata.
            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            // Safety: the writer is set above.
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

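/// Statistics collected from the source while writing.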
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched from the source.
    num_rows: usize,
    /// Time range of the fetched rows.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Estimator for the number of series.
    series_estimator: SeriesEstimator,
}

impl SourceStats {
    /// Updates the statistics with a [Batch].
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        self.series_estimator.update(batch);
        // Safety: the batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    /// Updates the statistics with a flat [RecordBatch].
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();
        self.series_estimator.update_flat(record_batch);

        // Locates the time index column in the flat format.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

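/// Computes the min/max [Timestamp] of a timestamp array, returning `None` if the
/// array has no non-null values.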
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    Ok(min_ts.zip(max_ts))
}

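/// An [AsyncWrite] wrapper that tracks the total number of bytes written through it.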
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self {
            inner,
            size: size.clone(),
        }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Accumulates the number of bytes actually written.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}