1use std::collections::HashMap;
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::task::{Context, Poll};
24use std::time::Instant;
25
26use common_telemetry::debug;
27use common_time::Timestamp;
28use datatypes::arrow::array::{
29 ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray,
31};
32use datatypes::arrow::compute::{max, min};
33use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::extension::json::is_json_extension_type;
36use datatypes::schema::ext::ArrowSchemaExt;
37use object_store::{FuturesAsyncWriter, ObjectStore};
38use parquet::arrow::AsyncArrowWriter;
39use parquet::basic::{Compression, Encoding, ZstdLevel};
40use parquet::file::metadata::KeyValue;
41use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
42use parquet::schema::types::ColumnPath;
43use smallvec::smallvec;
44use snafu::ResultExt;
45use store_api::metadata::RegionMetadataRef;
46use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
47use store_api::storage::{FileId, SequenceNumber};
48use tokio::io::AsyncWrite;
49use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
50
51use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
52use crate::config::{IndexBuildMode, IndexConfig};
53use crate::error::{
54 InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
55};
56use crate::read::FlatSource;
57use crate::sst::file::RegionFileId;
58use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
59use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
60use crate::sst::parquet::format::PrimaryKeyWriteFormat;
61use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
62use crate::sst::{
63 DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
64};
65
66enum FlatBatchConverter {
68 Flat(FlatWriteFormat),
70 PrimaryKey {
72 format: PrimaryKeyWriteFormat,
73 num_fields: usize,
74 },
75}
76
77impl FlatBatchConverter {
78 fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
79 match self {
80 FlatBatchConverter::Flat(f) => f.convert_batch(batch),
81 FlatBatchConverter::PrimaryKey { format, num_fields } => {
82 format.convert_flat_batch(batch, *num_fields)
83 }
84 }
85 }
86}
87
88pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
90 path_provider: P,
92 writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
93 current_file: FileId,
95 writer_factory: F,
96 metadata: RegionMetadataRef,
98 index_config: IndexConfig,
100 indexer_builder: I,
102 current_indexer: Option<Indexer>,
104 bytes_written: Arc<AtomicUsize>,
105 file_cleaner: Option<TempFileCleaner>,
107 metrics: &'a mut Metrics,
109}
110
111pub trait WriterFactory {
112 type Writer: AsyncWrite + Send + Unpin;
113 fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
114}
115
116pub struct ObjectStoreWriterFactory {
117 object_store: ObjectStore,
118}
119
120impl WriterFactory for ObjectStoreWriterFactory {
121 type Writer = Compat<FuturesAsyncWriter>;
122
123 async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
124 self.object_store
125 .writer_with(file_path)
126 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
127 .concurrent(DEFAULT_WRITE_CONCURRENCY)
128 .await
129 .map(|v| v.into_futures_async_write().compat_write())
130 .context(OpenDalSnafu)
131 }
132}
133
134impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
135where
136 P: FilePathProvider,
137 I: IndexerBuilder,
138{
139 pub async fn new_with_object_store(
140 object_store: ObjectStore,
141 metadata: RegionMetadataRef,
142 index_config: IndexConfig,
143 indexer_builder: I,
144 path_provider: P,
145 metrics: &'a mut Metrics,
146 ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
147 ParquetWriter::new(
148 ObjectStoreWriterFactory { object_store },
149 metadata,
150 index_config,
151 indexer_builder,
152 path_provider,
153 metrics,
154 )
155 .await
156 }
157
158 pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
159 self.file_cleaner = Some(cleaner);
160 self
161 }
162}
163
164impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
165where
166 F: WriterFactory,
167 I: IndexerBuilder,
168 P: FilePathProvider,
169{
170 pub async fn new(
172 factory: F,
173 metadata: RegionMetadataRef,
174 index_config: IndexConfig,
175 indexer_builder: I,
176 path_provider: P,
177 metrics: &'a mut Metrics,
178 ) -> ParquetWriter<'a, F, I, P> {
179 let init_file = FileId::random();
180 let indexer = indexer_builder.build(init_file, 0).await;
181
182 ParquetWriter {
183 path_provider,
184 writer: None,
185 current_file: init_file,
186 writer_factory: factory,
187 metadata,
188 index_config,
189 indexer_builder,
190 current_indexer: Some(indexer),
191 bytes_written: Arc::new(AtomicUsize::new(0)),
192 file_cleaner: None,
193 metrics,
194 }
195 }
196
197 async fn finish_current_file(
199 &mut self,
200 ssts: &mut SstInfoArray,
201 stats: &mut SourceStats,
202 ) -> Result<()> {
203 if let Some(mut current_writer) = mem::take(&mut self.writer) {
205 let mut stats = mem::take(stats);
206 assert!(stats.num_rows > 0);
208
209 debug!(
210 "Finishing current file {}, file size: {}, num rows: {}",
211 self.current_file,
212 self.bytes_written.load(Ordering::Relaxed),
213 stats.num_rows
214 );
215
216 let mut index_output = IndexOutput::default();
219 match self.index_config.build_mode {
220 IndexBuildMode::Sync => {
221 index_output = self.current_indexer.as_mut().unwrap().finish().await;
222 }
223 IndexBuildMode::Async => {
224 debug!(
225 "Index for file {} will be built asynchronously later",
226 self.current_file
227 );
228 }
229 }
230 current_writer.flush().await.context(WriteParquetSnafu)?;
231
232 let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
233 let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
234
235 let time_range = stats.time_range.unwrap();
237
238 let max_row_group_uncompressed_size: u64 = parquet_metadata
239 .row_groups()
240 .iter()
241 .map(|rg| {
242 rg.columns()
243 .iter()
244 .map(|c| c.uncompressed_size() as u64)
245 .sum::<u64>()
246 })
247 .max()
248 .unwrap_or(0);
249 let num_series = stats.series_estimator.finish();
250 ssts.push(SstInfo {
251 file_id: self.current_file,
252 time_range,
253 file_size,
254 max_row_group_uncompressed_size,
255 num_rows: stats.num_rows,
256 num_row_groups: parquet_metadata.num_row_groups() as u64,
257 file_metadata: Some(Arc::new(parquet_metadata)),
258 index_metadata: index_output,
259 num_series,
260 });
261 self.current_file = FileId::random();
262 self.bytes_written.store(0, Ordering::Relaxed)
263 };
264
265 Ok(())
266 }
267
268 pub async fn write_all_flat(
272 &mut self,
273 source: FlatSource,
274 override_sequence: Option<SequenceNumber>,
275 opts: &WriteOptions,
276 ) -> Result<SstInfoArray> {
277 let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
278
279 if source.schema().has_json_extension_field() {
280 options.concretized_json_types = source
281 .schema()
282 .fields()
283 .iter()
284 .filter(|&field| is_json_extension_type(field))
285 .map(|field| (field.name().clone(), field.data_type().clone()))
286 .collect::<HashMap<_, _>>();
287 }
288
289 let converter = FlatBatchConverter::Flat(
290 FlatWriteFormat::new(self.metadata.clone(), &options)
291 .with_override_sequence(override_sequence),
292 );
293 let res = self.write_all_flat_inner(source, &converter, opts).await;
294 if res.is_err() {
295 let file_id = self.current_file;
296 if let Some(cleaner) = &self.file_cleaner {
297 cleaner.clean_by_file_id(file_id).await;
298 }
299 }
300 res
301 }
302
303 pub async fn write_all_flat_as_primary_key(
307 &mut self,
308 source: FlatSource,
309 override_sequence: Option<SequenceNumber>,
310 opts: &WriteOptions,
311 ) -> Result<SstInfoArray> {
312 let num_fields = self.metadata.field_columns().count();
313 let converter = FlatBatchConverter::PrimaryKey {
314 format: PrimaryKeyWriteFormat::new(self.metadata.clone())
315 .with_override_sequence(override_sequence),
316 num_fields,
317 };
318 let res = self.write_all_flat_inner(source, &converter, opts).await;
319 if res.is_err() {
320 let file_id = self.current_file;
321 if let Some(cleaner) = &self.file_cleaner {
322 cleaner.clean_by_file_id(file_id).await;
323 }
324 }
325 res
326 }
327
328 async fn write_all_flat_inner(
329 &mut self,
330 mut source: FlatSource,
331 converter: &FlatBatchConverter,
332 opts: &WriteOptions,
333 ) -> Result<SstInfoArray> {
334 let mut results = smallvec![];
335 let mut stats = SourceStats::default();
336
337 while let Some(record_batch) = self
338 .write_next_flat_batch(&mut source, converter, opts)
339 .await
340 .transpose()
341 {
342 match record_batch {
343 Ok(batch) => {
344 stats.update_flat(&batch)?;
345 if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
346 let start = Instant::now();
347 self.current_indexer
349 .as_mut()
350 .unwrap()
351 .update_flat(&batch)
352 .await;
353 self.metrics.update_index += start.elapsed();
354 }
355 if let Some(max_file_size) = opts.max_file_size
356 && self.bytes_written.load(Ordering::Relaxed) > max_file_size
357 {
358 self.finish_current_file(&mut results, &mut stats).await?;
359 }
360 }
361 Err(e) => {
362 if let Some(indexer) = &mut self.current_indexer {
363 indexer.abort().await;
364 }
365 return Err(e);
366 }
367 }
368 }
369
370 self.finish_current_file(&mut results, &mut stats).await?;
371
372 Ok(results)
374 }
375
376 fn customize_column_config(
378 builder: WriterPropertiesBuilder,
379 region_metadata: &RegionMetadataRef,
380 ) -> WriterPropertiesBuilder {
381 let ts_col = ColumnPath::new(vec![
382 region_metadata
383 .time_index_column()
384 .column_schema
385 .name
386 .clone(),
387 ]);
388 let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
389 let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
390
391 builder
392 .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
393 .set_column_dictionary_enabled(seq_col, false)
394 .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
395 .set_column_dictionary_enabled(ts_col, false)
396 .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
397 }
398
399 async fn write_next_flat_batch(
400 &mut self,
401 source: &mut FlatSource,
402 converter: &FlatBatchConverter,
403 opts: &WriteOptions,
404 ) -> Result<Option<RecordBatch>> {
405 let start = Instant::now();
406 let Some(record_batch) = source.next_batch().await? else {
407 return Ok(None);
408 };
409 self.metrics.iter_source += start.elapsed();
410
411 let arrow_batch = converter.convert_batch(&record_batch)?;
412
413 let start = Instant::now();
414 self.maybe_init_writer(arrow_batch.schema_ref(), opts)
415 .await?
416 .write(&arrow_batch)
417 .await
418 .context(WriteParquetSnafu)?;
419 self.metrics.write_batch += start.elapsed();
420 Ok(Some(record_batch))
422 }
423
424 async fn maybe_init_writer(
425 &mut self,
426 schema: &SchemaRef,
427 opts: &WriteOptions,
428 ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
429 if let Some(ref mut w) = self.writer {
430 Ok(w)
431 } else {
432 let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
433 let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
434
435 let props_builder = WriterProperties::builder()
437 .set_key_value_metadata(Some(vec![key_value_meta]))
438 .set_compression(Compression::ZSTD(ZstdLevel::default()))
439 .set_encoding(Encoding::PLAIN)
440 .set_max_row_group_row_count(Some(opts.row_group_size))
441 .set_column_index_truncate_length(None)
442 .set_statistics_truncate_length(None);
443
444 let props_builder = Self::customize_column_config(props_builder, &self.metadata);
445 let writer_props = props_builder.build();
446
447 let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
448 self.metadata.region_id,
449 self.current_file,
450 ));
451 let writer = SizeAwareWriter::new(
452 self.writer_factory.create(&sst_file_path).await?,
453 self.bytes_written.clone(),
454 );
455 let arrow_writer =
456 AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
457 .context(WriteParquetSnafu)?;
458 self.writer = Some(arrow_writer);
459
460 let indexer = self.indexer_builder.build(self.current_file, 0).await;
461 self.current_indexer = Some(indexer);
462
463 Ok(self.writer.as_mut().unwrap())
465 }
466 }
467}
468
469#[derive(Default)]
470struct SourceStats {
471 num_rows: usize,
473 time_range: Option<(Timestamp, Timestamp)>,
475 series_estimator: SeriesEstimator,
477}
478
479impl SourceStats {
480 fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
481 if record_batch.num_rows() == 0 {
482 return Ok(());
483 }
484
485 self.num_rows += record_batch.num_rows();
486 self.series_estimator.update_flat(record_batch);
487
488 let time_index_col_idx = time_index_column_index(record_batch.num_columns());
490 let timestamp_array = record_batch.column(time_index_col_idx);
491
492 if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
493 if let Some(time_range) = &mut self.time_range {
494 time_range.0 = time_range.0.min(min_in_batch);
495 time_range.1 = time_range.1.max(max_in_batch);
496 } else {
497 self.time_range = Some((min_in_batch, max_in_batch));
498 }
499 }
500
501 Ok(())
502 }
503}
504
505fn timestamp_range_from_array(
507 timestamp_array: &ArrayRef,
508) -> Result<Option<(Timestamp, Timestamp)>> {
509 let (min_ts, max_ts) = match timestamp_array.data_type() {
510 DataType::Timestamp(TimeUnit::Second, _) => {
511 let array = timestamp_array
512 .as_any()
513 .downcast_ref::<TimestampSecondArray>()
514 .unwrap();
515 let min_val = min(array).map(Timestamp::new_second);
516 let max_val = max(array).map(Timestamp::new_second);
517 (min_val, max_val)
518 }
519 DataType::Timestamp(TimeUnit::Millisecond, _) => {
520 let array = timestamp_array
521 .as_any()
522 .downcast_ref::<TimestampMillisecondArray>()
523 .unwrap();
524 let min_val = min(array).map(Timestamp::new_millisecond);
525 let max_val = max(array).map(Timestamp::new_millisecond);
526 (min_val, max_val)
527 }
528 DataType::Timestamp(TimeUnit::Microsecond, _) => {
529 let array = timestamp_array
530 .as_any()
531 .downcast_ref::<TimestampMicrosecondArray>()
532 .unwrap();
533 let min_val = min(array).map(Timestamp::new_microsecond);
534 let max_val = max(array).map(Timestamp::new_microsecond);
535 (min_val, max_val)
536 }
537 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
538 let array = timestamp_array
539 .as_any()
540 .downcast_ref::<TimestampNanosecondArray>()
541 .unwrap();
542 let min_val = min(array).map(Timestamp::new_nanosecond);
543 let max_val = max(array).map(Timestamp::new_nanosecond);
544 (min_val, max_val)
545 }
546 _ => {
547 return UnexpectedSnafu {
548 reason: format!(
549 "Unexpected data type of time index: {:?}",
550 timestamp_array.data_type()
551 ),
552 }
553 .fail();
554 }
555 };
556
557 Ok(min_ts.zip(max_ts))
559}
560
/// [`AsyncWrite`] adapter that adds the number of bytes accepted by `inner`
/// to a shared counter.
struct SizeAwareWriter<W> {
    /// Wrapped writer that receives the bytes.
    inner: W,
    /// Shared running total of the bytes written to `inner`.
    size: Arc<AtomicUsize>,
}
567
568impl<W> SizeAwareWriter<W> {
569 fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
570 Self {
571 inner,
572 size: size.clone(),
573 }
574 }
575}
576
577impl<W> AsyncWrite for SizeAwareWriter<W>
578where
579 W: AsyncWrite + Unpin,
580{
581 fn poll_write(
582 mut self: Pin<&mut Self>,
583 cx: &mut Context<'_>,
584 buf: &[u8],
585 ) -> Poll<std::result::Result<usize, std::io::Error>> {
586 let this = self.as_mut().get_mut();
587
588 match Pin::new(&mut this.inner).poll_write(cx, buf) {
589 Poll::Ready(Ok(bytes_written)) => {
590 this.size.fetch_add(bytes_written, Ordering::Relaxed);
591 Poll::Ready(Ok(bytes_written))
592 }
593 other => other,
594 }
595 }
596
597 fn poll_flush(
598 mut self: Pin<&mut Self>,
599 cx: &mut Context<'_>,
600 ) -> Poll<std::result::Result<(), std::io::Error>> {
601 Pin::new(&mut self.inner).poll_flush(cx)
602 }
603
604 fn poll_shutdown(
605 mut self: Pin<&mut Self>,
606 cx: &mut Context<'_>,
607 ) -> Poll<std::result::Result<(), std::io::Error>> {
608 Pin::new(&mut self.inner).poll_shutdown(cx)
609 }
610}