1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23 to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBulkInsertsRequest, RegionCatchupRequest,
39 RegionCloseRequest, RegionCompactRequest, RegionCreateRequest, RegionFlushRequest,
40 RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::RegionId;
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::sst::file::FileMeta;
54use crate::sst::index::IndexBuildType;
55use crate::wal::EntryId;
56use crate::wal::entry_distributor::WalEntryReceiver;
57
/// Request to write (put or delete) rows into a region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Region the rows are written to.
    pub region_id: RegionId,
    /// Kind of the write operation.
    pub op_type: OpType,
    /// Rows to write, together with their schema.
    pub rows: Rows,
    /// Maps each column name in `rows.schema` to its position in that schema.
    pub name_to_index: HashMap<String, usize>,
    /// `has_null[i]` is true when at least one row carries no value for column `i`.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata handed to [`WriteRequest::new`], if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
76
77impl WriteRequest {
78 pub fn new(
82 region_id: RegionId,
83 op_type: OpType,
84 rows: Rows,
85 region_metadata: Option<RegionMetadataRef>,
86 ) -> Result<WriteRequest> {
87 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
88 for (index, column) in rows.schema.iter().enumerate() {
89 ensure!(
90 name_to_index
91 .insert(column.column_name.clone(), index)
92 .is_none(),
93 InvalidRequestSnafu {
94 region_id,
95 reason: format!("duplicate column {}", column.column_name),
96 }
97 );
98 }
99
100 let mut has_null = vec![false; rows.schema.len()];
101 for row in &rows.rows {
102 ensure!(
103 row.values.len() == rows.schema.len(),
104 InvalidRequestSnafu {
105 region_id,
106 reason: format!(
107 "row has {} columns but schema has {}",
108 row.values.len(),
109 rows.schema.len()
110 ),
111 }
112 );
113
114 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
115 validate_proto_value(region_id, value, column_schema)?;
116
117 if value.value_data.is_none() {
118 has_null[i] = true;
119 }
120 }
121 }
122
123 Ok(WriteRequest {
124 region_id,
125 op_type,
126 rows,
127 name_to_index,
128 has_null,
129 hint: None,
130 region_metadata,
131 })
132 }
133
134 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
136 self.hint = hint;
137 self
138 }
139
140 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
142 infer_primary_key_encoding_from_hint(self.hint.as_ref())
143 }
144
145 pub(crate) fn estimated_size(&self) -> usize {
147 let row_size = self
148 .rows
149 .rows
150 .first()
151 .map(|row| row.encoded_len())
152 .unwrap_or(0);
153 row_size * self.rows.rows.len()
154 }
155
156 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
158 self.name_to_index.get(name).copied()
159 }
160
161 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
166 debug_assert_eq!(self.region_id, metadata.region_id);
167
168 let region_id = self.region_id;
169 let mut rows_columns: HashMap<_, _> = self
171 .rows
172 .schema
173 .iter()
174 .map(|column| (&column.column_name, column))
175 .collect();
176
177 let mut need_fill_default = false;
178 for column in &metadata.column_metadatas {
180 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
181 ensure!(
183 is_column_type_value_eq(
184 input_col.datatype,
185 input_col.datatype_extension.clone(),
186 &column.column_schema.data_type
187 ),
188 InvalidRequestSnafu {
189 region_id,
190 reason: format!(
191 "column {} expect type {:?}, given: {}({})",
192 column.column_schema.name,
193 column.column_schema.data_type,
194 ColumnDataType::try_from(input_col.datatype)
195 .map(|v| v.as_str_name())
196 .unwrap_or("Unknown"),
197 input_col.datatype,
198 )
199 }
200 );
201
202 ensure!(
204 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
205 InvalidRequestSnafu {
206 region_id,
207 reason: format!(
208 "column {} has semantic type {:?}, given: {}({})",
209 column.column_schema.name,
210 column.semantic_type,
211 api::v1::SemanticType::try_from(input_col.semantic_type)
212 .map(|v| v.as_str_name())
213 .unwrap_or("Unknown"),
214 input_col.semantic_type
215 ),
216 }
217 );
218
219 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
222 ensure!(
223 !has_null || column.column_schema.is_nullable(),
224 InvalidRequestSnafu {
225 region_id,
226 reason: format!(
227 "column {} is not null but input has null",
228 column.column_schema.name
229 ),
230 }
231 );
232 } else {
233 self.check_missing_column(column)?;
235
236 need_fill_default = true;
237 }
238 }
239
240 if !rows_columns.is_empty() {
242 let names: Vec<_> = rows_columns.into_keys().collect();
243 return InvalidRequestSnafu {
244 region_id,
245 reason: format!("unknown columns: {:?}", names),
246 }
247 .fail();
248 }
249
250 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
252
253 Ok(())
254 }
255
256 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
261 debug_assert_eq!(self.region_id, metadata.region_id);
262
263 let mut columns_to_fill = vec![];
264 for column in &metadata.column_metadatas {
265 if !self.name_to_index.contains_key(&column.column_schema.name) {
266 columns_to_fill.push(column);
267 }
268 }
269 self.fill_columns(columns_to_fill)?;
270
271 Ok(())
272 }
273
274 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
276 if let Err(e) = self.check_schema(metadata) {
277 if e.is_fill_default() {
278 self.fill_missing_columns(metadata)?;
282 } else {
283 return Err(e);
284 }
285 }
286
287 Ok(())
288 }
289
290 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
292 let mut default_values = Vec::with_capacity(columns.len());
293 let mut columns_to_fill = Vec::with_capacity(columns.len());
294 for column in columns {
295 let default_value = self.column_default_value(column)?;
296 if default_value.value_data.is_some() {
297 default_values.push(default_value);
298 columns_to_fill.push(column);
299 }
300 }
301
302 for row in &mut self.rows.rows {
303 row.values.extend(default_values.iter().cloned());
304 }
305
306 for column in columns_to_fill {
307 let (datatype, datatype_ext) =
308 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
309 .with_context(|_| ConvertColumnDataTypeSnafu {
310 reason: format!(
311 "no protobuf type for column {} ({:?})",
312 column.column_schema.name, column.column_schema.data_type
313 ),
314 })?
315 .to_parts();
316 self.rows.schema.push(ColumnSchema {
317 column_name: column.column_schema.name.clone(),
318 datatype: datatype as i32,
319 semantic_type: column.semantic_type as i32,
320 datatype_extension: datatype_ext,
321 options: options_from_column_schema(&column.column_schema),
322 });
323 }
324
325 Ok(())
326 }
327
328 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
330 if self.op_type == OpType::Delete {
331 if column.semantic_type == SemanticType::Field {
332 return Ok(());
335 } else {
336 return InvalidRequestSnafu {
337 region_id: self.region_id,
338 reason: format!("delete requests need column {}", column.column_schema.name),
339 }
340 .fail();
341 }
342 }
343
344 ensure!(
346 column.column_schema.is_nullable()
347 || column.column_schema.default_constraint().is_some(),
348 InvalidRequestSnafu {
349 region_id: self.region_id,
350 reason: format!("missing column {}", column.column_schema.name),
351 }
352 );
353
354 Ok(())
355 }
356
357 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
359 let default_value = match self.op_type {
360 OpType::Delete => {
361 ensure!(
362 column.semantic_type == SemanticType::Field,
363 InvalidRequestSnafu {
364 region_id: self.region_id,
365 reason: format!(
366 "delete requests need column {}",
367 column.column_schema.name
368 ),
369 }
370 );
371
372 if column.column_schema.is_nullable() {
377 datatypes::value::Value::Null
378 } else {
379 column.column_schema.data_type.default_value()
380 }
381 }
382 OpType::Put => {
383 if column.column_schema.is_default_impure() {
385 UnexpectedSnafu {
386 reason: format!(
387 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
388 self.region_id,
389 column.column_schema.name,
390 column.column_schema.default_constraint(),
391 ),
392 }
393 .fail()?
394 }
395 column
396 .column_schema
397 .create_default()
398 .context(CreateDefaultSnafu {
399 region_id: self.region_id,
400 column: &column.column_schema.name,
401 })?
402 .with_context(|| InvalidRequestSnafu {
404 region_id: self.region_id,
405 reason: format!(
406 "column {} does not have default value",
407 column.column_schema.name
408 ),
409 })?
410 }
411 };
412
413 Ok(to_proto_value(default_value))
415 }
416}
417
418pub(crate) fn validate_proto_value(
420 region_id: RegionId,
421 value: &Value,
422 column_schema: &ColumnSchema,
423) -> Result<()> {
424 if let Some(value_type) = proto_value_type(value) {
425 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426 InvalidRequestSnafu {
427 region_id,
428 reason: format!(
429 "column {} has unknown type {}",
430 column_schema.column_name, column_schema.datatype
431 ),
432 }
433 .build()
434 })?;
435 ensure!(
436 proto_value_type_match(column_type, value_type),
437 InvalidRequestSnafu {
438 region_id,
439 reason: format!(
440 "value has type {:?}, but column {} has type {:?}({})",
441 value_type, column_schema.column_name, column_type, column_schema.datatype,
442 ),
443 }
444 );
445 }
446
447 Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451 match (column_type, value_type) {
452 (ct, vt) if ct == vt => true,
453 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454 (ColumnDataType::Json, ColumnDataType::Binary) => true,
455 _ => false,
456 }
457}
458
/// Oneshot sender used to report the result (affected rows or error) of a request.
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466 OutputTx(sender)
467 }
468
469 pub(crate) fn send(self, result: Result<AffectedRows>) {
471 let _ = self.0.send(result);
473 }
474}
475
/// An optional [`OutputTx`]; sending through an empty one is a no-op.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483 OptionOutputTx(sender)
484 }
485
486 pub(crate) fn none() -> OptionOutputTx {
488 OptionOutputTx(None)
489 }
490
491 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493 if let Some(sender) = self.0.take() {
494 sender.send(result);
495 }
496 }
497
498 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500 if let Some(sender) = self.0.take() {
501 sender.send(result);
502 }
503 }
504
505 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507 self.0.take()
508 }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513 Self::new(Some(OutputTx::new(sender)))
514 }
515}
516
517impl OnFailure for OptionOutputTx {
518 fn on_failure(&mut self, err: Error) {
519 self.send_mut(Err(err));
520 }
521}
522
/// Callback for types that need to react when their processing fails.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
528
/// A write request paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The write request itself.
    pub(crate) request: WriteRequest,
}
536
/// A bulk part paired with the sender used to report its result.
pub(crate) struct SenderBulkRequest {
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Target region.
    pub(crate) region_id: RegionId,
    /// The bulk part carried by this request.
    pub(crate) request: BulkPart,
    /// Region metadata accompanying the request.
    pub(crate) region_metadata: RegionMetadataRef,
}
543
/// A [`WorkerRequest`] together with the instant at which it was wrapped.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped request.
    pub(crate) request: WorkerRequest,
    /// Instant captured by [`WorkerRequestWithTime::new`].
    pub(crate) created_at: Instant,
}
550
551impl WorkerRequestWithTime {
552 pub(crate) fn new(request: WorkerRequest) -> Self {
553 Self {
554 request,
555 created_at: Instant::now(),
556 }
557 }
558}
559
/// Request handed to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write (put or delete) rows to a region.
    Write(SenderWriteRequest),

    /// DDL request for a region.
    Ddl(SenderDdlRequest),

    /// Notification from a background job.
    Background {
        /// Region the notification is about.
        region_id: RegionId,
        /// The notification payload.
        notify: BackgroundNotify,
    },

    /// Request to change the role state of a region.
    SetRegionRoleStateGracefully {
        /// Region whose role state is changed.
        region_id: RegionId,
        /// The requested role state.
        region_role_state: SettableRegionRoleState,
        /// Sender used to deliver the response.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request. NOTE(review): presumably tells the worker to stop — the
    /// handling code is not in this file.
    Stop,

    /// Request to apply an edit to a region.
    EditRegion(RegionEditRequest),

    /// Request to sync a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// Request to build indexes for files of a region.
    // NOTE(review): `dead_code` is allowed presumably because nothing constructs
    // this variant yet — confirm and drop the attribute once it is used.
    #[allow(dead_code)]
    BuildIndexRegion(RegionBuildIndexRequest),

    /// Bulk inserts request with the region metadata, if available.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        /// The bulk inserts request.
        request: RegionBulkInsertsRequest,
        /// Result sender.
        sender: OptionOutputTx,
    },
}
607
608impl WorkerRequest {
609 pub(crate) fn new_open_region_request(
610 region_id: RegionId,
611 request: RegionOpenRequest,
612 entry_receiver: Option<WalEntryReceiver>,
613 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
614 let (sender, receiver) = oneshot::channel();
615
616 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
617 region_id,
618 sender: sender.into(),
619 request: DdlRequest::Open((request, entry_receiver)),
620 });
621
622 (worker_request, receiver)
623 }
624
625 pub(crate) fn try_from_region_request(
627 region_id: RegionId,
628 value: RegionRequest,
629 region_metadata: Option<RegionMetadataRef>,
630 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
631 let (sender, receiver) = oneshot::channel();
632 let worker_request = match value {
633 RegionRequest::Put(v) => {
634 let mut write_request =
635 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
636 .with_hint(v.hint);
637 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
638 && let Some(region_metadata) = ®ion_metadata
639 {
640 write_request.maybe_fill_missing_columns(region_metadata)?;
641 }
642 WorkerRequest::Write(SenderWriteRequest {
643 sender: sender.into(),
644 request: write_request,
645 })
646 }
647 RegionRequest::Delete(v) => {
648 let mut write_request =
649 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
650 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
651 && let Some(region_metadata) = ®ion_metadata
652 {
653 write_request.maybe_fill_missing_columns(region_metadata)?;
654 }
655 WorkerRequest::Write(SenderWriteRequest {
656 sender: sender.into(),
657 request: write_request,
658 })
659 }
660 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
661 region_id,
662 sender: sender.into(),
663 request: DdlRequest::Create(v),
664 }),
665 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
666 region_id,
667 sender: sender.into(),
668 request: DdlRequest::Drop,
669 }),
670 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
671 region_id,
672 sender: sender.into(),
673 request: DdlRequest::Open((v, None)),
674 }),
675 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
676 region_id,
677 sender: sender.into(),
678 request: DdlRequest::Close(v),
679 }),
680 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
681 region_id,
682 sender: sender.into(),
683 request: DdlRequest::Alter(v),
684 }),
685 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
686 region_id,
687 sender: sender.into(),
688 request: DdlRequest::Flush(v),
689 }),
690 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
691 region_id,
692 sender: sender.into(),
693 request: DdlRequest::Compact(v),
694 }),
695 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
696 region_id,
697 sender: sender.into(),
698 request: DdlRequest::Truncate(v),
699 }),
700 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
701 region_id,
702 sender: sender.into(),
703 request: DdlRequest::Catchup(v),
704 }),
705 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
706 metadata: region_metadata,
707 sender: sender.into(),
708 request: region_bulk_inserts_request,
709 },
710 };
711
712 Ok((worker_request, receiver))
713 }
714
715 pub(crate) fn new_set_readonly_gracefully(
716 region_id: RegionId,
717 region_role_state: SettableRegionRoleState,
718 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
719 let (sender, receiver) = oneshot::channel();
720
721 (
722 WorkerRequest::SetRegionRoleStateGracefully {
723 region_id,
724 region_role_state,
725 sender,
726 },
727 receiver,
728 )
729 }
730
731 pub(crate) fn new_sync_region_request(
732 region_id: RegionId,
733 manifest_version: ManifestVersion,
734 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
735 let (sender, receiver) = oneshot::channel();
736 (
737 WorkerRequest::SyncRegion(RegionSyncRequest {
738 region_id,
739 manifest_version,
740 sender,
741 }),
742 receiver,
743 )
744 }
745}
746
/// DDL request for a region.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create the region.
    Create(RegionCreateRequest),
    /// Drop the region.
    Drop,
    /// Open the region, optionally with a WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close the region.
    Close(RegionCloseRequest),
    /// Alter the region.
    Alter(RegionAlterRequest),
    /// Flush the region.
    Flush(RegionFlushRequest),
    /// Compact the region.
    Compact(RegionCompactRequest),
    /// Truncate the region.
    Truncate(RegionTruncateRequest),
    /// Catch up the region.
    Catchup(RegionCatchupRequest),
}
760
/// A DDL request paired with the sender used to report its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Region the request applies to.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// The DDL request itself.
    pub(crate) request: DdlRequest,
}
771
/// Notification sent from a background job to a worker.
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush has finished.
    FlushFinished(FlushFinished),
    /// A flush has failed.
    FlushFailed(FlushFailed),
    /// An index build has finished.
    IndexBuildFinished(IndexBuildFinished),
    /// An index build has failed.
    // NOTE(review): `dead_code` is allowed presumably because nothing constructs
    // this variant yet — confirm.
    #[allow(dead_code)]
    IndexBuildFailed(IndexBuildFailed),
    /// A compaction has finished.
    CompactionFinished(CompactionFinished),
    /// A compaction has failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
