1use std::collections::HashMap;
18use std::future::Future;
19use std::mem;
20use std::pin::Pin;
21use std::sync::Arc;
22use std::sync::atomic::{AtomicUsize, Ordering};
23use std::task::{Context, Poll};
24use std::time::Instant;
25
26use common_telemetry::debug;
27use common_time::Timestamp;
28use datatypes::arrow::array::{
29 ArrayRef, TimestampMicrosecondArray, TimestampMillisecondArray, TimestampNanosecondArray,
30 TimestampSecondArray,
31};
32use datatypes::arrow::compute::{max, min};
33use datatypes::arrow::datatypes::{DataType, SchemaRef, TimeUnit};
34use datatypes::arrow::record_batch::RecordBatch;
35use datatypes::extension::json::is_structured_json_field;
36use object_store::{FuturesAsyncWriter, ObjectStore};
37use parquet::arrow::AsyncArrowWriter;
38use parquet::basic::{Compression, Encoding, ZstdLevel};
39use parquet::file::metadata::KeyValue;
40use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
41use parquet::schema::types::ColumnPath;
42use smallvec::smallvec;
43use snafu::ResultExt;
44use store_api::metadata::RegionMetadataRef;
45use store_api::storage::consts::{OP_TYPE_COLUMN_NAME, SEQUENCE_COLUMN_NAME};
46use store_api::storage::{FileId, SequenceNumber};
47use tokio::io::AsyncWrite;
48use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
49
50use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
51use crate::config::{IndexBuildMode, IndexConfig};
52use crate::error::{
53 InvalidMetadataSnafu, OpenDalSnafu, Result, UnexpectedSnafu, WriteParquetSnafu,
54};
55use crate::read::FlatSource;
56use crate::sst::file::RegionFileId;
57use crate::sst::index::{IndexOutput, Indexer, IndexerBuilder};
58use crate::sst::parquet::flat_format::{FlatWriteFormat, time_index_column_index};
59use crate::sst::parquet::format::PrimaryKeyWriteFormat;
60use crate::sst::parquet::{PARQUET_METADATA_KEY, SstInfo, WriteOptions};
61use crate::sst::{
62 DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY, FlatSchemaOptions, SeriesEstimator,
63};
64
/// Converts flat-format batches read from a source into the layout that is written
/// to the SST file.
enum FlatBatchConverter {
    /// Keeps batches in the flat format, converted by the wrapped [`FlatWriteFormat`].
    Flat(FlatWriteFormat),
    /// Converts flat batches into the primary key format.
    PrimaryKey {
        /// Format that performs the conversion via `convert_flat_batch`.
        format: PrimaryKeyWriteFormat,
        /// Number of field columns in the region, passed to `convert_flat_batch`.
        num_fields: usize,
    },
}
75
76impl FlatBatchConverter {
77 fn convert_batch(&self, batch: &RecordBatch) -> Result<RecordBatch> {
78 match self {
79 FlatBatchConverter::Flat(f) => f.convert_batch(batch),
80 FlatBatchConverter::PrimaryKey { format, num_fields } => {
81 format.convert_flat_batch(batch, *num_fields)
82 }
83 }
84 }
85}
86
/// Writes batches from a source into one or more parquet SST files, rolling over to a
/// new file when the configured maximum file size is exceeded.
pub struct ParquetWriter<'a, F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Builds the path of each SST file from its region and file id.
    path_provider: P,
    /// Writer of the file currently being written; `None` until a batch opens a file
    /// and again after `finish_current_file` closes it.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Creates the underlying writer for each new file.
    writer_factory: F,
    /// Region metadata; also stored as JSON in each file's key-value metadata.
    metadata: RegionMetadataRef,
    /// Index configuration; its build mode decides whether indexes are built inline.
    index_config: IndexConfig,
    /// Builds an indexer for each new file.
    indexer_builder: I,
    /// Indexer of the file currently being written.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file, shared with its `SizeAwareWriter`.
    bytes_written: Arc<AtomicUsize>,
    /// Removes the in-progress file when a write fails, if set.
    file_cleaner: Option<TempFileCleaner>,
    /// Timing metrics (source iteration, batch writing, index updates).
    metrics: &'a mut Metrics,
}
109
/// Creates the async byte sinks that SST files are written into.
pub trait WriterFactory {
    /// Writer returned by [`WriterFactory::create`].
    type Writer: AsyncWrite + Send + Unpin;

