1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
56 TWCS_TRIGGER_FILE_NUM,
57};
58use crate::path_utils::table_dir;
59use crate::storage::{ColumnId, RegionId, ScanRequest};
60
/// Kind of path that accompanies a `table_dir` in create and open requests.
///
/// The enum is `#[repr(u8)]` and derives `TryFromPrimitive`, so a raw `u8`
/// can be converted back into a variant fallibly.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// The variant the gRPC conversions in this file assign to both create
    /// and open requests.
    Bare,
    /// NOTE(review): presumably selects a data-specific sub-path — confirm
    /// against the path utilities.
    Data,
    /// NOTE(review): presumably selects a metadata-specific sub-path — confirm
    /// against the path utilities.
    Metadata,
}
78
/// A batch of region DDL requests that all share the same kind.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, each paired with the id of the region to create.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, each paired with the id of the region to drop.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, each paired with the id of the region to alter.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
85
86impl BatchRegionDdlRequest {
87 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
89 match body {
90 region_request::Body::Creates(creates) => {
91 let requests = creates
92 .requests
93 .into_iter()
94 .map(parse_region_create)
95 .collect::<Result<Vec<_>>>()?;
96 Ok(Some(Self::Create(requests)))
97 }
98 region_request::Body::Drops(drops) => {
99 let requests = drops
100 .requests
101 .into_iter()
102 .map(parse_region_drop)
103 .collect::<Result<Vec<_>>>()?;
104 Ok(Some(Self::Drop(requests)))
105 }
106 region_request::Body::Alters(alters) => {
107 let requests = alters
108 .requests
109 .into_iter()
110 .map(parse_region_alter)
111 .collect::<Result<Vec<_>>>()?;
112 Ok(Some(Self::Alter(requests)))
113 }
114 _ => Ok(None),
115 }
116 }
117
118 pub fn request_type(&self) -> &'static str {
119 self.into()
120 }
121
122 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
123 match self {
124 Self::Create(requests) => requests
125 .into_iter()
126 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
127 .collect(),
128 Self::Drop(requests) => requests
129 .into_iter()
130 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
131 .collect(),
132 Self::Alter(requests) => requests
133 .into_iter()
134 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
135 .collect(),
136 }
137 }
138}
139
/// A request addressed to a single region.
///
/// `IntoStaticStr` is derived so the variant name can be obtained as a
/// `&'static str` (see `request_type`).
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Put rows into the region.
    Put(RegionPutRequest),
    /// Delete rows from the region.
    Delete(RegionDeleteRequest),
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop(RegionDropRequest),
    /// Open the region.
    Open(RegionOpenRequest),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Build indexes for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate the region, entirely or by time ranges.
    Truncate(RegionTruncateRequest),
    /// Catch-up request. No gRPC body visible in this file is converted into it.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
    /// Enter-staging request. No gRPC body visible in this file is converted into it.
    EnterStaging(EnterStagingRequest),
    /// Apply a staging manifest to the region.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
