use std::fmt;
use std::sync::Arc;
use common_telemetry::{error, info};
use crate::access_layer::AccessLayerRef;
use crate::cache::file_cache::{FileType, IndexKey};
use crate::cache::CacheManagerRef;
use crate::schedule::scheduler::SchedulerRef;
use crate::sst::file::FileMeta;
/// Request to purge the file described by `file_meta`.
#[derive(Debug)]
pub struct PurgeRequest {
/// Metadata of the file to purge.
pub file_meta: FileMeta,
}
/// Receiver of [`PurgeRequest`]s.
///
/// Implementors must be `Send + Sync` and implement [`fmt::Debug`].
pub trait FilePurger: Send + Sync + fmt::Debug {
/// Submits a request to purge the file described by `request`.
///
/// Nothing is returned, so the caller gets no indication through this call of
/// whether the purge succeeded.
fn send_request(&self, request: PurgeRequest);
}
/// Reference-counted handle to a [`FilePurger`] trait object.
pub type FilePurgerRef = Arc<dyn FilePurger>;
/// [`FilePurger`] implementation that hands the purge work to a scheduler.
pub struct LocalFilePurger {
/// Scheduler that purge jobs are submitted to.
scheduler: SchedulerRef,
/// Access layer used to delete the SST and to reach the puffin manager factory.
sst_layer: AccessLayerRef,
/// Optional cache manager; when present, cache entries of a purged file are removed from it.
cache_manager: Option<CacheManagerRef>,
}
impl fmt::Debug for LocalFilePurger {
    /// Formats the purger as a debug struct that lists only the `sst_layer` field;
    /// `scheduler` and `cache_manager` are left out of the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("LocalFilePurger");
        repr.field("sst_layer", &self.sst_layer);
        repr.finish()
    }
}
impl LocalFilePurger {
pub fn new(
scheduler: SchedulerRef,
sst_layer: AccessLayerRef,
cache_manager: Option<CacheManagerRef>,
) -> Self {
Self {
scheduler,
sst_layer,
cache_manager,
}
}
}
impl FilePurger for LocalFilePurger {
fn send_request(&self, request: PurgeRequest) {
let file_meta = request.file_meta;
let sst_layer = self.sst_layer.clone();
if let Some(cache) = &self.cache_manager {
cache.remove_parquet_meta_data(file_meta.region_id, file_meta.file_id);
}
let cache_manager = self.cache_manager.clone();
if let Err(e) = self.scheduler.schedule(Box::pin(async move {
if let Err(e) = sst_layer.delete_sst(&file_meta).await {
error!(e; "Failed to delete SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
} else {
info!(
"Successfully deleted SST file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id
);
}
if let Some(write_cache) = cache_manager.as_ref().and_then(|cache| cache.write_cache())
{
if file_meta.exists_index() {
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Puffin,
))
.await;
}
write_cache
.remove(IndexKey::new(
file_meta.region_id,
file_meta.file_id,
FileType::Parquet,
))
.await;
}
if let Err(e) = sst_layer
.puffin_manager_factory()
.purge_stager(file_meta.file_id)
.await
{
error!(e; "Failed to purge stager with index file, file_id: {}, region: {}",
file_meta.file_id, file_meta.region_id);
}
})) {
error!(e; "Failed to schedule the file purge request");
}
}
}
// Tests for `LocalFilePurger`, run against a filesystem-backed object store rooted in a
// temporary directory.
#[cfg(test)]
mod tests {
use std::num::NonZeroU64;
use common_test_util::temp_dir::create_temp_dir;
use object_store::services::Fs;
use object_store::ObjectStore;
use smallvec::SmallVec;
use super::*;
use crate::access_layer::AccessLayer;
use crate::schedule::scheduler::{LocalScheduler, Scheduler};
use crate::sst::file::{FileHandle, FileId, FileMeta, FileTimeRange, IndexType};
use crate::sst::index::intermediate::IntermediateManager;
use crate::sst::index::puffin_manager::PuffinManagerFactory;
use crate::sst::location;
/// A file without any index: after the purge the SST must be gone from the object store.
#[tokio::test]
async fn test_file_purge() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
let builder = Fs::default().root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let path = location::sst_file_path(sst_dir, sst_file_id);
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let object_store = ObjectStore::new(builder).unwrap().finish();
// Write a dummy 4096-byte SST so that there is something to purge.
object_store.write(&path, vec![0; 4096]).await.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(
sst_dir,
object_store.clone(),
puffin_mgr,
intm_mgr,
));
// No cache manager is configured for this purger.
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
// Inner scope so that `handle` is dropped before the scheduler is stopped.
// NOTE(review): presumably dropping a handle that was marked deleted is what sends the
// purge request - confirm in `FileHandle`.
{
let handle = FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id: sst_file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
available_indexes: Default::default(),
index_file_size: 0,
num_rows: 0,
num_row_groups: 0,
sequence: None,
},
file_purger,
);
handle.mark_deleted();
}
// NOTE(review): `stop(true)` presumably waits for the scheduled purge job to finish
// before returning - confirm in `LocalScheduler::stop`.
scheduler.stop(true).await.unwrap();
assert!(!object_store.exists(&path).await.unwrap());
}
/// A file with an inverted index: after the purge both the SST and its index file must be
/// gone from the object store.
#[tokio::test]
async fn test_file_purge_with_index() {
common_telemetry::init_default_ut_logging();
let dir = create_temp_dir("file-purge");
let dir_path = dir.path().display().to_string();
let builder = Fs::default().root(&dir_path);
let sst_file_id = FileId::random();
let sst_dir = "table1";
let index_aux_path = dir.path().join("index_aux");
let puffin_mgr = PuffinManagerFactory::new(&index_aux_path, 4096, None, None)
.await
.unwrap();
let intm_mgr = IntermediateManager::init_fs(index_aux_path.to_str().unwrap())
.await
.unwrap();
let path = location::sst_file_path(sst_dir, sst_file_id);
let object_store = ObjectStore::new(builder).unwrap().finish();
// Write a dummy SST and a dummy index file, 4096 bytes each.
object_store.write(&path, vec![0; 4096]).await.unwrap();
let index_path = location::index_file_path(sst_dir, sst_file_id);
object_store
.write(&index_path, vec![0; 4096])
.await
.unwrap();
let scheduler = Arc::new(LocalScheduler::new(3));
let layer = Arc::new(AccessLayer::new(
sst_dir,
object_store.clone(),
puffin_mgr,
intm_mgr,
));
// No cache manager is configured for this purger.
let file_purger = Arc::new(LocalFilePurger::new(scheduler.clone(), layer, None));
// Inner scope so that `handle` is dropped before the scheduler is stopped.
// NOTE(review): presumably dropping a handle that was marked deleted is what sends the
// purge request - confirm in `FileHandle`.
{
let handle = FileHandle::new(
FileMeta {
region_id: 0.into(),
file_id: sst_file_id,
time_range: FileTimeRange::default(),
level: 0,
file_size: 4096,
// Declares that the file has an inverted index.
available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
index_file_size: 4096,
num_rows: 1024,
num_row_groups: 1,
sequence: NonZeroU64::new(4096),
},
file_purger,
);
handle.mark_deleted();
}
// NOTE(review): `stop(true)` presumably waits for the scheduled purge job to finish
// before returning - confirm in `LocalScheduler::stop`.
scheduler.stop(true).await.unwrap();
assert!(!object_store.exists(&path).await.unwrap());
assert!(!object_store.exists(&index_path).await.unwrap());
}
}