1use std::future::Future;
18use std::mem;
19use std::pin::Pin;
20use std::sync::Arc;
21use std::sync::atomic::{AtomicUsize, Ordering};
22use std::task::{Context, Poll};
23use std::time::Instant;
24
25use common_telemetry::debug;
26use common_time::Timestamp;
27use datatypes::arrow::array::{
28 ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
29 TimestampSecondArray,
30};
31use datatypes::arrow::compute::{max, min};
32use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
33use datatypes::arrow::record_batch::RecordBatch;
34use object_store::{FuturesAsyncWriter, ObjectStore};
35use parquet::arrow::AsyncArrowWriter;
36use parquet::basic::{Compression, Encoding, ZstdLevel};
37use parquet::file::metadata::KeyValue;
38use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
39use parquet::schema::types::ColumnPath;
40use smallvec::smallvec;
41use snafu::ResultExt;
42use store_api::metadata::RegionMetadataRef;
43use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
44use store_api::storage::{FileId, SequenceNumber};
45use tokio::io::AsyncWrite;
46use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
47
48use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
49use crate::config::{IndexBuildMode, IndexConfig};
50use crate::error::{
51 InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
52};
53use crate::read::FlatSource;
54use crate::sst::file::RegionFileId;
55use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
56use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
57use crate::sst::parquet::format::PrimaryKeyWriteFormat;
58use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
59use crate::sst::{
60 DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
61};
62
/// Converts record batches read from a [FlatSource] into the layout that is
/// written to the parquet file.
enum FlatBatchConverter {
    /// Writes batches in the flat format.
    Flat(FlatWriteFormat),
    /// Re-encodes flat batches into the primary key format.
    PrimaryKey {
        /// Format that converts a flat batch into a primary key batch.
        format: PrimaryKeyWriteFormat,
        /// Number of field columns in the region (see `write_all_flat_as_primary_key`,
        /// which computes this from `metadata.field_columns().count()`).
        num_fields: usize,
    },
}
73
74impl FlatBatchConverter {
75 fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
76 match self {
77 FlatBatchConverter::Flat(f) => f.convert_batch(batch),
78 FlatBatchConverter::PrimaryKey { format, num_fields } => {
79 format.convert_flat_batch(batch, *num_fields)
80 }
81 }
82 }
83}
84
/// Writer that writes batches from a source into one or more parquet SST
/// files, building indexes alongside.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Resolves SST file paths from region/file ids.
    path_provider: P,
    /// Active arrow writer; `None` until the first batch is written
    /// (see `maybe_init_writer`).
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Factory that creates the underlying async writers.
    writer_factory: F,
    /// Metadata of the region being written.
    metadata: RegionMetadataRef,
    /// Index configuration (e.g. sync vs async build mode).
    index_config: IndexConfig,
    /// Builds a fresh indexer for each new file.
    indexer_builder: I,
    /// Indexer for the current file.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file; shared with the wrapping
    /// [SizeAwareWriter] which increments it on every write.
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner used to remove partially written files on failure.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics accumulated by this writer.
    metrics: &'a mut Metrics,
}
107
/// Factory that creates async writers for SST file paths.
pub trait WriterFactory {
    /// Concrete async writer type produced by this factory.
    type Writer: AsyncWrite + Send + Unpin;
    /// Creates a writer for the file at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
112
/// [WriterFactory] implementation that creates writers in an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    /// Object store to write files into.
    object_store: ObjectStore,
}
116
117impl WriterFactory for ObjectStoreWriterFactory {
118 type Writer = Compat<FuturesAsyncWriter>;
119
120 async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
121 self.object_store
122 .writer_with(file_path)
123 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
124 .concurrent(DEFAULT_WRITE_CONCURRENCY)
125 .await
126 .map(|v| v.into_futures_async_write().compat_write())
127 .context(OpenDalSnafu)
128 }
129}
130
impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a parquet writer backed by the given [ObjectStore].
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            index_config,
            indexer_builder,
            path_provider,
            metrics,
        )
        .await
    }

    /// Attaches a cleaner that removes partially written files when a write
    /// fails (see `write_all_flat` / `write_all_flat_as_primary_key`).
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}
160
impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet writer.
    ///
    /// Picks a random id for the first output file and builds an indexer for
    /// it up front; the arrow writer itself is created lazily on first write.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        index_config: IndexConfig,
        indexer_builder: I,
        path_provider: P,
        metrics: &'a mut Metrics,
    ) -> ParquetWriter<'a, F, I, P> {
        let init_file = FileId::random();
        // NOTE(review): the second argument to `build` is presumably an
        // initial offset/row count for the indexer — confirm against
        // `IndexerBuilder::build`.
        let indexer = indexer_builder.build(init_file, 0).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            index_config,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finishes the file currently being written (if any): finalizes the
    /// index (sync mode only), closes the arrow writer, pushes the resulting
    /// [SstInfo] into `ssts` and resets per-file state for the next file.
    ///
    /// Takes `stats` and leaves a default in its place so the caller keeps
    /// accumulating stats for the next file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // `self.writer` is only `Some` after at least one batch was written,
        // so a missing writer means there is nothing to finish.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Reset the caller's stats for the next file.
            let mut stats = mem::take(stats);
            // A live writer implies at least one row was written.
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let mut index_output = IndexOutput::default();
            // Sync mode finalizes the index together with the file; async
            // mode leaves the index to be built later and reports an empty
            // index output here.
            match self.index_config.build_mode {
                IndexBuildMode::Sync => {
                    index_output = self.current_indexer.as_mut().unwrap().finish().await;
                }
                IndexBuildMode::Async => {
                    debug!(
                        "Index for file {} will be built asynchronously later",
                        self.current_file
                    );
                }
            }
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // `num_rows > 0` implies a non-empty batch updated the stats, so
            // the time range has been recorded.
            let time_range = stats.time_range.unwrap();

            // Largest uncompressed row group size across the file.
            let max_row_group_uncompressed_size: u64 = parquet_metadata
                .row_groups()
                .iter()
                .map(|rg| {
                    rg.columns()
                        .iter()
                        .map(|c| c.uncompressed_size() as u64)
                        .sum::<u64>()
                })
                .max()
                .unwrap_or(0);
            let num_series = stats.series_estimator.finish();
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                max_row_group_uncompressed_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
                num_series,
            });
            // Prepare a fresh file id and byte counter for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Writes all batches from `source` in the flat format and returns the
    /// info of the written SSTs.
    ///
    /// On error, removes the partially written current file via the
    /// configured [TempFileCleaner], if any.
    pub async fn write_all_flat(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let converter = FlatBatchConverter::Flat(
            FlatWriteFormat::new(
                self.metadata.clone(),
                &FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding),
            )
            .with_override_sequence(override_sequence),
        );
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Writes all batches from `source`, re-encoding each flat batch into the
    /// primary key format, and returns the info of the written SSTs.
    ///
    /// On error, removes the partially written current file via the
    /// configured [TempFileCleaner], if any.
    pub async fn write_all_flat_as_primary_key(
        &mut self,
        source: FlatSource,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let num_fields = self.metadata.field_columns().count();
        let converter = FlatBatchConverter::PrimaryKey {
            format: PrimaryKeyWriteFormat::new(self.metadata.clone())
                .with_override_sequence(override_sequence),
            num_fields,
        };
        let res = self.write_all_flat_inner(source, &converter, opts).await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Drains `source`, converting and writing each batch; rotates to a new
    /// file once the written size exceeds `opts.max_file_size`.
    async fn write_all_flat_inner(
        &mut self,
        mut source: FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let mut stats = SourceStats::default();

        while let Some(record_batch) = self
            .write_next_flat_batch(&mut source, converter, opts)
            .await
            .transpose()
        {
            match record_batch {
                Ok(batch) => {
                    stats.update_flat(&batch)?;
                    // In sync mode, update the index inline with each write.
                    if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
                        let start = Instant::now();
                        self.current_indexer
                            .as_mut()
                            .unwrap()
                            .update_flat(&batch)
                            .await;
                        self.metrics.update_index += start.elapsed();
                    }
                    // Rotate to a new file when the size threshold is crossed.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the indexer so it can release temporary resources
                    // before propagating the error.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        // Flush whatever remains in the last (possibly partial) file.
        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Customizes per-column writer properties:
    /// - sequence and time index columns use `DELTA_BINARY_PACKED` encoding
    ///   with dictionary encoding disabled;
    /// - the op type column is stored uncompressed.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![
            region_metadata
                .time_index_column()
                .column_schema
                .name
                .clone(),
        ]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
        let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
            .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
    }

    /// Fetches the next batch from `source`, converts it via `converter` and
    /// writes it to the current parquet file.
    ///
    /// Returns the original (pre-conversion) batch so the caller can update
    /// stats and the indexer, or `None` when the source is exhausted.
    async fn write_next_flat_batch(
        &mut self,
        source: &mut FlatSource,
        converter: &FlatBatchConverter,
        opts: &WriteOptions,
    ) -> Result<Option<RecordBatch>> {
        let start = Instant::now();
        let Some(record_batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = converter.convert_batch(&record_batch)?;

        let start = Instant::now();
        self.maybe_init_writer(arrow_batch.schema_ref(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(record_batch))
    }

    /// Returns the current arrow writer, lazily creating it (and the indexer
    /// for the current file) on first use.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Embed the region metadata into the parquet key-value metadata
            // so readers can recover it from the file alone.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size)
                // Keep full column index / statistics values (no truncation).
                .set_column_index_truncate_length(None)
                .set_statistics_truncate_length(None);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            // Wrap the raw writer so `bytes_written` tracks the file size.
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file, 0).await;
            self.current_indexer = Some(indexer);

            Ok(self.writer.as_mut().unwrap())
        }
    }
}
456
/// Statistics collected while draining the source.
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched so far.
    num_rows: usize,
    /// Time range of fetched batches; `None` until a non-empty batch with at
    /// least one non-null timestamp is seen.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Estimator of the number of series in the written data.
    series_estimator: SeriesEstimator,
}
466
467impl SourceStats {
468 fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
469 if record_batch.num_rows() == 0 {
470 return Ok(());
471 }
472
473 self.num_rows += record_batch.num_rows();
474 self.series_estimator.update_flat(record_batch);
475
476 let time_index_col_idx = time_index_column_index(record_batch.num_columns());
478 let timestamp_array = record_batch.column(time_index_col_idx);
479
480 if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
481 if let Some(time_range) = &mut self.time_range {
482 time_range.0 = time_range.0.min(min_in_batch);
483 time_range.1 = time_range.1.max(max_in_batch);
484 } else {
485 self.time_range = Some((min_in_batch, max_in_batch));
486 }
487 }
488
489 Ok(())
490 }
491}
492
493fn timestamp_range_from_array(
495 timestamp_array: &ArrayRef,
496) -> Result<Option<(Timestamp, Timestamp)>> {
497 let (min_ts, max_ts) = match timestamp_array.data_type() {
498 DataType::Timestamp(TimeUnit::Second, _) => {
499 let array = timestamp_array
500 .as_any()
501 .downcast_ref::<TimestampSecondArray>()
502 .unwrap();
503 let min_val = min(array).map(Timestamp::new_second);
504 let max_val = max(array).map(Timestamp::new_second);
505 (min_val, max_val)
506 }
507 DataType::Timestamp(TimeUnit::Millisecond, _) => {
508 let array = timestamp_array
509 .as_any()
510 .downcast_ref::<TimestampMillisecondArray>()
511 .unwrap();
512 let min_val = min(array).map(Timestamp::new_millisecond);
513 let max_val = max(array).map(Timestamp::new_millisecond);
514 (min_val, max_val)
515 }
516 DataType::Timestamp(TimeUnit::Microsecond, _) => {
517 let array = timestamp_array
518 .as_any()
519 .downcast_ref::<TimestampMicrosecondArray>()
520 .unwrap();
521 let min_val = min(array).map(Timestamp::new_microsecond);
522 let max_val = max(array).map(Timestamp::new_microsecond);
523 (min_val, max_val)
524 }
525 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
526 let array = timestamp_array
527 .as_any()
528 .downcast_ref::<TimestampNanosecondArray>()
529 .unwrap();
530 let min_val = min(array).map(Timestamp::new_nanosecond);
531 let max_val = max(array).map(Timestamp::new_nanosecond);
532 (min_val, max_val)
533 }
534 _ => {
535 return UnexpectedSnafu {
536 reason: format!(
537 "Unexpected data type of time index: {:?}",
538 timestamp_array.data_type()
539 ),
540 }
541 .fail();
542 }
543 };
544
545 Ok(min_ts.zip(max_ts))
547}
548
/// Writer wrapper that counts every byte written into a shared counter, so
/// the file size is known without querying the storage backend.
struct SizeAwareWriter<W> {
    /// Wrapped writer.
    inner: W,
    /// Shared byte counter, incremented on each successful write.
    size: Arc<AtomicUsize>,
}
555
556impl<W> SizeAwareWriter<W> {
557 fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
558 Self {
559 inner,
560 size: size.clone(),
561 }
562 }
563}
564
565impl<W> AsyncWrite for SizeAwareWriter<W>
566where
567 W: AsyncWrite + Unpin,
568{
569 fn poll_write(
570 mut self: Pin<&mut Self>,
571 cx: &mut Context<'_>,
572 buf: &[u8],
573 ) -> Poll<std::result::Result<usize, std::io::Error>> {
574 let this = self.as_mut().get_mut();
575
576 match Pin::new(&mut this.inner).poll_write(cx, buf) {
577 Poll::Ready(Ok(bytes_written)) => {
578 this.size.fetch_add(bytes_written, Ordering::Relaxed);
579 Poll::Ready(Ok(bytes_written))
580 }
581 other => other,
582 }
583 }
584
585 fn poll_flush(
586 mut self: Pin<&mut Self>,
587 cx: &mut Context<'_>,
588 ) -> Poll<std::result::Result<(), std::io::Error>> {
589 Pin::new(&mut self.inner).poll_flush(cx)
590 }
591
592 fn poll_shutdown(
593 mut self: Pin<&mut Self>,
594 cx: &mut Context<'_>,
595 ) -> Poll<std::result::Result<(), std::io::Error>> {
596 Pin::new(&mut self.inner).poll_shutdown(cx)
597 }
598}