use std::any::Any;
use std::fmt;
use std::sync::{Arc, Mutex};
use api::v1::SemanticType;
use async_trait::async_trait;
use common_recordbatch::filter::SimpleFilterEvaluator;
use common_recordbatch::OrderOption;
use datafusion::catalog::{CatalogProvider, CatalogProviderList, SchemaProvider, Session};
use datafusion::datasource::TableProvider;
use datafusion::physical_plan::ExecutionPlan;
use datafusion_common::DataFusionError;
use datafusion_expr::{Expr, TableProviderFilterPushDown, TableType};
use datatypes::arrow::datatypes::SchemaRef;
use snafu::ResultExt;
use store_api::metadata::RegionMetadataRef;
use store_api::region_engine::RegionEngineRef;
use store_api::storage::{RegionId, ScanRequest, TimeSeriesRowSelector};
use table::table::scan::RegionScanExec;
use crate::error::{GetRegionMetadataSnafu, Result};
/// Catalog list that wraps a single catalog provider.
///
/// Its `catalog()` lookup ignores the requested name and always returns a
/// clone of the wrapped catalog.
#[derive(Clone, Debug)]
pub struct DummyCatalogList {
/// The one catalog handed out for every lookup.
catalog: DummyCatalogProvider,
}
impl DummyCatalogList {
    /// Builds a catalog list whose catalog -> schema -> table chain ends in
    /// `table_provider`.
    pub fn with_table_provider(table_provider: Arc<dyn TableProvider>) -> Self {
        Self {
            catalog: DummyCatalogProvider {
                schema: DummySchemaProvider {
                    table: table_provider,
                },
            },
        }
    }
}
impl CatalogProviderList for DummyCatalogList {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Registration is a no-op: the arguments are dropped and `None` is returned.
    fn register_catalog(
        &self,
        _name: String,
        _catalog: Arc<dyn CatalogProvider>,
    ) -> Option<Arc<dyn CatalogProvider>> {
        None
    }

    /// Always reports an empty list of names.
    fn catalog_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Returns a clone of the wrapped catalog regardless of `_name`.
    fn catalog(&self, _name: &str) -> Option<Arc<dyn CatalogProvider>> {
        let provider: Arc<dyn CatalogProvider> = Arc::new(self.catalog.clone());
        Some(provider)
    }
}
/// Catalog provider that wraps a single schema provider.
///
/// Its `schema()` lookup ignores the requested name and always returns a
/// clone of the wrapped schema.
#[derive(Clone, Debug)]
struct DummyCatalogProvider {
/// The one schema handed out for every lookup.
schema: DummySchemaProvider,
}
impl CatalogProvider for DummyCatalogProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Always reports an empty list of names.
    fn schema_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Returns a clone of the wrapped schema regardless of `_name`.
    fn schema(&self, _name: &str) -> Option<Arc<dyn SchemaProvider>> {
        let provider: Arc<dyn SchemaProvider> = Arc::new(self.schema.clone());
        Some(provider)
    }
}
/// Schema provider that wraps a single table provider.
///
/// Its `table()` lookup ignores the requested name and always returns the
/// wrapped table; `table_exist()` always answers `true`.
#[derive(Clone, Debug)]
struct DummySchemaProvider {
/// The one table handed out for every lookup.
table: Arc<dyn TableProvider>,
}
#[async_trait]
impl SchemaProvider for DummySchemaProvider {
    fn as_any(&self) -> &dyn Any {
        self
    }

    /// Always reports an empty list of names.
    fn table_names(&self) -> Vec<String> {
        Vec::new()
    }

    /// Returns the wrapped table provider regardless of `_name`.
    async fn table(
        &self,
        _name: &str,
    ) -> datafusion::error::Result<Option<Arc<dyn TableProvider>>> {
        let provider = Arc::clone(&self.table);
        Ok(Some(provider))
    }

