use common_telemetry::{debug, warn};
use puffin::puffin_manager::{PuffinManager, PuffinWriter};
use crate::sst::index::bloom_filter::creator::BloomFilterIndexer;
use crate::sst::index::fulltext_index::creator::FulltextIndexer;
use crate::sst::index::inverted_index::creator::InvertedIndexer;
use crate::sst::index::puffin_manager::SstPuffinWriter;
use crate::sst::index::statistics::{ByteCount, RowCount};
use crate::sst::index::{
BloomFilterOutput, FulltextIndexOutput, IndexOutput, Indexer, InvertedIndexOutput,
};
impl Indexer {
    /// Finishes every configured index and writes them into a single puffin file.
    ///
    /// The inverted index, full-text index and bloom filter are finished in that
    /// order, all through the same [`SstPuffinWriter`]. The writer itself is
    /// finished last and its size is stored in `IndexOutput::file_size`.
    ///
    /// Index creation is best-effort. Each of these failures logs a warning and
    /// returns an empty (default) [`IndexOutput`]:
    /// - the puffin writer cannot be created;
    /// - any index fails to finish;
    /// - the puffin file fails to finish.
    ///
    /// In test builds (`cfg(test)` or the `test` feature) these failures panic
    /// instead.
    pub(crate) async fn do_finish(&mut self) -> IndexOutput {
        let mut output = IndexOutput::default();

        let Some(mut writer) = self.build_puffin_writer().await else {
            self.do_abort().await;
            return output;
        };

        // `&&` short-circuits: once one index fails, the later ones are not
        // finished. Their indexers are not `take()`n, so they are still set
        // when `do_abort` runs.
        let all_finished = self
            .do_finish_inverted_index(&mut writer, &mut output)
            .await
            && self
                .do_finish_fulltext_index(&mut writer, &mut output)
                .await
            && self.do_finish_bloom_filter(&mut writer, &mut output).await;
        if !all_finished {
            self.do_abort().await;
            return IndexOutput::default();
        }

        match self.do_finish_puffin_writer(writer).await {
            Some(file_size) => {
                output.file_size = file_size;
                output
            }
            // The puffin file was not successfully finalized. Reporting the
            // per-index sizes and columns collected above would describe a file
            // that cannot be relied on, so return the empty output. This matches
            // every other failure path in this function.
            None => IndexOutput::default(),
        }
    }

