// common_datasource/compression.rs
use std::fmt::Display;
use std::io;
use std::str::FromStr;
use async_compression::tokio::bufread::{BzDecoder, GzipDecoder, XzDecoder, ZstdDecoder};
use async_compression::tokio::write;
use bytes::Bytes;
use futures::Stream;
use serde::{Deserialize, Serialize};
use strum::EnumIter;
use tokio::io::{AsyncRead, AsyncWriteExt, BufReader};
use tokio_util::io::{ReaderStream, StreamReader};
use crate::error::{self, Error, Result};
/// Compression formats handled by this module.
///
/// Because of `rename_all = "lowercase"`, serde reads and writes each variant
/// under its lowercase name (for example `gzip` or `uncompressed`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, EnumIter, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum CompressionType {
/// Gzip; parsed from `GZIP` or `GZ`, file extension `gz`.
Gzip,
/// Bzip2; parsed from `BZIP2` or `BZ2`, file extension `bz2`.
Bzip2,
/// Xz; parsed from `XZ`, file extension `xz`.
Xz,
/// Zstandard; parsed from `ZSTD` or `ZST`, file extension `zst`.
Zstd,
/// No compression; parsed from and displayed as the empty string.
Uncompressed,
}
impl FromStr for CompressionType {
type Err = Error;
fn from_str(s: &str) -> Result<Self> {
let s = s.to_uppercase();
match s.as_str() {
"GZIP" | "GZ" => Ok(Self::Gzip),
"BZIP2" | "BZ2" => Ok(Self::Bzip2),
"XZ" => Ok(Self::Xz),
"ZST" | "ZSTD" => Ok(Self::Zstd),
"" => Ok(Self::Uncompressed),
_ => error::UnsupportedCompressionTypeSnafu {
compression_type: s,
}
.fail(),
}
}
}
impl Display for CompressionType {
    /// Writes the canonical upper-case name of the compression type.
    ///
    /// [`CompressionType::Uncompressed`] is rendered as the empty string.
    /// The name is written verbatim with `write_str`, so width, fill and
    /// alignment flags in the format spec have no effect.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Uncompressed => "",
            Self::Gzip => "GZIP",
            Self::Bzip2 => "BZIP2",
            Self::Xz => "XZ",
            Self::Zstd => "ZSTD",
        };
        f.write_str(label)
    }
}
impl CompressionType {
    /// Returns `true` for every variant except [`CompressionType::Uncompressed`].
    pub const fn is_compressed(&self) -> bool {
        match self {
            Self::Uncompressed => false,
            Self::Gzip | Self::Bzip2 | Self::Xz | Self::Zstd => true,
        }
    }

