session/
lib.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15pub mod context;
16pub mod session_config;
17pub mod table_name;
18
19use std::collections::HashMap;
20use std::net::SocketAddr;
21use std::sync::{Arc, RwLock};
22use std::time::Duration;
23
24use auth::UserInfoRef;
25use common_catalog::build_db_string;
26use common_catalog::consts::{DEFAULT_CATALOG_NAME, DEFAULT_SCHEMA_NAME};
27use common_recordbatch::cursor::RecordBatchStreamCursor;
28pub use common_session::ReadPreference;
29use common_time::timezone::get_timezone;
30use common_time::Timezone;
31use context::{ConfigurationVariables, QueryContextBuilder};
32use derive_more::Debug;
33
34use crate::context::{Channel, ConnInfo, QueryContextRef};
35
/// Session for persistent connection such as MySQL, PostgreSQL etc.
///
/// All state that can change after construction lives behind locks, so every
/// accessor and setter works through a shared `&self` reference.
#[derive(Debug)]
pub struct Session {
    /// Name of the current catalog, guarded by its own lock. Query contexts
    /// receive a copy of this string rather than a handle to it.
    catalog: RwLock<String>,
    /// Mutable per-session state; the same `Arc` is handed to every query
    /// context created by `new_query_context`.
    mutable_inner: Arc<RwLock<MutableInner>>,
    /// Connection information built from the client address and channel
    /// passed to `Session::new`.
    conn_info: ConnInfo,
    /// Configuration variables supplied at construction and passed on to each
    /// query context.
    configuration_variables: Arc<ConfigurationVariables>,
}
44
/// Reference-counted handle to a [`Session`].
pub type SessionRef = Arc<Session>;
46
/// A container for mutable items in query context
///
/// A `Session` keeps one instance behind `Arc<RwLock<_>>` and passes that same
/// `Arc` to the query contexts it creates.
#[derive(Debug)]
pub(crate) struct MutableInner {
    /// Current schema name; starts as `DEFAULT_SCHEMA_NAME`.
    schema: String,
    /// User information attached to the session.
    user_info: UserInfoRef,
    /// Timezone of the session.
    timezone: Timezone,
    /// Optional query timeout; `None` by default.
    // NOTE(review): presumably `None` means "no timeout" — confirm with the
    // code that consumes this field.
    query_timeout: Option<Duration>,
    /// Read preference of the session; `ReadPreference::Leader` by default.
    read_preference: ReadPreference,
    /// Record batch stream cursors keyed by a string (presumably the cursor
    /// name — verify against callers). Omitted from the `Debug` output.
    #[debug(skip)]
    pub(crate) cursors: HashMap<String, Arc<RecordBatchStreamCursor>>,
}
58
59impl Default for MutableInner {
60    fn default() -> Self {
61        Self {
62            schema: DEFAULT_SCHEMA_NAME.into(),
63            user_info: auth::userinfo_by_name(None),
64            timezone: get_timezone(None).clone(),
65            query_timeout: None,
66            read_preference: ReadPreference::Leader,
67            cursors: HashMap::with_capacity(0),
68        }
69    }
70}
71
72impl Session {
73    pub fn new(
74        addr: Option<SocketAddr>,
75        channel: Channel,
76        configuration_variables: ConfigurationVariables,
77    ) -> Self {
78        Session {
79            catalog: RwLock::new(DEFAULT_CATALOG_NAME.into()),
80            conn_info: ConnInfo::new(addr, channel),
81            configuration_variables: Arc::new(configuration_variables),
82            mutable_inner: Arc::new(RwLock::new(MutableInner::default())),
83        }
84    }
85
86    pub fn new_query_context(&self) -> QueryContextRef {
87        QueryContextBuilder::default()
88            // catalog is not allowed for update in query context so we use
89            // string here
90            .current_catalog(self.catalog.read().unwrap().clone())
91            .mutable_session_data(self.mutable_inner.clone())
92            .sql_dialect(self.conn_info.channel.dialect())
93            .configuration_parameter(self.configuration_variables.clone())
94            .channel(self.conn_info.channel)
95            .build()
96            .into()
97    }
98
99    pub fn conn_info(&self) -> &ConnInfo {
100        &self.conn_info
101    }
102
103    pub fn timezone(&self) -> Timezone {
104        self.mutable_inner.read().unwrap().timezone.clone()
105    }
106
107    pub fn read_preference(&self) -> ReadPreference {
108        self.mutable_inner.read().unwrap().read_preference
109    }
110
111    pub fn set_timezone(&self, tz: Timezone) {
112        let mut inner = self.mutable_inner.write().unwrap();
113        inner.timezone = tz;
114    }
115
116    pub fn set_read_preference(&self, read_preference: ReadPreference) {
117        self.mutable_inner.write().unwrap().read_preference = read_preference;
118    }
119
120    pub fn user_info(&self) -> UserInfoRef {
121        self.mutable_inner.read().unwrap().user_info.clone()
122    }
123
124    pub fn set_user_info(&self, user_info: UserInfoRef) {
125        self.mutable_inner.write().unwrap().user_info = user_info;
126    }
127
128    pub fn set_catalog(&self, catalog: String) {
129        *self.catalog.write().unwrap() = catalog;
130    }
131
132    pub fn catalog(&self) -> String {
133        self.catalog.read().unwrap().clone()
134    }
135
136    pub fn schema(&self) -> String {
137        self.mutable_inner.read().unwrap().schema.clone()
138    }
139
140    pub fn set_schema(&self, schema: String) {
141        self.mutable_inner.write().unwrap().schema = schema;
142    }
143
144    pub fn get_db_string(&self) -> String {
145        build_db_string(&self.catalog(), &self.schema())
146    }
147}