1use std::future::Future;
18use std::mem;
19use std::pin::Pin;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::sync::Arc;
22use std::task::{Context, Poll};
23use std::time::Instant;
24
25use common_telemetry::debug;
26use common_time::Timestamp;
27use datatypes::arrow::datatypes::SchemaRef;
28use object_store::{FuturesAsyncWriter, ObjectStore};
29use parquet::arrow::AsyncArrowWriter;
30use parquet::basic::{Compression, Encoding, ZstdLevel};
31use parquet::file::metadata::KeyValue;
32use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
33use parquet::schema::types::ColumnPath;
34use smallvec::smallvec;
35use snafu::ResultExt;
36use store_api::metadata::RegionMetadataRef;
37use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
38use store_api::storage::SequenceNumber;
39use tokio::io::AsyncWrite;
40use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};
41
42use crate::access_layer::{FilePathProvider, Metrics, SstInfoArray, TempFileCleaner};
43use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
44use crate::read::{Batch, Source};
45use crate::sst::file::{FileId, RegionFileId};
46use crate::sst::index::{Indexer, IndexerBuilder};
47use crate::sst::parquet::format::WriteFormat;
48use crate::sst::parquet::helper::parse_parquet_metadata;
49use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
50use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};
51
/// Parquet SST writer that streams [Batch]es from a [Source] into one or more
/// parquet files, building an index alongside each file.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Provides the SST file path for a given region file id.
    path_provider: P,
    /// Underlying arrow writer; lazily created when the first batch of a file is written.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file currently being written.
    current_file: FileId,
    /// Creates the byte sink for each new file.
    writer_factory: F,
    /// Metadata of the region to write; embedded into each file's parquet metadata.
    metadata: RegionMetadataRef,
    /// Builds an [Indexer] for every new file.
    indexer_builder: I,
    /// Indexer for the current file.
    current_indexer: Option<Indexer>,
    /// Bytes written to the current file so far (shared with the [SizeAwareWriter]).
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner that removes partially written files when a write fails.
    file_cleaner: Option<TempFileCleaner>,
    /// Write metrics.
    metrics: Metrics,
}
72
/// Factory that creates the async byte sink an SST file is written to.
pub trait WriterFactory {
    /// Concrete async writer type produced by this factory.
    type Writer: AsyncWrite + Send + Unpin;
    /// Creates a writer for the file at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}
77
/// [WriterFactory] that writes SST files through an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}
81
82impl WriterFactory for ObjectStoreWriterFactory {
83 type Writer = Compat<FuturesAsyncWriter>;
84
85 async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
86 self.object_store
87 .writer_with(file_path)
88 .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
89 .concurrent(DEFAULT_WRITE_CONCURRENCY)
90 .await
91 .map(|v| v.into_futures_async_write().compat_write())
92 .context(OpenDalSnafu)
93 }
94}
95
96impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
97where
98 P: FilePathProvider,
99 I: IndexerBuilder,
100{
101 pub async fn new_with_object_store(
102 object_store: ObjectStore,
103 metadata: RegionMetadataRef,
104 indexer_builder: I,
105 path_provider: P,
106 metrics: Metrics,
107 ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
108 ParquetWriter::new(
109 ObjectStoreWriterFactory { object_store },
110 metadata,
111 indexer_builder,
112 path_provider,
113 metrics,
114 )
115 .await
116 }
117
118 pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
119 self.file_cleaner = Some(cleaner);
120 self
121 }
122}
123
impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a parquet writer. Pre-allocates a random file id and builds the
    /// indexer for it; the actual parquet file is only opened on the first write.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
        metrics: Metrics,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
            metrics,
        }
    }

    /// Finalizes the file currently being written, if any: finishes the indexer,
    /// flushes and closes the arrow writer, and pushes the resulting [SstInfo]
    /// into `ssts`. Resets `stats` and the byte counter and picks a new random
    /// file id for the next file.
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // The writer is only created once a batch has been written, so `Some`
        // here implies at least one row was ingested (asserted below).
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            // Take ownership of the stats, leaving a fresh default for the next file.
            let stats = mem::take(stats);
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // num_rows > 0 (asserted above), so the time range must be set.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

    /// Writes the whole `source` to one or more SST files and returns the info
    /// of the files written. On error, cleans up the in-flight file through the
    /// configured [TempFileCleaner], if any.
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>, opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    /// Drains `source`, writing each batch and feeding it to the indexer;
    /// rolls over to a new file when `opts.max_file_size` is exceeded.
    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    let start = Instant::now();
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
                    self.metrics.update_index += start.elapsed();
                    // Roll over to a new file once the current one grows past
                    // the configured size limit.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the indexer so it can clean up intermediate state
                    // before the error is propagated.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

    /// Customizes per-column parquet settings: delta-binary-packed encoding
    /// with dictionary disabled for the time index and sequence columns.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    /// Fetches the next batch from `source`, converts it to arrow and writes it.
    /// Returns the batch so the caller can update stats and the index, or `None`
    /// when the source is exhausted.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let start = Instant::now();
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };
        self.metrics.iter_source += start.elapsed();

        let arrow_batch = write_format.convert_batch(&batch)?;

        let start = Instant::now();
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        self.metrics.write_batch += start.elapsed();
        Ok(Some(batch))
    }

    /// Returns the current arrow writer, creating it (and the indexer for the
    /// current file) if none exists yet.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Embed the region metadata in the parquet key-value metadata so
            // readers can recover it from the file itself.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(RegionFileId::new(
                self.metadata.region_id,
                self.current_file,
            ));
            // Wrap the factory's writer so bytes_written tracks the file size.
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            Ok(self.writer.as_mut().unwrap())
        }
    }

    /// Consumes the writer and returns the collected metrics.
    pub fn into_metrics(self) -> Metrics {
        self.metrics
    }
}
363
/// Running statistics of the batches written into the current file.
#[derive(Default)]
struct SourceStats {
    /// Number of rows written so far.
    num_rows: usize,
    /// Min/max timestamps over the batches seen so far; `None` until the
    /// first non-empty batch arrives.
    time_range: Option<(Timestamp, Timestamp)>,
}
371
372impl SourceStats {
373 fn update(&mut self, batch: &Batch) {
374 if batch.is_empty() {
375 return;
376 }
377
378 self.num_rows += batch.num_rows();
379 let (min_in_batch, max_in_batch) = (
381 batch.first_timestamp().unwrap(),
382 batch.last_timestamp().unwrap(),
383 );
384 if let Some(time_range) = &mut self.time_range {
385 time_range.0 = time_range.0.min(min_in_batch);
386 time_range.1 = time_range.1.max(max_in_batch);
387 } else {
388 self.time_range = Some((min_in_batch, max_in_batch));
389 }
390 }
391}
392
/// Writer wrapper that counts every byte successfully written through it,
/// accumulating into a shared counter (used to track the SST file size).
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}
399
400impl<W> SizeAwareWriter<W> {
401 fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
402 Self {
403 inner,
404 size: size.clone(),
405 }
406 }
407}
408
409impl<W> AsyncWrite for SizeAwareWriter<W>
410where
411 W: AsyncWrite + Unpin,
412{
413 fn poll_write(
414 mut self: Pin<&mut Self>,
415 cx: &mut Context<'_>,
416 buf: &[u8],
417 ) -> Poll<std::result::Result<usize, std::io::Error>> {
418 let this = self.as_mut().get_mut();
419
420 match Pin::new(&mut this.inner).poll_write(cx, buf) {
421 Poll::Ready(Ok(bytes_written)) => {
422 this.size.fetch_add(bytes_written, Ordering::Relaxed);
423 Poll::Ready(Ok(bytes_written))
424 }
425 other => other,
426 }
427 }
428
429 fn poll_flush(
430 mut self: Pin<&mut Self>,
431 cx: &mut Context<'_>,
432 ) -> Poll<std::result::Result<(), std::io::Error>> {
433 Pin::new(&mut self.inner).poll_flush(cx)
434 }
435
436 fn poll_shutdown(
437 mut self: Pin<&mut Self>,
438 cx: &mut Context<'_>,
439 ) -> Poll<std::result::Result<(), std::io::Error>> {
440 Pin::new(&mut self.inner).poll_shutdown(cx)
441 }
442}