795
/// Notification that a flush job has finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Region that was flushed.
    pub(crate) region_id: RegionId,
    /// WAL entry id associated with the flush.
    pub(crate) flushed_entry_id: EntryId,
    /// Senders waiting for the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Histogram timer kept alive for the lifetime of this notification.
    pub(crate) _timer: HistogramTimer,
    /// Region edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
812
813impl FlushFinished {
814 pub(crate) fn on_success(self) {
816 for sender in self.senders {
817 sender.send(Ok(0));
818 }
819 }
820}
821
822impl OnFailure for FlushFinished {
823 fn on_failure(&mut self, err: Error) {
824 let err = Arc::new(err);
825 for sender in self.senders.drain(..) {
826 sender.send(Err(err.clone()).context(FlushRegionSnafu {
827 region_id: self.region_id,
828 }));
829 }
830 }
831}
832
/// Notification that a flush job has failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
839
/// Notification that an index build has finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Region the index build belongs to.
    // NOTE(review): `dead_code` is allowed presumably because the field is not
    // read yet — confirm.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Region edit produced by the index build.
    pub(crate) edit: RegionEdit,
}
846
/// Notification that an index build has failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The error that caused the failure.
    // NOTE(review): `dead_code` is allowed presumably because the field is not
    // read yet — confirm.
    #[allow(dead_code)]
    pub(crate) err: Arc<Error>,
}
853
/// Notification that a compaction has finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Region that was compacted.
    pub(crate) region_id: RegionId,
    /// Senders waiting for the compaction result.
    pub(crate) senders: Vec<OutputTx>,
    /// Instant from which the elapsed compaction time is measured.
    pub(crate) start_time: Instant,
    /// Region edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
