mito2/
request.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Worker requests.
16
17use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22    is_column_type_value_eq, is_semantic_type_eq, proto_value_type, to_proto_value,
23    ColumnDataTypeWrapper,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{ensure, OptionExt, ResultExt};
33use store_api::codec::{infer_primary_key_encoding_from_hint, PrimaryKeyEncoding};
34use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
35use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
36use store_api::region_request::{
37    AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
38    RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
39    RegionOpenRequest, RegionRequest, RegionTruncateRequest,
40};
41use store_api::storage::RegionId;
42use store_api::ManifestVersion;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46    CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47    FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::bulk::part::BulkPart;
51use crate::memtable::MemtableId;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::wal::entry_distributor::WalEntryReceiver;
54use crate::wal::EntryId;
55
/// Request to write a region.
///
/// Created by [`WriteRequest::new`], which validates the rows and fills the
/// `name_to_index` and `has_null` lookups from the row schema.
#[derive(Debug)]
pub struct WriteRequest {
    /// Region to write.
    pub region_id: RegionId,
    /// Type of the write request.
    pub op_type: OpType,
    /// Rows to write.
    pub rows: Rows,
    /// Map column name to column index in `rows`.
    pub name_to_index: HashMap<String, usize>,
    /// Whether each column has null, indexed by the column's position in the schema of `rows`.
    pub has_null: Vec<bool>,
    /// Write hint.
    pub hint: Option<WriteHint>,
    /// Region metadata at the time this request is created.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
74
75impl WriteRequest {
76    /// Creates a new request.
77    ///
78    /// Returns `Err` if `rows` are invalid.
79    pub fn new(
80        region_id: RegionId,
81        op_type: OpType,
82        rows: Rows,
83        region_metadata: Option<RegionMetadataRef>,
84    ) -> Result<WriteRequest> {
85        let mut name_to_index = HashMap::with_capacity(rows.schema.len());
86        for (index, column) in rows.schema.iter().enumerate() {
87            ensure!(
88                name_to_index
89                    .insert(column.column_name.clone(), index)
90                    .is_none(),
91                InvalidRequestSnafu {
92                    region_id,
93                    reason: format!("duplicate column {}", column.column_name),
94                }
95            );
96        }
97
98        let mut has_null = vec![false; rows.schema.len()];
99        for row in &rows.rows {
100            ensure!(
101                row.values.len() == rows.schema.len(),
102                InvalidRequestSnafu {
103                    region_id,
104                    reason: format!(
105                        "row has {} columns but schema has {}",
106                        row.values.len(),
107                        rows.schema.len()
108                    ),
109                }
110            );
111
112            for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
113                validate_proto_value(region_id, value, column_schema)?;
114
115                if value.value_data.is_none() {
116                    has_null[i] = true;
117                }
118            }
119        }
120
121        Ok(WriteRequest {
122            region_id,
123            op_type,
124            rows,
125            name_to_index,
126            has_null,
127            hint: None,
128            region_metadata,
129        })
130    }
131
132    /// Sets the write hint.
133    pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
134        self.hint = hint;
135        self
136    }
137
138    /// Returns the encoding hint.
139    pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
140        infer_primary_key_encoding_from_hint(self.hint.as_ref())
141    }
142
143    /// Returns estimated size of the request.
144    pub(crate) fn estimated_size(&self) -> usize {
145        let row_size = self
146            .rows
147            .rows
148            .first()
149            .map(|row| row.encoded_len())
150            .unwrap_or(0);
151        row_size * self.rows.rows.len()
152    }
153
154    /// Gets column index by name.
155    pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
156        self.name_to_index.get(name).copied()
157    }
158
159    /// Checks schema of rows is compatible with schema of the region.
160    ///
161    /// If column with default value is missing, it returns a special [FillDefault](crate::error::Error::FillDefault)
162    /// error.
163    pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
164        debug_assert_eq!(self.region_id, metadata.region_id);
165
166        let region_id = self.region_id;
167        // Index all columns in rows.
168        let mut rows_columns: HashMap<_, _> = self
169            .rows
170            .schema
171            .iter()
172            .map(|column| (&column.column_name, column))
173            .collect();
174
175        let mut need_fill_default = false;
176        // Checks all columns in this region.
177        for column in &metadata.column_metadatas {
178            if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
179                // Check data type.
180                ensure!(
181                    is_column_type_value_eq(
182                        input_col.datatype,
183                        input_col.datatype_extension,
184                        &column.column_schema.data_type
185                    ),
186                    InvalidRequestSnafu {
187                        region_id,
188                        reason: format!(
189                            "column {} expect type {:?}, given: {}({})",
190                            column.column_schema.name,
191                            column.column_schema.data_type,
192                            ColumnDataType::try_from(input_col.datatype)
193                                .map(|v| v.as_str_name())
194                                .unwrap_or("Unknown"),
195                            input_col.datatype,
196                        )
197                    }
198                );
199
200                // Check semantic type.
201                ensure!(
202                    is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
203                    InvalidRequestSnafu {
204                        region_id,
205                        reason: format!(
206                            "column {} has semantic type {:?}, given: {}({})",
207                            column.column_schema.name,
208                            column.semantic_type,
209                            api::v1::SemanticType::try_from(input_col.semantic_type)
210                                .map(|v| v.as_str_name())
211                                .unwrap_or("Unknown"),
212                            input_col.semantic_type
213                        ),
214                    }
215                );
216
217                // Check nullable.
218                // Safety: `rows_columns` ensures this column exists.
219                let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
220                ensure!(
221                    !has_null || column.column_schema.is_nullable(),
222                    InvalidRequestSnafu {
223                        region_id,
224                        reason: format!(
225                            "column {} is not null but input has null",
226                            column.column_schema.name
227                        ),
228                    }
229                );
230            } else {
231                // Rows don't have this column.
232                self.check_missing_column(column)?;
233
234                need_fill_default = true;
235            }
236        }
237
238        // Checks all columns in rows exist in the region.
239        if !rows_columns.is_empty() {
240            let names: Vec<_> = rows_columns.into_keys().collect();
241            return InvalidRequestSnafu {
242                region_id,
243                reason: format!("unknown columns: {:?}", names),
244            }
245            .fail();
246        }
247
248        // If we need to fill default values, return a special error.
249        ensure!(!need_fill_default, FillDefaultSnafu { region_id });
250
251        Ok(())
252    }
253
254    /// Tries to fill missing columns.
255    ///
256    /// Currently, our protobuf format might be inefficient when we need to fill lots of null
257    /// values.
258    pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
259        debug_assert_eq!(self.region_id, metadata.region_id);
260
261        let mut columns_to_fill = vec![];
262        for column in &metadata.column_metadatas {
263            if !self.name_to_index.contains_key(&column.column_schema.name) {
264                columns_to_fill.push(column);
265            }
266        }
267        self.fill_columns(columns_to_fill)?;
268
269        Ok(())
270    }
271
272    /// Checks the schema and fill missing columns.
273    pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
274        if let Err(e) = self.check_schema(metadata) {
275            if e.is_fill_default() {
276                // TODO(yingwen): Add metrics for this case.
277                // We need to fill default value. The write request may be a request
278                // sent before changing the schema.
279                self.fill_missing_columns(metadata)?;
280            } else {
281                return Err(e);
282            }
283        }
284
285        Ok(())
286    }
287
288    /// Fills default value for specific `columns`.
289    fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
290        let mut default_values = Vec::with_capacity(columns.len());
291        let mut columns_to_fill = Vec::with_capacity(columns.len());
292        for column in columns {
293            let default_value = self.column_default_value(column)?;
294            if default_value.value_data.is_some() {
295                default_values.push(default_value);
296                columns_to_fill.push(column);
297            }
298        }
299
300        for row in &mut self.rows.rows {
301            row.values.extend(default_values.iter().cloned());
302        }
303
304        for column in columns_to_fill {
305            let (datatype, datatype_ext) =
306                ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
307                    .with_context(|_| ConvertColumnDataTypeSnafu {
308                        reason: format!(
309                            "no protobuf type for column {} ({:?})",
310                            column.column_schema.name, column.column_schema.data_type
311                        ),
312                    })?
313                    .to_parts();
314            self.rows.schema.push(ColumnSchema {
315                column_name: column.column_schema.name.clone(),
316                datatype: datatype as i32,
317                semantic_type: column.semantic_type as i32,
318                datatype_extension: datatype_ext,
319                options: options_from_column_schema(&column.column_schema),
320            });
321        }
322
323        Ok(())
324    }
325
326    /// Checks whether we should allow a row doesn't provide this column.
327    fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
328        if self.op_type == OpType::Delete {
329            if column.semantic_type == SemanticType::Field {
330                // For delete request, all tags and timestamp is required. We don't fill default
331                // tag or timestamp while deleting rows.
332                return Ok(());
333            } else {
334                return InvalidRequestSnafu {
335                    region_id: self.region_id,
336                    reason: format!("delete requests need column {}", column.column_schema.name),
337                }
338                .fail();
339            }
340        }
341
342        // Not a delete request. Checks whether they have default value.
343        ensure!(
344            column.column_schema.is_nullable()
345                || column.column_schema.default_constraint().is_some(),
346            InvalidRequestSnafu {
347                region_id: self.region_id,
348                reason: format!("missing column {}", column.column_schema.name),
349            }
350        );
351
352        Ok(())
353    }
354
355    /// Returns the default value for specific column.
356    fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
357        let default_value = match self.op_type {
358            OpType::Delete => {
359                ensure!(
360                    column.semantic_type == SemanticType::Field,
361                    InvalidRequestSnafu {
362                        region_id: self.region_id,
363                        reason: format!(
364                            "delete requests need column {}",
365                            column.column_schema.name
366                        ),
367                    }
368                );
369
370                // For delete request, we need a default value for padding so we
371                // can delete a row even a field doesn't have a default value. So the
372                // value doesn't need to following the default value constraint of the
373                // column.
374                if column.column_schema.is_nullable() {
375                    datatypes::value::Value::Null
376                } else {
377                    column.column_schema.data_type.default_value()
378                }
379            }
380            OpType::Put => {
381                // For put requests, we use the default value from column schema.
382                if column.column_schema.is_default_impure() {
383                    UnexpectedSnafu {
384                        reason: format!(
385                            "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}", 
386                            self.region_id,
387                            column.column_schema.name,
388                            column.column_schema.default_constraint(),
389                        ),
390                    }
391                    .fail()?
392                }
393                column
394                    .column_schema
395                    .create_default()
396                    .context(CreateDefaultSnafu {
397                        region_id: self.region_id,
398                        column: &column.column_schema.name,
399                    })?
400                    // This column doesn't have default value.
401                    .with_context(|| InvalidRequestSnafu {
402                        region_id: self.region_id,
403                        reason: format!(
404                            "column {} does not have default value",
405                            column.column_schema.name
406                        ),
407                    })?
408            }
409        };
410
411        // Convert default value into proto's value.
412        to_proto_value(default_value).with_context(|| InvalidRequestSnafu {
413            region_id: self.region_id,
414            reason: format!(
415                "no protobuf type for default value of column {} ({:?})",
416                column.column_schema.name, column.column_schema.data_type
417            ),
418        })
419    }
420}
421
422/// Validate proto value schema.
423pub(crate) fn validate_proto_value(
424    region_id: RegionId,
425    value: &Value,
426    column_schema: &ColumnSchema,
427) -> Result<()> {
428    if let Some(value_type) = proto_value_type(value) {
429        let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
430            InvalidRequestSnafu {
431                region_id,
432                reason: format!(
433                    "column {} has unknown type {}",
434                    column_schema.column_name, column_schema.datatype
435                ),
436            }
437            .build()
438        })?;
439        ensure!(
440            proto_value_type_match(column_type, value_type),
441            InvalidRequestSnafu {
442                region_id,
443                reason: format!(
444                    "value has type {:?}, but column {} has type {:?}({})",
445                    value_type, column_schema.column_name, column_type, column_schema.datatype,
446                ),
447            }
448        );
449    }
450
451    Ok(())
452}
453
454fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
455    match (column_type, value_type) {
456        (ct, vt) if ct == vt => true,
457        (ColumnDataType::Vector, ColumnDataType::Binary) => true,
458        (ColumnDataType::Json, ColumnDataType::Binary) => true,
459        _ => false,
460    }
461}
462
/// Oneshot output result sender.
///
/// Wraps the sending half of a oneshot channel that carries a `Result<AffectedRows>`.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
466
467impl OutputTx {
468    /// Creates a new output sender.
469    pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
470        OutputTx(sender)
471    }
472
473    /// Sends the `result`.
474    pub(crate) fn send(self, result: Result<AffectedRows>) {
475        // Ignores send result.
476        let _ = self.0.send(result);
477    }
478}
479
/// Optional output result sender.
///
/// Holds at most one [`OutputTx`]. Sending through an empty sender is a no-op.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
483
484impl OptionOutputTx {
485    /// Creates a sender.
486    pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
487        OptionOutputTx(sender)
488    }
489
490    /// Creates an empty sender.
491    pub(crate) fn none() -> OptionOutputTx {
492        OptionOutputTx(None)
493    }
494
495    /// Sends the `result` and consumes the inner sender.
496    pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
497        if let Some(sender) = self.0.take() {
498            sender.send(result);
499        }
500    }
501
502    /// Sends the `result` and consumes the sender.
503    pub(crate) fn send(mut self, result: Result<AffectedRows>) {
504        if let Some(sender) = self.0.take() {
505            sender.send(result);
506        }
507    }
508
509    /// Takes the inner sender.
510    pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
511        self.0.take()
512    }
513}
514
515impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
516    fn from(sender: Sender<Result<AffectedRows>>) -> Self {
517        Self::new(Some(OutputTx::new(sender)))
518    }
519}
520
521impl OnFailure for OptionOutputTx {
522    fn on_failure(&mut self, err: Error) {
523        self.send_mut(Err(err));
524    }
525}
526
/// Callback on failure.
///
/// Implementors decide how the error is reported, e.g. by sending it to result senders.
pub(crate) trait OnFailure {
    /// Handles `err` on failure.
    fn on_failure(&mut self, err: Error);
}
532
/// Sender and write request.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request.
    pub(crate) request: WriteRequest,
}
540
/// Sender and bulk request.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Id of the region of this request.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata that comes with this request.
    pub(crate) region_metadata: RegionMetadataRef,
}
547
/// Request sent to a worker with timestamp.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped worker request.
    pub(crate) request: WorkerRequest,
    /// The instant at which this wrapper was created.
    pub(crate) created_at: Instant,
}
554
555impl WorkerRequestWithTime {
556    pub(crate) fn new(request: WorkerRequest) -> Self {
557        Self {
558            request,
559            created_at: Instant::now(),
560        }
561    }
562}
563
/// Request sent to a worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write to a region.
    Write(SenderWriteRequest),

    /// Ddl request to a region.
    Ddl(SenderDdlRequest),

    /// Notifications from internal background jobs.
    Background {
        /// Id of the region to send.
        region_id: RegionId,
        /// Internal notification.
        notify: BackgroundNotify,
    },

    /// The internal commands.
    SetRegionRoleStateGracefully {
        /// Id of the region to send.
        region_id: RegionId,
        /// The [SettableRegionRoleState].
        region_role_state: SettableRegionRoleState,
        /// The sender of [SetRegionRoleStateResponse].
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Notify a worker to stop.
    Stop,

    /// Use [RegionEdit] to edit a region directly.
    EditRegion(RegionEditRequest),

    /// Keep the manifest of a region up to date.
    SyncRegion(RegionSyncRequest),

    /// Bulk inserts request and region metadata.
    BulkInserts {
        /// Region metadata provided with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
607
608impl WorkerRequest {
609    pub(crate) fn new_open_region_request(
610        region_id: RegionId,
611        request: RegionOpenRequest,
612        entry_receiver: Option<WalEntryReceiver>,
613    ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614        let (sender, receiver) = oneshot::channel();
615
616        let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617            region_id,
618            sender: sender.into(),
619            request: DdlRequest::Open((request, entry_receiver)),
620        });
621
622        (worker_request, receiver)
623    }
624
625    /// Converts request from a [RegionRequest].
626    pub(crate) fn try_from_region_request(
627        region_id: RegionId,
628        value: RegionRequest,
629        region_metadata: Option<RegionMetadataRef>,
630    ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631        let (sender, receiver) = oneshot::channel();
632        let worker_request = match value {
633            RegionRequest::Put(v) => {
634                let mut write_request =
635                    WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636                        .with_hint(v.hint);
637                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638                    && let Some(region_metadata) = &region_metadata
639                {
640                    write_request.maybe_fill_missing_columns(region_metadata)?;
641                }
642                WorkerRequest::Write(SenderWriteRequest {
643                    sender: sender.into(),
644                    request: write_request,
645                })
646            }
647            RegionRequest::Delete(v) => {
648                let mut write_request =
649                    WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650                if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651                    && let Some(region_metadata) = &region_metadata
652                {
653                    write_request.maybe_fill_missing_columns(region_metadata)?;
654                }
655                WorkerRequest::Write(SenderWriteRequest {
656                    sender: sender.into(),
657                    request: write_request,
658                })
659            }
660            RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661                region_id,
662                sender: sender.into(),
663                request: DdlRequest::Create(v),
664            }),
665            RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666                region_id,
667                sender: sender.into(),
668                request: DdlRequest::Drop,
669            }),
670            RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671                region_id,
672                sender: sender.into(),
673                request: DdlRequest::Open((v, None)),
674            }),
675            RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676                region_id,
677                sender: sender.into(),
678                request: DdlRequest::Close(v),
679            }),
680            RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681                region_id,
682                sender: sender.into(),
683                request: DdlRequest::Alter(v),
684            }),
685            RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686                region_id,
687                sender: sender.into(),
688                request: DdlRequest::Flush(v),
689            }),
690            RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691                region_id,
692                sender: sender.into(),
693                request: DdlRequest::Compact(v),
694            }),
695            RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696                region_id,
697                sender: sender.into(),
698                request: DdlRequest::Truncate(v),
699            }),
700            RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701                region_id,
702                sender: sender.into(),
703                request: DdlRequest::Catchup(v),
704            }),
705            RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706                metadata: region_metadata,
707                sender: sender.into(),
708                request: region_bulk_inserts_request,
709            },
710        };
711
712        Ok((worker_request, receiver))
713    }
714
715    pub(crate) fn new_set_readonly_gracefully(
716        region_id: RegionId,
717        region_role_state: SettableRegionRoleState,
718    ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719        let (sender, receiver) = oneshot::channel();
720
721        (
722            WorkerRequest::SetRegionRoleStateGracefully {
723                region_id,
724                region_role_state,
725                sender,
726            },
727            receiver,
728        )
729    }
730
731    pub(crate) fn new_sync_region_request(
732        region_id: RegionId,
733        manifest_version: ManifestVersion,
734    ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735        let (sender, receiver) = oneshot::channel();
736        (
737            WorkerRequest::SyncRegion(RegionSyncRequest {
738                region_id,
739                manifest_version,
740                sender,
741            }),
742            receiver,
743        )
744    }
745}
746
/// DDL request to a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Creates a region.
    Create(RegionCreateRequest),
    /// Drops a region.
    Drop,
    /// Opens a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Closes a region.
    Close(RegionCloseRequest),
    /// Alters a region.
    Alter(RegionAlterRequest),
    /// Flushes a region.
    Flush(RegionFlushRequest),
    /// Compacts a region.
    Compact(RegionCompactRequest),
    /// Truncates a region.
    Truncate(RegionTruncateRequest),
    /// Catches up a region.
    Catchup(RegionCatchupRequest),
}
760
/// Sender and Ddl request.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region id of the request.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Ddl request.
    pub(crate) request: DdlRequest,
}
771
/// Notification from a background job.
///
/// Carried by the `Background` variant of `WorkerRequest`.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// Flush has finished.
    FlushFinished(FlushFinished),
    /// Flush has failed.
    FlushFailed(FlushFailed),
    /// Compaction has finished.
    CompactionFinished(CompactionFinished),
    /// Compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Truncate result.
    Truncate(TruncateResult),
    /// Region change result.
    RegionChange(RegionChangeResult),
    /// Region edit result.
    RegionEdit(RegionEditResult),
}
790
/// Notifies that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Entry id of flushed data.
    pub(crate) flushed_entry_id: EntryId,
    /// Flush result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Flush timer.
    // Never read directly; it is only held until this struct is dropped.
    pub(crate) _timer: HistogramTimer,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
