mito2/worker/
handle_create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling create request.
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use store_api::logstore::LogStore;
21use store_api::metadata::RegionMetadataBuilder;
22use store_api::region_request::{AffectedRows, RegionCreateRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::region::opener::{check_recovered_region, RegionOpener};
27use crate::worker::RegionWorkerLoop;
28
29impl<S: LogStore> RegionWorkerLoop<S> {
30    pub(crate) async fn handle_create_request(
31        &mut self,
32        region_id: RegionId,
33        request: RegionCreateRequest,
34    ) -> Result<AffectedRows> {
35        // Checks whether the table exists.
36        if let Some(region) = self.regions.get_region(region_id) {
37            // Region already exists.
38            check_recovered_region(
39                &region.metadata(),
40                region_id,
41                &request.column_metadatas,
42                &request.primary_key,
43            )?;
44
45            return Ok(0);
46        }
47
48        // Convert the request into a RegionMetadata and validate it.
49        let mut builder = RegionMetadataBuilder::new(region_id);
50        for column in request.column_metadatas {
51            builder.push_column_metadata(column);
52        }
53        builder.primary_key(request.primary_key);
54        if let Some(expr_json) = request.partition_expr_json.as_ref() {
55            builder.partition_expr_json(Some(expr_json.clone()));
56        }
57
58        // Create a MitoRegion from the RegionMetadata.
59        let region = RegionOpener::new(
60            region_id,
61            &request.table_dir,
62            request.path_type,
63            self.memtable_builder_provider.clone(),
64            self.object_store_manager.clone(),
65            self.purge_scheduler.clone(),
66            self.puffin_manager_factory.clone(),
67            self.intermediate_manager.clone(),
68            self.time_provider.clone(),
69            self.file_ref_manager.clone(),
70        )
71        .metadata_builder(builder)
72        .parse_options(request.options)?
73        .cache(Some(self.cache_manager.clone()))
74        .create_or_open(&self.config, &self.wal)
75        .await?;
76
77        info!(
78            "A new region created, worker: {}, region: {:?}",
79            self.id,
80            region.metadata()
81        );
82
83        self.region_count.inc();
84
85        // Insert the MitoRegion into the RegionMap.
86        self.regions.insert_region(Arc::new(region));
87
88        Ok(0)
89    }
90}