1use std::future::Future;
18use std::mem;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::task::{Context, Poll};
23use std::time::Instant;
24
25use common_telemetry::debug;
26use common_time::Timestamp;
27use datatypes::arrow::array::{
28 ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
29 TimestampSecondArray,
30};
31use datatypes::arrow::compute::{max, min};
32use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
33use datatypes::arrow::record_batch::RecordBatch;
34use object_store::{FuturesAsyncWriter, ObjectStore};
35use parquet::arrow::AsyncArrowWriter;
36use parquet::basic::{Compression, Encoding, ZstdLevel};
37use parquet::file::metadata::KeyValue;
38use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
39use parquet::schema::types::ColumnPath;
40use smallvec::smallvec;
41use snafu::ResultExt;
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
44use store_api::storage::{FileId, SequenceNumber};
45use tokio::io::AsyncWrite;
46use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
47
48use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
49use crate::config::{IndexBuildMode, IndexConfig};
50use crate::error::{
51 InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
52};
53use crate::read::{Batch, FlatSource, Source};
54use crate::sst::file::RegionFileId;
55use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
56use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
57use crate::sst::parquet::format::PrimaryKeyWriteFormat;
58use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
59use crate::sst::{
60 DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
61};
62
/// Parquet SST writer.
///
/// Streams batches from a source into one or more parquet files, rolling over
/// to a new file once `WriteOptions::max_file_size` is exceeded, and drives
/// the per-file [`Indexer`].
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Resolves SST file paths from region/file ids.
    path_provider: P,
    /// Active arrow writer for the current file. `None` until the first batch
    /// is written (created lazily by `maybe_init_writer`).
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Factory that creates the underlying async byte writers.
    writer_factory: F,
    /// Metadata of the region the SSTs belong to.
    metadata: RegionMetadataRef,
    /// Index configuration (notably sync vs. async build mode).
    index_config: IndexConfig,
    /// Builds an [`Indexer`] for each new output file.
    indexer_builder: I,
    /// Indexer for the current file.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file so far; shared with the
    /// `SizeAwareWriter` wrapping the output stream.
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner that removes partially written files on error.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics updated as batches are processed.
    metrics: &'a mut Metrics,
}
85
/// Factory that creates the async writers backing SST files.
pub trait WriterFactory {
    /// Concrete async writer type produced by this factory.
    type Writer: AsyncWrite + Send + Unpin;

