mito2/worker/
handle_create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling create request.
16
17use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::metadata::RegionMetadataBuilder;
20use store_api::region_request::{AffectedRows, RegionCreateRequest};
21use store_api::storage::RegionId;
22
23use crate::error::Result;
24use crate::region::opener::{RegionOpener, check_recovered_region};
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28    pub(crate) async fn handle_create_request(
29        &mut self,
30        region_id: RegionId,
31        request: RegionCreateRequest,
32    ) -> Result<AffectedRows> {
33        // Checks whether the table exists.
34        if let Some(region) = self.regions.get_region(region_id) {
35            // Region already exists.
36            check_recovered_region(
37                &region.metadata(),
38                region_id,
39                &request.column_metadatas,
40                &request.primary_key,
41            )?;
42
43            return Ok(0);
44        }
45
46        // Convert the request into a RegionMetadata and validate it.
47        let mut builder = RegionMetadataBuilder::new(region_id);
48        for column in request.column_metadatas {
49            builder.push_column_metadata(column);
50        }
51        builder.primary_key(request.primary_key);
52        if let Some(expr_json) = request.partition_expr_json.as_ref() {
53            builder.partition_expr_json(Some(expr_json.clone()));
54        }
55
56        // Create a MitoRegion from the RegionMetadata.
57        let region = RegionOpener::new(
58            region_id,
59            &request.table_dir,
60            request.path_type,
61            self.memtable_builder_provider.clone(),
62            self.object_store_manager.clone(),
63            self.purge_scheduler.clone(),
64            self.puffin_manager_factory.clone(),
65            self.intermediate_manager.clone(),
66            self.time_provider.clone(),
67            self.file_ref_manager.clone(),
68            self.partition_expr_fetcher.clone(),
69        )
70        .metadata_builder(builder)
71        .parse_options(request.options)?
72        .cache(Some(self.cache_manager.clone()))
73        .create_or_open(&self.config, &self.wal)
74        .await?;
75
76        info!(
77            "A new region created, worker: {}, region: {:?}",
78            self.id,
79            region.metadata()
80        );
81
82        self.region_count.inc();
83
84        // Insert the MitoRegion into the RegionMap.
85        self.regions.insert_region(region);
86
87        Ok(0)
88    }
89}