mito2/worker/
handle_create.rs1use common_telemetry::info;
18use store_api::logstore::LogStore;
19use store_api::metadata::RegionMetadataBuilder;
20use store_api::region_request::{AffectedRows, RegionCreateRequest};
21use store_api::storage::RegionId;
22
23use crate::error::Result;
24use crate::region::opener::{RegionOpener, check_recovered_region};
25use crate::worker::RegionWorkerLoop;
26
27impl<S: LogStore> RegionWorkerLoop<S> {
28 pub(crate) async fn handle_create_request(
29 &mut self,
30 region_id: RegionId,
31 request: RegionCreateRequest,
32 ) -> Result<AffectedRows> {
33 if let Some(region) = self.regions.get_region(region_id) {
35 check_recovered_region(
37 ®ion.metadata(),
38 region_id,
39 &request.column_metadatas,
40 &request.primary_key,
41 )?;
42
43 return Ok(0);
44 }
45
46 let mut builder = RegionMetadataBuilder::new(region_id);
48 for column in request.column_metadatas {
49 builder.push_column_metadata(column);
50 }
51 builder.primary_key(request.primary_key);
52 if let Some(expr_json) = request.partition_expr_json.as_ref() {
53 builder.partition_expr_json(Some(expr_json.clone()));
54 }
55
56 let region = RegionOpener::new(
58 region_id,
59 &request.table_dir,
60 request.path_type,
61 self.memtable_builder_provider.clone(),
62 self.object_store_manager.clone(),
63 self.purge_scheduler.clone(),
64 self.puffin_manager_factory.clone(),
65 self.intermediate_manager.clone(),
66 self.time_provider.clone(),
67 self.file_ref_manager.clone(),
68 self.partition_expr_fetcher.clone(),
69 )
70 .metadata_builder(builder)
71 .parse_options(request.options)?
72 .cache(Some(self.cache_manager.clone()))
73 .create_or_open(&self.config, &self.wal)
74 .await?;
75
76 info!(
77 "A new region created, worker: {}, region: {:?}",
78 self.id,
79 region.metadata()
80 );
81
82 self.region_count.inc();
83
84 self.regions.insert_region(region);
86
87 Ok(0)
88 }
89}