158
159impl RegionRequest {
160 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
163 match body {
164 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
165 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
166 region_request::Body::Create(create) => make_region_create(create),
167 region_request::Body::Drop(drop) => make_region_drop(drop),
168 region_request::Body::Open(open) => make_region_open(open),
169 region_request::Body::Close(close) => make_region_close(close),
170 region_request::Body::Alter(alter) => make_region_alter(alter),
171 region_request::Body::Flush(flush) => make_region_flush(flush),
172 region_request::Body::Compact(compact) => make_region_compact(compact),
173 region_request::Body::BuildIndex(index) => make_region_build_index(index),
174 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
175 region_request::Body::Creates(creates) => make_region_creates(creates),
176 region_request::Body::Drops(drops) => make_region_drops(drops),
177 region_request::Body::Alters(alters) => make_region_alters(alters),
178 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
179 region_request::Body::Sync(_) => UnexpectedSnafu {
180 reason: "Sync request should be handled separately by RegionServer",
181 }
182 .fail(),
183 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
184 reason: "ListMetadata request should be handled separately by RegionServer",
185 }
186 .fail(),
187 region_request::Body::RemoteDynFilter(_) => UnexpectedSnafu {
188 reason: "RemoteDynFilter request should be handled separately by RegionServer",
189 }
190 .fail(),
191 region_request::Body::ApplyStagingManifest(apply) => {
192 make_region_apply_staging_manifest(apply)
193 }
194 }
195 }
196
197 pub fn request_type(&self) -> &'static str {
199 self.into()
200 }
201}
202
203fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
204 let requests = inserts
205 .requests
206 .into_iter()
207 .filter_map(|r| {
208 let region_id = r.region_id.into();
209 r.rows.map(|rows| {
210 (
211 region_id,
212 RegionRequest::Put(RegionPutRequest {
213 rows,
214 hint: None,
215 partition_expr_version: r.partition_expr_version.map(|v| v.value),
216 }),
217 )
218 })
219 })
220 .collect();
221 Ok(requests)
222}
223
224fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
225 let requests = deletes
226 .requests
227 .into_iter()
228 .filter_map(|r| {
229 let region_id = r.region_id.into();
230 r.rows.map(|rows| {
231 (
232 region_id,
233 RegionRequest::Delete(RegionDeleteRequest {
234 rows,
235 hint: None,
236 partition_expr_version: r.partition_expr_version.map(|v| v.value),
237 }),
238 )
239 })
240 })
241 .collect();
242 Ok(requests)
243}
244
245fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
246 let column_metadatas = create
247 .column_defs
248 .into_iter()
249 .map(ColumnMetadata::try_from_column_def)
250 .collect::<Result<Vec<_>>>()?;
251 let region_id = RegionId::from(create.region_id);
252 let table_dir = table_dir(&create.path, region_id.table_id());
253 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
254 Ok((
255 region_id,
256 RegionCreateRequest {
257 engine: create.engine,
258 column_metadatas,
259 primary_key: create.primary_key,
260 options: create.options,
261 table_dir,
262 path_type: PathType::Bare,
263 partition_expr_json,
264 requirements: create
265 .requirements
266 .map(RegionRequirements::from)
267 .unwrap_or_default(),
268 },
269 ))
270}
271
272fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
273 let (region_id, request) = parse_region_create(create)?;
274 Ok(vec![(region_id, RegionRequest::Create(request))])
275}
276
277fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
278 let mut requests = Vec::with_capacity(creates.requests.len());
279 for create in creates.requests {
280 requests.extend(make_region_create(create)?);
281 }
282 Ok(requests)
283}
284
285fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
286 let region_id = drop.region_id.into();
287 Ok((
288 region_id,
289 RegionDropRequest {
290 fast_path: drop.fast_path,
291 force: drop.force,
292 partial_drop: drop.partial_drop,
293 },
294 ))
295}
296
297fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
298 let (region_id, request) = parse_region_drop(drop)?;
299 Ok(vec![(region_id, RegionRequest::Drop(request))])
300}
301
302fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
303 let mut requests = Vec::with_capacity(drops.requests.len());
304 for drop in drops.requests {
305 requests.extend(make_region_drop(drop)?);
306 }
307 Ok(requests)
308}
309
310fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
311 let region_id = RegionId::from(open.region_id);
312 let table_dir = table_dir(&open.path, region_id.table_id());
313 Ok(vec![(
314 region_id,
315 RegionRequest::Open(RegionOpenRequest {
316 engine: open.engine,
317 table_dir,
318 path_type: PathType::Bare,
319 options: open.options,
320 skip_wal_replay: false,
321 checkpoint: None,
322 requirements: Default::default(),
323 }),
324 )])
325}
326
327fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328 let region_id = close.region_id.into();
329 Ok(vec![(
330 region_id,
331 RegionRequest::Close(RegionCloseRequest {}),
332 )])
333}
334
335fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
336 let region_id = alter.region_id.into();
337 let request = RegionAlterRequest::try_from(alter)?;
338 Ok((region_id, request))
339}
340
341fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
342 let (region_id, request) = parse_region_alter(alter)?;
343 Ok(vec![(region_id, RegionRequest::Alter(request))])
344}
345
346fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
347 let mut requests = Vec::with_capacity(alters.requests.len());
348 for alter in alters.requests {
349 requests.extend(make_region_alter(alter)?);
350 }
351 Ok(requests)
352}
353
354fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
355 let region_id = flush.region_id.into();
356 Ok(vec![(
357 region_id,
358 RegionRequest::Flush(RegionFlushRequest::default()),
359 )])
360}
361
362fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
363 let region_id = compact.region_id.into();
364 let options = compact
365 .options
366 .unwrap_or(compact_request::Options::Regular(Default::default()));
367 let parallelism = if compact.parallelism == 0 {
369 None
370 } else {
371 Some(compact.parallelism)
372 };
373 Ok(vec![(
374 region_id,
375 RegionRequest::Compact(RegionCompactRequest {
376 options,
377 parallelism,
378 }),
379 )])
380}
381
382fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
383 let region_id = index.region_id.into();
384 Ok(vec![(
385 region_id,
386 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
387 )])
388}
389
390fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
391 let region_id = truncate.region_id.into();
392 match truncate.kind {
393 None => InvalidRawRegionRequestSnafu {
394 err: "missing kind in TruncateRequest".to_string(),
395 }
396 .fail(),
397 Some(truncate_request::Kind::All(_)) => Ok(vec![(
398 region_id,
399 RegionRequest::Truncate(RegionTruncateRequest::All),
400 )]),
401 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
402 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
403
404 Ok(vec![(
405 region_id,
406 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
407 )])
408 }
409 }
410}
411
412fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
414 let region_id = request.region_id.into();
415 let partition_expr_version = request.partition_expr_version.map(|v| v.value);
416 let aligned_schema_version = request.aligned_schema_version.map(|v| v.schema_version);
417 let Some(Body::ArrowIpc(request)) = request.body else {
418 return Ok(vec![]);
419 };
420
421 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
422 .with_label_values(&["decode"])
423 .start_timer();
424 let mut decoder =
425 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
426 let payload = decoder
427 .try_decode_record_batch(&request.data_header, &request.payload)
428 .context(FlightCodecSnafu)?;
429 decoder_timer.observe_duration();
430 Ok(vec![(
431 region_id,
432 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
433 region_id,
434 payload,
435 raw_data: request,
436 partition_expr_version,
437 aligned_schema_version,
438 }),
439 )])
440}
441
442fn make_region_apply_staging_manifest(
443 api::v1::region::ApplyStagingManifestRequest {
444 region_id,
445 partition_expr,
446 central_region_id,
447 manifest_path,
448 }: api::v1::region::ApplyStagingManifestRequest,
449) -> Result<Vec<(RegionId, RegionRequest)>> {
450 let region_id = region_id.into();
451 Ok(vec![(
452 region_id,
453 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
454 partition_expr,
455 central_region_id: central_region_id.into(),
456 manifest_path,
457 }),
458 )])
459}
460
/// Request to put rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint. The gRPC conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version carried by the incoming request, if any.
    pub partition_expr_version: Option<u64>,
}
471
/// Request to read from a region; a thin wrapper around a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
476
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    /// NOTE(review): presumably only key columns are required here — confirm with the engine.
    pub rows: Rows,
    /// Optional write hint. The gRPC conversion in this file always leaves it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version carried by the incoming request, if any.
    pub partition_expr_version: Option<u64>,
}
489
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine, copied from the gRPC request.
    pub engine: String,
    /// Metadata of every column in the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the primary key columns; `validate` requires each id to appear
    /// in `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// String key/value options of the region.
    pub options: HashMap<String, String>,
    /// Table directory, derived from the request path and the table id.
    pub table_dir: String,
    /// Path type used together with `table_dir`.
    pub path_type: PathType,
    /// Partition expression of the region, if the request carried one.
    /// NOTE(review): the field name suggests a JSON encoding — confirm.
    pub partition_expr_json: Option<String>,
    /// Requirements of the region; defaults when the request carries none.
    pub requirements: RegionRequirements,
}
510
511impl RegionCreateRequest {
512 pub fn validate(&self) -> Result<()> {
514 ensure!(
516 self.column_metadatas
517 .iter()
518 .any(|x| x.semantic_type == SemanticType::Timestamp),
519 InvalidRegionRequestSnafu {
520 region_id: RegionId::new(0, 0),
521 err: "missing timestamp column in create region request".to_string(),
522 }
523 );
524
525 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
527 for (i, c) in self.column_metadatas.iter().enumerate() {
528 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
529 return InvalidRegionRequestSnafu {
530 region_id: RegionId::new(0, 0),
531 err: format!(
532 "duplicate column id {} (at position {} and {}) in create region request",
533 c.column_id, previous, i
534 ),
535 }
536 .fail();
537 }
538 }
539
540 for column_id in &self.primary_key {
542 ensure!(
543 column_id_to_indices.contains_key(column_id),
544 InvalidRegionRequestSnafu {
545 region_id: RegionId::new(0, 0),
546 err: format!(
547 "missing primary key column {} in create region request",
548 column_id
549 ),
550 }
551 );
552 }
553
554 Ok(())
555 }
556
557 pub fn is_physical_table(&self) -> bool {
559 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
560 }
561}
562
/// Request to drop a region.
///
/// All three flags are copied unchanged from the gRPC `DropRequest`.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// NOTE(review): presumably enables a faster drop path — confirm with the engine.
    pub fast_path: bool,

    /// NOTE(review): presumably forces the drop even when it would otherwise
    /// be refused — confirm with the engine.
    pub force: bool,

    /// NOTE(review): presumably marks the drop as partial — confirm with the engine.
    pub partial_drop: bool,
}
577
/// Requirements attached to a region when it is created or opened.
///
/// `#[serde(default)]` makes fields missing from the serialized form fall
/// back to the `Default` value of the struct.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq, Serialize, Deserialize)]
#[serde(default)]
pub struct RegionRequirements {
    /// Whether the region requires object storage. Defaults to `false`.
    pub object_storage: bool,
}
585
586impl RegionRequirements {
587 pub fn empty() -> Self {
589 Self::default()
590 }
591
592 pub fn object_storage() -> Self {
594 Self {
595 object_storage: true,
596 }
597 }
598}
599
600impl From<api::v1::region::RegionRequirements> for RegionRequirements {
601 fn from(value: api::v1::region::RegionRequirements) -> Self {
602 Self {
603 object_storage: value.object_storage,
604 }
605 }
606}
607
608impl From<RegionRequirements> for api::v1::region::RegionRequirements {
609 fn from(value: RegionRequirements) -> Self {
610 Self {
611 object_storage: value.object_storage,
612 }
613 }
614}
615
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine, copied from the gRPC request.
    pub engine: String,
    /// Table directory, derived from the request path and the table id.
    pub table_dir: String,
    /// Path type used together with `table_dir`.
    pub path_type: PathType,
    /// String key/value options of the region.
    pub options: HashMap<String, String>,
    /// Whether to skip WAL replay. The gRPC conversion in this file sets it to `false`.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint. The gRPC conversion in this file sets it to `None`.
    pub checkpoint: Option<ReplayCheckpoint>,
    /// Requirements of the region. The gRPC conversion in this file uses the default.
    pub requirements: RegionRequirements,
}
634
/// Checkpoint that can be attached to a [`RegionOpenRequest`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// NOTE(review): presumably the log entry id replay should resume from — confirm.
    pub entry_id: u64,
    /// NOTE(review): presumably the equivalent entry id for a separate
    /// metadata log, when there is one — confirm.
    pub metadata_entry_id: Option<u64>,
}
640
641impl RegionOpenRequest {
642 pub fn is_physical_table(&self) -> bool {
644 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
645 }
646}
647
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
651
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The kind of alteration to apply.
    pub kind: AlterKind,
}
658
659impl RegionAlterRequest {
660 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
662 self.kind.validate(metadata)?;
663
664 Ok(())
665 }
666
667 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
671 debug_assert!(self.validate(metadata).is_ok());
672 self.kind.need_alter(metadata)
673 }
674}
675
676impl TryFrom<AlterRequest> for RegionAlterRequest {
677 type Error = MetadataError;
678
679 fn try_from(value: AlterRequest) -> Result<Self> {
680 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
681 err: "missing kind in AlterRequest",
682 })?;
683
684 let kind = AlterKind::try_from(kind)?;
685 Ok(RegionAlterRequest { kind })
686 }
687}
688
/// Kind of alteration to apply to a region.
///
/// `AsRefStr` is derived so the variant name is available as a string slice.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region. Validation only allows field columns to be dropped.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data types of columns.
    ModifyColumnTypes {
        /// Columns whose type should change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default values of columns.
    DropDefaults {
        /// Names of the columns whose default is dropped.
        names: Vec<String>,
    },
    /// Set the default values of columns.
    SetDefaults {
        /// Columns and their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Synchronise the column metadata of the region with the given list.
    SyncColumns {
        /// The complete target column metadata; validation requires the
        /// existing tag columns and the timestamp column to keep their
        /// names and ids.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A column together with the default constraint to set on it.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// Raw bytes of the default constraint, taken as-is from the gRPC request.
    /// NOTE(review): the encoding is not visible here — confirm with the producer.
    pub default_constraint: Vec<u8>,
}
735
/// An index to set on a column, together with its options.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the column to index.
        column_name: String,
        /// Options of the fulltext index.
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        /// Name of the column to index.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the column to index.
        column_name: String,
        /// Options of the skipping index.
        options: SkippingIndexOptions,
    },
}
750
751impl SetIndexOption {
752 pub fn column_name(&self) -> &String {
754 match self {
755 SetIndexOption::Fulltext { column_name, .. } => column_name,
756 SetIndexOption::Inverted { column_name } => column_name,
757 SetIndexOption::Skipping { column_name, .. } => column_name,
758 }
759 }
760
761 pub fn is_fulltext(&self) -> bool {
763 match self {
764 SetIndexOption::Fulltext { .. } => true,
765 SetIndexOption::Inverted { .. } => false,
766 SetIndexOption::Skipping { .. } => false,
767 }
768 }
769}
770
771impl TryFrom<v1::SetIndex> for SetIndexOption {
772 type Error = MetadataError;
773
774 fn try_from(value: v1::SetIndex) -> Result<Self> {
775 let option = value.options.context(InvalidRawRegionRequestSnafu {
776 err: "missing options in SetIndex",
777 })?;
778
779 let opt = match option {
780 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
781 column_name: x.column_name.clone(),
782 options: FulltextOptions::new(
783 x.enable,
784 as_fulltext_option_analyzer(
785 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
786 ),
787 x.case_sensitive,
788 as_fulltext_option_backend(
789 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
790 ),
791 x.granularity as u32,
792 x.false_positive_rate,
793 )
794 .context(InvalidIndexOptionSnafu)?,
795 },
796 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
797 column_name: i.column_name,
798 },
799 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
800 column_name: s.column_name,
801 options: SkippingIndexOptions::new(
802 s.granularity as u32,
803 s.false_positive_rate,
804 as_skipping_index_type(
805 PbSkippingIndexType::try_from(s.skipping_index_type)
806 .context(DecodeProtoSnafu)?,
807 ),
808 )
809 .context(InvalidIndexOptionSnafu)?,
810 },
811 };
812
813 Ok(opt)
814 }
815}
816
/// An index to remove from a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Remove the fulltext index of `column_name`.
    Fulltext { column_name: String },
    /// Remove the inverted index of `column_name`.
    Inverted { column_name: String },
    /// Remove the skipping index of `column_name`.
    Skipping { column_name: String },
}
823
824impl UnsetIndexOption {
825 pub fn column_name(&self) -> &String {
826 match self {
827 UnsetIndexOption::Fulltext { column_name } => column_name,
828 UnsetIndexOption::Inverted { column_name } => column_name,
829 UnsetIndexOption::Skipping { column_name } => column_name,
830 }
831 }
832
833 pub fn is_fulltext(&self) -> bool {
834 match self {
835 UnsetIndexOption::Fulltext { .. } => true,
836 UnsetIndexOption::Inverted { .. } => false,
837 UnsetIndexOption::Skipping { .. } => false,
838 }
839 }
840}
841
842impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
843 type Error = MetadataError;
844
845 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
846 let option = value.options.context(InvalidRawRegionRequestSnafu {
847 err: "missing options in UnsetIndex",
848 })?;
849
850 let opt = match option {
851 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
852 column_name: f.column_name,
853 },
854 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
855 column_name: i.column_name,
856 },
857 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
858 column_name: s.column_name,
859 },
860 };
861
862 Ok(opt)
863 }
864}
865
866impl AlterKind {
867 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
871 match self {
872 AlterKind::AddColumns { columns } => {
873 for col_to_add in columns {
874 col_to_add.validate(metadata)?;
875 }
876 }
877 AlterKind::DropColumns { names } => {
878 for name in names {
879 Self::validate_column_to_drop(name, metadata)?;
880 }
881 }
882 AlterKind::ModifyColumnTypes { columns } => {
883 for col_to_change in columns {
884 col_to_change.validate(metadata)?;
885 }
886 }
887 AlterKind::SetRegionOptions { .. } => {}
888 AlterKind::UnsetRegionOptions { .. } => {}
889 AlterKind::SetIndexes { options } => {
890 for option in options {
891 Self::validate_column_alter_index_option(
892 option.column_name(),
893 metadata,
894 option.is_fulltext(),
895 )?;
896 }
897 }
898 AlterKind::UnsetIndexes { options } => {
899 for option in options {
900 Self::validate_column_alter_index_option(
901 option.column_name(),
902 metadata,
903 option.is_fulltext(),
904 )?;
905 }
906 }
907 AlterKind::DropDefaults { names } => {
908 names
909 .iter()
910 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
911 }
912 AlterKind::SetDefaults { columns } => {
913 columns
914 .iter()
915 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
916 }
917 AlterKind::SyncColumns { column_metadatas } => {
918 let new_primary_keys = column_metadatas
919 .iter()
920 .filter(|c| c.semantic_type == SemanticType::Tag)
921 .map(|c| (c.column_schema.name.as_str(), c.column_id))
922 .collect::<HashMap<_, _>>();
923
924 let old_primary_keys = metadata
925 .column_metadatas
926 .iter()
927 .filter(|c| c.semantic_type == SemanticType::Tag)
928 .map(|c| (c.column_schema.name.as_str(), c.column_id));
929
930 for (name, id) in old_primary_keys {
931 let primary_key =
932 new_primary_keys
933 .get(name)
934 .with_context(|| InvalidRegionRequestSnafu {
935 region_id: metadata.region_id,
936 err: format!("column {} is not a primary key", name),
937 })?;
938
939 ensure!(
940 *primary_key == id,
941 InvalidRegionRequestSnafu {
942 region_id: metadata.region_id,
943 err: format!(
944 "column with same name {} has different id, existing: {}, got: {}",
945 name, id, primary_key
946 ),
947 }
948 );
949 }
950
951 let new_ts_column = column_metadatas
952 .iter()
953 .find(|c| c.semantic_type == SemanticType::Timestamp)
954 .map(|c| (c.column_schema.name.as_str(), c.column_id))
955 .context(InvalidRegionRequestSnafu {
956 region_id: metadata.region_id,
957 err: "timestamp column not found",
958 })?;
959
960 let old_ts_column = metadata
962 .column_metadatas
963 .iter()
964 .find(|c| c.semantic_type == SemanticType::Timestamp)
965 .map(|c| (c.column_schema.name.as_str(), c.column_id))
966 .unwrap();
967
968 ensure!(
969 new_ts_column == old_ts_column,
970 InvalidRegionRequestSnafu {
971 region_id: metadata.region_id,
972 err: format!(
973 "timestamp column {} has different id, existing: {}, got: {}",
974 old_ts_column.0, old_ts_column.1, new_ts_column.1
975 ),
976 }
977 );
978 }
979 }
980 Ok(())
981 }
982
983 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
985 debug_assert!(self.validate(metadata).is_ok());
986 match self {
987 AlterKind::AddColumns { columns } => columns
988 .iter()
989 .any(|col_to_add| col_to_add.need_alter(metadata)),
990 AlterKind::DropColumns { names } => names
991 .iter()
992 .any(|name| metadata.column_by_name(name).is_some()),
993 AlterKind::ModifyColumnTypes { columns } => columns
994 .iter()
995 .any(|col_to_change| col_to_change.need_alter(metadata)),
996 AlterKind::SetRegionOptions { .. } => true,
997 AlterKind::UnsetRegionOptions { .. } => true,
998 AlterKind::SetIndexes { options, .. } => options
999 .iter()
1000 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
1001 AlterKind::UnsetIndexes { options } => options
1002 .iter()
1003 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
1004 AlterKind::DropDefaults { names } => names
1005 .iter()
1006 .any(|name| metadata.column_by_name(name).is_some()),
1007
1008 AlterKind::SetDefaults { columns } => columns
1009 .iter()
1010 .any(|x| metadata.column_by_name(&x.name).is_some()),
1011 AlterKind::SyncColumns { column_metadatas } => {
1012 metadata.column_metadatas != *column_metadatas
1013 }
1014 }
1015 }
1016
1017 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
1019 let Some(column) = metadata.column_by_name(name) else {
1020 return Ok(());
1021 };
1022 ensure!(
1023 column.semantic_type == SemanticType::Field,
1024 InvalidRegionRequestSnafu {
1025 region_id: metadata.region_id,
1026 err: format!("column {} is not a field and could not be dropped", name),
1027 }
1028 );
1029 Ok(())
1030 }
1031
1032 fn validate_column_alter_index_option(
1034 column_name: &String,
1035 metadata: &RegionMetadata,
1036 is_fulltext: bool,
1037 ) -> Result<()> {
1038 let column = metadata
1039 .column_by_name(column_name)
1040 .context(InvalidRegionRequestSnafu {
1041 region_id: metadata.region_id,
1042 err: format!("column {} not found", column_name),
1043 })?;
1044
1045 if is_fulltext {
1046 ensure!(
1047 column.column_schema.data_type.is_string(),
1048 InvalidRegionRequestSnafu {
1049 region_id: metadata.region_id,
1050 err: format!(
1051 "cannot change alter index options for non-string column {}",
1052 column_name
1053 ),
1054 }
1055 );
1056 }
1057
1058 Ok(())
1059 }
1060
1061 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1063 metadata
1064 .column_by_name(column_name)
1065 .context(InvalidRegionRequestSnafu {
1066 region_id: metadata.region_id,
1067 err: format!("column {} not found", column_name),
1068 })?;
1069
1070 Ok(())
1071 }
1072}
1073
1074impl TryFrom<alter_request::Kind> for AlterKind {
1075 type Error = MetadataError;
1076
1077 fn try_from(kind: alter_request::Kind) -> Result<Self> {
1078 let alter_kind = match kind {
1079 alter_request::Kind::AddColumns(x) => {
1080 let columns = x
1081 .add_columns
1082 .into_iter()
1083 .map(|x| x.try_into())
1084 .collect::<Result<Vec<_>>>()?;
1085 AlterKind::AddColumns { columns }
1086 }
1087 alter_request::Kind::ModifyColumnTypes(x) => {
1088 let columns = x
1089 .modify_column_types
1090 .into_iter()
1091 .map(|x| x.into())
1092 .collect::<Vec<_>>();
1093 AlterKind::ModifyColumnTypes { columns }
1094 }
1095 alter_request::Kind::DropColumns(x) => {
1096 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1097 AlterKind::DropColumns { names }
1098 }
1099 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1100 options: options
1101 .table_options
1102 .iter()
1103 .map(TryFrom::try_from)
1104 .collect::<Result<Vec<_>>>()?,
1105 },
1106 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1107 keys: options
1108 .keys
1109 .iter()
1110 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1111 .collect::<Result<Vec<_>>>()?,
1112 },
1113 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1114 options: vec![SetIndexOption::try_from(o)?],
1115 },
1116 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1117 options: vec![UnsetIndexOption::try_from(o)?],
1118 },
1119 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1120 options: o
1121 .set_indexes
1122 .into_iter()
1123 .map(SetIndexOption::try_from)
1124 .collect::<Result<Vec<_>>>()?,
1125 },
1126 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1127 options: o
1128 .unset_indexes
1129 .into_iter()
1130 .map(UnsetIndexOption::try_from)
1131 .collect::<Result<Vec<_>>>()?,
1132 },
1133 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1134 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1135 },
1136 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1137 columns: x
1138 .set_defaults
1139 .into_iter()
1140 .map(|x| {
1141 Ok(SetDefault {
1142 name: x.column_name,
1143 default_constraint: x.default_constraint.clone(),
1144 })
1145 })
1146 .collect::<Result<Vec<_>>>()?,
1147 },
1148 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1149 column_metadatas: x
1150 .column_defs
1151 .into_iter()
1152 .map(ColumnMetadata::try_from_column_def)
1153 .collect::<Result<Vec<_>>>()?,
1154 },
1155 };
1156
1157 Ok(alter_kind)
1158 }
1159}
1160
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the new column, if specified. Validation rejects a
    /// location when a column of the same name already exists.
    pub location: Option<AddColumnLocation>,
}
1170
1171impl AddColumn {
1172 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1177 ensure!(
1178 self.column_metadata.column_schema.is_nullable()
1179 || self
1180 .column_metadata
1181 .column_schema
1182 .default_constraint()
1183 .is_some(),
1184 InvalidRegionRequestSnafu {
1185 region_id: metadata.region_id,
1186 err: format!(
1187 "no default value for column {}",
1188 self.column_metadata.column_schema.name
1189 ),
1190 }
1191 );
1192
1193 if let Some(existing_column) =
1194 metadata.column_by_name(&self.column_metadata.column_schema.name)
1195 {
1196 ensure!(
1198 *existing_column == self.column_metadata,
1199 InvalidRegionRequestSnafu {
1200 region_id: metadata.region_id,
1201 err: format!(
1202 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1203 self.column_metadata.column_schema.name,
1204 existing_column,
1205 self.column_metadata,
1206 ),
1207 }
1208 );
1209 ensure!(
1210 self.location.is_none(),
1211 InvalidRegionRequestSnafu {
1212 region_id: metadata.region_id,
1213 err: format!(
1214 "column {} already exists, but location is specified",
1215 self.column_metadata.column_schema.name
1216 ),
1217 }
1218 );
1219 }
1220
1221 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1222 ensure!(
1224 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1225 InvalidRegionRequestSnafu {
1226 region_id: metadata.region_id,
1227 err: format!(
1228 "column id {} already exists with different name {}",
1229 self.column_metadata.column_id, existing_column.column_schema.name
1230 ),
1231 }
1232 );
1233 }
1234
1235 Ok(())
1236 }
1237
1238 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1240 debug_assert!(self.validate(metadata).is_ok());
1241 metadata
1242 .column_by_name(&self.column_metadata.column_schema.name)
1243 .is_none()
1244 }
1245}
1246
1247impl TryFrom<v1::region::AddColumn> for AddColumn {
1248 type Error = MetadataError;
1249
1250 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1251 let column_def = add_column
1252 .column_def
1253 .context(InvalidRawRegionRequestSnafu {
1254 err: "missing column_def in AddColumn",
1255 })?;
1256
1257 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1258 let location = add_column
1259 .location
1260 .map(AddColumnLocation::try_from)
1261 .transpose()?;
1262
1263 Ok(AddColumn {
1264 column_metadata,
1265 location,
1266 })
1267 }
1268}
1269
/// Position at which a newly added column is placed.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Counterpart of the protobuf `LocationType::First`.
    First,
    /// Counterpart of the protobuf `LocationType::After`.
    After {
        /// Name of the column the location refers to (the protobuf `after_column_name`).
        column_name: String,
    },
}
1281
1282impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1283 type Error = MetadataError;
1284
1285 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1286 let location_type = LocationType::try_from(location.location_type)
1287 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1288 let add_column_location = match location_type {
1289 LocationType::First => AddColumnLocation::First,
1290 LocationType::After => AddColumnLocation::After {
1291 column_name: location.after_column_name,
1292 },
1293 };
1294
1295 Ok(add_column_location)
1296 }
1297}
1298
1299#[derive(Debug, PartialEq, Eq, Clone)]
1301pub struct ModifyColumnType {
1302 pub column_name: String,
1304 pub target_type: ConcreteDataType,
1306}
1307
1308impl ModifyColumnType {
1309 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1311 let column_meta = metadata
1312 .column_by_name(&self.column_name)
1313 .with_context(|| InvalidRegionRequestSnafu {
1314 region_id: metadata.region_id,
1315 err: format!("column {} not found", self.column_name),
1316 })?;
1317
1318 ensure!(
1319 matches!(column_meta.semantic_type, SemanticType::Field),
1320 InvalidRegionRequestSnafu {
1321 region_id: metadata.region_id,
1322 err: "'timestamp' or 'tag' column cannot change type".to_string()
1323 }
1324 );
1325 ensure!(
1326 column_meta
1327 .column_schema
1328 .data_type
1329 .can_arrow_type_cast_to(&self.target_type),
1330 InvalidRegionRequestSnafu {
1331 region_id: metadata.region_id,
1332 err: format!(
1333 "column '{}' cannot be cast automatically to type '{}'",
1334 self.column_name, self.target_type
1335 ),
1336 }
1337 );
1338
1339 Ok(())
1340 }
1341
1342 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1344 debug_assert!(self.validate(metadata).is_ok());
1345 metadata.column_by_name(&self.column_name).is_some()
1346 }
1347}
1348
1349impl From<v1::ModifyColumnType> for ModifyColumnType {
1350 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1351 let target_type = ColumnDataTypeWrapper::new(
1352 modify_column_type.target_type(),
1353 modify_column_type.target_type_extension,
1354 )
1355 .into();
1356
1357 ModifyColumnType {
1358 column_name: modify_column_type.column_name,
1359 target_type,
1360 }
1361 }
1362}
1363
/// A region option to set, parsed from a protobuf key/value option.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live value; holds `None` when built from `UnsetRegionOption::Ttl`.
    Ttl(Option<TimeToLive>),
    /// An option keyed by one of the `TWCS_*` keys, kept as a raw `(key, value)` pair.
    /// The value is an empty string when built from an `UnsetRegionOption`.
    Twsc(String, String),
    /// Raw value supplied for `SST_FORMAT_KEY`.
    Format(String),
    /// Boolean parsed from the value supplied for `APPEND_MODE_KEY`.
    AppendMode(bool),
}
1374
1375impl TryFrom<&PbOption> for SetRegionOption {
1376 type Error = MetadataError;
1377
1378 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1379 let PbOption { key, value } = value;
1380 match key.as_str() {
1381 TTL_KEY => {
1382 let ttl = TimeToLive::from_humantime_or_str(value)
1383 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1384
1385 Ok(Self::Ttl(Some(ttl)))
1386 }
1387 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1388 Ok(Self::Twsc(key.clone(), value.clone()))
1389 }
1390 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1391 APPEND_MODE_KEY => {
1392 let append_mode = value
1393 .parse::<bool>()
1394 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1395 Ok(Self::AppendMode(append_mode))
1396 }
1397 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1398 }
1399 }
1400}
1401
1402impl From<&UnsetRegionOption> for SetRegionOption {
1403 fn from(unset_option: &UnsetRegionOption) -> Self {
1404 match unset_option {
1405 UnsetRegionOption::TwcsTriggerFileNum => {
1406 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1407 }
1408 UnsetRegionOption::TwcsMaxOutputFileSize => {
1409 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1410 }
1411 UnsetRegionOption::TwcsTimeWindow => {
1412 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1413 }
1414 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1415 }
1416 }
1417}
1418
1419impl TryFrom<&str> for UnsetRegionOption {
1420 type Error = MetadataError;
1421
1422 fn try_from(key: &str) -> Result<Self> {
1423 match key.to_ascii_lowercase().as_str() {
1424 TTL_KEY => Ok(Self::Ttl),
1425 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1426 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1427 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1428 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1429 }
1430 }
1431}
1432
1433#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1434pub enum UnsetRegionOption {
1435 TwcsTriggerFileNum,
1436 TwcsMaxOutputFileSize,
1437 TwcsTimeWindow,
1438 Ttl,
1439}
1440
1441impl UnsetRegionOption {
1442 pub fn as_str(&self) -> &str {
1443 match self {
1444 Self::Ttl => TTL_KEY,
1445 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1446 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1447 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1448 }
1449 }
1450}
1451
1452impl Display for UnsetRegionOption {
1453 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1454 write!(f, "{}", self.as_str())
1455 }
1456}
1457
/// Reason that can be attached to a [`RegionFlushRequest`].
///
/// NOTE(review): each variant name indicates the operation that asked for the
/// flush; the exact trigger points are not visible in this part of the file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum RegionFlushReason {
    RegionMigration,
    Repartition,
    RemoteWalPrune,
    Closing,
    Downgrading,
}
1471
/// Parameters of a region flush request.
///
/// Derives `Default`, so both fields default to `None`.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size.
    /// NOTE(review): presumably applies to the files written by the flush — confirm.
    pub row_group_size: Option<usize>,
    /// Optional reason for the flush.
    pub reason: Option<RegionFlushReason>,
}
1477
/// Parameters of a region compaction request.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options, using the protobuf `compact_request::Options` type.
    pub options: compact_request::Options,
    /// Optional parallelism setting; `None` in the `Default` implementation.
    /// NOTE(review): what `None` means to the engine is not visible here — confirm.
    pub parallelism: Option<u32>,
}
1483
1484impl Default for RegionCompactRequest {
1485 fn default() -> Self {
1486 Self {
1487 options: compact_request::Options::Regular(Default::default()),
1489 parallelism: None,
1490 }
1491 }
1492}
1493
/// Field-less request type.
///
/// NOTE(review): presumably the payload of `RegionRequest::BuildIndex` — confirm.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1496
/// Selects what a region truncate request applies to.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// Truncate without any time-range restriction.
    All,
    /// Truncate only the listed time ranges.
    ByTimeRanges {
        /// Timestamp pairs describing the ranges.
        /// NOTE(review): presumably `(start, end)`; bound inclusivity is not visible
        /// here — confirm.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1509
/// Parameters of a region catchup request.
///
/// Derives `Default`, so `set_writable` defaults to `false` and every optional
/// field to `None`.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// NOTE(review): presumably whether the region becomes writable after catchup — confirm.
    pub set_writable: bool,
    /// Optional log entry id.
    /// NOTE(review): presumably the entry id the catchup should reach — confirm.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id.
    /// NOTE(review): presumably the counterpart of `entry_id` for a metadata region — confirm.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional numeric location id; its meaning is not visible in this part of the file.
    pub location_id: Option<u64>,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1530
1531#[derive(Debug, Clone)]
1532pub struct RegionBulkInsertsRequest {
1533 pub region_id: RegionId,
1534 pub payload: DfRecordBatch,
1535 pub raw_data: ArrowIpc,
1536 pub partition_expr_version: Option<u64>,
1537 pub aligned_schema_version: Option<u64>,
1538}
1539
1540impl RegionBulkInsertsRequest {
1541 pub fn estimated_size(&self) -> usize {
1542 self.payload.get_array_memory_size()
1543 }
1544}
1545
/// Directive describing how partitioning is handled for a staging request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum StagingPartitionDirective {
    /// Carries a partition expression string.
    UpdatePartitionExpr(String),
    /// Carries no partition expression.
    /// NOTE(review): presumably makes the region reject all writes — confirm.
    RejectAllWrites,
}

