use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};
use std::time::Instant;

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::array::{
    ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
    TimestampSecondArray,
};
use datatypes::arrow::compute::{max, min};
use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
use datatypes::arrow::record_batch::RecordBatch;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
use crate::error::{
    InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
};
use crate::read::{Batch, FlatSource, Source};
use crate::sst::file::{FileId, RegionFileId};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::flat_format::{time_index_column_index, FlatWriteFormat};
use crate::sst::parquet::format::PrimaryKeyWriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{FlatSchemaOptions, DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

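/// Parquet SST writer.
///
/// It writes [Batch]es or [RecordBatch]es into one or more parquet files and
/// rolls over to a new file once the configured max file size is exceeded.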
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Provider that builds SST file paths from file ids.
    path_provider: P,
    /// Current arrow writer, lazily created on the first batch.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Factory that creates the underlying async writers.
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Builder that creates an indexer for each new file.
    indexer_builder: I,
    /// Indexer of the file currently being written.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file.
    bytes_written: Arc<AtomicUsize>,
    /// Cleans up intermediate files if the write fails.
    file_cleaner: Option<TempFileCleaner>,
    /// Local write metrics.
    metrics: Metrics,
}

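/// Factory that creates an [AsyncWrite] destination for a given file path.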
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

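/// [WriterFactory] implementation backed by an [ObjectStore].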
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a parquet writer that persists SSTs to the given object store.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Sets the cleaner used to remove files of the current file id on failure.
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a writer. The first file id and its indexer are prepared up front,
    /// but the underlying parquet writer is only opened when the first batch arrives.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes the current SST file and index file, pushes the produced
    /// [SstInfo] into `ssts`, and resets the per-file statistics.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // The writer is lazily created, so a `None` writer means nothing was written.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            // The active writer should only exist after rows have been written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            // Finish the index before closing the parquet writer.
            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // The time range is expected to be set once rows have been written.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            // Switch to a fresh file id and reset the size counter.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Iterates the source and writes all batches to parquet files.
    ///
    /// Returns the [SstInfo] of every file written. Cleans up files of the
    /// current file id if the write fails and a cleaner is configured.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            // Clean up the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format = PrimaryKeyWriteFormat::new(self.metadata.clone())
            .with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    // Update the index with the written batch.
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file when the size limit is exceeded.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Iterates the flat source and writes all record batches to parquet files.
    ///
    /// Returns the [SstInfo] of every file written. Cleans up files of the
    /// current file id if the write fails and a cleaner is configured.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self.write_all_flat_without_cleaning(source, opts).await;
        if res.is_err() {
            // Clean up the partially written file on failure.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_flat_without_cleaning(
        &mut self,
        mut source: FlatSource,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let flat_format =
            FlatWriteFormat::new(self.metadata.clone(), &FlatSchemaOptions::default())
                .with_override_sequence(None);
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, &flat_format, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    // Update the index with the written record batch.
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update_flat(&batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file when the size limit is exceeded.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Customizes per-column settings: the time index and sequence columns use
    /// delta binary packed encoding with dictionaries disabled.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    /// Fetches the next [Batch] from the source, converts it, and writes it to
    /// the parquet writer. Returns `Ok(None)` when the source is exhausted.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &PrimaryKeyWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    /// Fetches the next [RecordBatch] from the flat source, converts it, and
    /// writes it to the parquet writer. Returns `Ok(None)` when the source is
    /// exhausted.
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        flat_format: &FlatWriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = flat_format.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(flat_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the current parquet writer, creating it (along with a new
    /// indexer) if it does not exist yet.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Store the region metadata in the parquet key-value metadata.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            // Safety: the writer is set right above.
            Ok(self.writer.as_mut().unwrap())
        }
    }

    /// Consumes the writer and returns the write metrics.
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}

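/// Statistics of the rows written to the current file; reset whenever a file
/// is finished.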
#[derive(Default)]
struct SourceStats {
    /// Number of rows written so far.
    num_rows: usize,
    /// Time range of the rows written so far.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Update the time range with the first and last timestamps of the batch.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }

    fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
        if record_batch.num_rows() == 0 {
            return Ok(());
        }

        self.num_rows += record_batch.num_rows();

        // Locate the time index column in the flat format and compute its range.
        let time_index_col_idx = time_index_column_index(record_batch.num_columns());
        let timestamp_array = record_batch.column(time_index_col_idx);

        if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
            if let Some(time_range) = &mut self.time_range {
                time_range.0 = time_range.0.min(min_in_batch);
                time_range.1 = time_range.1.max(max_in_batch);
            } else {
                self.time_range = Some((min_in_batch, max_in_batch));
            }
        }

        Ok(())
    }
}

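/// Computes the minimum and maximum [Timestamp] of a timestamp array.
///
/// Returns `None` if the array has no non-null values, and an error if the
/// array is not a timestamp array.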
fn timestamp_range_from_array(
    timestamp_array: &ArrayRef,
) -> Result<Option<(Timestamp, Timestamp)>> {
    let (min_ts, max_ts) = match timestamp_array.data_type() {
        DataType::Timestamp(TimeUnit::Second, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampSecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_second);
            let max_val = max(array).map(Timestamp::new_second);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Millisecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMillisecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_millisecond);
            let max_val = max(array).map(Timestamp::new_millisecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Microsecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampMicrosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_microsecond);
            let max_val = max(array).map(Timestamp::new_microsecond);
            (min_val, max_val)
        }
        DataType::Timestamp(TimeUnit::Nanosecond, _) => {
            let array = timestamp_array
                .as_any()
                .downcast_ref::<TimestampNanosecondArray>()
                .unwrap();
            let min_val = min(array).map(Timestamp::new_nanosecond);
            let max_val = max(array).map(Timestamp::new_nanosecond);
            (min_val, max_val)
        }
        _ => {
            return UnexpectedSnafu {
                reason: format!(
                    "Unexpected data type of time index: {:?}",
                    timestamp_array.data_type()
                ),
            }
            .fail()
        }
    };

    Ok(min_ts.zip(max_ts))
}

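/// [AsyncWrite] wrapper that records the total number of bytes written into a
/// shared counter.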
struct SizeAwareWriter<W> {
    /// The wrapped writer.
    inner: W,
    /// Shared counter of bytes written.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Account for the bytes accepted by the inner writer.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}