Skip to main content

mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37    MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40    AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41    RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42    RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43    RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49    CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
50    Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
51    MissingPartitionExprSnafu, Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
64/// Request to write a region.
65#[derive(Debug)]
66pub struct WriteRequest {
67    /// Region to write.
68    pub region_id: RegionId,
69    /// Type of the write request.
70    pub op_type: OpType,
71    /// Rows to write.
72    pub rows: Rows,
73    /// Map column name to column index in `rows`.
74    pub name_to_index: HashMap<String, usize>,
75    /// Whether each column has null.
76    pub has_null: Vec<bool>,
77    /// Write hint.
78    pub hint: Option<WriteHint>,
79    /// Region metadata on the time of this request is created.
80    pub(crate) region_metadata: Option<RegionMetadataRef>,
81    /// Partition expression version for the region.
82    pub partition_expr_version: Option<u64>,
83}
84
85impl WriteRequest {
86    /// Creates a new request.
87    ///
88    /// Returns `Err` if `rows` are invalid.
89    pub fn new(
90        region_id: RegionId,
91        op_type: OpType,
92        rows: Rows,
93        region_metadata: Option<RegionMetadataRef>,
94    ) -> Result<WriteRequest> {
95        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96        for (index, column) in rows.schema.iter().enumerate() {
97            ensure!(
98                name_to_index
99                    .insert(column.column_name.clone(), index)
100                    .is_none(),
101                InvalidRequestSnafu {
102                    region_id,
103                    reason: format!("duplicate column {}", column.column_name),
104                }
105            );
106        }
107
108        let mut has_null = vec![false; rows.schema.len()];
109        for row in &rows.rows {
110            ensure!(
111                row.values.len() == rows.schema.len(),
112                InvalidRequestSnafu {
113                    region_id,
114                    reason: format!(
115                        "row has {} columns but schema has {}",
116                        row.values.len(),
117                        rows.schema.len()
118                    ),
119                }
120            );
121
122            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123                validate_proto_value(region_id, value, column_schema)?;
124
125                if value.value_data.is_none() {
126                    has_null[i] = true;
127                }
128            }
129        }
130
131        Ok(WriteRequest {
132            region_id,
133            op_type,
134            rows,
135            name_to_index,
136            has_null,
137            hint: None,
138            region_metadata,
139            partition_expr_version: None,
140        })
141    }
142
143    /// Sets the write hint.
144    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145        self.hint = hint;
146        self
147    }
148
149    pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150        self.partition_expr_version = partition_expr_version;
151        self
152    }
153
154    /// Returns the encoding hint.
155    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156        infer_primary_key_encoding_from_hint(self.hint.as_ref())
157    }
158
159    /// Returns estimated size of the request.
160    pub(crate) fn estimated_size(&self) -> usize {
161        let row_size = self
162            .rows
163            .rows
164            .first()
165            .map(|row| row.encoded_len())
166            .unwrap_or(0);
167        row_size * self.rows.rows.len()
168    }
169
170    /// Gets column index by name.
171    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172        self.name_to_index.get(name).copied()
173    }
174
175    /// Checks schema of rows is compatible with schema of the region.
176    ///
177    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
178    /// error.
179    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
180        debug_assert_eq!(self.region_id, metadata.region_id);
181
182        let region_id = self.region_id;
183        // Index all columns in rows.
184        let mut rows_columns: HashMap<_, _> = self
185            .rows
186            .schema
187            .iter()
188            .map(|column| (&column.column_name, column))
189            .collect();
190
191        let mut need_fill_default = false;
192        // Checks all columns in this region.
193        for column in &metadata.column_metadatas {
194            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
195                // Check data type.
196                ensure!(
197                    is_column_type_value_eq(
198                        input_col.datatype,
199                        input_col.datatype_extension.clone(),
200                        &column.column_schema.data_type
201                    ),
202                    InvalidRequestSnafu {
203                        region_id,
204                        reason: format!(
205                            "column {} expect type {:?}, given: {}({})",
206                            column.column_schema.name,
207                            column.column_schema.data_type,
208                            ColumnDataType::try_from(input_col.datatype)
209                                .map(|v| v.as_str_name())
210                                .unwrap_or("Unknown"),
211                            input_col.datatype,
212                        )
213                    }
214                );
215
216                // Check semantic type.
217                ensure!(
218                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
219                    InvalidRequestSnafu {
220                        region_id,
221                        reason: format!(
222                            "column {} has semantic type {:?}, given: {}({})",
223                            column.column_schema.name,
224                            column.semantic_type,
225                            api::v1::SemanticType::try_from(input_col.semantic_type)
226                                .map(|v| v.as_str_name())
227                                .unwrap_or("Unknown"),
228                            input_col.semantic_type
229                        ),
230                    }
231                );
232
233                // Check nullable.
234                // Safety: `rows_columns` ensures this column exists.
235                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
236                ensure!(
237                    !has_null || column.column_schema.is_nullable(),
238                    InvalidRequestSnafu {
239                        region_id,
240                        reason: format!(
241                            "column {} is not null but input has null",
242                            column.column_schema.name
243                        ),
244                    }
245                );
246            } else {
247                // Rows don't have this column.
248                self.check_missing_column(column)?;
249
250                need_fill_default = true;
251            }
252        }
253
254        // Checks all columns in rows exist in the region.
255        if !rows_columns.is_empty() {
256            let names: Vec<_> = rows_columns.into_keys().collect();
257            return InvalidRequestSnafu {
258                region_id,
259                reason: format!("unknown columns: {:?}", names),
260            }
261            .fail();
262        }
263
264        // If we need to fill default values, return a special error.
265        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
266
267        Ok(())
268    }
269
270    /// Tries to fill missing columns.
271    ///
272    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
273    /// values.
274    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275        debug_assert_eq!(self.region_id, metadata.region_id);
276
277        let mut columns_to_fill = vec![];
278        for column in &metadata.column_metadatas {
279            if !self.name_to_index.contains_key(&column.column_schema.name) {
280                columns_to_fill.push(column);
281            }
282        }
283        self.fill_columns(columns_to_fill)?;
284
285        Ok(())
286    }
287
288    /// Checks the schema and fill missing columns.
289    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290        if let Err(e) = self.check_schema(metadata) {
291            if e.is_fill_default() {
292                // TODO(yingwen): Add metrics for this case.
293                // We need to fill default value. The write request may be a request
294                // sent before changing the schema.
295                self.fill_missing_columns(metadata)?;
296            } else {
297                return Err(e);
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Fills default value for specific `columns`.
305    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306        let mut default_values = Vec::with_capacity(columns.len());
307        let mut columns_to_fill = Vec::with_capacity(columns.len());
308        for column in columns {
309            let default_value = self.column_default_value(column)?;
310            if default_value.value_data.is_some() {
311                default_values.push(default_value);
312                columns_to_fill.push(column);
313            }
314        }
315
316        for row in &mut self.rows.rows {
317            row.values.extend(default_values.iter().cloned());
318        }
319
320        for column in columns_to_fill {
321            let (datatype, datatype_ext) =
322                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323                    .with_context(|_| ConvertColumnDataTypeSnafu {
324                        reason: format!(
325                            "no protobuf type for column {} ({:?})",
326                            column.column_schema.name, column.column_schema.data_type
327                        ),
328                    })?
329                    .to_parts();
330            self.rows.schema.push(ColumnSchema {
331                column_name: column.column_schema.name.clone(),
332                datatype: datatype as i32,
333                semantic_type: column.semantic_type as i32,
334                datatype_extension: datatype_ext,
335                options: options_from_column_schema(&column.column_schema),
336            });
337        }
338
339        Ok(())
340    }
341
342    /// Checks whether we should allow a row doesn't provide this column.
343    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344        if self.op_type == OpType::Delete {
345            if column.semantic_type == SemanticType::Field {
346                // For delete request, all tags and timestamp is required. We don't fill default
347                // tag or timestamp while deleting rows.
348                return Ok(());
349            } else {
350                return InvalidRequestSnafu {
351                    region_id: self.region_id,
352                    reason: format!("delete requests need column {}", column.column_schema.name),
353                }
354                .fail();
355            }
356        }
357
358        // Not a delete request. Checks whether they have default value.
359        ensure!(
360            column.column_schema.is_nullable()
361                || column.column_schema.default_constraint().is_some(),
362            InvalidRequestSnafu {
363                region_id: self.region_id,
364                reason: format!("missing column {}", column.column_schema.name),
365            }
366        );
367
368        Ok(())
369    }
370
371    /// Returns the default value for specific column.
372    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373        let default_value = match self.op_type {
374            OpType::Delete => {
375                ensure!(
376                    column.semantic_type == SemanticType::Field,
377                    InvalidRequestSnafu {
378                        region_id: self.region_id,
379                        reason: format!(
380                            "delete requests need column {}",
381                            column.column_schema.name
382                        ),
383                    }
384                );
385
386                // For delete request, we need a default value for padding so we
387                // can delete a row even a field doesn't have a default value. So the
388                // value doesn't need to following the default value constraint of the
389                // column.
390                if column.column_schema.is_nullable() {
391                    datatypes::value::Value::Null
392                } else {
393                    column.column_schema.data_type.default_value()
394                }
395            }
396            OpType::Put => {
397                // For put requests, we use the default value from column schema.
398                if column.column_schema.is_default_impure() {
399                    UnexpectedSnafu {
400                        reason: format!(
401                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402                            self.region_id,
403                            column.column_schema.name,
404                            column.column_schema.default_constraint(),
405                        ),
406                    }
407                    .fail()?
408                }
409                column
410                    .column_schema
411                    .create_default()
412                    .context(CreateDefaultSnafu {
413                        region_id: self.region_id,
414                        column: &column.column_schema.name,
415                    })?
416                    // This column doesn't have default value.
417                    .with_context(|| InvalidRequestSnafu {
418                        region_id: self.region_id,
419                        reason: format!(
420                            "column {} does not have default value",
421                            column.column_schema.name
422                        ),
423                    })?
424            }
425        };
426
427        // Convert default value into proto's value.
428        Ok(api::helper::to_grpc_value(default_value))
429    }
430}
431
432/// Validate proto value schema.
433pub(crate) fn validate_proto_value(
434    region_id: RegionId,
435    value: &Value,
436    column_schema: &ColumnSchema,
437) -> Result<()> {
438    if let Some(value_type) = proto_value_type(value) {
439        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440            InvalidRequestSnafu {
441                region_id,
442                reason: format!(
443                    "column {} has unknown type {}",
444                    column_schema.column_name, column_schema.datatype
445                ),
446            }
447            .build()
448        })?;
449        ensure!(
450            proto_value_type_match(column_type, value_type),
451            InvalidRequestSnafu {
452                region_id,
453                reason: format!(
454                    "value has type {:?}, but column {} has type {:?}({})",
455                    value_type, column_schema.column_name, column_type, column_schema.datatype,
456                ),
457            }
458        );
459    }
460
461    Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465    match (column_type, value_type) {
466        (ct, vt) if ct == vt => true,
467        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468        (ColumnDataType::Json, ColumnDataType::Binary) => true,
469        _ => false,
470    }
471}
472
473/// Oneshot output result sender.
474#[derive(Debug)]
475pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478    /// Creates a new output sender.
479    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480        OutputTx(sender)
481    }
482
483    /// Sends the `result`.
484    pub(crate) fn send(self, result: Result<AffectedRows>) {
485        // Ignores send result.
486        let _ = self.0.send(result);
487    }
488}
489
490/// Optional output result sender.
491#[derive(Debug)]
492pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495    /// Creates a sender.
496    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497        OptionOutputTx(sender)
498    }
499
500    /// Creates an empty sender.
501    pub(crate) fn none() -> OptionOutputTx {
502        OptionOutputTx(None)
503    }
504
505    /// Sends the `result` and consumes the inner sender.
506    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507        if let Some(sender) = self.0.take() {
508            sender.send(result);
509        }
510    }
511
512    /// Sends the `result` and consumes the sender.
513    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514        if let Some(sender) = self.0.take() {
515            sender.send(result);
516        }
517    }
518
519    /// Takes the inner sender.
520    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521        self.0.take()
522    }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527        Self::new(Some(OutputTx::new(sender)))
528    }
529}
530
531impl OnFailure for OptionOutputTx {
532    fn on_failure(&mut self, err: Error) {
533        self.send_mut(Err(err));
534    }
535}
536
537/// Callback on failure.
538pub(crate) trait OnFailure {
539    /// Handles `err` on failure.
540    fn on_failure(&mut self, err: Error);
541}
542
543/// Sender and write request.
544#[derive(Debug)]
545pub(crate) struct SenderWriteRequest {
546    /// Result sender.
547    pub(crate) sender: OptionOutputTx,
548    pub(crate) request: WriteRequest,
549}
550
551pub(crate) struct SenderBulkRequest {
552    pub(crate) sender: OptionOutputTx,
553    pub(crate) region_id: RegionId,
554    pub(crate) request: BulkPart,
555    pub(crate) region_metadata: RegionMetadataRef,
556    pub(crate) partition_expr_version: Option<u64>,
557}
558
559#[derive(Debug)]
560pub(crate) struct BulkInsertRequest {
561    pub(crate) metadata: Option<RegionMetadataRef>,
562    pub(crate) request: RegionBulkInsertsRequest,
563    pub(crate) sender: OptionOutputTx,
564}
565
566/// Request sent to a worker with timestamp
567#[derive(Debug)]
568pub(crate) struct WorkerRequestWithTime {
569    pub(crate) request: WorkerRequest,
570    pub(crate) created_at: Instant,
571}
572
573impl WorkerRequestWithTime {
574    pub(crate) fn new(request: WorkerRequest) -> Self {
575        Self {
576            request,
577            created_at: Instant::now(),
578        }
579    }
580}
581
582/// Request sent to a worker
583#[derive(Debug)]
584pub(crate) enum WorkerRequest {
585    /// Write to a region.
586    Write(SenderWriteRequest),
587
588    /// Ddl request to a region.
589    Ddl(SenderDdlRequest),
590
591    /// Notifications from internal background jobs.
592    Background {
593        /// Id of the region to send.
594        region_id: RegionId,
595        /// Internal notification.
596        notify: BackgroundNotify,
597    },
598
599    /// The internal commands.
600    SetRegionRoleStateGracefully {
601        /// Id of the region to send.
602        region_id: RegionId,
603        /// The [SettableRegionRoleState].
604        region_role_state: SettableRegionRoleState,
605        /// The sender of [SetReadonlyResponse].
606        sender: Sender<SetRegionRoleStateResponse>,
607    },
608
609    /// Notify a worker to stop.
610    Stop,
611
612    /// Use [RegionEdit] to edit a region directly.
613    EditRegion(RegionEditRequest),
614
615    /// Keep the manifest of a region up to date.
616    SyncRegion(RegionSyncRequest),
617
618    /// Bulk inserts request and region metadata.
619    BulkInserts(BulkInsertRequest),
620
621    /// Remap manifests request.
622    RemapManifests(RemapManifestsRequest),
623
624    /// Copy region from request.
625    CopyRegionFrom(CopyRegionFromRequest),
626}
627
628impl WorkerRequest {
629    /// Creates a new open region request.
630    pub(crate) fn new_open_region_request(
631        region_id: RegionId,
632        request: RegionOpenRequest,
633        entry_receiver: Option<WalEntryReceiver>,
634    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
635        let (sender, receiver) = oneshot::channel();
636
637        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
638            region_id,
639            sender: sender.into(),
640            request: DdlRequest::Open((request, entry_receiver)),
641        });
642
643        (worker_request, receiver)
644    }
645
646    /// Creates a new catchup region request.
647    pub(crate) fn new_catchup_region_request(
648        region_id: RegionId,
649        request: RegionCatchupRequest,
650        entry_receiver: Option<WalEntryReceiver>,
651    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
652        let (sender, receiver) = oneshot::channel();
653        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
654            region_id,
655            sender: sender.into(),
656            request: DdlRequest::Catchup((request, entry_receiver)),
657        });
658        (worker_request, receiver)
659    }
660
661    /// Converts request from a [RegionRequest].
662    pub(crate) fn try_from_region_request(
663        region_id: RegionId,
664        value: RegionRequest,
665        region_metadata: Option<RegionMetadataRef>,
666    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
667        let (sender, receiver) = oneshot::channel();
668        let worker_request = match value {
669            RegionRequest::Put(v) => {
670                let mut write_request =
671                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
672                        .with_hint(v.hint)
673                        .with_partition_expr_version(v.partition_expr_version);
674                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
675                    && let Some(region_metadata) = &region_metadata
676                {
677                    write_request.maybe_fill_missing_columns(region_metadata)?;
678                }
679                WorkerRequest::Write(SenderWriteRequest {
680                    sender: sender.into(),
681                    request: write_request,
682                })
683            }
684            RegionRequest::Delete(v) => {
685                let mut write_request =
686                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
687                        .with_hint(v.hint)
688                        .with_partition_expr_version(v.partition_expr_version);
689                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
690                    && let Some(region_metadata) = &region_metadata
691                {
692                    write_request.maybe_fill_missing_columns(region_metadata)?;
693                }
694                WorkerRequest::Write(SenderWriteRequest {
695                    sender: sender.into(),
696                    request: write_request,
697                })
698            }
699            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
700                region_id,
701                sender: sender.into(),
702                request: DdlRequest::Create(v),
703            }),
704            RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
705                region_id,
706                sender: sender.into(),
707                request: DdlRequest::Drop(v),
708            }),
709            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
710                region_id,
711                sender: sender.into(),
712                request: DdlRequest::Open((v, None)),
713            }),
714            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
715                region_id,
716                sender: sender.into(),
717                request: DdlRequest::Close(v),
718            }),
719            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
720                region_id,
721                sender: sender.into(),
722                request: DdlRequest::Alter(v),
723            }),
724            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
725                region_id,
726                sender: sender.into(),
727                request: DdlRequest::Flush(v),
728            }),
729            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
730                region_id,
731                sender: sender.into(),
732                request: DdlRequest::Compact(v),
733            }),
734            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
735                region_id,
736                sender: sender.into(),
737                request: DdlRequest::BuildIndex(v),
738            }),
739            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
740                region_id,
741                sender: sender.into(),
742                request: DdlRequest::Truncate(v),
743            }),
744            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
745                region_id,
746                sender: sender.into(),
747                request: DdlRequest::Catchup((v, None)),
748            }),
749            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
750                region_id,
751                sender: sender.into(),
752                request: DdlRequest::EnterStaging(v),
753            }),
754            RegionRequest::BulkInserts(region_bulk_inserts_request) => {
755                WorkerRequest::BulkInserts(BulkInsertRequest {
756                    metadata: region_metadata,
757                    sender: sender.into(),
758                    request: region_bulk_inserts_request,
759                })
760            }
761            RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
762                region_id,
763                sender: sender.into(),
764                request: DdlRequest::ApplyStagingManifest(v),
765            }),
766        };
767
768        Ok((worker_request, receiver))
769    }
770
771    pub(crate) fn new_set_readonly_gracefully(
772        region_id: RegionId,
773        region_role_state: SettableRegionRoleState,
774    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
775        let (sender, receiver) = oneshot::channel();
776
777        (
778            WorkerRequest::SetRegionRoleStateGracefully {
779                region_id,
780                region_role_state,
781                sender,
782            },
783            receiver,
784        )
785    }
786
787    pub(crate) fn new_sync_region_request(
788        region_id: RegionId,
789        manifest_version: ManifestVersion,
790    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
791        let (sender, receiver) = oneshot::channel();
792        (
793            WorkerRequest::SyncRegion(RegionSyncRequest {
794                region_id,
795                manifest_version,
796                sender,
797            }),
798            receiver,
799        )
800    }
801
802    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
803    ///
804    /// # Errors
805    ///
806    /// Returns an error if the partition expression is invalid or missing.
807    /// Returns an error if the new partition expressions are not found for some regions.
808    #[allow(clippy::type_complexity)]
809    pub(crate) fn try_from_remap_manifests_request(
810        store_api::region_engine::RemapManifestsRequest {
811            region_id,
812            input_regions,
813            region_mapping,
814            new_partition_exprs,
815        }: store_api::region_engine::RemapManifestsRequest,
816    ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
817        let (sender, receiver) = oneshot::channel();
818        let new_partition_exprs = new_partition_exprs
819            .into_iter()
820            .map(|(k, v)| {
821                Ok((
822                    k,
823                    PartitionExpr::from_json_str(&v)
824                        .context(InvalidPartitionExprSnafu { expr: v })?
825                        .context(MissingPartitionExprSnafu { region_id: k })?,
826                ))
827            })
828            .collect::<Result<HashMap<_, _>>>()?;
829
830        let request = RemapManifestsRequest {
831            region_id,
832            input_regions,
833            region_mapping,
834            new_partition_exprs,
835            sender,
836        };
837
838        Ok((WorkerRequest::RemapManifests(request), receiver))
839    }
840
841    /// Converts [CopyRegionFromRequest] from a [MitoCopyRegionFromRequest](store_api::region_engine::MitoCopyRegionFromRequest).
842    pub(crate) fn try_from_copy_region_from_request(
843        region_id: RegionId,
844        store_api::region_engine::MitoCopyRegionFromRequest {
845            source_region_id,
846            parallelism,
847        }: store_api::region_engine::MitoCopyRegionFromRequest,
848    ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
849        let (sender, receiver) = oneshot::channel();
850        let request = CopyRegionFromRequest {
851            region_id,
852            source_region_id,
853            parallelism,
854            sender,
855        };
856        Ok((WorkerRequest::CopyRegionFrom(request), receiver))
857    }
858}
859
860/// DDL request to a region.
861#[derive(Debug)]
862pub(crate) enum DdlRequest {
863    Create(RegionCreateRequest),
864    Drop(RegionDropRequest),
865    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
866    Close(RegionCloseRequest),
867    Alter(RegionAlterRequest),
868    Flush(RegionFlushRequest),
869    Compact(RegionCompactRequest),
870    BuildIndex(RegionBuildIndexRequest),
871    Truncate(RegionTruncateRequest),
872    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
873    EnterStaging(EnterStagingRequest),
874    ApplyStagingManifest(ApplyStagingManifestRequest),
875}
876
877/// Sender and Ddl request.
878#[derive(Debug)]
879pub(crate) struct SenderDdlRequest {
880    /// Region id of the request.
881    pub(crate) region_id: RegionId,
882    /// Result sender.
883    pub(crate) sender: OptionOutputTx,
884    /// Ddl request.
885    pub(crate) request: DdlRequest,
886}
887
888/// Notification from a background job.
889#[derive(Debug)]
890pub(crate) enum BackgroundNotify {
891    /// Flush has finished.
892    FlushFinished(FlushFinished),
893    /// Flush has failed.
894    FlushFailed(FlushFailed),
895    /// Index build has finished.
896    IndexBuildFinished(IndexBuildFinished),
897    /// Index build has been stopped (aborted or succeeded).
898    IndexBuildStopped(IndexBuildStopped),
899    /// Index build has failed.
900    IndexBuildFailed(IndexBuildFailed),
901    /// Compaction has finished.
902    CompactionFinished(CompactionFinished),
903    /// Compaction has been cancelled cooperatively.
904    CompactionCancelled(CompactionCancelled),
905    /// Compaction has failed.
906    CompactionFailed(CompactionFailed),
907    /// Truncate result.
908    Truncate(TruncateResult),
909    /// Region change result.
910    RegionChange(RegionChangeResult),
911    /// Region edit result.
912    RegionEdit(RegionEditResult),
913    /// Enter staging result.
914    EnterStaging(EnterStagingResult),
915    /// Copy region result.
916    CopyRegionFromFinished(CopyRegionFromFinished),
917}
918
919/// Notifies a flush job is finished.
920#[derive(Debug)]
921pub(crate) struct FlushFinished {
922    /// Region id.
923    pub(crate) region_id: RegionId,
924    /// Entry id of flushed data.
925    pub(crate) flushed_entry_id: EntryId,
926    /// Flush result senders.
927    pub(crate) senders: Vec<OutputTx>,
928    /// Flush timer.
929    pub(crate) _timer: HistogramTimer,
930    /// Region edit to apply.
931    pub(crate) edit: RegionEdit,
932    /// Memtables to remove.
933    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
934    /// Whether the region is in staging mode.
935    pub(crate) is_staging: bool,
936    /// Reason for flush.
937    pub(crate) flush_reason: FlushReason,
938}
939
940impl FlushFinished {
941    /// Marks the flush job as successful and observes the timer.
942    pub(crate) fn on_success(self) {
943        for sender in self.senders {
944            sender.send(Ok(0));
945        }
946    }
947}
948
949impl OnFailure for FlushFinished {
950    fn on_failure(&mut self, err: Error) {
951        let err = Arc::new(err);
952        for sender in self.senders.drain(..) {
953            sender.send(Err(err.clone()).context(FlushRegionSnafu {
954                region_id: self.region_id,
955            }));
956        }
957    }
958}
959
960/// Notifies a flush job is failed.
961#[derive(Debug)]
962pub(crate) struct FlushFailed {
963    /// The error source of the failure.
964    pub(crate) err: Arc<Error>,
965}
966
967#[derive(Debug)]
968pub(crate) struct IndexBuildFinished {
969    #[allow(dead_code)]
970    pub(crate) region_id: RegionId,
971    pub(crate) edit: RegionEdit,
972}
973
974/// Notifies an index build job has been stopped.
975#[derive(Debug)]
976pub(crate) struct IndexBuildStopped {
977    #[allow(dead_code)]
978    pub(crate) region_id: RegionId,
979    pub(crate) file_id: FileId,
980}
981
982/// Notifies an index build job has failed.
983#[derive(Debug)]
984pub(crate) struct IndexBuildFailed {
985    pub(crate) err: Arc<Error>,
986}
987
988/// Notifies a compaction job has finished.
989#[derive(Debug)]
990pub(crate) struct CompactionFinished {
991    /// Region id.
992    pub(crate) region_id: RegionId,
993    /// Compaction result senders.
994    pub(crate) senders: Vec<OutputTx>,
995    /// Start time of compaction task.
996    pub(crate) start_time: Instant,
997    /// Region edit to apply.
998    pub(crate) edit: RegionEdit,
999}
1000
1001/// Notifies a compaction job has been cancelled cooperatively.
1002#[derive(Debug)]
1003pub(crate) struct CompactionCancelled {
1004    /// Region id.
1005    pub(crate) region_id: RegionId,
1006    /// Waiters to wake once the cancellation has been observed by the worker.
1007    pub(crate) senders: Vec<OutputTx>,
1008}
1009
1010impl CompactionCancelled {
1011    pub(crate) fn on_success(self) {
1012        for sender in self.senders {
1013            sender.send(CompactionCancelledSnafu {}.fail());
1014        }
1015        info!("Compaction cancelled for region: {}", self.region_id);
1016    }
1017}
1018
1019impl CompactionFinished {
1020    pub fn on_success(self) {
1021        // only update compaction time on success
1022        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
1023
1024        for sender in self.senders {
1025            sender.send(Ok(0));
1026        }
1027        info!("Successfully compacted region: {}", self.region_id);
1028    }
1029}
1030
1031impl OnFailure for CompactionFinished {
1032    /// Compaction succeeded but failed to update manifest or region's already been dropped.
1033    fn on_failure(&mut self, err: Error) {
1034        let err = Arc::new(err);
1035        for sender in self.senders.drain(..) {
1036            sender.send(Err(err.clone()).context(CompactRegionSnafu {
1037                region_id: self.region_id,
1038            }));
1039        }
1040    }
1041}
1042
1043/// A failing compaction result.
1044#[derive(Debug)]
1045pub(crate) struct CompactionFailed {
1046    pub(crate) region_id: RegionId,
1047    /// The error source of the failure.
1048    pub(crate) err: Arc<Error>,
1049}
1050
1051/// Notifies the truncate result of a region.
1052#[derive(Debug)]
1053pub(crate) struct TruncateResult {
1054    /// Region id.
1055    pub(crate) region_id: RegionId,
1056    /// Result sender.
1057    pub(crate) sender: OptionOutputTx,
1058    /// Truncate result.
1059    pub(crate) result: Result<()>,
1060    pub(crate) kind: TruncateKind,
1061}
1062
1063/// Notifies the region the result of writing region change action.
1064#[derive(Debug)]
1065pub(crate) struct RegionChangeResult {
1066    /// Region id.
1067    pub(crate) region_id: RegionId,
1068    /// The new region metadata to apply.
1069    pub(crate) new_meta: RegionMetadataRef,
1070    /// Result sender.
1071    pub(crate) sender: OptionOutputTx,
1072    /// Result from the manifest manager.
1073    pub(crate) result: Result<()>,
1074    /// Used for index build in schema change.
1075    pub(crate) need_index: bool,
1076    /// New options for the region.
1077    pub(crate) new_options: Option<RegionOptions>,
1078}
1079
1080/// Notifies the region the result of entering staging.
1081#[derive(Debug)]
1082pub(crate) struct EnterStagingResult {
1083    /// Region id.
1084    pub(crate) region_id: RegionId,
1085    /// The new staging partition directive to apply.
1086    pub(crate) partition_directive: StagingPartitionDirective,
1087    /// Result sender.
1088    pub(crate) sender: OptionOutputTx,
1089    /// Result from the manifest manager.
1090    pub(crate) result: Result<()>,
1091}
1092
1093#[derive(Debug)]
1094pub(crate) struct CopyRegionFromFinished {
1095    /// Region id.
1096    pub(crate) region_id: RegionId,
1097    /// Region edit to apply.
1098    pub(crate) edit: RegionEdit,
1099    /// Result sender.
1100    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1101}
1102
1103/// Request to edit a region directly.
1104#[derive(Debug)]
1105pub(crate) struct RegionEditRequest {
1106    pub(crate) region_id: RegionId,
1107    pub(crate) edit: RegionEdit,
1108    /// The sender to notify the result to the region engine.
1109    pub(crate) tx: Sender<Result<()>>,
1110}
1111
1112/// Notifies the regin the result of editing region.
1113#[derive(Debug)]
1114pub(crate) struct RegionEditResult {
1115    /// Region id.
1116    pub(crate) region_id: RegionId,
1117    /// Result sender.
1118    pub(crate) sender: Sender<Result<()>>,
1119    /// Region edit to apply.
1120    pub(crate) edit: RegionEdit,
1121    /// Result from the manifest manager.
1122    pub(crate) result: Result<()>,
1123    /// Whether region state need to be set to Writable after handling this request.
1124    pub(crate) update_region_state: bool,
1125    /// The region is in staging mode before handling this request.
1126    pub(crate) is_staging: bool,
1127}
1128
1129#[derive(Debug)]
1130pub(crate) struct BuildIndexRequest {
1131    pub(crate) region_id: RegionId,
1132    pub(crate) build_type: IndexBuildType,
1133    /// files need to build index, empty means all.
1134    pub(crate) file_metas: Vec<FileMeta>,
1135}
1136
1137#[derive(Debug)]
1138pub(crate) struct RegionSyncRequest {
1139    pub(crate) region_id: RegionId,
1140    pub(crate) manifest_version: ManifestVersion,
1141    /// Returns the latest manifest version and a boolean indicating whether new maniefst is installed.
1142    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1143}
1144
1145#[derive(Debug)]
1146pub(crate) struct RemapManifestsRequest {
1147    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
1148    pub(crate) region_id: RegionId,
1149    /// Regions to remap manifests from.
1150    pub(crate) input_regions: Vec<RegionId>,
1151    /// For each old region, which new regions should receive its files
1152    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1153    /// New partition expressions for the new regions.
1154    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1155    /// Sender for the result of the remap operation.
1156    ///
1157    /// The result is a map from region IDs to their corresponding staging manifest paths.
1158    pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
1159}
1160
1161#[derive(Debug)]
1162pub(crate) struct CopyRegionFromRequest {
1163    /// The [`RegionId`] of the target region.
1164    pub(crate) region_id: RegionId,
1165    /// The [`RegionId`] of the source region.
1166    pub(crate) source_region_id: RegionId,
1167    /// The parallelism of the copy operation.
1168    pub(crate) parallelism: usize,
1169    /// Result sender.
1170    pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1171}
1172
1173#[cfg(test)]
1174mod tests {
1175    use api::v1::value::ValueData;
1176    use api::v1::{Row, SemanticType};
1177    use common_error::ext::ErrorExt;
1178    use common_error::status_code::StatusCode;
1179    use datatypes::prelude::ConcreteDataType;
1180    use datatypes::schema::ColumnDefaultConstraint;
1181    use mito_codec::test_util::i64_value;
1182    use store_api::metadata::RegionMetadataBuilder;
1183    use tokio::sync::oneshot;
1184
1185    use super::*;
1186    use crate::error::Error;
1187    use crate::test_util::ts_ms_value;
1188
1189    fn new_column_schema(
1190        name: &str,
1191        data_type: ColumnDataType,
1192        semantic_type: SemanticType,
1193    ) -> ColumnSchema {
1194        ColumnSchema {
1195            column_name: name.to_string(),
1196            datatype: data_type as i32,
1197            semantic_type: semantic_type as i32,
1198            ..Default::default()
1199        }
1200    }
1201
1202    fn check_invalid_request(err: &Error, expect: &str) {
1203        if let Error::InvalidRequest {
1204            region_id: _,
1205            reason,
1206            location: _,
1207        } = err
1208        {
1209            assert_eq!(reason, expect);
1210        } else {
1211            panic!("Unexpected error {err}")
1212        }
1213    }
1214
1215    #[test]
1216    fn test_write_request_duplicate_column() {
1217        let rows = Rows {
1218            schema: vec![
1219                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1220                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1221            ],
1222            rows: vec![],
1223        };
1224
1225        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1226        check_invalid_request(&err, "duplicate column c0");
1227    }
1228
1229    #[test]
1230    fn test_valid_write_request() {
1231        let rows = Rows {
1232            schema: vec![
1233                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1234                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1235            ],
1236            rows: vec![Row {
1237                values: vec![i64_value(1), i64_value(2)],
1238            }],
1239        };
1240
1241        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1242        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1243        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1244        assert_eq!(None, request.column_index_by_name("c2"));
1245    }
1246
1247    #[test]
1248    fn test_compaction_cancelled_sends_cancelled_error() {
1249        let (tx, rx) = oneshot::channel();
1250        let request = CompactionCancelled {
1251            region_id: RegionId::new(1, 1),
1252            senders: vec![OutputTx::new(tx)],
1253        };
1254
1255        request.on_success();
1256
1257        let err = rx.blocking_recv().unwrap().unwrap_err();
1258        assert!(matches!(err, Error::CompactionCancelled { .. }));
1259        assert_eq!(err.status_code(), StatusCode::Cancelled);
1260    }
1261
1262    #[test]
1263    fn test_write_request_column_num() {
1264        let rows = Rows {
1265            schema: vec![
1266                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1267                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1268            ],
1269            rows: vec![Row {
1270                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1271            }],
1272        };
1273
1274        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1275        check_invalid_request(&err, "row has 3 columns but schema has 2");
1276    }
1277
1278    fn new_region_metadata() -> RegionMetadata {
1279        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1280        builder
1281            .push_column_metadata(ColumnMetadata {
1282                column_schema: datatypes::schema::ColumnSchema::new(
1283                    "ts",
1284                    ConcreteDataType::timestamp_millisecond_datatype(),
1285                    false,
1286                ),
1287                semantic_type: SemanticType::Timestamp,
1288                column_id: 1,
1289            })
1290            .push_column_metadata(ColumnMetadata {
1291                column_schema: datatypes::schema::ColumnSchema::new(
1292                    "k0",
1293                    ConcreteDataType::int64_datatype(),
1294                    true,
1295                ),
1296                semantic_type: SemanticType::Tag,
1297                column_id: 2,
1298            })
1299            .primary_key(vec![2]);
1300        builder.build().unwrap()
1301    }
1302
1303    #[test]
1304    fn test_check_schema() {
1305        let rows = Rows {
1306            schema: vec![
1307                new_column_schema(
1308                    "ts",
1309                    ColumnDataType::TimestampMillisecond,
1310                    SemanticType::Timestamp,
1311                ),
1312                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1313            ],
1314            rows: vec![Row {
1315                values: vec![ts_ms_value(1), i64_value(2)],
1316            }],
1317        };
1318        let metadata = new_region_metadata();
1319
1320        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1321        request.check_schema(&metadata).unwrap();
1322    }
1323
1324    #[test]
1325    fn test_column_type() {
1326        let rows = Rows {
1327            schema: vec![
1328                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1329                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1330            ],
1331            rows: vec![Row {
1332                values: vec![i64_value(1), i64_value(2)],
1333            }],
1334        };
1335        let metadata = new_region_metadata();
1336
1337        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1338        let err = request.check_schema(&metadata).unwrap_err();
1339        check_invalid_request(
1340            &err,
1341            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1342        );
1343    }
1344
1345    #[test]
1346    fn test_semantic_type() {
1347        let rows = Rows {
1348            schema: vec![
1349                new_column_schema(
1350                    "ts",
1351                    ColumnDataType::TimestampMillisecond,
1352                    SemanticType::Tag,
1353                ),
1354                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1355            ],
1356            rows: vec![Row {
1357                values: vec![ts_ms_value(1), i64_value(2)],
1358            }],
1359        };
1360        let metadata = new_region_metadata();
1361
1362        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1363        let err = request.check_schema(&metadata).unwrap_err();
1364        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1365    }
1366
1367    #[test]
1368    fn test_column_nullable() {
1369        let rows = Rows {
1370            schema: vec![
1371                new_column_schema(
1372                    "ts",
1373                    ColumnDataType::TimestampMillisecond,
1374                    SemanticType::Timestamp,
1375                ),
1376                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1377            ],
1378            rows: vec![Row {
1379                values: vec![Value { value_data: None }, i64_value(2)],
1380            }],
1381        };
1382        let metadata = new_region_metadata();
1383
1384        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1385        let err = request.check_schema(&metadata).unwrap_err();
1386        check_invalid_request(&err, "column ts is not null but input has null");
1387    }
1388
1389    #[test]
1390    fn test_column_default() {
1391        let rows = Rows {
1392            schema: vec![new_column_schema(
1393                "k0",
1394                ColumnDataType::Int64,
1395                SemanticType::Tag,
1396            )],
1397            rows: vec![Row {
1398                values: vec![i64_value(1)],
1399            }],
1400        };
1401        let metadata = new_region_metadata();
1402
1403        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1404        let err = request.check_schema(&metadata).unwrap_err();
1405        check_invalid_request(&err, "missing column ts");
1406    }
1407
1408    #[test]
1409    fn test_unknown_column() {
1410        let rows = Rows {
1411            schema: vec![
1412                new_column_schema(
1413                    "ts",
1414                    ColumnDataType::TimestampMillisecond,
1415                    SemanticType::Timestamp,
1416                ),
1417                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1418                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1419            ],
1420            rows: vec![Row {
1421                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1422            }],
1423        };
1424        let metadata = new_region_metadata();
1425
1426        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1427        let err = request.check_schema(&metadata).unwrap_err();
1428        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1429    }
1430
1431    #[test]
1432    fn test_fill_impure_columns_err() {
1433        let rows = Rows {
1434            schema: vec![new_column_schema(
1435                "k0",
1436                ColumnDataType::Int64,
1437                SemanticType::Tag,
1438            )],
1439            rows: vec![Row {
1440                values: vec![i64_value(1)],
1441            }],
1442        };
1443        let metadata = {
1444            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1445            builder
1446                .push_column_metadata(ColumnMetadata {
1447                    column_schema: datatypes::schema::ColumnSchema::new(
1448                        "ts",
1449                        ConcreteDataType::timestamp_millisecond_datatype(),
1450                        false,
1451                    )
1452                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1453                        "now()".to_string(),
1454                    )))
1455                    .unwrap(),
1456                    semantic_type: SemanticType::Timestamp,
1457                    column_id: 1,
1458                })
1459                .push_column_metadata(ColumnMetadata {
1460                    column_schema: datatypes::schema::ColumnSchema::new(
1461                        "k0",
1462                        ConcreteDataType::int64_datatype(),
1463                        true,
1464                    ),
1465                    semantic_type: SemanticType::Tag,
1466                    column_id: 2,
1467                })
1468                .primary_key(vec![2]);
1469            builder.build().unwrap()
1470        };
1471
1472        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1473        let err = request.check_schema(&metadata).unwrap_err();
1474        assert!(err.is_fill_default());
1475        assert!(
1476            request
1477                .fill_missing_columns(&metadata)
1478                .unwrap_err()
1479                .to_string()
1480                .contains("unexpected impure default value with region_id")
1481        );
1482    }
1483
1484    #[test]
1485    fn test_fill_missing_columns() {
1486        let rows = Rows {
1487            schema: vec![new_column_schema(
1488                "ts",
1489                ColumnDataType::TimestampMillisecond,
1490                SemanticType::Timestamp,
1491            )],
1492            rows: vec![Row {
1493                values: vec![ts_ms_value(1)],
1494            }],
1495        };
1496        let metadata = new_region_metadata();
1497
1498        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1499        let err = request.check_schema(&metadata).unwrap_err();
1500        assert!(err.is_fill_default());
1501        request.fill_missing_columns(&metadata).unwrap();
1502
1503        let expect_rows = Rows {
1504            schema: vec![new_column_schema(
1505                "ts",
1506                ColumnDataType::TimestampMillisecond,
1507                SemanticType::Timestamp,
1508            )],
1509            rows: vec![Row {
1510                values: vec![ts_ms_value(1)],
1511            }],
1512        };
1513        assert_eq!(expect_rows, request.rows);
1514    }
1515
1516    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1517        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1518        builder
1519            .push_column_metadata(ColumnMetadata {
1520                column_schema: datatypes::schema::ColumnSchema::new(
1521                    "ts",
1522                    ConcreteDataType::timestamp_millisecond_datatype(),
1523                    false,
1524                ),
1525                semantic_type: SemanticType::Timestamp,
1526                column_id: 1,
1527            })
1528            .push_column_metadata(ColumnMetadata {
1529                column_schema: datatypes::schema::ColumnSchema::new(
1530                    "k0",
1531                    ConcreteDataType::int64_datatype(),
1532                    true,
1533                ),
1534                semantic_type: SemanticType::Tag,
1535                column_id: 2,
1536            })
1537            .primary_key(vec![2]);
1538        builder
1539    }
1540
1541    fn region_metadata_two_fields() -> RegionMetadata {
1542        let mut builder = builder_with_ts_tag();
1543        builder
1544            .push_column_metadata(ColumnMetadata {
1545                column_schema: datatypes::schema::ColumnSchema::new(
1546                    "f0",
1547                    ConcreteDataType::int64_datatype(),
1548                    true,
1549                ),
1550                semantic_type: SemanticType::Field,
1551                column_id: 3,
1552            })
1553            // Column is not nullable.
1554            .push_column_metadata(ColumnMetadata {
1555                column_schema: datatypes::schema::ColumnSchema::new(
1556                    "f1",
1557                    ConcreteDataType::int64_datatype(),
1558                    false,
1559                )
1560                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1561                    datatypes::value::Value::Int64(100),
1562                )))
1563                .unwrap(),
1564                semantic_type: SemanticType::Field,
1565                column_id: 4,
1566            });
1567        builder.build().unwrap()
1568    }
1569
1570    #[test]
1571    fn test_fill_missing_for_delete() {
1572        let rows = Rows {
1573            schema: vec![new_column_schema(
1574                "ts",
1575                ColumnDataType::TimestampMillisecond,
1576                SemanticType::Timestamp,
1577            )],
1578            rows: vec![Row {
1579                values: vec![ts_ms_value(1)],
1580            }],
1581        };
1582        let metadata = region_metadata_two_fields();
1583
1584        let mut request =
1585            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1586        let err = request.check_schema(&metadata).unwrap_err();
1587        check_invalid_request(&err, "delete requests need column k0");
1588        let err = request.fill_missing_columns(&metadata).unwrap_err();
1589        check_invalid_request(&err, "delete requests need column k0");
1590
1591        let rows = Rows {
1592            schema: vec![
1593                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1594                new_column_schema(
1595                    "ts",
1596                    ColumnDataType::TimestampMillisecond,
1597                    SemanticType::Timestamp,
1598                ),
1599            ],
1600            rows: vec![Row {
1601                values: vec![i64_value(100), ts_ms_value(1)],
1602            }],
1603        };
1604        let mut request =
1605            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1606        let err = request.check_schema(&metadata).unwrap_err();
1607        assert!(err.is_fill_default());
1608        request.fill_missing_columns(&metadata).unwrap();
1609
1610        let expect_rows = Rows {
1611            schema: vec![
1612                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1613                new_column_schema(
1614                    "ts",
1615                    ColumnDataType::TimestampMillisecond,
1616                    SemanticType::Timestamp,
1617                ),
1618                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1619            ],
1620            // Column f1 is not nullable and we use 0 for padding.
1621            rows: vec![Row {
1622                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1623            }],
1624        };
1625        assert_eq!(expect_rows, request.rows);
1626    }
1627
/// Checks how missing field columns are filled for a delete request.
///
/// The request only provides `k0` and `ts`. After `fill_missing_columns`, the
/// nullable `f0` is still absent while the non-nullable `f1` (which has no
/// default value) is appended and padded with `0`.
#[test]
fn test_fill_missing_without_default_in_delete() {
    let mut builder = builder_with_ts_tag();

    // f0 is nullable.
    let nullable_f0 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    builder.push_column_metadata(nullable_f0);

    // f1 is not nullable and does not have a default value.
    let required_f1 = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    builder.push_column_metadata(required_f1);

    let metadata = builder.build().unwrap();

    // The request misses both f0 (nullable) and f1 (not nullable).
    let request_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
    ];
    let request_row = Row {
        values: vec![i64_value(100), ts_ms_value(1)],
    };
    let rows = Rows {
        schema: request_schema,
        rows: vec![request_row],
    };

    let region_id = RegionId::new(1, 1);
    let mut request = WriteRequest::new(region_id, OpType::Delete, rows, None).unwrap();

    // The schema check must ask for defaults to be filled before we fill them.
    let schema_err = request.check_schema(&metadata).unwrap_err();
    assert!(schema_err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    // Only f1 is appended to the schema; being non-nullable, it is padded with 0.
    let padded_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
    ];
    let padded_row = Row {
        values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
    };
    let expect_rows = Rows {
        schema: padded_schema,
        rows: vec![padded_row],
    };
    assert_eq!(expect_rows, request.rows);
}
1691
/// A put request that only carries `k0` cannot be completed: filling the
/// missing columns fails with an invalid-request error about `ts`.
#[test]
fn test_no_default() {
    // Only the tag column is supplied by the request.
    let tag_only_schema = vec![new_column_schema(
        "k0",
        ColumnDataType::Int64,
        SemanticType::Tag,
    )];
    let single_row = Row {
        values: vec![i64_value(1)],
    };
    let rows = Rows {
        schema: tag_only_schema,
        rows: vec![single_row],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let mut request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();

    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1710
/// When a request both misses a column (`f0`) and sends a column with the
/// wrong type (`f1` as a string), `check_schema` reports the type mismatch.
#[test]
fn test_missing_and_invalid() {
    // f0 is missing from the schema and f1 is declared with an invalid type (string).
    let request_schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let string_f1 = Value {
        value_data: Some(ValueData::StringValue(String::from("xxxxx"))),
    };
    let request_row = Row {
        values: vec![i64_value(100), ts_ms_value(1), string_f1],
    };
    let rows = Rows {
        schema: request_schema,
        rows: vec![request_row],
    };
    let metadata = region_metadata_two_fields();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();

    let schema_err = request.check_schema(&metadata).unwrap_err();
    let expected_message = "column f1 expect type Int64(Int64Type), given: STRING(12)";
    check_invalid_request(&schema_err, expected_message);
}
1743
/// A `WriteRequest` built with region metadata keeps that metadata, and the
/// stored metadata reports region id `(1, 1)`.
#[test]
fn test_write_request_metadata() {
    let tag_schema = vec![
        new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let tag_row = Row {
        values: vec![i64_value(1), i64_value(2)],
    };
    let rows = Rows {
        schema: tag_schema,
        rows: vec![tag_row],
    };

    let metadata = Arc::new(new_region_metadata());
    // Hand the request its own reference to the shared metadata.
    let attached = Some(Arc::clone(&metadata));
    let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, attached).unwrap();

    assert!(request.region_metadata.is_some());
    let stored_metadata = request.region_metadata.unwrap();
    assert_eq!(stored_metadata.region_id, RegionId::new(1, 1));
}
1771}