    /// Every table name is reported as existing.
    fn table_exist(&self, _name: &str) -> bool {
        true
    }
}
/// Table provider that serves scans from a single region of a region engine.
#[derive(Clone)]
pub struct DummyTableProvider {
/// Region that `scan` sends its query to.
region_id: RegionId,
/// Engine whose `handle_query` executes the scan request.
engine: RegionEngineRef,
/// Region metadata; supplies the arrow schema and column semantic types.
metadata: RegionMetadataRef,
/// Request template that each `scan` clones before filling in projection,
/// filters and limit. The hint setters mutate it through the mutex, and
/// because it sits behind an `Arc`, clones of the provider share it.
scan_request: Arc<Mutex<ScanRequest>>,
}
impl fmt::Debug for DummyTableProvider {
    /// Formats the provider for diagnostics.
    ///
    /// The `engine` field is not printed, so the struct is closed with
    /// `finish_non_exhaustive`, which renders a trailing `..` to make the
    /// omission visible instead of presenting the output as complete.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("DummyTableProvider")
            .field("region_id", &self.region_id)
            .field("metadata", &self.metadata)
            .field("scan_request", &self.scan_request)
            .finish_non_exhaustive()
    }
}
#[async_trait]
impl TableProvider for DummyTableProvider {
fn as_any(&self) -> &dyn Any {
self
}
fn schema(&self) -> SchemaRef {
self.metadata.schema.arrow_schema().clone()
}
fn table_type(&self) -> TableType {
TableType::Base
}
async fn scan(
&self,
_state: &dyn Session,
projection: Option<&Vec<usize>>,
filters: &[Expr],
limit: Option<usize>,
) -> datafusion::error::Result<Arc<dyn ExecutionPlan>> {
let mut request = self.scan_request.lock().unwrap().clone();
request.projection = projection.cloned();
request.filters = filters.to_vec();
request.limit = limit;
let scanner = self
.engine
.handle_query(self.region_id, request)
.await
.map_err(|e| DataFusionError::External(Box::new(e)))?;
Ok(Arc::new(RegionScanExec::new(scanner)))
}
fn supports_filters_pushdown(
&self,
filters: &[&Expr],
) -> datafusion::error::Result<Vec<TableProviderFilterPushDown>> {
let supported = filters
.iter()
.map(|e| {
if let Some(simple_filter) = SimpleFilterEvaluator::try_new(e) {
if self
.metadata
.column_by_name(simple_filter.column_name())
.and_then(|c| (c.semantic_type == SemanticType::Tag).then_some(()))
.is_some()
{
TableProviderFilterPushDown::Exact
} else {
TableProviderFilterPushDown::Inexact
}
} else {
TableProviderFilterPushDown::Inexact
}
})
.collect();
Ok(supported)
}
}
impl DummyTableProvider {
pub fn new(region_id: RegionId, engine: RegionEngineRef, metadata: RegionMetadataRef) -> Self {
Self {
region_id,
engine,
metadata,
scan_request: Default::default(),
}
}
pub fn region_metadata(&self) -> RegionMetadataRef {
self.metadata.clone()
}
pub fn with_ordering_hint(&self, order_opts: &[OrderOption]) {
self.scan_request.lock().unwrap().output_ordering = Some(order_opts.to_vec());
}
pub fn with_time_series_selector_hint(&self, selector: TimeSeriesRowSelector) {
self.scan_request.lock().unwrap().series_row_selector = Some(selector);
}
pub fn with_sequence(&self, sequence: u64) {
self.scan_request.lock().unwrap().sequence = Some(sequence);
}
#[cfg(test)]
pub fn scan_request(&self) -> ScanRequest {
self.scan_request.lock().unwrap().clone()
}
}
/// Stateless factory whose `TableProviderFactory` implementation produces
/// `DummyTableProvider` instances.
pub struct DummyTableProviderFactory;
#[async_trait]
impl TableProviderFactory for DummyTableProviderFactory {
async fn create(
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>> {
let metadata =
engine
.get_metadata(region_id)
.await
.with_context(|_| GetRegionMetadataSnafu {
engine: engine.name(),
region_id,
})?;
let scan_request = ctx
.and_then(|c| c.get_snapshot(region_id.as_u64()))
.map(|seq| ScanRequest {
sequence: Some(seq),
..Default::default()
})
.unwrap_or_default();
Ok(Arc::new(DummyTableProvider {
region_id,
engine,
metadata,
scan_request: Arc::new(Mutex::new(scan_request)),
}))
}
}
/// Factory abstraction for building a table provider for a region.
#[async_trait]
pub trait TableProviderFactory: Send + Sync {
/// Creates a table provider for `region_id` served by `engine`.
///
/// `ctx` is an optional query context that implementations may consult.
async fn create(
&self,
region_id: RegionId,
engine: RegionEngineRef,
ctx: Option<&session::context::QueryContext>,
) -> Result<Arc<dyn TableProvider>>;
}
/// Shared, reference-counted handle to a [`TableProviderFactory`] trait object.
pub type TableProviderFactoryRef = Arc<dyn TableProviderFactory>;