1use std::future::Future;
18use std::mem;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::task::{Context, Poll};
23use std::time::Instant;
24
25use common_telemetry::debug;
26use common_time::Timestamp;
27use datatypes::arrow::array::{
28 ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
29 TimestampSecondArray,
30};
31use datatypes::arrow::compute::{max, min};
32use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
33use datatypes::arrow::record_batch::RecordBatch;
34use object_store::{FuturesAsyncWriter, ObjectStore};
35use parquet::arrow::AsyncArrowWriter;
36use parquet::basic::{Compression, Encoding, ZstdLevel};
37use parquet::file::metadata::KeyValue;
38use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
39use parquet::schema::types::ColumnPath;
40use smallvec::smallvec;
41use snafu::ResultExt;
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
44use store_api::storage::{FileId, SequenceNumber};
45use tokio::io::AsyncWrite;
46use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
47
48use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
49use crate::config::{IndexBuildMode, IndexConfig};
50use crate::error::{
51 InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
52};
53use crate::read::FlatSource;
54use crate::sst::file::RegionFileId;
55use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
56use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
57use crate::sst::parquet::format::PrimaryKeyWriteFormat;
58use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
59use crate::sst::{
60 DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
61};
62
/// Converts input record batches into the arrow layout that is written to parquet.
enum FlatBatchConverter {
    /// Writes batches in the flat format.
    Flat(FlatWriteFormat),
    /// Converts flat batches into the primary key format.
    PrimaryKey {
        /// Write format for the primary key layout.
        format: PrimaryKeyWriteFormat,
        /// Number of field columns in the region, passed to the conversion.
        num_fields: usize,
    },
}
73
74impl FlatBatchConverter {
75 fn arrow_schema(&self) -> &SchemaRef {
76 match self {
77 FlatBatchConverter::Flat(f) => f.arrow_schema(),
78 FlatBatchConverter::PrimaryKey { format, .. } => format.arrow_schema(),
79 }
80 }
81
82 fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
83 match self {
84 FlatBatchConverter::Flat(f) => f.convert_batch(batch),
85 FlatBatchConverter::PrimaryKey { format, num_fields } => {
86 format.convert_flat_batch(batch, *num_fields)
87 }
88 }
89 }
90}
91
/// Writes one or more SST parquet files from a source, rotating files and
/// building indexes as it goes.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Maps a `RegionFileId` to the SST file path to write.
    path_provider: P,
    /// Underlying parquet writer; `None` until the first batch is written
    /// (and again right after a file is finished).
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Creates the async byte sink for each new file.
    writer_factory: F,
    /// Region metadata, embedded into the parquet key-value metadata.
    metadata: RegionMetadataRef,
    /// Controls whether the index is built synchronously or deferred.
    index_config: IndexConfig,
    /// Builds a fresh indexer per output file.
    indexer_builder: I,
    /// Indexer for `current_file`; updated per batch in sync build mode.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file, shared with `SizeAwareWriter`.
    bytes_written: Arc<AtomicUsize>,
    /// Removes partially-written files when a write fails.
    file_cleaner: Option<TempFileCleaner>,
    /// Timing metrics accumulated across the write.
    metrics: &'a mut Metrics,
}
114
/// Abstracts creation of the async byte sink an SST file is written to.
pub trait WriterFactory {
    /// Concrete async writer type produced by this factory.
    type Writer: AsyncWrite + Send + Unpin;
    /// Creates a writer for the file at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
119
/// [`WriterFactory`] backed by an [`ObjectStore`].
pub struct ObjectStoreWriterFactory {
    /// Object store to create writers in.
    object_store: ObjectStore,
}
123
impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    /// Opens a buffered, concurrent object-store writer for `file_path` and
    /// adapts it from the futures `AsyncWrite` to the tokio `AsyncWrite` trait.
    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            // Upload in fixed-size chunks with bounded concurrency.
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}
137
impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Convenience constructor that writes SST files to `object_store`.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Attaches a cleaner that removes partially-written files on failure.
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}
167
impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a parquet writer.
    ///
    /// Picks a random id for the first output file and eagerly builds an
    /// indexer for it; the parquet writer itself is created lazily on the
    /// first written batch (see `maybe_init_writer`).
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finalizes the file currently being written, if any.
    ///
    /// Finishes the index (sync mode only), flushes and closes the parquet
    /// writer, appends the resulting [`SstInfo`] to `ssts`, then rotates to a
    /// fresh random file id and resets the byte counter and `stats`.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // `self.writer` is `None` when nothing was written since the last
        // rotation; in that case there is nothing to finish.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Take ownership of the accumulated stats and reset the caller's
            // copy for the next file.
            let mut stats = mem::take(stats);
            // A writer only exists after at least one batch was written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let mut index_output = IndexOutput::default();
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    // In sync mode the indexer was fed each batch; finish it now.
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // NOTE(review): relies on `num_rows > 0` implying a time range was
            // recorded, i.e. the time index column yields a min/max for every
            // non-empty batch — confirm the time index can never be all-null.
            let time_range = stats.time_range.unwrap();

            // Largest uncompressed row group size, used downstream for memory
            // estimation when reading the file back.
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            // Rotate: next batch will lazily open a new file and indexer.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Writes the whole `source` in the flat format, possibly splitting the
    /// output into multiple SST files (see `WriteOptions::max_file_size`).
    ///
    /// On error, removes any partially-written file via the configured cleaner.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let converter = FlatBatchConverter::Flat(
            FlatWriteFormat::new(
                self.metadata.clone(),
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
            .with_override_sequence(override_sequence),
        );
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            // Best-effort cleanup of the in-flight file.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Writes the whole `source`, converting flat batches to the primary key
    /// format on the fly.
    ///
    /// On error, removes any partially-written file via the configured cleaner.
    pub async fn write_all_flat_as_primary_key(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let num_fields = self.metadata.field_columns().count();
        let converter = FlatBatchConverter::PrimaryKey {
            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
                .with_override_sequence(override_sequence),
            num_fields,
        };
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            // Best-effort cleanup of the in-flight file.
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Main write loop: pulls batches from `source`, writes them, feeds the
    /// indexer (sync mode), and rotates files once `max_file_size` is exceeded.
    async fn write_all_flat_inner(
        &mut self,
        mut source: FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let mut stats = SourceStats::default();

        // `transpose` turns `Result<Option<_>>` into `Option<Result<_>>` so the
        // loop ends on `Ok(None)` (source exhausted) and still sees errors.
        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, converter, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
                        let start = Instant::now();
                        self.current_indexer
                            .as_mut()
                            .unwrap()
                            .update_flat(&batch)
                            .await;
                        self.metrics.update_index += start.elapsed();
                    }
                    // Rotate to a new file once the current one grows too big.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the index build so it doesn't leave partial state.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        // Finish whatever is left in the last (possibly only) file.
        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Applies per-column tuning: delta encoding without dictionaries for the
    /// time index and sequence columns, and no compression for op type.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    /// Fetches the next batch from `source`, converts it and writes it to the
    /// (lazily created) parquet writer.
    ///
    /// Returns the ORIGINAL (pre-conversion) batch so the caller can update
    /// stats and the indexer from it, or `None` when the source is exhausted.
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = converter.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(converter.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the current parquet writer, creating it (and a fresh indexer
    /// for `current_file`) on first use after construction or file rotation.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Embed the region metadata as JSON in the parquet key-value
            // metadata so the file is self-describing.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                // Keep full index/statistics values (no truncation).
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            // Wrap the sink so `bytes_written` tracks the file size.
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            // A new file also needs a new indexer.
            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            Ok(self.writer.as_mut().unwrap())
        }
    }
}
463
/// Statistics accumulated over the batches written to one SST file.
#[derive(Default)]
struct SourceStats {
    /// Total number of rows written.
    num_rows: usize,
    /// Min/max timestamps seen so far; `None` until the first non-empty batch.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Estimates the number of distinct series in the file.
    series_estimator: SeriesEstimator,
}
473
474impl SourceStats {
475 fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
476 if record_batch.num_rows() == 0 {
477 return Ok(());
478 }
479
480 self.num_rows += record_batch.num_rows();
481 self.series_estimator.update_flat(record_batch);
482
483 let time_index_col_idx = time_index_column_index(record_batch.num_columns());
485 let timestamp_array = record_batch.column(time_index_col_idx);
486
487 if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
488 if let Some(time_range) = &mut self.time_range {
489 time_range.0 = time_range.0.min(min_in_batch);
490 time_range.1 = time_range.1.max(max_in_batch);
491 } else {
492 self.time_range = Some((min_in_batch, max_in_batch));
493 }
494 }
495
496 Ok(())
497 }
498}
499
500fn timestamp_range_from_array(
502 timestamp_array: &ArrayRef,
503) -> Result<Option<(Timestamp, Timestamp)>> {
504 let (min_ts, max_ts) = match timestamp_array.data_type() {
505 DataType::Timestamp(TimeUnit::Second, _) => {
506 let array = timestamp_array
507 .as_any()
508 .downcast_ref::<TimestampSecondArray>()
509 .unwrap();
510 let min_val = min(array).map(Timestamp::new_second);
511 let max_val = max(array).map(Timestamp::new_second);
512 (min_val, max_val)
513 }
514 DataType::Timestamp(TimeUnit::Millisecond, _) => {
515 let array = timestamp_array
516 .as_any()
517 .downcast_ref::<TimestampMillisecondArray>()
518 .unwrap();
519 let min_val = min(array).map(Timestamp::new_millisecond);
520 let max_val = max(array).map(Timestamp::new_millisecond);
521 (min_val, max_val)
522 }
523 DataType::Timestamp(TimeUnit::Microsecond, _) => {
524 let array = timestamp_array
525 .as_any()
526 .downcast_ref::<TimestampMicrosecondArray>()
527 .unwrap();
528 let min_val = min(array).map(Timestamp::new_microsecond);
529 let max_val = max(array).map(Timestamp::new_microsecond);
530 (min_val, max_val)
531 }
532 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
533 let array = timestamp_array
534 .as_any()
535 .downcast_ref::<TimestampNanosecondArray>()
536 .unwrap();
537 let min_val = min(array).map(Timestamp::new_nanosecond);
538 let max_val = max(array).map(Timestamp::new_nanosecond);
539 (min_val, max_val)
540 }
541 _ => {
542 return UnexpectedSnafu {
543 reason: format!(
544 "Unexpected data type of time index: {:?}",
545 timestamp_array.data_type()
546 ),
547 }
548 .fail();
549 }
550 };
551
552 Ok(min_ts.zip(max_ts))
554}
555
/// Wraps an async writer and counts the bytes successfully written through it.
struct SizeAwareWriter<W> {
    /// Underlying writer.
    inner: W,
    /// Shared running total of bytes written.
    size: Arc<AtomicUsize>,
}
562
563impl<W> SizeAwareWriter<W> {
564 fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
565 Self {
566 inner,
567 size: size.clone(),
568 }
569 }
570}
571
impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    /// Forwards the write to the inner writer and, on success, adds the number
    /// of bytes actually accepted to the shared size counter.
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        // `W: Unpin`, so we can safely get a mutable reference to the fields.
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Count only bytes the inner writer actually accepted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            // Pending and error results pass through unchanged.
            other => other,
        }
    }

    /// Delegates flushing to the inner writer.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    /// Delegates shutdown to the inner writer.
    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
605}