store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26    CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27    DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30    self, set_index, Analyzer, ArrowIpc, FulltextBackend as PbFulltextBackend, Option as PbOption,
31    Rows, SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::FlightDecoder;
35use common_recordbatch::DfRecordBatch;
36use common_time::TimeToLive;
37use datatypes::prelude::ConcreteDataType;
38use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
39use serde::{Deserialize, Serialize};
40use snafu::{ensure, OptionExt, ResultExt};
41use strum::{AsRefStr, IntoStaticStr};
42
43use crate::logstore::entry;
44use crate::metadata::{
45    ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
46    InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
47    InvalidUnsetRegionOptionRequestSnafu, MetadataError, RegionMetadata, Result, UnexpectedSnafu,
48};
49use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
50use crate::metrics;
51use crate::mito_engine_options::{
52    TTL_KEY, TWCS_MAX_OUTPUT_FILE_SIZE, TWCS_TIME_WINDOW, TWCS_TRIGGER_FILE_NUM,
53};
54use crate::path_utils::region_dir;
55use crate::storage::{ColumnId, RegionId, ScanRequest};
56
57#[derive(Debug, IntoStaticStr)]
58pub enum BatchRegionDdlRequest {
59    Create(Vec<(RegionId, RegionCreateRequest)>),
60    Drop(Vec<(RegionId, RegionDropRequest)>),
61    Alter(Vec<(RegionId, RegionAlterRequest)>),
62}
63
64impl BatchRegionDdlRequest {
65    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
66    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
67        match body {
68            region_request::Body::Creates(creates) => {
69                let requests = creates
70                    .requests
71                    .into_iter()
72                    .map(parse_region_create)
73                    .collect::<Result<Vec<_>>>()?;
74                Ok(Some(Self::Create(requests)))
75            }
76            region_request::Body::Drops(drops) => {
77                let requests = drops
78                    .requests
79                    .into_iter()
80                    .map(parse_region_drop)
81                    .collect::<Result<Vec<_>>>()?;
82                Ok(Some(Self::Drop(requests)))
83            }
84            region_request::Body::Alters(alters) => {
85                let requests = alters
86                    .requests
87                    .into_iter()
88                    .map(parse_region_alter)
89                    .collect::<Result<Vec<_>>>()?;
90                Ok(Some(Self::Alter(requests)))
91            }
92            _ => Ok(None),
93        }
94    }
95
96    pub fn request_type(&self) -> &'static str {
97        self.into()
98    }
99
100    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
101        match self {
102            Self::Create(requests) => requests
103                .into_iter()
104                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
105                .collect(),
106            Self::Drop(requests) => requests
107                .into_iter()
108                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
109                .collect(),
110            Self::Alter(requests) => requests
111                .into_iter()
112                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
113                .collect(),
114        }
115    }
116}
117
118#[derive(Debug, IntoStaticStr)]
119pub enum RegionRequest {
120    Put(RegionPutRequest),
121    Delete(RegionDeleteRequest),
122    Create(RegionCreateRequest),
123    Drop(RegionDropRequest),
124    Open(RegionOpenRequest),
125    Close(RegionCloseRequest),
126    Alter(RegionAlterRequest),
127    Flush(RegionFlushRequest),
128    Compact(RegionCompactRequest),
129    Truncate(RegionTruncateRequest),
130    Catchup(RegionCatchupRequest),
131    BulkInserts(RegionBulkInsertsRequest),
132}
133
134impl RegionRequest {
135    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
136    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
137    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
138        match body {
139            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
140            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
141            region_request::Body::Create(create) => make_region_create(create),
142            region_request::Body::Drop(drop) => make_region_drop(drop),
143            region_request::Body::Open(open) => make_region_open(open),
144            region_request::Body::Close(close) => make_region_close(close),
145            region_request::Body::Alter(alter) => make_region_alter(alter),
146            region_request::Body::Flush(flush) => make_region_flush(flush),
147            region_request::Body::Compact(compact) => make_region_compact(compact),
148            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
149            region_request::Body::Creates(creates) => make_region_creates(creates),
150            region_request::Body::Drops(drops) => make_region_drops(drops),
151            region_request::Body::Alters(alters) => make_region_alters(alters),
152            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
153            region_request::Body::Sync(_) => UnexpectedSnafu {
154                reason: "Sync request should be handled separately by RegionServer",
155            }
156            .fail(),
157        }
158    }
159
160    /// Returns the type name of the request.
161    pub fn request_type(&self) -> &'static str {
162        self.into()
163    }
164}
165
166fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
167    let requests = inserts
168        .requests
169        .into_iter()
170        .filter_map(|r| {
171            let region_id = r.region_id.into();
172            r.rows.map(|rows| {
173                (
174                    region_id,
175                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
176                )
177            })
178        })
179        .collect();
180    Ok(requests)
181}
182
183fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
184    let requests = deletes
185        .requests
186        .into_iter()
187        .filter_map(|r| {
188            let region_id = r.region_id.into();
189            r.rows.map(|rows| {
190                (
191                    region_id,
192                    RegionRequest::Delete(RegionDeleteRequest { rows }),
193                )
194            })
195        })
196        .collect();
197    Ok(requests)
198}
199
200fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
201    let column_metadatas = create
202        .column_defs
203        .into_iter()
204        .map(ColumnMetadata::try_from_column_def)
205        .collect::<Result<Vec<_>>>()?;
206    let region_id = create.region_id.into();
207    let region_dir = region_dir(&create.path, region_id);
208    Ok((
209        region_id,
210        RegionCreateRequest {
211            engine: create.engine,
212            column_metadatas,
213            primary_key: create.primary_key,
214            options: create.options,
215            region_dir,
216        },
217    ))
218}
219
220fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
221    let (region_id, request) = parse_region_create(create)?;
222    Ok(vec![(region_id, RegionRequest::Create(request))])
223}
224
225fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
226    let mut requests = Vec::with_capacity(creates.requests.len());
227    for create in creates.requests {
228        requests.extend(make_region_create(create)?);
229    }
230    Ok(requests)
231}
232
233fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
234    let region_id = drop.region_id.into();
235    Ok((
236        region_id,
237        RegionDropRequest {
238            fast_path: drop.fast_path,
239        },
240    ))
241}
242
243fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
244    let (region_id, request) = parse_region_drop(drop)?;
245    Ok(vec![(region_id, RegionRequest::Drop(request))])
246}
247
248fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
249    let mut requests = Vec::with_capacity(drops.requests.len());
250    for drop in drops.requests {
251        requests.extend(make_region_drop(drop)?);
252    }
253    Ok(requests)
254}
255
256fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
257    let region_id = open.region_id.into();
258    let region_dir = region_dir(&open.path, region_id);
259    Ok(vec![(
260        region_id,
261        RegionRequest::Open(RegionOpenRequest {
262            engine: open.engine,
263            region_dir,
264            options: open.options,
265            skip_wal_replay: false,
266        }),
267    )])
268}
269
270fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
271    let region_id = close.region_id.into();
272    Ok(vec![(
273        region_id,
274        RegionRequest::Close(RegionCloseRequest {}),
275    )])
276}
277
278fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
279    let region_id = alter.region_id.into();
280    let request = RegionAlterRequest::try_from(alter)?;
281    Ok((region_id, request))
282}
283
284fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
285    let (region_id, request) = parse_region_alter(alter)?;
286    Ok(vec![(region_id, RegionRequest::Alter(request))])
287}
288
289fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
290    let mut requests = Vec::with_capacity(alters.requests.len());
291    for alter in alters.requests {
292        requests.extend(make_region_alter(alter)?);
293    }
294    Ok(requests)
295}
296
297fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
298    let region_id = flush.region_id.into();
299    Ok(vec![(
300        region_id,
301        RegionRequest::Flush(RegionFlushRequest {
302            row_group_size: None,
303        }),
304    )])
305}
306
307fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
308    let region_id = compact.region_id.into();
309    let options = compact
310        .options
311        .unwrap_or(compact_request::Options::Regular(Default::default()));
312    Ok(vec![(
313        region_id,
314        RegionRequest::Compact(RegionCompactRequest { options }),
315    )])
316}
317
318fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
319    let region_id = truncate.region_id.into();
320    Ok(vec![(
321        region_id,
322        RegionRequest::Truncate(RegionTruncateRequest {}),
323    )])
324}
325
326/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
327fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
328    let region_id = request.region_id.into();
329    let Some(Body::ArrowIpc(request)) = request.body else {
330        return Ok(vec![]);
331    };
332
333    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
334        .with_label_values(&["decode"])
335        .start_timer();
336    let mut decoder =
337        FlightDecoder::try_from_schema_bytes(&request.schema).context(FlightCodecSnafu)?;
338    let payload = decoder
339        .try_decode_record_batch(&request.data_header, &request.payload)
340        .context(FlightCodecSnafu)?;
341    decoder_timer.observe_duration();
342    Ok(vec![(
343        region_id,
344        RegionRequest::BulkInserts(RegionBulkInsertsRequest {
345            region_id,
346            payload,
347            raw_data: request,
348        }),
349    )])
350}
351
352/// Request to put data into a region.
353#[derive(Debug)]
354pub struct RegionPutRequest {
355    /// Rows to put.
356    pub rows: Rows,
357    /// Write hint.
358    pub hint: Option<WriteHint>,
359}
360
361#[derive(Debug)]
362pub struct RegionReadRequest {
363    pub request: ScanRequest,
364}
365
366/// Request to delete data from a region.
367#[derive(Debug)]
368pub struct RegionDeleteRequest {
369    /// Keys to rows to delete.
370    ///
371    /// Each row only contains primary key columns and a time index column.
372    pub rows: Rows,
373}
374
375#[derive(Debug, Clone)]
376pub struct RegionCreateRequest {
377    /// Region engine name
378    pub engine: String,
379    /// Columns in this region.
380    pub column_metadatas: Vec<ColumnMetadata>,
381    /// Columns in the primary key.
382    pub primary_key: Vec<ColumnId>,
383    /// Options of the created region.
384    pub options: HashMap<String, String>,
385    /// Directory for region's data home. Usually is composed by catalog and table id
386    pub region_dir: String,
387}
388
389impl RegionCreateRequest {
390    /// Checks whether the request is valid, returns an error if it is invalid.
391    pub fn validate(&self) -> Result<()> {
392        // time index must exist
393        ensure!(
394            self.column_metadatas
395                .iter()
396                .any(|x| x.semantic_type == SemanticType::Timestamp),
397            InvalidRegionRequestSnafu {
398                region_id: RegionId::new(0, 0),
399                err: "missing timestamp column in create region request".to_string(),
400            }
401        );
402
403        // build column id to indices
404        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
405        for (i, c) in self.column_metadatas.iter().enumerate() {
406            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
407                return InvalidRegionRequestSnafu {
408                    region_id: RegionId::new(0, 0),
409                    err: format!(
410                        "duplicate column id {} (at position {} and {}) in create region request",
411                        c.column_id, previous, i
412                    ),
413                }
414                .fail();
415            }
416        }
417
418        // primary key must exist
419        for column_id in &self.primary_key {
420            ensure!(
421                column_id_to_indices.contains_key(column_id),
422                InvalidRegionRequestSnafu {
423                    region_id: RegionId::new(0, 0),
424                    err: format!(
425                        "missing primary key column {} in create region request",
426                        column_id
427                    ),
428                }
429            );
430        }
431
432        Ok(())
433    }
434
435    /// Returns true when the region belongs to the metric engine's physical table.
436    pub fn is_physical_table(&self) -> bool {
437        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
438    }
439}
440
441#[derive(Debug, Clone)]
442pub struct RegionDropRequest {
443    pub fast_path: bool,
444}
445
446/// Open region request.
447#[derive(Debug, Clone)]
448pub struct RegionOpenRequest {
449    /// Region engine name
450    pub engine: String,
451    /// Data directory of the region.
452    pub region_dir: String,
453    /// Options of the opened region.
454    pub options: HashMap<String, String>,
455    /// To skip replaying the WAL.
456    pub skip_wal_replay: bool,
457}
458
459impl RegionOpenRequest {
460    /// Returns true when the region belongs to the metric engine's physical table.
461    pub fn is_physical_table(&self) -> bool {
462        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
463    }
464}
465
466/// Close region request.
467#[derive(Debug)]
468pub struct RegionCloseRequest {}
469
470/// Alter metadata of a region.
471#[derive(Debug, PartialEq, Eq, Clone)]
472pub struct RegionAlterRequest {
473    /// Kind of alteration to do.
474    pub kind: AlterKind,
475}
476
477impl RegionAlterRequest {
478    /// Checks whether the request is valid, returns an error if it is invalid.
479    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
480        self.kind.validate(metadata)?;
481
482        Ok(())
483    }
484
485    /// Returns true if we need to apply the request to the region.
486    ///
487    /// The `request` should be valid.
488    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
489        debug_assert!(self.validate(metadata).is_ok());
490        self.kind.need_alter(metadata)
491    }
492}
493
494impl TryFrom<AlterRequest> for RegionAlterRequest {
495    type Error = MetadataError;
496
497    fn try_from(value: AlterRequest) -> Result<Self> {
498        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
499            err: "missing kind in AlterRequest",
500        })?;
501
502        let kind = AlterKind::try_from(kind)?;
503        Ok(RegionAlterRequest { kind })
504    }
505}
506
507/// Kind of the alteration.
508#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
509pub enum AlterKind {
510    /// Add columns to the region.
511    AddColumns {
512        /// Columns to add.
513        columns: Vec<AddColumn>,
514    },
515    /// Drop columns from the region, only fields are allowed to drop.
516    DropColumns {
517        /// Name of columns to drop.
518        names: Vec<String>,
519    },
520    /// Change columns datatype form the region, only fields are allowed to change.
521    ModifyColumnTypes {
522        /// Columns to change.
523        columns: Vec<ModifyColumnType>,
524    },
525    /// Set region options.
526    SetRegionOptions { options: Vec<SetRegionOption> },
527    /// Unset region options.
528    UnsetRegionOptions { keys: Vec<UnsetRegionOption> },
529    /// Set index options.
530    SetIndex { options: ApiSetIndexOptions },
531    /// Unset index options.
532    UnsetIndex { options: ApiUnsetIndexOptions },
533}
534
535#[derive(Debug, PartialEq, Eq, Clone)]
536pub enum ApiSetIndexOptions {
537    Fulltext {
538        column_name: String,
539        options: FulltextOptions,
540    },
541    Inverted {
542        column_name: String,
543    },
544    Skipping {
545        column_name: String,
546        options: SkippingIndexOptions,
547    },
548}
549
550impl ApiSetIndexOptions {
551    pub fn column_name(&self) -> &String {
552        match self {
553            ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
554            ApiSetIndexOptions::Inverted { column_name } => column_name,
555            ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
556        }
557    }
558
559    pub fn is_fulltext(&self) -> bool {
560        match self {
561            ApiSetIndexOptions::Fulltext { .. } => true,
562            ApiSetIndexOptions::Inverted { .. } => false,
563            ApiSetIndexOptions::Skipping { .. } => false,
564        }
565    }
566}
567
568#[derive(Debug, PartialEq, Eq, Clone)]
569pub enum ApiUnsetIndexOptions {
570    Fulltext { column_name: String },
571    Inverted { column_name: String },
572    Skipping { column_name: String },
573}
574
575impl ApiUnsetIndexOptions {
576    pub fn column_name(&self) -> &String {
577        match self {
578            ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
579            ApiUnsetIndexOptions::Inverted { column_name } => column_name,
580            ApiUnsetIndexOptions::Skipping { column_name } => column_name,
581        }
582    }
583
584    pub fn is_fulltext(&self) -> bool {
585        match self {
586            ApiUnsetIndexOptions::Fulltext { .. } => true,
587            ApiUnsetIndexOptions::Inverted { .. } => false,
588            ApiUnsetIndexOptions::Skipping { .. } => false,
589        }
590    }
591}
592
593impl AlterKind {
594    /// Returns an error if the alter kind is invalid.
595    ///
596    /// It allows adding column if not exists and dropping column if exists.
597    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
598        match self {
599            AlterKind::AddColumns { columns } => {
600                for col_to_add in columns {
601                    col_to_add.validate(metadata)?;
602                }
603            }
604            AlterKind::DropColumns { names } => {
605                for name in names {
606                    Self::validate_column_to_drop(name, metadata)?;
607                }
608            }
609            AlterKind::ModifyColumnTypes { columns } => {
610                for col_to_change in columns {
611                    col_to_change.validate(metadata)?;
612                }
613            }
614            AlterKind::SetRegionOptions { .. } => {}
615            AlterKind::UnsetRegionOptions { .. } => {}
616            AlterKind::SetIndex { options } => {
617                Self::validate_column_alter_index_option(
618                    options.column_name(),
619                    metadata,
620                    options.is_fulltext(),
621                )?;
622            }
623            AlterKind::UnsetIndex { options } => {
624                Self::validate_column_alter_index_option(
625                    options.column_name(),
626                    metadata,
627                    options.is_fulltext(),
628                )?;
629            }
630        }
631        Ok(())
632    }
633
634    /// Returns true if we need to apply the alteration to the region.
635    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
636        debug_assert!(self.validate(metadata).is_ok());
637        match self {
638            AlterKind::AddColumns { columns } => columns
639                .iter()
640                .any(|col_to_add| col_to_add.need_alter(metadata)),
641            AlterKind::DropColumns { names } => names
642                .iter()
643                .any(|name| metadata.column_by_name(name).is_some()),
644            AlterKind::ModifyColumnTypes { columns } => columns
645                .iter()
646                .any(|col_to_change| col_to_change.need_alter(metadata)),
647            AlterKind::SetRegionOptions { .. } => {
648                // we need to update region options for `ChangeTableOptions`.
649                // todo: we need to check if ttl has ever changed.
650                true
651            }
652            AlterKind::UnsetRegionOptions { .. } => true,
653            AlterKind::SetIndex { options, .. } => {
654                metadata.column_by_name(options.column_name()).is_some()
655            }
656            AlterKind::UnsetIndex { options } => {
657                metadata.column_by_name(options.column_name()).is_some()
658            }
659        }
660    }
661
662    /// Returns an error if the column to drop is invalid.
663    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
664        let Some(column) = metadata.column_by_name(name) else {
665            return Ok(());
666        };
667        ensure!(
668            column.semantic_type == SemanticType::Field,
669            InvalidRegionRequestSnafu {
670                region_id: metadata.region_id,
671                err: format!("column {} is not a field and could not be dropped", name),
672            }
673        );
674        Ok(())
675    }
676
677    /// Returns an error if the column's alter index option is invalid.
678    fn validate_column_alter_index_option(
679        column_name: &String,
680        metadata: &RegionMetadata,
681        is_fulltext: bool,
682    ) -> Result<()> {
683        let column = metadata
684            .column_by_name(column_name)
685            .context(InvalidRegionRequestSnafu {
686                region_id: metadata.region_id,
687                err: format!("column {} not found", column_name),
688            })?;
689
690        if is_fulltext {
691            ensure!(
692                column.column_schema.data_type.is_string(),
693                InvalidRegionRequestSnafu {
694                    region_id: metadata.region_id,
695                    err: format!(
696                        "cannot change alter index options for non-string column {}",
697                        column_name
698                    ),
699                }
700            );
701        }
702
703        Ok(())
704    }
705}
706
707impl TryFrom<alter_request::Kind> for AlterKind {
708    type Error = MetadataError;
709
710    fn try_from(kind: alter_request::Kind) -> Result<Self> {
711        let alter_kind = match kind {
712            alter_request::Kind::AddColumns(x) => {
713                let columns = x
714                    .add_columns
715                    .into_iter()
716                    .map(|x| x.try_into())
717                    .collect::<Result<Vec<_>>>()?;
718                AlterKind::AddColumns { columns }
719            }
720            alter_request::Kind::ModifyColumnTypes(x) => {
721                let columns = x
722                    .modify_column_types
723                    .into_iter()
724                    .map(|x| x.into())
725                    .collect::<Vec<_>>();
726                AlterKind::ModifyColumnTypes { columns }
727            }
728            alter_request::Kind::DropColumns(x) => {
729                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
730                AlterKind::DropColumns { names }
731            }
732            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
733                options: options
734                    .table_options
735                    .iter()
736                    .map(TryFrom::try_from)
737                    .collect::<Result<Vec<_>>>()?,
738            },
739            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
740                keys: options
741                    .keys
742                    .iter()
743                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
744                    .collect::<Result<Vec<_>>>()?,
745            },
746            alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
747                set_index::Options::Fulltext(x) => AlterKind::SetIndex {
748                    options: ApiSetIndexOptions::Fulltext {
749                        column_name: x.column_name.clone(),
750                        options: FulltextOptions {
751                            enable: x.enable,
752                            analyzer: as_fulltext_option_analyzer(
753                                Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
754                            ),
755                            case_sensitive: x.case_sensitive,
756                            backend: as_fulltext_option_backend(
757                                PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
758                            ),
759                        },
760                    },
761                },
762                set_index::Options::Inverted(i) => AlterKind::SetIndex {
763                    options: ApiSetIndexOptions::Inverted {
764                        column_name: i.column_name,
765                    },
766                },
767                set_index::Options::Skipping(s) => AlterKind::SetIndex {
768                    options: ApiSetIndexOptions::Skipping {
769                        column_name: s.column_name,
770                        options: SkippingIndexOptions {
771                            index_type: as_skipping_index_type(
772                                PbSkippingIndexType::try_from(s.skipping_index_type)
773                                    .context(DecodeProtoSnafu)?,
774                            ),
775                            granularity: s.granularity as u32,
776                        },
777                    },
778                },
779            },
780            alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
781                v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
782                    options: ApiUnsetIndexOptions::Fulltext {
783                        column_name: f.column_name,
784                    },
785                },
786                v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
787                    options: ApiUnsetIndexOptions::Inverted {
788                        column_name: i.column_name,
789                    },
790                },
791                v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
792                    options: ApiUnsetIndexOptions::Skipping {
793                        column_name: s.column_name,
794                    },
795                },
796            },
797        };
798
799        Ok(alter_kind)
800    }
801}
802
803/// Adds a column.
804#[derive(Debug, PartialEq, Eq, Clone)]
805pub struct AddColumn {
806    /// Metadata of the column to add.
807    pub column_metadata: ColumnMetadata,
808    /// Location to add the column. If location is None, the region adds
809    /// the column to the last.
810    pub location: Option<AddColumnLocation>,
811}
812
813impl AddColumn {
814    /// Returns an error if the column to add is invalid.
815    ///
816    /// It allows adding existing columns. However, the existing column must have the same metadata
817    /// and the location must be None.
818    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
819        ensure!(
820            self.column_metadata.column_schema.is_nullable()
821                || self
822                    .column_metadata
823                    .column_schema
824                    .default_constraint()
825                    .is_some(),
826            InvalidRegionRequestSnafu {
827                region_id: metadata.region_id,
828                err: format!(
829                    "no default value for column {}",
830                    self.column_metadata.column_schema.name
831                ),
832            }
833        );
834
835        if let Some(existing_column) =
836            metadata.column_by_name(&self.column_metadata.column_schema.name)
837        {
838            // If the column already exists.
839            ensure!(
840                *existing_column == self.column_metadata,
841                InvalidRegionRequestSnafu {
842                    region_id: metadata.region_id,
843                    err: format!(
844                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
845                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
846                    ),
847                }
848            );
849            ensure!(
850                self.location.is_none(),
851                InvalidRegionRequestSnafu {
852                    region_id: metadata.region_id,
853                    err: format!(
854                        "column {} already exists, but location is specified",
855                        self.column_metadata.column_schema.name
856                    ),
857                }
858            );
859        }
860
861        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
862            // Ensures the existing column has the same name.
863            ensure!(
864                existing_column.column_schema.name == self.column_metadata.column_schema.name,
865                InvalidRegionRequestSnafu {
866                    region_id: metadata.region_id,
867                    err: format!(
868                        "column id {} already exists with different name {}",
869                        self.column_metadata.column_id, existing_column.column_schema.name
870                    ),
871                }
872            );
873        }
874
875        Ok(())
876    }
877
878    /// Returns true if no column to add to the region.
879    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
880        debug_assert!(self.validate(metadata).is_ok());
881        metadata
882            .column_by_name(&self.column_metadata.column_schema.name)
883            .is_none()
884    }
885}
886
887impl TryFrom<v1::region::AddColumn> for AddColumn {
888    type Error = MetadataError;
889
890    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
891        let column_def = add_column
892            .column_def
893            .context(InvalidRawRegionRequestSnafu {
894                err: "missing column_def in AddColumn",
895            })?;
896
897        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
898        let location = add_column
899            .location
900            .map(AddColumnLocation::try_from)
901            .transpose()?;
902
903        Ok(AddColumn {
904            column_metadata,
905            location,
906        })
907    }
908}
909
910/// Location to add a column.
911#[derive(Debug, PartialEq, Eq, Clone)]
912pub enum AddColumnLocation {
913    /// Add the column to the first position of columns.
914    First,
915    /// Add the column after specific column.
916    After {
917        /// Add the column after this column.
918        column_name: String,
919    },
920}
921
922impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
923    type Error = MetadataError;
924
925    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
926        let location_type = LocationType::try_from(location.location_type)
927            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
928        let add_column_location = match location_type {
929            LocationType::First => AddColumnLocation::First,
930            LocationType::After => AddColumnLocation::After {
931                column_name: location.after_column_name,
932            },
933        };
934
935        Ok(add_column_location)
936    }
937}
938
939/// Change a column's datatype.
940#[derive(Debug, PartialEq, Eq, Clone)]
941pub struct ModifyColumnType {
942    /// Schema of the column to modify.
943    pub column_name: String,
944    /// Column will be changed to this type.
945    pub target_type: ConcreteDataType,
946}
947
948impl ModifyColumnType {
949    /// Returns an error if the column's datatype to change is invalid.
950    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
951        let column_meta = metadata
952            .column_by_name(&self.column_name)
953            .with_context(|| InvalidRegionRequestSnafu {
954                region_id: metadata.region_id,
955                err: format!("column {} not found", self.column_name),
956            })?;
957
958        ensure!(
959            matches!(column_meta.semantic_type, SemanticType::Field),
960            InvalidRegionRequestSnafu {
961                region_id: metadata.region_id,
962                err: "'timestamp' or 'tag' column cannot change type".to_string()
963            }
964        );
965        ensure!(
966            column_meta
967                .column_schema
968                .data_type
969                .can_arrow_type_cast_to(&self.target_type),
970            InvalidRegionRequestSnafu {
971                region_id: metadata.region_id,
972                err: format!(
973                    "column '{}' cannot be cast automatically to type '{}'",
974                    self.column_name, self.target_type
975                ),
976            }
977        );
978
979        Ok(())
980    }
981
982    /// Returns true if no column's datatype to change to the region.
983    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
984        debug_assert!(self.validate(metadata).is_ok());
985        metadata.column_by_name(&self.column_name).is_some()
986    }
987}
988
989impl From<v1::ModifyColumnType> for ModifyColumnType {
990    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
991        let target_type = ColumnDataTypeWrapper::new(
992            modify_column_type.target_type(),
993            modify_column_type.target_type_extension,
994        )
995        .into();
996
997        ModifyColumnType {
998            column_name: modify_column_type.column_name,
999            target_type,
1000        }
1001    }
1002}
1003
1004#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1005pub enum SetRegionOption {
1006    Ttl(Option<TimeToLive>),
1007    // Modifying TwscOptions with values as (option name, new value).
1008    Twsc(String, String),
1009}
1010
1011impl TryFrom<&PbOption> for SetRegionOption {
1012    type Error = MetadataError;
1013
1014    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1015        let PbOption { key, value } = value;
1016        match key.as_str() {
1017            TTL_KEY => {
1018                let ttl = TimeToLive::from_humantime_or_str(value)
1019                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1020
1021                Ok(Self::Ttl(Some(ttl)))
1022            }
1023            TWCS_TRIGGER_FILE_NUM | TWCS_MAX_OUTPUT_FILE_SIZE | TWCS_TIME_WINDOW => {
1024                Ok(Self::Twsc(key.to_string(), value.to_string()))
1025            }
1026            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1027        }
1028    }
1029}
1030
1031impl From<&UnsetRegionOption> for SetRegionOption {
1032    fn from(unset_option: &UnsetRegionOption) -> Self {
1033        match unset_option {
1034            UnsetRegionOption::TwcsTriggerFileNum => {
1035                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1036            }
1037            UnsetRegionOption::TwcsMaxOutputFileSize => {
1038                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1039            }
1040            UnsetRegionOption::TwcsTimeWindow => {
1041                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1042            }
1043            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1044        }
1045    }
1046}
1047
1048impl TryFrom<&str> for UnsetRegionOption {
1049    type Error = MetadataError;
1050
1051    fn try_from(key: &str) -> Result<Self> {
1052        match key.to_ascii_lowercase().as_str() {
1053            TTL_KEY => Ok(Self::Ttl),
1054            TWCS_TRIGGER_FILE_NUM => Ok(Self::TwcsTriggerFileNum),
1055            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1056            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1057            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1058        }
1059    }
1060}
1061
1062#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1063pub enum UnsetRegionOption {
1064    TwcsTriggerFileNum,
1065    TwcsMaxOutputFileSize,
1066    TwcsTimeWindow,
1067    Ttl,
1068}
1069
1070impl UnsetRegionOption {
1071    pub fn as_str(&self) -> &str {
1072        match self {
1073            Self::Ttl => TTL_KEY,
1074            Self::TwcsTriggerFileNum => TWCS_TRIGGER_FILE_NUM,
1075            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1076            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1077        }
1078    }
1079}
1080
1081impl Display for UnsetRegionOption {
1082    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1083        write!(f, "{}", self.as_str())
1084    }
1085}
1086
1087#[derive(Debug, Clone, Default)]
1088pub struct RegionFlushRequest {
1089    pub row_group_size: Option<usize>,
1090}
1091
1092#[derive(Debug)]
1093pub struct RegionCompactRequest {
1094    pub options: compact_request::Options,
1095}
1096
1097impl Default for RegionCompactRequest {
1098    fn default() -> Self {
1099        Self {
1100            // Default to regular compaction.
1101            options: compact_request::Options::Regular(Default::default()),
1102        }
1103    }
1104}
1105
1106/// Truncate region request.
1107#[derive(Debug)]
1108pub struct RegionTruncateRequest {}
1109
1110/// Catchup region request.
1111///
1112/// Makes a readonly region to catch up to leader region changes.
1113/// There is no effect if it operating on a leader region.
1114#[derive(Debug, Clone, Copy)]
1115pub struct RegionCatchupRequest {
1116    /// Sets it to writable if it's available after it has caught up with all changes.
1117    pub set_writable: bool,
1118    /// The `entry_id` that was expected to reply to.
1119    /// `None` stands replaying to latest.
1120    pub entry_id: Option<entry::Id>,
1121    /// Used for metrics metadata region.
1122    /// The `entry_id` that was expected to reply to.
1123    /// `None` stands replaying to latest.
1124    pub metadata_entry_id: Option<entry::Id>,
1125    /// The hint for replaying memtable.
1126    pub location_id: Option<u64>,
1127}
1128
1129/// Get sequences of regions by region ids.
1130#[derive(Debug, Clone)]
1131pub struct RegionSequencesRequest {
1132    pub region_ids: Vec<RegionId>,
1133}
1134
1135#[derive(Debug, Clone)]
1136pub struct RegionBulkInsertsRequest {
1137    pub region_id: RegionId,
1138    pub payload: DfRecordBatch,
1139    pub raw_data: ArrowIpc,
1140}
1141
1142impl RegionBulkInsertsRequest {
1143    pub fn estimated_size(&self) -> usize {
1144        self.payload.get_array_memory_size()
1145    }
1146}
1147
1148impl fmt::Display for RegionRequest {
1149    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1150        match self {
1151            RegionRequest::Put(_) => write!(f, "Put"),
1152            RegionRequest::Delete(_) => write!(f, "Delete"),
1153            RegionRequest::Create(_) => write!(f, "Create"),
1154            RegionRequest::Drop(_) => write!(f, "Drop"),
1155            RegionRequest::Open(_) => write!(f, "Open"),
1156            RegionRequest::Close(_) => write!(f, "Close"),
1157            RegionRequest::Alter(_) => write!(f, "Alter"),
1158            RegionRequest::Flush(_) => write!(f, "Flush"),
1159            RegionRequest::Compact(_) => write!(f, "Compact"),
1160            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1161            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1162            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1163        }
1164    }
1165}
1166
1167#[cfg(test)]
1168mod tests {
1169    use api::v1::region::RegionColumnDef;
1170    use api::v1::{ColumnDataType, ColumnDef};
1171    use datatypes::prelude::ConcreteDataType;
1172    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1173
1174    use super::*;
1175    use crate::metadata::RegionMetadataBuilder;
1176
1177    #[test]
1178    fn test_from_proto_location() {
1179        let proto_location = v1::AddColumnLocation {
1180            location_type: LocationType::First as i32,
1181            after_column_name: String::default(),
1182        };
1183        let location = AddColumnLocation::try_from(proto_location).unwrap();
1184        assert_eq!(location, AddColumnLocation::First);
1185
1186        let proto_location = v1::AddColumnLocation {
1187            location_type: 10,
1188            after_column_name: String::default(),
1189        };
1190        AddColumnLocation::try_from(proto_location).unwrap_err();
1191
1192        let proto_location = v1::AddColumnLocation {
1193            location_type: LocationType::After as i32,
1194            after_column_name: "a".to_string(),
1195        };
1196        let location = AddColumnLocation::try_from(proto_location).unwrap();
1197        assert_eq!(
1198            location,
1199            AddColumnLocation::After {
1200                column_name: "a".to_string()
1201            }
1202        );
1203    }
1204
1205    #[test]
1206    fn test_from_none_proto_add_column() {
1207        AddColumn::try_from(v1::region::AddColumn {
1208            column_def: None,
1209            location: None,
1210        })
1211        .unwrap_err();
1212    }
1213
1214    #[test]
1215    fn test_from_proto_alter_request() {
1216        RegionAlterRequest::try_from(AlterRequest {
1217            region_id: 0,
1218            schema_version: 1,
1219            kind: None,
1220        })
1221        .unwrap_err();
1222
1223        let request = RegionAlterRequest::try_from(AlterRequest {
1224            region_id: 0,
1225            schema_version: 1,
1226            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1227                add_columns: vec![v1::region::AddColumn {
1228                    column_def: Some(RegionColumnDef {
1229                        column_def: Some(ColumnDef {
1230                            name: "a".to_string(),
1231                            data_type: ColumnDataType::String as i32,
1232                            is_nullable: true,
1233                            default_constraint: vec![],
1234                            semantic_type: SemanticType::Field as i32,
1235                            comment: String::new(),
1236                            ..Default::default()
1237                        }),
1238                        column_id: 1,
1239                    }),
1240                    location: Some(v1::AddColumnLocation {
1241                        location_type: LocationType::First as i32,
1242                        after_column_name: String::default(),
1243                    }),
1244                }],
1245            })),
1246        })
1247        .unwrap();
1248
1249        assert_eq!(
1250            request,
1251            RegionAlterRequest {
1252                kind: AlterKind::AddColumns {
1253                    columns: vec![AddColumn {
1254                        column_metadata: ColumnMetadata {
1255                            column_schema: ColumnSchema::new(
1256                                "a",
1257                                ConcreteDataType::string_datatype(),
1258                                true,
1259                            ),
1260                            semantic_type: SemanticType::Field,
1261                            column_id: 1,
1262                        },
1263                        location: Some(AddColumnLocation::First),
1264                    }]
1265                },
1266            }
1267        );
1268    }
1269
1270    /// Returns a new region metadata for testing. Metadata:
1271    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1272    fn new_metadata() -> RegionMetadata {
1273        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1274        builder
1275            .push_column_metadata(ColumnMetadata {
1276                column_schema: ColumnSchema::new(
1277                    "ts",
1278                    ConcreteDataType::timestamp_millisecond_datatype(),
1279                    false,
1280                ),
1281                semantic_type: SemanticType::Timestamp,
1282                column_id: 1,
1283            })
1284            .push_column_metadata(ColumnMetadata {
1285                column_schema: ColumnSchema::new(
1286                    "tag_0",
1287                    ConcreteDataType::string_datatype(),
1288                    true,
1289                ),
1290                semantic_type: SemanticType::Tag,
1291                column_id: 2,
1292            })
1293            .push_column_metadata(ColumnMetadata {
1294                column_schema: ColumnSchema::new(
1295                    "field_0",
1296                    ConcreteDataType::string_datatype(),
1297                    true,
1298                ),
1299                semantic_type: SemanticType::Field,
1300                column_id: 3,
1301            })
1302            .push_column_metadata(ColumnMetadata {
1303                column_schema: ColumnSchema::new(
1304                    "field_1",
1305                    ConcreteDataType::boolean_datatype(),
1306                    true,
1307                ),
1308                semantic_type: SemanticType::Field,
1309                column_id: 4,
1310            })
1311            .primary_key(vec![2]);
1312        builder.build().unwrap()
1313    }
1314
1315    #[test]
1316    fn test_add_column_validate() {
1317        let metadata = new_metadata();
1318        let add_column = AddColumn {
1319            column_metadata: ColumnMetadata {
1320                column_schema: ColumnSchema::new(
1321                    "tag_1",
1322                    ConcreteDataType::string_datatype(),
1323                    true,
1324                ),
1325                semantic_type: SemanticType::Tag,
1326                column_id: 5,
1327            },
1328            location: None,
1329        };
1330        add_column.validate(&metadata).unwrap();
1331        assert!(add_column.need_alter(&metadata));
1332
1333        // Add not null column.
1334        AddColumn {
1335            column_metadata: ColumnMetadata {
1336                column_schema: ColumnSchema::new(
1337                    "tag_1",
1338                    ConcreteDataType::string_datatype(),
1339                    false,
1340                ),
1341                semantic_type: SemanticType::Tag,
1342                column_id: 5,
1343            },
1344            location: None,
1345        }
1346        .validate(&metadata)
1347        .unwrap_err();
1348
1349        // Add existing column.
1350        let add_column = AddColumn {
1351            column_metadata: ColumnMetadata {
1352                column_schema: ColumnSchema::new(
1353                    "tag_0",
1354                    ConcreteDataType::string_datatype(),
1355                    true,
1356                ),
1357                semantic_type: SemanticType::Tag,
1358                column_id: 2,
1359            },
1360            location: None,
1361        };
1362        add_column.validate(&metadata).unwrap();
1363        assert!(!add_column.need_alter(&metadata));
1364    }
1365
1366    #[test]
1367    fn test_add_duplicate_columns() {
1368        let kind = AlterKind::AddColumns {
1369            columns: vec![
1370                AddColumn {
1371                    column_metadata: ColumnMetadata {
1372                        column_schema: ColumnSchema::new(
1373                            "tag_1",
1374                            ConcreteDataType::string_datatype(),
1375                            true,
1376                        ),
1377                        semantic_type: SemanticType::Tag,
1378                        column_id: 5,
1379                    },
1380                    location: None,
1381                },
1382                AddColumn {
1383                    column_metadata: ColumnMetadata {
1384                        column_schema: ColumnSchema::new(
1385                            "tag_1",
1386                            ConcreteDataType::string_datatype(),
1387                            true,
1388                        ),
1389                        semantic_type: SemanticType::Field,
1390                        column_id: 6,
1391                    },
1392                    location: None,
1393                },
1394            ],
1395        };
1396        let metadata = new_metadata();
1397        kind.validate(&metadata).unwrap();
1398        assert!(kind.need_alter(&metadata));
1399    }
1400
1401    #[test]
1402    fn test_add_existing_column_different_metadata() {
1403        let metadata = new_metadata();
1404
1405        // Add existing column with different id.
1406        let kind = AlterKind::AddColumns {
1407            columns: vec![AddColumn {
1408                column_metadata: ColumnMetadata {
1409                    column_schema: ColumnSchema::new(
1410                        "tag_0",
1411                        ConcreteDataType::string_datatype(),
1412                        true,
1413                    ),
1414                    semantic_type: SemanticType::Tag,
1415                    column_id: 4,
1416                },
1417                location: None,
1418            }],
1419        };
1420        kind.validate(&metadata).unwrap_err();
1421
1422        // Add existing column with different type.
1423        let kind = AlterKind::AddColumns {
1424            columns: vec![AddColumn {
1425                column_metadata: ColumnMetadata {
1426                    column_schema: ColumnSchema::new(
1427                        "tag_0",
1428                        ConcreteDataType::int64_datatype(),
1429                        true,
1430                    ),
1431                    semantic_type: SemanticType::Tag,
1432                    column_id: 2,
1433                },
1434                location: None,
1435            }],
1436        };
1437        kind.validate(&metadata).unwrap_err();
1438
1439        // Add existing column with different name.
1440        let kind = AlterKind::AddColumns {
1441            columns: vec![AddColumn {
1442                column_metadata: ColumnMetadata {
1443                    column_schema: ColumnSchema::new(
1444                        "tag_1",
1445                        ConcreteDataType::string_datatype(),
1446                        true,
1447                    ),
1448                    semantic_type: SemanticType::Tag,
1449                    column_id: 2,
1450                },
1451                location: None,
1452            }],
1453        };
1454        kind.validate(&metadata).unwrap_err();
1455    }
1456
1457    #[test]
1458    fn test_add_existing_column_with_location() {
1459        let metadata = new_metadata();
1460        let kind = AlterKind::AddColumns {
1461            columns: vec![AddColumn {
1462                column_metadata: ColumnMetadata {
1463                    column_schema: ColumnSchema::new(
1464                        "tag_0",
1465                        ConcreteDataType::string_datatype(),
1466                        true,
1467                    ),
1468                    semantic_type: SemanticType::Tag,
1469                    column_id: 2,
1470                },
1471                location: Some(AddColumnLocation::First),
1472            }],
1473        };
1474        kind.validate(&metadata).unwrap_err();
1475    }
1476
1477    #[test]
1478    fn test_validate_drop_column() {
1479        let metadata = new_metadata();
1480        let kind = AlterKind::DropColumns {
1481            names: vec!["xxxx".to_string()],
1482        };
1483        kind.validate(&metadata).unwrap();
1484        assert!(!kind.need_alter(&metadata));
1485
1486        AlterKind::DropColumns {
1487            names: vec!["tag_0".to_string()],
1488        }
1489        .validate(&metadata)
1490        .unwrap_err();
1491
1492        let kind = AlterKind::DropColumns {
1493            names: vec!["field_0".to_string()],
1494        };
1495        kind.validate(&metadata).unwrap();
1496        assert!(kind.need_alter(&metadata));
1497    }
1498
1499    #[test]
1500    fn test_validate_modify_column_type() {
1501        let metadata = new_metadata();
1502        AlterKind::ModifyColumnTypes {
1503            columns: vec![ModifyColumnType {
1504                column_name: "xxxx".to_string(),
1505                target_type: ConcreteDataType::string_datatype(),
1506            }],
1507        }
1508        .validate(&metadata)
1509        .unwrap_err();
1510
1511        AlterKind::ModifyColumnTypes {
1512            columns: vec![ModifyColumnType {
1513                column_name: "field_1".to_string(),
1514                target_type: ConcreteDataType::date_datatype(),
1515            }],
1516        }
1517        .validate(&metadata)
1518        .unwrap_err();
1519
1520        AlterKind::ModifyColumnTypes {
1521            columns: vec![ModifyColumnType {
1522                column_name: "ts".to_string(),
1523                target_type: ConcreteDataType::date_datatype(),
1524            }],
1525        }
1526        .validate(&metadata)
1527        .unwrap_err();
1528
1529        AlterKind::ModifyColumnTypes {
1530            columns: vec![ModifyColumnType {
1531                column_name: "tag_0".to_string(),
1532                target_type: ConcreteDataType::date_datatype(),
1533            }],
1534        }
1535        .validate(&metadata)
1536        .unwrap_err();
1537
1538        let kind = AlterKind::ModifyColumnTypes {
1539            columns: vec![ModifyColumnType {
1540                column_name: "field_0".to_string(),
1541                target_type: ConcreteDataType::int32_datatype(),
1542            }],
1543        };
1544        kind.validate(&metadata).unwrap();
1545        assert!(kind.need_alter(&metadata));
1546    }
1547
1548    #[test]
1549    fn test_validate_add_columns() {
1550        let kind = AlterKind::AddColumns {
1551            columns: vec![
1552                AddColumn {
1553                    column_metadata: ColumnMetadata {
1554                        column_schema: ColumnSchema::new(
1555                            "tag_1",
1556                            ConcreteDataType::string_datatype(),
1557                            true,
1558                        ),
1559                        semantic_type: SemanticType::Tag,
1560                        column_id: 5,
1561                    },
1562                    location: None,
1563                },
1564                AddColumn {
1565                    column_metadata: ColumnMetadata {
1566                        column_schema: ColumnSchema::new(
1567                            "field_2",
1568                            ConcreteDataType::string_datatype(),
1569                            true,
1570                        ),
1571                        semantic_type: SemanticType::Field,
1572                        column_id: 6,
1573                    },
1574                    location: None,
1575                },
1576            ],
1577        };
1578        let request = RegionAlterRequest { kind };
1579        let mut metadata = new_metadata();
1580        metadata.schema_version = 1;
1581        request.validate(&metadata).unwrap();
1582    }
1583
1584    #[test]
1585    fn test_validate_create_region() {
1586        let column_metadatas = vec![
1587            ColumnMetadata {
1588                column_schema: ColumnSchema::new(
1589                    "ts",
1590                    ConcreteDataType::timestamp_millisecond_datatype(),
1591                    false,
1592                ),
1593                semantic_type: SemanticType::Timestamp,
1594                column_id: 1,
1595            },
1596            ColumnMetadata {
1597                column_schema: ColumnSchema::new(
1598                    "tag_0",
1599                    ConcreteDataType::string_datatype(),
1600                    true,
1601                ),
1602                semantic_type: SemanticType::Tag,
1603                column_id: 2,
1604            },
1605            ColumnMetadata {
1606                column_schema: ColumnSchema::new(
1607                    "field_0",
1608                    ConcreteDataType::string_datatype(),
1609                    true,
1610                ),
1611                semantic_type: SemanticType::Field,
1612                column_id: 3,
1613            },
1614        ];
1615        let create = RegionCreateRequest {
1616            engine: "mito".to_string(),
1617            column_metadatas,
1618            primary_key: vec![3, 4],
1619            options: HashMap::new(),
1620            region_dir: "path".to_string(),
1621        };
1622
1623        assert!(create.validate().is_err());
1624    }
1625
1626    #[test]
1627    fn test_validate_modify_column_fulltext_options() {
1628        let kind = AlterKind::SetIndex {
1629            options: ApiSetIndexOptions::Fulltext {
1630                column_name: "tag_0".to_string(),
1631                options: FulltextOptions {
1632                    enable: true,
1633                    analyzer: FulltextAnalyzer::Chinese,
1634                    case_sensitive: false,
1635                    backend: FulltextBackend::Bloom,
1636                },
1637            },
1638        };
1639        let request = RegionAlterRequest { kind };
1640        let mut metadata = new_metadata();
1641        metadata.schema_version = 1;
1642        request.validate(&metadata).unwrap();
1643
1644        let kind = AlterKind::UnsetIndex {
1645            options: ApiUnsetIndexOptions::Fulltext {
1646                column_name: "tag_0".to_string(),
1647            },
1648        };
1649        let request = RegionAlterRequest { kind };
1650        let mut metadata = new_metadata();
1651        metadata.schema_version = 1;
1652        request.validate(&metadata).unwrap();
1653    }
1654}