session/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod context;
16pub mod hints;
17pub mod protocol_ctx;
18pub mod session_config;
19pub mod table_name;
20
21use std::collections::HashMap;
22use std::net::SocketAddr;
23use std::sync::{Arc, RwLock};
24use std::time::Duration;
25
26use auth::UserInfoRef;
27use common_catalog::build_db_string;
28use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
29use common_recordbatch::cursor::RecordBatchStreamCursor;
30pub use common_session::ReadPreference;
31use common_time::timezone::get_timezone;
32use common_time::Timezone;
33use context::{ConfigurationVariables, QueryContextBuilder};
34use derive_more::Debug;
35
36use crate::context::{Channel, ConnInfo, QueryContextRef};
37
38/// Session for persistent connection such as MySQL, PostgreSQL etc.
39#[derive(Debug)]
40pub struct Session {
41    catalog: RwLock<String>,
42    mutable_inner: Arc<RwLock<MutableInner>>,
43    conn_info: ConnInfo,
44    configuration_variables: Arc<ConfigurationVariables>,
45    // the process id to use when killing the query
46    process_id: u32,
47}
48
49pub type SessionRef = Arc<Session>;
50
51/// A container for mutable items in query context
52#[derive(Debug)]
53pub(crate) struct MutableInner {
54    schema: String,
55    user_info: UserInfoRef,
56    timezone: Timezone,
57    query_timeout: Option<Duration>,
58    read_preference: ReadPreference,
59    #[debug(skip)]
60    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
61}
62
63impl Default for MutableInner {
64    fn default() -> Self {
65        Self {
66            schema: DEFAULT_SCHEMA_NAME.into(),
67            user_info: auth::userinfo_by_name(None),
68            timezone: get_timezone(None).clone(),
69            query_timeout: None,
70            read_preference: ReadPreference::Leader,
71            cursors: HashMap::with_capacity(0),
72        }
73    }
74}
75
76impl Session {
77    pub fn new(
78        addr: Option<SocketAddr>,
79        channel: Channel,
80        configuration_variables: ConfigurationVariables,
81        process_id: u32,
82    ) -> Self {
83        Session {
84            catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
85            conn_info: ConnInfo::new(addr, channel),
86            configuration_variables: Arc::new(configuration_variables),
87            mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
88            process_id,
89        }
90    }
91
92    pub fn new_query_context(&self) -> QueryContextRef {
93        QueryContextBuilder::default()
94            // catalog is not allowed for update in query context so we use
95            // string here
96            .current_catalog(self.catalog.read().unwrap().clone())
97            .mutable_session_data(self.mutable_inner.clone())
98            .sql_dialect(self.conn_info.channel.dialect())
99            .configuration_parameter(self.configuration_variables.clone())
100            .channel(self.conn_info.channel)
101            .process_id(self.process_id)
102            .conn_info(self.conn_info.clone())
103            .build()
104            .into()
105    }
106
107    pub fn conn_info(&self) -> &ConnInfo {
108        &self.conn_info
109    }
110
111    pub fn timezone(&self) -> Timezone {
112        self.mutable_inner.read().unwrap().timezone.clone()
113    }
114
115    pub fn read_preference(&self) -> ReadPreference {
116        self.mutable_inner.read().unwrap().read_preference
117    }
118
119    pub fn set_timezone(&self, tz: Timezone) {
120        let mut inner = self.mutable_inner.write().unwrap();
121        inner.timezone = tz;
122    }
123
124    pub fn set_read_preference(&self, read_preference: ReadPreference) {
125        self.mutable_inner.write().unwrap().read_preference = read_preference;
126    }
127
128    pub fn user_info(&self) -> UserInfoRef {
129        self.mutable_inner.read().unwrap().user_info.clone()
130    }
131
132    pub fn set_user_info(&self, user_info: UserInfoRef) {
133        self.mutable_inner.write().unwrap().user_info = user_info;
134    }
135
136    pub fn set_catalog(&self, catalog: String) {
137        *self.catalog.write().unwrap() = catalog;
138    }
139
140    pub fn catalog(&self) -> String {
141        self.catalog.read().unwrap().clone()
142    }
143
144    pub fn schema(&self) -> String {
145        self.mutable_inner.read().unwrap().schema.clone()
146    }
147
148    pub fn set_schema(&self, schema: String) {
149        self.mutable_inner.write().unwrap().schema = schema;
150    }
151
152    pub fn get_db_string(&self) -> String {
153        build_db_string(&self.catalog(), &self.schema())
154    }
155
156    pub fn process_id(&self) -> u32 {
157        self.process_id
158    }
159}