866
867impl CompactionFinished {
868 pub fn on_success(self) {
869 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
871
872 for sender in self.senders {
873 sender.send(Ok(0));
874 }
875 info!("Successfully compacted region: {}", self.region_id);
876 }
877}
878
879impl OnFailure for CompactionFinished {
880 fn on_failure(&mut self, err: Error) {
882 let err = Arc::new(err);
883 for sender in self.senders.drain(..) {
884 sender.send(Err(err.clone()).context(CompactRegionSnafu {
885 region_id: self.region_id,
886 }));
887 }
888 }
889}
890
/// Notification that a compaction has failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Region whose compaction failed.
    pub(crate) region_id: RegionId,
    /// The error that caused the failure.
    pub(crate) err: Arc<Error>,
}
898
/// Notification carrying the result of a truncate.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Region that was truncated.
    pub(crate) region_id: RegionId,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate.
    pub(crate) result: Result<()>,
    /// Kind of truncate that was performed.
    pub(crate) kind: TruncateKind,
}
910
/// Notification carrying the result of a region change.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Region that was changed.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Result sender.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
}
923
/// Request to apply an edit to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Region to edit.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Sender used to report the outcome of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
932
/// Notification carrying the result of a region edit.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Region that was edited.
    pub(crate) region_id: RegionId,
    /// Sender used to report the outcome of the edit.
    pub(crate) sender: Sender<Result<()>>,
    /// The edit that was requested.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
