mito2/worker/
handle_create.rs1use std::sync::Arc;
18
19use common_telemetry::info;
20use store_api::logstore::LogStore;
21use store_api::metadata::RegionMetadataBuilder;
22use store_api::region_request::{AffectedRows, RegionCreateRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::region::opener::{check_recovered_region, RegionOpener};
27use crate::worker::RegionWorkerLoop;
28
29impl<S: LogStore> RegionWorkerLoop<S> {
30 pub(crate) async fn handle_create_request(
31 &mut self,
32 region_id: RegionId,
33 request: RegionCreateRequest,
34 ) -> Result<AffectedRows> {
35 if let Some(region) = self.regions.get_region(region_id) {
37 check_recovered_region(
39 ®ion.metadata(),
40 region_id,
41 &request.column_metadatas,
42 &request.primary_key,
43 )?;
44
45 return Ok(0);
46 }
47
48 let mut builder = RegionMetadataBuilder::new(region_id);
50 for column in request.column_metadatas {
51 builder.push_column_metadata(column);
52 }
53 builder.primary_key(request.primary_key);
54
55 let region = RegionOpener::new(
57 region_id,
58 &request.table_dir,
59 request.path_type,
60 self.memtable_builder_provider.clone(),
61 self.object_store_manager.clone(),
62 self.purge_scheduler.clone(),
63 self.puffin_manager_factory.clone(),
64 self.intermediate_manager.clone(),
65 self.time_provider.clone(),
66 )
67 .metadata_builder(builder)
68 .parse_options(request.options)?
69 .cache(Some(self.cache_manager.clone()))
70 .create_or_open(&self.config, &self.wal)
71 .await?;
72
73 info!(
74 "A new region created, worker: {}, region: {:?}",
75 self.id,
76 region.metadata()
77 );
78
79 self.region_count.inc();
80
81 self.regions.insert_region(Arc::new(region));
83
84 Ok(0)
85 }
86}