use std::fmt;
use std::num::NonZeroU64;
use std::str::FromStr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
use common_time::Timestamp;
use serde::{Deserialize, Serialize};
use smallvec::SmallVec;
use snafu::{ResultExt, Snafu};
use store_api::storage::RegionId;
use uuid::Uuid;
use crate::sst::file_purger::{FilePurgerRef, PurgeRequest};
use crate::sst::location;
/// Compaction/storage level of an SST file.
pub type Level = u8;
/// Maximum level number. NOTE(review): presumably valid levels are `0..MAX_LEVEL`
/// (i.e. levels 0 and 1) — confirm against the compaction code.
pub const MAX_LEVEL: Level = 2;
/// Error returned when a [`FileId`] cannot be parsed from a string.
#[derive(Debug, Snafu, PartialEq)]
pub struct ParseIdError {
    /// Underlying UUID parse failure.
    source: uuid::Error,
}
/// Unique id of an SST file, backed by a UUID.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
pub struct FileId(Uuid);
impl FileId {
    /// Returns a new unique [`FileId`] (a random v4 UUID).
    pub fn random() -> FileId {
        FileId(Uuid::new_v4())
    }

    /// Parses a [`FileId`] from its string representation.
    ///
    /// # Errors
    /// Returns [`ParseIdError`] if `input` is not a valid UUID.
    pub fn parse_str(input: &str) -> std::result::Result<FileId, ParseIdError> {
        Uuid::parse_str(input).map(FileId).context(ParseIdSnafu)
    }

    /// Returns the data (parquet) file name for this id, e.g. `<uuid>.parquet`.
    pub fn as_parquet(&self) -> String {
        // Single format literal with inline arg instead of concatenating two
        // format arguments — same output, idiomatic form used elsewhere in
        // this file.
        format!("{self}.parquet")
    }

    /// Returns the index (puffin) file name for this id, e.g. `<uuid>.puffin`.
    pub fn as_puffin(&self) -> String {
        format!("{self}.puffin")
    }

