mito2/worker/
handle_create.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Handling create request.
16
17use std::sync::Arc;
18
19use common_telemetry::info;
20use store_api::logstore::LogStore;
21use store_api::metadata::RegionMetadataBuilder;
22use store_api::region_request::{AffectedRows, RegionCreateRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::region::opener::{check_recovered_region, RegionOpener};
27use crate::worker::RegionWorkerLoop;
28
29impl<S: LogStore> RegionWorkerLoop<S> {
30    pub(crate) async fn handle_create_request(
31        &mut self,
32        region_id: RegionId,
33        request: RegionCreateRequest,
34    ) -> Result<AffectedRows> {
35        // Checks whether the table exists.
36        if let Some(region) = self.regions.get_region(region_id) {
37            // Region already exists.
38            check_recovered_region(
39                &region.metadata(),
40                region_id,
41                &request.column_metadatas,
42                &request.primary_key,
43            )?;
44
45            return Ok(0);
46        }
47
48        // Convert the request into a RegionMetadata and validate it.
49        let mut builder = RegionMetadataBuilder::new(region_id);
50        for column in request.column_metadatas {
51            builder.push_column_metadata(column);
52        }
53        builder.primary_key(request.primary_key);
54
55        // Create a MitoRegion from the RegionMetadata.
56        let region = RegionOpener::new(
57            region_id,
58            &request.table_dir,
59            request.path_type,
60            self.memtable_builder_provider.clone(),
61            self.object_store_manager.clone(),
62            self.purge_scheduler.clone(),
63            self.puffin_manager_factory.clone(),
64            self.intermediate_manager.clone(),
65            self.time_provider.clone(),
66        )
67        .metadata_builder(builder)
68        .parse_options(request.options)?
69        .cache(Some(self.cache_manager.clone()))
70        .create_or_open(&self.config, &self.wal)
71        .await?;
72
73        info!(
74            "A new region created, worker: {}, region: {:?}",
75            self.id,
76            region.metadata()
77        );
78
79        self.region_count.inc();
80
81        // Insert the MitoRegion into the RegionMap.
82        self.regions.insert_region(Arc::new(region));
83
84        Ok(0)
85    }
86}