    /// Creates a puffin writer for `self.file_id`.
    ///
    /// The puffin manager is taken out of `self`, so at most one writer is ever
    /// built from it. Returns `None` in two cases:
    /// - the manager was already taken;
    /// - writer creation fails (logged as a warning; panics in test builds).
    async fn build_puffin_writer(&mut self) -> Option<SstPuffinWriter> {
        let puffin_manager = self.puffin_manager.take()?;

        let err = match puffin_manager.writer(&self.file_id).await {
            Ok(writer) => return Some(writer),
            Err(err) => err,
        };

        // Fail loudly in tests; in production, index creation is best-effort.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to create puffin writer, region_id: {}, file_id: {}, err: {:?}",
                self.region_id, self.file_id, err
            );
        } else {
            warn!(
                err; "Failed to create puffin writer, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }

        None
    }

    /// Finishes the puffin file and returns the size reported by the writer.
    ///
    /// Returns `None` if finishing fails. The failure is logged as a warning, or
    /// panics in test builds.
    async fn do_finish_puffin_writer(&mut self, writer: SstPuffinWriter) -> Option<ByteCount> {
        let err = match writer.finish().await {
            Ok(size) => return Some(size),
            Err(err) => err,
        };

        // Fail loudly in tests; in production, index creation is best-effort.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to finish puffin writer, region_id: {}, file_id: {}, err: {:?}",
                self.region_id, self.file_id, err
            );
        } else {
            warn!(
                err; "Failed to finish puffin writer, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }

        None
    }

    /// Finishes the inverted index (if one is configured) into `puffin_writer`.
    ///
    /// On success, fills `index_output.inverted_index`. Returns `true` if there
    /// was no inverted indexer or it finished successfully. Returns `false` if
    /// finishing failed; the failure is logged, or panics in test builds.
    async fn do_finish_inverted_index(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
        index_output: &mut IndexOutput,
    ) -> bool {
        let Some(mut indexer) = self.inverted_indexer.take() else {
            return true;
        };

        let err = match indexer.finish(puffin_writer).await {
            Ok((row_count, byte_count)) => {
                self.fill_inverted_index_output(
                    &mut index_output.inverted_index,
                    row_count,
                    byte_count,
                    &indexer,
                );
                return true;
            }
            Err(err) => err,
        };

        // Fail loudly in tests; in production, index creation is best-effort.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to finish inverted index, region_id: {}, file_id: {}, err: {:?}",
                self.region_id, self.file_id, err
            );
        } else {
            warn!(
                err; "Failed to finish inverted index, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }

        false
    }

    /// Finishes the full-text index (if one is configured) into `puffin_writer`.
    ///
    /// On success, fills `index_output.fulltext_index`. Returns `true` if there
    /// was no full-text indexer or it finished successfully. Returns `false` if
    /// finishing failed; the failure is logged, or panics in test builds.
    async fn do_finish_fulltext_index(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
        index_output: &mut IndexOutput,
    ) -> bool {
        let Some(mut indexer) = self.fulltext_indexer.take() else {
            return true;
        };

        let err = match indexer.finish(puffin_writer).await {
            Ok((row_count, byte_count)) => {
                self.fill_fulltext_index_output(
                    &mut index_output.fulltext_index,
                    row_count,
                    byte_count,
                    &indexer,
                );
                return true;
            }
            Err(err) => err,
        };

        // Fail loudly in tests; in production, index creation is best-effort.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to finish full-text index, region_id: {}, file_id: {}, err: {:?}",
                self.region_id, self.file_id, err
            );
        } else {
            warn!(
                err; "Failed to finish full-text index, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }

        false
    }

    /// Finishes the bloom filter (if one is configured) into `puffin_writer`.
    ///
    /// On success, fills `index_output.bloom_filter`. Returns `true` if there was
    /// no bloom filter indexer or it finished successfully. Returns `false` if
    /// finishing failed; the failure is logged, or panics in test builds.
    async fn do_finish_bloom_filter(
        &mut self,
        puffin_writer: &mut SstPuffinWriter,
        index_output: &mut IndexOutput,
    ) -> bool {
        let Some(mut indexer) = self.bloom_filter_indexer.take() else {
            return true;
        };

        let err = match indexer.finish(puffin_writer).await {
            Ok((row_count, byte_count)) => {
                self.fill_bloom_filter_output(
                    &mut index_output.bloom_filter,
                    row_count,
                    byte_count,
                    &indexer,
                );
                return true;
            }
            Err(err) => err,
        };

        // Fail loudly in tests; in production, index creation is best-effort.
        if cfg!(any(test, feature = "test")) {
            panic!(
                "Failed to finish bloom filter, region_id: {}, file_id: {}, err: {:?}",
                self.region_id, self.file_id, err
            );
        } else {
            warn!(
                err; "Failed to finish bloom filter, region_id: {}, file_id: {}",
                self.region_id, self.file_id,
            );
        }

        false
    }

    /// Logs and records the results of a finished inverted index.
    ///
    /// Records the written byte count, row count and indexed column ids.
    fn fill_inverted_index_output(
        &self,
        output: &mut InvertedIndexOutput,
        row_count: RowCount,
        byte_count: ByteCount,
        indexer: &InvertedIndexer,
    ) {
        debug!(
            "Inverted index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
            self.region_id, self.file_id, byte_count, row_count
        );

        output.index_size = byte_count;
        output.row_count = row_count;
        output.columns = indexer.column_ids().collect();
    }

    /// Logs and records the results of a finished full-text index.
    ///
    /// Records the written byte count, row count and indexed column ids.
    fn fill_fulltext_index_output(
        &self,
        output: &mut FulltextIndexOutput,
        row_count: RowCount,
        byte_count: ByteCount,
        indexer: &FulltextIndexer,
    ) {
        debug!(
            "Full-text index created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
            self.region_id, self.file_id, byte_count, row_count
        );

        output.index_size = byte_count;
        output.row_count = row_count;
        output.columns = indexer.column_ids().collect();
    }

    /// Logs and records the results of a finished bloom filter.
    ///
    /// Records the written byte count, row count and indexed column ids.
    fn fill_bloom_filter_output(
        &self,
        output: &mut BloomFilterOutput,
        row_count: RowCount,
        byte_count: ByteCount,
        indexer: &BloomFilterIndexer,
    ) {
        debug!(
            "Bloom filter created, region_id: {}, file_id: {}, written_bytes: {}, written_rows: {}",
            self.region_id, self.file_id, byte_count, row_count
        );

        output.index_size = byte_count;
        output.row_count = row_count;
        output.columns = indexer.column_ids().collect();
    }
}