mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38    AffectedRows, EnterStagingRequest, RegionAlterRequest, RegionBuildIndexRequest,
39    RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest,
40    RegionCreateRequest, RegionFlushRequest, RegionOpenRequest, RegionRequest,
41    RegionTruncateRequest,
42};
43use store_api::storage::{FileId, RegionId};
44use tokio::sync::oneshot::{self, Receiver, Sender};
45
46use crate::error::{
47    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
48    FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu, MissingPartitionExprSnafu,
49    Result, UnexpectedSnafu,
50};
51use crate::manifest::action::{RegionEdit, RegionManifest, TruncateKind};
52use crate::memtable::MemtableId;
53use crate::memtable::bulk::part::BulkPart;
54use crate::metrics::COMPACTION_ELAPSED_TOTAL;
55use crate::region::options::RegionOptions;
56use crate::sst::file::FileMeta;
57use crate::sst::index::IndexBuildType;
58use crate::wal::EntryId;
59use crate::wal::entry_distributor::WalEntryReceiver;
60
/// Request to write a region.
///
/// `WriteRequest::new` validates `rows` and precomputes the `name_to_index` and
/// `has_null` lookups stored here.
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Maps each column name to the index of that column in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// For each column of `rows` (by column index), whether any row holds a null in it.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request was created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
79
80impl WriteRequest {
81    /// Creates a new request.
82    ///
83    /// Returns `Err` if `rows` are invalid.
84    pub fn new(
85        region_id: RegionId,
86        op_type: OpType,
87        rows: Rows,
88        region_metadata: Option<RegionMetadataRef>,
89    ) -> Result<WriteRequest> {
90        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
91        for (index, column) in rows.schema.iter().enumerate() {
92            ensure!(
93                name_to_index
94                    .insert(column.column_name.clone(), index)
95                    .is_none(),
96                InvalidRequestSnafu {
97                    region_id,
98                    reason: format!("duplicate column {}", column.column_name),
99                }
100            );
101        }
102
103        let mut has_null = vec![false; rows.schema.len()];
104        for row in &rows.rows {
105            ensure!(
106                row.values.len() == rows.schema.len(),
107                InvalidRequestSnafu {
108                    region_id,
109                    reason: format!(
110                        "row has {} columns but schema has {}",
111                        row.values.len(),
112                        rows.schema.len()
113                    ),
114                }
115            );
116
117            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
118                validate_proto_value(region_id, value, column_schema)?;
119
120                if value.value_data.is_none() {
121                    has_null[i] = true;
122                }
123            }
124        }
125
126        Ok(WriteRequest {
127            region_id,
128            op_type,
129            rows,
130            name_to_index,
131            has_null,
132            hint: None,
133            region_metadata,
134        })
135    }
136
137    /// Sets the write hint.
138    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
139        self.hint = hint;
140        self
141    }
142
143    /// Returns the encoding hint.
144    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
145        infer_primary_key_encoding_from_hint(self.hint.as_ref())
146    }
147
148    /// Returns estimated size of the request.
149    pub(crate) fn estimated_size(&self) -> usize {
150        let row_size = self
151            .rows
152            .rows
153            .first()
154            .map(|row| row.encoded_len())
155            .unwrap_or(0);
156        row_size * self.rows.rows.len()
157    }
158
159    /// Gets column index by name.
160    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
161        self.name_to_index.get(name).copied()
162    }
163
164    /// Checks schema of rows is compatible with schema of the region.
165    ///
166    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
167    /// error.
168    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
169        debug_assert_eq!(self.region_id, metadata.region_id);
170
171        let region_id = self.region_id;
172        // Index all columns in rows.
173        let mut rows_columns: HashMap<_, _> = self
174            .rows
175            .schema
176            .iter()
177            .map(|column| (&column.column_name, column))
178            .collect();
179
180        let mut need_fill_default = false;
181        // Checks all columns in this region.
182        for column in &metadata.column_metadatas {
183            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
184                // Check data type.
185                ensure!(
186                    is_column_type_value_eq(
187                        input_col.datatype,
188                        input_col.datatype_extension.clone(),
189                        &column.column_schema.data_type
190                    ),
191                    InvalidRequestSnafu {
192                        region_id,
193                        reason: format!(
194                            "column {} expect type {:?}, given: {}({})",
195                            column.column_schema.name,
196                            column.column_schema.data_type,
197                            ColumnDataType::try_from(input_col.datatype)
198                                .map(|v| v.as_str_name())
199                                .unwrap_or("Unknown"),
200                            input_col.datatype,
201                        )
202                    }
203                );
204
205                // Check semantic type.
206                ensure!(
207                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
208                    InvalidRequestSnafu {
209                        region_id,
210                        reason: format!(
211                            "column {} has semantic type {:?}, given: {}({})",
212                            column.column_schema.name,
213                            column.semantic_type,
214                            api::v1::SemanticType::try_from(input_col.semantic_type)
215                                .map(|v| v.as_str_name())
216                                .unwrap_or("Unknown"),
217                            input_col.semantic_type
218                        ),
219                    }
220                );
221
222                // Check nullable.
223                // Safety: `rows_columns` ensures this column exists.
224                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
225                ensure!(
226                    !has_null || column.column_schema.is_nullable(),
227                    InvalidRequestSnafu {
228                        region_id,
229                        reason: format!(
230                            "column {} is not null but input has null",
231                            column.column_schema.name
232                        ),
233                    }
234                );
235            } else {
236                // Rows don't have this column.
237                self.check_missing_column(column)?;
238
239                need_fill_default = true;
240            }
241        }
242
243        // Checks all columns in rows exist in the region.
244        if !rows_columns.is_empty() {
245            let names: Vec<_> = rows_columns.into_keys().collect();
246            return InvalidRequestSnafu {
247                region_id,
248                reason: format!("unknown columns: {:?}", names),
249            }
250            .fail();
251        }
252
253        // If we need to fill default values, return a special error.
254        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
255
256        Ok(())
257    }
258
259    /// Tries to fill missing columns.
260    ///
261    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
262    /// values.
263    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
264        debug_assert_eq!(self.region_id, metadata.region_id);
265
266        let mut columns_to_fill = vec![];
267        for column in &metadata.column_metadatas {
268            if !self.name_to_index.contains_key(&column.column_schema.name) {
269                columns_to_fill.push(column);
270            }
271        }
272        self.fill_columns(columns_to_fill)?;
273
274        Ok(())
275    }
276
277    /// Checks the schema and fill missing columns.
278    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
279        if let Err(e) = self.check_schema(metadata) {
280            if e.is_fill_default() {
281                // TODO(yingwen): Add metrics for this case.
282                // We need to fill default value. The write request may be a request
283                // sent before changing the schema.
284                self.fill_missing_columns(metadata)?;
285            } else {
286                return Err(e);
287            }
288        }
289
290        Ok(())
291    }
292
293    /// Fills default value for specific `columns`.
294    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
295        let mut default_values = Vec::with_capacity(columns.len());
296        let mut columns_to_fill = Vec::with_capacity(columns.len());
297        for column in columns {
298            let default_value = self.column_default_value(column)?;
299            if default_value.value_data.is_some() {
300                default_values.push(default_value);
301                columns_to_fill.push(column);
302            }
303        }
304
305        for row in &mut self.rows.rows {
306            row.values.extend(default_values.iter().cloned());
307        }
308
309        for column in columns_to_fill {
310            let (datatype, datatype_ext) =
311                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
312                    .with_context(|_| ConvertColumnDataTypeSnafu {
313                        reason: format!(
314                            "no protobuf type for column {} ({:?})",
315                            column.column_schema.name, column.column_schema.data_type
316                        ),
317                    })?
318                    .to_parts();
319            self.rows.schema.push(ColumnSchema {
320                column_name: column.column_schema.name.clone(),
321                datatype: datatype as i32,
322                semantic_type: column.semantic_type as i32,
323                datatype_extension: datatype_ext,
324                options: options_from_column_schema(&column.column_schema),
325            });
326        }
327
328        Ok(())
329    }
330
331    /// Checks whether we should allow a row doesn't provide this column.
332    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
333        if self.op_type == OpType::Delete {
334            if column.semantic_type == SemanticType::Field {
335                // For delete request, all tags and timestamp is required. We don't fill default
336                // tag or timestamp while deleting rows.
337                return Ok(());
338            } else {
339                return InvalidRequestSnafu {
340                    region_id: self.region_id,
341                    reason: format!("delete requests need column {}", column.column_schema.name),
342                }
343                .fail();
344            }
345        }
346
347        // Not a delete request. Checks whether they have default value.
348        ensure!(
349            column.column_schema.is_nullable()
350                || column.column_schema.default_constraint().is_some(),
351            InvalidRequestSnafu {
352                region_id: self.region_id,
353                reason: format!("missing column {}", column.column_schema.name),
354            }
355        );
356
357        Ok(())
358    }
359
360    /// Returns the default value for specific column.
361    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
362        let default_value = match self.op_type {
363            OpType::Delete => {
364                ensure!(
365                    column.semantic_type == SemanticType::Field,
366                    InvalidRequestSnafu {
367                        region_id: self.region_id,
368                        reason: format!(
369                            "delete requests need column {}",
370                            column.column_schema.name
371                        ),
372                    }
373                );
374
375                // For delete request, we need a default value for padding so we
376                // can delete a row even a field doesn't have a default value. So the
377                // value doesn't need to following the default value constraint of the
378                // column.
379                if column.column_schema.is_nullable() {
380                    datatypes::value::Value::Null
381                } else {
382                    column.column_schema.data_type.default_value()
383                }
384            }
385            OpType::Put => {
386                // For put requests, we use the default value from column schema.
387                if column.column_schema.is_default_impure() {
388                    UnexpectedSnafu {
389                        reason: format!(
390                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
391                            self.region_id,
392                            column.column_schema.name,
393                            column.column_schema.default_constraint(),
394                        ),
395                    }
396                    .fail()?
397                }
398                column
399                    .column_schema
400                    .create_default()
401                    .context(CreateDefaultSnafu {
402                        region_id: self.region_id,
403                        column: &column.column_schema.name,
404                    })?
405                    // This column doesn't have default value.
406                    .with_context(|| InvalidRequestSnafu {
407                        region_id: self.region_id,
408                        reason: format!(
409                            "column {} does not have default value",
410                            column.column_schema.name
411                        ),
412                    })?
413            }
414        };
415
416        // Convert default value into proto's value.
417        Ok(api::helper::to_grpc_value(default_value))
418    }
419}
420
421/// Validate proto value schema.
422pub(crate) fn validate_proto_value(
423    region_id: RegionId,
424    value: &Value,
425    column_schema: &ColumnSchema,
426) -> Result<()> {
427    if let Some(value_type) = proto_value_type(value) {
428        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
429            InvalidRequestSnafu {
430                region_id,
431                reason: format!(
432                    "column {} has unknown type {}",
433                    column_schema.column_name, column_schema.datatype
434                ),
435            }
436            .build()
437        })?;
438        ensure!(
439            proto_value_type_match(column_type, value_type),
440            InvalidRequestSnafu {
441                region_id,
442                reason: format!(
443                    "value has type {:?}, but column {} has type {:?}({})",
444                    value_type, column_schema.column_name, column_type, column_schema.datatype,
445                ),
446            }
447        );
448    }
449
450    Ok(())
451}
452
453fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
454    match (column_type, value_type) {
455        (ct, vt) if ct == vt => true,
456        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
457        (ColumnDataType::Json, ColumnDataType::Binary) => true,
458        _ => false,
459    }
460}
461
462/// Oneshot output result sender.
463#[derive(Debug)]
464pub struct OutputTx(Sender<Result<AffectedRows>>);
465
466impl OutputTx {
467    /// Creates a new output sender.
468    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
469        OutputTx(sender)
470    }
471
472    /// Sends the `result`.
473    pub(crate) fn send(self, result: Result<AffectedRows>) {
474        // Ignores send result.
475        let _ = self.0.send(result);
476    }
477}
478
479/// Optional output result sender.
480#[derive(Debug)]
481pub(crate) struct OptionOutputTx(Option<OutputTx>);
482
483impl OptionOutputTx {
484    /// Creates a sender.
485    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
486        OptionOutputTx(sender)
487    }
488
489    /// Creates an empty sender.
490    pub(crate) fn none() -> OptionOutputTx {
491        OptionOutputTx(None)
492    }
493
494    /// Sends the `result` and consumes the inner sender.
495    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
496        if let Some(sender) = self.0.take() {
497            sender.send(result);
498        }
499    }
500
501    /// Sends the `result` and consumes the sender.
502    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
503        if let Some(sender) = self.0.take() {
504            sender.send(result);
505        }
506    }
507
508    /// Takes the inner sender.
509    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
510        self.0.take()
511    }
512}
513
514impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
515    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
516        Self::new(Some(OutputTx::new(sender)))
517    }
518}
519
520impl OnFailure for OptionOutputTx {
521    fn on_failure(&mut self, err: Error) {
522        self.send_mut(Err(err));
523    }
524}
525
/// Callback on failure.
///
/// Implementors decide how the error is delivered, e.g. [`OptionOutputTx`] sends it
/// through its inner sender.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
531
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
539
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Id of the region the request targets.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata that accompanies the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
546
547/// Request sent to a worker with timestamp
548#[derive(Debug)]
549pub(crate) struct WorkerRequestWithTime {
550    pub(crate) request: WorkerRequest,
551    pub(crate) created_at: Instant,
552}
553
554impl WorkerRequestWithTime {
555    pub(crate) fn new(request: WorkerRequest) -> Self {
556        Self {
557            request,
558            created_at: Instant::now(),
559        }
560    }
561}
562
/// Request sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },

    /// Remap manifests request.
    RemapManifests(RemapManifestsRequest),
}
609
610impl WorkerRequest {
611    /// Creates a new open region request.
612    pub(crate) fn new_open_region_request(
613        region_id: RegionId,
614        request: RegionOpenRequest,
615        entry_receiver: Option<WalEntryReceiver>,
616    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
617        let (sender, receiver) = oneshot::channel();
618
619        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
620            region_id,
621            sender: sender.into(),
622            request: DdlRequest::Open((request, entry_receiver)),
623        });
624
625        (worker_request, receiver)
626    }
627
628    /// Creates a new catchup region request.
629    pub(crate) fn new_catchup_region_request(
630        region_id: RegionId,
631        request: RegionCatchupRequest,
632        entry_receiver: Option<WalEntryReceiver>,
633    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
634        let (sender, receiver) = oneshot::channel();
635        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
636            region_id,
637            sender: sender.into(),
638            request: DdlRequest::Catchup((request, entry_receiver)),
639        });
640        (worker_request, receiver)
641    }
642
643    /// Converts request from a [RegionRequest].
644    pub(crate) fn try_from_region_request(
645        region_id: RegionId,
646        value: RegionRequest,
647        region_metadata: Option<RegionMetadataRef>,
648    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
649        let (sender, receiver) = oneshot::channel();
650        let worker_request = match value {
651            RegionRequest::Put(v) => {
652                let mut write_request =
653                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
654                        .with_hint(v.hint);
655                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
656                    && let Some(region_metadata) = &region_metadata
657                {
658                    write_request.maybe_fill_missing_columns(region_metadata)?;
659                }
660                WorkerRequest::Write(SenderWriteRequest {
661                    sender: sender.into(),
662                    request: write_request,
663                })
664            }
665            RegionRequest::Delete(v) => {
666                let mut write_request =
667                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
668                        .with_hint(v.hint);
669                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
670                    && let Some(region_metadata) = &region_metadata
671                {
672                    write_request.maybe_fill_missing_columns(region_metadata)?;
673                }
674                WorkerRequest::Write(SenderWriteRequest {
675                    sender: sender.into(),
676                    request: write_request,
677                })
678            }
679            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
680                region_id,
681                sender: sender.into(),
682                request: DdlRequest::Create(v),
683            }),
684            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
685                region_id,
686                sender: sender.into(),
687                request: DdlRequest::Drop,
688            }),
689            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
690                region_id,
691                sender: sender.into(),
692                request: DdlRequest::Open((v, None)),
693            }),
694            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
695                region_id,
696                sender: sender.into(),
697                request: DdlRequest::Close(v),
698            }),
699            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
700                region_id,
701                sender: sender.into(),
702                request: DdlRequest::Alter(v),
703            }),
704            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
705                region_id,
706                sender: sender.into(),
707                request: DdlRequest::Flush(v),
708            }),
709            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
710                region_id,
711                sender: sender.into(),
712                request: DdlRequest::Compact(v),
713            }),
714            RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
715                region_id,
716                sender: sender.into(),
717                request: DdlRequest::BuildIndex(v),
718            }),
719            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
720                region_id,
721                sender: sender.into(),
722                request: DdlRequest::Truncate(v),
723            }),
724            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
725                region_id,
726                sender: sender.into(),
727                request: DdlRequest::Catchup((v, None)),
728            }),
729            RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
730                region_id,
731                sender: sender.into(),
732                request: DdlRequest::EnterStaging(v),
733            }),
734            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
735                metadata: region_metadata,
736                sender: sender.into(),
737                request: region_bulk_inserts_request,
738            },
739        };
740
741        Ok((worker_request, receiver))
742    }
743
744    pub(crate) fn new_set_readonly_gracefully(
745        region_id: RegionId,
746        region_role_state: SettableRegionRoleState,
747    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
748        let (sender, receiver) = oneshot::channel();
749
750        (
751            WorkerRequest::SetRegionRoleStateGracefully {
752                region_id,
753                region_role_state,
754                sender,
755            },
756            receiver,
757        )
758    }
759
760    pub(crate) fn new_sync_region_request(
761        region_id: RegionId,
762        manifest_version: ManifestVersion,
763    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
764        let (sender, receiver) = oneshot::channel();
765        (
766            WorkerRequest::SyncRegion(RegionSyncRequest {
767                region_id,
768                manifest_version,
769                sender,
770            }),
771            receiver,
772        )
773    }
774
775    /// Converts [RemapManifestsRequest] from a [RemapManifestsRequest](store_api::region_engine::RemapManifestsRequest).
776    ///
777    /// # Errors
778    ///
779    /// Returns an error if the partition expression is invalid or missing.
780    /// Returns an error if the new partition expressions are not found for some regions.
781    #[allow(clippy::type_complexity)]
782    pub(crate) fn try_from_remap_manifests_request(
783        store_api::region_engine::RemapManifestsRequest {
784            region_id,
785            input_regions,
786            region_mapping,
787            new_partition_exprs,
788        }: store_api::region_engine::RemapManifestsRequest,
789    ) -> Result<(
790        WorkerRequest,
791        Receiver<Result<HashMap<RegionId, RegionManifest>>>,
792    )> {
793        let (sender, receiver) = oneshot::channel();
794        let new_partition_exprs = new_partition_exprs
795            .into_iter()
796            .map(|(k, v)| {
797                Ok((
798                    k,
799                    PartitionExpr::from_json_str(&v)
800                        .context(InvalidPartitionExprSnafu { expr: v })?
801                        .context(MissingPartitionExprSnafu { region_id: k })?,
802                ))
803            })
804            .collect::<Result<HashMap<_, _>>>()?;
805
806        let request = RemapManifestsRequest {
807            region_id,
808            input_regions,
809            region_mapping,
810            new_partition_exprs,
811            sender,
812        };
813
814        Ok((WorkerRequest::RemapManifests(request), receiver))
815    }
816}
817
/// DDL request to a region.
///
/// Each variant wraps the corresponding region request type. `Open` and `Catchup`
/// additionally carry an optional [WalEntryReceiver].
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Wraps a [RegionCreateRequest].
    Create(RegionCreateRequest),
    /// Drop request; carries no payload.
    Drop,
    /// Wraps a [RegionOpenRequest] and an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Wraps a [RegionCloseRequest].
    Close(RegionCloseRequest),
    /// Wraps a [RegionAlterRequest].
    Alter(RegionAlterRequest),
    /// Wraps a [RegionFlushRequest].
    Flush(RegionFlushRequest),
    /// Wraps a [RegionCompactRequest].
    Compact(RegionCompactRequest),
    /// Wraps a [RegionBuildIndexRequest].
    BuildIndex(RegionBuildIndexRequest),
    /// Wraps a [RegionTruncateRequest].
    Truncate(RegionTruncateRequest),
    /// Wraps a [RegionCatchupRequest] and an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
    /// Wraps an [EnterStagingRequest].
    EnterStaging(EnterStagingRequest),
}
833
/// Sender and Ddl request.
///
/// This is the payload of [`WorkerRequest::Ddl`].
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
844
/// Notification from a background job.
///
/// Each variant wraps the payload struct describing one kind of outcome.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// Index build has been stopped (aborted or succeeded).
    IndexBuildStopped(IndexBuildStopped),
    /// Index build has failed.
    IndexBuildFailed(IndexBuildFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
    /// Enter staging result.
    EnterStaging(EnterStagingResult),
}
871
/// Notifies that a flush job has finished.
///
/// Pending waiters are answered through [`FlushFinished::on_success`] or the
/// [`OnFailure`] implementation.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer. It is never read directly; it is held for the lifetime of
    /// this notification.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
    /// Whether the region is in staging mode.
    pub(crate) is_staging: bool,
}
890
891impl FlushFinished {
892    /// Marks the flush job as successful and observes the timer.
893    pub(crate) fn on_success(self) {
894        for sender in self.senders {
895            sender.send(Ok(0));
896        }
897    }
898}
899
900impl OnFailure for FlushFinished {
901    fn on_failure(&mut self, err: Error) {
902        let err = Arc::new(err);
903        for sender in self.senders.drain(..) {
904            sender.send(Err(err.clone()).context(FlushRegionSnafu {
905                region_id: self.region_id,
906            }));
907        }
908    }
909}
910
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure, shared behind an [`Arc`].
    pub(crate) err: Arc<Error>,
}
917
/// Notifies that an index build job has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region id.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit carried by the notification.
    pub(crate) edit: RegionEdit,
}
924
/// Notifies that an index build job has been stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Region id.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file associated with the stopped job.
    pub(crate) file_id: FileId,
}
932
/// Notifies that an index build job has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error source of the failure, shared behind an [`Arc`].
    pub(crate) err: Arc<Error>,
}
938
/// Notifies that a compaction job has finished.
///
/// Pending waiters are answered through [`CompactionFinished::on_success`] or
/// the [`OnFailure`] implementation.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task; used to compute the elapsed time on success.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
951
952impl CompactionFinished {
953    pub fn on_success(self) {
954        // only update compaction time on success
955        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
956
957        for sender in self.senders {
958            sender.send(Ok(0));
959        }
960        info!("Successfully compacted region: {}", self.region_id);
961    }
962}
963
964impl OnFailure for CompactionFinished {
965    /// Compaction succeeded but failed to update manifest or region's already been dropped.
966    fn on_failure(&mut self, err: Error) {
967        let err = Arc::new(err);
968        for sender in self.senders.drain(..) {
969            sender.send(Err(err.clone()).context(CompactRegionSnafu {
970                region_id: self.region_id,
971            }));
972        }
973    }
974}
975
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
983
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Kind of the truncate operation.
    pub(crate) kind: TruncateKind,
}
995
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Used for index build in schema change.
    pub(crate) need_index: bool,
    /// New options for the region, if any.
    pub(crate) new_options: Option<RegionOptions>,
}
1012
/// Notifies the region of the result of entering staging.
#[derive(Debug)]
pub(crate) struct EnterStagingResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new partition expression to apply, kept as a string.
    pub(crate) partition_expr: String,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
