use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::{FileId, SequenceNumber};
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::config::{IndexBuildMode, IndexConfig};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::RegionFileId;
use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions};

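/// Parquet SST writer that writes a source to one or more SST files and builds
/// their indexes along the way.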
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Provider that builds SST file paths from file ids.
    path_provider: P,
    /// The active arrow writer, lazily initialized by the first batch.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file that is currently being written.
    current_file: FileId,
    /// Factory that creates the underlying async writers.
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Index build configuration.
    index_config: IndexConfig,
    /// Builder that creates an indexer for each file.
    indexer_builder: I,
    /// Indexer of the file that is currently being written.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file.
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner that removes partially written files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: Metrics,
}

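/// Factory that creates an [AsyncWrite] writer for a given file path.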
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

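/// [WriterFactory] implementation that writes to an [ObjectStore].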
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a [ParquetWriter] that writes SSTs to the given object store.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Sets the cleaner used to remove partially written files on failure.
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new writer and the indexer for the initial file.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

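    /// Finishes the file that is currently being written: finishes the index (in
    /// sync mode), closes the arrow writer, and pushes the resulting [SstInfo] to
    /// `ssts`. Does nothing if no writer is active.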
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            // The writer is only created after at least one batch is written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // In sync mode the index is finished with the data file; in async
            // mode it is built later.
            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safety: num_rows > 0 implies the time range has been set.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            // Prepares for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed);
        };

        Ok(())
    }

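    /// Iterates the source and writes all rows in primary key format.
    ///
    /// Returns the [SstInfo] of every file written.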
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Removes the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    // Only updates the index inline in sync mode.
                    match self.index_config.build_mode {
                        IndexBuildMode::Sync => {
                            self.current_indexer
                                .as_mut()
                                .unwrap()
                                .update(&mut batch)
                                .await;
                        }
                        IndexBuildMode::Async => {}
                    }
                    self.metrics.update_index += start.elapsed();
                    // Rotates to a new file once the current one exceeds the size limit.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Aborts the indexer on error.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

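    /// Iterates the flat source and writes all record batches in flat format.
    ///
    /// Returns the [SstInfo] of every file written.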
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Removes the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format = FlatWriteFormat::new(
            self.metadata.clone(),
            &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
        )
        .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Rotates to a new file once the current one exceeds the size limit.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Aborts the indexer on error.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

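    /// Customizes per-column properties: the time index and sequence columns use
    /// `DELTA_BINARY_PACKED` encoding with dictionaries disabled.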
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

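    /// Fetches the next [Batch] from the source, converts it to arrow format and
    /// writes it, initializing the writer if necessary.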
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

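    /// Fetches the next [RecordBatch] from the flat source, converts it and
    /// writes it, initializing the writer if necessary.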
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

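    /// Returns the arrow writer for the current file, creating the writer and the
    /// indexer on first use.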
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Stores the region metadata as JSON in the parquet key-value metadata.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // Safety: the writer was just set above.
            Ok(self.writer.as_mut().unwrap())
        }
    }

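    /// Consumes the writer and returns the accumulated write metrics.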
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}

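/// Statistics of the rows fetched from the source.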
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched from the source.
    num_rows: usize,
    /// Time range of the fetched rows.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    /// Updates the stats with a [Batch].
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: the batch is not empty.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    /// Updates the stats with a flat [RecordBatch].
    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();

        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

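/// Computes the minimum and maximum [Timestamp] of a timestamp array, returning
/// `None` if the array contains no valid values.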
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail();
        }
    };

    Ok(min_ts.zip(max_ts))
}

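/// Writer wrapper that records the total number of bytes written, so the final
/// file size is available even after the inner writer is closed.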
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Counts only the bytes the inner writer actually accepted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}