mito2/worker/
handle_create.rs

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Handling create request.

use std::sync::Arc;

use common_telemetry::info;
use store_api::logstore::LogStore;
use store_api::metadata::RegionMetadataBuilder;
use store_api::region_request::{AffectedRows, RegionCreateRequest};
use store_api::storage::RegionId;

use crate::error::Result;
use crate::region::opener::{check_recovered_region, RegionOpener};
use crate::worker::RegionWorkerLoop;

impl<S: LogStore> RegionWorkerLoop<S> {
    pub(crate) async fn handle_create_request(
        &mut self,
        region_id: RegionId,
        request: RegionCreateRequest,
    ) -> Result<AffectedRows> {
        // Checks whether the table exists.
        if let Some(region) = self.regions.get_region(region_id) {
            // Region already exists.
            check_recovered_region(
                &region.metadata(),
                region_id,
                &request.column_metadatas,
                &request.primary_key,
            )?;

            return Ok(0);
        }

        // Convert the request into a RegionMetadata and validate it.
        let mut builder = RegionMetadataBuilder::new(region_id);
        for column in request.column_metadatas {
            builder.push_column_metadata(column);
        }
        builder.primary_key(request.primary_key);

        // Create a MitoRegion from the RegionMetadata.
        let region = RegionOpener::new(
            region_id,
            &request.region_dir,
            self.memtable_builder_provider.clone(),
            self.object_store_manager.clone(),
            self.purge_scheduler.clone(),
            self.puffin_manager_factory.clone(),
            self.intermediate_manager.clone(),
            self.time_provider.clone(),
        )
        .metadata_builder(builder)
        .parse_options(request.options)?
        .cache(Some(self.cache_manager.clone()))
        .create_or_open(&self.config, &self.wal)
        .await?;

        info!(
            "A new region created, worker: {}, region: {:?}",
            self.id,
            region.metadata()
        );

        self.region_count.inc();

        // Insert the MitoRegion into the RegionMap.
        self.regions.insert_region(Arc::new(region));

        Ok(0)
    }
}