    /// Opens a writer for the file located at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
114
/// [`WriterFactory`] that creates writers backed by an [`ObjectStore`].
pub struct ObjectStoreWriterFactory {
    /// Object store that files are written to.
    object_store: ObjectStore,
}
118
119impl WriterFactory for ObjectStoreWriterFactory {
120 type Writer = Compat<FuturesAsyncWriter>;
121
122 async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
123 self.object_store
124 .writer_with(file_path)
125 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
126 .concurrent(DEFAULT_WRITE_CONCURRENCY)
127 .await
128 .map(|v| v.into_futures_async_write().compat_write())
129 .context(OpenDalSnafu)
130 }
131}
132
133impl<'a, I, P> ParquetWriter<'a, ObjectStoreWriterFactory, I, P>
134where
135 P: FilePathProvider,
136 I: IndexerBuilder,
137{
138 pub async fn new_with_object_store(
139 object_store: ObjectStore,
140 metadata: RegionMetadataRef,
141 index_config: IndexConfig,
142 indexer_builder: I,
143 path_provider: P,
144 metrics: &'a mut Metrics,
145 ) -> ParquetWriter<'a, ObjectStoreWriterFactory, I, P> {
146 ParquetWriter::new(
147 ObjectStoreWriterFactory { object_store },
148 metadata,
149 index_config,
150 indexer_builder,
151 path_provider,
152 metrics,
153 )
154 .await
155 }
156
157 pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
158 self.file_cleaner = Some(cleaner);
159 self
160 }
161}
162
163impl<'a, F, I, P> ParquetWriter<'a, F, I, P>
164where
165 F: WriterFactory,
166 I: IndexerBuilder,
167 P: FilePathProvider,
168{
169 pub async fn new(
171 factory: F,
172 metadata: RegionMetadataRef,
173 index_config: IndexConfig,
174 indexer_builder: I,
175 path_provider: P,
176 metrics: &'a mut Metrics,
177 ) -> ParquetWriter<'a, F, I, P> {
178 let init_file = FileId::random();
179 let indexer = indexer_builder.build(init_file, 0).await;
180
181 ParquetWriter {
182 path_provider,
183 writer: None,
184 current_file: init_file,
185 writer_factory: factory,
186 metadata,
187 index_config,
188 indexer_builder,
189 current_indexer: Some(indexer),
190 bytes_written: Arc::new(AtomicUsize::new(0)),
191 file_cleaner: None,
192 metrics,
193 }
194 }
195
196 async fn finish_current_file(
198 &mut self,
199 ssts: &mut SstInfoArray,
200 stats: &mut SourceStats,
201 ) -> Result<()> {
202 if let Some(mut current_writer) = mem::take(&mut self.writer) {
204 let mut stats = mem::take(stats);
205 assert!(stats.num_rows > 0);
207
208 debug!(
209 "Finishing current file {}, file size: {}, num rows: {}",
210 self.current_file,
211 self.bytes_written.load(Ordering::Relaxed),
212 stats.num_rows
213 );
214
215 let mut index_output = IndexOutput::default();
218 match self.index_config.build_mode {
219 IndexBuildMode::Sync => {
220 index_output = self.current_indexer.as_mut().unwrap().finish().await;
221 }
222 IndexBuildMode::Async => {
223 debug!(
224 "Index for file {} will be built asynchronously later",
225 self.current_file
226 );
227 }
228 }
229 current_writer.flush().await.context(WriteParquetSnafu)?;
230
231 let parquet_metadata = current_writer.close().await.context(WriteParquetSnafu)?;
232 let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;
233
234 let time_range = stats.time_range.unwrap();
236
237 let max_row_group_uncompressed_size: u64 = parquet_metadata
238 .row_groups()
239 .iter()
240 .map(|rg| {
241 rg.columns()
242 .iter()
243 .map(|c| c.uncompressed_size() as u64)
244 .sum::<u64>()
245 })
246 .max()
247 .unwrap_or(0);
248 let num_series = stats.series_estimator.finish();
249 ssts.push(SstInfo {
250 file_id: self.current_file,
251 time_range,
252 file_size,
253 max_row_group_uncompressed_size,
254 num_rows: stats.num_rows,
255 num_row_groups: parquet_metadata.num_row_groups() as u64,
256 file_metadata: Some(Arc::new(parquet_metadata)),
257 index_metadata: index_output,
258 num_series,
259 });
260 self.current_file = FileId::random();
261 self.bytes_written.store(0, Ordering::Relaxed)
262 };
263
264 Ok(())
265 }
266
267 pub async fn write_all_flat(
271 &mut self,
272 source: FlatSource,
273 override_sequence: Option<SequenceNumber>,
274 opts: &WriteOptions,
275 ) -> Result<SstInfoArray> {
276 let mut options = FlatSchemaOptions::from_encoding(self.metadata.primary_key_encoding);
277
278 if source
279 .schema()
280 .fields()
281 .iter()
282 .any(is_structured_json_field)
283 {
284 options.concretized_json_types = source
285 .schema()
286 .fields()
287 .iter()
288 .filter(|&field| is_structured_json_field(field))
289 .map(|field| (field.name().clone(), field.data_type().clone()))
290 .collect::<HashMap<_, _>>();
291 }
292
293 let converter = FlatBatchConverter::Flat(
294 FlatWriteFormat::new(self.metadata.clone(), &options)
295 .with_override_sequence(override_sequence),
296 );
297 let res = self.write_all_flat_inner(source, &converter, opts).await;
298 if res.is_err() {
299 let file_id = self.current_file;
300 if let Some(cleaner) = &self.file_cleaner {
301 cleaner.clean_by_file_id(file_id).await;
302 }
303 }
304 res
305 }
306
307 pub async fn write_all_flat_as_primary_key(
311 &mut self,
312 source: FlatSource,
313 override_sequence: Option<SequenceNumber>,
314 opts: &WriteOptions,
315 ) -> Result<SstInfoArray> {
316 let num_fields = self.metadata.field_columns().count();
317 let converter = FlatBatchConverter::PrimaryKey {
318 format: PrimaryKeyWriteFormat::new(self.metadata.clone())
319 .with_override_sequence(override_sequence),
320 num_fields,
321 };
322 let res = self.write_all_flat_inner(source, &converter, opts).await;
323 if res.is_err() {
324 let file_id = self.current_file;
325 if let Some(cleaner) = &self.file_cleaner {
326 cleaner.clean_by_file_id(file_id).await;
327 }
328 }
329 res
330 }
331
332 async fn write_all_flat_inner(
333 &mut self,
334 mut source: FlatSource,
335 converter: &FlatBatchConverter,
336 opts: &WriteOptions,
337 ) -> Result<SstInfoArray> {
338 let mut results = smallvec![];
339 let mut stats = SourceStats::default();
340
341 while let Some(record_batch) = self
342 .write_next_flat_batch(&mut source, converter, opts)
343 .await
344 .transpose()
345 {
346 match record_batch {
347 Ok(batch) => {
348 stats.update_flat(&batch)?;
349 if matches!(self.index_config.build_mode, IndexBuildMode::Sync) {
350 let start = Instant::now();
351 self.current_indexer
353 .as_mut()
354 .unwrap()
355 .update_flat(&batch)
356 .await;
357 self.metrics.update_index += start.elapsed();
358 }
359 if let Some(max_file_size) = opts.max_file_size
360 && self.bytes_written.load(Ordering::Relaxed) > max_file_size
361 {
362 self.finish_current_file(&mut results, &mut stats).await?;
363 }
364 }
365 Err(e) => {
366 if let Some(indexer) = &mut self.current_indexer {
367 indexer.abort().await;
368 }
369 return Err(e);
370 }
371 }
372 }
373
374 self.finish_current_file(&mut results, &mut stats).await?;
375
376 Ok(results)
378 }
379
380 fn customize_column_config(
382 builder: WriterPropertiesBuilder,
383 region_metadata: &RegionMetadataRef,
384 ) -> WriterPropertiesBuilder {
385 let ts_col = ColumnPath::new(vec![
386 region_metadata
387 .time_index_column()
388 .column_schema
389 .name
390 .clone(),
391 ]);
392 let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);
393 let op_type_col = ColumnPath::new(vec![OP_TYPE_COLUMN_NAME.to_string()]);
394
395 builder
396 .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
397 .set_column_dictionary_enabled(seq_col, false)
398 .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
399 .set_column_dictionary_enabled(ts_col, false)
400 .set_column_compression(op_type_col, Compression::UNCOMPRESSED)
401 }
402
403 async fn write_next_flat_batch(
404 &mut self,
405 source: &mut FlatSource,
406 converter: &FlatBatchConverter,
407 opts: &WriteOptions,
408 ) -> Result<Option<RecordBatch>> {
409 let start = Instant::now();
410 let Some(record_batch) = source.next_batch().await? else {
411 return Ok(None);
412 };
413 self.metrics.iter_source += start.elapsed();
414
415 let arrow_batch = converter.convert_batch(&record_batch)?;
416
417 let start = Instant::now();
418 self.maybe_init_writer(arrow_batch.schema_ref(), opts)
419 .await?
420 .write(&arrow_batch)
421 .await
422 .context(WriteParquetSnafu)?;
423 self.metrics.write_batch += start.elapsed();
424 Ok(Some(record_batch))
426 }
427
428 async fn maybe_init_writer(
429 &mut self,
430 schema: &SchemaRef,
431 opts: &WriteOptions,
432 ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
433 if let Some(ref mut w) = self.writer {
434 Ok(w)
435 } else {
436 let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
437 let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
438
439 let props_builder = WriterProperties::builder()
441 .set_key_value_metadata(Some(vec![key_value_meta]))
442 .set_compression(Compression::ZSTD(ZstdLevel::default()))
443 .set_encoding(Encoding::PLAIN)
444 .set_max_row_group_row_count(Some(opts.row_group_size))
445 .set_column_index_truncate_length(None)
446 .set_statistics_truncate_length(None);
447
448 let props_builder = Self::customize_column_config(props_builder, &self.metadata);
449 let writer_props = props_builder.build();
450
451 let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
452 self.metadata.region_id,
453 self.current_file,
454 ));
455 let writer = SizeAwareWriter::new(
456 self.writer_factory.create(&sst_file_path).await?,
457 self.bytes_written.clone(),
458 );
459 let arrow_writer =
460 AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
461 .context(WriteParquetSnafu)?;
462 self.writer = Some(arrow_writer);
463
464 let indexer = self.indexer_builder.build(self.current_file, 0).await;
465 self.current_indexer = Some(indexer);
466
467 Ok(self.writer.as_mut().unwrap())
469 }
470 }
471}
472
/// Statistics accumulated from the batches written into the current file.
#[derive(Default)]
struct SourceStats {
    /// Number of rows seen so far.
    num_rows: usize,
    /// Minimum and maximum time index values seen so far, if any.
    time_range: Option<(Timestamp, Timestamp)>,
    /// Produces the `num_series` value recorded in the SST info.
    series_estimator: SeriesEstimator,
}
482
483impl SourceStats {
484 fn update_flat(&mut self, record_batch: &RecordBatch) -> Result<()> {
485 if record_batch.num_rows() == 0 {
486 return Ok(());
487 }
488
489 self.num_rows += record_batch.num_rows();
490 self.series_estimator.update_flat(record_batch);
491
492 let time_index_col_idx = time_index_column_index(record_batch.num_columns());
494 let timestamp_array = record_batch.column(time_index_col_idx);
495
496 if let Some((min_in_batch, max_in_batch)) = timestamp_range_from_array(timestamp_array)? {
497 if let Some(time_range) = &mut self.time_range {
498 time_range.0 = time_range.0.min(min_in_batch);
499 time_range.1 = time_range.1.max(max_in_batch);
500 } else {
501 self.time_range = Some((min_in_batch, max_in_batch));
502 }
503 }
504
505 Ok(())
506 }
507}
508
509fn timestamp_range_from_array(
511 timestamp_array: &ArrayRef,
512) -> Result<Option<(Timestamp, Timestamp)>> {
513 let (min_ts, max_ts) = match timestamp_array.data_type() {
514 DataType::Timestamp(TimeUnit::Second, _) => {
515 let array = timestamp_array
516 .as_any()
517 .downcast_ref::<TimestampSecondArray>()
518 .unwrap();
519 let min_val = min(array).map(Timestamp::new_second);
520 let max_val = max(array).map(Timestamp::new_second);
521 (min_val, max_val)
522 }
523 DataType::Timestamp(TimeUnit::Millisecond, _) => {
524 let array = timestamp_array
525 .as_any()
526 .downcast_ref::<TimestampMillisecondArray>()
527 .unwrap();
528 let min_val = min(array).map(Timestamp::new_millisecond);
529 let max_val = max(array).map(Timestamp::new_millisecond);
530 (min_val, max_val)
531 }
532 DataType::Timestamp(TimeUnit::Microsecond, _) => {
533 let array = timestamp_array
534 .as_any()
535 .downcast_ref::<TimestampMicrosecondArray>()
536 .unwrap();
537 let min_val = min(array).map(Timestamp::new_microsecond);
538 let max_val = max(array).map(Timestamp::new_microsecond);
539 (min_val, max_val)
540 }
541 DataType::Timestamp(TimeUnit::Nanosecond, _) => {
542 let array = timestamp_array
543 .as_any()
544 .downcast_ref::<TimestampNanosecondArray>()
545 .unwrap();
546 let min_val = min(array).map(Timestamp::new_nanosecond);
547 let max_val = max(array).map(Timestamp::new_nanosecond);
548 (min_val, max_val)
549 }
550 _ => {
551 return UnexpectedSnafu {
552 reason: format!(
553 "Unexpected data type of time index: {:?}",
554 timestamp_array.data_type()
555 ),
556 }
557 .fail();
558 }
559 };
560
561 Ok(min_ts.zip(max_ts))
563}
564
/// Wrapper around an async writer that counts the bytes successfully written to it.
///
/// The count is accumulated into a shared counter, so the owner can observe the size
/// of the file being written without holding the writer.
struct SizeAwareWriter<W> {
    /// The wrapped writer.
    inner: W,
    /// Shared counter, incremented by the byte count of each successful write.
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    /// Wraps `inner`, accumulating written byte counts into `size`.
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        // `size` is already owned, so store it directly instead of cloning the `Arc`
        // (which would bump and then immediately drop a reference count).
        Self { inner, size }
    }
}
580
581impl<W> AsyncWrite for SizeAwareWriter<W>
582where
583 W: AsyncWrite + Unpin,
584{
585 fn poll_write(
586 mut self: Pin<&mut Self>,
587 cx: &mut Context<'_>,
588 buf: &[u8],
589 ) -> Poll<std::result::Result<usize, std::io::Error>> {
590 let this = self.as_mut().get_mut();
591
592 match Pin::new(&mut this.inner).poll_write(cx, buf) {
593 Poll::Ready(Ok(bytes_written)) => {
594 this.size.fetch_add(bytes_written, Ordering::Relaxed);
595 Poll::Ready(Ok(bytes_written))
596 }
597 other => other,
598 }
599 }
600
601 fn poll_flush(
602 mut self: Pin<&mut Self>,
603 cx: &mut Context<'_>,
604 ) -> Poll<std::result::Result<(), std::io::Error>> {
605 Pin::new(&mut self.inner).poll_flush(cx)
606 }
607
608 fn poll_shutdown(
609 mut self: Pin<&mut Self>,
610 cx: &mut Context<'_>,
611 ) -> Poll<std::result::Result<(), std::io::Error>> {
612 Pin::new(&mut self.inner).poll_shutdown(cx)
613 }
614}