807
808impl FlushFinished {
809    /// Marks the flush job as successful and observes the timer.
810    pub(crate) fn on_success(self) {
811        for sender in self.senders {
812            sender.send(Ok(0));
813        }
814    }
815}
816
817impl OnFailure for FlushFinished {
818    fn on_failure(&mut self, err: Error) {
819        let err = Arc::new(err);
820        for sender in self.senders.drain(..) {
821            sender.send(Err(err.clone()).context(FlushRegionSnafu {
822                region_id: self.region_id,
823            }));
824        }
825    }
826}
827
/// Notifies that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
834
/// Notifies that a compaction job has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Compaction result senders.
    pub(crate) senders: Vec<OutputTx>,
    /// Start time of compaction task.
    pub(crate) start_time: Instant,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
}
847
848impl CompactionFinished {
849    pub fn on_success(self) {
850        // only update compaction time on success
851        COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
852
853        for sender in self.senders {
854            sender.send(Ok(0));
855        }
856        info!("Successfully compacted region: {}", self.region_id);
857    }
858}
859
860impl OnFailure for CompactionFinished {
861    /// Compaction succeeded but failed to update manifest or region's already been dropped.
862    fn on_failure(&mut self, err: Error) {
863        let err = Arc::new(err);
864        for sender in self.senders.drain(..) {
865            sender.send(Err(err.clone()).context(CompactRegionSnafu {
866                region_id: self.region_id,
867            }));
868        }
869    }
870}
871
/// A failing compaction result.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The error source of the failure.
    pub(crate) err: Arc<Error>,
}
879
/// Notifies the truncate result of a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Truncate result.
    pub(crate) result: Result<()>,
    /// Truncate kind.
    pub(crate) kind: TruncateKind,
}
891
/// Notifies the region of the result of writing a region change action.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// The new region metadata to apply.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
904
/// Request to edit a region directly.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// The sender to notify the result to the region engine.
    pub(crate) tx: Sender<Result<()>>,
}
913
/// Notifies the region of the result of editing the region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: Sender<Result<()>>,
    /// Region edit to apply.
    pub(crate) edit: RegionEdit,
    /// Result from the manifest manager.
    pub(crate) result: Result<()>,
}
926
/// Request to sync a region, carrying a manifest version and a sender for the result.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region id.
    pub(crate) region_id: RegionId,
    /// Manifest version carried by the request.
    /// NOTE(review): presumably the version the region should sync to — confirm
    /// against the request handler.
    pub(crate) manifest_version: ManifestVersion,
    /// Returns the latest manifest version and a boolean indicating whether new manifest is installed.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