    /// Returns the file extension associated with this compression type,
    /// without a leading dot.
    ///
    /// [`CompressionType::Uncompressed`] has no extension and yields `""`.
    pub const fn file_extension(&self) -> &'static str {
        match self {
            Self::Uncompressed => "",
            Self::Zstd => "zst",
            Self::Xz => "xz",
            Self::Bzip2 => "bz2",
            Self::Gzip => "gz",
        }
    }
}
/// Generates the codec-dependent methods of `CompressionType` together with
/// one round-trip test per codec.
///
/// Each `(variant, prefix)` pair maps an enum variant to the name prefix of
/// the matching `async_compression` types: `<prefix>Encoder` and
/// `<prefix>Decoder` in `async_compression::tokio::write`, and
/// `<prefix>Decoder` in `async_compression::tokio::bufread`. The
/// `Uncompressed` variant is handled explicitly and must not be listed.
macro_rules! impl_compression_type {
    ($(($enum_item:ident, $prefix:ident)),*) => {
        paste::item! {
            use bytes::{Buf, BufMut};

            impl CompressionType {
                /// Compresses all remaining bytes of `content` and returns the
                /// encoded bytes.
                ///
                /// For `Uncompressed` the bytes are copied out unchanged.
                ///
                /// # Errors
                ///
                /// Propagates any I/O error reported by the encoder.
                pub async fn encode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
                    match self {
                        $(
                            CompressionType::$enum_item => {
                                let mut buffer = Vec::with_capacity(content.remaining());
                                let mut encoder = write::[<$prefix Encoder>]::new(&mut buffer);
                                encoder.write_all_buf(&mut content).await?;
                                // Shut the encoder down before handing the buffer
                                // back so it can emit any trailing data.
                                encoder.shutdown().await?;
                                Ok(buffer)
                            }
                        )*
                        CompressionType::Uncompressed => {
                            // `Vec<u8>` implements `BufMut`, so the payload is
                            // copied exactly once, straight into the result.
                            let mut buffer: Vec<u8> = Vec::with_capacity(content.remaining());
                            buffer.put(content);
                            Ok(buffer)
                        },
                    }
                }

                /// Decompresses all remaining bytes of `content` and returns
                /// the decoded bytes.
                ///
                /// For `Uncompressed` the bytes are copied out unchanged.
                ///
                /// # Errors
                ///
                /// Propagates any I/O error reported by the decoder.
                pub async fn decode<B: Buf>(&self, mut content: B) -> io::Result<Vec<u8>> {
                    match self {
                        $(
                            CompressionType::$enum_item => {
                                // Twice the input size is only an initial
                                // capacity hint; the vector grows as needed.
                                let mut buffer = Vec::with_capacity(content.remaining() * 2);
                                let mut decoder = write::[<$prefix Decoder>]::new(&mut buffer);
                                decoder.write_all_buf(&mut content).await?;
                                decoder.shutdown().await?;
                                Ok(buffer)
                            }
                        )*
                        CompressionType::Uncompressed => {
                            // Single copy, same as in `encode`.
                            let mut buffer: Vec<u8> = Vec::with_capacity(content.remaining());
                            buffer.put(content);
                            Ok(buffer)
                        },
                    }
                }

                /// Wraps the reader `s` so that reads yield decompressed bytes.
                ///
                /// Compressed variants place `s` behind a `BufReader` and the
                /// matching decoder; `Uncompressed` boxes `s` as it is.
                pub fn convert_async_read<T: AsyncRead + Unpin + Send + 'static>(
                    &self,
                    s: T,
                ) -> Box<dyn AsyncRead + Unpin + Send> {
                    match self {
                        $(CompressionType::$enum_item => Box::new([<$prefix Decoder>]::new(BufReader::new(s))),)*
                        CompressionType::Uncompressed => Box::new(s),
                    }
                }

                /// Wraps the byte stream `s` so that it yields decompressed
                /// chunks.
                ///
                /// Compressed variants adapt `s` into a reader, decode it and
                /// turn the result back into a stream; `Uncompressed` boxes
                /// `s` as it is.
                pub fn convert_stream<T: Stream<Item = io::Result<Bytes>> + Unpin + Send + 'static>(
                    &self,
                    s: T,
                ) -> Box<dyn Stream<Item = io::Result<Bytes>> + Send + Unpin> {
                    match self {
                        $(CompressionType::$enum_item => Box::new(ReaderStream::new([<$prefix Decoder>]::new(StreamReader::new(s)))),)*
                        CompressionType::Uncompressed => Box::new(s),
                    }
                }
            }

            #[cfg(test)]
            mod tests {
                use super::CompressionType;

                // One encode/decode round trip per generated codec.
                $(
                #[tokio::test]
                async fn [<test_ $enum_item:lower _compression>]() {
                    let string = "foo_bar".as_bytes();
                    let compress = CompressionType::$enum_item
                        .encode(string)
                        .await
                        .unwrap();
                    let decompress = CompressionType::$enum_item
                        .decode(compress.as_slice())
                        .await
                        .unwrap();
                    assert_eq!(decompress, string);
                })*

                // The pass-through variant must round-trip as well.
                #[tokio::test]
                async fn test_uncompression() {
                    let string = "foo_bar".as_bytes();
                    let compress = CompressionType::Uncompressed
                        .encode(string)
                        .await
                        .unwrap();
                    let decompress = CompressionType::Uncompressed
                        .decode(compress.as_slice())
                        .await
                        .unwrap();
                    assert_eq!(decompress, string);
                }
            }
        }
    };
}

// Instantiate the codec methods for every compressed variant.
impl_compression_type!((Gzip, Gzip), (Bzip2, Bz), (Xz, Xz), (Zstd, Zstd));