use std::future::Future;
use std::mem;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use common_telemetry::debug;
use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, SstInfoArray, TempFileCleaner};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

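/// Parquet SST writer.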
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Provides paths for the SST and index files.
    path_provider: P,
    /// Current active writer, lazily created when the first batch arrives.
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Id of the file the writer is currently writing to.
    current_file: FileId,
    /// Factory that creates the underlying async writers.
    writer_factory: F,
    /// Metadata of the region to write.
    metadata: RegionMetadataRef,
    /// Builds an indexer for each output file.
    indexer_builder: I,
    /// Indexer of the file the writer is currently writing to.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file, shared with the size-aware writer.
    bytes_written: Arc<AtomicUsize>,
    /// Optional cleaner that removes partially written files when a write fails.
    file_cleaner: Option<TempFileCleaner>,
}

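/// Factory that creates the async writer for a given file path.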
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

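/// [WriterFactory] implementation that writes through an [ObjectStore].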
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
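    /// Creates a new parquet writer that writes SSTs to the given object store.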
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
        )
        .await
    }

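    /// Sets the cleaner used to remove partially written files when a write fails.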
    pub(crate) fn with_file_cleaner(mut self, cleaner: TempFileCleaner) -> Self {
        self.file_cleaner = Some(cleaner);
        self
    }
}

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
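    /// Creates a new writer. It builds an indexer for the initial file; the
    /// underlying parquet file is only created when the first batch is written.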
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
            file_cleaner: None,
        }
    }

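    /// Finishes the current file: closes the writer and the indexer, pushes the
    /// resulting [SstInfo] into `ssts`, and resets the state for the next file.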
    async fn finish_current_file(
        &mut self,
        ssts: &mut SstInfoArray,
        stats: &mut SourceStats,
    ) -> Result<()> {
        // The writer is only created after the first batch is written, so a
        // `Some` writer implies at least one row.
        if let Some(mut current_writer) = mem::take(&mut self.writer) {
            let stats = mem::take(stats);
            assert!(stats.num_rows > 0);

            debug!(
                "Finishing current file {}, file size: {}, num rows: {}",
                self.current_file,
                self.bytes_written.load(Ordering::Relaxed),
                stats.num_rows
            );

            let index_output = self.current_indexer.as_mut().unwrap().finish().await;
            current_writer.flush().await.context(WriteParquetSnafu)?;

            let file_meta = current_writer.close().await.context(WriteParquetSnafu)?;
            let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

            // Safe to unwrap because `num_rows > 0` guarantees the time range is set.
            let time_range = stats.time_range.unwrap();

            let parquet_metadata = parse_parquet_metadata(file_meta)?;
            ssts.push(SstInfo {
                file_id: self.current_file,
                time_range,
                file_size,
                num_rows: stats.num_rows,
                num_row_groups: parquet_metadata.num_row_groups() as u64,
                file_metadata: Some(Arc::new(parquet_metadata)),
                index_metadata: index_output,
            });
            // Prepare for the next file.
            self.current_file = FileId::random();
            self.bytes_written.store(0, Ordering::Relaxed)
        };

        Ok(())
    }

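    /// Iterates the `source` and writes all batches to one or more parquet files,
    /// returning the [SstInfo] of every file written. Cleans up partially written
    /// files via the configured cleaner if the write fails.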
    pub async fn write_all(
        &mut self,
        source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let res = self
            .write_all_without_cleaning(source, override_sequence, opts)
            .await;
        if res.is_err() {
            let file_id = self.current_file;
            if let Some(cleaner) = &self.file_cleaner {
                cleaner.clean_by_file_id(file_id).await;
            }
        }
        res
    }

    async fn write_all_without_cleaning(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>,
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let mut results = smallvec![];
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    self.current_indexer
                        .as_mut()
                        .unwrap()
                        .update(&mut batch)
                        .await;
                    // Roll over to a new file once the current one exceeds the size limit.
                    if let Some(max_file_size) = opts.max_file_size
                        && self.bytes_written.load(Ordering::Relaxed) > max_file_size
                    {
                        self.finish_current_file(&mut results, &mut stats).await?;
                    }
                }
                Err(e) => {
                    // Abort the indexer on error before propagating it.
                    if let Some(indexer) = &mut self.current_indexer {
                        indexer.abort().await;
                    }
                    return Err(e);
                }
            }
        }

        self.finish_current_file(&mut results, &mut stats).await?;

        Ok(results)
    }

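    /// Customizes per-column properties: the time index column and the sequence
    /// column use `DELTA_BINARY_PACKED` encoding with dictionaries disabled.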
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

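    /// Fetches the next batch from the source and writes it to the parquet file,
    /// returning the batch so the caller can update stats and the indexer.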
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };

        let arrow_batch = write_format.convert_batch(&batch)?;
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        Ok(Some(batch))
    }

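    /// Returns the writer for the current file, creating the parquet writer and
    /// its indexer if the writer is not initialized yet.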
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Store the region metadata in the parquet key-value metadata.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);

            let indexer = self.indexer_builder.build(self.current_file).await;
            self.current_indexer = Some(indexer);

            Ok(self.writer.as_mut().unwrap())
        }
    }
}

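/// Statistics of the rows fetched from the source.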
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of the fetched rows.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    /// Updates the statistics with a batch; empty batches are ignored.
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }
}

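/// Async writer wrapper that records the total number of bytes written into a
/// shared counter, so the parquet writer can report the file size.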
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Only count the bytes the inner writer actually accepted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}