    /// Returns the raw 16-byte representation of the underlying UUID.
    pub fn as_bytes(&self) -> &[u8] {
        self.0.as_bytes()
    }
}
impl From<FileId> for Uuid {
fn from(value: FileId) -> Self {
value.0
}
}
impl fmt::Display for FileId {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Delegate directly to the inner UUID's `Display` implementation.
        fmt::Display::fmt(&self.0, f)
    }
}
impl FromStr for FileId {
type Err = ParseIdError;
fn from_str(s: &str) -> std::result::Result<FileId, ParseIdError> {
FileId::parse_str(s)
}
}
/// Time range of an SST file as `(start, end)` timestamps.
/// NOTE(review): treated as inclusive on both ends by `overlaps` — confirm.
pub type FileTimeRange = (Timestamp, Timestamp);
/// Checks whether two [`FileTimeRange`]s overlap, treating both endpoints
/// as inclusive.
pub(crate) fn overlaps(l: &FileTimeRange, r: &FileTimeRange) -> bool {
    // Order the ranges so `first` starts no later than `second`.
    let (first, second) = if l.0 <= r.0 { (l, r) } else { (r, l) };
    // They overlap iff the later-starting range begins before (or exactly
    // when) the earlier-starting range ends.
    second.0 <= first.1
}
/// Metadata of an SST file.
///
/// `#[serde(default)]` lets older serialized forms that lack newer fields
/// (see `test_deserialize_from_string`) still deserialize.
#[derive(Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(default)]
pub struct FileMeta {
    /// Region the file belongs to.
    pub region_id: RegionId,
    /// Unique id of the file.
    pub file_id: FileId,
    /// Time range of data in the file.
    pub time_range: FileTimeRange,
    /// Compaction level of the file.
    pub level: Level,
    /// Size of the data file in bytes.
    pub file_size: u64,
    /// Index types built for this file; empty means no index available.
    pub available_indexes: SmallVec<[IndexType; 4]>,
    /// Size of the index file in bytes.
    pub index_file_size: u64,
    /// Number of rows in the file.
    pub num_rows: u64,
    /// Number of row groups in the file.
    pub num_row_groups: u64,
    // NOTE(review): presumably a sequence number assigned on flush/write for
    // ordering; `None` for files written before the field existed — confirm.
    pub sequence: Option<NonZeroU64>,
}
/// Type of an index that may be available for an SST file.
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub enum IndexType {
    /// Inverted index.
    InvertedIndex,
    /// Full-text index.
    FulltextIndex,
    /// Bloom-filter index.
    BloomFilterIndex,
}
impl FileMeta {
pub fn exists_index(&self) -> bool {
!self.available_indexes.is_empty()
}
pub fn inverted_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::InvertedIndex)
}
pub fn fulltext_index_available(&self) -> bool {
self.available_indexes.contains(&IndexType::FulltextIndex)
}
pub fn bloom_filter_index_available(&self) -> bool {
self.available_indexes
.contains(&IndexType::BloomFilterIndex)
}
pub fn index_file_size(&self) -> u64 {
self.index_file_size
}
}
/// Cheaply cloneable handle to an SST file; clones share the same inner state.
/// When the last handle is dropped and the file was marked deleted, a purge
/// request is sent to the file purger (see `Drop` for `FileHandleInner`).
#[derive(Clone)]
pub struct FileHandle {
    inner: Arc<FileHandleInner>,
}
impl fmt::Debug for FileHandle {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Hoist the meta borrow so each field line stays short.
        let meta = &self.inner.meta;
        f.debug_struct("FileHandle")
            .field("region_id", &meta.region_id)
            .field("file_id", &meta.file_id)
            .field("time_range", &meta.time_range)
            .field("size", &meta.file_size)
            .field("level", &meta.level)
            .field("compacting", &self.inner.compacting)
            .field("deleted", &self.inner.deleted)
            .finish()
    }
}
impl FileHandle {
    /// Creates a handle over `meta`, owned by the given purger.
    pub fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandle {
        let inner = FileHandleInner::new(meta, file_purger);
        FileHandle {
            inner: Arc::new(inner),
        }
    }

    /// Returns the region id of the file.
    pub fn region_id(&self) -> RegionId {
        self.meta_ref().region_id
    }

    /// Returns the file id.
    pub fn file_id(&self) -> FileId {
        self.meta_ref().file_id
    }

    /// Returns the complete file path of the file under `file_dir`.
    pub fn file_path(&self, file_dir: &str) -> String {
        location::sst_file_path(file_dir, self.file_id())
    }

    /// Returns the time range of the file.
    pub fn time_range(&self) -> FileTimeRange {
        self.meta_ref().time_range
    }

    /// Marks the file as deleted so it is purged once the last handle drops.
    pub fn mark_deleted(&self) {
        self.inner.deleted.store(true, Ordering::Relaxed);
    }

    /// Whether the file is currently being compacted.
    pub fn compacting(&self) -> bool {
        self.inner.compacting.load(Ordering::Relaxed)
    }

    /// Sets or clears the compacting flag.
    pub fn set_compacting(&self, compacting: bool) {
        self.inner.compacting.store(compacting, Ordering::Relaxed);
    }

    /// Returns a reference to the file's metadata.
    pub fn meta_ref(&self) -> &FileMeta {
        &self.inner.meta
    }

    /// Size of the data file in bytes.
    pub fn size(&self) -> u64 {
        self.meta_ref().file_size
    }

    /// Size of the index file in bytes.
    pub fn index_size(&self) -> u64 {
        self.meta_ref().index_file_size
    }