934
935#[cfg(test)]
936mod tests {
937    use api::v1::value::ValueData;
938    use api::v1::{Row, SemanticType};
939    use datatypes::prelude::ConcreteDataType;
940    use datatypes::schema::ColumnDefaultConstraint;
941    use mito_codec::test_util::i64_value;
942    use store_api::metadata::RegionMetadataBuilder;
943
944    use super::*;
945    use crate::error::Error;
946    use crate::test_util::ts_ms_value;
947
948    fn new_column_schema(
949        name: &str,
950        data_type: ColumnDataType,
951        semantic_type: SemanticType,
952    ) -> ColumnSchema {
953        ColumnSchema {
954            column_name: name.to_string(),
955            datatype: data_type as i32,
956            semantic_type: semantic_type as i32,
957            ..Default::default()
958        }
959    }
960
961    fn check_invalid_request(err: &Error, expect: &str) {
962        if let Error::InvalidRequest {
963            region_id: _,
964            reason,
965            location: _,
966        } = err
967        {
968            assert_eq!(reason, expect);
969        } else {
970            panic!("Unexpected error {err}")
971        }
972    }
973
974    #[test]
975    fn test_write_request_duplicate_column() {
976        let rows = Rows {
977            schema: vec![
978                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
979                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
980            ],
981            rows: vec![],
982        };
983
984        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
985        check_invalid_request(&err, "duplicate column c0");
986    }
987
988    #[test]
989    fn test_valid_write_request() {
990        let rows = Rows {
991            schema: vec![
992                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
993                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
994            ],
995            rows: vec![Row {
996                values: vec![i64_value(1), i64_value(2)],
997            }],
998        };
999
1000        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1001        assert_eq!(0, request.column_index_by_name("c0").unwrap());
1002        assert_eq!(1, request.column_index_by_name("c1").unwrap());
1003        assert_eq!(None, request.column_index_by_name("c2"));
1004    }
1005
1006    #[test]
1007    fn test_write_request_column_num() {
1008        let rows = Rows {
1009            schema: vec![
1010                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1011                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1012            ],
1013            rows: vec![Row {
1014                values: vec![i64_value(1), i64_value(2), i64_value(3)],
1015            }],
1016        };
1017
1018        let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1019        check_invalid_request(&err, "row has 3 columns but schema has 2");
1020    }
1021
1022    fn new_region_metadata() -> RegionMetadata {
1023        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1024        builder
1025            .push_column_metadata(ColumnMetadata {
1026                column_schema: datatypes::schema::ColumnSchema::new(
1027                    "ts",
1028                    ConcreteDataType::timestamp_millisecond_datatype(),
1029                    false,
1030                ),
1031                semantic_type: SemanticType::Timestamp,
1032                column_id: 1,
1033            })
1034            .push_column_metadata(ColumnMetadata {
1035                column_schema: datatypes::schema::ColumnSchema::new(
1036                    "k0",
1037                    ConcreteDataType::int64_datatype(),
1038                    true,
1039                ),
1040                semantic_type: SemanticType::Tag,
1041                column_id: 2,
1042            })
1043            .primary_key(vec![2]);
1044        builder.build().unwrap()
1045    }
1046
1047    #[test]
1048    fn test_check_schema() {
1049        let rows = Rows {
1050            schema: vec![
1051                new_column_schema(
1052                    "ts",
1053                    ColumnDataType::TimestampMillisecond,
1054                    SemanticType::Timestamp,
1055                ),
1056                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1057            ],
1058            rows: vec![Row {
1059                values: vec![ts_ms_value(1), i64_value(2)],
1060            }],
1061        };
1062        let metadata = new_region_metadata();
1063
1064        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1065        request.check_schema(&metadata).unwrap();
1066    }
1067
1068    #[test]
1069    fn test_column_type() {
1070        let rows = Rows {
1071            schema: vec![
1072                new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1073                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1074            ],
1075            rows: vec![Row {
1076                values: vec![i64_value(1), i64_value(2)],
1077            }],
1078        };
1079        let metadata = new_region_metadata();
1080
1081        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1082        let err = request.check_schema(&metadata).unwrap_err();
1083        check_invalid_request(&err, "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)");
1084    }
1085
1086    #[test]
1087    fn test_semantic_type() {
1088        let rows = Rows {
1089            schema: vec![
1090                new_column_schema(
1091                    "ts",
1092                    ColumnDataType::TimestampMillisecond,
1093                    SemanticType::Tag,
1094                ),
1095                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1096            ],
1097            rows: vec![Row {
1098                values: vec![ts_ms_value(1), i64_value(2)],
1099            }],
1100        };
1101        let metadata = new_region_metadata();
1102
1103        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1104        let err = request.check_schema(&metadata).unwrap_err();
1105        check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1106    }
1107
1108    #[test]
1109    fn test_column_nullable() {
1110        let rows = Rows {
1111            schema: vec![
1112                new_column_schema(
1113                    "ts",
1114                    ColumnDataType::TimestampMillisecond,
1115                    SemanticType::Timestamp,
1116                ),
1117                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1118            ],
1119            rows: vec![Row {
1120                values: vec![Value { value_data: None }, i64_value(2)],
1121            }],
1122        };
1123        let metadata = new_region_metadata();
1124
1125        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1126        let err = request.check_schema(&metadata).unwrap_err();
1127        check_invalid_request(&err, "column ts is not null but input has null");
1128    }
1129
1130    #[test]
1131    fn test_column_default() {
1132        let rows = Rows {
1133            schema: vec![new_column_schema(
1134                "k0",
1135                ColumnDataType::Int64,
1136                SemanticType::Tag,
1137            )],
1138            rows: vec![Row {
1139                values: vec![i64_value(1)],
1140            }],
1141        };
1142        let metadata = new_region_metadata();
1143
1144        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1145        let err = request.check_schema(&metadata).unwrap_err();
1146        check_invalid_request(&err, "missing column ts");
1147    }
1148
1149    #[test]
1150    fn test_unknown_column() {
1151        let rows = Rows {
1152            schema: vec![
1153                new_column_schema(
1154                    "ts",
1155                    ColumnDataType::TimestampMillisecond,
1156                    SemanticType::Timestamp,
1157                ),
1158                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1159                new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1160            ],
1161            rows: vec![Row {
1162                values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1163            }],
1164        };
1165        let metadata = new_region_metadata();
1166
1167        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1168        let err = request.check_schema(&metadata).unwrap_err();
1169        check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1170    }
1171
1172    #[test]
1173    fn test_fill_impure_columns_err() {
1174        let rows = Rows {
1175            schema: vec![new_column_schema(
1176                "k0",
1177                ColumnDataType::Int64,
1178                SemanticType::Tag,
1179            )],
1180            rows: vec![Row {
1181                values: vec![i64_value(1)],
1182            }],
1183        };
1184        let metadata = {
1185            let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1186            builder
1187                .push_column_metadata(ColumnMetadata {
1188                    column_schema: datatypes::schema::ColumnSchema::new(
1189                        "ts",
1190                        ConcreteDataType::timestamp_millisecond_datatype(),
1191                        false,
1192                    )
1193                    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1194                        "now()".to_string(),
1195                    )))
1196                    .unwrap(),
1197                    semantic_type: SemanticType::Timestamp,
1198                    column_id: 1,
1199                })
1200                .push_column_metadata(ColumnMetadata {
1201                    column_schema: datatypes::schema::ColumnSchema::new(
1202                        "k0",
1203                        ConcreteDataType::int64_datatype(),
1204                        true,
1205                    ),
1206                    semantic_type: SemanticType::Tag,
1207                    column_id: 2,
1208                })
1209                .primary_key(vec![2]);
1210            builder.build().unwrap()
1211        };
1212
1213        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1214        let err = request.check_schema(&metadata).unwrap_err();
1215        assert!(err.is_fill_default());
1216        assert!(request
1217            .fill_missing_columns(&metadata)
1218            .unwrap_err()
1219            .to_string()
1220            .contains("unexpected impure default value with region_id"));
1221    }
1222
1223    #[test]
1224    fn test_fill_missing_columns() {
1225        let rows = Rows {
1226            schema: vec![new_column_schema(
1227                "ts",
1228                ColumnDataType::TimestampMillisecond,
1229                SemanticType::Timestamp,
1230            )],
1231            rows: vec![Row {
1232                values: vec![ts_ms_value(1)],
1233            }],
1234        };
1235        let metadata = new_region_metadata();
1236
1237        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1238        let err = request.check_schema(&metadata).unwrap_err();
1239        assert!(err.is_fill_default());
1240        request.fill_missing_columns(&metadata).unwrap();
1241
1242        let expect_rows = Rows {
1243            schema: vec![new_column_schema(
1244                "ts",
1245                ColumnDataType::TimestampMillisecond,
1246                SemanticType::Timestamp,
1247            )],
1248            rows: vec![Row {
1249                values: vec![ts_ms_value(1)],
1250            }],
1251        };
1252        assert_eq!(expect_rows, request.rows);
1253    }
1254
1255    fn builder_with_ts_tag() -> RegionMetadataBuilder {
1256        let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1257        builder
1258            .push_column_metadata(ColumnMetadata {
1259                column_schema: datatypes::schema::ColumnSchema::new(
1260                    "ts",
1261                    ConcreteDataType::timestamp_millisecond_datatype(),
1262                    false,
1263                ),
1264                semantic_type: SemanticType::Timestamp,
1265                column_id: 1,
1266            })
1267            .push_column_metadata(ColumnMetadata {
1268                column_schema: datatypes::schema::ColumnSchema::new(
1269                    "k0",
1270                    ConcreteDataType::int64_datatype(),
1271                    true,
1272                ),
1273                semantic_type: SemanticType::Tag,
1274                column_id: 2,
1275            })
1276            .primary_key(vec![2]);
1277        builder
1278    }
1279
1280    fn region_metadata_two_fields() -> RegionMetadata {
1281        let mut builder = builder_with_ts_tag();
1282        builder
1283            .push_column_metadata(ColumnMetadata {
1284                column_schema: datatypes::schema::ColumnSchema::new(
1285                    "f0",
1286                    ConcreteDataType::int64_datatype(),
1287                    true,
1288                ),
1289                semantic_type: SemanticType::Field,
1290                column_id: 3,
1291            })
1292            // Column is not nullable.
1293            .push_column_metadata(ColumnMetadata {
1294                column_schema: datatypes::schema::ColumnSchema::new(
1295                    "f1",
1296                    ConcreteDataType::int64_datatype(),
1297                    false,
1298                )
1299                .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1300                    datatypes::value::Value::Int64(100),
1301                )))
1302                .unwrap(),
1303                semantic_type: SemanticType::Field,
1304                column_id: 4,
1305            });
1306        builder.build().unwrap()
1307    }
1308
1309    #[test]
1310    fn test_fill_missing_for_delete() {
1311        let rows = Rows {
1312            schema: vec![new_column_schema(
1313                "ts",
1314                ColumnDataType::TimestampMillisecond,
1315                SemanticType::Timestamp,
1316            )],
1317            rows: vec![Row {
1318                values: vec![ts_ms_value(1)],
1319            }],
1320        };
1321        let metadata = region_metadata_two_fields();
1322
1323        let mut request =
1324            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1325        let err = request.check_schema(&metadata).unwrap_err();
1326        check_invalid_request(&err, "delete requests need column k0");
1327        let err = request.fill_missing_columns(&metadata).unwrap_err();
1328        check_invalid_request(&err, "delete requests need column k0");
1329
1330        let rows = Rows {
1331            schema: vec![
1332                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1333                new_column_schema(
1334                    "ts",
1335                    ColumnDataType::TimestampMillisecond,
1336                    SemanticType::Timestamp,
1337                ),
1338            ],
1339            rows: vec![Row {
1340                values: vec![i64_value(100), ts_ms_value(1)],
1341            }],
1342        };
1343        let mut request =
1344            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1345        let err = request.check_schema(&metadata).unwrap_err();
1346        assert!(err.is_fill_default());
1347        request.fill_missing_columns(&metadata).unwrap();
1348
1349        let expect_rows = Rows {
1350            schema: vec![
1351                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1352                new_column_schema(
1353                    "ts",
1354                    ColumnDataType::TimestampMillisecond,
1355                    SemanticType::Timestamp,
1356                ),
1357                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1358            ],
1359            // Column f1 is not nullable and we use 0 for padding.
1360            rows: vec![Row {
1361                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1362            }],
1363        };
1364        assert_eq!(expect_rows, request.rows);
1365    }
1366
1367    #[test]
1368    fn test_fill_missing_without_default_in_delete() {
1369        let mut builder = builder_with_ts_tag();
1370        builder
1371            // f0 is nullable.
1372            .push_column_metadata(ColumnMetadata {
1373                column_schema: datatypes::schema::ColumnSchema::new(
1374                    "f0",
1375                    ConcreteDataType::int64_datatype(),
1376                    true,
1377                ),
1378                semantic_type: SemanticType::Field,
1379                column_id: 3,
1380            })
1381            // f1 is not nullable and don't has default.
1382            .push_column_metadata(ColumnMetadata {
1383                column_schema: datatypes::schema::ColumnSchema::new(
1384                    "f1",
1385                    ConcreteDataType::int64_datatype(),
1386                    false,
1387                ),
1388                semantic_type: SemanticType::Field,
1389                column_id: 4,
1390            });
1391        let metadata = builder.build().unwrap();
1392
1393        let rows = Rows {
1394            schema: vec![
1395                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1396                new_column_schema(
1397                    "ts",
1398                    ColumnDataType::TimestampMillisecond,
1399                    SemanticType::Timestamp,
1400                ),
1401            ],
1402            // Missing f0 (nullable), f1 (not nullable).
1403            rows: vec![Row {
1404                values: vec![i64_value(100), ts_ms_value(1)],
1405            }],
1406        };
1407        let mut request =
1408            WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1409        let err = request.check_schema(&metadata).unwrap_err();
1410        assert!(err.is_fill_default());
1411        request.fill_missing_columns(&metadata).unwrap();
1412
1413        let expect_rows = Rows {
1414            schema: vec![
1415                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1416                new_column_schema(
1417                    "ts",
1418                    ColumnDataType::TimestampMillisecond,
1419                    SemanticType::Timestamp,
1420                ),
1421                new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1422            ],
1423            // Column f1 is not nullable and we use 0 for padding.
1424            rows: vec![Row {
1425                values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1426            }],
1427        };
1428        assert_eq!(expect_rows, request.rows);
1429    }
1430
1431    #[test]
1432    fn test_no_default() {
1433        let rows = Rows {
1434            schema: vec![new_column_schema(
1435                "k0",
1436                ColumnDataType::Int64,
1437                SemanticType::Tag,
1438            )],
1439            rows: vec![Row {
1440                values: vec![i64_value(1)],
1441            }],
1442        };
1443        let metadata = new_region_metadata();
1444
1445        let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1446        let err = request.fill_missing_columns(&metadata).unwrap_err();
1447        check_invalid_request(&err, "column ts does not have default value");
1448    }
1449
1450    #[test]
1451    fn test_missing_and_invalid() {
1452        // Missing f0 and f1 has invalid type (string).
1453        let rows = Rows {
1454            schema: vec![
1455                new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1456                new_column_schema(
1457                    "ts",
1458                    ColumnDataType::TimestampMillisecond,
1459                    SemanticType::Timestamp,
1460                ),
1461                new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1462            ],
1463            rows: vec![Row {
1464                values: vec![
1465                    i64_value(100),
1466                    ts_ms_value(1),
1467                    Value {
1468                        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1469                    },
1470                ],
1471            }],
1472        };
1473        let metadata = region_metadata_two_fields();
1474
1475        let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1476        let err = request.check_schema(&metadata).unwrap_err();
1477        check_invalid_request(
1478            &err,
1479            "column f1 expect type Int64(Int64Type), given: STRING(12)",
1480        );
1481    }
1482
1483    #[test]
1484    fn test_write_request_metadata() {
1485        let rows = Rows {
1486            schema: vec![
1487                new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1488                new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1489            ],
1490            rows: vec![Row {
1491                values: vec![i64_value(1), i64_value(2)],
1492            }],
1493        };
1494
1495        let metadata = Arc::new(new_region_metadata());
1496        let request = WriteRequest::new(
1497            RegionId::new(1, 1),
1498            OpType::Put,
1499            rows,
1500            Some(metadata.clone()),
1501        )
1502        .unwrap();
1503
1504        assert!(request.region_metadata.is_some());
1505        assert_eq!(
1506            request.region_metadata.unwrap().region_id,
1507            RegionId::new(1, 1)
1508        );
1509    }
1510}