1use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21 as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25 AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26 CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27 FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28 region_request, truncate_request,
29};
30use api::v1::{
31 self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32 SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47 ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48 InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49 InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50 RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55 SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
56};
57use crate::path_utils::table_dir;
58use crate::storage::{ColumnId, RegionId, ScanRequest};
59
/// Kind of path layout a region directory uses.
///
/// The enum is `#[repr(u8)]`, so the variants carry the discriminants 0, 1 and 2 in
/// declaration order, and the derived `TryFromPrimitive` converts a raw `u8` back into
/// a variant.
#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
#[repr(u8)]
pub enum PathType {
    /// Discriminant 0. NOTE(review): presumably the region directory is used as-is — confirm.
    Bare,
    /// Discriminant 1. NOTE(review): presumably a dedicated data sub-directory — confirm.
    Data,
    /// Discriminant 2. NOTE(review): presumably a dedicated metadata sub-directory — confirm.
    Metadata,
}
77
/// A batch of region DDL requests that all share one kind (create, drop or alter).
///
/// Every entry pairs the target [`RegionId`] with the parsed request for that region.
/// The derived `IntoStaticStr` exposes the variant name as a `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Create requests, one entry per region.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Drop requests, one entry per region.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Alter requests, one entry per region.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
84
85impl BatchRegionDdlRequest {
86 pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
88 match body {
89 region_request::Body::Creates(creates) => {
90 let requests = creates
91 .requests
92 .into_iter()
93 .map(parse_region_create)
94 .collect::<Result<Vec<_>>>()?;
95 Ok(Some(Self::Create(requests)))
96 }
97 region_request::Body::Drops(drops) => {
98 let requests = drops
99 .requests
100 .into_iter()
101 .map(parse_region_drop)
102 .collect::<Result<Vec<_>>>()?;
103 Ok(Some(Self::Drop(requests)))
104 }
105 region_request::Body::Alters(alters) => {
106 let requests = alters
107 .requests
108 .into_iter()
109 .map(parse_region_alter)
110 .collect::<Result<Vec<_>>>()?;
111 Ok(Some(Self::Alter(requests)))
112 }
113 _ => Ok(None),
114 }
115 }
116
117 pub fn request_type(&self) -> &'static str {
118 self.into()
119 }
120
121 pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
122 match self {
123 Self::Create(requests) => requests
124 .into_iter()
125 .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
126 .collect(),
127 Self::Drop(requests) => requests
128 .into_iter()
129 .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
130 .collect(),
131 Self::Alter(requests) => requests
132 .into_iter()
133 .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
134 .collect(),
135 }
136 }
137}
138
/// A request addressed to a single region.
///
/// Each variant wraps the payload type for one kind of operation. The derived
/// `IntoStaticStr` exposes the variant name as a `&'static str`.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// Writes rows into the region.
    Put(RegionPutRequest),
    /// Deletes rows from the region.
    Delete(RegionDeleteRequest),
    /// Creates the region.
    Create(RegionCreateRequest),
    /// Drops the region.
    Drop(RegionDropRequest),
    /// Opens the region.
    Open(RegionOpenRequest),
    /// Closes the region.
    Close(RegionCloseRequest),
    /// Alters the region.
    Alter(RegionAlterRequest),
    /// Flushes the region.
    Flush(RegionFlushRequest),
    /// Compacts the region.
    Compact(RegionCompactRequest),
    /// Builds indexes for the region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncates the region, entirely or by time ranges.
    Truncate(RegionTruncateRequest),
    /// Catch-up request; the payload type is defined outside this view.
    Catchup(RegionCatchupRequest),
    /// Bulk insert carrying a decoded record batch.
    BulkInserts(RegionBulkInsertsRequest),
    /// Enter-staging request; the payload type is defined outside this view.
    EnterStaging(EnterStagingRequest),
    /// Applies a staging manifest to the region.
    ApplyStagingManifest(ApplyStagingManifestRequest),
}
157
158impl RegionRequest {
159 pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
162 match body {
163 region_request::Body::Inserts(inserts) => make_region_puts(inserts),
164 region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
165 region_request::Body::Create(create) => make_region_create(create),
166 region_request::Body::Drop(drop) => make_region_drop(drop),
167 region_request::Body::Open(open) => make_region_open(open),
168 region_request::Body::Close(close) => make_region_close(close),
169 region_request::Body::Alter(alter) => make_region_alter(alter),
170 region_request::Body::Flush(flush) => make_region_flush(flush),
171 region_request::Body::Compact(compact) => make_region_compact(compact),
172 region_request::Body::BuildIndex(index) => make_region_build_index(index),
173 region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
174 region_request::Body::Creates(creates) => make_region_creates(creates),
175 region_request::Body::Drops(drops) => make_region_drops(drops),
176 region_request::Body::Alters(alters) => make_region_alters(alters),
177 region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
178 region_request::Body::Sync(_) => UnexpectedSnafu {
179 reason: "Sync request should be handled separately by RegionServer",
180 }
181 .fail(),
182 region_request::Body::ListMetadata(_) => UnexpectedSnafu {
183 reason: "ListMetadata request should be handled separately by RegionServer",
184 }
185 .fail(),
186 region_request::Body::ApplyStagingManifest(apply) => {
187 make_region_apply_staging_manifest(apply)
188 }
189 }
190 }
191
192 pub fn request_type(&self) -> &'static str {
194 self.into()
195 }
196}
197
198fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
199 let requests = inserts
200 .requests
201 .into_iter()
202 .filter_map(|r| {
203 let region_id = r.region_id.into();
204 r.rows.map(|rows| {
205 (
206 region_id,
207 RegionRequest::Put(RegionPutRequest {
208 rows,
209 hint: None,
210 partition_expr_version: r.partition_expr_version.map(|v| v.value),
211 }),
212 )
213 })
214 })
215 .collect();
216 Ok(requests)
217}
218
219fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
220 let requests = deletes
221 .requests
222 .into_iter()
223 .filter_map(|r| {
224 let region_id = r.region_id.into();
225 r.rows.map(|rows| {
226 (
227 region_id,
228 RegionRequest::Delete(RegionDeleteRequest {
229 rows,
230 hint: None,
231 partition_expr_version: r.partition_expr_version.map(|v| v.value),
232 }),
233 )
234 })
235 })
236 .collect();
237 Ok(requests)
238}
239
240fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
241 let column_metadatas = create
242 .column_defs
243 .into_iter()
244 .map(ColumnMetadata::try_from_column_def)
245 .collect::<Result<Vec<_>>>()?;
246 let region_id = RegionId::from(create.region_id);
247 let table_dir = table_dir(&create.path, region_id.table_id());
248 let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
249 Ok((
250 region_id,
251 RegionCreateRequest {
252 engine: create.engine,
253 column_metadatas,
254 primary_key: create.primary_key,
255 options: create.options,
256 table_dir,
257 path_type: PathType::Bare,
258 partition_expr_json,
259 },
260 ))
261}
262
263fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
264 let (region_id, request) = parse_region_create(create)?;
265 Ok(vec![(region_id, RegionRequest::Create(request))])
266}
267
268fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
269 let mut requests = Vec::with_capacity(creates.requests.len());
270 for create in creates.requests {
271 requests.extend(make_region_create(create)?);
272 }
273 Ok(requests)
274}
275
276fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
277 let region_id = drop.region_id.into();
278 Ok((
279 region_id,
280 RegionDropRequest {
281 fast_path: drop.fast_path,
282 force: drop.force,
283 partial_drop: drop.partial_drop,
284 },
285 ))
286}
287
288fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
289 let (region_id, request) = parse_region_drop(drop)?;
290 Ok(vec![(region_id, RegionRequest::Drop(request))])
291}
292
293fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
294 let mut requests = Vec::with_capacity(drops.requests.len());
295 for drop in drops.requests {
296 requests.extend(make_region_drop(drop)?);
297 }
298 Ok(requests)
299}
300
301fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
302 let region_id = RegionId::from(open.region_id);
303 let table_dir = table_dir(&open.path, region_id.table_id());
304 Ok(vec![(
305 region_id,
306 RegionRequest::Open(RegionOpenRequest {
307 engine: open.engine,
308 table_dir,
309 path_type: PathType::Bare,
310 options: open.options,
311 skip_wal_replay: false,
312 checkpoint: None,
313 }),
314 )])
315}
316
317fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
318 let region_id = close.region_id.into();
319 Ok(vec![(
320 region_id,
321 RegionRequest::Close(RegionCloseRequest {}),
322 )])
323}
324
325fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
326 let region_id = alter.region_id.into();
327 let request = RegionAlterRequest::try_from(alter)?;
328 Ok((region_id, request))
329}
330
331fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
332 let (region_id, request) = parse_region_alter(alter)?;
333 Ok(vec![(region_id, RegionRequest::Alter(request))])
334}
335
336fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
337 let mut requests = Vec::with_capacity(alters.requests.len());
338 for alter in alters.requests {
339 requests.extend(make_region_alter(alter)?);
340 }
341 Ok(requests)
342}
343
344fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
345 let region_id = flush.region_id.into();
346 Ok(vec![(
347 region_id,
348 RegionRequest::Flush(RegionFlushRequest {
349 row_group_size: None,
350 }),
351 )])
352}
353
354fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
355 let region_id = compact.region_id.into();
356 let options = compact
357 .options
358 .unwrap_or(compact_request::Options::Regular(Default::default()));
359 let parallelism = if compact.parallelism == 0 {
361 None
362 } else {
363 Some(compact.parallelism)
364 };
365 Ok(vec![(
366 region_id,
367 RegionRequest::Compact(RegionCompactRequest {
368 options,
369 parallelism,
370 }),
371 )])
372}
373
374fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
375 let region_id = index.region_id.into();
376 Ok(vec![(
377 region_id,
378 RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
379 )])
380}
381
382fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
383 let region_id = truncate.region_id.into();
384 match truncate.kind {
385 None => InvalidRawRegionRequestSnafu {
386 err: "missing kind in TruncateRequest".to_string(),
387 }
388 .fail(),
389 Some(truncate_request::Kind::All(_)) => Ok(vec![(
390 region_id,
391 RegionRequest::Truncate(RegionTruncateRequest::All),
392 )]),
393 Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
394 let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
395
396 Ok(vec![(
397 region_id,
398 RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
399 )])
400 }
401 }
402}
403
404fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
406 let region_id = request.region_id.into();
407 let partition_expr_version = request.partition_expr_version.map(|v| v.value);
408 let Some(Body::ArrowIpc(request)) = request.body else {
409 return Ok(vec![]);
410 };
411
412 let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
413 .with_label_values(&["decode"])
414 .start_timer();
415 let mut decoder =
416 FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
417 let payload = decoder
418 .try_decode_record_batch(&request.data_header, &request.payload)
419 .context(FlightCodecSnafu)?;
420 decoder_timer.observe_duration();
421 Ok(vec![(
422 region_id,
423 RegionRequest::BulkInserts(RegionBulkInsertsRequest {
424 region_id,
425 payload,
426 raw_data: request,
427 partition_expr_version,
428 }),
429 )])
430}
431
432fn make_region_apply_staging_manifest(
433 api::v1::region::ApplyStagingManifestRequest {
434 region_id,
435 partition_expr,
436 central_region_id,
437 manifest_path,
438 }: api::v1::region::ApplyStagingManifestRequest,
439) -> Result<Vec<(RegionId, RegionRequest)>> {
440 let region_id = region_id.into();
441 Ok(vec![(
442 region_id,
443 RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
444 partition_expr,
445 central_region_id: central_region_id.into(),
446 manifest_path,
447 }),
448 )])
449}
450
/// Request to put (write) rows into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint; conversions from protobuf insert requests leave it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version taken from the incoming request, if present.
    pub partition_expr_version: Option<u64>,
}
461
/// Request to read from a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The scan to perform.
    pub request: ScanRequest,
}
466
/// Request to delete rows from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Rows identifying what to delete.
    /// NOTE(review): presumably only key columns are required here — confirm.
    pub rows: Rows,
    /// Optional write hint; conversions from protobuf delete requests leave it `None`.
    pub hint: Option<WriteHint>,
    /// Partition expression version taken from the incoming request, if present.
    pub partition_expr_version: Option<u64>,
}
479
/// Request to create a region.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Name of the engine that should own the region.
    pub engine: String,
    /// Metadata of every column in the region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns forming the primary key; each must appear in `column_metadatas`.
    pub primary_key: Vec<ColumnId>,
    /// Free-form string options for the region.
    pub options: HashMap<String, String>,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path layout used for the region directory.
    pub path_type: PathType,
    /// Partition expression of the region, if any.
    /// NOTE(review): presumably JSON-encoded, per the field name — confirm.
    pub partition_expr_json: Option<String>,
}
498
499impl RegionCreateRequest {
500 pub fn validate(&self) -> Result<()> {
502 ensure!(
504 self.column_metadatas
505 .iter()
506 .any(|x| x.semantic_type == SemanticType::Timestamp),
507 InvalidRegionRequestSnafu {
508 region_id: RegionId::new(0, 0),
509 err: "missing timestamp column in create region request".to_string(),
510 }
511 );
512
513 let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
515 for (i, c) in self.column_metadatas.iter().enumerate() {
516 if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
517 return InvalidRegionRequestSnafu {
518 region_id: RegionId::new(0, 0),
519 err: format!(
520 "duplicate column id {} (at position {} and {}) in create region request",
521 c.column_id, previous, i
522 ),
523 }
524 .fail();
525 }
526 }
527
528 for column_id in &self.primary_key {
530 ensure!(
531 column_id_to_indices.contains_key(column_id),
532 InvalidRegionRequestSnafu {
533 region_id: RegionId::new(0, 0),
534 err: format!(
535 "missing primary key column {} in create region request",
536 column_id
537 ),
538 }
539 );
540 }
541
542 Ok(())
543 }
544
545 pub fn is_physical_table(&self) -> bool {
547 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
548 }
549}
550
/// Request to drop a region. All flags are copied verbatim from the protobuf request.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// NOTE(review): presumably enables a faster drop path — confirm semantics with the engine.
    pub fast_path: bool,

    /// NOTE(review): presumably forces the drop even when checks would refuse it — confirm.
    pub force: bool,

    /// NOTE(review): presumably drops only part of the region's data — confirm.
    pub partial_drop: bool,
}
565
/// Request to open a region.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Name of the engine that owns the region.
    pub engine: String,
    /// Directory of the table the region belongs to.
    pub table_dir: String,
    /// Path layout used for the region directory.
    pub path_type: PathType,
    /// Free-form string options for the region.
    pub options: HashMap<String, String>,
    /// Whether WAL replay should be skipped; requests converted from protobuf set `false`.
    pub skip_wal_replay: bool,
    /// Optional replay checkpoint; requests converted from protobuf set `None`.
    pub checkpoint: Option<ReplayCheckpoint>,
}
582
/// Checkpoint that can be attached to a [`RegionOpenRequest`].
/// NOTE(review): presumably marks where WAL replay should resume — confirm.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ReplayCheckpoint {
    /// Entry id of the checkpoint.
    pub entry_id: u64,
    /// Entry id for the metadata, if tracked separately.
    /// NOTE(review): exact meaning is not visible here — confirm.
    pub metadata_entry_id: Option<u64>,
}
588
589impl RegionOpenRequest {
590 pub fn is_physical_table(&self) -> bool {
592 self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
593 }
594}
595
/// Request to close a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
599
/// Request to alter a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// The kind of alteration to perform.
    pub kind: AlterKind,
}
606
607impl RegionAlterRequest {
608 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
610 self.kind.validate(metadata)?;
611
612 Ok(())
613 }
614
615 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
619 debug_assert!(self.validate(metadata).is_ok());
620 self.kind.need_alter(metadata)
621 }
622}
623
624impl TryFrom<AlterRequest> for RegionAlterRequest {
625 type Error = MetadataError;
626
627 fn try_from(value: AlterRequest) -> Result<Self> {
628 let kind = value.kind.context(InvalidRawRegionRequestSnafu {
629 err: "missing kind in AlterRequest",
630 })?;
631
632 let kind = AlterKind::try_from(kind)?;
633 Ok(RegionAlterRequest { kind })
634 }
635}
636
/// Kind of alteration to apply to a region.
///
/// The derived `AsRefStr` exposes the variant name as a string slice.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region; validation only accepts field columns.
    DropColumns {
        /// Names of the columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns.
    ModifyColumnTypes {
        /// Columns to change, with their target types.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions { options: Vec<SetRegionOption> },
    /// Unset region options.
    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
    /// Set index options on columns.
    SetIndexes { options: Vec<SetIndexOption> },
    /// Unset index options on columns.
    UnsetIndexes { options: Vec<UnsetIndexOption> },
    /// Drop the default values of columns.
    DropDefaults {
        /// Names of the columns whose defaults are dropped.
        names: Vec<String>,
    },
    /// Set the default values of columns.
    SetDefaults {
        /// Columns together with their new default constraints.
        columns: Vec<SetDefault>,
    },
    /// Synchronise the region's columns with the given column metadata.
    SyncColumns {
        /// The column metadata to synchronise to.
        column_metadatas: Vec<ColumnMetadata>,
    },
}
/// A new default constraint for one column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct SetDefault {
    /// Name of the column.
    pub name: String,
    /// The default constraint as raw bytes, copied from the protobuf request.
    /// NOTE(review): the encoding is not visible here — confirm against the producer.
    pub default_constraint: Vec<u8>,
}
683
/// An index to set on a column, together with its options.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum SetIndexOption {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the target column.
        column_name: String,
        /// Fulltext index options.
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        /// Name of the target column.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the target column.
        column_name: String,
        /// Skipping index options.
        options: SkippingIndexOptions,
    },
}
698
699impl SetIndexOption {
700 pub fn column_name(&self) -> &String {
702 match self {
703 SetIndexOption::Fulltext { column_name, .. } => column_name,
704 SetIndexOption::Inverted { column_name } => column_name,
705 SetIndexOption::Skipping { column_name, .. } => column_name,
706 }
707 }
708
709 pub fn is_fulltext(&self) -> bool {
711 match self {
712 SetIndexOption::Fulltext { .. } => true,
713 SetIndexOption::Inverted { .. } => false,
714 SetIndexOption::Skipping { .. } => false,
715 }
716 }
717}
718
719impl TryFrom<v1::SetIndex> for SetIndexOption {
720 type Error = MetadataError;
721
722 fn try_from(value: v1::SetIndex) -> Result<Self> {
723 let option = value.options.context(InvalidRawRegionRequestSnafu {
724 err: "missing options in SetIndex",
725 })?;
726
727 let opt = match option {
728 v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
729 column_name: x.column_name.clone(),
730 options: FulltextOptions::new(
731 x.enable,
732 as_fulltext_option_analyzer(
733 Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
734 ),
735 x.case_sensitive,
736 as_fulltext_option_backend(
737 PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
738 ),
739 x.granularity as u32,
740 x.false_positive_rate,
741 )
742 .context(InvalidIndexOptionSnafu)?,
743 },
744 v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
745 column_name: i.column_name,
746 },
747 v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
748 column_name: s.column_name,
749 options: SkippingIndexOptions::new(
750 s.granularity as u32,
751 s.false_positive_rate,
752 as_skipping_index_type(
753 PbSkippingIndexType::try_from(s.skipping_index_type)
754 .context(DecodeProtoSnafu)?,
755 ),
756 )
757 .context(InvalidIndexOptionSnafu)?,
758 },
759 };
760
761 Ok(opt)
762 }
763}
764
/// An index to remove from a column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum UnsetIndexOption {
    /// Remove the fulltext index of `column_name`.
    Fulltext { column_name: String },
    /// Remove the inverted index of `column_name`.
    Inverted { column_name: String },
    /// Remove the skipping index of `column_name`.
    Skipping { column_name: String },
}
771
772impl UnsetIndexOption {
773 pub fn column_name(&self) -> &String {
774 match self {
775 UnsetIndexOption::Fulltext { column_name } => column_name,
776 UnsetIndexOption::Inverted { column_name } => column_name,
777 UnsetIndexOption::Skipping { column_name } => column_name,
778 }
779 }
780
781 pub fn is_fulltext(&self) -> bool {
782 match self {
783 UnsetIndexOption::Fulltext { .. } => true,
784 UnsetIndexOption::Inverted { .. } => false,
785 UnsetIndexOption::Skipping { .. } => false,
786 }
787 }
788}
789
790impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
791 type Error = MetadataError;
792
793 fn try_from(value: v1::UnsetIndex) -> Result<Self> {
794 let option = value.options.context(InvalidRawRegionRequestSnafu {
795 err: "missing options in UnsetIndex",
796 })?;
797
798 let opt = match option {
799 v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
800 column_name: f.column_name,
801 },
802 v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
803 column_name: i.column_name,
804 },
805 v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
806 column_name: s.column_name,
807 },
808 };
809
810 Ok(opt)
811 }
812}
813
814impl AlterKind {
815 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
819 match self {
820 AlterKind::AddColumns { columns } => {
821 for col_to_add in columns {
822 col_to_add.validate(metadata)?;
823 }
824 }
825 AlterKind::DropColumns { names } => {
826 for name in names {
827 Self::validate_column_to_drop(name, metadata)?;
828 }
829 }
830 AlterKind::ModifyColumnTypes { columns } => {
831 for col_to_change in columns {
832 col_to_change.validate(metadata)?;
833 }
834 }
835 AlterKind::SetRegionOptions { .. } => {}
836 AlterKind::UnsetRegionOptions { .. } => {}
837 AlterKind::SetIndexes { options } => {
838 for option in options {
839 Self::validate_column_alter_index_option(
840 option.column_name(),
841 metadata,
842 option.is_fulltext(),
843 )?;
844 }
845 }
846 AlterKind::UnsetIndexes { options } => {
847 for option in options {
848 Self::validate_column_alter_index_option(
849 option.column_name(),
850 metadata,
851 option.is_fulltext(),
852 )?;
853 }
854 }
855 AlterKind::DropDefaults { names } => {
856 names
857 .iter()
858 .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
859 }
860 AlterKind::SetDefaults { columns } => {
861 columns
862 .iter()
863 .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
864 }
865 AlterKind::SyncColumns { column_metadatas } => {
866 let new_primary_keys = column_metadatas
867 .iter()
868 .filter(|c| c.semantic_type == SemanticType::Tag)
869 .map(|c| (c.column_schema.name.as_str(), c.column_id))
870 .collect::<HashMap<_, _>>();
871
872 let old_primary_keys = metadata
873 .column_metadatas
874 .iter()
875 .filter(|c| c.semantic_type == SemanticType::Tag)
876 .map(|c| (c.column_schema.name.as_str(), c.column_id));
877
878 for (name, id) in old_primary_keys {
879 let primary_key =
880 new_primary_keys
881 .get(name)
882 .with_context(|| InvalidRegionRequestSnafu {
883 region_id: metadata.region_id,
884 err: format!("column {} is not a primary key", name),
885 })?;
886
887 ensure!(
888 *primary_key == id,
889 InvalidRegionRequestSnafu {
890 region_id: metadata.region_id,
891 err: format!(
892 "column with same name {} has different id, existing: {}, got: {}",
893 name, id, primary_key
894 ),
895 }
896 );
897 }
898
899 let new_ts_column = column_metadatas
900 .iter()
901 .find(|c| c.semantic_type == SemanticType::Timestamp)
902 .map(|c| (c.column_schema.name.as_str(), c.column_id))
903 .context(InvalidRegionRequestSnafu {
904 region_id: metadata.region_id,
905 err: "timestamp column not found",
906 })?;
907
908 let old_ts_column = metadata
910 .column_metadatas
911 .iter()
912 .find(|c| c.semantic_type == SemanticType::Timestamp)
913 .map(|c| (c.column_schema.name.as_str(), c.column_id))
914 .unwrap();
915
916 ensure!(
917 new_ts_column == old_ts_column,
918 InvalidRegionRequestSnafu {
919 region_id: metadata.region_id,
920 err: format!(
921 "timestamp column {} has different id, existing: {}, got: {}",
922 old_ts_column.0, old_ts_column.1, new_ts_column.1
923 ),
924 }
925 );
926 }
927 }
928 Ok(())
929 }
930
931 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
933 debug_assert!(self.validate(metadata).is_ok());
934 match self {
935 AlterKind::AddColumns { columns } => columns
936 .iter()
937 .any(|col_to_add| col_to_add.need_alter(metadata)),
938 AlterKind::DropColumns { names } => names
939 .iter()
940 .any(|name| metadata.column_by_name(name).is_some()),
941 AlterKind::ModifyColumnTypes { columns } => columns
942 .iter()
943 .any(|col_to_change| col_to_change.need_alter(metadata)),
944 AlterKind::SetRegionOptions { .. } => true,
945 AlterKind::UnsetRegionOptions { .. } => true,
946 AlterKind::SetIndexes { options, .. } => options
947 .iter()
948 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
949 AlterKind::UnsetIndexes { options } => options
950 .iter()
951 .any(|option| metadata.column_by_name(option.column_name()).is_some()),
952 AlterKind::DropDefaults { names } => names
953 .iter()
954 .any(|name| metadata.column_by_name(name).is_some()),
955
956 AlterKind::SetDefaults { columns } => columns
957 .iter()
958 .any(|x| metadata.column_by_name(&x.name).is_some()),
959 AlterKind::SyncColumns { column_metadatas } => {
960 metadata.column_metadatas != *column_metadatas
961 }
962 }
963 }
964
965 fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
967 let Some(column) = metadata.column_by_name(name) else {
968 return Ok(());
969 };
970 ensure!(
971 column.semantic_type == SemanticType::Field,
972 InvalidRegionRequestSnafu {
973 region_id: metadata.region_id,
974 err: format!("column {} is not a field and could not be dropped", name),
975 }
976 );
977 Ok(())
978 }
979
980 fn validate_column_alter_index_option(
982 column_name: &String,
983 metadata: &RegionMetadata,
984 is_fulltext: bool,
985 ) -> Result<()> {
986 let column = metadata
987 .column_by_name(column_name)
988 .context(InvalidRegionRequestSnafu {
989 region_id: metadata.region_id,
990 err: format!("column {} not found", column_name),
991 })?;
992
993 if is_fulltext {
994 ensure!(
995 column.column_schema.data_type.is_string(),
996 InvalidRegionRequestSnafu {
997 region_id: metadata.region_id,
998 err: format!(
999 "cannot change alter index options for non-string column {}",
1000 column_name
1001 ),
1002 }
1003 );
1004 }
1005
1006 Ok(())
1007 }
1008
1009 fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1011 metadata
1012 .column_by_name(column_name)
1013 .context(InvalidRegionRequestSnafu {
1014 region_id: metadata.region_id,
1015 err: format!("column {} not found", column_name),
1016 })?;
1017
1018 Ok(())
1019 }
1020}
1021
1022impl TryFrom<alter_request::Kind> for AlterKind {
1023 type Error = MetadataError;
1024
1025 fn try_from(kind: alter_request::Kind) -> Result<Self> {
1026 let alter_kind = match kind {
1027 alter_request::Kind::AddColumns(x) => {
1028 let columns = x
1029 .add_columns
1030 .into_iter()
1031 .map(|x| x.try_into())
1032 .collect::<Result<Vec<_>>>()?;
1033 AlterKind::AddColumns { columns }
1034 }
1035 alter_request::Kind::ModifyColumnTypes(x) => {
1036 let columns = x
1037 .modify_column_types
1038 .into_iter()
1039 .map(|x| x.into())
1040 .collect::<Vec<_>>();
1041 AlterKind::ModifyColumnTypes { columns }
1042 }
1043 alter_request::Kind::DropColumns(x) => {
1044 let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1045 AlterKind::DropColumns { names }
1046 }
1047 alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1048 options: options
1049 .table_options
1050 .iter()
1051 .map(TryFrom::try_from)
1052 .collect::<Result<Vec<_>>>()?,
1053 },
1054 alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1055 keys: options
1056 .keys
1057 .iter()
1058 .map(|key| UnsetRegionOption::try_from(key.as_str()))
1059 .collect::<Result<Vec<_>>>()?,
1060 },
1061 alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1062 options: vec![SetIndexOption::try_from(o)?],
1063 },
1064 alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1065 options: vec![UnsetIndexOption::try_from(o)?],
1066 },
1067 alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1068 options: o
1069 .set_indexes
1070 .into_iter()
1071 .map(SetIndexOption::try_from)
1072 .collect::<Result<Vec<_>>>()?,
1073 },
1074 alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1075 options: o
1076 .unset_indexes
1077 .into_iter()
1078 .map(UnsetIndexOption::try_from)
1079 .collect::<Result<Vec<_>>>()?,
1080 },
1081 alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1082 names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1083 },
1084 alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1085 columns: x
1086 .set_defaults
1087 .into_iter()
1088 .map(|x| {
1089 Ok(SetDefault {
1090 name: x.column_name,
1091 default_constraint: x.default_constraint.clone(),
1092 })
1093 })
1094 .collect::<Result<Vec<_>>>()?,
1095 },
1096 alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1097 column_metadatas: x
1098 .column_defs
1099 .into_iter()
1100 .map(ColumnMetadata::try_from_column_def)
1101 .collect::<Result<Vec<_>>>()?,
1102 },
1103 };
1104
1105 Ok(alter_kind)
1106 }
1107}
1108
/// A column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Where to place the new column; `None` leaves the position unspecified.
    /// Validation rejects a location when the column already exists.
    pub location: Option<AddColumnLocation>,
}
1118
1119impl AddColumn {
1120 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1125 ensure!(
1126 self.column_metadata.column_schema.is_nullable()
1127 || self
1128 .column_metadata
1129 .column_schema
1130 .default_constraint()
1131 .is_some(),
1132 InvalidRegionRequestSnafu {
1133 region_id: metadata.region_id,
1134 err: format!(
1135 "no default value for column {}",
1136 self.column_metadata.column_schema.name
1137 ),
1138 }
1139 );
1140
1141 if let Some(existing_column) =
1142 metadata.column_by_name(&self.column_metadata.column_schema.name)
1143 {
1144 ensure!(
1146 *existing_column == self.column_metadata,
1147 InvalidRegionRequestSnafu {
1148 region_id: metadata.region_id,
1149 err: format!(
1150 "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1151 self.column_metadata.column_schema.name,
1152 existing_column,
1153 self.column_metadata,
1154 ),
1155 }
1156 );
1157 ensure!(
1158 self.location.is_none(),
1159 InvalidRegionRequestSnafu {
1160 region_id: metadata.region_id,
1161 err: format!(
1162 "column {} already exists, but location is specified",
1163 self.column_metadata.column_schema.name
1164 ),
1165 }
1166 );
1167 }
1168
1169 if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1170 ensure!(
1172 existing_column.column_schema.name == self.column_metadata.column_schema.name,
1173 InvalidRegionRequestSnafu {
1174 region_id: metadata.region_id,
1175 err: format!(
1176 "column id {} already exists with different name {}",
1177 self.column_metadata.column_id, existing_column.column_schema.name
1178 ),
1179 }
1180 );
1181 }
1182
1183 Ok(())
1184 }
1185
1186 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1188 debug_assert!(self.validate(metadata).is_ok());
1189 metadata
1190 .column_by_name(&self.column_metadata.column_schema.name)
1191 .is_none()
1192 }
1193}
1194
1195impl TryFrom<v1::region::AddColumn> for AddColumn {
1196 type Error = MetadataError;
1197
1198 fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1199 let column_def = add_column
1200 .column_def
1201 .context(InvalidRawRegionRequestSnafu {
1202 err: "missing column_def in AddColumn",
1203 })?;
1204
1205 let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1206 let location = add_column
1207 .location
1208 .map(AddColumnLocation::try_from)
1209 .transpose()?;
1210
1211 Ok(AddColumn {
1212 column_metadata,
1213 location,
1214 })
1215 }
1216}
1217
/// Position at which a column is to be added.
///
/// Built from the protobuf `v1::AddColumnLocation` message through `TryFrom`.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum AddColumnLocation {
    /// Counterpart of the protobuf `LocationType::First`.
    First,
    /// Counterpart of the protobuf `LocationType::After`.
    After {
        /// Name of the referenced column (the protobuf `after_column_name`).
        column_name: String,
    },
}
1229
1230impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1231 type Error = MetadataError;
1232
1233 fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1234 let location_type = LocationType::try_from(location.location_type)
1235 .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1236 let add_column_location = match location_type {
1237 LocationType::First => AddColumnLocation::First,
1238 LocationType::After => AddColumnLocation::After {
1239 column_name: location.after_column_name,
1240 },
1241 };
1242
1243 Ok(add_column_location)
1244 }
1245}
1246
/// Request to change the data type of a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct ModifyColumnType {
    /// Name of the column to modify; `validate` looks the column up by this name.
    pub column_name: String,
    /// Data type the column should be changed to; `validate` requires the current type to
    /// be castable to it.
    pub target_type: ConcreteDataType,
}
1255
1256impl ModifyColumnType {
1257 pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1259 let column_meta = metadata
1260 .column_by_name(&self.column_name)
1261 .with_context(|| InvalidRegionRequestSnafu {
1262 region_id: metadata.region_id,
1263 err: format!("column {} not found", self.column_name),
1264 })?;
1265
1266 ensure!(
1267 matches!(column_meta.semantic_type, SemanticType::Field),
1268 InvalidRegionRequestSnafu {
1269 region_id: metadata.region_id,
1270 err: "'timestamp' or 'tag' column cannot change type".to_string()
1271 }
1272 );
1273 ensure!(
1274 column_meta
1275 .column_schema
1276 .data_type
1277 .can_arrow_type_cast_to(&self.target_type),
1278 InvalidRegionRequestSnafu {
1279 region_id: metadata.region_id,
1280 err: format!(
1281 "column '{}' cannot be cast automatically to type '{}'",
1282 self.column_name, self.target_type
1283 ),
1284 }
1285 );
1286
1287 Ok(())
1288 }
1289
1290 pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1292 debug_assert!(self.validate(metadata).is_ok());
1293 metadata.column_by_name(&self.column_name).is_some()
1294 }
1295}
1296
1297impl From<v1::ModifyColumnType> for ModifyColumnType {
1298 fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1299 let target_type = ColumnDataTypeWrapper::new(
1300 modify_column_type.target_type(),
1301 modify_column_type.target_type_extension,
1302 )
1303 .into();
1304
1305 ModifyColumnType {
1306 column_name: modify_column_type.column_name,
1307 target_type,
1308 }
1309 }
1310}
1311
/// A region option to set, parsed from a protobuf key/value `Option`.
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum SetRegionOption {
    /// Time-to-live setting. `Some` when parsed from the `TTL_KEY` option; the default
    /// (`None`) when derived from `UnsetRegionOption::Ttl`.
    Ttl(Option<TimeToLive>),
    /// One of the TWCS options as a `(key, value)` pair; the value is an empty string when
    /// derived from an `UnsetRegionOption`.
    // NOTE(review): the variant name looks like a typo of `Twcs`; renaming it would change
    // the public interface, so it is left as is.
    Twsc(String, String),
    /// Value of the `SST_FORMAT_KEY` option, stored as given.
    Format(String),
}
1320
1321impl TryFrom<&PbOption> for SetRegionOption {
1322 type Error = MetadataError;
1323
1324 fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1325 let PbOption { key, value } = value;
1326 match key.as_str() {
1327 TTL_KEY => {
1328 let ttl = TimeToLive::from_humantime_or_str(value)
1329 .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1330
1331 Ok(Self::Ttl(Some(ttl)))
1332 }
1333 TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1334 Ok(Self::Twsc(key.clone(), value.clone()))
1335 }
1336 SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1337 _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1338 }
1339 }
1340}
1341
1342impl From<&UnsetRegionOption> for SetRegionOption {
1343 fn from(unset_option: &UnsetRegionOption) -> Self {
1344 match unset_option {
1345 UnsetRegionOption::TwcsTriggerFileNum => {
1346 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1347 }
1348 UnsetRegionOption::TwcsMaxOutputFileSize => {
1349 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1350 }
1351 UnsetRegionOption::TwcsTimeWindow => {
1352 SetRegionOption::Twsc(unset_option.to_string(), String::new())
1353 }
1354 UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1355 }
1356 }
1357}
1358
1359impl TryFrom<&str> for UnsetRegionOption {
1360 type Error = MetadataError;
1361
1362 fn try_from(key: &str) -> Result<Self> {
1363 match key.to_ascii_lowercase().as_str() {
1364 TTL_KEY => Ok(Self::Ttl),
1365 TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1366 TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1367 TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1368 _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1369 }
1370 }
1371}
1372
/// A region option to unset. Each variant maps to one option key (see `as_str`).
#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
pub enum UnsetRegionOption {
    /// The option keyed by `TWCS_TRIGGER_FILE_NUM`.
    TwcsTriggerFileNum,
    /// The option keyed by `TWCS_MAX_OUTPUT_FILE_SIZE`.
    TwcsMaxOutputFileSize,
    /// The option keyed by `TWCS_TIME_WINDOW`.
    TwcsTimeWindow,
    /// The option keyed by `TTL_KEY`.
    Ttl,
}
1380
1381impl UnsetRegionOption {
1382 pub fn as_str(&self) -> &str {
1383 match self {
1384 Self::Ttl => TTL_KEY,
1385 Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1386 Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1387 Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1388 }
1389 }
1390}
1391
1392impl Display for UnsetRegionOption {
1393 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1394 write!(f, "{}", self.as_str())
1395 }
1396}
1397
/// Parameters of a region flush request.
#[derive(Debug, Clone, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    // NOTE(review): how the engine applies this value is not visible in this part of the file.
    pub row_group_size: Option<usize>,
}
1402
/// Parameters of a region compaction request.
#[derive(Debug)]
pub struct RegionCompactRequest {
    /// Compaction options as defined by the protobuf `compact_request::Options`.
    pub options: compact_request::Options,
    /// Optional parallelism setting.
    // NOTE(review): presumably a hint for the engine — confirm against the consumer.
    pub parallelism: Option<u32>,
}
1408
1409impl Default for RegionCompactRequest {
1410 fn default() -> Self {
1411 Self {
1412 options: compact_request::Options::Regular(Default::default()),
1414 parallelism: None,
1415 }
1416 }
1417}
1418
/// Request to build indexes for a region; it currently carries no parameters.
#[derive(Debug, Clone, Default)]
pub struct RegionBuildIndexRequest {}
1421
/// Describes what a truncate request covers.
#[derive(Debug)]
pub enum RegionTruncateRequest {
    /// No time range restriction.
    All,
    /// Restricted to the given time ranges.
    ByTimeRanges {
        /// Pairs of timestamps describing the ranges.
        // NOTE(review): presumably `(start, end)`; whether the bounds are inclusive is not
        // visible here — confirm against the consumer.
        time_ranges: Vec<(Timestamp, Timestamp)>,
    },
}
1434
/// Parameters of a region catchup request.
///
/// NOTE(review): the field notes below are derived from names and types only; the code
/// that consumes this struct is not part of this section of the file — confirm them
/// against the engine implementation.
#[derive(Debug, Clone, Copy, Default)]
pub struct RegionCatchupRequest {
    /// Presumably whether the region should become writable after catching up.
    pub set_writable: bool,
    /// Optional log entry id; presumably the expected position in the data log.
    pub entry_id: Option<entry::Id>,
    /// Optional log entry id; presumably the counterpart of `entry_id` for metadata.
    pub metadata_entry_id: Option<entry::Id>,
    /// Optional location identifier.
    pub location_id: Option<u64>,
    /// Optional replay checkpoint.
    pub checkpoint: Option<ReplayCheckpoint>,
}
1455
/// A bulk insert request for a single region.
#[derive(Debug, Clone)]
pub struct RegionBulkInsertsRequest {
    /// Id of the target region.
    pub region_id: RegionId,
    /// The data as a record batch; `estimated_size` is computed from it.
    pub payload: DfRecordBatch,
    /// The Arrow IPC message.
    // NOTE(review): presumably the encoded form `payload` was decoded from — confirm.
    pub raw_data: ArrowIpc,
    /// Optional partition expression version.
    pub partition_expr_version: Option<u64>,
}
1463
1464impl RegionBulkInsertsRequest {
1465 pub fn estimated_size(&self) -> usize {
1466 self.payload.get_array_memory_size()
1467 }
1468}
1469
/// Parameters of a request that makes a region enter staging.
#[derive(Debug, Clone)]
pub struct EnterStagingRequest {
    /// Partition expression as a string.
    // NOTE(review): the expected encoding (e.g. JSON) is not visible here — confirm.
    pub partition_expr: String,
}
1480
/// Parameters of a request that applies a staging manifest to a region.
#[derive(Debug, Clone)]
pub struct ApplyStagingManifestRequest {
    /// Partition expression as a string.
    // NOTE(review): the expected encoding (e.g. JSON) is not visible here — confirm.
    pub partition_expr: String,
    /// Id of the central region.
    pub central_region_id: RegionId,
    /// Path of the manifest.
    pub manifest_path: String,
}
1509
1510impl fmt::Display for RegionRequest {
1511 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1512 match self {
1513 RegionRequest::Put(_) => write!(f, "Put"),
1514 RegionRequest::Delete(_) => write!(f, "Delete"),
1515 RegionRequest::Create(_) => write!(f, "Create"),
1516 RegionRequest::Drop(_) => write!(f, "Drop"),
1517 RegionRequest::Open(_) => write!(f, "Open"),
1518 RegionRequest::Close(_) => write!(f, "Close"),
1519 RegionRequest::Alter(_) => write!(f, "Alter"),
1520 RegionRequest::Flush(_) => write!(f, "Flush"),
1521 RegionRequest::Compact(_) => write!(f, "Compact"),
1522 RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1523 RegionRequest::Truncate(_) => write!(f, "Truncate"),
1524 RegionRequest::Catchup(_) => write!(f, "Catchup"),
1525 RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1526 RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1527 RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1528 }
1529 }
1530}
1531
1532#[cfg(test)]
1533mod tests {
1534
1535 use api::v1::region::RegionColumnDef;
1536 use api::v1::{ColumnDataType, ColumnDef};
1537 use datatypes::prelude::ConcreteDataType;
1538 use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1539
1540 use super::*;
1541 use crate::metadata::RegionMetadataBuilder;
1542
1543 #[test]
1544 fn test_from_proto_location() {
1545 let proto_location = v1::AddColumnLocation {
1546 location_type: LocationType::First as i32,
1547 after_column_name: String::default(),
1548 };
1549 let location = AddColumnLocation::try_from(proto_location).unwrap();
1550 assert_eq!(location, AddColumnLocation::First);
1551
1552 let proto_location = v1::AddColumnLocation {
1553 location_type: 10,
1554 after_column_name: String::default(),
1555 };
1556 AddColumnLocation::try_from(proto_location).unwrap_err();
1557
1558 let proto_location = v1::AddColumnLocation {
1559 location_type: LocationType::After as i32,
1560 after_column_name: "a".to_string(),
1561 };
1562 let location = AddColumnLocation::try_from(proto_location).unwrap();
1563 assert_eq!(
1564 location,
1565 AddColumnLocation::After {
1566 column_name: "a".to_string()
1567 }
1568 );
1569 }
1570
1571 #[test]
1572 fn test_from_none_proto_add_column() {
1573 AddColumn::try_from(v1::region::AddColumn {
1574 column_def: None,
1575 location: None,
1576 })
1577 .unwrap_err();
1578 }
1579
1580 #[test]
1581 fn test_from_proto_alter_request() {
1582 RegionAlterRequest::try_from(AlterRequest {
1583 region_id: 0,
1584 schema_version: 1,
1585 kind: None,
1586 })
1587 .unwrap_err();
1588
1589 let request = RegionAlterRequest::try_from(AlterRequest {
1590 region_id: 0,
1591 schema_version: 1,
1592 kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1593 add_columns: vec![v1::region::AddColumn {
1594 column_def: Some(RegionColumnDef {
1595 column_def: Some(ColumnDef {
1596 name: "a".to_string(),
1597 data_type: ColumnDataType::String as i32,
1598 is_nullable: true,
1599 default_constraint: vec![],
1600 semantic_type: SemanticType::Field as i32,
1601 comment: String::new(),
1602 ..Default::default()
1603 }),
1604 column_id: 1,
1605 }),
1606 location: Some(v1::AddColumnLocation {
1607 location_type: LocationType::First as i32,
1608 after_column_name: String::default(),
1609 }),
1610 }],
1611 })),
1612 })
1613 .unwrap();
1614
1615 assert_eq!(
1616 request,
1617 RegionAlterRequest {
1618 kind: AlterKind::AddColumns {
1619 columns: vec![AddColumn {
1620 column_metadata: ColumnMetadata {
1621 column_schema: ColumnSchema::new(
1622 "a",
1623 ConcreteDataType::string_datatype(),
1624 true,
1625 ),
1626 semantic_type: SemanticType::Field,
1627 column_id: 1,
1628 },
1629 location: Some(AddColumnLocation::First),
1630 }]
1631 },
1632 }
1633 );
1634 }
1635
1636 fn new_metadata() -> RegionMetadata {
1639 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1640 builder
1641 .push_column_metadata(ColumnMetadata {
1642 column_schema: ColumnSchema::new(
1643 "ts",
1644 ConcreteDataType::timestamp_millisecond_datatype(),
1645 false,
1646 ),
1647 semantic_type: SemanticType::Timestamp,
1648 column_id: 1,
1649 })
1650 .push_column_metadata(ColumnMetadata {
1651 column_schema: ColumnSchema::new(
1652 "tag_0",
1653 ConcreteDataType::string_datatype(),
1654 true,
1655 ),
1656 semantic_type: SemanticType::Tag,
1657 column_id: 2,
1658 })
1659 .push_column_metadata(ColumnMetadata {
1660 column_schema: ColumnSchema::new(
1661 "field_0",
1662 ConcreteDataType::string_datatype(),
1663 true,
1664 ),
1665 semantic_type: SemanticType::Field,
1666 column_id: 3,
1667 })
1668 .push_column_metadata(ColumnMetadata {
1669 column_schema: ColumnSchema::new(
1670 "field_1",
1671 ConcreteDataType::boolean_datatype(),
1672 true,
1673 ),
1674 semantic_type: SemanticType::Field,
1675 column_id: 4,
1676 })
1677 .primary_key(vec![2]);
1678 builder.build().unwrap()
1679 }
1680
1681 #[test]
1682 fn test_add_column_validate() {
1683 let metadata = new_metadata();
1684 let add_column = AddColumn {
1685 column_metadata: ColumnMetadata {
1686 column_schema: ColumnSchema::new(
1687 "tag_1",
1688 ConcreteDataType::string_datatype(),
1689 true,
1690 ),
1691 semantic_type: SemanticType::Tag,
1692 column_id: 5,
1693 },
1694 location: None,
1695 };
1696 add_column.validate(&metadata).unwrap();
1697 assert!(add_column.need_alter(&metadata));
1698
1699 AddColumn {
1701 column_metadata: ColumnMetadata {
1702 column_schema: ColumnSchema::new(
1703 "tag_1",
1704 ConcreteDataType::string_datatype(),
1705 false,
1706 ),
1707 semantic_type: SemanticType::Tag,
1708 column_id: 5,
1709 },
1710 location: None,
1711 }
1712 .validate(&metadata)
1713 .unwrap_err();
1714
1715 let add_column = AddColumn {
1717 column_metadata: ColumnMetadata {
1718 column_schema: ColumnSchema::new(
1719 "tag_0",
1720 ConcreteDataType::string_datatype(),
1721 true,
1722 ),
1723 semantic_type: SemanticType::Tag,
1724 column_id: 2,
1725 },
1726 location: None,
1727 };
1728 add_column.validate(&metadata).unwrap();
1729 assert!(!add_column.need_alter(&metadata));
1730 }
1731
1732 #[test]
1733 fn test_add_duplicate_columns() {
1734 let kind = AlterKind::AddColumns {
1735 columns: vec![
1736 AddColumn {
1737 column_metadata: ColumnMetadata {
1738 column_schema: ColumnSchema::new(
1739 "tag_1",
1740 ConcreteDataType::string_datatype(),
1741 true,
1742 ),
1743 semantic_type: SemanticType::Tag,
1744 column_id: 5,
1745 },
1746 location: None,
1747 },
1748 AddColumn {
1749 column_metadata: ColumnMetadata {
1750 column_schema: ColumnSchema::new(
1751 "tag_1",
1752 ConcreteDataType::string_datatype(),
1753 true,
1754 ),
1755 semantic_type: SemanticType::Field,
1756 column_id: 6,
1757 },
1758 location: None,
1759 },
1760 ],
1761 };
1762 let metadata = new_metadata();
1763 kind.validate(&metadata).unwrap();
1764 assert!(kind.need_alter(&metadata));
1765 }
1766
1767 #[test]
1768 fn test_add_existing_column_different_metadata() {
1769 let metadata = new_metadata();
1770
1771 let kind = AlterKind::AddColumns {
1773 columns: vec![AddColumn {
1774 column_metadata: ColumnMetadata {
1775 column_schema: ColumnSchema::new(
1776 "tag_0",
1777 ConcreteDataType::string_datatype(),
1778 true,
1779 ),
1780 semantic_type: SemanticType::Tag,
1781 column_id: 4,
1782 },
1783 location: None,
1784 }],
1785 };
1786 kind.validate(&metadata).unwrap_err();
1787
1788 let kind = AlterKind::AddColumns {
1790 columns: vec![AddColumn {
1791 column_metadata: ColumnMetadata {
1792 column_schema: ColumnSchema::new(
1793 "tag_0",
1794 ConcreteDataType::int64_datatype(),
1795 true,
1796 ),
1797 semantic_type: SemanticType::Tag,
1798 column_id: 2,
1799 },
1800 location: None,
1801 }],
1802 };
1803 kind.validate(&metadata).unwrap_err();
1804
1805 let kind = AlterKind::AddColumns {
1807 columns: vec![AddColumn {
1808 column_metadata: ColumnMetadata {
1809 column_schema: ColumnSchema::new(
1810 "tag_1",
1811 ConcreteDataType::string_datatype(),
1812 true,
1813 ),
1814 semantic_type: SemanticType::Tag,
1815 column_id: 2,
1816 },
1817 location: None,
1818 }],
1819 };
1820 kind.validate(&metadata).unwrap_err();
1821 }
1822
1823 #[test]
1824 fn test_add_existing_column_with_location() {
1825 let metadata = new_metadata();
1826 let kind = AlterKind::AddColumns {
1827 columns: vec![AddColumn {
1828 column_metadata: ColumnMetadata {
1829 column_schema: ColumnSchema::new(
1830 "tag_0",
1831 ConcreteDataType::string_datatype(),
1832 true,
1833 ),
1834 semantic_type: SemanticType::Tag,
1835 column_id: 2,
1836 },
1837 location: Some(AddColumnLocation::First),
1838 }],
1839 };
1840 kind.validate(&metadata).unwrap_err();
1841 }
1842
1843 #[test]
1844 fn test_validate_drop_column() {
1845 let metadata = new_metadata();
1846 let kind = AlterKind::DropColumns {
1847 names: vec!["xxxx".to_string()],
1848 };
1849 kind.validate(&metadata).unwrap();
1850 assert!(!kind.need_alter(&metadata));
1851
1852 AlterKind::DropColumns {
1853 names: vec!["tag_0".to_string()],
1854 }
1855 .validate(&metadata)
1856 .unwrap_err();
1857
1858 let kind = AlterKind::DropColumns {
1859 names: vec!["field_0".to_string()],
1860 };
1861 kind.validate(&metadata).unwrap();
1862 assert!(kind.need_alter(&metadata));
1863 }
1864
1865 #[test]
1866 fn test_validate_modify_column_type() {
1867 let metadata = new_metadata();
1868 AlterKind::ModifyColumnTypes {
1869 columns: vec![ModifyColumnType {
1870 column_name: "xxxx".to_string(),
1871 target_type: ConcreteDataType::string_datatype(),
1872 }],
1873 }
1874 .validate(&metadata)
1875 .unwrap_err();
1876
1877 AlterKind::ModifyColumnTypes {
1878 columns: vec![ModifyColumnType {
1879 column_name: "field_1".to_string(),
1880 target_type: ConcreteDataType::date_datatype(),
1881 }],
1882 }
1883 .validate(&metadata)
1884 .unwrap_err();
1885
1886 AlterKind::ModifyColumnTypes {
1887 columns: vec![ModifyColumnType {
1888 column_name: "ts".to_string(),
1889 target_type: ConcreteDataType::date_datatype(),
1890 }],
1891 }
1892 .validate(&metadata)
1893 .unwrap_err();
1894
1895 AlterKind::ModifyColumnTypes {
1896 columns: vec![ModifyColumnType {
1897 column_name: "tag_0".to_string(),
1898 target_type: ConcreteDataType::date_datatype(),
1899 }],
1900 }
1901 .validate(&metadata)
1902 .unwrap_err();
1903
1904 let kind = AlterKind::ModifyColumnTypes {
1905 columns: vec![ModifyColumnType {
1906 column_name: "field_0".to_string(),
1907 target_type: ConcreteDataType::int32_datatype(),
1908 }],
1909 };
1910 kind.validate(&metadata).unwrap();
1911 assert!(kind.need_alter(&metadata));
1912 }
1913
1914 #[test]
1915 fn test_validate_add_columns() {
1916 let kind = AlterKind::AddColumns {
1917 columns: vec![
1918 AddColumn {
1919 column_metadata: ColumnMetadata {
1920 column_schema: ColumnSchema::new(
1921 "tag_1",
1922 ConcreteDataType::string_datatype(),
1923 true,
1924 ),
1925 semantic_type: SemanticType::Tag,
1926 column_id: 5,
1927 },
1928 location: None,
1929 },
1930 AddColumn {
1931 column_metadata: ColumnMetadata {
1932 column_schema: ColumnSchema::new(
1933 "field_2",
1934 ConcreteDataType::string_datatype(),
1935 true,
1936 ),
1937 semantic_type: SemanticType::Field,
1938 column_id: 6,
1939 },
1940 location: None,
1941 },
1942 ],
1943 };
1944 let request = RegionAlterRequest { kind };
1945 let mut metadata = new_metadata();
1946 metadata.schema_version = 1;
1947 request.validate(&metadata).unwrap();
1948 }
1949
1950 #[test]
1951 fn test_validate_create_region() {
1952 let column_metadatas = vec![
1953 ColumnMetadata {
1954 column_schema: ColumnSchema::new(
1955 "ts",
1956 ConcreteDataType::timestamp_millisecond_datatype(),
1957 false,
1958 ),
1959 semantic_type: SemanticType::Timestamp,
1960 column_id: 1,
1961 },
1962 ColumnMetadata {
1963 column_schema: ColumnSchema::new(
1964 "tag_0",
1965 ConcreteDataType::string_datatype(),
1966 true,
1967 ),
1968 semantic_type: SemanticType::Tag,
1969 column_id: 2,
1970 },
1971 ColumnMetadata {
1972 column_schema: ColumnSchema::new(
1973 "field_0",
1974 ConcreteDataType::string_datatype(),
1975 true,
1976 ),
1977 semantic_type: SemanticType::Field,
1978 column_id: 3,
1979 },
1980 ];
1981 let create = RegionCreateRequest {
1982 engine: "mito".to_string(),
1983 column_metadatas,
1984 primary_key: vec![3, 4],
1985 options: HashMap::new(),
1986 table_dir: "path".to_string(),
1987 path_type: PathType::Bare,
1988 partition_expr_json: Some("".to_string()),
1989 };
1990
1991 assert!(create.validate().is_err());
1992 }
1993
1994 #[test]
1995 fn test_validate_modify_column_fulltext_options() {
1996 let kind = AlterKind::SetIndexes {
1997 options: vec![SetIndexOption::Fulltext {
1998 column_name: "tag_0".to_string(),
1999 options: FulltextOptions::new_unchecked(
2000 true,
2001 FulltextAnalyzer::Chinese,
2002 false,
2003 FulltextBackend::Bloom,
2004 1000,
2005 0.01,
2006 ),
2007 }],
2008 };
2009 let request = RegionAlterRequest { kind };
2010 let mut metadata = new_metadata();
2011 metadata.schema_version = 1;
2012 request.validate(&metadata).unwrap();
2013
2014 let kind = AlterKind::UnsetIndexes {
2015 options: vec![UnsetIndexOption::Fulltext {
2016 column_name: "tag_0".to_string(),
2017 }],
2018 };
2019 let request = RegionAlterRequest { kind };
2020 let mut metadata = new_metadata();
2021 metadata.schema_version = 1;
2022 request.validate(&metadata).unwrap();
2023 }
2024
2025 #[test]
2026 fn test_validate_sync_columns() {
2027 let metadata = new_metadata();
2028 let kind = AlterKind::SyncColumns {
2029 column_metadatas: vec![
2030 ColumnMetadata {
2031 column_schema: ColumnSchema::new(
2032 "tag_1",
2033 ConcreteDataType::string_datatype(),
2034 true,
2035 ),
2036 semantic_type: SemanticType::Tag,
2037 column_id: 5,
2038 },
2039 ColumnMetadata {
2040 column_schema: ColumnSchema::new(
2041 "field_2",
2042 ConcreteDataType::string_datatype(),
2043 true,
2044 ),
2045 semantic_type: SemanticType::Field,
2046 column_id: 6,
2047 },
2048 ],
2049 };
2050 let err = kind.validate(&metadata).unwrap_err();
2051 assert!(err.to_string().contains("not a primary key"));
2052
2053 let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2055 let ts_column = column_metadatas_with_different_ts_column
2056 .iter_mut()
2057 .find(|c| c.semantic_type == SemanticType::Timestamp)
2058 .unwrap();
2059 ts_column.column_schema.name = "ts1".to_string();
2060
2061 let kind = AlterKind::SyncColumns {
2062 column_metadatas: column_metadatas_with_different_ts_column,
2063 };
2064 let err = kind.validate(&metadata).unwrap_err();
2065 assert!(
2066 err.to_string()
2067 .contains("timestamp column ts has different id")
2068 );
2069
2070 let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2072 let pk_column = column_metadatas_with_different_pk_column
2073 .iter_mut()
2074 .find(|c| c.column_schema.name == "tag_0")
2075 .unwrap();
2076 pk_column.column_id = 100;
2077 let kind = AlterKind::SyncColumns {
2078 column_metadatas: column_metadatas_with_different_pk_column,
2079 };
2080 let err = kind.validate(&metadata).unwrap_err();
2081 assert!(
2082 err.to_string()
2083 .contains("column with same name tag_0 has different id")
2084 );
2085
2086 let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2088 column_metadatas_with_new_field_column.push(ColumnMetadata {
2089 column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2090 semantic_type: SemanticType::Field,
2091 column_id: 4,
2092 });
2093 let kind = AlterKind::SyncColumns {
2094 column_metadatas: column_metadatas_with_new_field_column,
2095 };
2096 kind.validate(&metadata).unwrap();
2097 }
2098
2099 #[test]
2100 fn test_cast_path_type_to_primitive() {
2101 assert_eq!(PathType::Bare as u8, 0);
2102 assert_eq!(PathType::Data as u8, 1);
2103 assert_eq!(PathType::Metadata as u8, 2);
2104 assert_eq!(
2105 PathType::try_from(PathType::Bare as u8).unwrap(),
2106 PathType::Bare
2107 );
2108 assert_eq!(
2109 PathType::try_from(PathType::Data as u8).unwrap(),
2110 PathType::Data
2111 );
2112 assert_eq!(
2113 PathType::try_from(PathType::Metadata as u8).unwrap(),
2114 PathType::Metadata
2115 );
2116 }
2117}