store_api/
region_request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::collections::HashMap;
16use std::fmt::{self, Display};
17
18use api::helper::ColumnDataTypeWrapper;
19use api::v1::add_column_location::LocationType;
20use api::v1::column_def::{
21    as_fulltext_option_analyzer, as_fulltext_option_backend, as_skipping_index_type,
22};
23use api::v1::region::bulk_insert_request::Body;
24use api::v1::region::{
25    alter_request, compact_request, region_request, AlterRequest, AlterRequests, BulkInsertRequest,
26    CloseRequest, CompactRequest, CreateRequest, CreateRequests, DeleteRequests, DropRequest,
27    DropRequests, FlushRequest, InsertRequests, OpenRequest, TruncateRequest,
28};
29use api::v1::{
30    self, set_index, Analyzer, FulltextBackend as PbFulltextBackend, Option as PbOption, Rows,
31    SemanticType, SkippingIndexType as PbSkippingIndexType, WriteHint,
32};
33pub use common_base::AffectedRows;
34use common_grpc::flight::{FlightDecoder, FlightMessage};
35use common_grpc::FlightData;
36use common_recordbatch::DfRecordBatch;
37use common_time::TimeToLive;
38use datatypes::prelude::ConcreteDataType;
39use datatypes::schema::{FulltextOptions, SkippingIndexOptions};
40use prost::Message;
41use serde::{Deserialize, Serialize};
42use snafu::{ensure, OptionExt, ResultExt};
43use strum::{AsRefStr, IntoStaticStr};
44
45use crate::logstore::entry;
46use crate::metadata::{
47    ColumnMetadata, DecodeProtoSnafu, FlightCodecSnafu, InvalidRawRegionRequestSnafu,
48    InvalidRegionRequestSnafu, InvalidSetRegionOptionRequestSnafu,
49    InvalidUnsetRegionOptionRequestSnafu, MetadataError, ProstSnafu, RegionMetadata, Result,
50    UnexpectedSnafu,
51};
52use crate::metric_engine_consts::PHYSICAL_TABLE_METADATA_KEY;
53use crate::metrics;
54use crate::mito_engine_options::{
55    TTL_KEY, TWCS_MAX_ACTIVE_WINDOW_FILES, TWCS_MAX_ACTIVE_WINDOW_RUNS,
56    TWCS_MAX_INACTIVE_WINDOW_FILES, TWCS_MAX_INACTIVE_WINDOW_RUNS, TWCS_MAX_OUTPUT_FILE_SIZE,
57    TWCS_TIME_WINDOW,
58};
59use crate::path_utils::region_dir;
60use crate::storage::{ColumnId, RegionId, ScanRequest};
61
/// A batch of DDL requests that all share the same kind.
///
/// Every variant stores the parsed requests together with the id of the
/// region each request targets.
#[derive(Debug, IntoStaticStr)]
pub enum BatchRegionDdlRequest {
    /// Batch of region create requests.
    Create(Vec<(RegionId, RegionCreateRequest)>),
    /// Batch of region drop requests.
    Drop(Vec<(RegionId, RegionDropRequest)>),
    /// Batch of region alter requests.
    Alter(Vec<(RegionId, RegionAlterRequest)>),
}
68
69impl BatchRegionDdlRequest {
70    /// Converts [Body](region_request::Body) to [`BatchRegionDdlRequest`].
71    pub fn try_from_request_body(body: region_request::Body) -> Result<Option<Self>> {
72        match body {
73            region_request::Body::Creates(creates) => {
74                let requests = creates
75                    .requests
76                    .into_iter()
77                    .map(parse_region_create)
78                    .collect::<Result<Vec<_>>>()?;
79                Ok(Some(Self::Create(requests)))
80            }
81            region_request::Body::Drops(drops) => {
82                let requests = drops
83                    .requests
84                    .into_iter()
85                    .map(parse_region_drop)
86                    .collect::<Result<Vec<_>>>()?;
87                Ok(Some(Self::Drop(requests)))
88            }
89            region_request::Body::Alters(alters) => {
90                let requests = alters
91                    .requests
92                    .into_iter()
93                    .map(parse_region_alter)
94                    .collect::<Result<Vec<_>>>()?;
95                Ok(Some(Self::Alter(requests)))
96            }
97            _ => Ok(None),
98        }
99    }
100
101    pub fn request_type(&self) -> &'static str {
102        self.into()
103    }
104
105    pub fn into_region_requests(self) -> Vec<(RegionId, RegionRequest)> {
106        match self {
107            Self::Create(requests) => requests
108                .into_iter()
109                .map(|(region_id, request)| (region_id, RegionRequest::Create(request)))
110                .collect(),
111            Self::Drop(requests) => requests
112                .into_iter()
113                .map(|(region_id, request)| (region_id, RegionRequest::Drop(request)))
114                .collect(),
115            Self::Alter(requests) => requests
116                .into_iter()
117                .map(|(region_id, request)| (region_id, RegionRequest::Alter(request)))
118                .collect(),
119        }
120    }
121}
122
/// A request that targets a single region.
///
/// Each variant wraps the request struct of the same kind.
#[derive(Debug, IntoStaticStr)]
pub enum RegionRequest {
    /// See [`RegionPutRequest`].
    Put(RegionPutRequest),
    /// See [`RegionDeleteRequest`].
    Delete(RegionDeleteRequest),
    /// See [`RegionCreateRequest`].
    Create(RegionCreateRequest),
    /// See [`RegionDropRequest`].
    Drop(RegionDropRequest),
    /// See [`RegionOpenRequest`].
    Open(RegionOpenRequest),
    /// See [`RegionCloseRequest`].
    Close(RegionCloseRequest),
    /// See [`RegionAlterRequest`].
    Alter(RegionAlterRequest),
    /// See [`RegionFlushRequest`].
    Flush(RegionFlushRequest),
    /// See [`RegionCompactRequest`].
    Compact(RegionCompactRequest),
    /// See [`RegionTruncateRequest`].
    Truncate(RegionTruncateRequest),
    /// See [`RegionCatchupRequest`].
    Catchup(RegionCatchupRequest),
    /// See [`RegionBulkInsertsRequest`].
    BulkInserts(RegionBulkInsertsRequest),
}
138
139impl RegionRequest {
140    /// Convert [Body](region_request::Body) to a group of [RegionRequest] with region id.
141    /// Inserts/Deletes request might become multiple requests. Others are one-to-one.
142    pub fn try_from_request_body(body: region_request::Body) -> Result<Vec<(RegionId, Self)>> {
143        match body {
144            region_request::Body::Inserts(inserts) => make_region_puts(inserts),
145            region_request::Body::Deletes(deletes) => make_region_deletes(deletes),
146            region_request::Body::Create(create) => make_region_create(create),
147            region_request::Body::Drop(drop) => make_region_drop(drop),
148            region_request::Body::Open(open) => make_region_open(open),
149            region_request::Body::Close(close) => make_region_close(close),
150            region_request::Body::Alter(alter) => make_region_alter(alter),
151            region_request::Body::Flush(flush) => make_region_flush(flush),
152            region_request::Body::Compact(compact) => make_region_compact(compact),
153            region_request::Body::Truncate(truncate) => make_region_truncate(truncate),
154            region_request::Body::Creates(creates) => make_region_creates(creates),
155            region_request::Body::Drops(drops) => make_region_drops(drops),
156            region_request::Body::Alters(alters) => make_region_alters(alters),
157            region_request::Body::BulkInsert(bulk) => make_region_bulk_inserts(bulk),
158            region_request::Body::Sync(_) => UnexpectedSnafu {
159                reason: "Sync request should be handled separately by RegionServer",
160            }
161            .fail(),
162        }
163    }
164
165    /// Returns the type name of the request.
166    pub fn request_type(&self) -> &'static str {
167        self.into()
168    }
169}
170
171fn make_region_puts(inserts: InsertRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
172    let requests = inserts
173        .requests
174        .into_iter()
175        .filter_map(|r| {
176            let region_id = r.region_id.into();
177            r.rows.map(|rows| {
178                (
179                    region_id,
180                    RegionRequest::Put(RegionPutRequest { rows, hint: None }),
181                )
182            })
183        })
184        .collect();
185    Ok(requests)
186}
187
188fn make_region_deletes(deletes: DeleteRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
189    let requests = deletes
190        .requests
191        .into_iter()
192        .filter_map(|r| {
193            let region_id = r.region_id.into();
194            r.rows.map(|rows| {
195                (
196                    region_id,
197                    RegionRequest::Delete(RegionDeleteRequest { rows }),
198                )
199            })
200        })
201        .collect();
202    Ok(requests)
203}
204
205fn parse_region_create(create: CreateRequest) -> Result<(RegionId, RegionCreateRequest)> {
206    let column_metadatas = create
207        .column_defs
208        .into_iter()
209        .map(ColumnMetadata::try_from_column_def)
210        .collect::<Result<Vec<_>>>()?;
211    let region_id = create.region_id.into();
212    let region_dir = region_dir(&create.path, region_id);
213    Ok((
214        region_id,
215        RegionCreateRequest {
216            engine: create.engine,
217            column_metadatas,
218            primary_key: create.primary_key,
219            options: create.options,
220            region_dir,
221        },
222    ))
223}
224
225fn make_region_create(create: CreateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
226    let (region_id, request) = parse_region_create(create)?;
227    Ok(vec![(region_id, RegionRequest::Create(request))])
228}
229
230fn make_region_creates(creates: CreateRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
231    let mut requests = Vec::with_capacity(creates.requests.len());
232    for create in creates.requests {
233        requests.extend(make_region_create(create)?);
234    }
235    Ok(requests)
236}
237
238fn parse_region_drop(drop: DropRequest) -> Result<(RegionId, RegionDropRequest)> {
239    let region_id = drop.region_id.into();
240    Ok((
241        region_id,
242        RegionDropRequest {
243            fast_path: drop.fast_path,
244        },
245    ))
246}
247
248fn make_region_drop(drop: DropRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
249    let (region_id, request) = parse_region_drop(drop)?;
250    Ok(vec![(region_id, RegionRequest::Drop(request))])
251}
252
253fn make_region_drops(drops: DropRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
254    let mut requests = Vec::with_capacity(drops.requests.len());
255    for drop in drops.requests {
256        requests.extend(make_region_drop(drop)?);
257    }
258    Ok(requests)
259}
260
261fn make_region_open(open: OpenRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
262    let region_id = open.region_id.into();
263    let region_dir = region_dir(&open.path, region_id);
264    Ok(vec![(
265        region_id,
266        RegionRequest::Open(RegionOpenRequest {
267            engine: open.engine,
268            region_dir,
269            options: open.options,
270            skip_wal_replay: false,
271        }),
272    )])
273}
274
275fn make_region_close(close: CloseRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
276    let region_id = close.region_id.into();
277    Ok(vec![(
278        region_id,
279        RegionRequest::Close(RegionCloseRequest {}),
280    )])
281}
282
283fn parse_region_alter(alter: AlterRequest) -> Result<(RegionId, RegionAlterRequest)> {
284    let region_id = alter.region_id.into();
285    let request = RegionAlterRequest::try_from(alter)?;
286    Ok((region_id, request))
287}
288
289fn make_region_alter(alter: AlterRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
290    let (region_id, request) = parse_region_alter(alter)?;
291    Ok(vec![(region_id, RegionRequest::Alter(request))])
292}
293
294fn make_region_alters(alters: AlterRequests) -> Result<Vec<(RegionId, RegionRequest)>> {
295    let mut requests = Vec::with_capacity(alters.requests.len());
296    for alter in alters.requests {
297        requests.extend(make_region_alter(alter)?);
298    }
299    Ok(requests)
300}
301
302fn make_region_flush(flush: FlushRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
303    let region_id = flush.region_id.into();
304    Ok(vec![(
305        region_id,
306        RegionRequest::Flush(RegionFlushRequest {
307            row_group_size: None,
308        }),
309    )])
310}
311
312fn make_region_compact(compact: CompactRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
313    let region_id = compact.region_id.into();
314    let options = compact
315        .options
316        .unwrap_or(compact_request::Options::Regular(Default::default()));
317    Ok(vec![(
318        region_id,
319        RegionRequest::Compact(RegionCompactRequest { options }),
320    )])
321}
322
323fn make_region_truncate(truncate: TruncateRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
324    let region_id = truncate.region_id.into();
325    Ok(vec![(
326        region_id,
327        RegionRequest::Truncate(RegionTruncateRequest {}),
328    )])
329}
330
331/// Convert [BulkInsertRequest] to [RegionRequest] and group by [RegionId].
332fn make_region_bulk_inserts(request: BulkInsertRequest) -> Result<Vec<(RegionId, RegionRequest)>> {
333    let Some(Body::ArrowIpc(request)) = request.body else {
334        return Ok(vec![]);
335    };
336
337    let decoder_timer = metrics::CONVERT_REGION_BULK_REQUEST
338        .with_label_values(&["decode"])
339        .start_timer();
340    let schema_data = FlightData::decode(request.schema.clone()).context(ProstSnafu)?;
341    let payload_data = FlightData::decode(request.payload.clone()).context(ProstSnafu)?;
342    let mut decoder = FlightDecoder::default();
343    let _ = decoder.try_decode(&schema_data).context(FlightCodecSnafu)?;
344    let FlightMessage::Recordbatch(rb) = decoder
345        .try_decode(&payload_data)
346        .context(FlightCodecSnafu)?
347    else {
348        unreachable!("Always expect record batch message after schema");
349    };
350    decoder_timer.observe_duration();
351    let payload = rb.into_df_record_batch();
352    let region_id: RegionId = request.region_id.into();
353    Ok(vec![(
354        region_id,
355        RegionRequest::BulkInserts(RegionBulkInsertsRequest { region_id, payload }),
356    )])
357}
358
/// Request to put data into a region.
#[derive(Debug)]
pub struct RegionPutRequest {
    /// Rows to put.
    pub rows: Rows,
    /// Optional write hint attached to the rows.
    pub hint: Option<WriteHint>,
}
367
/// Read request for a region, wrapping a [`ScanRequest`].
#[derive(Debug)]
pub struct RegionReadRequest {
    /// The wrapped scan request.
    pub request: ScanRequest,
}
372
/// Request to delete data from a region.
#[derive(Debug)]
pub struct RegionDeleteRequest {
    /// Keys of the rows to delete.
    ///
    /// Each row only contains primary key columns and a time index column.
    pub rows: Rows,
}
381
/// Create region request.
#[derive(Debug, Clone)]
pub struct RegionCreateRequest {
    /// Region engine name.
    pub engine: String,
    /// Columns in this region.
    pub column_metadatas: Vec<ColumnMetadata>,
    /// Ids of the columns in the primary key.
    pub primary_key: Vec<ColumnId>,
    /// Options of the created region.
    pub options: HashMap<String, String>,
    /// Directory for the region's data home. Usually composed of the catalog and table id.
    pub region_dir: String,
}
395
396impl RegionCreateRequest {
397    /// Checks whether the request is valid, returns an error if it is invalid.
398    pub fn validate(&self) -> Result<()> {
399        // time index must exist
400        ensure!(
401            self.column_metadatas
402                .iter()
403                .any(|x| x.semantic_type == SemanticType::Timestamp),
404            InvalidRegionRequestSnafu {
405                region_id: RegionId::new(0, 0),
406                err: "missing timestamp column in create region request".to_string(),
407            }
408        );
409
410        // build column id to indices
411        let mut column_id_to_indices = HashMap::with_capacity(self.column_metadatas.len());
412        for (i, c) in self.column_metadatas.iter().enumerate() {
413            if let Some(previous) = column_id_to_indices.insert(c.column_id, i) {
414                return InvalidRegionRequestSnafu {
415                    region_id: RegionId::new(0, 0),
416                    err: format!(
417                        "duplicate column id {} (at position {} and {}) in create region request",
418                        c.column_id, previous, i
419                    ),
420                }
421                .fail();
422            }
423        }
424
425        // primary key must exist
426        for column_id in &self.primary_key {
427            ensure!(
428                column_id_to_indices.contains_key(column_id),
429                InvalidRegionRequestSnafu {
430                    region_id: RegionId::new(0, 0),
431                    err: format!(
432                        "missing primary key column {} in create region request",
433                        column_id
434                    ),
435                }
436            );
437        }
438
439        Ok(())
440    }
441
442    /// Returns true when the region belongs to the metric engine's physical table.
443    pub fn is_physical_table(&self) -> bool {
444        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
445    }
446}
447
/// Drop region request.
#[derive(Debug, Clone)]
pub struct RegionDropRequest {
    /// Fast-path flag, copied as-is from the protobuf `DropRequest`.
    // NOTE(review): what the fast path actually changes is decided by the engine
    // that handles the drop — confirm there.
    pub fast_path: bool,
}
452
/// Open region request.
#[derive(Debug, Clone)]
pub struct RegionOpenRequest {
    /// Region engine name.
    pub engine: String,
    /// Data directory of the region.
    pub region_dir: String,
    /// Options of the opened region.
    pub options: HashMap<String, String>,
    /// Whether to skip replaying the WAL.
    pub skip_wal_replay: bool,
}
465
466impl RegionOpenRequest {
467    /// Returns true when the region belongs to the metric engine's physical table.
468    pub fn is_physical_table(&self) -> bool {
469        self.options.contains_key(PHYSICAL_TABLE_METADATA_KEY)
470    }
471}
472
/// Close region request.
///
/// Carries no parameters.
#[derive(Debug)]
pub struct RegionCloseRequest {}
476
/// Request to alter the metadata of a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct RegionAlterRequest {
    /// Kind of alteration to do.
    pub kind: AlterKind,
}
483
484impl RegionAlterRequest {
485    /// Checks whether the request is valid, returns an error if it is invalid.
486    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
487        self.kind.validate(metadata)?;
488
489        Ok(())
490    }
491
492    /// Returns true if we need to apply the request to the region.
493    ///
494    /// The `request` should be valid.
495    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
496        debug_assert!(self.validate(metadata).is_ok());
497        self.kind.need_alter(metadata)
498    }
499}
500
501impl TryFrom<AlterRequest> for RegionAlterRequest {
502    type Error = MetadataError;
503
504    fn try_from(value: AlterRequest) -> Result<Self> {
505        let kind = value.kind.context(InvalidRawRegionRequestSnafu {
506            err: "missing kind in AlterRequest",
507        })?;
508
509        let kind = AlterKind::try_from(kind)?;
510        Ok(RegionAlterRequest { kind })
511    }
512}
513
/// Kind of the alteration.
#[derive(Debug, PartialEq, Eq, Clone, AsRefStr)]
pub enum AlterKind {
    /// Add columns to the region.
    AddColumns {
        /// Columns to add.
        columns: Vec<AddColumn>,
    },
    /// Drop columns from the region, only fields are allowed to drop.
    DropColumns {
        /// Name of columns to drop.
        names: Vec<String>,
    },
    /// Change the data type of columns in the region, only fields are allowed to change.
    ModifyColumnTypes {
        /// Columns to change.
        columns: Vec<ModifyColumnType>,
    },
    /// Set region options.
    SetRegionOptions {
        /// Options to set.
        options: Vec<SetRegionOption>,
    },
    /// Unset region options.
    UnsetRegionOptions {
        /// Options to unset.
        keys: Vec<UnsetRegionOption>,
    },
    /// Set index options.
    SetIndex {
        /// Index options to set on a column.
        options: ApiSetIndexOptions,
    },
    /// Unset index options.
    UnsetIndex {
        /// Index to unset on a column.
        options: ApiUnsetIndexOptions,
    },
}
541
/// Index options to set on a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiSetIndexOptions {
    /// Set a fulltext index.
    Fulltext {
        /// Name of the target column.
        column_name: String,
        /// Fulltext index options.
        options: FulltextOptions,
    },
    /// Set an inverted index.
    Inverted {
        /// Name of the target column.
        column_name: String,
    },
    /// Set a skipping index.
    Skipping {
        /// Name of the target column.
        column_name: String,
        /// Skipping index options.
        options: SkippingIndexOptions,
    },
}
556
557impl ApiSetIndexOptions {
558    pub fn column_name(&self) -> &String {
559        match self {
560            ApiSetIndexOptions::Fulltext { column_name, .. } => column_name,
561            ApiSetIndexOptions::Inverted { column_name } => column_name,
562            ApiSetIndexOptions::Skipping { column_name, .. } => column_name,
563        }
564    }
565
566    pub fn is_fulltext(&self) -> bool {
567        match self {
568            ApiSetIndexOptions::Fulltext { .. } => true,
569            ApiSetIndexOptions::Inverted { .. } => false,
570            ApiSetIndexOptions::Skipping { .. } => false,
571        }
572    }
573}
574
/// Index to unset on a single column.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum ApiUnsetIndexOptions {
    /// Unset the fulltext index of the named column.
    Fulltext { column_name: String },
    /// Unset the inverted index of the named column.
    Inverted { column_name: String },
    /// Unset the skipping index of the named column.
    Skipping { column_name: String },
}
581
582impl ApiUnsetIndexOptions {
583    pub fn column_name(&self) -> &String {
584        match self {
585            ApiUnsetIndexOptions::Fulltext { column_name } => column_name,
586            ApiUnsetIndexOptions::Inverted { column_name } => column_name,
587            ApiUnsetIndexOptions::Skipping { column_name } => column_name,
588        }
589    }
590
591    pub fn is_fulltext(&self) -> bool {
592        match self {
593            ApiUnsetIndexOptions::Fulltext { .. } => true,
594            ApiUnsetIndexOptions::Inverted { .. } => false,
595            ApiUnsetIndexOptions::Skipping { .. } => false,
596        }
597    }
598}
599
600impl AlterKind {
601    /// Returns an error if the alter kind is invalid.
602    ///
603    /// It allows adding column if not exists and dropping column if exists.
604    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
605        match self {
606            AlterKind::AddColumns { columns } => {
607                for col_to_add in columns {
608                    col_to_add.validate(metadata)?;
609                }
610            }
611            AlterKind::DropColumns { names } => {
612                for name in names {
613                    Self::validate_column_to_drop(name, metadata)?;
614                }
615            }
616            AlterKind::ModifyColumnTypes { columns } => {
617                for col_to_change in columns {
618                    col_to_change.validate(metadata)?;
619                }
620            }
621            AlterKind::SetRegionOptions { .. } => {}
622            AlterKind::UnsetRegionOptions { .. } => {}
623            AlterKind::SetIndex { options } => {
624                Self::validate_column_alter_index_option(
625                    options.column_name(),
626                    metadata,
627                    options.is_fulltext(),
628                )?;
629            }
630            AlterKind::UnsetIndex { options } => {
631                Self::validate_column_alter_index_option(
632                    options.column_name(),
633                    metadata,
634                    options.is_fulltext(),
635                )?;
636            }
637        }
638        Ok(())
639    }
640
641    /// Returns true if we need to apply the alteration to the region.
642    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
643        debug_assert!(self.validate(metadata).is_ok());
644        match self {
645            AlterKind::AddColumns { columns } => columns
646                .iter()
647                .any(|col_to_add| col_to_add.need_alter(metadata)),
648            AlterKind::DropColumns { names } => names
649                .iter()
650                .any(|name| metadata.column_by_name(name).is_some()),
651            AlterKind::ModifyColumnTypes { columns } => columns
652                .iter()
653                .any(|col_to_change| col_to_change.need_alter(metadata)),
654            AlterKind::SetRegionOptions { .. } => {
655                // we need to update region options for `ChangeTableOptions`.
656                // todo: we need to check if ttl has ever changed.
657                true
658            }
659            AlterKind::UnsetRegionOptions { .. } => true,
660            AlterKind::SetIndex { options, .. } => {
661                metadata.column_by_name(options.column_name()).is_some()
662            }
663            AlterKind::UnsetIndex { options } => {
664                metadata.column_by_name(options.column_name()).is_some()
665            }
666        }
667    }
668
669    /// Returns an error if the column to drop is invalid.
670    fn validate_column_to_drop(name: &str, metadata: &RegionMetadata) -> Result<()> {
671        let Some(column) = metadata.column_by_name(name) else {
672            return Ok(());
673        };
674        ensure!(
675            column.semantic_type == SemanticType::Field,
676            InvalidRegionRequestSnafu {
677                region_id: metadata.region_id,
678                err: format!("column {} is not a field and could not be dropped", name),
679            }
680        );
681        Ok(())
682    }
683
684    /// Returns an error if the column's alter index option is invalid.
685    fn validate_column_alter_index_option(
686        column_name: &String,
687        metadata: &RegionMetadata,
688        is_fulltext: bool,
689    ) -> Result<()> {
690        let column = metadata
691            .column_by_name(column_name)
692            .context(InvalidRegionRequestSnafu {
693                region_id: metadata.region_id,
694                err: format!("column {} not found", column_name),
695            })?;
696
697        if is_fulltext {
698            ensure!(
699                column.column_schema.data_type.is_string(),
700                InvalidRegionRequestSnafu {
701                    region_id: metadata.region_id,
702                    err: format!(
703                        "cannot change alter index options for non-string column {}",
704                        column_name
705                    ),
706                }
707            );
708        }
709
710        Ok(())
711    }
712}
713
714impl TryFrom<alter_request::Kind> for AlterKind {
715    type Error = MetadataError;
716
717    fn try_from(kind: alter_request::Kind) -> Result<Self> {
718        let alter_kind = match kind {
719            alter_request::Kind::AddColumns(x) => {
720                let columns = x
721                    .add_columns
722                    .into_iter()
723                    .map(|x| x.try_into())
724                    .collect::<Result<Vec<_>>>()?;
725                AlterKind::AddColumns { columns }
726            }
727            alter_request::Kind::ModifyColumnTypes(x) => {
728                let columns = x
729                    .modify_column_types
730                    .into_iter()
731                    .map(|x| x.into())
732                    .collect::<Vec<_>>();
733                AlterKind::ModifyColumnTypes { columns }
734            }
735            alter_request::Kind::DropColumns(x) => {
736                let names = x.drop_columns.into_iter().map(|x| x.name).collect();
737                AlterKind::DropColumns { names }
738            }
739            alter_request::Kind::SetTableOptions(options) => AlterKind::SetRegionOptions {
740                options: options
741                    .table_options
742                    .iter()
743                    .map(TryFrom::try_from)
744                    .collect::<Result<Vec<_>>>()?,
745            },
746            alter_request::Kind::UnsetTableOptions(options) => AlterKind::UnsetRegionOptions {
747                keys: options
748                    .keys
749                    .iter()
750                    .map(|key| UnsetRegionOption::try_from(key.as_str()))
751                    .collect::<Result<Vec<_>>>()?,
752            },
753            alter_request::Kind::SetIndex(o) => match o.options.unwrap() {
754                set_index::Options::Fulltext(x) => AlterKind::SetIndex {
755                    options: ApiSetIndexOptions::Fulltext {
756                        column_name: x.column_name.clone(),
757                        options: FulltextOptions {
758                            enable: x.enable,
759                            analyzer: as_fulltext_option_analyzer(
760                                Analyzer::try_from(x.analyzer).context(DecodeProtoSnafu)?,
761                            ),
762                            case_sensitive: x.case_sensitive,
763                            backend: as_fulltext_option_backend(
764                                PbFulltextBackend::try_from(x.backend).context(DecodeProtoSnafu)?,
765                            ),
766                        },
767                    },
768                },
769                set_index::Options::Inverted(i) => AlterKind::SetIndex {
770                    options: ApiSetIndexOptions::Inverted {
771                        column_name: i.column_name,
772                    },
773                },
774                set_index::Options::Skipping(s) => AlterKind::SetIndex {
775                    options: ApiSetIndexOptions::Skipping {
776                        column_name: s.column_name,
777                        options: SkippingIndexOptions {
778                            index_type: as_skipping_index_type(
779                                PbSkippingIndexType::try_from(s.skipping_index_type)
780                                    .context(DecodeProtoSnafu)?,
781                            ),
782                            granularity: s.granularity as u32,
783                        },
784                    },
785                },
786            },
787            alter_request::Kind::UnsetIndex(o) => match o.options.unwrap() {
788                v1::unset_index::Options::Fulltext(f) => AlterKind::UnsetIndex {
789                    options: ApiUnsetIndexOptions::Fulltext {
790                        column_name: f.column_name,
791                    },
792                },
793                v1::unset_index::Options::Inverted(i) => AlterKind::UnsetIndex {
794                    options: ApiUnsetIndexOptions::Inverted {
795                        column_name: i.column_name,
796                    },
797                },
798                v1::unset_index::Options::Skipping(s) => AlterKind::UnsetIndex {
799                    options: ApiUnsetIndexOptions::Skipping {
800                        column_name: s.column_name,
801                    },
802                },
803            },
804        };
805
806        Ok(alter_kind)
807    }
808}
809
/// Describes a column to add to a region.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct AddColumn {
    /// Metadata of the column to add.
    pub column_metadata: ColumnMetadata,
    /// Location to add the column. If location is None, the region adds
    /// the column after the last existing column.
    pub location: Option<AddColumnLocation>,
}
819
820impl AddColumn {
821    /// Returns an error if the column to add is invalid.
822    ///
823    /// It allows adding existing columns. However, the existing column must have the same metadata
824    /// and the location must be None.
825    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
826        ensure!(
827            self.column_metadata.column_schema.is_nullable()
828                || self
829                    .column_metadata
830                    .column_schema
831                    .default_constraint()
832                    .is_some(),
833            InvalidRegionRequestSnafu {
834                region_id: metadata.region_id,
835                err: format!(
836                    "no default value for column {}",
837                    self.column_metadata.column_schema.name
838                ),
839            }
840        );
841
842        if let Some(existing_column) =
843            metadata.column_by_name(&self.column_metadata.column_schema.name)
844        {
845            // If the column already exists.
846            ensure!(
847                *existing_column == self.column_metadata,
848                InvalidRegionRequestSnafu {
849                    region_id: metadata.region_id,
850                    err: format!(
851                        "column {} already exists with different metadata, existing: {:?}, got: {:?}",
852                        self.column_metadata.column_schema.name, existing_column, self.column_metadata,
853                    ),
854                }
855            );
856            ensure!(
857                self.location.is_none(),
858                InvalidRegionRequestSnafu {
859                    region_id: metadata.region_id,
860                    err: format!(
861                        "column {} already exists, but location is specified",
862                        self.column_metadata.column_schema.name
863                    ),
864                }
865            );
866        }
867
868        if let Some(existing_column) = metadata.column_by_id(self.column_metadata.column_id) {
869            // Ensures the existing column has the same name.
870            ensure!(
871                existing_column.column_schema.name == self.column_metadata.column_schema.name,
872                InvalidRegionRequestSnafu {
873                    region_id: metadata.region_id,
874                    err: format!(
875                        "column id {} already exists with different name {}",
876                        self.column_metadata.column_id, existing_column.column_schema.name
877                    ),
878                }
879            );
880        }
881
882        Ok(())
883    }
884
885    /// Returns true if no column to add to the region.
886    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
887        debug_assert!(self.validate(metadata).is_ok());
888        metadata
889            .column_by_name(&self.column_metadata.column_schema.name)
890            .is_none()
891    }
892}
893
894impl TryFrom<v1::region::AddColumn> for AddColumn {
895    type Error = MetadataError;
896
897    fn try_from(add_column: v1::region::AddColumn) -> Result<Self> {
898        let column_def = add_column
899            .column_def
900            .context(InvalidRawRegionRequestSnafu {
901                err: "missing column_def in AddColumn",
902            })?;
903
904        let column_metadata = ColumnMetadata::try_from_column_def(column_def)?;
905        let location = add_column
906            .location
907            .map(AddColumnLocation::try_from)
908            .transpose()?;
909
910        Ok(AddColumn {
911            column_metadata,
912            location,
913        })
914    }
915}
916
/// Position at which a new column is inserted.
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum AddColumnLocation {
    /// Insert the column before all existing columns.
    First,
    /// Insert the column right after the named column.
    After {
        /// Name of the column that precedes the new column.
        column_name: String,
    },
}
928
929impl TryFrom<v1::AddColumnLocation> for AddColumnLocation {
930    type Error = MetadataError;
931
932    fn try_from(location: v1::AddColumnLocation) -> Result<Self> {
933        let location_type = LocationType::try_from(location.location_type)
934            .map_err(|e| InvalidRawRegionRequestSnafu { err: e.to_string() }.build())?;
935        let add_column_location = match location_type {
936            LocationType::First => AddColumnLocation::First,
937            LocationType::After => AddColumnLocation::After {
938                column_name: location.after_column_name,
939            },
940        };
941
942        Ok(add_column_location)
943    }
944}
945
946/// Change a column's datatype.
947#[derive(Debug, PartialEq, Eq, Clone)]
948pub struct ModifyColumnType {
949    /// Schema of the column to modify.
950    pub column_name: String,
951    /// Column will be changed to this type.
952    pub target_type: ConcreteDataType,
953}
954
955impl ModifyColumnType {
956    /// Returns an error if the column's datatype to change is invalid.
957    pub fn validate(&self, metadata: &RegionMetadata) -> Result<()> {
958        let column_meta = metadata
959            .column_by_name(&self.column_name)
960            .with_context(|| InvalidRegionRequestSnafu {
961                region_id: metadata.region_id,
962                err: format!("column {} not found", self.column_name),
963            })?;
964
965        ensure!(
966            matches!(column_meta.semantic_type, SemanticType::Field),
967            InvalidRegionRequestSnafu {
968                region_id: metadata.region_id,
969                err: "'timestamp' or 'tag' column cannot change type".to_string()
970            }
971        );
972        ensure!(
973            column_meta
974                .column_schema
975                .data_type
976                .can_arrow_type_cast_to(&self.target_type),
977            InvalidRegionRequestSnafu {
978                region_id: metadata.region_id,
979                err: format!(
980                    "column '{}' cannot be cast automatically to type '{}'",
981                    self.column_name, self.target_type
982                ),
983            }
984        );
985
986        Ok(())
987    }
988
989    /// Returns true if no column's datatype to change to the region.
990    pub fn need_alter(&self, metadata: &RegionMetadata) -> bool {
991        debug_assert!(self.validate(metadata).is_ok());
992        metadata.column_by_name(&self.column_name).is_some()
993    }
994}
995
996impl From<v1::ModifyColumnType> for ModifyColumnType {
997    fn from(modify_column_type: v1::ModifyColumnType) -> Self {
998        let target_type = ColumnDataTypeWrapper::new(
999            modify_column_type.target_type(),
1000            modify_column_type.target_type_extension,
1001        )
1002        .into();
1003
1004        ModifyColumnType {
1005            column_name: modify_column_type.column_name,
1006            target_type,
1007        }
1008    }
1009}
1010
1011#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1012pub enum SetRegionOption {
1013    Ttl(Option<TimeToLive>),
1014    // Modifying TwscOptions with values as (option name, new value).
1015    Twsc(String, String),
1016}
1017
1018impl TryFrom<&PbOption> for SetRegionOption {
1019    type Error = MetadataError;
1020
1021    fn try_from(value: &PbOption) -> std::result::Result<Self, Self::Error> {
1022        let PbOption { key, value } = value;
1023        match key.as_str() {
1024            TTL_KEY => {
1025                let ttl = TimeToLive::from_humantime_or_str(value)
1026                    .map_err(|_| InvalidSetRegionOptionRequestSnafu { key, value }.build())?;
1027
1028                Ok(Self::Ttl(Some(ttl)))
1029            }
1030            TWCS_MAX_ACTIVE_WINDOW_RUNS
1031            | TWCS_MAX_ACTIVE_WINDOW_FILES
1032            | TWCS_MAX_INACTIVE_WINDOW_FILES
1033            | TWCS_MAX_INACTIVE_WINDOW_RUNS
1034            | TWCS_MAX_OUTPUT_FILE_SIZE
1035            | TWCS_TIME_WINDOW => Ok(Self::Twsc(key.to_string(), value.to_string())),
1036            _ => InvalidSetRegionOptionRequestSnafu { key, value }.fail(),
1037        }
1038    }
1039}
1040
1041impl From<&UnsetRegionOption> for SetRegionOption {
1042    fn from(unset_option: &UnsetRegionOption) -> Self {
1043        match unset_option {
1044            UnsetRegionOption::TwcsMaxActiveWindowFiles => {
1045                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1046            }
1047            UnsetRegionOption::TwcsMaxInactiveWindowFiles => {
1048                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1049            }
1050            UnsetRegionOption::TwcsMaxActiveWindowRuns => {
1051                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1052            }
1053            UnsetRegionOption::TwcsMaxInactiveWindowRuns => {
1054                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1055            }
1056            UnsetRegionOption::TwcsMaxOutputFileSize => {
1057                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1058            }
1059            UnsetRegionOption::TwcsTimeWindow => {
1060                SetRegionOption::Twsc(unset_option.to_string(), String::new())
1061            }
1062            UnsetRegionOption::Ttl => SetRegionOption::Ttl(Default::default()),
1063        }
1064    }
1065}
1066
1067impl TryFrom<&str> for UnsetRegionOption {
1068    type Error = MetadataError;
1069
1070    fn try_from(key: &str) -> Result<Self> {
1071        match key.to_ascii_lowercase().as_str() {
1072            TTL_KEY => Ok(Self::Ttl),
1073            TWCS_MAX_ACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxActiveWindowFiles),
1074            TWCS_MAX_INACTIVE_WINDOW_FILES => Ok(Self::TwcsMaxInactiveWindowFiles),
1075            TWCS_MAX_ACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxActiveWindowRuns),
1076            TWCS_MAX_INACTIVE_WINDOW_RUNS => Ok(Self::TwcsMaxInactiveWindowRuns),
1077            TWCS_MAX_OUTPUT_FILE_SIZE => Ok(Self::TwcsMaxOutputFileSize),
1078            TWCS_TIME_WINDOW => Ok(Self::TwcsTimeWindow),
1079            _ => InvalidUnsetRegionOptionRequestSnafu { key }.fail(),
1080        }
1081    }
1082}
1083
1084#[derive(Debug, Eq, PartialEq, Clone, Serialize, Deserialize)]
1085pub enum UnsetRegionOption {
1086    TwcsMaxActiveWindowFiles,
1087    TwcsMaxInactiveWindowFiles,
1088    TwcsMaxActiveWindowRuns,
1089    TwcsMaxInactiveWindowRuns,
1090    TwcsMaxOutputFileSize,
1091    TwcsTimeWindow,
1092    Ttl,
1093}
1094
1095impl UnsetRegionOption {
1096    pub fn as_str(&self) -> &str {
1097        match self {
1098            Self::Ttl => TTL_KEY,
1099            Self::TwcsMaxActiveWindowFiles => TWCS_MAX_ACTIVE_WINDOW_FILES,
1100            Self::TwcsMaxInactiveWindowFiles => TWCS_MAX_INACTIVE_WINDOW_FILES,
1101            Self::TwcsMaxActiveWindowRuns => TWCS_MAX_ACTIVE_WINDOW_RUNS,
1102            Self::TwcsMaxInactiveWindowRuns => TWCS_MAX_INACTIVE_WINDOW_RUNS,
1103            Self::TwcsMaxOutputFileSize => TWCS_MAX_OUTPUT_FILE_SIZE,
1104            Self::TwcsTimeWindow => TWCS_TIME_WINDOW,
1105        }
1106    }
1107}
1108
1109impl Display for UnsetRegionOption {
1110    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1111        write!(f, "{}", self.as_str())
1112    }
1113}
1114
/// Flush region request.
#[derive(Clone, Debug, Default)]
pub struct RegionFlushRequest {
    /// Optional row group size; `None` by default.
    pub row_group_size: Option<usize>,
}
1119
1120#[derive(Debug)]
1121pub struct RegionCompactRequest {
1122    pub options: compact_request::Options,
1123}
1124
1125impl Default for RegionCompactRequest {
1126    fn default() -> Self {
1127        Self {
1128            // Default to regular compaction.
1129            options: compact_request::Options::Regular(Default::default()),
1130        }
1131    }
1132}
1133
/// Request to truncate a region. It carries no parameters.
#[derive(Debug)]
pub struct RegionTruncateRequest {}
1137
1138/// Catchup region request.
1139///
1140/// Makes a readonly region to catch up to leader region changes.
1141/// There is no effect if it operating on a leader region.
1142#[derive(Debug, Clone, Copy)]
1143pub struct RegionCatchupRequest {
1144    /// Sets it to writable if it's available after it has caught up with all changes.
1145    pub set_writable: bool,
1146    /// The `entry_id` that was expected to reply to.
1147    /// `None` stands replaying to latest.
1148    pub entry_id: Option<entry::Id>,
1149    /// Used for metrics metadata region.
1150    /// The `entry_id` that was expected to reply to.
1151    /// `None` stands replaying to latest.
1152    pub metadata_entry_id: Option<entry::Id>,
1153    /// The hint for replaying memtable.
1154    pub location_id: Option<u64>,
1155}
1156
1157/// Get sequences of regions by region ids.
1158#[derive(Debug, Clone)]
1159pub struct RegionSequencesRequest {
1160    pub region_ids: Vec<RegionId>,
1161}
1162
1163#[derive(Debug, Clone)]
1164pub struct RegionBulkInsertsRequest {
1165    pub region_id: RegionId,
1166    pub payload: DfRecordBatch,
1167}
1168
1169impl RegionBulkInsertsRequest {
1170    pub fn estimated_size(&self) -> usize {
1171        self.payload.get_array_memory_size()
1172    }
1173}
1174
1175impl fmt::Display for RegionRequest {
1176    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
1177        match self {
1178            RegionRequest::Put(_) => write!(f, "Put"),
1179            RegionRequest::Delete(_) => write!(f, "Delete"),
1180            RegionRequest::Create(_) => write!(f, "Create"),
1181            RegionRequest::Drop(_) => write!(f, "Drop"),
1182            RegionRequest::Open(_) => write!(f, "Open"),
1183            RegionRequest::Close(_) => write!(f, "Close"),
1184            RegionRequest::Alter(_) => write!(f, "Alter"),
1185            RegionRequest::Flush(_) => write!(f, "Flush"),
1186            RegionRequest::Compact(_) => write!(f, "Compact"),
1187            RegionRequest::Truncate(_) => write!(f, "Truncate"),
1188            RegionRequest::Catchup(_) => write!(f, "Catchup"),
1189            RegionRequest::BulkInserts(_) => write!(f, "BulkInserts"),
1190        }
1191    }
1192}
1193
1194#[cfg(test)]
1195mod tests {
1196    use api::v1::region::RegionColumnDef;
1197    use api::v1::{ColumnDataType, ColumnDef};
1198    use datatypes::prelude::ConcreteDataType;
1199    use datatypes::schema::{ColumnSchema, FulltextAnalyzer, FulltextBackend};
1200
1201    use super::*;
1202    use crate::metadata::RegionMetadataBuilder;
1203
1204    #[test]
1205    fn test_from_proto_location() {
1206        let proto_location = v1::AddColumnLocation {
1207            location_type: LocationType::First as i32,
1208            after_column_name: String::default(),
1209        };
1210        let location = AddColumnLocation::try_from(proto_location).unwrap();
1211        assert_eq!(location, AddColumnLocation::First);
1212
1213        let proto_location = v1::AddColumnLocation {
1214            location_type: 10,
1215            after_column_name: String::default(),
1216        };
1217        AddColumnLocation::try_from(proto_location).unwrap_err();
1218
1219        let proto_location = v1::AddColumnLocation {
1220            location_type: LocationType::After as i32,
1221            after_column_name: "a".to_string(),
1222        };
1223        let location = AddColumnLocation::try_from(proto_location).unwrap();
1224        assert_eq!(
1225            location,
1226            AddColumnLocation::After {
1227                column_name: "a".to_string()
1228            }
1229        );
1230    }
1231
1232    #[test]
1233    fn test_from_none_proto_add_column() {
1234        AddColumn::try_from(v1::region::AddColumn {
1235            column_def: None,
1236            location: None,
1237        })
1238        .unwrap_err();
1239    }
1240
1241    #[test]
1242    fn test_from_proto_alter_request() {
1243        RegionAlterRequest::try_from(AlterRequest {
1244            region_id: 0,
1245            schema_version: 1,
1246            kind: None,
1247        })
1248        .unwrap_err();
1249
1250        let request = RegionAlterRequest::try_from(AlterRequest {
1251            region_id: 0,
1252            schema_version: 1,
1253            kind: Some(alter_request::Kind::AddColumns(v1::region::AddColumns {
1254                add_columns: vec![v1::region::AddColumn {
1255                    column_def: Some(RegionColumnDef {
1256                        column_def: Some(ColumnDef {
1257                            name: "a".to_string(),
1258                            data_type: ColumnDataType::String as i32,
1259                            is_nullable: true,
1260                            default_constraint: vec![],
1261                            semantic_type: SemanticType::Field as i32,
1262                            comment: String::new(),
1263                            ..Default::default()
1264                        }),
1265                        column_id: 1,
1266                    }),
1267                    location: Some(v1::AddColumnLocation {
1268                        location_type: LocationType::First as i32,
1269                        after_column_name: String::default(),
1270                    }),
1271                }],
1272            })),
1273        })
1274        .unwrap();
1275
1276        assert_eq!(
1277            request,
1278            RegionAlterRequest {
1279                kind: AlterKind::AddColumns {
1280                    columns: vec![AddColumn {
1281                        column_metadata: ColumnMetadata {
1282                            column_schema: ColumnSchema::new(
1283                                "a",
1284                                ConcreteDataType::string_datatype(),
1285                                true,
1286                            ),
1287                            semantic_type: SemanticType::Field,
1288                            column_id: 1,
1289                        },
1290                        location: Some(AddColumnLocation::First),
1291                    }]
1292                },
1293            }
1294        );
1295    }
1296
1297    /// Returns a new region metadata for testing. Metadata:
1298    /// `[(ts, ms, 1), (tag_0, string, 2), (field_0, string, 3), (field_1, bool, 4)]`
1299    fn new_metadata() -> RegionMetadata {
1300        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1301        builder
1302            .push_column_metadata(ColumnMetadata {
1303                column_schema: ColumnSchema::new(
1304                    "ts",
1305                    ConcreteDataType::timestamp_millisecond_datatype(),
1306                    false,
1307                ),
1308                semantic_type: SemanticType::Timestamp,
1309                column_id: 1,
1310            })
1311            .push_column_metadata(ColumnMetadata {
1312                column_schema: ColumnSchema::new(
1313                    "tag_0",
1314                    ConcreteDataType::string_datatype(),
1315                    true,
1316                ),
1317                semantic_type: SemanticType::Tag,
1318                column_id: 2,
1319            })
1320            .push_column_metadata(ColumnMetadata {
1321                column_schema: ColumnSchema::new(
1322                    "field_0",
1323                    ConcreteDataType::string_datatype(),
1324                    true,
1325                ),
1326                semantic_type: SemanticType::Field,
1327                column_id: 3,
1328            })
1329            .push_column_metadata(ColumnMetadata {
1330                column_schema: ColumnSchema::new(
1331                    "field_1",
1332                    ConcreteDataType::boolean_datatype(),
1333                    true,
1334                ),
1335                semantic_type: SemanticType::Field,
1336                column_id: 4,
1337            })
1338            .primary_key(vec![2]);
1339        builder.build().unwrap()
1340    }
1341
1342    #[test]
1343    fn test_add_column_validate() {
1344        let metadata = new_metadata();
1345        let add_column = AddColumn {
1346            column_metadata: ColumnMetadata {
1347                column_schema: ColumnSchema::new(
1348                    "tag_1",
1349                    ConcreteDataType::string_datatype(),
1350                    true,
1351                ),
1352                semantic_type: SemanticType::Tag,
1353                column_id: 5,
1354            },
1355            location: None,
1356        };
1357        add_column.validate(&metadata).unwrap();
1358        assert!(add_column.need_alter(&metadata));
1359
1360        // Add not null column.
1361        AddColumn {
1362            column_metadata: ColumnMetadata {
1363                column_schema: ColumnSchema::new(
1364                    "tag_1",
1365                    ConcreteDataType::string_datatype(),
1366                    false,
1367                ),
1368                semantic_type: SemanticType::Tag,
1369                column_id: 5,
1370            },
1371            location: None,
1372        }
1373        .validate(&metadata)
1374        .unwrap_err();
1375
1376        // Add existing column.
1377        let add_column = AddColumn {
1378            column_metadata: ColumnMetadata {
1379                column_schema: ColumnSchema::new(
1380                    "tag_0",
1381                    ConcreteDataType::string_datatype(),
1382                    true,
1383                ),
1384                semantic_type: SemanticType::Tag,
1385                column_id: 2,
1386            },
1387            location: None,
1388        };
1389        add_column.validate(&metadata).unwrap();
1390        assert!(!add_column.need_alter(&metadata));
1391    }
1392
1393    #[test]
1394    fn test_add_duplicate_columns() {
1395        let kind = AlterKind::AddColumns {
1396            columns: vec![
1397                AddColumn {
1398                    column_metadata: ColumnMetadata {
1399                        column_schema: ColumnSchema::new(
1400                            "tag_1",
1401                            ConcreteDataType::string_datatype(),
1402                            true,
1403                        ),
1404                        semantic_type: SemanticType::Tag,
1405                        column_id: 5,
1406                    },
1407                    location: None,
1408                },
1409                AddColumn {
1410                    column_metadata: ColumnMetadata {
1411                        column_schema: ColumnSchema::new(
1412                            "tag_1",
1413                            ConcreteDataType::string_datatype(),
1414                            true,
1415                        ),
1416                        semantic_type: SemanticType::Field,
1417                        column_id: 6,
1418                    },
1419                    location: None,
1420                },
1421            ],
1422        };
1423        let metadata = new_metadata();
1424        kind.validate(&metadata).unwrap();
1425        assert!(kind.need_alter(&metadata));
1426    }
1427
1428    #[test]
1429    fn test_add_existing_column_different_metadata() {
1430        let metadata = new_metadata();
1431
1432        // Add existing column with different id.
1433        let kind = AlterKind::AddColumns {
1434            columns: vec![AddColumn {
1435                column_metadata: ColumnMetadata {
1436                    column_schema: ColumnSchema::new(
1437                        "tag_0",
1438                        ConcreteDataType::string_datatype(),
1439                        true,
1440                    ),
1441                    semantic_type: SemanticType::Tag,
1442                    column_id: 4,
1443                },
1444                location: None,
1445            }],
1446        };
1447        kind.validate(&metadata).unwrap_err();
1448
1449        // Add existing column with different type.
1450        let kind = AlterKind::AddColumns {
1451            columns: vec![AddColumn {
1452                column_metadata: ColumnMetadata {
1453                    column_schema: ColumnSchema::new(
1454                        "tag_0",
1455                        ConcreteDataType::int64_datatype(),
1456                        true,
1457                    ),
1458                    semantic_type: SemanticType::Tag,
1459                    column_id: 2,
1460                },
1461                location: None,
1462            }],
1463        };
1464        kind.validate(&metadata).unwrap_err();
1465
1466        // Add existing column with different name.
1467        let kind = AlterKind::AddColumns {
1468            columns: vec![AddColumn {
1469                column_metadata: ColumnMetadata {
1470                    column_schema: ColumnSchema::new(
1471                        "tag_1",
1472                        ConcreteDataType::string_datatype(),
1473                        true,
1474                    ),
1475                    semantic_type: SemanticType::Tag,
1476                    column_id: 2,
1477                },
1478                location: None,
1479            }],
1480        };
1481        kind.validate(&metadata).unwrap_err();
1482    }
1483
1484    #[test]
1485    fn test_add_existing_column_with_location() {
1486        let metadata = new_metadata();
1487        let kind = AlterKind::AddColumns {
1488            columns: vec![AddColumn {
1489                column_metadata: ColumnMetadata {
1490                    column_schema: ColumnSchema::new(
1491                        "tag_0",
1492                        ConcreteDataType::string_datatype(),
1493                        true,
1494                    ),
1495                    semantic_type: SemanticType::Tag,
1496                    column_id: 2,
1497                },
1498                location: Some(AddColumnLocation::First),
1499            }],
1500        };
1501        kind.validate(&metadata).unwrap_err();
1502    }
1503
1504    #[test]
1505    fn test_validate_drop_column() {
1506        let metadata = new_metadata();
1507        let kind = AlterKind::DropColumns {
1508            names: vec!["xxxx".to_string()],
1509        };
1510        kind.validate(&metadata).unwrap();
1511        assert!(!kind.need_alter(&metadata));
1512
1513        AlterKind::DropColumns {
1514            names: vec!["tag_0".to_string()],
1515        }
1516        .validate(&metadata)
1517        .unwrap_err();
1518
1519        let kind = AlterKind::DropColumns {
1520            names: vec!["field_0".to_string()],
1521        };
1522        kind.validate(&metadata).unwrap();
1523        assert!(kind.need_alter(&metadata));
1524    }
1525
1526    #[test]
1527    fn test_validate_modify_column_type() {
1528        let metadata = new_metadata();
1529        AlterKind::ModifyColumnTypes {
1530            columns: vec![ModifyColumnType {
1531                column_name: "xxxx".to_string(),
1532                target_type: ConcreteDataType::string_datatype(),
1533            }],
1534        }
1535        .validate(&metadata)
1536        .unwrap_err();
1537
1538        AlterKind::ModifyColumnTypes {
1539            columns: vec![ModifyColumnType {
1540                column_name: "field_1".to_string(),
1541                target_type: ConcreteDataType::date_datatype(),
1542            }],
1543        }
1544        .validate(&metadata)
1545        .unwrap_err();
1546
1547        AlterKind::ModifyColumnTypes {
1548            columns: vec![ModifyColumnType {
1549                column_name: "ts".to_string(),
1550                target_type: ConcreteDataType::date_datatype(),
1551            }],
1552        }
1553        .validate(&metadata)
1554        .unwrap_err();
1555
1556        AlterKind::ModifyColumnTypes {
1557            columns: vec![ModifyColumnType {
1558                column_name: "tag_0".to_string(),
1559                target_type: ConcreteDataType::date_datatype(),
1560            }],
1561        }
1562        .validate(&metadata)
1563        .unwrap_err();
1564
1565        let kind = AlterKind::ModifyColumnTypes {
1566            columns: vec![ModifyColumnType {
1567                column_name: "field_0".to_string(),
1568                target_type: ConcreteDataType::int32_datatype(),
1569            }],
1570        };
1571        kind.validate(&metadata).unwrap();
1572        assert!(kind.need_alter(&metadata));
1573    }
1574
1575    #[test]
1576    fn test_validate_add_columns() {
1577        let kind = AlterKind::AddColumns {
1578            columns: vec![
1579                AddColumn {
1580                    column_metadata: ColumnMetadata {
1581                        column_schema: ColumnSchema::new(
1582                            "tag_1",
1583                            ConcreteDataType::string_datatype(),
1584                            true,
1585                        ),
1586                        semantic_type: SemanticType::Tag,
1587                        column_id: 5,
1588                    },
1589                    location: None,
1590                },
1591                AddColumn {
1592                    column_metadata: ColumnMetadata {
1593                        column_schema: ColumnSchema::new(
1594                            "field_2",
1595                            ConcreteDataType::string_datatype(),
1596                            true,
1597                        ),
1598                        semantic_type: SemanticType::Field,
1599                        column_id: 6,
1600                    },
1601                    location: None,
1602                },
1603            ],
1604        };
1605        let request = RegionAlterRequest { kind };
1606        let mut metadata = new_metadata();
1607        metadata.schema_version = 1;
1608        request.validate(&metadata).unwrap();
1609    }
1610
1611    #[test]
1612    fn test_validate_create_region() {
1613        let column_metadatas = vec![
1614            ColumnMetadata {
1615                column_schema: ColumnSchema::new(
1616                    "ts",
1617                    ConcreteDataType::timestamp_millisecond_datatype(),
1618                    false,
1619                ),
1620                semantic_type: SemanticType::Timestamp,
1621                column_id: 1,
1622            },
1623            ColumnMetadata {
1624                column_schema: ColumnSchema::new(
1625                    "tag_0",
1626                    ConcreteDataType::string_datatype(),
1627                    true,
1628                ),
1629                semantic_type: SemanticType::Tag,
1630                column_id: 2,
1631            },
1632            ColumnMetadata {
1633                column_schema: ColumnSchema::new(
1634                    "field_0",
1635                    ConcreteDataType::string_datatype(),
1636                    true,
1637                ),
1638                semantic_type: SemanticType::Field,
1639                column_id: 3,
1640            },
1641        ];
1642        let create = RegionCreateRequest {
1643            engine: "mito".to_string(),
1644            column_metadatas,
1645            primary_key: vec![3, 4],
1646            options: HashMap::new(),
1647            region_dir: "path".to_string(),
1648        };
1649
1650        assert!(create.validate().is_err());
1651    }
1652
1653    #[test]
1654    fn test_validate_modify_column_fulltext_options() {
1655        let kind = AlterKind::SetIndex {
1656            options: ApiSetIndexOptions::Fulltext {
1657                column_name: "tag_0".to_string(),
1658                options: FulltextOptions {
1659                    enable: true,
1660                    analyzer: FulltextAnalyzer::Chinese,
1661                    case_sensitive: false,
1662                    backend: FulltextBackend::Bloom,
1663                },
1664            },
1665        };
1666        let request = RegionAlterRequest { kind };
1667        let mut metadata = new_metadata();
1668        metadata.schema_version = 1;
1669        request.validate(&metadata).unwrap();
1670
1671        let kind = AlterKind::UnsetIndex {
1672            options: ApiUnsetIndexOptions::Fulltext {
1673                column_name: "tag_0".to_string(),
1674            },
1675        };
1676        let request = RegionAlterRequest { kind };
1677        let mut metadata = new_metadata();
1678        metadata.schema_version = 1;
1679        request.validate(&metadata).unwrap();
1680    }
1681}