1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23};
24use api::v1::column_def::options_from_column_schema;
25use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
26use common_telemetry::info;
27use datatypes::prelude::DataType;
28use partition::expr::PartitionExpr;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{
37 MitoCopyRegionFromResponse, SetRegionRoleStateResponse, SettableRegionRoleState,
38};
39use store_api::region_request::{
40 AffectedRows, ApplyStagingManifestRequest, EnterStagingRequest, RegionAlterRequest,
41 RegionBuildIndexRequest, RegionBulkInsertsRequest, RegionCatchupRequest, RegionCloseRequest,
42 RegionCompactRequest, RegionCreateRequest, RegionDropRequest, RegionFlushRequest,
43 RegionOpenRequest, RegionRequest, RegionTruncateRequest, StagingPartitionDirective,
44};
45use store_api::storage::{FileId, RegionId};
46use tokio::sync::oneshot::{self, Receiver, Sender};
47
48use crate::error::{
49 CompactRegionSnafu, CompactionCancelledSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu,
50 Error, FillDefaultSnafu, FlushRegionSnafu, InvalidPartitionExprSnafu, InvalidRequestSnafu,
51 MissingPartitionExprSnafu, Result, UnexpectedSnafu,
52};
53use crate::flush::FlushReason;
54use crate::manifest::action::{RegionEdit, TruncateKind};
55use crate::memtable::MemtableId;
56use crate::memtable::bulk::part::BulkPart;
57use crate::metrics::COMPACTION_ELAPSED_TOTAL;
58use crate::region::options::RegionOptions;
59use crate::sst::file::FileMeta;
60use crate::sst::index::IndexBuildType;
61use crate::wal::EntryId;
62use crate::wal::entry_distributor::WalEntryReceiver;
63
64#[derive(Debug)]
66pub struct WriteRequest {
67 pub region_id: RegionId,
69 pub op_type: OpType,
71 pub rows: Rows,
73 pub name_to_index: HashMap<String, usize>,
75 pub has_null: Vec<bool>,
77 pub hint: Option<WriteHint>,
79 pub(crate) region_metadata: Option<RegionMetadataRef>,
81 pub partition_expr_version: Option<u64>,
83}
84
85impl WriteRequest {
86 pub fn new(
90 region_id: RegionId,
91 op_type: OpType,
92 rows: Rows,
93 region_metadata: Option<RegionMetadataRef>,
94 ) -> Result<WriteRequest> {
95 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
96 for (index, column) in rows.schema.iter().enumerate() {
97 ensure!(
98 name_to_index
99 .insert(column.column_name.clone(), index)
100 .is_none(),
101 InvalidRequestSnafu {
102 region_id,
103 reason: format!("duplicate column {}", column.column_name),
104 }
105 );
106 }
107
108 let mut has_null = vec![false; rows.schema.len()];
109 for row in &rows.rows {
110 ensure!(
111 row.values.len() == rows.schema.len(),
112 InvalidRequestSnafu {
113 region_id,
114 reason: format!(
115 "row has {} columns but schema has {}",
116 row.values.len(),
117 rows.schema.len()
118 ),
119 }
120 );
121
122 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
123 validate_proto_value(region_id, value, column_schema)?;
124
125 if value.value_data.is_none() {
126 has_null[i] = true;
127 }
128 }
129 }
130
131 Ok(WriteRequest {
132 region_id,
133 op_type,
134 rows,
135 name_to_index,
136 has_null,
137 hint: None,
138 region_metadata,
139 partition_expr_version: None,
140 })
141 }
142
143 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
145 self.hint = hint;
146 self
147 }
148
149 pub fn with_partition_expr_version(mut self, partition_expr_version: Option<u64>) -> Self {
150 self.partition_expr_version = partition_expr_version;
151 self
152 }
153
154 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
156 infer_primary_key_encoding_from_hint(self.hint.as_ref())
157 }
158
159 pub(crate) fn estimated_size(&self) -> usize {
161 let row_size = self
162 .rows
163 .rows
164 .first()
165 .map(|row| row.encoded_len())
166 .unwrap_or(0);
167 row_size * self.rows.rows.len()
168 }
169
170 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
172 self.name_to_index.get(name).copied()
173 }
174
175 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
180 debug_assert_eq!(self.region_id, metadata.region_id);
181
182 let region_id = self.region_id;
183 let mut rows_columns: HashMap<_, _> = self
185 .rows
186 .schema
187 .iter()
188 .map(|column| (&column.column_name, column))
189 .collect();
190
191 let mut need_fill_default = false;
192 for column in &metadata.column_metadatas {
194 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
195 ensure!(
197 is_column_type_value_eq(
198 input_col.datatype,
199 input_col.datatype_extension.clone(),
200 &column.column_schema.data_type
201 ),
202 InvalidRequestSnafu {
203 region_id,
204 reason: format!(
205 "column {} expect type {:?}, given: {}({})",
206 column.column_schema.name,
207 column.column_schema.data_type,
208 ColumnDataType::try_from(input_col.datatype)
209 .map(|v| v.as_str_name())
210 .unwrap_or("Unknown"),
211 input_col.datatype,
212 )
213 }
214 );
215
216 ensure!(
218 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
219 InvalidRequestSnafu {
220 region_id,
221 reason: format!(
222 "column {} has semantic type {:?}, given: {}({})",
223 column.column_schema.name,
224 column.semantic_type,
225 api::v1::SemanticType::try_from(input_col.semantic_type)
226 .map(|v| v.as_str_name())
227 .unwrap_or("Unknown"),
228 input_col.semantic_type
229 ),
230 }
231 );
232
233 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
236 ensure!(
237 !has_null || column.column_schema.is_nullable(),
238 InvalidRequestSnafu {
239 region_id,
240 reason: format!(
241 "column {} is not null but input has null",
242 column.column_schema.name
243 ),
244 }
245 );
246 } else {
247 self.check_missing_column(column)?;
249
250 need_fill_default = true;
251 }
252 }
253
254 if !rows_columns.is_empty() {
256 let names: Vec<_> = rows_columns.into_keys().collect();
257 return InvalidRequestSnafu {
258 region_id,
259 reason: format!("unknown columns: {:?}", names),
260 }
261 .fail();
262 }
263
264 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
266
267 Ok(())
268 }
269
270 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
275 debug_assert_eq!(self.region_id, metadata.region_id);
276
277 let mut columns_to_fill = vec![];
278 for column in &metadata.column_metadatas {
279 if !self.name_to_index.contains_key(&column.column_schema.name) {
280 columns_to_fill.push(column);
281 }
282 }
283 self.fill_columns(columns_to_fill)?;
284
285 Ok(())
286 }
287
288 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
290 if let Err(e) = self.check_schema(metadata) {
291 if e.is_fill_default() {
292 self.fill_missing_columns(metadata)?;
296 } else {
297 return Err(e);
298 }
299 }
300
301 Ok(())
302 }
303
304 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
306 let mut default_values = Vec::with_capacity(columns.len());
307 let mut columns_to_fill = Vec::with_capacity(columns.len());
308 for column in columns {
309 let default_value = self.column_default_value(column)?;
310 if default_value.value_data.is_some() {
311 default_values.push(default_value);
312 columns_to_fill.push(column);
313 }
314 }
315
316 for row in &mut self.rows.rows {
317 row.values.extend(default_values.iter().cloned());
318 }
319
320 for column in columns_to_fill {
321 let (datatype, datatype_ext) =
322 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
323 .with_context(|_| ConvertColumnDataTypeSnafu {
324 reason: format!(
325 "no protobuf type for column {} ({:?})",
326 column.column_schema.name, column.column_schema.data_type
327 ),
328 })?
329 .to_parts();
330 self.rows.schema.push(ColumnSchema {
331 column_name: column.column_schema.name.clone(),
332 datatype: datatype as i32,
333 semantic_type: column.semantic_type as i32,
334 datatype_extension: datatype_ext,
335 options: options_from_column_schema(&column.column_schema),
336 });
337 }
338
339 Ok(())
340 }
341
342 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
344 if self.op_type == OpType::Delete {
345 if column.semantic_type == SemanticType::Field {
346 return Ok(());
349 } else {
350 return InvalidRequestSnafu {
351 region_id: self.region_id,
352 reason: format!("delete requests need column {}", column.column_schema.name),
353 }
354 .fail();
355 }
356 }
357
358 ensure!(
360 column.column_schema.is_nullable()
361 || column.column_schema.default_constraint().is_some(),
362 InvalidRequestSnafu {
363 region_id: self.region_id,
364 reason: format!("missing column {}", column.column_schema.name),
365 }
366 );
367
368 Ok(())
369 }
370
371 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
373 let default_value = match self.op_type {
374 OpType::Delete => {
375 ensure!(
376 column.semantic_type == SemanticType::Field,
377 InvalidRequestSnafu {
378 region_id: self.region_id,
379 reason: format!(
380 "delete requests need column {}",
381 column.column_schema.name
382 ),
383 }
384 );
385
386 if column.column_schema.is_nullable() {
391 datatypes::value::Value::Null
392 } else {
393 column.column_schema.data_type.default_value()
394 }
395 }
396 OpType::Put => {
397 if column.column_schema.is_default_impure() {
399 UnexpectedSnafu {
400 reason: format!(
401 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
402 self.region_id,
403 column.column_schema.name,
404 column.column_schema.default_constraint(),
405 ),
406 }
407 .fail()?
408 }
409 column
410 .column_schema
411 .create_default()
412 .context(CreateDefaultSnafu {
413 region_id: self.region_id,
414 column: &column.column_schema.name,
415 })?
416 .with_context(|| InvalidRequestSnafu {
418 region_id: self.region_id,
419 reason: format!(
420 "column {} does not have default value",
421 column.column_schema.name
422 ),
423 })?
424 }
425 };
426
427 Ok(api::helper::to_grpc_value(default_value))
429 }
430}
431
432pub(crate) fn validate_proto_value(
434 region_id: RegionId,
435 value: &Value,
436 column_schema: &ColumnSchema,
437) -> Result<()> {
438 if let Some(value_type) = proto_value_type(value) {
439 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
440 InvalidRequestSnafu {
441 region_id,
442 reason: format!(
443 "column {} has unknown type {}",
444 column_schema.column_name, column_schema.datatype
445 ),
446 }
447 .build()
448 })?;
449 ensure!(
450 proto_value_type_match(column_type, value_type),
451 InvalidRequestSnafu {
452 region_id,
453 reason: format!(
454 "value has type {:?}, but column {} has type {:?}({})",
455 value_type, column_schema.column_name, column_type, column_schema.datatype,
456 ),
457 }
458 );
459 }
460
461 Ok(())
462}
463
464fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
465 match (column_type, value_type) {
466 (ct, vt) if ct == vt => true,
467 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
468 (ColumnDataType::Json, ColumnDataType::Binary) => true,
469 _ => false,
470 }
471}
472
473#[derive(Debug)]
475pub struct OutputTx(Sender<Result<AffectedRows>>);
476
477impl OutputTx {
478 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
480 OutputTx(sender)
481 }
482
483 pub(crate) fn send(self, result: Result<AffectedRows>) {
485 let _ = self.0.send(result);
487 }
488}
489
490#[derive(Debug)]
492pub(crate) struct OptionOutputTx(Option<OutputTx>);
493
494impl OptionOutputTx {
495 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
497 OptionOutputTx(sender)
498 }
499
500 pub(crate) fn none() -> OptionOutputTx {
502 OptionOutputTx(None)
503 }
504
505 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
507 if let Some(sender) = self.0.take() {
508 sender.send(result);
509 }
510 }
511
512 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
514 if let Some(sender) = self.0.take() {
515 sender.send(result);
516 }
517 }
518
519 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
521 self.0.take()
522 }
523}
524
525impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
526 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
527 Self::new(Some(OutputTx::new(sender)))
528 }
529}
530
531impl OnFailure for OptionOutputTx {
532 fn on_failure(&mut self, err: Error) {
533 self.send_mut(Err(err));
534 }
535}
536
537pub(crate) trait OnFailure {
539 fn on_failure(&mut self, err: Error);
541}
542
543#[derive(Debug)]
545pub(crate) struct SenderWriteRequest {
546 pub(crate) sender: OptionOutputTx,
548 pub(crate) request: WriteRequest,
549}
550
551pub(crate) struct SenderBulkRequest {
552 pub(crate) sender: OptionOutputTx,
553 pub(crate) region_id: RegionId,
554 pub(crate) request: BulkPart,
555 pub(crate) region_metadata: RegionMetadataRef,
556 pub(crate) partition_expr_version: Option<u64>,
557}
558
559#[derive(Debug)]
561pub(crate) struct WorkerRequestWithTime {
562 pub(crate) request: WorkerRequest,
563 pub(crate) created_at: Instant,
564}
565
566impl WorkerRequestWithTime {
567 pub(crate) fn new(request: WorkerRequest) -> Self {
568 Self {
569 request,
570 created_at: Instant::now(),
571 }
572 }
573}
574
575#[derive(Debug)]
577pub(crate) enum WorkerRequest {
578 Write(SenderWriteRequest),
580
581 Ddl(SenderDdlRequest),
583
584 Background {
586 region_id: RegionId,
588 notify: BackgroundNotify,
590 },
591
592 SetRegionRoleStateGracefully {
594 region_id: RegionId,
596 region_role_state: SettableRegionRoleState,
598 sender: Sender<SetRegionRoleStateResponse>,
600 },
601
602 Stop,
604
605 EditRegion(RegionEditRequest),
607
608 SyncRegion(RegionSyncRequest),
610
611 BulkInserts {
613 metadata: Option<RegionMetadataRef>,
614 request: RegionBulkInsertsRequest,
615 sender: OptionOutputTx,
616 },
617
618 RemapManifests(RemapManifestsRequest),
620
621 CopyRegionFrom(CopyRegionFromRequest),
623}
624
625impl WorkerRequest {
626 pub(crate) fn new_open_region_request(
628 region_id: RegionId,
629 request: RegionOpenRequest,
630 entry_receiver: Option<WalEntryReceiver>,
631 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
632 let (sender, receiver) = oneshot::channel();
633
634 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
635 region_id,
636 sender: sender.into(),
637 request: DdlRequest::Open((request, entry_receiver)),
638 });
639
640 (worker_request, receiver)
641 }
642
643 pub(crate) fn new_catchup_region_request(
645 region_id: RegionId,
646 request: RegionCatchupRequest,
647 entry_receiver: Option<WalEntryReceiver>,
648 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
649 let (sender, receiver) = oneshot::channel();
650 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
651 region_id,
652 sender: sender.into(),
653 request: DdlRequest::Catchup((request, entry_receiver)),
654 });
655 (worker_request, receiver)
656 }
657
658 pub(crate) fn try_from_region_request(
660 region_id: RegionId,
661 value: RegionRequest,
662 region_metadata: Option<RegionMetadataRef>,
663 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
664 let (sender, receiver) = oneshot::channel();
665 let worker_request = match value {
666 RegionRequest::Put(v) => {
667 let mut write_request =
668 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
669 .with_hint(v.hint)
670 .with_partition_expr_version(v.partition_expr_version);
671 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
672 && let Some(region_metadata) = ®ion_metadata
673 {
674 write_request.maybe_fill_missing_columns(region_metadata)?;
675 }
676 WorkerRequest::Write(SenderWriteRequest {
677 sender: sender.into(),
678 request: write_request,
679 })
680 }
681 RegionRequest::Delete(v) => {
682 let mut write_request =
683 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?
684 .with_hint(v.hint)
685 .with_partition_expr_version(v.partition_expr_version);
686 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
687 && let Some(region_metadata) = ®ion_metadata
688 {
689 write_request.maybe_fill_missing_columns(region_metadata)?;
690 }
691 WorkerRequest::Write(SenderWriteRequest {
692 sender: sender.into(),
693 request: write_request,
694 })
695 }
696 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
697 region_id,
698 sender: sender.into(),
699 request: DdlRequest::Create(v),
700 }),
701 RegionRequest::Drop(v) => WorkerRequest::Ddl(SenderDdlRequest {
702 region_id,
703 sender: sender.into(),
704 request: DdlRequest::Drop(v),
705 }),
706 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
707 region_id,
708 sender: sender.into(),
709 request: DdlRequest::Open((v, None)),
710 }),
711 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
712 region_id,
713 sender: sender.into(),
714 request: DdlRequest::Close(v),
715 }),
716 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
717 region_id,
718 sender: sender.into(),
719 request: DdlRequest::Alter(v),
720 }),
721 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
722 region_id,
723 sender: sender.into(),
724 request: DdlRequest::Flush(v),
725 }),
726 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
727 region_id,
728 sender: sender.into(),
729 request: DdlRequest::Compact(v),
730 }),
731 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
732 region_id,
733 sender: sender.into(),
734 request: DdlRequest::BuildIndex(v),
735 }),
736 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
737 region_id,
738 sender: sender.into(),
739 request: DdlRequest::Truncate(v),
740 }),
741 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
742 region_id,
743 sender: sender.into(),
744 request: DdlRequest::Catchup((v, None)),
745 }),
746 RegionRequest::EnterStaging(v) => WorkerRequest::Ddl(SenderDdlRequest {
747 region_id,
748 sender: sender.into(),
749 request: DdlRequest::EnterStaging(v),
750 }),
751 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
752 metadata: region_metadata,
753 sender: sender.into(),
754 request: region_bulk_inserts_request,
755 },
756 RegionRequest::ApplyStagingManifest(v) => WorkerRequest::Ddl(SenderDdlRequest {
757 region_id,
758 sender: sender.into(),
759 request: DdlRequest::ApplyStagingManifest(v),
760 }),
761 };
762
763 Ok((worker_request, receiver))
764 }
765
766 pub(crate) fn new_set_readonly_gracefully(
767 region_id: RegionId,
768 region_role_state: SettableRegionRoleState,
769 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
770 let (sender, receiver) = oneshot::channel();
771
772 (
773 WorkerRequest::SetRegionRoleStateGracefully {
774 region_id,
775 region_role_state,
776 sender,
777 },
778 receiver,
779 )
780 }
781
782 pub(crate) fn new_sync_region_request(
783 region_id: RegionId,
784 manifest_version: ManifestVersion,
785 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
786 let (sender, receiver) = oneshot::channel();
787 (
788 WorkerRequest::SyncRegion(RegionSyncRequest {
789 region_id,
790 manifest_version,
791 sender,
792 }),
793 receiver,
794 )
795 }
796
797 #[allow(clippy::type_complexity)]
804 pub(crate) fn try_from_remap_manifests_request(
805 store_api::region_engine::RemapManifestsRequest {
806 region_id,
807 input_regions,
808 region_mapping,
809 new_partition_exprs,
810 }: store_api::region_engine::RemapManifestsRequest,
811 ) -> Result<(WorkerRequest, Receiver<Result<HashMap<RegionId, String>>>)> {
812 let (sender, receiver) = oneshot::channel();
813 let new_partition_exprs = new_partition_exprs
814 .into_iter()
815 .map(|(k, v)| {
816 Ok((
817 k,
818 PartitionExpr::from_json_str(&v)
819 .context(InvalidPartitionExprSnafu { expr: v })?
820 .context(MissingPartitionExprSnafu { region_id: k })?,
821 ))
822 })
823 .collect::<Result<HashMap<_, _>>>()?;
824
825 let request = RemapManifestsRequest {
826 region_id,
827 input_regions,
828 region_mapping,
829 new_partition_exprs,
830 sender,
831 };
832
833 Ok((WorkerRequest::RemapManifests(request), receiver))
834 }
835
836 pub(crate) fn try_from_copy_region_from_request(
838 region_id: RegionId,
839 store_api::region_engine::MitoCopyRegionFromRequest {
840 source_region_id,
841 parallelism,
842 }: store_api::region_engine::MitoCopyRegionFromRequest,
843 ) -> Result<(WorkerRequest, Receiver<Result<MitoCopyRegionFromResponse>>)> {
844 let (sender, receiver) = oneshot::channel();
845 let request = CopyRegionFromRequest {
846 region_id,
847 source_region_id,
848 parallelism,
849 sender,
850 };
851 Ok((WorkerRequest::CopyRegionFrom(request), receiver))
852 }
853}
854
855#[derive(Debug)]
857pub(crate) enum DdlRequest {
858 Create(RegionCreateRequest),
859 Drop(RegionDropRequest),
860 Open((RegionOpenRequest, Option<WalEntryReceiver>)),
861 Close(RegionCloseRequest),
862 Alter(RegionAlterRequest),
863 Flush(RegionFlushRequest),
864 Compact(RegionCompactRequest),
865 BuildIndex(RegionBuildIndexRequest),
866 Truncate(RegionTruncateRequest),
867 Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
868 EnterStaging(EnterStagingRequest),
869 ApplyStagingManifest(ApplyStagingManifestRequest),
870}
871
872#[derive(Debug)]
874pub(crate) struct SenderDdlRequest {
875 pub(crate) region_id: RegionId,
877 pub(crate) sender: OptionOutputTx,
879 pub(crate) request: DdlRequest,
881}
882
883#[derive(Debug)]
885pub(crate) enum BackgroundNotify {
886 FlushFinished(FlushFinished),
888 FlushFailed(FlushFailed),
890 IndexBuildFinished(IndexBuildFinished),
892 IndexBuildStopped(IndexBuildStopped),
894 IndexBuildFailed(IndexBuildFailed),
896 CompactionFinished(CompactionFinished),
898 CompactionCancelled(CompactionCancelled),
900 CompactionFailed(CompactionFailed),
902 Truncate(TruncateResult),
904 RegionChange(RegionChangeResult),
906 RegionEdit(RegionEditResult),
908 EnterStaging(EnterStagingResult),
910 CopyRegionFromFinished(CopyRegionFromFinished),
912}
913
914#[derive(Debug)]
916pub(crate) struct FlushFinished {
917 pub(crate) region_id: RegionId,
919 pub(crate) flushed_entry_id: EntryId,
921 pub(crate) senders: Vec<OutputTx>,
923 pub(crate) _timer: HistogramTimer,
925 pub(crate) edit: RegionEdit,
927 pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
929 pub(crate) is_staging: bool,
931 pub(crate) flush_reason: FlushReason,
933}
934
935impl FlushFinished {
936 pub(crate) fn on_success(self) {
938 for sender in self.senders {
939 sender.send(Ok(0));
940 }
941 }
942}
943
944impl OnFailure for FlushFinished {
945 fn on_failure(&mut self, err: Error) {
946 let err = Arc::new(err);
947 for sender in self.senders.drain(..) {
948 sender.send(Err(err.clone()).context(FlushRegionSnafu {
949 region_id: self.region_id,
950 }));
951 }
952 }
953}
954
955#[derive(Debug)]
957pub(crate) struct FlushFailed {
958 pub(crate) err: Arc<Error>,
960}
961
962#[derive(Debug)]
963pub(crate) struct IndexBuildFinished {
964 #[allow(dead_code)]
965 pub(crate) region_id: RegionId,
966 pub(crate) edit: RegionEdit,
967}
968
969#[derive(Debug)]
971pub(crate) struct IndexBuildStopped {
972 #[allow(dead_code)]
973 pub(crate) region_id: RegionId,
974 pub(crate) file_id: FileId,
975}
976
977#[derive(Debug)]
979pub(crate) struct IndexBuildFailed {
980 pub(crate) err: Arc<Error>,
981}
982
983#[derive(Debug)]
985pub(crate) struct CompactionFinished {
986 pub(crate) region_id: RegionId,
988 pub(crate) senders: Vec<OutputTx>,
990 pub(crate) start_time: Instant,
992 pub(crate) edit: RegionEdit,
994}
995
996#[derive(Debug)]
998pub(crate) struct CompactionCancelled {
999 pub(crate) region_id: RegionId,
1001 pub(crate) senders: Vec<OutputTx>,
1003}
1004
1005impl CompactionCancelled {
1006 pub(crate) fn on_success(self) {
1007 for sender in self.senders {
1008 sender.send(CompactionCancelledSnafu {}.fail());
1009 }
1010 info!("Compaction cancelled for region: {}", self.region_id);
1011 }
1012}
1013
1014impl CompactionFinished {
1015 pub fn on_success(self) {
1016 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
1018
1019 for sender in self.senders {
1020 sender.send(Ok(0));
1021 }
1022 info!("Successfully compacted region: {}", self.region_id);
1023 }
1024}
1025
1026impl OnFailure for CompactionFinished {
1027 fn on_failure(&mut self, err: Error) {
1029 let err = Arc::new(err);
1030 for sender in self.senders.drain(..) {
1031 sender.send(Err(err.clone()).context(CompactRegionSnafu {
1032 region_id: self.region_id,
1033 }));
1034 }
1035 }
1036}
1037
1038#[derive(Debug)]
1040pub(crate) struct CompactionFailed {
1041 pub(crate) region_id: RegionId,
1042 pub(crate) err: Arc<Error>,
1044}
1045
1046#[derive(Debug)]
1048pub(crate) struct TruncateResult {
1049 pub(crate) region_id: RegionId,
1051 pub(crate) sender: OptionOutputTx,
1053 pub(crate) result: Result<()>,
1055 pub(crate) kind: TruncateKind,
1056}
1057
1058#[derive(Debug)]
1060pub(crate) struct RegionChangeResult {
1061 pub(crate) region_id: RegionId,
1063 pub(crate) new_meta: RegionMetadataRef,
1065 pub(crate) sender: OptionOutputTx,
1067 pub(crate) result: Result<()>,
1069 pub(crate) need_index: bool,
1071 pub(crate) new_options: Option<RegionOptions>,
1073}
1074
1075#[derive(Debug)]
1077pub(crate) struct EnterStagingResult {
1078 pub(crate) region_id: RegionId,
1080 pub(crate) partition_directive: StagingPartitionDirective,
1082 pub(crate) sender: OptionOutputTx,
1084 pub(crate) result: Result<()>,
1086}
1087
1088#[derive(Debug)]
1089pub(crate) struct CopyRegionFromFinished {
1090 pub(crate) region_id: RegionId,
1092 pub(crate) edit: RegionEdit,
1094 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1096}
1097
1098#[derive(Debug)]
1100pub(crate) struct RegionEditRequest {
1101 pub(crate) region_id: RegionId,
1102 pub(crate) edit: RegionEdit,
1103 pub(crate) tx: Sender<Result<()>>,
1105}
1106
1107#[derive(Debug)]
1109pub(crate) struct RegionEditResult {
1110 pub(crate) region_id: RegionId,
1112 pub(crate) sender: Sender<Result<()>>,
1114 pub(crate) edit: RegionEdit,
1116 pub(crate) result: Result<()>,
1118 pub(crate) update_region_state: bool,
1120 pub(crate) is_staging: bool,
1122}
1123
1124#[derive(Debug)]
1125pub(crate) struct BuildIndexRequest {
1126 pub(crate) region_id: RegionId,
1127 pub(crate) build_type: IndexBuildType,
1128 pub(crate) file_metas: Vec<FileMeta>,
1130}
1131
1132#[derive(Debug)]
1133pub(crate) struct RegionSyncRequest {
1134 pub(crate) region_id: RegionId,
1135 pub(crate) manifest_version: ManifestVersion,
1136 pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
1138}
1139
1140#[derive(Debug)]
1141pub(crate) struct RemapManifestsRequest {
1142 pub(crate) region_id: RegionId,
1144 pub(crate) input_regions: Vec<RegionId>,
1146 pub(crate) region_mapping: HashMap<RegionId, Vec<RegionId>>,
1148 pub(crate) new_partition_exprs: HashMap<RegionId, PartitionExpr>,
1150 pub(crate) sender: Sender<Result<HashMap<RegionId, String>>>,
1154}
1155
1156#[derive(Debug)]
1157pub(crate) struct CopyRegionFromRequest {
1158 pub(crate) region_id: RegionId,
1160 pub(crate) source_region_id: RegionId,
1162 pub(crate) parallelism: usize,
1164 pub(crate) sender: Sender<Result<MitoCopyRegionFromResponse>>,
1166}
1167
1168#[cfg(test)]
1169mod tests {
1170 use api::v1::value::ValueData;
1171 use api::v1::{Row, SemanticType};
1172 use common_error::ext::ErrorExt;
1173 use common_error::status_code::StatusCode;
1174 use datatypes::prelude::ConcreteDataType;
1175 use datatypes::schema::ColumnDefaultConstraint;
1176 use mito_codec::test_util::i64_value;
1177 use store_api::metadata::RegionMetadataBuilder;
1178 use tokio::sync::oneshot;
1179
1180 use super::*;
1181 use crate::error::Error;
1182 use crate::test_util::ts_ms_value;
1183
1184 fn new_column_schema(
1185 name: &str,
1186 data_type: ColumnDataType,
1187 semantic_type: SemanticType,
1188 ) -> ColumnSchema {
1189 ColumnSchema {
1190 column_name: name.to_string(),
1191 datatype: data_type as i32,
1192 semantic_type: semantic_type as i32,
1193 ..Default::default()
1194 }
1195 }
1196
1197 fn check_invalid_request(err: &Error, expect: &str) {
1198 if let Error::InvalidRequest {
1199 region_id: _,
1200 reason,
1201 location: _,
1202 } = err
1203 {
1204 assert_eq!(reason, expect);
1205 } else {
1206 panic!("Unexpected error {err}")
1207 }
1208 }
1209
1210 #[test]
1211 fn test_write_request_duplicate_column() {
1212 let rows = Rows {
1213 schema: vec![
1214 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1215 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1216 ],
1217 rows: vec![],
1218 };
1219
1220 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1221 check_invalid_request(&err, "duplicate column c0");
1222 }
1223
1224 #[test]
1225 fn test_valid_write_request() {
1226 let rows = Rows {
1227 schema: vec![
1228 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1229 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1230 ],
1231 rows: vec![Row {
1232 values: vec![i64_value(1), i64_value(2)],
1233 }],
1234 };
1235
1236 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1237 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1238 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1239 assert_eq!(None, request.column_index_by_name("c2"));
1240 }
1241
1242 #[test]
1243 fn test_compaction_cancelled_sends_cancelled_error() {
1244 let (tx, rx) = oneshot::channel();
1245 let request = CompactionCancelled {
1246 region_id: RegionId::new(1, 1),
1247 senders: vec![OutputTx::new(tx)],
1248 };
1249
1250 request.on_success();
1251
1252 let err = rx.blocking_recv().unwrap().unwrap_err();
1253 assert!(matches!(err, Error::CompactionCancelled { .. }));
1254 assert_eq!(err.status_code(), StatusCode::Cancelled);
1255 }
1256
1257 #[test]
1258 fn test_write_request_column_num() {
1259 let rows = Rows {
1260 schema: vec![
1261 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1262 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1263 ],
1264 rows: vec![Row {
1265 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1266 }],
1267 };
1268
1269 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1270 check_invalid_request(&err, "row has 3 columns but schema has 2");
1271 }
1272
1273 fn new_region_metadata() -> RegionMetadata {
1274 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1275 builder
1276 .push_column_metadata(ColumnMetadata {
1277 column_schema: datatypes::schema::ColumnSchema::new(
1278 "ts",
1279 ConcreteDataType::timestamp_millisecond_datatype(),
1280 false,
1281 ),
1282 semantic_type: SemanticType::Timestamp,
1283 column_id: 1,
1284 })
1285 .push_column_metadata(ColumnMetadata {
1286 column_schema: datatypes::schema::ColumnSchema::new(
1287 "k0",
1288 ConcreteDataType::int64_datatype(),
1289 true,
1290 ),
1291 semantic_type: SemanticType::Tag,
1292 column_id: 2,
1293 })
1294 .primary_key(vec![2]);
1295 builder.build().unwrap()
1296 }
1297
1298 #[test]
1299 fn test_check_schema() {
1300 let rows = Rows {
1301 schema: vec![
1302 new_column_schema(
1303 "ts",
1304 ColumnDataType::TimestampMillisecond,
1305 SemanticType::Timestamp,
1306 ),
1307 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1308 ],
1309 rows: vec![Row {
1310 values: vec![ts_ms_value(1), i64_value(2)],
1311 }],
1312 };
1313 let metadata = new_region_metadata();
1314
1315 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1316 request.check_schema(&metadata).unwrap();
1317 }
1318
1319 #[test]
1320 fn test_column_type() {
1321 let rows = Rows {
1322 schema: vec![
1323 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1324 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1325 ],
1326 rows: vec![Row {
1327 values: vec![i64_value(1), i64_value(2)],
1328 }],
1329 };
1330 let metadata = new_region_metadata();
1331
1332 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1333 let err = request.check_schema(&metadata).unwrap_err();
1334 check_invalid_request(
1335 &err,
1336 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1337 );
1338 }
1339
1340 #[test]
1341 fn test_semantic_type() {
1342 let rows = Rows {
1343 schema: vec![
1344 new_column_schema(
1345 "ts",
1346 ColumnDataType::TimestampMillisecond,
1347 SemanticType::Tag,
1348 ),
1349 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1350 ],
1351 rows: vec![Row {
1352 values: vec![ts_ms_value(1), i64_value(2)],
1353 }],
1354 };
1355 let metadata = new_region_metadata();
1356
1357 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1358 let err = request.check_schema(&metadata).unwrap_err();
1359 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1360 }
1361
1362 #[test]
1363 fn test_column_nullable() {
1364 let rows = Rows {
1365 schema: vec![
1366 new_column_schema(
1367 "ts",
1368 ColumnDataType::TimestampMillisecond,
1369 SemanticType::Timestamp,
1370 ),
1371 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1372 ],
1373 rows: vec![Row {
1374 values: vec![Value { value_data: None }, i64_value(2)],
1375 }],
1376 };
1377 let metadata = new_region_metadata();
1378
1379 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1380 let err = request.check_schema(&metadata).unwrap_err();
1381 check_invalid_request(&err, "column ts is not null but input has null");
1382 }
1383
1384 #[test]
1385 fn test_column_default() {
1386 let rows = Rows {
1387 schema: vec![new_column_schema(
1388 "k0",
1389 ColumnDataType::Int64,
1390 SemanticType::Tag,
1391 )],
1392 rows: vec![Row {
1393 values: vec![i64_value(1)],
1394 }],
1395 };
1396 let metadata = new_region_metadata();
1397
1398 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1399 let err = request.check_schema(&metadata).unwrap_err();
1400 check_invalid_request(&err, "missing column ts");
1401 }
1402
1403 #[test]
1404 fn test_unknown_column() {
1405 let rows = Rows {
1406 schema: vec![
1407 new_column_schema(
1408 "ts",
1409 ColumnDataType::TimestampMillisecond,
1410 SemanticType::Timestamp,
1411 ),
1412 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1413 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1414 ],
1415 rows: vec![Row {
1416 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1417 }],
1418 };
1419 let metadata = new_region_metadata();
1420
1421 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1422 let err = request.check_schema(&metadata).unwrap_err();
1423 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1424 }
1425
1426 #[test]
1427 fn test_fill_impure_columns_err() {
1428 let rows = Rows {
1429 schema: vec![new_column_schema(
1430 "k0",
1431 ColumnDataType::Int64,
1432 SemanticType::Tag,
1433 )],
1434 rows: vec![Row {
1435 values: vec![i64_value(1)],
1436 }],
1437 };
1438 let metadata = {
1439 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1440 builder
1441 .push_column_metadata(ColumnMetadata {
1442 column_schema: datatypes::schema::ColumnSchema::new(
1443 "ts",
1444 ConcreteDataType::timestamp_millisecond_datatype(),
1445 false,
1446 )
1447 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1448 "now()".to_string(),
1449 )))
1450 .unwrap(),
1451 semantic_type: SemanticType::Timestamp,
1452 column_id: 1,
1453 })
1454 .push_column_metadata(ColumnMetadata {
1455 column_schema: datatypes::schema::ColumnSchema::new(
1456 "k0",
1457 ConcreteDataType::int64_datatype(),
1458 true,
1459 ),
1460 semantic_type: SemanticType::Tag,
1461 column_id: 2,
1462 })
1463 .primary_key(vec![2]);
1464 builder.build().unwrap()
1465 };
1466
1467 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1468 let err = request.check_schema(&metadata).unwrap_err();
1469 assert!(err.is_fill_default());
1470 assert!(
1471 request
1472 .fill_missing_columns(&metadata)
1473 .unwrap_err()
1474 .to_string()
1475 .contains("unexpected impure default value with region_id")
1476 );
1477 }
1478
1479 #[test]
1480 fn test_fill_missing_columns() {
1481 let rows = Rows {
1482 schema: vec![new_column_schema(
1483 "ts",
1484 ColumnDataType::TimestampMillisecond,
1485 SemanticType::Timestamp,
1486 )],
1487 rows: vec![Row {
1488 values: vec![ts_ms_value(1)],
1489 }],
1490 };
1491 let metadata = new_region_metadata();
1492
1493 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1494 let err = request.check_schema(&metadata).unwrap_err();
1495 assert!(err.is_fill_default());
1496 request.fill_missing_columns(&metadata).unwrap();
1497
1498 let expect_rows = Rows {
1499 schema: vec![new_column_schema(
1500 "ts",
1501 ColumnDataType::TimestampMillisecond,
1502 SemanticType::Timestamp,
1503 )],
1504 rows: vec![Row {
1505 values: vec![ts_ms_value(1)],
1506 }],
1507 };
1508 assert_eq!(expect_rows, request.rows);
1509 }
1510
1511 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1512 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1513 builder
1514 .push_column_metadata(ColumnMetadata {
1515 column_schema: datatypes::schema::ColumnSchema::new(
1516 "ts",
1517 ConcreteDataType::timestamp_millisecond_datatype(),
1518 false,
1519 ),
1520 semantic_type: SemanticType::Timestamp,
1521 column_id: 1,
1522 })
1523 .push_column_metadata(ColumnMetadata {
1524 column_schema: datatypes::schema::ColumnSchema::new(
1525 "k0",
1526 ConcreteDataType::int64_datatype(),
1527 true,
1528 ),
1529 semantic_type: SemanticType::Tag,
1530 column_id: 2,
1531 })
1532 .primary_key(vec![2]);
1533 builder
1534 }
1535
1536 fn region_metadata_two_fields() -> RegionMetadata {
1537 let mut builder = builder_with_ts_tag();
1538 builder
1539 .push_column_metadata(ColumnMetadata {
1540 column_schema: datatypes::schema::ColumnSchema::new(
1541 "f0",
1542 ConcreteDataType::int64_datatype(),
1543 true,
1544 ),
1545 semantic_type: SemanticType::Field,
1546 column_id: 3,
1547 })
1548 .push_column_metadata(ColumnMetadata {
1550 column_schema: datatypes::schema::ColumnSchema::new(
1551 "f1",
1552 ConcreteDataType::int64_datatype(),
1553 false,
1554 )
1555 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1556 datatypes::value::Value::Int64(100),
1557 )))
1558 .unwrap(),
1559 semantic_type: SemanticType::Field,
1560 column_id: 4,
1561 });
1562 builder.build().unwrap()
1563 }
1564
1565 #[test]
1566 fn test_fill_missing_for_delete() {
1567 let rows = Rows {
1568 schema: vec![new_column_schema(
1569 "ts",
1570 ColumnDataType::TimestampMillisecond,
1571 SemanticType::Timestamp,
1572 )],
1573 rows: vec![Row {
1574 values: vec![ts_ms_value(1)],
1575 }],
1576 };
1577 let metadata = region_metadata_two_fields();
1578
1579 let mut request =
1580 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1581 let err = request.check_schema(&metadata).unwrap_err();
1582 check_invalid_request(&err, "delete requests need column k0");
1583 let err = request.fill_missing_columns(&metadata).unwrap_err();
1584 check_invalid_request(&err, "delete requests need column k0");
1585
1586 let rows = Rows {
1587 schema: vec![
1588 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1589 new_column_schema(
1590 "ts",
1591 ColumnDataType::TimestampMillisecond,
1592 SemanticType::Timestamp,
1593 ),
1594 ],
1595 rows: vec![Row {
1596 values: vec![i64_value(100), ts_ms_value(1)],
1597 }],
1598 };
1599 let mut request =
1600 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1601 let err = request.check_schema(&metadata).unwrap_err();
1602 assert!(err.is_fill_default());
1603 request.fill_missing_columns(&metadata).unwrap();
1604
1605 let expect_rows = Rows {
1606 schema: vec![
1607 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1608 new_column_schema(
1609 "ts",
1610 ColumnDataType::TimestampMillisecond,
1611 SemanticType::Timestamp,
1612 ),
1613 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1614 ],
1615 rows: vec![Row {
1617 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1618 }],
1619 };
1620 assert_eq!(expect_rows, request.rows);
1621 }
1622
1623 #[test]
1624 fn test_fill_missing_without_default_in_delete() {
1625 let mut builder = builder_with_ts_tag();
1626 builder
1627 .push_column_metadata(ColumnMetadata {
1629 column_schema: datatypes::schema::ColumnSchema::new(
1630 "f0",
1631 ConcreteDataType::int64_datatype(),
1632 true,
1633 ),
1634 semantic_type: SemanticType::Field,
1635 column_id: 3,
1636 })
1637 .push_column_metadata(ColumnMetadata {
1639 column_schema: datatypes::schema::ColumnSchema::new(
1640 "f1",
1641 ConcreteDataType::int64_datatype(),
1642 false,
1643 ),
1644 semantic_type: SemanticType::Field,
1645 column_id: 4,
1646 });
1647 let metadata = builder.build().unwrap();
1648
1649 let rows = Rows {
1650 schema: vec![
1651 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1652 new_column_schema(
1653 "ts",
1654 ColumnDataType::TimestampMillisecond,
1655 SemanticType::Timestamp,
1656 ),
1657 ],
1658 rows: vec![Row {
1660 values: vec![i64_value(100), ts_ms_value(1)],
1661 }],
1662 };
1663 let mut request =
1664 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1665 let err = request.check_schema(&metadata).unwrap_err();
1666 assert!(err.is_fill_default());
1667 request.fill_missing_columns(&metadata).unwrap();
1668
1669 let expect_rows = Rows {
1670 schema: vec![
1671 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1672 new_column_schema(
1673 "ts",
1674 ColumnDataType::TimestampMillisecond,
1675 SemanticType::Timestamp,
1676 ),
1677 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1678 ],
1679 rows: vec![Row {
1681 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1682 }],
1683 };
1684 assert_eq!(expect_rows, request.rows);
1685 }
1686
1687 #[test]
1688 fn test_no_default() {
1689 let rows = Rows {
1690 schema: vec![new_column_schema(
1691 "k0",
1692 ColumnDataType::Int64,
1693 SemanticType::Tag,
1694 )],
1695 rows: vec![Row {
1696 values: vec![i64_value(1)],
1697 }],
1698 };
1699 let metadata = new_region_metadata();
1700
1701 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1702 let err = request.fill_missing_columns(&metadata).unwrap_err();
1703 check_invalid_request(&err, "column ts does not have default value");
1704 }
1705
1706 #[test]
1707 fn test_missing_and_invalid() {
1708 let rows = Rows {
1710 schema: vec![
1711 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1712 new_column_schema(
1713 "ts",
1714 ColumnDataType::TimestampMillisecond,
1715 SemanticType::Timestamp,
1716 ),
1717 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1718 ],
1719 rows: vec![Row {
1720 values: vec![
1721 i64_value(100),
1722 ts_ms_value(1),
1723 Value {
1724 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1725 },
1726 ],
1727 }],
1728 };
1729 let metadata = region_metadata_two_fields();
1730
1731 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1732 let err = request.check_schema(&metadata).unwrap_err();
1733 check_invalid_request(
1734 &err,
1735 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1736 );
1737 }
1738
1739 #[test]
1740 fn test_write_request_metadata() {
1741 let rows = Rows {
1742 schema: vec![
1743 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1744 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1745 ],
1746 rows: vec![Row {
1747 values: vec![i64_value(1), i64_value(2)],
1748 }],
1749 };
1750
1751 let metadata = Arc::new(new_region_metadata());
1752 let request = WriteRequest::new(
1753 RegionId::new(1, 1),
1754 OpType::Put,
1755 rows,
1756 Some(metadata.clone()),
1757 )
1758 .unwrap();
1759
1760 assert!(request.region_metadata.is_some());
1761 assert_eq!(
1762 request.region_metadata.unwrap().region_id,
1763 RegionId::new(1, 1)
1764 );
1765 }
1766}