store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::{ColumnDataTypeWrapper, from_pb_time_ranges};
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    AlterRequest, AlterRequests, BuildIndexRequest, BulkInsertRequest, CloseRequest,
26    CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest, DropRequests,
27    FlushRequest, InsertRequests, OpenRequest, TruncateRequest, alter_request, compact_request,
28    region_request, truncate_request,
29};
30use api::v1::{
31    self, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
32    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
33};
34pub use common_base::AffectedRows;
35use common_grpc::flight::FlightDecoder;
36use common_recordbatch::DfRecordBatch;
37use common_time::{TimeToLive, Timestamp};
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use num_enum::TryFromPrimitive;
41use serde::{Deserialize, Serialize};
42use snafu::{OptionExt, ResultExt, ensure};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, ConvertTimeRangesSnafu, DecodeProtoSnafu, FlightCodecSnafu,
48    InvalidIndexOptionSnafu, InvalidRawRegionRequestSnafu, InvalidRegionRequestSnafu,
49    InvalidSetRegionOptionRequestSnafu, InvalidUnsetRegionOptionRequestSnafu, MetadataError,
50    RegionMetadata, Result, UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    APPEND_MODE_KEY, SST_FORMAT_KEY, TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW,
56    TWCS_TRIGGER_FILE_NUM,
57};
58use crate::path_utils::table_dir;
59use crate::storage::{ColumnId, RegionId, ScanRequest};
60
61/// The type of path to generate.
62#[derive(Debug, Clone, Copy, PartialEq, TryFromPrimitive)]
63#[repr(u8)]
64pub enum PathType {
65    /// A bare path - the original path of an engine.
66    ///
67    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/`.
68    Bare,
69    /// A path for the data region of a metric engine table.
70    ///
71    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/data/`.
72    Data,
73    /// A path for the metadata region of a metric engine table.
74    ///
75    /// The path prefix is `{table_dir}/{table_id}_{region_sequence}/metadata/`.
76    Metadata,
77}
78
79#[derive(Debug, IntoStaticStr)]
80pub enum BatchRegionDdlRequest {
81    Create(Vec<(RegionId, RegionCreateRequest)>),
82    Drop(Vec<(RegionId, RegionDropRequest)>),
83    Alter(Vec<(RegionId, RegionAlterRequest)>),
84}
85
86impl BatchRegionDdlRequest {
87    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
88    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
89        match body {
90            region_request::Body::Creates(creates) => {
91                let requests = creates
92                    .requests
93                    .into_iter()
94                    .map(parse_region_create)
95                    .collect::<Result<Vec<_>>>()?;
96                Ok(Some(Self::Create(requests)))
97            }
98            region_request::Body::Drops(drops) => {
99                let requests = drops
100                    .requests
101                    .into_iter()
102                    .map(parse_region_drop)
103                    .collect::<Result<Vec<_>>>()?;
104                Ok(Some(Self::Drop(requests)))
105            }
106            region_request::Body::Alters(alters) => {
107                let requests = alters
108                    .requests
109                    .into_iter()
110                    .map(parse_region_alter)
111                    .collect::<Result<Vec<_>>>()?;
112                Ok(Some(Self::Alter(requests)))
113            }
114            _ => Ok(None),
115        }
116    }
117
118    pub fn request_type(&self) -> &'static str {
119        self.into()
120    }
121
122    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
123        match self {
124            Self::Create(requests) => requests
125                .into_iter()
126                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
127                .collect(),
128            Self::Drop(requests) => requests
129                .into_iter()
130                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
131                .collect(),
132            Self::Alter(requests) => requests
133                .into_iter()
134                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
135                .collect(),
136        }
137    }
138}
139
140#[derive(Debug, IntoStaticStr)]
141pub enum RegionRequest {
142    Put(RegionPutRequest),
143    Delete(RegionDeleteRequest),
144    Create(RegionCreateRequest),
145    Drop(RegionDropRequest),
146    Open(RegionOpenRequest),
147    Close(RegionCloseRequest),
148    Alter(RegionAlterRequest),
149    Flush(RegionFlushRequest),
150    Compact(RegionCompactRequest),
151    BuildIndex(RegionBuildIndexRequest),
152    Truncate(RegionTruncateRequest),
153    Catchup(RegionCatchupRequest),
154    BulkInserts(RegionBulkInsertsRequest),
155    EnterStaging(EnterStagingRequest),
156    ApplyStagingManifest(ApplyStagingManifestRequest),
157}
158
159impl RegionRequest {
160    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
161    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
162    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
163        match body {
164            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
165            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
166            region_request::Body::Create(create) => make_region_create(create),
167            region_request::Body::Drop(drop) => make_region_drop(drop),
168            region_request::Body::Open(open) => make_region_open(open),
169            region_request::Body::Close(close) => make_region_close(close),
170            region_request::Body::Alter(alter) => make_region_alter(alter),
171            region_request::Body::Flush(flush) => make_region_flush(flush),
172            region_request::Body::Compact(compact) => make_region_compact(compact),
173            region_request::Body::BuildIndex(index) => make_region_build_index(index),
174            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
175            region_request::Body::Creates(creates) => make_region_creates(creates),
176            region_request::Body::Drops(drops) => make_region_drops(drops),
177            region_request::Body::Alters(alters) => make_region_alters(alters),
178            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
179            region_request::Body::Sync(_) => UnexpectedSnafu {
180                reason: "Sync request should be handled separately by RegionServer",
181            }
182            .fail(),
183            region_request::Body::ListMetadata(_) => UnexpectedSnafu {
184                reason: "ListMetadata request should be handled separately by RegionServer",
185            }
186            .fail(),
187            region_request::Body::ApplyStagingManifest(apply) => {
188                make_region_apply_staging_manifest(apply)
189            }
190        }
191    }
192
193    /// Returns the type name of the request.
194    pub fn request_type(&self) -> &'static str {
195        self.into()
196    }
197}
198
199fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
200    let requests = inserts
201        .requests
202        .into_iter()
203        .filter_map(|r| {
204            let region_id = r.region_id.into();
205            r.rows.map(|rows| {
206                (
207                    region_id,
208                    RegionRequest::Put(RegionPutRequest {
209                        rows,
210                        hint: None,
211                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
212                    }),
213                )
214            })
215        })
216        .collect();
217    Ok(requests)
218}
219
220fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
221    let requests = deletes
222        .requests
223        .into_iter()
224        .filter_map(|r| {
225            let region_id = r.region_id.into();
226            r.rows.map(|rows| {
227                (
228                    region_id,
229                    RegionRequest::Delete(RegionDeleteRequest {
230                        rows,
231                        hint: None,
232                        partition_expr_version: r.partition_expr_version.map(|v| v.value),
233                    }),
234                )
235            })
236        })
237        .collect();
238    Ok(requests)
239}
240
241fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
242    let column_metadatas = create
243        .column_defs
244        .into_iter()
245        .map(ColumnMetadata::try_from_column_def)
246        .collect::<Result<Vec<_>>>()?;
247    let region_id = RegionId::from(create.region_id);
248    let table_dir = table_dir(&create.path, region_id.table_id());
249    let partition_expr_json = create.partition.as_ref().map(|p| p.expression.clone());
250    Ok((
251        region_id,
252        RegionCreateRequest {
253            engine: create.engine,
254            column_metadatas,
255            primary_key: create.primary_key,
256            options: create.options,
257            table_dir,
258            path_type: PathType::Bare,
259            partition_expr_json,
260        },
261    ))
262}
263
264fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
265    let (region_id, request) = parse_region_create(create)?;
266    Ok(vec![(region_id, RegionRequest::Create(request))])
267}
268
269fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
270    let mut requests = Vec::with_capacity(creates.requests.len());
271    for create in creates.requests {
272        requests.extend(make_region_create(create)?);
273    }
274    Ok(requests)
275}
276
277fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
278    let region_id = drop.region_id.into();
279    Ok((
280        region_id,
281        RegionDropRequest {
282            fast_path: drop.fast_path,
283            force: drop.force,
284            partial_drop: drop.partial_drop,
285        },
286    ))
287}
288
289fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
290    let (region_id, request) = parse_region_drop(drop)?;
291    Ok(vec![(region_id, RegionRequest::Drop(request))])
292}
293
294fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
295    let mut requests = Vec::with_capacity(drops.requests.len());
296    for drop in drops.requests {
297        requests.extend(make_region_drop(drop)?);
298    }
299    Ok(requests)
300}
301
302fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
303    let region_id = RegionId::from(open.region_id);
304    let table_dir = table_dir(&open.path, region_id.table_id());
305    Ok(vec![(
306        region_id,
307        RegionRequest::Open(RegionOpenRequest {
308            engine: open.engine,
309            table_dir,
310            path_type: PathType::Bare,
311            options: open.options,
312            skip_wal_replay: false,
313            checkpoint: None,
314        }),
315    )])
316}
317
318fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
319    let region_id = close.region_id.into();
320    Ok(vec![(
321        region_id,
322        RegionRequest::Close(RegionCloseRequest {}),
323    )])
324}
325
326fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
327    let region_id = alter.region_id.into();
328    let request = RegionAlterRequest::try_from(alter)?;
329    Ok((region_id, request))
330}
331
332fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
333    let (region_id, request) = parse_region_alter(alter)?;
334    Ok(vec![(region_id, RegionRequest::Alter(request))])
335}
336
337fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
338    let mut requests = Vec::with_capacity(alters.requests.len());
339    for alter in alters.requests {
340        requests.extend(make_region_alter(alter)?);
341    }
342    Ok(requests)
343}
344
345fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
346    let region_id = flush.region_id.into();
347    Ok(vec![(
348        region_id,
349        RegionRequest::Flush(RegionFlushRequest {
350            row_group_size: None,
351        }),
352    )])
353}
354
355fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
356    let region_id = compact.region_id.into();
357    let options = compact
358        .options
359        .unwrap_or(compact_request::Options::Regular(Default::default()));
360    // Convert parallelism: a value of 0 indicates no specific parallelism requested (None)
361    let parallelism = if compact.parallelism == 0 {
362        None
363    } else {
364        Some(compact.parallelism)
365    };
366    Ok(vec![(
367        region_id,
368        RegionRequest::Compact(RegionCompactRequest {
369            options,
370            parallelism,
371        }),
372    )])
373}
374
375fn make_region_build_index(index: BuildIndexRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
376    let region_id = index.region_id.into();
377    Ok(vec![(
378        region_id,
379        RegionRequest::BuildIndex(RegionBuildIndexRequest {}),
380    )])
381}
382
383fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
384    let region_id = truncate.region_id.into();
385    match truncate.kind {
386        None => InvalidRawRegionRequestSnafu {
387            err: "missing kind in TruncateRequest".to_string(),
388        }
389        .fail(),
390        Some(truncate_request::Kind::All(_)) => Ok(vec![(
391            region_id,
392            RegionRequest::Truncate(RegionTruncateRequest::All),
393        )]),
394        Some(truncate_request::Kind::TimeRanges(time_ranges)) => {
395            let time_ranges = from_pb_time_ranges(time_ranges).context(ConvertTimeRangesSnafu)?;
396
397            Ok(vec![(
398                region_id,
399                RegionRequest::Truncate(RegionTruncateRequest::ByTimeRanges { time_ranges }),
400            )])
401        }
402    }
403}
404
405/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
406fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
407    let region_id = request.region_id.into();
408    let partition_expr_version = request.partition_expr_version.map(|v| v.value);
409    let Some(Body::ArrowIpc(request)) = request.body else {
410        return Ok(vec![]);
411    };
412
413    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
414        .with_label_values(&["decode"])
415        .start_timer();
416    let mut decoder =
417        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
418    let payload = decoder
419        .try_decode_record_batch(&request.data_header, &request.payload)
420        .context(FlightCodecSnafu)?;
421    decoder_timer.observe_duration();
422    Ok(vec![(
423        region_id,
424        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
425            region_id,
426            payload,
427            raw_data: request,
428            partition_expr_version,
429        }),
430    )])
431}
432
433fn make_region_apply_staging_manifest(
434    api::v1::region::ApplyStagingManifestRequest {
435        region_id,
436        partition_expr,
437        central_region_id,
438        manifest_path,
439    }: api::v1::region::ApplyStagingManifestRequest,
440) -> Result<Vec<(RegionId, RegionRequest)>> {
441    let region_id = region_id.into();
442    Ok(vec![(
443        region_id,
444        RegionRequest::ApplyStagingManifest(ApplyStagingManifestRequest {
445            partition_expr,
446            central_region_id: central_region_id.into(),
447            manifest_path,
448        }),
449    )])
450}
451
452/// Request to put data into a region.
453#[derive(Debug)]
454pub struct RegionPutRequest {
455    /// Rows to put.
456    pub rows: Rows,
457    /// Write hint.
458    pub hint: Option<WriteHint>,
459    /// Partition expression version for the region.
460    pub partition_expr_version: Option<u64>,
461}
462
463#[derive(Debug)]
464pub struct RegionReadRequest {
465    pub request: ScanRequest,
466}
467
468/// Request to delete data from a region.
469#[derive(Debug)]
470pub struct RegionDeleteRequest {
471    /// Keys to rows to delete.
472    ///
473    /// Each row only contains primary key columns and a time index column.
474    pub rows: Rows,
475    /// Write hint.
476    pub hint: Option<WriteHint>,
477    /// Partition expression version for the region.
478    pub partition_expr_version: Option<u64>,
479}
480
481#[derive(Debug, Clone)]
482pub struct RegionCreateRequest {
483    /// Region engine name
484    pub engine: String,
485    /// Columns in this region.
486    pub column_metadatas: Vec<ColumnMetadata>,
487    /// Columns in the primary key.
488    pub primary_key: Vec<ColumnId>,
489    /// Options of the created region.
490    pub options: HashMap<String, String>,
491    /// Directory for table's data home. Usually is composed by catalog and table id
492    pub table_dir: String,
493    /// Path type for generating paths
494    pub path_type: PathType,
495    /// Partition expression JSON from table metadata. Set to empty string for a region without partition.
496    /// `Option` to keep compatibility with old clients.
497    pub partition_expr_json: Option<String>,
498}
499
500impl RegionCreateRequest {
501    /// Checks whether the request is valid, returns an error if it is invalid.
502    pub fn validate(&self) -> Result<()> {
503        // time index must exist
504        ensure!(
505            self.column_metadatas
506                .iter()
507                .any(|x| x.semantic_type == SemanticType::Timestamp),
508            InvalidRegionRequestSnafu {
509                region_id: RegionId::new(0, 0),
510                err: "missing timestamp column in create region request".to_string(),
511            }
512        );
513
514        // build column id to indices
515        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
516        for (i, c) in self.column_metadatas.iter().enumerate() {
517            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
518                return InvalidRegionRequestSnafu {
519                    region_id: RegionId::new(0, 0),
520                    err: format!(
521                        "duplicate column id {} (at position {} and {}) in create region request",
522                        c.column_id, previous, i
523                    ),
524                }
525                .fail();
526            }
527        }
528
529        // primary key must exist
530        for column_id in &self.primary_key {
531            ensure!(
532                column_id_to_indices.contains_key(column_id),
533                InvalidRegionRequestSnafu {
534                    region_id: RegionId::new(0, 0),
535                    err: format!(
536                        "missing primary key column {} in create region request",
537                        column_id
538                    ),
539                }
540            );
541        }
542
543        Ok(())
544    }
545
546    /// Returns true when the region belongs to the metric engine's physical table.
547    pub fn is_physical_table(&self) -> bool {
548        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
549    }
550}
551
552#[derive(Debug, Clone)]
553pub struct RegionDropRequest {
554    /// Enables fast-path drop optimizations for logical regions.
555    /// Only applicable to the Metric Engine; ignored by others.
556    pub fast_path: bool,
557
558    /// Forces the drop of a physical region and all its associated logical regions.
559    /// Only relevant for physical regions managed by the Metric Engine.
560    pub force: bool,
561
562    /// If true, indicates that only a portion of the region is being dropped, and files may still be referenced by other regions.
563    /// This is used to prevent deletion of files that are still in use by other regions.
564    pub partial_drop: bool,
565}
566
567/// Open region request.
568#[derive(Debug, Clone)]
569pub struct RegionOpenRequest {
570    /// Region engine name
571    pub engine: String,
572    /// Directory for table's data home. Usually is composed by catalog and table id
573    pub table_dir: String,
574    /// Path type for generating paths
575    pub path_type: PathType,
576    /// Options of the opened region.
577    pub options: HashMap<String, String>,
578    /// To skip replaying the WAL.
579    pub skip_wal_replay: bool,
580    /// Replay checkpoint.
581    pub checkpoint: Option<ReplayCheckpoint>,
582}
583
584#[derive(Debug, Clone, Copy, PartialEq, Eq)]
585pub struct ReplayCheckpoint {
586    pub entry_id: u64,
587    pub metadata_entry_id: Option<u64>,
588}
589
590impl RegionOpenRequest {
591    /// Returns true when the region belongs to the metric engine's physical table.
592    pub fn is_physical_table(&self) -> bool {
593        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
594    }
595}
596
597/// Close region request.
598#[derive(Debug)]
599pub struct RegionCloseRequest {}
600
601/// Alter metadata of a region.
602#[derive(Debug, PartialEq, Eq, Clone)]
603pub struct RegionAlterRequest {
604    /// Kind of alteration to do.
605    pub kind: AlterKind,
606}
607
608impl RegionAlterRequest {
609    /// Checks whether the request is valid, returns an error if it is invalid.
610    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
611        self.kind.validate(metadata)?;
612
613        Ok(())
614    }
615
616    /// Returns true if we need to apply the request to the region.
617    ///
618    /// The `request` should be valid.
619    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
620        debug_assert!(self.validate(metadata).is_ok());
621        self.kind.need_alter(metadata)
622    }
623}
624
625impl TryFrom<AlterRequest> for RegionAlterRequest {
626    type Error = MetadataError;
627
628    fn try_from(value: AlterRequest) -> Result<Self> {
629        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
630            err: "missing kind in AlterRequest",
631        })?;
632
633        let kind = AlterKind::try_from(kind)?;
634        Ok(RegionAlterRequest { kind })
635    }
636}
637
638/// Kind of the alteration.
639#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
640pub enum AlterKind {
641    /// Add columns to the region.
642    AddColumns {
643        /// Columns to add.
644        columns: Vec<AddColumn>,
645    },
646    /// Drop columns from the region, only fields are allowed to drop.
647    DropColumns {
648        /// Name of columns to drop.
649        names: Vec<String>,
650    },
651    /// Change columns datatype form the region, only fields are allowed to change.
652    ModifyColumnTypes {
653        /// Columns to change.
654        columns: Vec<ModifyColumnType>,
655    },
656    /// Set region options.
657    SetRegionOptions { options: Vec<SetRegionOption> },
658    /// Unset region options.
659    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
660    /// Set index options.
661    SetIndexes { options: Vec<SetIndexOption> },
662    /// Unset index options.
663    UnsetIndexes { options: Vec<UnsetIndexOption> },
664    /// Drop column default value.
665    DropDefaults {
666        /// Name of columns to drop.
667        names: Vec<String>,
668    },
669    /// Set column default value.
670    SetDefaults {
671        /// Columns to change.
672        columns: Vec<SetDefault>,
673    },
674    /// Sync column metadatas.
675    SyncColumns {
676        column_metadatas: Vec<ColumnMetadata>,
677    },
678}
679#[derive(Debug, PartialEq, Eq, Clone)]
680pub struct SetDefault {
681    pub name: String,
682    pub default_constraint: Vec<u8>,
683}
684
685#[derive(Debug, PartialEq, Eq, Clone)]
686pub enum SetIndexOption {
687    Fulltext {
688        column_name: String,
689        options: FulltextOptions,
690    },
691    Inverted {
692        column_name: String,
693    },
694    Skipping {
695        column_name: String,
696        options: SkippingIndexOptions,
697    },
698}
699
700impl SetIndexOption {
701    /// Returns the column name of the index option.
702    pub fn column_name(&self) -> &String {
703        match self {
704            SetIndexOption::Fulltext { column_name, .. } => column_name,
705            SetIndexOption::Inverted { column_name } => column_name,
706            SetIndexOption::Skipping { column_name, .. } => column_name,
707        }
708    }
709
710    /// Returns true if the index option is fulltext.
711    pub fn is_fulltext(&self) -> bool {
712        match self {
713            SetIndexOption::Fulltext { .. } => true,
714            SetIndexOption::Inverted { .. } => false,
715            SetIndexOption::Skipping { .. } => false,
716        }
717    }
718}
719
720impl TryFrom<v1::SetIndex> for SetIndexOption {
721    type Error = MetadataError;
722
723    fn try_from(value: v1::SetIndex) -> Result<Self> {
724        let option = value.options.context(InvalidRawRegionRequestSnafu {
725            err: "missing options in SetIndex",
726        })?;
727
728        let opt = match option {
729            v1::set_index::Options::Fulltext(x) => SetIndexOption::Fulltext {
730                column_name: x.column_name.clone(),
731                options: FulltextOptions::new(
732                    x.enable,
733                    as_fulltext_option_analyzer(
734                        Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
735                    ),
736                    x.case_sensitive,
737                    as_fulltext_option_backend(
738                        PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
739                    ),
740                    x.granularity as u32,
741                    x.false_positive_rate,
742                )
743                .context(InvalidIndexOptionSnafu)?,
744            },
745            v1::set_index::Options::Inverted(i) => SetIndexOption::Inverted {
746                column_name: i.column_name,
747            },
748            v1::set_index::Options::Skipping(s) => SetIndexOption::Skipping {
749                column_name: s.column_name,
750                options: SkippingIndexOptions::new(
751                    s.granularity as u32,
752                    s.false_positive_rate,
753                    as_skipping_index_type(
754                        PbSkippingIndexType::try_from(s.skipping_index_type)
755                            .context(DecodeProtoSnafu)?,
756                    ),
757                )
758                .context(InvalidIndexOptionSnafu)?,
759            },
760        };
761
762        Ok(opt)
763    }
764}
765
766#[derive(Debug, PartialEq, Eq, Clone)]
767pub enum UnsetIndexOption {
768    Fulltext { column_name: String },
769    Inverted { column_name: String },
770    Skipping { column_name: String },
771}
772
773impl UnsetIndexOption {
774    pub fn column_name(&self) -> &String {
775        match self {
776            UnsetIndexOption::Fulltext { column_name } => column_name,
777            UnsetIndexOption::Inverted { column_name } => column_name,
778            UnsetIndexOption::Skipping { column_name } => column_name,
779        }
780    }
781
782    pub fn is_fulltext(&self) -> bool {
783        match self {
784            UnsetIndexOption::Fulltext { .. } => true,
785            UnsetIndexOption::Inverted { .. } => false,
786            UnsetIndexOption::Skipping { .. } => false,
787        }
788    }
789}
790
791impl TryFrom<v1::UnsetIndex> for UnsetIndexOption {
792    type Error = MetadataError;
793
794    fn try_from(value: v1::UnsetIndex) -> Result<Self> {
795        let option = value.options.context(InvalidRawRegionRequestSnafu {
796            err: "missing options in UnsetIndex",
797        })?;
798
799        let opt = match option {
800            v1::unset_index::Options::Fulltext(f) => UnsetIndexOption::Fulltext {
801                column_name: f.column_name,
802            },
803            v1::unset_index::Options::Inverted(i) => UnsetIndexOption::Inverted {
804                column_name: i.column_name,
805            },
806            v1::unset_index::Options::Skipping(s) => UnsetIndexOption::Skipping {
807                column_name: s.column_name,
808            },
809        };
810
811        Ok(opt)
812    }
813}
814
815impl AlterKind {
816    /// Returns an error if the alter kind is invalid.
817    ///
818    /// It allows adding column if not exists and dropping column if exists.
819    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
820        match self {
821            AlterKind::AddColumns { columns } => {
822                for col_to_add in columns {
823                    col_to_add.validate(metadata)?;
824                }
825            }
826            AlterKind::DropColumns { names } => {
827                for name in names {
828                    Self::validate_column_to_drop(name, metadata)?;
829                }
830            }
831            AlterKind::ModifyColumnTypes { columns } => {
832                for col_to_change in columns {
833                    col_to_change.validate(metadata)?;
834                }
835            }
836            AlterKind::SetRegionOptions { .. } => {}
837            AlterKind::UnsetRegionOptions { .. } => {}
838            AlterKind::SetIndexes { options } => {
839                for option in options {
840                    Self::validate_column_alter_index_option(
841                        option.column_name(),
842                        metadata,
843                        option.is_fulltext(),
844                    )?;
845                }
846            }
847            AlterKind::UnsetIndexes { options } => {
848                for option in options {
849                    Self::validate_column_alter_index_option(
850                        option.column_name(),
851                        metadata,
852                        option.is_fulltext(),
853                    )?;
854                }
855            }
856            AlterKind::DropDefaults { names } => {
857                names
858                    .iter()
859                    .try_for_each(|name| Self::validate_column_existence(name, metadata))?;
860            }
861            AlterKind::SetDefaults { columns } => {
862                columns
863                    .iter()
864                    .try_for_each(|col| Self::validate_column_existence(&col.name, metadata))?;
865            }
866            AlterKind::SyncColumns { column_metadatas } => {
867                let new_primary_keys = column_metadatas
868                    .iter()
869                    .filter(|c| c.semantic_type == SemanticType::Tag)
870                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
871                    .collect::<HashMap<_, _>>();
872
873                let old_primary_keys = metadata
874                    .column_metadatas
875                    .iter()
876                    .filter(|c| c.semantic_type == SemanticType::Tag)
877                    .map(|c| (c.column_schema.name.as_str(), c.column_id));
878
879                for (name, id) in old_primary_keys {
880                    let primary_key =
881                        new_primary_keys
882                            .get(name)
883                            .with_context(|| InvalidRegionRequestSnafu {
884                                region_id: metadata.region_id,
885                                err: format!("column {} is not a primary key", name),
886                            })?;
887
888                    ensure!(
889                        *primary_key == id,
890                        InvalidRegionRequestSnafu {
891                            region_id: metadata.region_id,
892                            err: format!(
893                                "column with same name {} has different id, existing: {}, got: {}",
894                                name, id, primary_key
895                            ),
896                        }
897                    );
898                }
899
900                let new_ts_column = column_metadatas
901                    .iter()
902                    .find(|c| c.semantic_type == SemanticType::Timestamp)
903                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
904                    .context(InvalidRegionRequestSnafu {
905                        region_id: metadata.region_id,
906                        err: "timestamp column not found",
907                    })?;
908
909                // Safety: timestamp column must exist.
910                let old_ts_column = metadata
911                    .column_metadatas
912                    .iter()
913                    .find(|c| c.semantic_type == SemanticType::Timestamp)
914                    .map(|c| (c.column_schema.name.as_str(), c.column_id))
915                    .unwrap();
916
917                ensure!(
918                    new_ts_column == old_ts_column,
919                    InvalidRegionRequestSnafu {
920                        region_id: metadata.region_id,
921                        err: format!(
922                            "timestamp column {} has different id, existing: {}, got: {}",
923                            old_ts_column.0, old_ts_column.1, new_ts_column.1
924                        ),
925                    }
926                );
927            }
928        }
929        Ok(())
930    }
931
932    /// Returns true if we need to apply the alteration to the region.
933    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
934        debug_assert!(self.validate(metadata).is_ok());
935        match self {
936            AlterKind::AddColumns { columns } => columns
937                .iter()
938                .any(|col_to_add| col_to_add.need_alter(metadata)),
939            AlterKind::DropColumns { names } => names
940                .iter()
941                .any(|name| metadata.column_by_name(name).is_some()),
942            AlterKind::ModifyColumnTypes { columns } => columns
943                .iter()
944                .any(|col_to_change| col_to_change.need_alter(metadata)),
945            AlterKind::SetRegionOptions { .. } => true,
946            AlterKind::UnsetRegionOptions { .. } => true,
947            AlterKind::SetIndexes { options, .. } => options
948                .iter()
949                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
950            AlterKind::UnsetIndexes { options } => options
951                .iter()
952                .any(|option| metadata.column_by_name(option.column_name()).is_some()),
953            AlterKind::DropDefaults { names } => names
954                .iter()
955                .any(|name| metadata.column_by_name(name).is_some()),
956
957            AlterKind::SetDefaults { columns } => columns
958                .iter()
959                .any(|x| metadata.column_by_name(&x.name).is_some()),
960            AlterKind::SyncColumns { column_metadatas } => {
961                metadata.column_metadatas != *column_metadatas
962            }
963        }
964    }
965
966    /// Returns an error if the column to drop is invalid.
967    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
968        let Some(column) = metadata.column_by_name(name) else {
969            return Ok(());
970        };
971        ensure!(
972            column.semantic_type == SemanticType::Field,
973            InvalidRegionRequestSnafu {
974                region_id: metadata.region_id,
975                err: format!("column {} is not a field and could not be dropped", name),
976            }
977        );
978        Ok(())
979    }
980
981    /// Returns an error if the column's alter index option is invalid.
982    fn validate_column_alter_index_option(
983        column_name: &String,
984        metadata: &RegionMetadata,
985        is_fulltext: bool,
986    ) -> Result<()> {
987        let column = metadata
988            .column_by_name(column_name)
989            .context(InvalidRegionRequestSnafu {
990                region_id: metadata.region_id,
991                err: format!("column {} not found", column_name),
992            })?;
993
994        if is_fulltext {
995            ensure!(
996                column.column_schema.data_type.is_string(),
997                InvalidRegionRequestSnafu {
998                    region_id: metadata.region_id,
999                    err: format!(
1000                        "cannot change alter index options for non-string column {}",
1001                        column_name
1002                    ),
1003                }
1004            );
1005        }
1006
1007        Ok(())
1008    }
1009
1010    /// Returns an error if the column isn't exist.
1011    fn validate_column_existence(column_name: &String, metadata: &RegionMetadata) -> Result<()> {
1012        metadata
1013            .column_by_name(column_name)
1014            .context(InvalidRegionRequestSnafu {
1015                region_id: metadata.region_id,
1016                err: format!("column {} not found", column_name),
1017            })?;
1018
1019        Ok(())
1020    }
1021}
1022
1023impl TryFrom<alter_request::Kind> for AlterKind {
1024    type Error = MetadataError;
1025
1026    fn try_from(kind: alter_request::Kind) -> Result<Self> {
1027        let alter_kind = match kind {
1028            alter_request::Kind::AddColumns(x) => {
1029                let columns = x
1030                    .add_columns
1031                    .into_iter()
1032                    .map(|x| x.try_into())
1033                    .collect::<Result<Vec<_>>>()?;
1034                AlterKind::AddColumns { columns }
1035            }
1036            alter_request::Kind::ModifyColumnTypes(x) => {
1037                let columns = x
1038                    .modify_column_types
1039                    .into_iter()
1040                    .map(|x| x.into())
1041                    .collect::<Vec<_>>();
1042                AlterKind::ModifyColumnTypes { columns }
1043            }
1044            alter_request::Kind::DropColumns(x) => {
1045                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
1046                AlterKind::DropColumns { names }
1047            }
1048            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
1049                options: options
1050                    .table_options
1051                    .iter()
1052                    .map(TryFrom::try_from)
1053                    .collect::<Result<Vec<_>>>()?,
1054            },
1055            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
1056                keys: options
1057                    .keys
1058                    .iter()
1059                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
1060                    .collect::<Result<Vec<_>>>()?,
1061            },
1062            alter_request::Kind::SetIndex(o) => AlterKind::SetIndexes {
1063                options: vec![SetIndexOption::try_from(o)?],
1064            },
1065            alter_request::Kind::UnsetIndex(o) => AlterKind::UnsetIndexes {
1066                options: vec![UnsetIndexOption::try_from(o)?],
1067            },
1068            alter_request::Kind::SetIndexes(o) => AlterKind::SetIndexes {
1069                options: o
1070                    .set_indexes
1071                    .into_iter()
1072                    .map(SetIndexOption::try_from)
1073                    .collect::<Result<Vec<_>>>()?,
1074            },
1075            alter_request::Kind::UnsetIndexes(o) => AlterKind::UnsetIndexes {
1076                options: o
1077                    .unset_indexes
1078                    .into_iter()
1079                    .map(UnsetIndexOption::try_from)
1080                    .collect::<Result<Vec<_>>>()?,
1081            },
1082            alter_request::Kind::DropDefaults(x) => AlterKind::DropDefaults {
1083                names: x.drop_defaults.into_iter().map(|x| x.column_name).collect(),
1084            },
1085            alter_request::Kind::SetDefaults(x) => AlterKind::SetDefaults {
1086                columns: x
1087                    .set_defaults
1088                    .into_iter()
1089                    .map(|x| {
1090                        Ok(SetDefault {
1091                            name: x.column_name,
1092                            default_constraint: x.default_constraint.clone(),
1093                        })
1094                    })
1095                    .collect::<Result<Vec<_>>>()?,
1096            },
1097            alter_request::Kind::SyncColumns(x) => AlterKind::SyncColumns {
1098                column_metadatas: x
1099                    .column_defs
1100                    .into_iter()
1101                    .map(ColumnMetadata::try_from_column_def)
1102                    .collect::<Result<Vec<_>>>()?,
1103            },
1104        };
1105
1106        Ok(alter_kind)
1107    }
1108}
1109
1110/// Adds a column.
1111#[derive(Debug, PartialEq, Eq, Clone)]
1112pub struct AddColumn {
1113    /// Metadata of the column to add.
1114    pub column_metadata: ColumnMetadata,
1115    /// Location to add the column. If location is None, the region adds
1116    /// the column to the last.
1117    pub location: Option<AddColumnLocation>,
1118}
1119
1120impl AddColumn {
1121    /// Returns an error if the column to add is invalid.
1122    ///
1123    /// It allows adding existing columns. However, the existing column must have the same metadata
1124    /// and the location must be None.
1125    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1126        ensure!(
1127            self.column_metadata.column_schema.is_nullable()
1128                || self
1129                    .column_metadata
1130                    .column_schema
1131                    .default_constraint()
1132                    .is_some(),
1133            InvalidRegionRequestSnafu {
1134                region_id: metadata.region_id,
1135                err: format!(
1136                    "no default value for column {}",
1137                    self.column_metadata.column_schema.name
1138                ),
1139            }
1140        );
1141
1142        if let Some(existing_column) =
1143            metadata.column_by_name(&self.column_metadata.column_schema.name)
1144        {
1145            // If the column already exists.
1146            ensure!(
1147                *existing_column == self.column_metadata,
1148                InvalidRegionRequestSnafu {
1149                    region_id: metadata.region_id,
1150                    err: format!(
1151                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
1152                        self.column_metadata.column_schema.name,
1153                        existing_column,
1154                        self.column_metadata,
1155                    ),
1156                }
1157            );
1158            ensure!(
1159                self.location.is_none(),
1160                InvalidRegionRequestSnafu {
1161                    region_id: metadata.region_id,
1162                    err: format!(
1163                        "column {} already exists, but location is specified",
1164                        self.column_metadata.column_schema.name
1165                    ),
1166                }
1167            );
1168        }
1169
1170        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
1171            // Ensures the existing column has the same name.
1172            ensure!(
1173                existing_column.column_schema.name == self.column_metadata.column_schema.name,
1174                InvalidRegionRequestSnafu {
1175                    region_id: metadata.region_id,
1176                    err: format!(
1177                        "column id {} already exists with different name {}",
1178                        self.column_metadata.column_id, existing_column.column_schema.name
1179                    ),
1180                }
1181            );
1182        }
1183
1184        Ok(())
1185    }
1186
1187    /// Returns true if no column to add to the region.
1188    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1189        debug_assert!(self.validate(metadata).is_ok());
1190        metadata
1191            .column_by_name(&self.column_metadata.column_schema.name)
1192            .is_none()
1193    }
1194}
1195
1196impl TryFrom<v1::region::AddColumn> for AddColumn {
1197    type Error = MetadataError;
1198
1199    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
1200        let column_def = add_column
1201            .column_def
1202            .context(InvalidRawRegionRequestSnafu {
1203                err: "missing column_def in AddColumn",
1204            })?;
1205
1206        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
1207        let location = add_column
1208            .location
1209            .map(AddColumnLocation::try_from)
1210            .transpose()?;
1211
1212        Ok(AddColumn {
1213            column_metadata,
1214            location,
1215        })
1216    }
1217}
1218
1219/// Location to add a column.
1220#[derive(Debug, PartialEq, Eq, Clone)]
1221pub enum AddColumnLocation {
1222    /// Add the column to the first position of columns.
1223    First,
1224    /// Add the column after specific column.
1225    After {
1226        /// Add the column after this column.
1227        column_name: String,
1228    },
1229}
1230
1231impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
1232    type Error = MetadataError;
1233
1234    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
1235        let location_type = LocationType::try_from(location.location_type)
1236            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
1237        let add_column_location = match location_type {
1238            LocationType::First => AddColumnLocation::First,
1239            LocationType::After => AddColumnLocation::After {
1240                column_name: location.after_column_name,
1241            },
1242        };
1243
1244        Ok(add_column_location)
1245    }
1246}
1247
1248/// Change a column's datatype.
1249#[derive(Debug, PartialEq, Eq, Clone)]
1250pub struct ModifyColumnType {
1251    /// Schema of the column to modify.
1252    pub column_name: String,
1253    /// Column will be changed to this type.
1254    pub target_type: ConcreteDataType,
1255}
1256
1257impl ModifyColumnType {
1258    /// Returns an error if the column's datatype to change is invalid.
1259    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
1260        let column_meta = metadata
1261            .column_by_name(&self.column_name)
1262            .with_context(|| InvalidRegionRequestSnafu {
1263                region_id: metadata.region_id,
1264                err: format!("column {} not found", self.column_name),
1265            })?;
1266
1267        ensure!(
1268            matches!(column_meta.semantic_type, SemanticType::Field),
1269            InvalidRegionRequestSnafu {
1270                region_id: metadata.region_id,
1271                err: "'timestamp' or 'tag' column cannot change type".to_string()
1272            }
1273        );
1274        ensure!(
1275            column_meta
1276                .column_schema
1277                .data_type
1278                .can_arrow_type_cast_to(&self.target_type),
1279            InvalidRegionRequestSnafu {
1280                region_id: metadata.region_id,
1281                err: format!(
1282                    "column '{}' cannot be cast automatically to type '{}'",
1283                    self.column_name, self.target_type
1284                ),
1285            }
1286        );
1287
1288        Ok(())
1289    }
1290
1291    /// Returns true if no column's datatype to change to the region.
1292    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
1293        debug_assert!(self.validate(metadata).is_ok());
1294        metadata.column_by_name(&self.column_name).is_some()
1295    }
1296}
1297
1298impl From<v1::ModifyColumnType> for ModifyColumnType {
1299    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
1300        let target_type = ColumnDataTypeWrapper::new(
1301            modify_column_type.target_type(),
1302            modify_column_type.target_type_extension,
1303        )
1304        .into();
1305
1306        ModifyColumnType {
1307            column_name: modify_column_type.column_name,
1308            target_type,
1309        }
1310    }
1311}
1312
1313#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1314pub enum SetRegionOption {
1315    Ttl(Option<TimeToLive>),
1316    // Modifying TwscOptions with values as (option name, new value).
1317    Twsc(String, String),
1318    // Modifying the SST format.
1319    Format(String),
1320    // Modifying the append mode.
1321    AppendMode(bool),
1322}
1323
1324impl TryFrom<&PbOption> for SetRegionOption {
1325    type Error = MetadataError;
1326
1327    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1328        let PbOption { key, value } = value;
1329        match key.as_str() {
1330            TTL_KEY => {
1331                let ttl = TimeToLive::from_humantime_or_str(value)
1332                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1333
1334                Ok(Self::Ttl(Some(ttl)))
1335            }
1336            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1337                Ok(Self::Twsc(key.clone(), value.clone()))
1338            }
1339            SST_FORMAT_KEY => Ok(Self::Format(value.clone())),
1340            APPEND_MODE_KEY => {
1341                let append_mode = value
1342                    .parse::<bool>()
1343                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1344                Ok(Self::AppendMode(append_mode))
1345            }
1346            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1347        }
1348    }
1349}
1350
1351impl From<&UnsetRegionOption> for SetRegionOption {
1352    fn from(unset_option: &UnsetRegionOption) -> Self {
1353        match unset_option {
1354            UnsetRegionOption::TwcsTriggerFileNum => {
1355                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1356            }
1357            UnsetRegionOption::TwcsMaxOutputFileSize => {
1358                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1359            }
1360            UnsetRegionOption::TwcsTimeWindow => {
1361                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1362            }
1363            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1364        }
1365    }
1366}
1367
1368impl TryFrom<&str> for UnsetRegionOption {
1369    type Error = MetadataError;
1370
1371    fn try_from(key: &str) -> Result<Self> {
1372        match key.to_ascii_lowercase().as_str() {
1373            TTL_KEY => Ok(Self::Ttl),
1374            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1375            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1376            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1377            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1378        }
1379    }
1380}
1381
1382#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1383pub enum UnsetRegionOption {
1384    TwcsTriggerFileNum,
1385    TwcsMaxOutputFileSize,
1386    TwcsTimeWindow,
1387    Ttl,
1388}
1389
1390impl UnsetRegionOption {
1391    pub fn as_str(&self) -> &str {
1392        match self {
1393            Self::Ttl => TTL_KEY,
1394            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1395            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1396            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1397        }
1398    }
1399}
1400
1401impl Display for UnsetRegionOption {
1402    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1403        write!(f, "{}", self.as_str())
1404    }
1405}
1406
1407#[derive(Debug, Clone, Default)]
1408pub struct RegionFlushRequest {
1409    pub row_group_size: Option<usize>,
1410}
1411
1412#[derive(Debug)]
1413pub struct RegionCompactRequest {
1414    pub options: compact_request::Options,
1415    pub parallelism: Option<u32>,
1416}
1417
1418impl Default for RegionCompactRequest {
1419    fn default() -> Self {
1420        Self {
1421            // Default to regular compaction.
1422            options: compact_request::Options::Regular(Default::default()),
1423            parallelism: None,
1424        }
1425    }
1426}
1427
1428#[derive(Debug, Clone, Default)]
1429pub struct RegionBuildIndexRequest {}
1430
1431/// Truncate region request.
1432#[derive(Debug)]
1433pub enum RegionTruncateRequest {
1434    /// Truncate all data in the region.
1435    All,
1436    ByTimeRanges {
1437        /// Time ranges to truncate. Both bound are inclusive.
1438        /// only files that are fully contained in the time range will be truncated.
1439        /// so no guarantee that all data in the time range will be truncated.
1440        time_ranges: Vec<(Timestamp, Timestamp)>,
1441    },
1442}
1443
1444/// Catchup region request.
1445///
1446/// Makes a readonly region to catch up to leader region changes.
1447/// There is no effect if it operating on a leader region.
1448#[derive(Debug, Clone, Copy, Default)]
1449pub struct RegionCatchupRequest {
1450    /// Sets it to writable if it's available after it has caught up with all changes.
1451    pub set_writable: bool,
1452    /// The `entry_id` that was expected to reply to.
1453    /// `None` stands replaying to latest.
1454    pub entry_id: Option<entry::Id>,
1455    /// Used for metrics metadata region.
1456    /// The `entry_id` that was expected to reply to.
1457    /// `None` stands replaying to latest.
1458    pub metadata_entry_id: Option<entry::Id>,
1459    /// The hint for replaying memtable.
1460    pub location_id: Option<u64>,
1461    /// Replay checkpoint.
1462    pub checkpoint: Option<ReplayCheckpoint>,
1463}
1464
1465#[derive(Debug, Clone)]
1466pub struct RegionBulkInsertsRequest {
1467    pub region_id: RegionId,
1468    pub payload: DfRecordBatch,
1469    pub raw_data: ArrowIpc,
1470    pub partition_expr_version: Option<u64>,
1471}
1472
1473impl RegionBulkInsertsRequest {
1474    pub fn estimated_size(&self) -> usize {
1475        self.payload.get_array_memory_size()
1476    }
1477}
1478
1479/// Request to stage a region with a new partition directive.
1480///
1481/// This request transitions a region into the staging mode.
1482/// It first flushes the memtable for the old partition expression if it is not
1483/// empty, then enters the staging mode with the new directive.
1484#[derive(Debug, Clone, PartialEq, Eq)]
1485pub enum StagingPartitionDirective {
1486    UpdatePartitionExpr(String),
1487    RejectAllWrites,
1488}
1489
1490impl StagingPartitionDirective {
1491    /// Returns the partition expression carried by this directive, if any.
1492    pub fn partition_expr(&self) -> Option<&str> {
1493        match self {
1494            Self::UpdatePartitionExpr(expr) => Some(expr),
1495            Self::RejectAllWrites => None,
1496        }
1497    }
1498}
1499
1500#[derive(Debug, Clone)]
1501pub struct EnterStagingRequest {
1502    /// The staging partition directive of the region.
1503    pub partition_directive: StagingPartitionDirective,
1504}
1505
1506impl EnterStagingRequest {
1507    /// Builds an enter-staging request with a partition expression directive.
1508    pub fn with_partition_expr(partition_expr: String) -> Self {
1509        Self {
1510            partition_directive: StagingPartitionDirective::UpdatePartitionExpr(partition_expr),
1511        }
1512    }
1513}
1514
1515/// This request is used as part of the region repartition.
1516///
1517/// After a region has entered staging mode with a new partition expression
1518/// expression) and a separate process (for example, `remap_manifests`) has
1519/// generated the new file assignments for the staging region, this request
1520/// applies that generated manifest to the region.
1521///
1522/// In practice, this means:
1523/// - The `partition_expr` identifies the staging partition expression that the manifest
1524///   was generated for.
1525/// - `central_region_id` specifies which region holds the staging blob storage
1526///   where the manifest was written during the `remap_manifests` operation.
1527/// - `manifest_path` is the relative path within the central region's staging
1528///   blob storage to fetch the generated manifest.
1529///
1530/// It should typically be called **after** the staging region has been
1531/// initialized by [`EnterStagingRequest`] and the new file layout has been
1532/// computed, to finalize the repartition operation.
1533#[derive(Debug, Clone)]
1534pub struct ApplyStagingManifestRequest {
1535    /// The partition expression of the staging region.
1536    pub partition_expr: String,
1537    /// The region that stores the staging manifests in its staging blob storage.
1538    pub central_region_id: RegionId,
1539    /// The relative path to the staging manifest within the central region's
1540    /// staging blob storage.
1541    pub manifest_path: String,
1542}
1543
1544impl fmt::Display for RegionRequest {
1545    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1546        match self {
1547            RegionRequest::Put(_) => write!(f, "Put"),
1548            RegionRequest::Delete(_) => write!(f, "Delete"),
1549            RegionRequest::Create(_) => write!(f, "Create"),
1550            RegionRequest::Drop(_) => write!(f, "Drop"),
1551            RegionRequest::Open(_) => write!(f, "Open"),
1552            RegionRequest::Close(_) => write!(f, "Close"),
1553            RegionRequest::Alter(_) => write!(f, "Alter"),
1554            RegionRequest::Flush(_) => write!(f, "Flush"),
1555            RegionRequest::Compact(_) => write!(f, "Compact"),
1556            RegionRequest::BuildIndex(_) => write!(f, "BuildIndex"),
1557            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1558            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1559            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1560            RegionRequest::EnterStaging(_) => write!(f, "EnterStaging"),
1561            RegionRequest::ApplyStagingManifest(_) => write!(f, "ApplyStagingManifest"),
1562        }
1563    }
1564}
1565
1566#[cfg(test)]
1567mod tests {
1568
1569    use api::v1::region::RegionColumnDef;
1570    use api::v1::{ColumnDataType, ColumnDef};
1571    use datatypes::prelude::ConcreteDataType;
1572    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1573
1574    use super::*;
1575    use crate::metadata::RegionMetadataBuilder;
1576
1577    #[test]
1578    fn test_from_proto_location() {
1579        let proto_location = v1::AddColumnLocation {
1580            location_type: LocationType::First as i32,
1581            after_column_name: String::default(),
1582        };
1583        let location = AddColumnLocation::try_from(proto_location).unwrap();
1584        assert_eq!(location, AddColumnLocation::First);
1585
1586        let proto_location = v1::AddColumnLocation {
1587            location_type: 10,
1588            after_column_name: String::default(),
1589        };
1590        AddColumnLocation::try_from(proto_location).unwrap_err();
1591
1592        let proto_location = v1::AddColumnLocation {
1593            location_type: LocationType::After as i32,
1594            after_column_name: "a".to_string(),
1595        };
1596        let location = AddColumnLocation::try_from(proto_location).unwrap();
1597        assert_eq!(
1598            location,
1599            AddColumnLocation::After {
1600                column_name: "a".to_string()
1601            }
1602        );
1603    }
1604
1605    #[test]
1606    fn test_from_none_proto_add_column() {
1607        AddColumn::try_from(v1::region::AddColumn {
1608            column_def: None,
1609            location: None,
1610        })
1611        .unwrap_err();
1612    }
1613
1614    #[test]
1615    fn test_from_proto_alter_request() {
1616        RegionAlterRequest::try_from(AlterRequest {
1617            region_id: 0,
1618            schema_version: 1,
1619            kind: None,
1620        })
1621        .unwrap_err();
1622
1623        let request = RegionAlterRequest::try_from(AlterRequest {
1624            region_id: 0,
1625            schema_version: 1,
1626            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1627                add_columns: vec![v1::region::AddColumn {
1628                    column_def: Some(RegionColumnDef {
1629                        column_def: Some(ColumnDef {
1630                            name: "a".to_string(),
1631                            data_type: ColumnDataType::String as i32,
1632                            is_nullable: true,
1633                            default_constraint: vec![],
1634                            semantic_type: SemanticType::Field as i32,
1635                            comment: String::new(),
1636                            ..Default::default()
1637                        }),
1638                        column_id: 1,
1639                    }),
1640                    location: Some(v1::AddColumnLocation {
1641                        location_type: LocationType::First as i32,
1642                        after_column_name: String::default(),
1643                    }),
1644                }],
1645            })),
1646        })
1647        .unwrap();
1648
1649        assert_eq!(
1650            request,
1651            RegionAlterRequest {
1652                kind: AlterKind::AddColumns {
1653                    columns: vec![AddColumn {
1654                        column_metadata: ColumnMetadata {
1655                            column_schema: ColumnSchema::new(
1656                                "a",
1657                                ConcreteDataType::string_datatype(),
1658                                true,
1659                            ),
1660                            semantic_type: SemanticType::Field,
1661                            column_id: 1,
1662                        },
1663                        location: Some(AddColumnLocation::First),
1664                    }]
1665                },
1666            }
1667        );
1668    }
1669
1670    /// Returns a new region metadata for testing. Metadata:
1671    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1672    fn new_metadata() -> RegionMetadata {
1673        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1674        builder
1675            .push_column_metadata(ColumnMetadata {
1676                column_schema: ColumnSchema::new(
1677                    "ts",
1678                    ConcreteDataType::timestamp_millisecond_datatype(),
1679                    false,
1680                ),
1681                semantic_type: SemanticType::Timestamp,
1682                column_id: 1,
1683            })
1684            .push_column_metadata(ColumnMetadata {
1685                column_schema: ColumnSchema::new(
1686                    "tag_0",
1687                    ConcreteDataType::string_datatype(),
1688                    true,
1689                ),
1690                semantic_type: SemanticType::Tag,
1691                column_id: 2,
1692            })
1693            .push_column_metadata(ColumnMetadata {
1694                column_schema: ColumnSchema::new(
1695                    "field_0",
1696                    ConcreteDataType::string_datatype(),
1697                    true,
1698                ),
1699                semantic_type: SemanticType::Field,
1700                column_id: 3,
1701            })
1702            .push_column_metadata(ColumnMetadata {
1703                column_schema: ColumnSchema::new(
1704                    "field_1",
1705                    ConcreteDataType::boolean_datatype(),
1706                    true,
1707                ),
1708                semantic_type: SemanticType::Field,
1709                column_id: 4,
1710            })
1711            .primary_key(vec![2]);
1712        builder.build().unwrap()
1713    }
1714
1715    #[test]
1716    fn test_add_column_validate() {
1717        let metadata = new_metadata();
1718        let add_column = AddColumn {
1719            column_metadata: ColumnMetadata {
1720                column_schema: ColumnSchema::new(
1721                    "tag_1",
1722                    ConcreteDataType::string_datatype(),
1723                    true,
1724                ),
1725                semantic_type: SemanticType::Tag,
1726                column_id: 5,
1727            },
1728            location: None,
1729        };
1730        add_column.validate(&metadata).unwrap();
1731        assert!(add_column.need_alter(&metadata));
1732
1733        // Add not null column.
1734        AddColumn {
1735            column_metadata: ColumnMetadata {
1736                column_schema: ColumnSchema::new(
1737                    "tag_1",
1738                    ConcreteDataType::string_datatype(),
1739                    false,
1740                ),
1741                semantic_type: SemanticType::Tag,
1742                column_id: 5,
1743            },
1744            location: None,
1745        }
1746        .validate(&metadata)
1747        .unwrap_err();
1748
1749        // Add existing column.
1750        let add_column = AddColumn {
1751            column_metadata: ColumnMetadata {
1752                column_schema: ColumnSchema::new(
1753                    "tag_0",
1754                    ConcreteDataType::string_datatype(),
1755                    true,
1756                ),
1757                semantic_type: SemanticType::Tag,
1758                column_id: 2,
1759            },
1760            location: None,
1761        };
1762        add_column.validate(&metadata).unwrap();
1763        assert!(!add_column.need_alter(&metadata));
1764    }
1765
1766    #[test]
1767    fn test_add_duplicate_columns() {
1768        let kind = AlterKind::AddColumns {
1769            columns: vec![
1770                AddColumn {
1771                    column_metadata: ColumnMetadata {
1772                        column_schema: ColumnSchema::new(
1773                            "tag_1",
1774                            ConcreteDataType::string_datatype(),
1775                            true,
1776                        ),
1777                        semantic_type: SemanticType::Tag,
1778                        column_id: 5,
1779                    },
1780                    location: None,
1781                },
1782                AddColumn {
1783                    column_metadata: ColumnMetadata {
1784                        column_schema: ColumnSchema::new(
1785                            "tag_1",
1786                            ConcreteDataType::string_datatype(),
1787                            true,
1788                        ),
1789                        semantic_type: SemanticType::Field,
1790                        column_id: 6,
1791                    },
1792                    location: None,
1793                },
1794            ],
1795        };
1796        let metadata = new_metadata();
1797        kind.validate(&metadata).unwrap();
1798        assert!(kind.need_alter(&metadata));
1799    }
1800
1801    #[test]
1802    fn test_add_existing_column_different_metadata() {
1803        let metadata = new_metadata();
1804
1805        // Add existing column with different id.
1806        let kind = AlterKind::AddColumns {
1807            columns: vec![AddColumn {
1808                column_metadata: ColumnMetadata {
1809                    column_schema: ColumnSchema::new(
1810                        "tag_0",
1811                        ConcreteDataType::string_datatype(),
1812                        true,
1813                    ),
1814                    semantic_type: SemanticType::Tag,
1815                    column_id: 4,
1816                },
1817                location: None,
1818            }],
1819        };
1820        kind.validate(&metadata).unwrap_err();
1821
1822        // Add existing column with different type.
1823        let kind = AlterKind::AddColumns {
1824            columns: vec![AddColumn {
1825                column_metadata: ColumnMetadata {
1826                    column_schema: ColumnSchema::new(
1827                        "tag_0",
1828                        ConcreteDataType::int64_datatype(),
1829                        true,
1830                    ),
1831                    semantic_type: SemanticType::Tag,
1832                    column_id: 2,
1833                },
1834                location: None,
1835            }],
1836        };
1837        kind.validate(&metadata).unwrap_err();
1838
1839        // Add existing column with different name.
1840        let kind = AlterKind::AddColumns {
1841            columns: vec![AddColumn {
1842                column_metadata: ColumnMetadata {
1843                    column_schema: ColumnSchema::new(
1844                        "tag_1",
1845                        ConcreteDataType::string_datatype(),
1846                        true,
1847                    ),
1848                    semantic_type: SemanticType::Tag,
1849                    column_id: 2,
1850                },
1851                location: None,
1852            }],
1853        };
1854        kind.validate(&metadata).unwrap_err();
1855    }
1856
1857    #[test]
1858    fn test_add_existing_column_with_location() {
1859        let metadata = new_metadata();
1860        let kind = AlterKind::AddColumns {
1861            columns: vec![AddColumn {
1862                column_metadata: ColumnMetadata {
1863                    column_schema: ColumnSchema::new(
1864                        "tag_0",
1865                        ConcreteDataType::string_datatype(),
1866                        true,
1867                    ),
1868                    semantic_type: SemanticType::Tag,
1869                    column_id: 2,
1870                },
1871                location: Some(AddColumnLocation::First),
1872            }],
1873        };
1874        kind.validate(&metadata).unwrap_err();
1875    }
1876
1877    #[test]
1878    fn test_validate_drop_column() {
1879        let metadata = new_metadata();
1880        let kind = AlterKind::DropColumns {
1881            names: vec!["xxxx".to_string()],
1882        };
1883        kind.validate(&metadata).unwrap();
1884        assert!(!kind.need_alter(&metadata));
1885
1886        AlterKind::DropColumns {
1887            names: vec!["tag_0".to_string()],
1888        }
1889        .validate(&metadata)
1890        .unwrap_err();
1891
1892        let kind = AlterKind::DropColumns {
1893            names: vec!["field_0".to_string()],
1894        };
1895        kind.validate(&metadata).unwrap();
1896        assert!(kind.need_alter(&metadata));
1897    }
1898
1899    #[test]
1900    fn test_validate_modify_column_type() {
1901        let metadata = new_metadata();
1902        AlterKind::ModifyColumnTypes {
1903            columns: vec![ModifyColumnType {
1904                column_name: "xxxx".to_string(),
1905                target_type: ConcreteDataType::string_datatype(),
1906            }],
1907        }
1908        .validate(&metadata)
1909        .unwrap_err();
1910
1911        AlterKind::ModifyColumnTypes {
1912            columns: vec![ModifyColumnType {
1913                column_name: "field_1".to_string(),
1914                target_type: ConcreteDataType::date_datatype(),
1915            }],
1916        }
1917        .validate(&metadata)
1918        .unwrap_err();
1919
1920        AlterKind::ModifyColumnTypes {
1921            columns: vec![ModifyColumnType {
1922                column_name: "ts".to_string(),
1923                target_type: ConcreteDataType::date_datatype(),
1924            }],
1925        }
1926        .validate(&metadata)
1927        .unwrap_err();
1928
1929        AlterKind::ModifyColumnTypes {
1930            columns: vec![ModifyColumnType {
1931                column_name: "tag_0".to_string(),
1932                target_type: ConcreteDataType::date_datatype(),
1933            }],
1934        }
1935        .validate(&metadata)
1936        .unwrap_err();
1937
1938        let kind = AlterKind::ModifyColumnTypes {
1939            columns: vec![ModifyColumnType {
1940                column_name: "field_0".to_string(),
1941                target_type: ConcreteDataType::int32_datatype(),
1942            }],
1943        };
1944        kind.validate(&metadata).unwrap();
1945        assert!(kind.need_alter(&metadata));
1946    }
1947
1948    #[test]
1949    fn test_validate_add_columns() {
1950        let kind = AlterKind::AddColumns {
1951            columns: vec![
1952                AddColumn {
1953                    column_metadata: ColumnMetadata {
1954                        column_schema: ColumnSchema::new(
1955                            "tag_1",
1956                            ConcreteDataType::string_datatype(),
1957                            true,
1958                        ),
1959                        semantic_type: SemanticType::Tag,
1960                        column_id: 5,
1961                    },
1962                    location: None,
1963                },
1964                AddColumn {
1965                    column_metadata: ColumnMetadata {
1966                        column_schema: ColumnSchema::new(
1967                            "field_2",
1968                            ConcreteDataType::string_datatype(),
1969                            true,
1970                        ),
1971                        semantic_type: SemanticType::Field,
1972                        column_id: 6,
1973                    },
1974                    location: None,
1975                },
1976            ],
1977        };
1978        let request = RegionAlterRequest { kind };
1979        let mut metadata = new_metadata();
1980        metadata.schema_version = 1;
1981        request.validate(&metadata).unwrap();
1982    }
1983
1984    #[test]
1985    fn test_validate_create_region() {
1986        let column_metadatas = vec![
1987            ColumnMetadata {
1988                column_schema: ColumnSchema::new(
1989                    "ts",
1990                    ConcreteDataType::timestamp_millisecond_datatype(),
1991                    false,
1992                ),
1993                semantic_type: SemanticType::Timestamp,
1994                column_id: 1,
1995            },
1996            ColumnMetadata {
1997                column_schema: ColumnSchema::new(
1998                    "tag_0",
1999                    ConcreteDataType::string_datatype(),
2000                    true,
2001                ),
2002                semantic_type: SemanticType::Tag,
2003                column_id: 2,
2004            },
2005            ColumnMetadata {
2006                column_schema: ColumnSchema::new(
2007                    "field_0",
2008                    ConcreteDataType::string_datatype(),
2009                    true,
2010                ),
2011                semantic_type: SemanticType::Field,
2012                column_id: 3,
2013            },
2014        ];
2015        let create = RegionCreateRequest {
2016            engine: "mito".to_string(),
2017            column_metadatas,
2018            primary_key: vec![3, 4],
2019            options: HashMap::new(),
2020            table_dir: "path".to_string(),
2021            path_type: PathType::Bare,
2022            partition_expr_json: Some("".to_string()),
2023        };
2024
2025        assert!(create.validate().is_err());
2026    }
2027
2028    #[test]
2029    fn test_validate_modify_column_fulltext_options() {
2030        let kind = AlterKind::SetIndexes {
2031            options: vec![SetIndexOption::Fulltext {
2032                column_name: "tag_0".to_string(),
2033                options: FulltextOptions::new_unchecked(
2034                    true,
2035                    FulltextAnalyzer::Chinese,
2036                    false,
2037                    FulltextBackend::Bloom,
2038                    1000,
2039                    0.01,
2040                ),
2041            }],
2042        };
2043        let request = RegionAlterRequest { kind };
2044        let mut metadata = new_metadata();
2045        metadata.schema_version = 1;
2046        request.validate(&metadata).unwrap();
2047
2048        let kind = AlterKind::UnsetIndexes {
2049            options: vec![UnsetIndexOption::Fulltext {
2050                column_name: "tag_0".to_string(),
2051            }],
2052        };
2053        let request = RegionAlterRequest { kind };
2054        let mut metadata = new_metadata();
2055        metadata.schema_version = 1;
2056        request.validate(&metadata).unwrap();
2057    }
2058
2059    #[test]
2060    fn test_validate_sync_columns() {
2061        let metadata = new_metadata();
2062        let kind = AlterKind::SyncColumns {
2063            column_metadatas: vec![
2064                ColumnMetadata {
2065                    column_schema: ColumnSchema::new(
2066                        "tag_1",
2067                        ConcreteDataType::string_datatype(),
2068                        true,
2069                    ),
2070                    semantic_type: SemanticType::Tag,
2071                    column_id: 5,
2072                },
2073                ColumnMetadata {
2074                    column_schema: ColumnSchema::new(
2075                        "field_2",
2076                        ConcreteDataType::string_datatype(),
2077                        true,
2078                    ),
2079                    semantic_type: SemanticType::Field,
2080                    column_id: 6,
2081                },
2082            ],
2083        };
2084        let err = kind.validate(&metadata).unwrap_err();
2085        assert!(err.to_string().contains("not a primary key"));
2086
2087        // Change the timestamp column name.
2088        let mut column_metadatas_with_different_ts_column = metadata.column_metadatas.clone();
2089        let ts_column = column_metadatas_with_different_ts_column
2090            .iter_mut()
2091            .find(|c| c.semantic_type == SemanticType::Timestamp)
2092            .unwrap();
2093        ts_column.column_schema.name = "ts1".to_string();
2094
2095        let kind = AlterKind::SyncColumns {
2096            column_metadatas: column_metadatas_with_different_ts_column,
2097        };
2098        let err = kind.validate(&metadata).unwrap_err();
2099        assert!(
2100            err.to_string()
2101                .contains("timestamp column ts has different id")
2102        );
2103
2104        // Change the primary key column name.
2105        let mut column_metadatas_with_different_pk_column = metadata.column_metadatas.clone();
2106        let pk_column = column_metadatas_with_different_pk_column
2107            .iter_mut()
2108            .find(|c| c.column_schema.name == "tag_0")
2109            .unwrap();
2110        pk_column.column_id = 100;
2111        let kind = AlterKind::SyncColumns {
2112            column_metadatas: column_metadatas_with_different_pk_column,
2113        };
2114        let err = kind.validate(&metadata).unwrap_err();
2115        assert!(
2116            err.to_string()
2117                .contains("column with same name tag_0 has different id")
2118        );
2119
2120        // Add a new field column.
2121        let mut column_metadatas_with_new_field_column = metadata.column_metadatas.clone();
2122        column_metadatas_with_new_field_column.push(ColumnMetadata {
2123            column_schema: ColumnSchema::new("field_2", ConcreteDataType::string_datatype(), true),
2124            semantic_type: SemanticType::Field,
2125            column_id: 4,
2126        });
2127        let kind = AlterKind::SyncColumns {
2128            column_metadatas: column_metadatas_with_new_field_column,
2129        };
2130        kind.validate(&metadata).unwrap();
2131    }
2132
2133    #[test]
2134    fn test_cast_path_type_to_primitive() {
2135        assert_eq!(PathType::Bare as u8, 0);
2136        assert_eq!(PathType::Data as u8, 1);
2137        assert_eq!(PathType::Metadata as u8, 2);
2138        assert_eq!(
2139            PathType::try_from(PathType::Bare as u8).unwrap(),
2140            PathType::Bare
2141        );
2142        assert_eq!(
2143            PathType::try_from(PathType::Data as u8).unwrap(),
2144            PathType::Data
2145        );
2146        assert_eq!(
2147            PathType::try_from(PathType::Metadata as u8).unwrap(),
2148            PathType::Metadata
2149        );
2150    }
2151}