945
/// Request to build indexes for files of a region.
#[derive(Debug)]
pub(crate) struct RegionBuildIndexRequest {
    /// Region the files belong to.
    pub(crate) region_id: RegionId,
    /// Type of the index build.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the files covered by this request.
    pub(crate) file_metas: Vec<FileMeta>,
}
953
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Region to sync.
    pub(crate) region_id: RegionId,
    /// The manifest version passed with the request.
    pub(crate) manifest_version: ManifestVersion,
    /// Sender used to report a manifest version and a flag.
    /// NOTE(review): the meaning of the `bool` is not visible in this file — confirm.
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
961
962#[cfg(test)]
963mod tests {
964 use api::v1::value::ValueData;
965 use api::v1::{Row, SemanticType};
966 use datatypes::prelude::ConcreteDataType;
967 use datatypes::schema::ColumnDefaultConstraint;
968 use mito_codec::test_util::i64_value;
969 use store_api::metadata::RegionMetadataBuilder;
970
971 use super::*;
972 use crate::error::Error;
973 use crate::test_util::ts_ms_value;
974
975 fn new_column_schema(
976 name: &str,
977 data_type: ColumnDataType,
978 semantic_type: SemanticType,
979 ) -> ColumnSchema {
980 ColumnSchema {
981 column_name: name.to_string(),
982 datatype: data_type as i32,
983 semantic_type: semantic_type as i32,
984 ..Default::default()
985 }
986 }
987
988 fn check_invalid_request(err: &Error, expect: &str) {
989 if let Error::InvalidRequest {
990 region_id: _,
991 reason,
992 location: _,
993 } = err
994 {
995 assert_eq!(reason, expect);
996 } else {
997 panic!("Unexpected error {err}")
998 }
999 }
1000
/// A schema that repeats a column name is rejected when the request is created.
#[test]
fn test_write_request_duplicate_column() {
    let first = new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag);
    let second = new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![first, second],
        rows: vec![],
    };

    let region_id = RegionId::new(1, 1);
    let err = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap_err();
    check_invalid_request(&err, "duplicate column c0");
}
1014
/// A well-formed request indexes its columns by name.
#[test]
fn test_valid_write_request() {
    let schema = vec![
        new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let row = Row {
        values: vec![i64_value(1), i64_value(2)],
    };
    let rows = Rows {
        schema,
        rows: vec![row],
    };

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    assert_eq!(Some(0), request.column_index_by_name("c0"));
    assert_eq!(Some(1), request.column_index_by_name("c1"));
    assert_eq!(None, request.column_index_by_name("c2"));
}
1032
/// A row with more values than schema columns is rejected.
#[test]
fn test_write_request_column_num() {
    let schema = vec![
        new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
    ];
    let oversized_row = Row {
        values: vec![i64_value(1), i64_value(2), i64_value(3)],
    };
    let rows = Rows {
        schema,
        rows: vec![oversized_row],
    };

    let region_id = RegionId::new(1, 1);
    let err = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap_err();
    check_invalid_request(&err, "row has 3 columns but schema has 2");
}
1048
1049 fn new_region_metadata() -> RegionMetadata {
1050 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1051 builder
1052 .push_column_metadata(ColumnMetadata {
1053 column_schema: datatypes::schema::ColumnSchema::new(
1054 "ts",
1055 ConcreteDataType::timestamp_millisecond_datatype(),
1056 false,
1057 ),
1058 semantic_type: SemanticType::Timestamp,
1059 column_id: 1,
1060 })
1061 .push_column_metadata(ColumnMetadata {
1062 column_schema: datatypes::schema::ColumnSchema::new(
1063 "k0",
1064 ConcreteDataType::int64_datatype(),
1065 true,
1066 ),
1067 semantic_type: SemanticType::Tag,
1068 column_id: 2,
1069 })
1070 .primary_key(vec![2]);
1071 builder.build().unwrap()
1072 }
1073
/// A request whose schema matches the region passes the schema check.
#[test]
fn test_check_schema() {
    let ts_column = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Timestamp,
    );
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![ts_column, tag_column],
        rows: vec![Row {
            values: vec![ts_ms_value(1), i64_value(2)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    request.check_schema(&metadata).unwrap();
}
1094
/// A column whose data type differs from the region's is reported.
#[test]
fn test_column_type() {
    // `ts` is declared as Int64 although the region expects a millisecond timestamp.
    let ts_column = new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp);
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![ts_column, tag_column],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(
        &err,
        "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
    );
}
1115
/// A column whose semantic type differs from the region's is reported.
#[test]
fn test_semantic_type() {
    // `ts` is declared as a tag although the region expects a timestamp.
    let ts_column = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Tag,
    );
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![ts_column, tag_column],
        rows: vec![Row {
            values: vec![ts_ms_value(1), i64_value(2)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
}
1137
/// A null written to a non-nullable column is reported.
#[test]
fn test_column_nullable() {
    let ts_column = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Timestamp,
    );
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let null_value = Value { value_data: None };
    let rows = Rows {
        schema: vec![ts_column, tag_column],
        rows: vec![Row {
            values: vec![null_value, i64_value(2)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, "column ts is not null but input has null");
}
1159
/// Omitting a non-nullable column without a default is reported as missing.
#[test]
fn test_column_default() {
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_column],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, "missing column ts");
}
1178
/// A request column that the region does not have is reported as unknown.
#[test]
fn test_unknown_column() {
    let ts_column = new_column_schema(
        "ts",
        ColumnDataType::TimestampMillisecond,
        SemanticType::Timestamp,
    );
    let known_tag = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let unknown_tag = new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![ts_column, known_tag, unknown_tag],
        rows: vec![Row {
            values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let request = WriteRequest::new(region_id, OpType::Put, rows, None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
}
1201
/// Filling a missing column whose default is an impure function (`now()`) fails.
#[test]
fn test_fill_impure_columns_err() {
    let tag_column = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let rows = Rows {
        schema: vec![tag_column],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    // `ts` is non-nullable and defaults to `now()`.
    let ts_schema = datatypes::schema::ColumnSchema::new(
        "ts",
        ConcreteDataType::timestamp_millisecond_datatype(),
        false,
    )
    .with_default_constraint(Some(ColumnDefaultConstraint::Function(
        "now()".to_string(),
    )))
    .unwrap();
    let k0_schema =
        datatypes::schema::ColumnSchema::new("k0", ConcreteDataType::int64_datatype(), true);

    let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
    builder
        .push_column_metadata(ColumnMetadata {
            column_schema: ts_schema,
            semantic_type: SemanticType::Timestamp,
            column_id: 1,
        })
        .push_column_metadata(ColumnMetadata {
            column_schema: k0_schema,
            semantic_type: SemanticType::Tag,
            column_id: 2,
        })
        .primary_key(vec![2]);
    let metadata = builder.build().unwrap();

    let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
    let check_err = request.check_schema(&metadata).unwrap_err();
    assert!(check_err.is_fill_default());

    let fill_err = request.fill_missing_columns(&metadata).unwrap_err();
    assert!(
        fill_err
            .to_string()
            .contains("unexpected impure default value with region_id")
    );
}
1254
/// Filling a missing nullable column without a default leaves the rows unchanged,
/// because a null default is not appended.
#[test]
fn test_fill_missing_columns() {
    // Builds the single-column rows used both as input and as expectation.
    let ts_only_rows = || Rows {
        schema: vec![new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let metadata = new_region_metadata();

    let region_id = RegionId::new(1, 1);
    let mut request = WriteRequest::new(region_id, OpType::Put, ts_only_rows(), None).unwrap();
    let err = request.check_schema(&metadata).unwrap_err();
    assert!(err.is_fill_default());
    request.fill_missing_columns(&metadata).unwrap();

    let expect_rows = ts_only_rows();
    assert_eq!(expect_rows, request.rows);
}
1286
1287 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1288 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1289 builder
1290 .push_column_metadata(ColumnMetadata {
1291 column_schema: datatypes::schema::ColumnSchema::new(
1292 "ts",
1293 ConcreteDataType::timestamp_millisecond_datatype(),
1294 false,
1295 ),
1296 semantic_type: SemanticType::Timestamp,
1297 column_id: 1,
1298 })
1299 .push_column_metadata(ColumnMetadata {
1300 column_schema: datatypes::schema::ColumnSchema::new(
1301 "k0",
1302 ConcreteDataType::int64_datatype(),
1303 true,
1304 ),
1305 semantic_type: SemanticType::Tag,
1306 column_id: 2,
1307 })
1308 .primary_key(vec![2]);
1309 builder
1310 }
1311
1312 fn region_metadata_two_fields() -> RegionMetadata {
1313 let mut builder = builder_with_ts_tag();
1314 builder
1315 .push_column_metadata(ColumnMetadata {
1316 column_schema: datatypes::schema::ColumnSchema::new(
1317 "f0",
1318 ConcreteDataType::int64_datatype(),
1319 true,
1320 ),
1321 semantic_type: SemanticType::Field,
1322 column_id: 3,
1323 })
1324 .push_column_metadata(ColumnMetadata {
1326 column_schema: datatypes::schema::ColumnSchema::new(
1327 "f1",
1328 ConcreteDataType::int64_datatype(),
1329 false,
1330 )
1331 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1332 datatypes::value::Value::Int64(100),
1333 )))
1334 .unwrap(),
1335 semantic_type: SemanticType::Field,
1336 column_id: 4,
1337 });
1338 builder.build().unwrap()
1339 }
1340
/// Exercises column filling for delete requests against the metadata produced by
/// `region_metadata_two_fields`.
///
/// * A delete that omits `k0` is rejected by both the schema check and the fill.
/// * A delete that supplies `k0` and `ts` is filled successfully and gains an `f1` column.
#[test]
fn test_fill_missing_for_delete() {
    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let region_meta = region_metadata_two_fields();

    // Case 1: only `ts` is present, so the request is invalid for a delete.
    let ts_only = Rows {
        schema: vec![ts_schema()],
        rows: vec![Row {
            values: vec![ts_ms_value(1)],
        }],
    };
    let mut rejected =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, ts_only, None).unwrap();
    let schema_err = rejected.check_schema(&region_meta).unwrap_err();
    check_invalid_request(&schema_err, "delete requests need column k0");
    let fill_err = rejected.fill_missing_columns(&region_meta).unwrap_err();
    check_invalid_request(&fill_err, "delete requests need column k0");

    // Case 2: `k0` and `ts` are present; only field columns are missing.
    let key_rows = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut accepted =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let schema_err = accepted.check_schema(&region_meta).unwrap_err();
    assert!(schema_err.is_fill_default());
    accepted.fill_missing_columns(&region_meta).unwrap();

    // Only `f1` is expected to be appended, and with the value 0 rather than the
    // default of 100 declared in the metadata; `f0` does not appear at all.
    let filled = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(filled, accepted.rows);
}
1398
/// A delete request can still be filled when the missing field column `f1` has no
/// default constraint: the expected rows carry `f1` with the value 0.
#[test]
fn test_fill_missing_without_default_in_delete() {
    // Same layout as `region_metadata_two_fields`, except `f1` has no default constraint.
    // NOTE(review): the third `ColumnSchema::new` argument is presumably the
    // nullability flag (`true` for `f0`, `false` for `f1`) — confirm.
    let f0_column = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f0",
            ConcreteDataType::int64_datatype(),
            true,
        ),
        semantic_type: SemanticType::Field,
        column_id: 3,
    };
    let f1_column = ColumnMetadata {
        column_schema: datatypes::schema::ColumnSchema::new(
            "f1",
            ConcreteDataType::int64_datatype(),
            false,
        ),
        semantic_type: SemanticType::Field,
        column_id: 4,
    };
    let mut metadata_builder = builder_with_ts_tag();
    metadata_builder.push_column_metadata(f0_column);
    metadata_builder.push_column_metadata(f1_column);
    let region_meta = metadata_builder.build().unwrap();

    let ts_schema = || {
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        )
    };
    let k0_schema = || new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);

    // The delete supplies the tag and the timestamp only.
    let key_rows = Rows {
        schema: vec![k0_schema(), ts_schema()],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1)],
        }],
    };
    let mut write_req =
        WriteRequest::new(RegionId::new(1, 1), OpType::Delete, key_rows, None).unwrap();
    let schema_err = write_req.check_schema(&region_meta).unwrap_err();
    assert!(schema_err.is_fill_default());
    write_req.fill_missing_columns(&region_meta).unwrap();

    // `f1` is expected to be appended with the value 0; `f0` is not expected to appear.
    let filled = Rows {
        schema: vec![
            k0_schema(),
            ts_schema(),
            new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
        ],
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
        }],
    };
    assert_eq!(filled, write_req.rows);
}
1462
/// A put request that omits `ts` cannot be filled: the fill fails with an invalid-request
/// error stating that `ts` has no default value.
#[test]
fn test_no_default() {
    let region_meta = new_region_metadata();

    // Only the tag column is supplied.
    let tag_schema = new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag);
    let tag_only = Rows {
        schema: vec![tag_schema],
        rows: vec![Row {
            values: vec![i64_value(1)],
        }],
    };

    let mut write_req =
        WriteRequest::new(RegionId::new(1, 1), OpType::Put, tag_only, None).unwrap();
    let fill_err = write_req.fill_missing_columns(&region_meta).unwrap_err();
    check_invalid_request(&fill_err, "column ts does not have default value");
}
1481
/// When a request both omits a column (`f0`) and supplies `f1` with the wrong type
/// (string instead of int64), the schema check reports the type mismatch as an
/// invalid request.
#[test]
fn test_missing_and_invalid() {
    let region_meta = region_metadata_two_fields();

    let schema = vec![
        new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
        new_column_schema(
            "ts",
            ColumnDataType::TimestampMillisecond,
            SemanticType::Timestamp,
        ),
        // `f1` is declared as a string here, while the metadata declares it as int64.
        new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
    ];
    let string_value = Value {
        value_data: Some(ValueData::StringValue("xxxxx".to_string())),
    };
    let bad_rows = Rows {
        schema,
        rows: vec![Row {
            values: vec![i64_value(100), ts_ms_value(1), string_value],
        }],
    };

    let write_req = WriteRequest::new(RegionId::new(1, 1), OpType::Put, bad_rows, None).unwrap();
    let schema_err = write_req.check_schema(&region_meta).unwrap_err();
    check_invalid_request(
        &schema_err,
        "column f1 expect type Int64(Int64Type), given: STRING(12)",
    );
}
1514
/// Region metadata handed to `WriteRequest::new` is stored on the request and exposes
/// the expected region id.
#[test]
fn test_write_request_metadata() {
    let tag_rows = Rows {
        schema: vec![
            new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
            new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
        ],
        rows: vec![Row {
            values: vec![i64_value(1), i64_value(2)],
        }],
    };
    let region_meta = Arc::new(new_region_metadata());

    let write_req = WriteRequest::new(
        RegionId::new(1, 1),
        OpType::Put,
        tag_rows,
        Some(Arc::clone(&region_meta)),
    )
    .unwrap();

    // The metadata must be attached to the request and report region (1, 1).
    let attached = write_req.region_metadata;
    assert!(attached.is_some());
    assert_eq!(attached.unwrap().region_id, RegionId::new(1, 1));
}
1542}