1025
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit carried by the request.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
1034
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
    /// Whether the region state needs to be set to Writable after handling this request.
    pub(crate) update_region_state: bool,
}
1049
/// Request to build index for files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Files that need an index built; empty means all files.
    pub(crate) file_metas: Vec<FileMeta>,
}
1057
/// Request to sync a region against a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether a new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
1065
/// Worker-side request to remap manifests from input regions to new regions.
///
/// Unlike the `store_api` request it is built from, the partition expressions
/// are already parsed into [`PartitionExpr`].
#[derive(Debug)]
pub(crate) struct RemapManifestsRequest {
    /// The [`RegionId`] of a staging region used to obtain table directory and storage configuration for the remap operation.
    pub(crate) region_id: RegionId,
    /// Regions to remap manifests from.
    pub(crate) input_regions: Vec<RegionId>,
    /// For each old region, which new regions should receive its files.
    pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
    /// New partition expressions for the new regions.
    pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
    /// Result sender.
    pub(crate) sender: Sender<Result<HashMap<RegionId, RegionManifest>>>,
}
1079
1080#[cfg(test)]
1081mod tests {
1082    use api::v1::value::ValueData;
1083    use api::v1::{Row, SemanticType};
1084    use datatypes::prelude::ConcreteDataType;
1085    use datatypes::schema::ColumnDefaultConstraint;
1086    use mito_codec::test_util::i64_value;
1087    use store_api::metadata::RegionMetadataBuilder;
1088
1089    use super::*;
1090    use crate::error::Error;
1091    use crate::test_util::ts_ms_value;
1092
1093    fn new_column_schema(
1094        name: &str,
1095        data_type: ColumnDataType,
1096        semantic_type: SemanticType,
1097    ) -> ColumnSchema {
1098        ColumnSchema {
1099            column_name: name.to_string(),
1100            datatype: data_type as i32,
1101            semantic_type: semantic_type as i32,
1102            ..Default::default()
1103        }
1104    }
1105
1106    fn check_invalid_request(err: &Error, expect: &str) {
1107        if let Error::InvalidRequest {
1108            region_id: _,
1109            reason,
1110            location: _,
1111        } = err
1112        {
1113            assert_eq!(reason, expect);
1114        } else {
1115            panic!("Unexpected error {err}")
1116        }
1117    }
1118
1119    #[test]
1120    fn test_write_request_duplicate_column() {
1121        let rows = Rows {
1122            schema: vec![
1123                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1124                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1125            ],
1126            rows: vec![],
1127        };
1128
1129        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1130        check_invalid_request(&err, "duplicate column c0");
1131    }
1132
1133    #[test]
1134    fn test_valid_write_request() {
1135        let rows = Rows {
1136            schema: vec![
1137                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1138                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1139            ],
1140            rows: vec![Row {
1141                values: vec![i64_value(1), i64_value(2)],
1142            }],
1143        };
1144
1145        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1146        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1147        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1148        assert_eq!(None, request.column_index_by_name("c2"));
1149    }
1150
1151    #[test]
1152    fn test_write_request_column_num() {
1153        let rows = Rows {
1154            schema: vec![
1155                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1156                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1157            ],
1158            rows: vec![Row {
1159                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1160            }],
1161        };
1162
1163        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1164        check_invalid_request(&err, "row has 3 columns but schema has 2");
1165    }
1166
1167    fn new_region_metadata() -> RegionMetadata {
1168        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1169        builder
1170            .push_column_metadata(ColumnMetadata {
1171                column_schema: datatypes::schema::ColumnSchema::new(
1172                    "ts",
1173                    ConcreteDataType::timestamp_millisecond_datatype(),
1174                    false,
1175                ),
1176                semantic_type: SemanticType::Timestamp,
1177                column_id: 1,
1178            })
1179            .push_column_metadata(ColumnMetadata {
1180                column_schema: datatypes::schema::ColumnSchema::new(
1181                    "k0",
1182                    ConcreteDataType::int64_datatype(),
1183                    true,
1184                ),
1185                semantic_type: SemanticType::Tag,
1186                column_id: 2,
1187            })
1188            .primary_key(vec![2]);
1189        builder.build().unwrap()
1190    }
1191
1192    #[test]
1193    fn test_check_schema() {
1194        let rows = Rows {
1195            schema: vec![
1196                new_column_schema(
1197                    "ts",
1198                    ColumnDataType::TimestampMillisecond,
1199                    SemanticType::Timestamp,
1200                ),
1201                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1202            ],
1203            rows: vec![Row {
1204                values: vec![ts_ms_value(1), i64_value(2)],
1205            }],
1206        };
1207        let metadata = new_region_metadata();
1208
1209        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1210        request.check_schema(&metadata).unwrap();
1211    }
1212
1213    #[test]
1214    fn test_column_type() {
1215        let rows = Rows {
1216            schema: vec![
1217                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1218                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1219            ],
1220            rows: vec![Row {
1221                values: vec![i64_value(1), i64_value(2)],
1222            }],
1223        };
1224        let metadata = new_region_metadata();
1225
1226        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1227        let err = request.check_schema(&metadata).unwrap_err();
1228        check_invalid_request(
1229            &err,
1230            "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1231        );
1232    }
1233
1234    #[test]
1235    fn test_semantic_type() {
1236        let rows = Rows {
1237            schema: vec![
1238                new_column_schema(
1239                    "ts",
1240                    ColumnDataType::TimestampMillisecond,
1241                    SemanticType::Tag,
1242                ),
1243                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1244            ],
1245            rows: vec![Row {
1246                values: vec![ts_ms_value(1), i64_value(2)],
1247            }],
1248        };
1249        let metadata = new_region_metadata();
1250
1251        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1252        let err = request.check_schema(&metadata).unwrap_err();
1253        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1254    }
1255
1256    #[test]
1257    fn test_column_nullable() {
1258        let rows = Rows {
1259            schema: vec![
1260                new_column_schema(
1261                    "ts",
1262                    ColumnDataType::TimestampMillisecond,
1263                    SemanticType::Timestamp,
1264                ),
1265                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1266            ],
1267            rows: vec![Row {
1268                values: vec![Value { value_data: None }, i64_value(2)],
1269            }],
1270        };
1271        let metadata = new_region_metadata();
1272
1273        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1274        let err = request.check_schema(&metadata).unwrap_err();
1275        check_invalid_request(&err, "column ts is not null but input has null");
1276    }
1277
1278    #[test]
1279    fn test_column_default() {
1280        let rows = Rows {
1281            schema: vec![new_column_schema(
1282                "k0",
1283                ColumnDataType::Int64,
1284                SemanticType::Tag,
1285            )],
1286            rows: vec![Row {
1287                values: vec![i64_value(1)],
1288            }],
1289        };
1290        let metadata = new_region_metadata();
1291
1292        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1293        let err = request.check_schema(&metadata).unwrap_err();
1294        check_invalid_request(&err, "missing column ts");
1295    }
1296
1297    #[test]
1298    fn test_unknown_column() {
1299        let rows = Rows {
1300            schema: vec![
1301                new_column_schema(
1302                    "ts",
1303                    ColumnDataType::TimestampMillisecond,
1304                    SemanticType::Timestamp,
1305                ),
1306                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1307                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1308            ],
1309            rows: vec![Row {
1310                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1311            }],
1312        };
1313        let metadata = new_region_metadata();
1314
1315        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1316        let err = request.check_schema(&metadata).unwrap_err();
1317        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1318    }
1319
1320    #[test]
1321    fn test_fill_impure_columns_err() {
1322        let rows = Rows {
1323            schema: vec![new_column_schema(
1324                "k0",
1325                ColumnDataType::Int64,
1326                SemanticType::Tag,
1327            )],
1328            rows: vec![Row {
1329                values: vec![i64_value(1)],
1330            }],
1331        };
1332        let metadata = {
1333            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1334            builder
1335                .push_column_metadata(ColumnMetadata {
1336                    column_schema: datatypes::schema::ColumnSchema::new(
1337                        "ts",
1338                        ConcreteDataType::timestamp_millisecond_datatype(),
1339                        false,
1340                    )
1341                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1342                        "now()".to_string(),
1343                    )))
1344                    .unwrap(),
1345                    semantic_type: SemanticType::Timestamp,
1346                    column_id: 1,
1347                })
1348                .push_column_metadata(ColumnMetadata {
1349                    column_schema: datatypes::schema::ColumnSchema::new(
1350                        "k0",
1351                        ConcreteDataType::int64_datatype(),
1352                        true,
1353                    ),
1354                    semantic_type: SemanticType::Tag,
1355                    column_id: 2,
1356                })
1357                .primary_key(vec![2]);
1358            builder.build().unwrap()
1359        };
1360
1361        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1362        let err = request.check_schema(&metadata).unwrap_err();
1363        assert!(err.is_fill_default());
1364        assert!(
1365            request
1366                .fill_missing_columns(&metadata)
1367                .unwrap_err()
1368                .to_string()
1369                .contains("unexpected impure default value with region_id")
1370        );
1371    }
1372
1373    #[test]
1374    fn test_fill_missing_columns() {
1375        let rows = Rows {
1376            schema: vec![new_column_schema(
1377                "ts",
1378                ColumnDataType::TimestampMillisecond,
1379                SemanticType::Timestamp,
1380            )],
1381            rows: vec![Row {
1382                values: vec![ts_ms_value(1)],
1383            }],
1384        };
1385        let metadata = new_region_metadata();
1386
1387        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1388        let err = request.check_schema(&metadata).unwrap_err();
1389        assert!(err.is_fill_default());
1390        request.fill_missing_columns(&metadata).unwrap();
1391
1392        let expect_rows = Rows {
1393            schema: vec![new_column_schema(
1394                "ts",
1395                ColumnDataType::TimestampMillisecond,
1396                SemanticType::Timestamp,
1397            )],
1398            rows: vec![Row {
1399                values: vec![ts_ms_value(1)],
1400            }],
1401        };
1402        assert_eq!(expect_rows, request.rows);
1403    }
1404
1405    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1406        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1407        builder
1408            .push_column_metadata(ColumnMetadata {
1409                column_schema: datatypes::schema::ColumnSchema::new(
1410                    "ts",
1411                    ConcreteDataType::timestamp_millisecond_datatype(),
1412                    false,
1413                ),
1414                semantic_type: SemanticType::Timestamp,
1415                column_id: 1,
1416            })
1417            .push_column_metadata(ColumnMetadata {
1418                column_schema: datatypes::schema::ColumnSchema::new(
1419                    "k0",
1420                    ConcreteDataType::int64_datatype(),
1421                    true,
1422                ),
1423                semantic_type: SemanticType::Tag,
1424                column_id: 2,
1425            })
1426            .primary_key(vec![2]);
1427        builder
1428    }
1429
1430    fn region_metadata_two_fields() -> RegionMetadata {
1431        let mut builder = builder_with_ts_tag();
1432        builder
1433            .push_column_metadata(ColumnMetadata {
1434                column_schema: datatypes::schema::ColumnSchema::new(
1435                    "f0",
1436                    ConcreteDataType::int64_datatype(),
1437                    true,
1438                ),
1439                semantic_type: SemanticType::Field,
1440                column_id: 3,
1441            })
1442            // Column is not nullable.
1443            .push_column_metadata(ColumnMetadata {
1444                column_schema: datatypes::schema::ColumnSchema::new(
1445                    "f1",
1446                    ConcreteDataType::int64_datatype(),
1447                    false,
1448                )
1449                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1450                    datatypes::value::Value::Int64(100),
1451                )))
1452                .unwrap(),
1453                semantic_type: SemanticType::Field,
1454                column_id: 4,
1455            });
1456        builder.build().unwrap()
1457    }
1458
1459    #[test]
1460    fn test_fill_missing_for_delete() {
1461        let rows = Rows {
1462            schema: vec![new_column_schema(
1463                "ts",
1464                ColumnDataType::TimestampMillisecond,
1465                SemanticType::Timestamp,
1466            )],
1467            rows: vec![Row {
1468                values: vec![ts_ms_value(1)],
1469            }],
1470        };
1471        let metadata = region_metadata_two_fields();
1472
1473        let mut request =
1474            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1475        let err = request.check_schema(&metadata).unwrap_err();
1476        check_invalid_request(&err, "delete requests need column k0");
1477        let err = request.fill_missing_columns(&metadata).unwrap_err();
1478        check_invalid_request(&err, "delete requests need column k0");
1479
1480        let rows = Rows {
1481            schema: vec![
1482                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1483                new_column_schema(
1484                    "ts",
1485                    ColumnDataType::TimestampMillisecond,
1486                    SemanticType::Timestamp,
1487                ),
1488            ],
1489            rows: vec![Row {
1490                values: vec![i64_value(100), ts_ms_value(1)],
1491            }],
1492        };
1493        let mut request =
1494            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1495        let err = request.check_schema(&metadata).unwrap_err();
1496        assert!(err.is_fill_default());
1497        request.fill_missing_columns(&metadata).unwrap();
1498
1499        let expect_rows = Rows {
1500            schema: vec![
1501                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1502                new_column_schema(
1503                    "ts",
1504                    ColumnDataType::TimestampMillisecond,
1505                    SemanticType::Timestamp,
1506                ),
1507                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1508            ],
1509            // Column f1 is not nullable and we use 0 for padding.
1510            rows: vec![Row {
1511                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1512            }],
1513        };
1514        assert_eq!(expect_rows, request.rows);
1515    }
1516
1517    #[test]
1518    fn test_fill_missing_without_default_in_delete() {
1519        let mut builder = builder_with_ts_tag();
1520        builder
1521            // f0 is nullable.
1522            .push_column_metadata(ColumnMetadata {
1523                column_schema: datatypes::schema::ColumnSchema::new(
1524                    "f0",
1525                    ConcreteDataType::int64_datatype(),
1526                    true,
1527                ),
1528                semantic_type: SemanticType::Field,
1529                column_id: 3,
1530            })
1531            // f1 is not nullable and don't has default.
1532            .push_column_metadata(ColumnMetadata {
1533                column_schema: datatypes::schema::ColumnSchema::new(
1534                    "f1",
1535                    ConcreteDataType::int64_datatype(),
1536                    false,
1537                ),
1538                semantic_type: SemanticType::Field,
1539                column_id: 4,
1540            });
1541        let metadata = builder.build().unwrap();
1542
1543        let rows = Rows {
1544            schema: vec![
1545                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1546                new_column_schema(
1547                    "ts",
1548                    ColumnDataType::TimestampMillisecond,
1549                    SemanticType::Timestamp,
1550                ),
1551            ],
1552            // Missing f0 (nullable), f1 (not nullable).
1553            rows: vec![Row {
1554                values: vec![i64_value(100), ts_ms_value(1)],
1555            }],
1556        };
1557        let mut request =
1558            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1559        let err = request.check_schema(&metadata).unwrap_err();
1560        assert!(err.is_fill_default());
1561        request.fill_missing_columns(&metadata).unwrap();
1562
1563        let expect_rows = Rows {
1564            schema: vec![
1565                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1566                new_column_schema(
1567                    "ts",
1568                    ColumnDataType::TimestampMillisecond,
1569                    SemanticType::Timestamp,
1570                ),
1571                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1572            ],
1573            // Column f1 is not nullable and we use 0 for padding.
1574            rows: vec![Row {
1575                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1576            }],
1577        };
1578        assert_eq!(expect_rows, request.rows);
1579    }
1580
1581    #[test]
1582    fn test_no_default() {
1583        let rows = Rows {
1584            schema: vec![new_column_schema(
1585                "k0",
1586                ColumnDataType::Int64,
1587                SemanticType::Tag,
1588            )],
1589            rows: vec![Row {
1590                values: vec![i64_value(1)],
1591            }],
1592        };
1593        let metadata = new_region_metadata();
1594
1595        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1596        let err = request.fill_missing_columns(&metadata).unwrap_err();
1597        check_invalid_request(&err, "column ts does not have default value");
1598    }
1599
1600    #[test]
1601    fn test_missing_and_invalid() {
1602        // Missing f0 and f1 has invalid type (string).
1603        let rows = Rows {
1604            schema: vec![
1605                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1606                new_column_schema(
1607                    "ts",
1608                    ColumnDataType::TimestampMillisecond,
1609                    SemanticType::Timestamp,
1610                ),
1611                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1612            ],
1613            rows: vec![Row {
1614                values: vec![
1615                    i64_value(100),
1616                    ts_ms_value(1),
1617                    Value {
1618                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1619                    },
1620                ],
1621            }],
1622        };
1623        let metadata = region_metadata_two_fields();
1624
1625        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1626        let err = request.check_schema(&metadata).unwrap_err();
1627        check_invalid_request(
1628            &err,
1629            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1630        );
1631    }
1632
1633    #[test]
1634    fn test_write_request_metadata() {
1635        let rows = Rows {
1636            schema: vec![
1637                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1638                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1639            ],
1640            rows: vec![Row {
1641                values: vec![i64_value(1), i64_value(2)],
1642            }],
1643        };
1644
1645        let metadata = Arc::new(new_region_metadata());
1646        let request = WriteRequest::new(
1647            RegionId::new(1, 1),
1648            OpType::Put,
1649            rows,
1650            Some(metadata.clone()),
1651        )
1652        .unwrap();
1653
1654        assert!(request.region_metadata.is_some());
1655        assert_eq!(
1656            request.region_metadata.unwrap().region_id,
1657            RegionId::new(1, 1)
1658        );
1659    }
1660}