query/query_engine/
context.rs1use std::sync::Arc;
16
17use common_query::logical_plan::SubstraitPlanDecoderRef;
18use common_telemetry::tracing_context::TracingContext;
19use datafusion::execution::context::{SessionState, TaskContext};
20use session::context::QueryContextRef;
21
22use crate::query_engine::default_serializer::DefaultPlanDecoder;
23
24#[derive(Debug)]
25pub struct QueryEngineContext {
26 state: SessionState,
27 query_ctx: QueryContextRef,
28}
29
30impl QueryEngineContext {
31 pub fn new(state: SessionState, query_ctx: QueryContextRef) -> Self {
32 Self { state, query_ctx }
33 }
34
35 #[inline]
36 pub fn state(&self) -> &SessionState {
37 &self.state
38 }
39
40 #[inline]
41 pub fn query_ctx(&self) -> QueryContextRef {
42 self.query_ctx.clone()
43 }
44
45 pub fn build_task_ctx(&self) -> Arc<TaskContext> {
46 let dbname = self.query_ctx.get_db_string();
47 let state = &self.state;
48 let tracing_context = TracingContext::from_current_span();
49
50 let session_id = tracing_context.to_json();
52
53 Arc::new(TaskContext::new(
54 Some(dbname),
55 session_id,
56 state.config().clone(),
57 state.scalar_functions().clone(),
58 state.aggregate_functions().clone(),
59 state.window_functions().clone(),
60 state.runtime_env().clone(),
61 ))
62 }
63
64 pub fn new_plan_decoder(&self) -> crate::error::Result<SubstraitPlanDecoderRef> {
66 Ok(Arc::new(DefaultPlanDecoder::new(
67 self.state.clone(),
68 &self.query_ctx,
69 )?))
70 }
71
72 #[cfg(test)]
74 pub fn mock() -> Self {
75 use common_base::Plugins;
76 use session::context::QueryContext;
77
78 use crate::options::QueryOptions;
79 use crate::query_engine::QueryEngineState;
80
81 let state = Arc::new(QueryEngineState::new(
82 catalog::memory::new_memory_catalog_manager().unwrap(),
83 None,
84 None,
85 None,
86 None,
87 false,
88 Plugins::default(),
89 QueryOptions::default(),
90 ));
91
92 QueryEngineContext::new(state.session_state(), QueryContext::arc())
93 }
94}