query/query_engine/
context.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15use std::sync::Arc;
16
17use common_query::logical_plan::SubstraitPlanDecoderRef;
18use common_telemetry::tracing_context::TracingContext;
19use datafusion::execution::context::{SessionState, TaskContext};
20use session::context::QueryContextRef;
21
22use crate::query_engine::default_serializer::DefaultPlanDecoder;
23
24#[derive(Debug)]
25pub struct QueryEngineContext {
26    state: SessionState,
27    query_ctx: QueryContextRef,
28}
29
30impl QueryEngineContext {
31    pub fn new(state: SessionState, query_ctx: QueryContextRef) -> Self {
32        Self { state, query_ctx }
33    }
34
35    #[inline]
36    pub fn state(&self) -> &SessionState {
37        &self.state
38    }
39
40    #[inline]
41    pub fn query_ctx(&self) -> QueryContextRef {
42        self.query_ctx.clone()
43    }
44
45    pub fn build_task_ctx(&self) -> Arc<TaskContext> {
46        let dbname = self.query_ctx.get_db_string();
47        let state = &self.state;
48        let tracing_context = TracingContext::from_current_span();
49
50        // pass tracing context in session_id
51        let session_id = tracing_context.to_json();
52
53        Arc::new(TaskContext::new(
54            Some(dbname),
55            session_id,
56            state.config().clone(),
57            state.scalar_functions().clone(),
58            state.aggregate_functions().clone(),
59            state.window_functions().clone(),
60            state.runtime_env().clone(),
61        ))
62    }
63
64    /// Creates a [`LogicalPlan`] decoder
65    pub fn new_plan_decoder(&self) -> crate::error::Result<SubstraitPlanDecoderRef> {
66        Ok(Arc::new(DefaultPlanDecoder::new(
67            self.state.clone(),
68            &self.query_ctx,
69        )?))
70    }
71
72    /// Mock an engine context for unit tests.
73    #[cfg(test)]
74    pub fn mock() -> Self {
75        use common_base::Plugins;
76        use session::context::QueryContext;
77
78        use crate::options::QueryOptions;
79        use crate::query_engine::QueryEngineState;
80
81        let state = Arc::new(QueryEngineState::new(
82            catalog::memory::new_memory_catalog_manager().unwrap(),
83            None,
84            None,
85            None,
86            None,
87            false,
88            Plugins::default(),
89            QueryOptions::default(),
90        ));
91
92        QueryEngineContext::new(state.session_state(), QueryContext::arc())
93    }
94}