//! SST in parquet format.

use std::sync::Arc;
use common_base::readable_size::ReadableSize;
use parquet::file::metadata::ParquetMetaData;
use crate::sst::file::{FileId, FileTimeRange};
use crate::sst::index::IndexOutput;
use crate::sst::DEFAULT_WRITE_BUFFER_SIZE;
pub(crate) mod file_range;
pub mod format;
pub(crate) mod helper;
pub(crate) mod metadata;
pub(crate) mod page_reader;
pub mod reader;
pub mod row_group;
mod row_selection;
pub(crate) mod stats;
pub mod writer;
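
/// Key of the region metadata entry stored in the parquet file's key-value metadata.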
pub const PARQUET_METADATA_KEY: &str = "greptime:metadata";
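/// Default batch size to read parquet files.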
pub(crate) const DEFAULT_READ_BATCH_SIZE: usize = 1024;
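/// Default row group size for parquet files.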
pub(crate) const DEFAULT_ROW_GROUP_SIZE: usize = 100 * DEFAULT_READ_BATCH_SIZE;
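
/// Parquet write options.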
#[derive(Debug)]
pub struct WriteOptions {
/// Buffer size for the async writer.
pub write_buffer_size: ReadableSize,
/// Number of rows per row group.
pub row_group_size: usize,
}
impl Default for WriteOptions {
fn default() -> Self {
WriteOptions {
write_buffer_size: DEFAULT_WRITE_BUFFER_SIZE,
row_group_size: DEFAULT_ROW_GROUP_SIZE,
}
}
}
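
/// Parquet SST info returned by the writer.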
#[derive(Debug)]
pub struct SstInfo {
/// SST file id.
pub file_id: FileId,
/// Time range of the SST.
pub time_range: FileTimeRange,
/// File size in bytes.
pub file_size: u64,
/// Number of rows.
pub num_rows: usize,
/// Number of row groups.
pub num_row_groups: u64,
/// Parquet metadata of the written file.
pub file_metadata: Option<Arc<ParquetMetaData>>,
/// Index metadata.
pub index_metadata: IndexOutput,
}
#[cfg(test)]
mod tests {
use std::sync::Arc;
use common_time::Timestamp;
use datafusion_common::{Column, ScalarValue};
use datafusion_expr::{BinaryExpr, Expr, Operator};
use datatypes::arrow;
use datatypes::arrow::array::RecordBatch;
use datatypes::arrow::datatypes::{DataType, Field, Schema};
use parquet::arrow::AsyncArrowWriter;
use parquet::basic::{Compression, Encoding, ZstdLevel};
use parquet::file::metadata::KeyValue;
use parquet::file::properties::WriterProperties;
use table::predicate::Predicate;
use tokio_util::compat::FuturesAsyncWriteCompatExt;
use super::*;
use crate::access_layer::FilePathProvider;
use crate::cache::{CacheManager, CacheStrategy, PageKey};
use crate::sst::index::{Indexer, IndexerBuilder};
use crate::sst::parquet::format::WriteFormat;
use crate::sst::parquet::reader::ParquetReaderBuilder;
use crate::sst::parquet::writer::ParquetWriter;
use crate::sst::{location, DEFAULT_WRITE_CONCURRENCY};
use crate::test_util::sst_util::{
assert_parquet_metadata_eq, build_test_binary_test_region_metadata, new_batch_by_range,
new_batch_with_binary, new_source, sst_file_handle, sst_region_metadata,
};
use crate::test_util::{check_reader_result, TestEnv};
const FILE_DIR: &str = "/";
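
/// Path provider that ignores the given file id and always builds paths for a fixed one.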
#[derive(Clone)]
struct FixedPathProvider {
file_id: FileId,
}
impl FilePathProvider for FixedPathProvider {
fn build_index_file_path(&self, _file_id: FileId) -> String {
location::index_file_path(FILE_DIR, self.file_id)
}
fn build_sst_file_path(&self, _file_id: FileId) -> String {
location::sst_file_path(FILE_DIR, self.file_id)
}
}
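
/// Indexer builder that builds a no-op indexer.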
struct NoopIndexBuilder;
#[async_trait::async_trait]
impl IndexerBuilder for NoopIndexBuilder {
async fn build(&self, _file_id: FileId) -> Indexer {
Indexer::default()
}
}
#[tokio::test]
async fn test_write_read() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = FixedPathProvider {
file_id: handle.file_id(),
};
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
file_path,
)
.await;
let info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
assert_eq!(200, info.num_rows);
assert!(info.file_size > 0);
assert_eq!(
(
Timestamp::new_millisecond(0),
Timestamp::new_millisecond(199)
),
info.time_range
);
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
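// With a row group size of 50, the written rows are read back split at row group boundaries.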
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 150),
new_batch_by_range(&["b", "h"], 150, 200),
],
)
.await;
}
#[tokio::test]
async fn test_read_with_cache() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
let cache = CacheStrategy::EnableAll(Arc::new(
CacheManager::builder()
.page_cache_size(64 * 1024 * 1024)
.build(),
));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.cache(cache.clone());
for _ in 0..3 {
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 150),
new_batch_by_range(&["b", "h"], 150, 200),
],
)
.await;
}
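// The read path populates the uncompressed page cache, so the compressed page entry misses.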
let page_key = PageKey::new_compressed(metadata.region_id, handle.file_id(), 0, 0);
assert!(cache.get_pages(&page_key).is_none());
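// 200 rows with a row group size of 50 gives 4 row groups; each should have cached pages.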
for i in 0..4 {
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), i, 0);
assert!(cache.get_pages(&page_key).is_some());
}
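// Row group 5 does not exist, so nothing is cached for it.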
let page_key = PageKey::new_uncompressed(metadata.region_id, handle.file_id(), 5, 0);
assert!(cache.get_pages(&page_key).is_none());
}
#[tokio::test]
async fn test_parquet_metadata_eq() {
let mut env = crate::test_util::TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
let sst_info = writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
let writer_metadata = sst_info.file_metadata.unwrap();
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let reader = builder.build().await.unwrap();
let reader_metadata = reader.parquet_metadata();
assert_parquet_metadata_eq(writer_metadata, reader_metadata)
}
#[tokio::test]
async fn test_read_with_tag_filter() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
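// Only rows whose tag_0 equals "a" should be returned.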
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("tag_0"))),
op: Operator::Eq,
right: Box::new(Expr::Literal(ScalarValue::Utf8(Some("a".to_string())))),
})]));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.predicate(predicate);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_by_range(&["a", "d"], 0, 50),
new_batch_by_range(&["a", "d"], 50, 60),
],
)
.await;
}
#[tokio::test]
async fn test_read_empty_batch() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "z"], 0, 0),
new_batch_by_range(&["a", "z"], 100, 100),
new_batch_by_range(&["a", "z"], 200, 230),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
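// The two empty input batches produce no rows; only the non-empty batch is read back.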
check_reader_result(&mut reader, &[new_batch_by_range(&["a", "z"], 200, 230)]).await;
}
#[tokio::test]
async fn test_read_with_field_filter() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let metadata = Arc::new(sst_region_metadata());
let source = new_source(&[
new_batch_by_range(&["a", "d"], 0, 60),
new_batch_by_range(&["b", "f"], 0, 40),
new_batch_by_range(&["b", "h"], 100, 200),
]);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let mut writer = ParquetWriter::new_with_object_store(
object_store.clone(),
metadata.clone(),
NoopIndexBuilder,
FixedPathProvider {
file_id: handle.file_id(),
},
)
.await;
writer
.write_all(source, None, &write_opts)
.await
.unwrap()
.remove(0);
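// Only rows with field_0 >= 150 should be returned.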
let predicate = Some(Predicate::new(vec![Expr::BinaryExpr(BinaryExpr {
left: Box::new(Expr::Column(Column::from_name("field_0"))),
op: Operator::GtEq,
right: Box::new(Expr::Literal(ScalarValue::UInt64(Some(150)))),
})]));
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store)
.predicate(predicate);
let mut reader = builder.build().await.unwrap();
check_reader_result(&mut reader, &[new_batch_by_range(&["b", "h"], 150, 200)]).await;
}
#[tokio::test]
async fn test_read_large_binary() {
let mut env = TestEnv::new();
let object_store = env.init_object_store_manager();
let handle = sst_file_handle(0, 1000);
let file_path = handle.file_path(FILE_DIR);
let write_opts = WriteOptions {
row_group_size: 50,
..Default::default()
};
let metadata = build_test_binary_test_region_metadata();
let json = metadata.to_json().unwrap();
let key_value_meta = KeyValue::new(PARQUET_METADATA_KEY.to_string(), json);
let props_builder = WriterProperties::builder()
.set_key_value_metadata(Some(vec![key_value_meta]))
.set_compression(Compression::ZSTD(ZstdLevel::default()))
.set_encoding(Encoding::PLAIN)
.set_max_row_group_size(write_opts.row_group_size);
let writer_props = props_builder.build();
let write_format = WriteFormat::new(metadata);
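// Replace Binary fields with LargeBinary so the file is written with large binary columns.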
let fields: Vec<_> = write_format
.arrow_schema()
.fields()
.into_iter()
.map(|field| {
let data_type = field.data_type().clone();
if data_type == DataType::Binary {
Field::new(field.name(), DataType::LargeBinary, field.is_nullable())
} else {
Field::new(field.name(), data_type, field.is_nullable())
}
})
.collect();
let arrow_schema = Arc::new(Schema::new(fields));
assert_eq!(
&DataType::LargeBinary,
arrow_schema.field_with_name("field_0").unwrap().data_type()
);
let mut writer = AsyncArrowWriter::try_new(
object_store
.writer_with(&file_path)
.concurrent(DEFAULT_WRITE_CONCURRENCY)
.await
.map(|w| w.into_futures_async_write().compat_write())
.unwrap(),
arrow_schema.clone(),
Some(writer_props),
)
.unwrap();
let batch = new_batch_with_binary(&["a"], 0, 60);
let arrow_batch = write_format.convert_batch(&batch).unwrap();
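// Cast Binary arrays to LargeBinary to match the modified schema.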
let arrays: Vec<_> = arrow_batch
.columns()
.iter()
.map(|array| {
let data_type = array.data_type().clone();
if data_type == DataType::Binary {
arrow::compute::cast(array, &DataType::LargeBinary).unwrap()
} else {
array.clone()
}
})
.collect();
let result = RecordBatch::try_new(arrow_schema, arrays).unwrap();
writer.write(&result).await.unwrap();
writer.close().await.unwrap();
let builder = ParquetReaderBuilder::new(FILE_DIR.to_string(), handle.clone(), object_store);
let mut reader = builder.build().await.unwrap();
check_reader_result(
&mut reader,
&[
new_batch_with_binary(&["a"], 0, 50),
new_batch_with_binary(&["a"], 50, 60),
],
)
.await;
}
}