mito2/worker/
handle_create.rs1use std::sync::Arc;
18
19use common_telemetry::info;
20use store_api::logstore::LogStore;
21use store_api::metadata::RegionMetadataBuilder;
22use store_api::region_request::{AffectedRows, RegionCreateRequest};
23use store_api::storage::RegionId;
24
25use crate::error::Result;
26use crate::region::opener::{check_recovered_region, RegionOpener};
27use crate::worker::RegionWorkerLoop;
28
29impl<S: LogStore> RegionWorkerLoop<S> {
30 pub(crate) async fn handle_create_request(
31 &mut self,
32 region_id: RegionId,
33 request: RegionCreateRequest,
34 ) -> Result<AffectedRows> {
35 if let Some(region) = self.regions.get_region(region_id) {
37 check_recovered_region(
39 ®ion.metadata(),
40 region_id,
41 &request.column_metadatas,
42 &request.primary_key,
43 )?;
44
45 return Ok(0);
46 }
47
48 let mut builder = RegionMetadataBuilder::new(region_id);
50 for column in request.column_metadatas {
51 builder.push_column_metadata(column);
52 }
53 builder.primary_key(request.primary_key);
54 if let Some(expr_json) = request.partition_expr_json.as_ref() {
55 builder.partition_expr_json(Some(expr_json.clone()));
56 }
57
58 let region = RegionOpener::new(
60 region_id,
61 &request.table_dir,
62 request.path_type,
63 self.memtable_builder_provider.clone(),
64 self.object_store_manager.clone(),
65 self.purge_scheduler.clone(),
66 self.puffin_manager_factory.clone(),
67 self.intermediate_manager.clone(),
68 self.time_provider.clone(),
69 self.file_ref_manager.clone(),
70 )
71 .metadata_builder(builder)
72 .parse_options(request.options)?
73 .cache(Some(self.cache_manager.clone()))
74 .create_or_open(&self.config, &self.wal)
75 .await?;
76
77 info!(
78 "A new region created, worker: {}, region: {:?}",
79 self.id,
80 region.metadata()
81 );
82
83 self.region_count.inc();
84
85 self.regions.insert_region(Arc::new(region));
87
88 Ok(0)
89 }
90}