impl StagingPartitionDirective {
    /// Returns the partition expression for `UpdatePartitionExpr`, or `None` for
    /// `RejectAllWrites`.
    pub fn partition_expr(&self) -> Option<&str> {
        match self {
            Self::RejectAllWrites => None,
            Self::UpdatePartitionExpr(expr) => Some(expr.as_str()),
        }
    }
}
1566
1567#[derive(Debug, Clone)]
1568pub struct EnterStagingRequest {
1569 pub partition_directive: StagingPartitionDirective,
1571}
1572
1573impl EnterStagingRequest {
1574 pub fn with_partition_expr(partition_expr: String) -> Self {
1576 Self {
1577 partition_directive: StagingPartitionDirective::UpdatePartitionExpr(partition_expr),
1578 }
1579 }
1580}
1581
/// Payload of an apply-staging-manifest request.
///
/// NOTE(review): the field notes below are inferred from names and types only;
/// confirm them against the code that handles the request.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// Partition expression string; presumably the expression the region should end up with.
    pub partition_expr: String,
    /// Id of a region; presumably the region that coordinates the staging operation.
    pub central_region_id: RegionId,
    /// Path string; presumably where the staged manifest can be read from.
    pub manifest_path: String,
}
1610
1611impl fmt::Display for RegionRequest {
1612 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1613 match self {
1614 RegionRequest::Put(_) => write!(f, "Put"),
1615 RegionRequest::Delete(_) => write!(f, "Delete"),
1616 RegionRequest::Create(_) => write!(f, "Create"),
1617 RegionRequest::Drop(_) => write!(f, "Drop"),
1618 RegionRequest::Open(_) => write!(f, "Open"),
1619 RegionRequest::Close(_) => write!(f, "Close"),
1620 RegionRequest::Alter(_) => write!(f, "Alter"),
1621 RegionRequest::Flush(_) => write!(f, "Flush"),
1622 RegionRequest::Compact(_) => write!(f, "Compact"),
1623 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1624 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1625 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1626 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1627 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1628 RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1629 }
1630 }
1631}
1632
1633#[cfg(test)]
1634mod tests {
1635
1636 use api::v1::region::RegionColumnDef;
1637 use api::v1::{ColumnDataType, ColumnDef};
1638 use datatypes::prelude::ConcreteDataType;
1639 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1640
1641 use super::*;
1642 use crate::metadata::RegionMetadataBuilder;
1643
1644 #[test]
1645 fn test_from_proto_location() {
1646 let proto_location = v1::AddColumnLocation {
1647 location_type: LocationType::First as i32,
1648 after_column_name: String::default(),
1649 };
1650 let location = AddColumnLocation::try_from(proto_location).unwrap();
1651 assert_eq!(location, AddColumnLocation::First);
1652
1653 let proto_location = v1::AddColumnLocation {
1654 location_type: 10,
1655 after_column_name: String::default(),
1656 };
1657 AddColumnLocation::try_from(proto_location).unwrap_err();
1658
1659 let proto_location = v1::AddColumnLocation {
1660 location_type: LocationType::After as i32,
1661 after_column_name: "a".to_string(),
1662 };
1663 let location = AddColumnLocation::try_from(proto_location).unwrap();
1664 assert_eq!(
1665 location,
1666 AddColumnLocation::After {
1667 column_name: "a".to_string()
1668 }
1669 );
1670 }
1671
1672 #[test]
1673 fn test_from_none_proto_add_column() {
1674 AddColumn::try_from(v1::region::AddColumn {
1675 column_def: None,
1676 location: None,
1677 })
1678 .unwrap_err();
1679 }
1680
1681 #[test]
1682 fn test_from_proto_alter_request() {
1683 RegionAlterRequest::try_from(AlterRequest {
1684 region_id: 0,
1685 schema_version: 1,
1686 kind: None,
1687 })
1688 .unwrap_err();
1689
1690 let request = RegionAlterRequest::try_from(AlterRequest {
1691 region_id: 0,
1692 schema_version: 1,
1693 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1694 add_columns: vec![v1::region::AddColumn {
1695 column_def: Some(RegionColumnDef {
1696 column_def: Some(ColumnDef {
1697 name: "a".to_string(),
1698 data_type: ColumnDataType::String as i32,
1699 is_nullable: true,
1700 default_constraint: vec![],
1701 semantic_type: SemanticType::Field as i32,
1702 comment: String::new(),
1703 ..Default::default()
1704 }),
1705 column_id: 1,
1706 }),
1707 location: Some(v1::AddColumnLocation {
1708 location_type: LocationType::First as i32,
1709 after_column_name: String::default(),
1710 }),
1711 }],
1712 })),
1713 })
1714 .unwrap();
1715
1716 assert_eq!(
1717 request,
1718 RegionAlterRequest {
1719 kind: AlterKind::AddColumns {
1720 columns: vec![AddColumn {
1721 column_metadata: ColumnMetadata {
1722 column_schema: ColumnSchema::new(
1723 "a",
1724 ConcreteDataType::string_datatype(),
1725 true,
1726 ),
1727 semantic_type: SemanticType::Field,
1728 column_id: 1,
1729 },
1730 location: Some(AddColumnLocation::First),
1731 }]
1732 },
1733 }
1734 );
1735 }
1736
1737 fn new_metadata() -> RegionMetadata {
1740 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1741 builder
1742 .push_column_metadata(ColumnMetadata {
1743 column_schema: ColumnSchema::new(
1744 "ts",
1745 ConcreteDataType::timestamp_millisecond_datatype(),
1746 false,
1747 ),
1748 semantic_type: SemanticType::Timestamp,
1749 column_id: 1,
1750 })
1751 .push_column_metadata(ColumnMetadata {
1752 column_schema: ColumnSchema::new(
1753 "tag_0",
1754 ConcreteDataType::string_datatype(),
1755 true,
1756 ),
1757 semantic_type: SemanticType::Tag,
1758 column_id: 2,
1759 })
1760 .push_column_metadata(ColumnMetadata {
1761 column_schema: ColumnSchema::new(
1762 "field_0",
1763 ConcreteDataType::string_datatype(),
1764 true,
1765 ),
1766 semantic_type: SemanticType::Field,
1767 column_id: 3,
1768 })
1769 .push_column_metadata(ColumnMetadata {
1770 column_schema: ColumnSchema::new(
1771 "field_1",
1772 ConcreteDataType::boolean_datatype(),
1773 true,
1774 ),
1775 semantic_type: SemanticType::Field,
1776 column_id: 4,
1777 })
1778 .primary_key(vec![2]);
1779 builder.build().unwrap()
1780 }
1781
1782 #[test]
1783 fn test_add_column_validate() {
1784 let metadata = new_metadata();
1785 let add_column = AddColumn {
1786 column_metadata: ColumnMetadata {
1787 column_schema: ColumnSchema::new(
1788 "tag_1",
1789 ConcreteDataType::string_datatype(),
1790 true,
1791 ),
1792 semantic_type: SemanticType::Tag,
1793 column_id: 5,
1794 },
1795 location: None,
1796 };
1797 add_column.validate(&metadata).unwrap();
1798 assert!(add_column.need_alter(&metadata));
1799
1800 AddColumn {
1802 column_metadata: ColumnMetadata {
1803 column_schema: ColumnSchema::new(
1804 "tag_1",
1805 ConcreteDataType::string_datatype(),
1806 false,
1807 ),
1808 semantic_type: SemanticType::Tag,
1809 column_id: 5,
1810 },
1811 location: None,
1812 }
1813 .validate(&metadata)
1814 .unwrap_err();
1815
1816 let add_column = AddColumn {
1818 column_metadata: ColumnMetadata {
1819 column_schema: ColumnSchema::new(
1820 "tag_0",
1821 ConcreteDataType::string_datatype(),
1822 true,
1823 ),
1824 semantic_type: SemanticType::Tag,
1825 column_id: 2,
1826 },
1827 location: None,
1828 };
1829 add_column.validate(&metadata).unwrap();
1830 assert!(!add_column.need_alter(&metadata));
1831 }
1832
1833 #[test]
1834 fn test_add_duplicate_columns() {
1835 let kind = AlterKind::AddColumns {
1836 columns: vec![
1837 AddColumn {
1838 column_metadata: ColumnMetadata {
1839 column_schema: ColumnSchema::new(
1840 "tag_1",
1841 ConcreteDataType::string_datatype(),
1842 true,
1843 ),
1844 semantic_type: SemanticType::Tag,
1845 column_id: 5,
1846 },
1847 location: None,
1848 },
1849 AddColumn {
1850 column_metadata: ColumnMetadata {
1851 column_schema: ColumnSchema::new(
1852 "tag_1",
1853 ConcreteDataType::string_datatype(),
1854 true,
1855 ),
1856 semantic_type: SemanticType::Field,
1857 column_id: 6,
1858 },
1859 location: None,
1860 },
1861 ],
1862 };
1863 let metadata = new_metadata();
1864 kind.validate(&metadata).unwrap();
1865 assert!(kind.need_alter(&metadata));
1866 }
1867
1868 #[test]
1869 fn test_add_existing_column_different_metadata() {
1870 let metadata = new_metadata();
1871
1872 let kind = AlterKind::AddColumns {
1874 columns: vec![AddColumn {
1875 column_metadata: ColumnMetadata {
1876 column_schema: ColumnSchema::new(
1877 "tag_0",
1878 ConcreteDataType::string_datatype(),
1879 true,
1880 ),
1881 semantic_type: SemanticType::Tag,
1882 column_id: 4,
1883 },
1884 location: None,
1885 }],
1886 };
1887 kind.validate(&metadata).unwrap_err();
1888
1889 let kind = AlterKind::AddColumns {
1891 columns: vec![AddColumn {
1892 column_metadata: ColumnMetadata {
1893 column_schema: ColumnSchema::new(
1894 "tag_0",
1895 ConcreteDataType::int64_datatype(),
1896 true,
1897 ),
1898 semantic_type: SemanticType::Tag,
1899 column_id: 2,
1900 },
1901 location: None,
1902 }],
1903 };
1904 kind.validate(&metadata).unwrap_err();
1905
1906 let kind = AlterKind::AddColumns {
1908 columns: vec![AddColumn {
1909 column_metadata: ColumnMetadata {
1910 column_schema: ColumnSchema::new(
1911 "tag_1",
1912 ConcreteDataType::string_datatype(),
1913 true,
1914 ),
1915 semantic_type: SemanticType::Tag,
1916 column_id: 2,
1917 },
1918 location: None,
1919 }],
1920 };
1921 kind.validate(&metadata).unwrap_err();
1922 }
1923
1924 #[test]
1925 fn test_add_existing_column_with_location() {
1926 let metadata = new_metadata();
1927 let kind = AlterKind::AddColumns {
1928 columns: vec![AddColumn {
1929 column_metadata: ColumnMetadata {
1930 column_schema: ColumnSchema::new(
1931 "tag_0",
1932 ConcreteDataType::string_datatype(),
1933 true,
1934 ),
1935 semantic_type: SemanticType::Tag,
1936 column_id: 2,
1937 },
1938 location: Some(AddColumnLocation::First),
1939 }],
1940 };
1941 kind.validate(&metadata).unwrap_err();
1942 }
1943
1944 #[test]
1945 fn test_validate_drop_column() {
1946 let metadata = new_metadata();
1947 let kind = AlterKind::DropColumns {
1948 names: vec!["xxxx".to_string()],
1949 };
1950 kind.validate(&metadata).unwrap();
1951 assert!(!kind.need_alter(&metadata));
1952
1953 AlterKind::DropColumns {
1954 names: vec!["tag_0".to_string()],
1955 }
1956 .validate(&metadata)
1957 .unwrap_err();
1958
1959 let kind = AlterKind::DropColumns {
1960 names: vec!["field_0".to_string()],
1961 };
1962 kind.validate(&metadata).unwrap();
1963 assert!(kind.need_alter(&metadata));
1964 }
1965
1966 #[test]
1967 fn test_validate_modify_column_type() {
1968 let metadata = new_metadata();
1969 AlterKind::ModifyColumnTypes {
1970 columns: vec![ModifyColumnType {
1971 column_name: "xxxx".to_string(),
1972 target_type: ConcreteDataType::string_datatype(),
1973 }],
1974 }
1975 .validate(&metadata)
1976 .unwrap_err();
1977
1978 AlterKind::ModifyColumnTypes {
1979 columns: vec![ModifyColumnType {
1980 column_name: "field_1".to_string(),
1981 target_type: ConcreteDataType::date_datatype(),
1982 }],
1983 }
1984 .validate(&metadata)
1985 .unwrap_err();
1986
1987 AlterKind::ModifyColumnTypes {
1988 columns: vec![ModifyColumnType {
1989 column_name: "ts".to_string(),
1990 target_type: ConcreteDataType::date_datatype(),
1991 }],
1992 }
1993 .validate(&metadata)
1994 .unwrap_err();
1995
1996 AlterKind::ModifyColumnTypes {
1997 columns: vec![ModifyColumnType {
1998 column_name: "tag_0".to_string(),
1999 target_type: ConcreteDataType::date_datatype(),
2000 }],
2001 }
2002 .validate(&metadata)
2003 .unwrap_err();
2004
2005 let kind = AlterKind::ModifyColumnTypes {
2006 columns: vec![ModifyColumnType {
2007 column_name: "field_0".to_string(),
2008 target_type: ConcreteDataType::int32_datatype(),
2009 }],
2010 };
2011 kind.validate(&metadata).unwrap();
2012 assert!(kind.need_alter(&metadata));
2013 }
2014
2015 #[test]
2016 fn test_validate_add_columns() {
2017 let kind = AlterKind::AddColumns {
2018 columns: vec![
2019 AddColumn {
2020 column_metadata: ColumnMetadata {
2021 column_schema: ColumnSchema::new(
2022 "tag_1",
2023 ConcreteDataType::string_datatype(),
2024 true,
2025 ),
2026 semantic_type: SemanticType::Tag,
2027 column_id: 5,
2028 },
2029 location: None,
2030 },
2031 AddColumn {
2032 column_metadata: ColumnMetadata {
2033 column_schema: ColumnSchema::new(
2034 "field_2",
2035 ConcreteDataType::string_datatype(),
2036 true,
2037 ),
2038 semantic_type: SemanticType::Field,
2039 column_id: 6,
2040 },
2041 location: None,
2042 },
2043 ],
2044 };
2045 let request = RegionAlterRequest { kind };
2046 let mut metadata = new_metadata();
2047 metadata.schema_version = 1;
2048 request.validate(&metadata).unwrap();
2049 }
2050
2051 #[test]
2052 fn test_validate_create_region() {
2053 let column_metadatas = vec![
2054 ColumnMetadata {
2055 column_schema: ColumnSchema::new(
2056 "ts",
2057 ConcreteDataType::timestamp_millisecond_datatype(),
2058 false,
2059 ),
2060 semantic_type: SemanticType::Timestamp,
2061 column_id: 1,
2062 },
2063 ColumnMetadata {
2064 column_schema: ColumnSchema::new(
2065 "tag_0",
2066 ConcreteDataType::string_datatype(),
2067 true,
2068 ),
2069 semantic_type: SemanticType::Tag,
2070 column_id: 2,
2071 },
2072 ColumnMetadata {
2073 column_schema: ColumnSchema::new(
2074 "field_0",
2075 ConcreteDataType::string_datatype(),
2076 true,
2077 ),
2078 semantic_type: SemanticType::Field,
2079 column_id: 3,
2080 },
2081 ];
2082 let create = RegionCreateRequest {
2083 engine: "mito".to_string(),
2084 column_metadatas,
2085 primary_key: vec![3, 4],
2086 options: HashMap::new(),
2087 table_dir: "path".to_string(),
2088 path_type: PathType::Bare,
2089 partition_expr_json: Some("".to_string()),
2090 requirements: Default::default(),
2091 };
2092
2093 assert!(create.validate().is_err());
2094 }
2095
2096 #[test]
2097 fn test_parse_create_region_requirements_defaults_to_empty() {
2098 let create = CreateRequest {
2099 region_id: RegionId::new(42, 0).as_u64(),
2100 engine: "mito".to_string(),
2101 column_defs: vec![],
2102 primary_key: vec![],
2103 path: "test".to_string(),
2104 options: HashMap::new(),
2105 partition: None,
2106 requirements: None,
2107 };
2108
2109 let requests =
2110 RegionRequest::try_from_request_body(region_request::Body::Create(create)).unwrap();
2111 let RegionRequest::Create(request) = &requests[0].1 else {
2112 unreachable!()
2113 };
2114
2115 assert_eq!(request.requirements, RegionRequirements::empty());
2116 }
2117
2118 #[test]
2119 fn test_parse_create_region_requirements_from_proto() {
2120 let create = CreateRequest {
2121 region_id: RegionId::new(42, 0).as_u64(),
2122 engine: "mito".to_string(),
2123 column_defs: vec![],
2124 primary_key: vec![],
2125 path: "test".to_string(),
2126 options: HashMap::new(),
2127 partition: None,
2128 requirements: Some(api::v1::region::RegionRequirements {
2129 object_storage: true,
2130 }),
2131 };
2132
2133 let requests =
2134 RegionRequest::try_from_request_body(region_request::Body::Create(create)).unwrap();
2135 let RegionRequest::Create(request) = &requests[0].1 else {
2136 unreachable!()
2137 };
2138
2139 assert_eq!(request.requirements, RegionRequirements::object_storage());
2140 }
2141
2142 #[test]
2143 fn test_validate_modify_column_fulltext_options() {
2144 let kind = AlterKind::SetIndexes {
2145 options: vec![SetIndexOption::Fulltext {
2146 column_name: "tag_0".to_string(),
2147 options: FulltextOptions::new_unchecked(
2148 true,
2149 FulltextAnalyzer::Chinese,
2150 false,
2151 FulltextBackend::Bloom,
2152 1000,
2153 0.01,
2154 ),
2155 }],
2156 };
2157 let request = RegionAlterRequest { kind };
2158 let mut metadata = new_metadata();
2159 metadata.schema_version = 1;
2160 request.validate(&metadata).unwrap();
2161
2162 let kind = AlterKind::UnsetIndexes {
2163 options: vec![UnsetIndexOption::Fulltext {
2164 column_name: "tag_0".to_string(),
2165 }],
2166 };
2167 let request = RegionAlterRequest { kind };
2168 let mut metadata = new_metadata();
2169 metadata.schema_version = 1;
2170 request.validate(&metadata).unwrap();
2171 }
2172
2173 #[test]
2174 fn test_validate_sync_columns() {
2175 let metadata = new_metadata();
2176 let kind = AlterKind::SyncColumns {
2177 column_metadatas: vec![
2178 ColumnMetadata {
2179 column_schema: ColumnSchema::new(
2180 "tag_1",
2181 ConcreteDataType::string_datatype(),
2182 true,
2183 ),
2184 semantic_type: SemanticType::Tag,
2185 column_id: 5,
2186 },
2187 ColumnMetadata {
2188 column_schema: ColumnSchema::new(
2189 "field_2",
2190 ConcreteDataType::string_datatype(),
2191 true,
2192 ),
2193 semantic_type: SemanticType::Field,
2194 column_id: 6,
2195 },
2196 ],
2197 };
2198 let err = kind.validate(&metadata).unwrap_err();
2199 assert!(err.to_string().contains("not a primary key"));
2200
2201 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2203 let ts_column = column_metadatas_with_different_ts_column
2204 .iter_mut()
2205 .find(|c| c.semantic_type == SemanticType::Timestamp)
2206 .unwrap();
2207 ts_column.column_schema.name = "ts1".to_string();
2208
2209 let kind = AlterKind::SyncColumns {
2210 column_metadatas: column_metadatas_with_different_ts_column,
2211 };
2212 let err = kind.validate(&metadata).unwrap_err();
2213 assert!(
2214 err.to_string()
2215 .contains("timestamp column ts has different id")
2216 );
2217
2218 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2220 let pk_column = column_metadatas_with_different_pk_column
2221 .iter_mut()
2222 .find(|c| c.column_schema.name == "tag_0")
2223 .unwrap();
2224 pk_column.column_id = 100;
2225 let kind = AlterKind::SyncColumns {
2226 column_metadatas: column_metadatas_with_different_pk_column,
2227 };
2228 let err = kind.validate(&metadata).unwrap_err();
2229 assert!(
2230 err.to_string()
2231 .contains("column with same name tag_0 has different id")
2232 );
2233
2234 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2236 column_metadatas_with_new_field_column.push(ColumnMetadata {
2237 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2238 semantic_type: SemanticType::Field,
2239 column_id: 4,
2240 });
2241 let kind = AlterKind::SyncColumns {
2242 column_metadatas: column_metadatas_with_new_field_column,
2243 };
2244 kind.validate(&metadata).unwrap();
2245 }
2246
2247 #[test]
2248 fn test_cast_path_type_to_primitive() {
2249 assert_eq!(PathType::Bare as u8, 0);
2250 assert_eq!(PathType::Data as u8, 1);
2251 assert_eq!(PathType::Metadata as u8, 2);
2252 assert_eq!(
2253 PathType::try_from(PathType::Bare as u8).unwrap(),
2254 PathType::Bare
2255 );
2256 assert_eq!(
2257 PathType::try_from(PathType::Data as u8).unwrap(),
2258 PathType::Data
2259 );
2260 assert_eq!(
2261 PathType::try_from(PathType::Metadata as u8).unwrap(),
2262 PathType::Metadata
2263 );
2264 }
2265}