    /// Creates a writer for the given file path.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
90
/// [`WriterFactory`] backed by an [`ObjectStore`].
pub struct ObjectStoreWriterFactory {
    /// Object store that receives the SST files.
    object_store: ObjectStore,
}
94
impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    /// Opens a buffered, concurrent object-store writer for `file_path` and
    /// adapts it to `tokio::io::AsyncWrite`.
    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            // Upload in fixed-size chunks with bounded concurrency.
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            // Bridge futures-io writer to tokio's AsyncWrite.
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}
108
impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a parquet writer that writes SSTs into `object_store`.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Sets the cleaner used to remove partially written files when a write
    /// fails (builder-style, consumes and returns `self`).
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}
138
impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    ///
    /// Picks a random id for the first output file and builds its indexer up
    /// front; the parquet writer itself is created lazily when the first
    /// batch arrives (see `maybe_init_writer`).
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        // NOTE(review): the second argument looks like a per-file sequence
        // starting at 0 — confirm against `IndexerBuilder::build`.
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finalizes the file currently being written, if any.
    ///
    /// Finishes the index (in sync build mode only), flushes and closes the
    /// parquet writer, pushes the finished file's [`SstInfo`] into `ssts`,
    /// and resets per-file state (`stats`, `current_file`, `bytes_written`).
    /// No-op when no writer has been created yet.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // A writer exists only after at least one batch was written.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Take the per-file stats, leaving a fresh default for the next file.
            let mut stats = mem::take(stats);
            // Invariant: the writer is created on the first written batch, so
            // there must be rows by the time we finish the file.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    // Finish the index now so its metadata ships with the SST.
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    // Index output stays default; the index is built later.
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safe: num_rows > 0 implies the time range was populated.
            let time_range = stats.time_range.unwrap();

            // Largest uncompressed row group size across the file.
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            // Prepare state for a possible next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Writes all batches of `source` into one or more SST files.
    ///
    /// On failure, the partially written current file is removed via the
    /// configured `file_cleaner` (if any) before the error is returned.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Drains `source` in primary-key format, writing batches, updating the
    /// indexer (sync mode only) and rolling over files that exceed
    /// `opts.max_file_size`. Does NOT clean up partial files on error.
    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        // Async mode: index is built later from the SST.
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file once the size budget is exceeded.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the in-flight index build before bubbling up.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        // Finalize the last (possibly under-sized) file.
        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Writes all record batches of a flat-format `source` into SST files.
    ///
    /// Same error-cleanup behavior as [`Self::write_all`].
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Flat-format counterpart of `write_all_without_cleaning`.
    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        // The flat path never overrides sequence numbers.
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    // NOTE(review): unlike the primary-key path, the indexer is
                    // updated regardless of `index_config.build_mode` — confirm
                    // this asymmetry is intentional.
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file once the size budget is exceeded.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the in-flight index build before bubbling up.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        // Finalize the last (possibly under-sized) file.
        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Customizes per-column writer properties:
    /// - time index and sequence columns: DELTA_BINARY_PACKED encoding,
    ///   dictionary disabled (monotonic-ish integers compress well as deltas);
    /// - op type column: stored uncompressed.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    /// Fetches the next [`Batch`] from `source`, converts it to arrow and
    /// writes it to the (lazily created) parquet writer.
    ///
    /// Returns `Ok(None)` when the source is exhausted; otherwise returns the
    /// batch so the caller can update stats and the indexer.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    /// Flat-format counterpart of `write_next_batch`.
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the current parquet writer, creating it (and the indexer for
    /// the current file) on first use.
    ///
    /// The writer embeds the region metadata JSON as parquet key-value
    /// metadata, uses ZSTD compression and the configured row-group size.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Store region metadata so readers can reconstruct it from the file.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                // Keep full (untruncated) column index / statistics values.
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            // Track bytes written through the shared counter.
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            // A fresh file needs a fresh indexer.
            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            Ok(self.writer.as_mut().unwrap())
        }
    }
}
502
/// Statistics accumulated over the batches written to the current file.
#[derive(Default)]
struct SourceStats {
    /// Number of rows written so far.
    num_rows: usize,
    /// Min/max timestamps seen so far; `None` until the first non-empty batch.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Estimator for the number of distinct series in the file.
    series_estimator: SeriesEstimator,
}
512
513impl SourceStats {
514 fn update(&mut self, batch: &Batch) {
515 if batch.is_empty() {
516 return;
517 }
518
519 self.num_rows += batch.num_rows();
520 self.series_estimator.update(batch);
521 let (min_in_batch, max_in_batch) = (
523 batch.first_timestamp().unwrap(),
524 batch.last_timestamp().unwrap(),
525 );
526 if let Some(time_range) = &mut self.time_range {
527 time_range.0 = time_range.0.min(min_in_batch);
528 time_range.1 = time_range.1.max(max_in_batch);
529 } else {
530 self.time_range = Some((min_in_batch, max_in_batch));
531 }
532 }
533
534 fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
535 if record_batch.num_rows() == 0 {
536 return Ok(());
537 }
538
539 self.num_rows += record_batch.num_rows();
540 self.series_estimator.update_flat(record_batch);
541
542 let time_index_col_idx = time_index_column_index(record_batch.num_columns());
544 let timestamp_array = record_batch.column(time_index_col_idx);
545
546 if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
547 if let Some(time_range) = &mut self.time_range {
548 time_range.0 = time_range.0.min(min_in_batch);
549 time_range.1 = time_range.1.max(max_in_batch);
550 } else {
551 self.time_range = Some((min_in_batch, max_in_batch));
552 }
553 }
554
555 Ok(())
556 }
557}
558
559fn timestamp_range_from_array(
561 timestamp_array: &ArrayRef,
562) -> Result<Option<(Timestamp, Timestamp)>> {
563 let (min_ts, max_ts) = match timestamp_array.data_type() {
564 DataType::Timestamp(TimeUnit::Second, _) => {
565 let array = timestamp_array
566 .as_any()
567 .downcast_ref::<TimestampSecondArray>()
568 .unwrap();
569 let min_val = min(array).map(Timestamp::new_second);
570 let max_val = max(array).map(Timestamp::new_second);
571 (min_val, max_val)
572 }
573 DataType::Timestamp(TimeUnit::Millisecond, _) => {
574 let array = timestamp_array
575 .as_any()
576 .downcast_ref::<TimestampMillisecondArray>()
577 .unwrap();
578 let min_val = min(array).map(Timestamp::new_millisecond);
579 let max_val = max(array).map(Timestamp::new_millisecond);
580 (min_val, max_val)
581 }
582 DataType::Timestamp(TimeUnit::Microsecond, _) => {
583 let array = timestamp_array
584 .as_any()
585 .downcast_ref::<TimestampMicrosecondArray>()
586 .unwrap();
587 let min_val = min(array).map(Timestamp::new_microsecond);
588 let max_val = max(array).map(Timestamp::new_microsecond);
589 (min_val, max_val)
590 }
591 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
592 let array = timestamp_array
593 .as_any()
594 .downcast_ref::<TimestampNanosecondArray>()
595 .unwrap();
596 let min_val = min(array).map(Timestamp::new_nanosecond);
597 let max_val = max(array).map(Timestamp::new_nanosecond);
598 (min_val, max_val)
599 }
600 _ => {
601 return UnexpectedSnafu {
602 reason: format!(
603 "Unexpected data type of time index: {:?}",
604 timestamp_array.data_type()
605 ),
606 }
607 .fail();
608 }
609 };
610
611 Ok(min_ts.zip(max_ts))
613}
614
/// An `AsyncWrite` wrapper that accumulates the number of bytes successfully
/// written through it into a shared atomic counter.
struct SizeAwareWriter<W> {
    /// The wrapped writer.
    inner: W,
    /// Total bytes written, shared with the owning `ParquetWriter`.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    /// Wraps `inner`, folding written byte counts into `size`.
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        // `size` is already owned here; the previous `size.clone()` was a
        // redundant extra Arc refcount bump (clippy: redundant_clone).
        Self { inner, size }
    }
}
630
impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    /// Delegates to the inner writer and adds the number of bytes it actually
    /// accepted to the shared size counter.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        // Safe to get_mut: W is Unpin, so the whole struct is Unpin.
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Count only bytes the inner writer reported as written.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            // Pending or error: pass through unchanged.
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}