session/
lib.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
15pub mod context;
16pub mod hints;
17pub mod session_config;
18pub mod table_name;
19
20use std::collections::HashMap;
21use std::net::SocketAddr;
22use std::sync::{Arc, RwLock};
23use std::time::Duration;
24
25use auth::UserInfoRef;
26use common_catalog::build_db_string;
27use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
28use common_recordbatch::cursor::RecordBatchStreamCursor;
29pub use common_session::ReadPreference;
30use common_time::timezone::get_timezone;
31use common_time::Timezone;
32use context::{ConfigurationVariables, QueryContextBuilder};
33use derive_more::Debug;
34
35use crate::context::{Channel, ConnInfo, QueryContextRef};
36
37/// Session for persistent connection such as MySQL, PostgreSQL etc.
38#[derive(Debug)]
39pub struct Session {
40    catalog: RwLock<String>,
41    mutable_inner: Arc<RwLock<MutableInner>>,
42    conn_info: ConnInfo,
43    configuration_variables: Arc<ConfigurationVariables>,
44    // the process id to use when killing the query
45    process_id: u32,
46}
47
48pub type SessionRef = Arc<Session>;
49
50/// A container for mutable items in query context
51#[derive(Debug)]
52pub(crate) struct MutableInner {
53    schema: String,
54    user_info: UserInfoRef,
55    timezone: Timezone,
56    query_timeout: Option<Duration>,
57    read_preference: ReadPreference,
58    #[debug(skip)]
59    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
60}
61
62impl Default for MutableInner {
63    fn default() -> Self {
64        Self {
65            schema: DEFAULT_SCHEMA_NAME.into(),
66            user_info: auth::userinfo_by_name(None),
67            timezone: get_timezone(None).clone(),
68            query_timeout: None,
69            read_preference: ReadPreference::Leader,
70            cursors: HashMap::with_capacity(0),
71        }
72    }
73}
74
75impl Session {
76    pub fn new(
77        addr: Option<SocketAddr>,
78        channel: Channel,
79        configuration_variables: ConfigurationVariables,
80        process_id: u32,
81    ) -> Self {
82        Session {
83            catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
84            conn_info: ConnInfo::new(addr, channel),
85            configuration_variables: Arc::new(configuration_variables),
86            mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
87            process_id,
88        }
89    }
90
91    pub fn new_query_context(&self) -> QueryContextRef {
92        QueryContextBuilder::default()
93            // catalog is not allowed for update in query context so we use
94            // string here
95            .current_catalog(self.catalog.read().unwrap().clone())
96            .mutable_session_data(self.mutable_inner.clone())
97            .sql_dialect(self.conn_info.channel.dialect())
98            .configuration_parameter(self.configuration_variables.clone())
99            .channel(self.conn_info.channel)
100            .process_id(self.process_id)
101            .conn_info(self.conn_info.clone())
102            .build()
103            .into()
104    }
105
106    pub fn conn_info(&self) -> &ConnInfo {
107        &self.conn_info
108    }
109
110    pub fn timezone(&self) -> Timezone {
111        self.mutable_inner.read().unwrap().timezone.clone()
112    }
113
114    pub fn read_preference(&self) -> ReadPreference {
115        self.mutable_inner.read().unwrap().read_preference
116    }
117
118    pub fn set_timezone(&self, tz: Timezone) {
119        let mut inner = self.mutable_inner.write().unwrap();
120        inner.timezone = tz;
121    }
122
123    pub fn set_read_preference(&self, read_preference: ReadPreference) {
124        self.mutable_inner.write().unwrap().read_preference = read_preference;
125    }
126
127    pub fn user_info(&self) -> UserInfoRef {
128        self.mutable_inner.read().unwrap().user_info.clone()
129    }
130
131    pub fn set_user_info(&self, user_info: UserInfoRef) {
132        self.mutable_inner.write().unwrap().user_info = user_info;
133    }
134
135    pub fn set_catalog(&self, catalog: String) {
136        *self.catalog.write().unwrap() = catalog;
137    }
138
139    pub fn catalog(&self) -> String {
140        self.catalog.read().unwrap().clone()
141    }
142
143    pub fn schema(&self) -> String {
144        self.mutable_inner.read().unwrap().schema.clone()
145    }
146
147    pub fn set_schema(&self, schema: String) {
148        self.mutable_inner.write().unwrap().schema = schema;
149    }
150
151    pub fn get_db_string(&self) -> String {
152        build_db_string(&self.catalog(), &self.schema())
153    }
154
155    pub fn process_id(&self) -> u32 {
156        self.process_id
157    }
158}