use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::task::{Context, Poll};

use common_time::Timestamp;
use datatypes::arrow::datatypes::SchemaRef;
use object_store::{FuturesAsyncWriter, ObjectStore};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::{WriterProperties, WriterPropertiesBuilder};
use parquet::schema::types::ColumnPath;
use smallvec::smallvec;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::storage::consts::SEQUENCE_COLUMN_NAME;
use store_api::storage::SequenceNumber;
use tokio::io::AsyncWrite;
use tokio_util::compat::{Compat, FuturesAsyncWriteCompatExt};

use crate::access_layer::{FilePathProvider, SstInfoArray};
use crate::error::{InvalidMetadataSnafu, OpenDalSnafu, Result, WriteParquetSnafu};
use crate::read::{Batch, Source};
use crate::sst::file::FileId;
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::helper::parse_parquet_metadata;
use crate::sst::parquet::{SstInfo, WriteOptions, PARQUET_METADATA_KEY};
use crate::sst::{DEFAULT_WRITE_BUFFER_SIZE, DEFAULT_WRITE_CONCURRENCY};

/// Parquet SST writer.
pub struct ParquetWriter<F: WriterFactory, I: IndexerBuilder, P: FilePathProvider> {
    /// Path provider that creates SST and index file paths according to file id.
    path_provider: P,
    writer: Option<AsyncArrowWriter<SizeAwareWriter<F::Writer>>>,
    /// Current active file id.
    current_file: FileId,
    writer_factory: F,
    /// Region metadata of the source and the target SST.
    metadata: RegionMetadataRef,
    /// Indexer builder that creates an indexer for each SST file.
    indexer_builder: I,
    /// Indexer for the current file.
    current_indexer: Option<Indexer>,
    /// Number of bytes written to the current file.
    bytes_written: Arc<AtomicUsize>,
}

/// Factory that creates the underlying [AsyncWrite] writer for a given file path.
pub trait WriterFactory {
    type Writer: AsyncWrite + Send + Unpin;

    /// Creates a writer for the file at `file_path`.
    fn create(&mut self, file_path: &str) -> impl Future<Output = Result<Self::Writer>>;
}

/// [WriterFactory] that writes files to an [ObjectStore].
pub struct ObjectStoreWriterFactory {
    object_store: ObjectStore,
}

impl WriterFactory for ObjectStoreWriterFactory {
    type Writer = Compat<FuturesAsyncWriter>;

    async fn create(&mut self, file_path: &str) -> Result<Self::Writer> {
        self.object_store
            .writer_with(file_path)
            .chunk(DEFAULT_WRITE_BUFFER_SIZE.as_bytes() as usize)
            .concurrent(DEFAULT_WRITE_CONCURRENCY)
            .await
            .map(|v| v.into_futures_async_write().compat_write())
            .context(OpenDalSnafu)
    }
}
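// A factory does not have to target object storage. As an illustrative,
// hypothetical sketch (not part of this module), a test-only factory could hand
// out in-memory buffers, relying on tokio's `AsyncWrite` impl for `Vec<u8>`:
//
// struct InMemoryWriterFactory;
//
// impl WriterFactory for InMemoryWriterFactory {
//     type Writer = Vec<u8>;
//
//     async fn create(&mut self, _file_path: &str) -> Result<Self::Writer> {
//         Ok(Vec::new())
//     }
// }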

impl<I, P> ParquetWriter<ObjectStoreWriterFactory, I, P>
where
    P: FilePathProvider,
    I: IndexerBuilder,
{
    /// Creates a parquet SST writer that writes files to the given object store.
    pub async fn new_with_object_store(
        object_store: ObjectStore,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<ObjectStoreWriterFactory, I, P> {
        ParquetWriter::new(
            ObjectStoreWriterFactory { object_store },
            metadata,
            indexer_builder,
            path_provider,
        )
        .await
    }
}
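// Illustrative call-site sketch (hypothetical `metadata`, `indexer_builder`,
// `path_provider` and `source` values; assumes `WriteOptions::default()` is
// available):
//
// let mut writer = ParquetWriter::new_with_object_store(
//     object_store,
//     metadata,
//     indexer_builder,
//     path_provider,
// )
// .await;
// let ssts = writer
//     .write_all(source, None, &WriteOptions::default())
//     .await?;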

impl<F, I, P> ParquetWriter<F, I, P>
where
    F: WriterFactory,
    I: IndexerBuilder,
    P: FilePathProvider,
{
    /// Creates a new parquet SST writer.
    pub async fn new(
        factory: F,
        metadata: RegionMetadataRef,
        indexer_builder: I,
        path_provider: P,
    ) -> ParquetWriter<F, I, P> {
        let init_file = FileId::random();
        let indexer = indexer_builder.build(init_file).await;

        ParquetWriter {
            path_provider,
            writer: None,
            current_file: init_file,
            writer_factory: factory,
            metadata,
            indexer_builder,
            current_indexer: Some(indexer),
            bytes_written: Arc::new(AtomicUsize::new(0)),
        }
    }

    /// Returns the current indexer, creating a new file id and indexer if there is none.
    async fn get_or_create_indexer(&mut self) -> &mut Indexer {
        match self.current_indexer {
            None => {
                self.current_file = FileId::random();
                let indexer = self.indexer_builder.build(self.current_file).await;
                self.current_indexer = Some(indexer);
                // Safety: the indexer is set above.
                self.current_indexer.as_mut().unwrap()
            }
            Some(ref mut indexer) => indexer,
        }
    }

    /// Iterates the `source` and writes all rows to a Parquet file.
    ///
    /// Returns the [SstInfo] of the written file, or an empty array if nothing is written.
    pub async fn write_all(
        &mut self,
        mut source: Source,
        override_sequence: Option<SequenceNumber>, // overrides the sequence of batches from the source
        opts: &WriteOptions,
    ) -> Result<SstInfoArray> {
        let write_format =
            WriteFormat::new(self.metadata.clone()).with_override_sequence(override_sequence);
        let mut stats = SourceStats::default();

        while let Some(res) = self
            .write_next_batch(&mut source, &write_format, opts)
            .await
            .transpose()
        {
            match res {
                Ok(mut batch) => {
                    stats.update(&batch);
                    self.get_or_create_indexer().await.update(&mut batch).await;
                }
                Err(e) => {
                    self.get_or_create_indexer().await.abort().await;
                    return Err(e);
                }
            }
        }

        let index_output = self.get_or_create_indexer().await.finish().await;

        if stats.num_rows == 0 {
            return Ok(smallvec![]);
        }

        let Some(mut arrow_writer) = self.writer.take() else {
            // No batch actually written.
            return Ok(smallvec![]);
        };

        arrow_writer.flush().await.context(WriteParquetSnafu)?;

        let file_meta = arrow_writer.close().await.context(WriteParquetSnafu)?;
        let file_size = self.bytes_written.load(Ordering::Relaxed) as u64;

        // Safety: num_rows > 0 so we must have min/max timestamps.
        let time_range = stats.time_range.unwrap();

        let parquet_metadata = parse_parquet_metadata(file_meta)?;

        let file_id = self.current_file;

        Ok(smallvec![SstInfo {
            file_id,
            time_range,
            file_size,
            num_rows: stats.num_rows,
            num_row_groups: parquet_metadata.num_row_groups() as u64,
            file_metadata: Some(Arc::new(parquet_metadata)),
            index_metadata: index_output,
        }])
    }

    /// Customizes per-column properties: the time index and sequence columns are stored
    /// with DELTA_BINARY_PACKED encoding and without dictionary encoding.
    fn customize_column_config(
        builder: WriterPropertiesBuilder,
        region_metadata: &RegionMetadataRef,
    ) -> WriterPropertiesBuilder {
        let ts_col = ColumnPath::new(vec![region_metadata
            .time_index_column()
            .column_schema
            .name
            .clone()]);
        let seq_col = ColumnPath::new(vec![SEQUENCE_COLUMN_NAME.to_string()]);

        builder
            .set_column_encoding(seq_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(seq_col, false)
            .set_column_encoding(ts_col.clone(), Encoding::DELTA_BINARY_PACKED)
            .set_column_dictionary_enabled(ts_col, false)
    }

    /// Fetches the next batch from the source, converts it to Arrow and writes it.
    ///
    /// Returns the written [Batch], or `None` if the source is exhausted.
    async fn write_next_batch(
        &mut self,
        source: &mut Source,
        write_format: &WriteFormat,
        opts: &WriteOptions,
    ) -> Result<Option<Batch>> {
        let Some(batch) = source.next_batch().await? else {
            return Ok(None);
        };

        let arrow_batch = write_format.convert_batch(&batch)?;
        self.maybe_init_writer(write_format.arrow_schema(), opts)
            .await?
            .write(&arrow_batch)
            .await
            .context(WriteParquetSnafu)?;
        Ok(Some(batch))
    }

    /// Returns the current [AsyncArrowWriter], creating it (and the underlying SST file)
    /// on the first call.
    async fn maybe_init_writer(
        &mut self,
        schema: &SchemaRef,
        opts: &WriteOptions,
    ) -> Result<&mut AsyncArrowWriter<SizeAwareWriter<F::Writer>>> {
        if let Some(ref mut w) = self.writer {
            Ok(w)
        } else {
            // Store the region metadata as key-value metadata of the Parquet file.
            let json = self.metadata.to_json().context(InvalidMetadataSnafu)?;
            let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);

            let props_builder = WriterProperties::builder()
                .set_key_value_metadata(Some(vec![key_value_meta]))
                .set_compression(Compression::ZSTD(ZstdLevel::default()))
                .set_encoding(Encoding::PLAIN)
                .set_max_row_group_size(opts.row_group_size);

            let props_builder = Self::customize_column_config(props_builder, &self.metadata);
            let writer_props = props_builder.build();

            let sst_file_path = self.path_provider.build_sst_file_path(self.current_file);
            let writer = SizeAwareWriter::new(
                self.writer_factory.create(&sst_file_path).await?,
                self.bytes_written.clone(),
            );
            let arrow_writer =
                AsyncArrowWriter::try_new(writer, schema.clone(), Some(writer_props))
                    .context(WriteParquetSnafu)?;
            self.writer = Some(arrow_writer);
            // Safety: the writer is set above.
            Ok(self.writer.as_mut().unwrap())
        }
    }
}

/// Statistics of the rows fetched from the source.
#[derive(Default)]
struct SourceStats {
    /// Number of rows fetched.
    num_rows: usize,
    /// Time range of fetched batches.
    time_range: Option<(Timestamp, Timestamp)>,
}

impl SourceStats {
    /// Updates the statistics with a batch, extending the time range to cover it.
    /// For example, batches covering timestamps [10, 20] and [5, 15] accumulate
    /// into a time range of (5, 20).
    fn update(&mut self, batch: &Batch) {
        if batch.is_empty() {
            return;
        }

        self.num_rows += batch.num_rows();
        // Safety: the batch is not empty, so it has first/last timestamps.
        let (min_in_batch, max_in_batch) = (
            batch.first_timestamp().unwrap(),
            batch.last_timestamp().unwrap(),
        );
        if let Some(time_range) = &mut self.time_range {
            time_range.0 = time_range.0.min(min_in_batch);
            time_range.1 = time_range.1.max(max_in_batch);
        } else {
            self.time_range = Some((min_in_batch, max_in_batch));
        }
    }
}

/// Wrapper that records the number of bytes written to the inner writer in a shared
/// counter, so the final SST file size is available after the [AsyncArrowWriter] is closed.
struct SizeAwareWriter<W> {
    inner: W,
    size: Arc<AtomicUsize>,
}

impl<W> SizeAwareWriter<W> {
    fn new(inner: W, size: Arc<AtomicUsize>) -> Self {
        Self { inner, size }
    }
}

impl<W> AsyncWrite for SizeAwareWriter<W>
where
    W: AsyncWrite + Unpin,
{
    fn poll_write(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<std::result::Result<usize, std::io::Error>> {
        let this = self.as_mut().get_mut();

        match Pin::new(&mut this.inner).poll_write(cx, buf) {
            Poll::Ready(Ok(bytes_written)) => {
                // Only count bytes the inner writer actually accepted.
                this.size.fetch_add(bytes_written, Ordering::Relaxed);
                Poll::Ready(Ok(bytes_written))
            }
            other => other,
        }
    }

    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_flush(cx)
    }

    fn poll_shutdown(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<std::result::Result<(), std::io::Error>> {
        Pin::new(&mut self.inner).poll_shutdown(cx)
    }
}
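
// Illustrative sketch of the byte accounting (hypothetical test code, assuming
// tokio's `AsyncWriteExt` and its `AsyncWrite` impl for `Vec<u8>`):
//
// use tokio::io::AsyncWriteExt;
//
// let counter = Arc::new(AtomicUsize::new(0));
// let mut writer = SizeAwareWriter::new(Vec::<u8>::new(), counter.clone());
// writer.write_all(b"hello").await.unwrap();
// assert_eq!(counter.load(Ordering::Relaxed), 5);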