    /// Number of rows in the file.
    pub fn num_rows(&self) -> usize {
        self.meta_ref().num_rows as usize
    }
}
/// Shared state behind [`FileHandle`]; one instance per file, reference
/// counted via `Arc`.
struct FileHandleInner {
    /// Metadata of the file.
    meta: FileMeta,
    /// Whether the file is being compacted.
    compacting: AtomicBool,
    /// Whether the file has been marked deleted (purged on drop).
    deleted: AtomicBool,
    /// Purger that receives the purge request when this drops deleted.
    file_purger: FilePurgerRef,
}
impl Drop for FileHandleInner {
    fn drop(&mut self) {
        // Dropping the last handle to a file that was never marked deleted
        // is a no-op; only deleted files get purged.
        if !self.deleted.load(Ordering::Relaxed) {
            return;
        }
        self.file_purger.send_request(PurgeRequest {
            file_meta: self.meta.clone(),
        });
    }
}
impl FileHandleInner {
    /// Creates inner state with both status flags cleared.
    fn new(meta: FileMeta, file_purger: FilePurgerRef) -> FileHandleInner {
        FileHandleInner {
            meta,
            file_purger,
            compacting: AtomicBool::new(false),
            deleted: AtomicBool::new(false),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_file_id() {
        let id = FileId::random();
        let uuid_str = id.to_string();
        assert_eq!(id.0.to_string(), uuid_str);

        // Round-trip via `parse_str`.
        let parsed = FileId::parse_str(&uuid_str).unwrap();
        assert_eq!(id, parsed);
        // Round-trip via `FromStr`.
        let parsed = uuid_str.parse().unwrap();
        assert_eq!(id, parsed);

        // Invalid input must be rejected, not panic.
        assert!(FileId::parse_str("not-a-uuid").is_err());
    }

    #[test]
    fn test_file_id_serialization() {
        let id = FileId::random();
        let json = serde_json::to_string(&id).unwrap();
        // A `FileId` serializes as a bare JSON string of the UUID.
        assert_eq!(format!("\"{id}\""), json);

        let parsed = serde_json::from_str(&json).unwrap();
        assert_eq!(id, parsed);
    }

    #[test]
    fn test_file_id_as_parquet() {
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.parquet",
            id.as_parquet()
        );
    }

    #[test]
    fn test_file_id_as_puffin() {
        // Mirrors `test_file_id_as_parquet` for the index file name.
        let id = FileId::from_str("67e55044-10b1-426f-9247-bb680e5fe0c8").unwrap();
        assert_eq!(
            "67e55044-10b1-426f-9247-bb680e5fe0c8.puffin",
            id.as_puffin()
        );
    }

    /// Builds a minimal `FileMeta` with one inverted index for tests.
    fn create_file_meta(file_id: FileId, level: Level) -> FileMeta {
        FileMeta {
            region_id: 0.into(),
            file_id,
            time_range: FileTimeRange::default(),
            level,
            file_size: 0,
            available_indexes: SmallVec::from_iter([IndexType::InvertedIndex]),
            index_file_size: 0,
            num_rows: 0,
            num_row_groups: 0,
            sequence: None,
        }
    }

    #[test]
    fn test_deserialize_file_meta() {
        let file_meta = create_file_meta(FileId::random(), 0);
        let serialized_file_meta = serde_json::to_string(&file_meta).unwrap();
        let deserialized_file_meta = serde_json::from_str(&serialized_file_meta);
        assert_eq!(file_meta, deserialized_file_meta.unwrap());
    }

    #[test]
    fn test_deserialize_from_string() {
        // Older serialized form: fields added later (e.g. `file_size`,
        // `num_rows`, `sequence`) are absent and must fall back to defaults
        // via `#[serde(default)]`.
        let json_file_meta = "{\"region_id\":0,\"file_id\":\"bc5896ec-e4d8-4017-a80d-f2de73188d55\",\
        \"time_range\":[{\"value\":0,\"unit\":\"Millisecond\"},{\"value\":0,\"unit\":\"Millisecond\"}],\
        \"available_indexes\":[\"InvertedIndex\"],\"level\":0}";
        let file_meta = create_file_meta(
            FileId::from_str("bc5896ec-e4d8-4017-a80d-f2de73188d55").unwrap(),
            0,
        );
        let deserialized_file_meta: FileMeta = serde_json::from_str(json_file_meta).unwrap();
        assert_eq!(file_meta, deserialized_file_meta);
    }
}