mito2/worker/
handle_create.rs1use std::sync::Arc;
18
19use common_telemetry::info;
20use store_api::logstore::LogStore;
21use store_api::metadata::RegionMetadataBuilder;
22use store_api::region_request::{AffectedRows, RegionCreateRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::region::opener::{check_recovered_region, RegionOpener};
27use crate::worker::RegionWorkerLoop;
28
29impl<S: LogStore> RegionWorkerLoop<S> {
30 pub(crate) async fn handle_create_request(
31 &mut self,
32 region_id: RegionId,
33 request: RegionCreateRequest,
34 ) -> Result<AffectedRows> {
35 if let Some(region) = self.regions.get_region(region_id) {
37 check_recovered_region(
39 ®ion.metadata(),
40 region_id,
41 &request.column_metadatas,
42 &request.primary_key,
43 )?;
44
45 return Ok(0);
46 }
47
48 let mut builder = RegionMetadataBuilder::new(region_id);
50 for column in request.column_metadatas {
51 builder.push_column_metadata(column);
52 }
53 builder.primary_key(request.primary_key);
54
55 let region = RegionOpener::new(
57 region_id,
58 &request.region_dir,
59 self.memtable_builder_provider.clone(),
60 self.object_store_manager.clone(),
61 self.purge_scheduler.clone(),
62 self.puffin_manager_factory.clone(),
63 self.intermediate_manager.clone(),
64 self.time_provider.clone(),
65 )
66 .metadata_builder(builder)
67 .parse_options(request.options)?
68 .cache(Some(self.cache_manager.clone()))
69 .create_or_open(&self.config, &self.wal)
70 .await?;
71
72 info!(
73 "A new region created, worker: {}, region: {:?}",
74 self.id,
75 region.metadata()
76 );
77
78 self.region_count.inc();
79
80 self.regions.insert_region(Arc::new(region));
82
83 Ok(0)
84 }
85}