1use std::collections::HashMap;
18use std::sync::Arc;
19use std::time::Instant;
20
21use api::helper::{
22 ColumnDataTypeWrapper, is_column_type_value_eq, is_semantic_type_eq, proto_value_type,
23 to_proto_value,
24};
25use api::v1::column_def::options_from_column_schema;
26use api::v1::{ColumnDataType, ColumnSchema, OpType, Rows, SemanticType, Value, WriteHint};
27use common_telemetry::info;
28use datatypes::prelude::DataType;
29use prometheus::HistogramTimer;
30use prost::Message;
31use smallvec::SmallVec;
32use snafu::{OptionExt, ResultExt, ensure};
33use store_api::ManifestVersion;
34use store_api::codec::{PrimaryKeyEncoding, infer_primary_key_encoding_from_hint};
35use store_api::metadata::{ColumnMetadata, RegionMetadata, RegionMetadataRef};
36use store_api::region_engine::{SetRegionRoleStateResponse, SettableRegionRoleState};
37use store_api::region_request::{
38 AffectedRows, RegionAlterRequest, RegionBuildIndexRequest, RegionBulkInsertsRequest,
39 RegionCatchupRequest, RegionCloseRequest, RegionCompactRequest, RegionCreateRequest,
40 RegionFlushRequest, RegionOpenRequest, RegionRequest, RegionTruncateRequest,
41};
42use store_api::storage::{FileId, RegionId};
43use tokio::sync::oneshot::{self, Receiver, Sender};
44
45use crate::error::{
46 CompactRegionSnafu, ConvertColumnDataTypeSnafu, CreateDefaultSnafu, Error, FillDefaultSnafu,
47 FlushRegionSnafu, InvalidRequestSnafu, Result, UnexpectedSnafu,
48};
49use crate::manifest::action::{RegionEdit, TruncateKind};
50use crate::memtable::MemtableId;
51use crate::memtable::bulk::part::BulkPart;
52use crate::metrics::COMPACTION_ELAPSED_TOTAL;
53use crate::sst::file::FileMeta;
54use crate::sst::index::IndexBuildType;
55use crate::wal::EntryId;
56use crate::wal::entry_distributor::WalEntryReceiver;
57
/// A request to write rows (put or delete, see `op_type`) into a single region.
#[derive(Debug)]
pub struct WriteRequest {
    /// Id of the region to write to.
    pub region_id: RegionId,
    /// Kind of write operation (`OpType::Put` or `OpType::Delete`).
    pub op_type: OpType,
    /// Rows to write together with their column schema.
    pub rows: Rows,
    /// Maps a column name to its index in `rows.schema`.
    pub name_to_index: HashMap<String, usize>,
    /// For each column (indexed like `rows.schema`), whether any row holds a null value.
    pub has_null: Vec<bool>,
    /// Optional write hint; the primary key encoding is inferred from it.
    pub hint: Option<WriteHint>,
    /// Region metadata supplied when the request was built, if any.
    pub(crate) region_metadata: Option<RegionMetadataRef>,
}
76
77impl WriteRequest {
78 pub fn new(
82 region_id: RegionId,
83 op_type: OpType,
84 rows: Rows,
85 region_metadata: Option<RegionMetadataRef>,
86 ) -> Result<WriteRequest> {
87 let mut name_to_index = HashMap::with_capacity(rows.schema.len());
88 for (index, column) in rows.schema.iter().enumerate() {
89 ensure!(
90 name_to_index
91 .insert(column.column_name.clone(), index)
92 .is_none(),
93 InvalidRequestSnafu {
94 region_id,
95 reason: format!("duplicate column {}", column.column_name),
96 }
97 );
98 }
99
100 let mut has_null = vec![false; rows.schema.len()];
101 for row in &rows.rows {
102 ensure!(
103 row.values.len() == rows.schema.len(),
104 InvalidRequestSnafu {
105 region_id,
106 reason: format!(
107 "row has {} columns but schema has {}",
108 row.values.len(),
109 rows.schema.len()
110 ),
111 }
112 );
113
114 for (i, (value, column_schema)) in row.values.iter().zip(&rows.schema).enumerate() {
115 validate_proto_value(region_id, value, column_schema)?;
116
117 if value.value_data.is_none() {
118 has_null[i] = true;
119 }
120 }
121 }
122
123 Ok(WriteRequest {
124 region_id,
125 op_type,
126 rows,
127 name_to_index,
128 has_null,
129 hint: None,
130 region_metadata,
131 })
132 }
133
134 pub fn with_hint(mut self, hint: Option<WriteHint>) -> Self {
136 self.hint = hint;
137 self
138 }
139
140 pub fn primary_key_encoding(&self) -> PrimaryKeyEncoding {
142 infer_primary_key_encoding_from_hint(self.hint.as_ref())
143 }
144
145 pub(crate) fn estimated_size(&self) -> usize {
147 let row_size = self
148 .rows
149 .rows
150 .first()
151 .map(|row| row.encoded_len())
152 .unwrap_or(0);
153 row_size * self.rows.rows.len()
154 }
155
156 pub fn column_index_by_name(&self, name: &str) -> Option<usize> {
158 self.name_to_index.get(name).copied()
159 }
160
161 pub(crate) fn check_schema(&self, metadata: &RegionMetadata) -> Result<()> {
166 debug_assert_eq!(self.region_id, metadata.region_id);
167
168 let region_id = self.region_id;
169 let mut rows_columns: HashMap<_, _> = self
171 .rows
172 .schema
173 .iter()
174 .map(|column| (&column.column_name, column))
175 .collect();
176
177 let mut need_fill_default = false;
178 for column in &metadata.column_metadatas {
180 if let Some(input_col) = rows_columns.remove(&column.column_schema.name) {
181 ensure!(
183 is_column_type_value_eq(
184 input_col.datatype,
185 input_col.datatype_extension.clone(),
186 &column.column_schema.data_type
187 ),
188 InvalidRequestSnafu {
189 region_id,
190 reason: format!(
191 "column {} expect type {:?}, given: {}({})",
192 column.column_schema.name,
193 column.column_schema.data_type,
194 ColumnDataType::try_from(input_col.datatype)
195 .map(|v| v.as_str_name())
196 .unwrap_or("Unknown"),
197 input_col.datatype,
198 )
199 }
200 );
201
202 ensure!(
204 is_semantic_type_eq(input_col.semantic_type, column.semantic_type),
205 InvalidRequestSnafu {
206 region_id,
207 reason: format!(
208 "column {} has semantic type {:?}, given: {}({})",
209 column.column_schema.name,
210 column.semantic_type,
211 api::v1::SemanticType::try_from(input_col.semantic_type)
212 .map(|v| v.as_str_name())
213 .unwrap_or("Unknown"),
214 input_col.semantic_type
215 ),
216 }
217 );
218
219 let has_null = self.has_null[self.name_to_index[&column.column_schema.name]];
222 ensure!(
223 !has_null || column.column_schema.is_nullable(),
224 InvalidRequestSnafu {
225 region_id,
226 reason: format!(
227 "column {} is not null but input has null",
228 column.column_schema.name
229 ),
230 }
231 );
232 } else {
233 self.check_missing_column(column)?;
235
236 need_fill_default = true;
237 }
238 }
239
240 if !rows_columns.is_empty() {
242 let names: Vec<_> = rows_columns.into_keys().collect();
243 return InvalidRequestSnafu {
244 region_id,
245 reason: format!("unknown columns: {:?}", names),
246 }
247 .fail();
248 }
249
250 ensure!(!need_fill_default, FillDefaultSnafu { region_id });
252
253 Ok(())
254 }
255
256 pub(crate) fn fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
261 debug_assert_eq!(self.region_id, metadata.region_id);
262
263 let mut columns_to_fill = vec![];
264 for column in &metadata.column_metadatas {
265 if !self.name_to_index.contains_key(&column.column_schema.name) {
266 columns_to_fill.push(column);
267 }
268 }
269 self.fill_columns(columns_to_fill)?;
270
271 Ok(())
272 }
273
274 pub(crate) fn maybe_fill_missing_columns(&mut self, metadata: &RegionMetadata) -> Result<()> {
276 if let Err(e) = self.check_schema(metadata) {
277 if e.is_fill_default() {
278 self.fill_missing_columns(metadata)?;
282 } else {
283 return Err(e);
284 }
285 }
286
287 Ok(())
288 }
289
290 fn fill_columns(&mut self, columns: Vec<&ColumnMetadata>) -> Result<()> {
292 let mut default_values = Vec::with_capacity(columns.len());
293 let mut columns_to_fill = Vec::with_capacity(columns.len());
294 for column in columns {
295 let default_value = self.column_default_value(column)?;
296 if default_value.value_data.is_some() {
297 default_values.push(default_value);
298 columns_to_fill.push(column);
299 }
300 }
301
302 for row in &mut self.rows.rows {
303 row.values.extend(default_values.iter().cloned());
304 }
305
306 for column in columns_to_fill {
307 let (datatype, datatype_ext) =
308 ColumnDataTypeWrapper::try_from(column.column_schema.data_type.clone())
309 .with_context(|_| ConvertColumnDataTypeSnafu {
310 reason: format!(
311 "no protobuf type for column {} ({:?})",
312 column.column_schema.name, column.column_schema.data_type
313 ),
314 })?
315 .to_parts();
316 self.rows.schema.push(ColumnSchema {
317 column_name: column.column_schema.name.clone(),
318 datatype: datatype as i32,
319 semantic_type: column.semantic_type as i32,
320 datatype_extension: datatype_ext,
321 options: options_from_column_schema(&column.column_schema),
322 });
323 }
324
325 Ok(())
326 }
327
328 fn check_missing_column(&self, column: &ColumnMetadata) -> Result<()> {
330 if self.op_type == OpType::Delete {
331 if column.semantic_type == SemanticType::Field {
332 return Ok(());
335 } else {
336 return InvalidRequestSnafu {
337 region_id: self.region_id,
338 reason: format!("delete requests need column {}", column.column_schema.name),
339 }
340 .fail();
341 }
342 }
343
344 ensure!(
346 column.column_schema.is_nullable()
347 || column.column_schema.default_constraint().is_some(),
348 InvalidRequestSnafu {
349 region_id: self.region_id,
350 reason: format!("missing column {}", column.column_schema.name),
351 }
352 );
353
354 Ok(())
355 }
356
357 fn column_default_value(&self, column: &ColumnMetadata) -> Result<Value> {
359 let default_value = match self.op_type {
360 OpType::Delete => {
361 ensure!(
362 column.semantic_type == SemanticType::Field,
363 InvalidRequestSnafu {
364 region_id: self.region_id,
365 reason: format!(
366 "delete requests need column {}",
367 column.column_schema.name
368 ),
369 }
370 );
371
372 if column.column_schema.is_nullable() {
377 datatypes::value::Value::Null
378 } else {
379 column.column_schema.data_type.default_value()
380 }
381 }
382 OpType::Put => {
383 if column.column_schema.is_default_impure() {
385 UnexpectedSnafu {
386 reason: format!(
387 "unexpected impure default value with region_id: {}, column: {}, default_value: {:?}",
388 self.region_id,
389 column.column_schema.name,
390 column.column_schema.default_constraint(),
391 ),
392 }
393 .fail()?
394 }
395 column
396 .column_schema
397 .create_default()
398 .context(CreateDefaultSnafu {
399 region_id: self.region_id,
400 column: &column.column_schema.name,
401 })?
402 .with_context(|| InvalidRequestSnafu {
404 region_id: self.region_id,
405 reason: format!(
406 "column {} does not have default value",
407 column.column_schema.name
408 ),
409 })?
410 }
411 };
412
413 Ok(to_proto_value(default_value))
415 }
416}
417
418pub(crate) fn validate_proto_value(
420 region_id: RegionId,
421 value: &Value,
422 column_schema: &ColumnSchema,
423) -> Result<()> {
424 if let Some(value_type) = proto_value_type(value) {
425 let column_type = ColumnDataType::try_from(column_schema.datatype).map_err(|_| {
426 InvalidRequestSnafu {
427 region_id,
428 reason: format!(
429 "column {} has unknown type {}",
430 column_schema.column_name, column_schema.datatype
431 ),
432 }
433 .build()
434 })?;
435 ensure!(
436 proto_value_type_match(column_type, value_type),
437 InvalidRequestSnafu {
438 region_id,
439 reason: format!(
440 "value has type {:?}, but column {} has type {:?}({})",
441 value_type, column_schema.column_name, column_type, column_schema.datatype,
442 ),
443 }
444 );
445 }
446
447 Ok(())
448}
449
450fn proto_value_type_match(column_type: ColumnDataType, value_type: ColumnDataType) -> bool {
451 match (column_type, value_type) {
452 (ct, vt) if ct == vt => true,
453 (ColumnDataType::Vector, ColumnDataType::Binary) => true,
454 (ColumnDataType::Json, ColumnDataType::Binary) => true,
455 _ => false,
456 }
457}
458
/// Sender half of a oneshot channel used to report a request's result (affected rows or an error).
#[derive(Debug)]
pub struct OutputTx(Sender<Result<AffectedRows>>);
462
463impl OutputTx {
464 pub(crate) fn new(sender: Sender<Result<AffectedRows>>) -> OutputTx {
466 OutputTx(sender)
467 }
468
469 pub(crate) fn send(self, result: Result<AffectedRows>) {
471 let _ = self.0.send(result);
473 }
474}
475
/// An [`OutputTx`] that may be absent; sending through an empty wrapper is a no-op.
#[derive(Debug)]
pub(crate) struct OptionOutputTx(Option<OutputTx>);
479
480impl OptionOutputTx {
481 pub(crate) fn new(sender: Option<OutputTx>) -> OptionOutputTx {
483 OptionOutputTx(sender)
484 }
485
486 pub(crate) fn none() -> OptionOutputTx {
488 OptionOutputTx(None)
489 }
490
491 pub(crate) fn send_mut(&mut self, result: Result<AffectedRows>) {
493 if let Some(sender) = self.0.take() {
494 sender.send(result);
495 }
496 }
497
498 pub(crate) fn send(mut self, result: Result<AffectedRows>) {
500 if let Some(sender) = self.0.take() {
501 sender.send(result);
502 }
503 }
504
505 pub(crate) fn take_inner(&mut self) -> Option<OutputTx> {
507 self.0.take()
508 }
509}
510
511impl From<Sender<Result<AffectedRows>>> for OptionOutputTx {
512 fn from(sender: Sender<Result<AffectedRows>>) -> Self {
513 Self::new(Some(OutputTx::new(sender)))
514 }
515}
516
517impl OnFailure for OptionOutputTx {
518 fn on_failure(&mut self, err: Error) {
519 self.send_mut(Err(err));
520 }
521}
522
/// Something that must be notified when the operation it belongs to fails.
pub(crate) trait OnFailure {
    /// Handles the failure described by `err`.
    fn on_failure(&mut self, err: Error);
}
528
/// A write request paired with the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderWriteRequest {
    /// Result sender; may be empty, in which case no result is reported.
    pub(crate) sender: OptionOutputTx,
    /// The write request to apply.
    pub(crate) request: WriteRequest,
}
536
/// A bulk part to write into a region, paired with the sender that receives its result.
pub(crate) struct SenderBulkRequest {
    /// Result sender; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// The bulk data to write.
    pub(crate) request: BulkPart,
    /// Metadata of the target region.
    pub(crate) region_metadata: RegionMetadataRef,
}
543
/// A [`WorkerRequest`] tagged with the instant it was created.
#[derive(Debug)]
pub(crate) struct WorkerRequestWithTime {
    /// The wrapped request.
    pub(crate) request: WorkerRequest,
    /// When the wrapper was created (set by `WorkerRequestWithTime::new`).
    pub(crate) created_at: Instant,
}
550
551impl WorkerRequestWithTime {
552 pub(crate) fn new(request: WorkerRequest) -> Self {
553 Self {
554 request,
555 created_at: Instant::now(),
556 }
557 }
558}
559
/// Requests that can be sent to a region worker.
#[derive(Debug)]
pub(crate) enum WorkerRequest {
    /// Write (put or delete) rows into a region.
    Write(SenderWriteRequest),

    /// DDL-style request (create, drop, open, close, alter, flush, compact, build index,
    /// truncate, catchup).
    Ddl(SenderDdlRequest),

    /// Notification from a background job of a region.
    Background {
        /// Id of the region the notification belongs to.
        region_id: RegionId,
        /// The notification.
        notify: BackgroundNotify,
    },

    /// Sets the role state of a region gracefully.
    SetRegionRoleStateGracefully {
        /// Id of the region.
        region_id: RegionId,
        /// Target role state.
        region_role_state: SettableRegionRoleState,
        /// Receives the response of the role change.
        sender: Sender<SetRegionRoleStateResponse>,
    },

    /// Stop request for the worker.
    Stop,

    /// Applies a region edit.
    EditRegion(RegionEditRequest),

    /// Syncs a region to a manifest version.
    SyncRegion(RegionSyncRequest),

    /// Bulk insert request.
    BulkInserts {
        /// Region metadata supplied with the request, if any.
        metadata: Option<RegionMetadataRef>,
        request: RegionBulkInsertsRequest,
        sender: OptionOutputTx,
    },
}
603
604impl WorkerRequest {
605 pub(crate) fn new_open_region_request(
607 region_id: RegionId,
608 request: RegionOpenRequest,
609 entry_receiver: Option<WalEntryReceiver>,
610 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
611 let (sender, receiver) = oneshot::channel();
612
613 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
614 region_id,
615 sender: sender.into(),
616 request: DdlRequest::Open((request, entry_receiver)),
617 });
618
619 (worker_request, receiver)
620 }
621
622 pub(crate) fn new_catchup_region_request(
624 region_id: RegionId,
625 request: RegionCatchupRequest,
626 entry_receiver: Option<WalEntryReceiver>,
627 ) -> (WorkerRequest, Receiver<Result<AffectedRows>>) {
628 let (sender, receiver) = oneshot::channel();
629 let worker_request = WorkerRequest::Ddl(SenderDdlRequest {
630 region_id,
631 sender: sender.into(),
632 request: DdlRequest::Catchup((request, entry_receiver)),
633 });
634 (worker_request, receiver)
635 }
636
637 pub(crate) fn try_from_region_request(
639 region_id: RegionId,
640 value: RegionRequest,
641 region_metadata: Option<RegionMetadataRef>,
642 ) -> Result<(WorkerRequest, Receiver<Result<AffectedRows>>)> {
643 let (sender, receiver) = oneshot::channel();
644 let worker_request = match value {
645 RegionRequest::Put(v) => {
646 let mut write_request =
647 WriteRequest::new(region_id, OpType::Put, v.rows, region_metadata.clone())?
648 .with_hint(v.hint);
649 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
650 && let Some(region_metadata) = ®ion_metadata
651 {
652 write_request.maybe_fill_missing_columns(region_metadata)?;
653 }
654 WorkerRequest::Write(SenderWriteRequest {
655 sender: sender.into(),
656 request: write_request,
657 })
658 }
659 RegionRequest::Delete(v) => {
660 let mut write_request =
661 WriteRequest::new(region_id, OpType::Delete, v.rows, region_metadata.clone())?;
662 if write_request.primary_key_encoding() == PrimaryKeyEncoding::Dense
663 && let Some(region_metadata) = ®ion_metadata
664 {
665 write_request.maybe_fill_missing_columns(region_metadata)?;
666 }
667 WorkerRequest::Write(SenderWriteRequest {
668 sender: sender.into(),
669 request: write_request,
670 })
671 }
672 RegionRequest::Create(v) => WorkerRequest::Ddl(SenderDdlRequest {
673 region_id,
674 sender: sender.into(),
675 request: DdlRequest::Create(v),
676 }),
677 RegionRequest::Drop(_) => WorkerRequest::Ddl(SenderDdlRequest {
678 region_id,
679 sender: sender.into(),
680 request: DdlRequest::Drop,
681 }),
682 RegionRequest::Open(v) => WorkerRequest::Ddl(SenderDdlRequest {
683 region_id,
684 sender: sender.into(),
685 request: DdlRequest::Open((v, None)),
686 }),
687 RegionRequest::Close(v) => WorkerRequest::Ddl(SenderDdlRequest {
688 region_id,
689 sender: sender.into(),
690 request: DdlRequest::Close(v),
691 }),
692 RegionRequest::Alter(v) => WorkerRequest::Ddl(SenderDdlRequest {
693 region_id,
694 sender: sender.into(),
695 request: DdlRequest::Alter(v),
696 }),
697 RegionRequest::Flush(v) => WorkerRequest::Ddl(SenderDdlRequest {
698 region_id,
699 sender: sender.into(),
700 request: DdlRequest::Flush(v),
701 }),
702 RegionRequest::Compact(v) => WorkerRequest::Ddl(SenderDdlRequest {
703 region_id,
704 sender: sender.into(),
705 request: DdlRequest::Compact(v),
706 }),
707 RegionRequest::BuildIndex(v) => WorkerRequest::Ddl(SenderDdlRequest {
708 region_id,
709 sender: sender.into(),
710 request: DdlRequest::BuildIndex(v),
711 }),
712 RegionRequest::Truncate(v) => WorkerRequest::Ddl(SenderDdlRequest {
713 region_id,
714 sender: sender.into(),
715 request: DdlRequest::Truncate(v),
716 }),
717 RegionRequest::Catchup(v) => WorkerRequest::Ddl(SenderDdlRequest {
718 region_id,
719 sender: sender.into(),
720 request: DdlRequest::Catchup((v, None)),
721 }),
722 RegionRequest::BulkInserts(region_bulk_inserts_request) => WorkerRequest::BulkInserts {
723 metadata: region_metadata,
724 sender: sender.into(),
725 request: region_bulk_inserts_request,
726 },
727 };
728
729 Ok((worker_request, receiver))
730 }
731
732 pub(crate) fn new_set_readonly_gracefully(
733 region_id: RegionId,
734 region_role_state: SettableRegionRoleState,
735 ) -> (WorkerRequest, Receiver<SetRegionRoleStateResponse>) {
736 let (sender, receiver) = oneshot::channel();
737
738 (
739 WorkerRequest::SetRegionRoleStateGracefully {
740 region_id,
741 region_role_state,
742 sender,
743 },
744 receiver,
745 )
746 }
747
748 pub(crate) fn new_sync_region_request(
749 region_id: RegionId,
750 manifest_version: ManifestVersion,
751 ) -> (WorkerRequest, Receiver<Result<(ManifestVersion, bool)>>) {
752 let (sender, receiver) = oneshot::channel();
753 (
754 WorkerRequest::SyncRegion(RegionSyncRequest {
755 region_id,
756 manifest_version,
757 sender,
758 }),
759 receiver,
760 )
761 }
762}
763
/// DDL-style operations a region worker can perform.
#[derive(Debug)]
pub(crate) enum DdlRequest {
    /// Create a region.
    Create(RegionCreateRequest),
    /// Drop a region.
    Drop,
    /// Open a region, with an optional WAL entry receiver.
    Open((RegionOpenRequest, Option<WalEntryReceiver>)),
    /// Close a region.
    Close(RegionCloseRequest),
    /// Alter a region.
    Alter(RegionAlterRequest),
    /// Flush a region.
    Flush(RegionFlushRequest),
    /// Compact a region.
    Compact(RegionCompactRequest),
    /// Build indexes for a region.
    BuildIndex(RegionBuildIndexRequest),
    /// Truncate a region.
    Truncate(RegionTruncateRequest),
    /// Catch up a region, with an optional WAL entry receiver.
    Catchup((RegionCatchupRequest, Option<WalEntryReceiver>)),
}
778
/// A DDL request paired with its target region and the sender that receives its result.
#[derive(Debug)]
pub(crate) struct SenderDdlRequest {
    /// Id of the target region.
    pub(crate) region_id: RegionId,
    /// Result sender; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// The DDL operation.
    pub(crate) request: DdlRequest,
}
789
/// Notifications produced by background jobs (flush, index build, compaction, truncate,
/// region change, region edit).
#[derive(Debug)]
pub(crate) enum BackgroundNotify {
    /// A flush job finished successfully.
    FlushFinished(FlushFinished),
    /// A flush job failed.
    FlushFailed(FlushFailed),
    /// An index build job finished.
    IndexBuildFinished(IndexBuildFinished),
    /// An index build job was stopped.
    IndexBuildStopped(IndexBuildStopped),
    /// An index build job failed.
    IndexBuildFailed(IndexBuildFailed),
    /// A compaction job finished successfully.
    CompactionFinished(CompactionFinished),
    /// A compaction job failed.
    CompactionFailed(CompactionFailed),
    /// Result of a truncate.
    Truncate(TruncateResult),
    /// Result of a region metadata change.
    RegionChange(RegionChangeResult),
    /// Result of a region edit.
    RegionEdit(RegionEditResult),
}
814
/// Notification that a flush of a region finished.
#[derive(Debug)]
pub(crate) struct FlushFinished {
    /// Id of the flushed region.
    pub(crate) region_id: RegionId,
    /// WAL entry id recorded for this flush (presumably the last entry it covers — confirm).
    pub(crate) flushed_entry_id: EntryId,
    /// Senders waiting for the flush result.
    pub(crate) senders: Vec<OutputTx>,
    /// Prometheus histogram timer; it records the elapsed time when dropped, so it is
    /// kept alive for as long as this notification lives.
    pub(crate) _timer: HistogramTimer,
    /// Manifest edit produced by the flush.
    pub(crate) edit: RegionEdit,
    /// Ids of the memtables to remove.
    pub(crate) memtables_to_remove: SmallVec<[MemtableId; 2]>,
}
831
832impl FlushFinished {
833 pub(crate) fn on_success(self) {
835 for sender in self.senders {
836 sender.send(Ok(0));
837 }
838 }
839}
840
841impl OnFailure for FlushFinished {
842 fn on_failure(&mut self, err: Error) {
843 let err = Arc::new(err);
844 for sender in self.senders.drain(..) {
845 sender.send(Err(err.clone()).context(FlushRegionSnafu {
846 region_id: self.region_id,
847 }));
848 }
849 }
850}
851
/// Notification that a flush failed.
#[derive(Debug)]
pub(crate) struct FlushFailed {
    /// The flush error, shared via `Arc`.
    pub(crate) err: Arc<Error>,
}
858
/// Notification that an index build finished.
#[derive(Debug)]
pub(crate) struct IndexBuildFinished {
    /// Id of the region. The `dead_code` allow suggests the field is not read anywhere yet.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Manifest edit produced by the index build.
    pub(crate) edit: RegionEdit,
}
865
/// Notification that an index build for a file was stopped.
#[derive(Debug)]
pub(crate) struct IndexBuildStopped {
    /// Id of the region. The `dead_code` allow suggests the field is not read anywhere yet.
    #[allow(dead_code)]
    pub(crate) region_id: RegionId,
    /// Id of the file whose index build stopped.
    pub(crate) file_id: FileId,
}
873
/// Notification that an index build failed.
#[derive(Debug)]
pub(crate) struct IndexBuildFailed {
    /// The index build error, shared via `Arc`.
    pub(crate) err: Arc<Error>,
}
879
/// Notification that a compaction of a region finished.
#[derive(Debug)]
pub(crate) struct CompactionFinished {
    /// Id of the compacted region.
    pub(crate) region_id: RegionId,
    /// Senders waiting for the compaction result.
    pub(crate) senders: Vec<OutputTx>,
    /// When the compaction started; used to observe `COMPACTION_ELAPSED_TOTAL`.
    pub(crate) start_time: Instant,
    /// Manifest edit produced by the compaction.
    pub(crate) edit: RegionEdit,
}
892
893impl CompactionFinished {
894 pub fn on_success(self) {
895 COMPACTION_ELAPSED_TOTAL.observe(self.start_time.elapsed().as_secs_f64());
897
898 for sender in self.senders {
899 sender.send(Ok(0));
900 }
901 info!("Successfully compacted region: {}", self.region_id);
902 }
903}
904
905impl OnFailure for CompactionFinished {
906 fn on_failure(&mut self, err: Error) {
908 let err = Arc::new(err);
909 for sender in self.senders.drain(..) {
910 sender.send(Err(err.clone()).context(CompactRegionSnafu {
911 region_id: self.region_id,
912 }));
913 }
914 }
915}
916
/// Notification that a compaction of a region failed.
#[derive(Debug)]
pub(crate) struct CompactionFailed {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The compaction error, shared via `Arc`.
    pub(crate) err: Arc<Error>,
}
924
/// Result of truncating a region.
#[derive(Debug)]
pub(crate) struct TruncateResult {
    /// Id of the truncated region.
    pub(crate) region_id: RegionId,
    /// Sender for the truncate result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the truncate.
    pub(crate) result: Result<()>,
    /// Kind of truncate performed.
    pub(crate) kind: TruncateKind,
}
936
/// Result of changing a region's metadata.
#[derive(Debug)]
pub(crate) struct RegionChangeResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The new region metadata.
    pub(crate) new_meta: RegionMetadataRef,
    /// Sender for the result; may be empty.
    pub(crate) sender: OptionOutputTx,
    /// Outcome of the change.
    pub(crate) result: Result<()>,
    /// Whether an index needs to be built after the change.
    pub(crate) need_index: bool,
}
951
/// Request to apply a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// The edit to apply.
    pub(crate) edit: RegionEdit,
    /// Receives the outcome of the edit.
    pub(crate) tx: Sender<Result<()>>,
}
960
/// Result of applying a [`RegionEdit`] to a region.
#[derive(Debug)]
pub(crate) struct RegionEditResult {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Receives the outcome of the edit.
    pub(crate) sender: Sender<Result<()>>,
    /// The applied edit.
    pub(crate) edit: RegionEdit,
    /// Outcome of the edit.
    pub(crate) result: Result<()>,
}
973
/// Request to build indexes for SST files of a region.
#[derive(Debug)]
pub(crate) struct BuildIndexRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Type of index build.
    pub(crate) build_type: IndexBuildType,
    /// Metadata of the files to index.
    pub(crate) file_metas: Vec<FileMeta>,
}
981
/// Request to sync a region to a manifest version.
#[derive(Debug)]
pub(crate) struct RegionSyncRequest {
    /// Id of the region.
    pub(crate) region_id: RegionId,
    /// Target manifest version.
    pub(crate) manifest_version: ManifestVersion,
    /// Receives a manifest version and a flag; their exact meaning is defined by the
    /// handler (not visible here).
    pub(crate) sender: Sender<Result<(ManifestVersion, bool)>>,
}
989
990#[cfg(test)]
991mod tests {
992 use api::v1::value::ValueData;
993 use api::v1::{Row, SemanticType};
994 use datatypes::prelude::ConcreteDataType;
995 use datatypes::schema::ColumnDefaultConstraint;
996 use mito_codec::test_util::i64_value;
997 use store_api::metadata::RegionMetadataBuilder;
998
999 use super::*;
1000 use crate::error::Error;
1001 use crate::test_util::ts_ms_value;
1002
1003 fn new_column_schema(
1004 name: &str,
1005 data_type: ColumnDataType,
1006 semantic_type: SemanticType,
1007 ) -> ColumnSchema {
1008 ColumnSchema {
1009 column_name: name.to_string(),
1010 datatype: data_type as i32,
1011 semantic_type: semantic_type as i32,
1012 ..Default::default()
1013 }
1014 }
1015
1016 fn check_invalid_request(err: &Error, expect: &str) {
1017 if let Error::InvalidRequest {
1018 region_id: _,
1019 reason,
1020 location: _,
1021 } = err
1022 {
1023 assert_eq!(reason, expect);
1024 } else {
1025 panic!("Unexpected error {err}")
1026 }
1027 }
1028
1029 #[test]
1030 fn test_write_request_duplicate_column() {
1031 let rows = Rows {
1032 schema: vec![
1033 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1034 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1035 ],
1036 rows: vec![],
1037 };
1038
1039 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1040 check_invalid_request(&err, "duplicate column c0");
1041 }
1042
1043 #[test]
1044 fn test_valid_write_request() {
1045 let rows = Rows {
1046 schema: vec![
1047 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1048 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1049 ],
1050 rows: vec![Row {
1051 values: vec![i64_value(1), i64_value(2)],
1052 }],
1053 };
1054
1055 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1056 assert_eq!(0, request.column_index_by_name("c0").unwrap());
1057 assert_eq!(1, request.column_index_by_name("c1").unwrap());
1058 assert_eq!(None, request.column_index_by_name("c2"));
1059 }
1060
1061 #[test]
1062 fn test_write_request_column_num() {
1063 let rows = Rows {
1064 schema: vec![
1065 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1066 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1067 ],
1068 rows: vec![Row {
1069 values: vec![i64_value(1), i64_value(2), i64_value(3)],
1070 }],
1071 };
1072
1073 let err = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap_err();
1074 check_invalid_request(&err, "row has 3 columns but schema has 2");
1075 }
1076
1077 fn new_region_metadata() -> RegionMetadata {
1078 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1079 builder
1080 .push_column_metadata(ColumnMetadata {
1081 column_schema: datatypes::schema::ColumnSchema::new(
1082 "ts",
1083 ConcreteDataType::timestamp_millisecond_datatype(),
1084 false,
1085 ),
1086 semantic_type: SemanticType::Timestamp,
1087 column_id: 1,
1088 })
1089 .push_column_metadata(ColumnMetadata {
1090 column_schema: datatypes::schema::ColumnSchema::new(
1091 "k0",
1092 ConcreteDataType::int64_datatype(),
1093 true,
1094 ),
1095 semantic_type: SemanticType::Tag,
1096 column_id: 2,
1097 })
1098 .primary_key(vec![2]);
1099 builder.build().unwrap()
1100 }
1101
1102 #[test]
1103 fn test_check_schema() {
1104 let rows = Rows {
1105 schema: vec![
1106 new_column_schema(
1107 "ts",
1108 ColumnDataType::TimestampMillisecond,
1109 SemanticType::Timestamp,
1110 ),
1111 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1112 ],
1113 rows: vec![Row {
1114 values: vec![ts_ms_value(1), i64_value(2)],
1115 }],
1116 };
1117 let metadata = new_region_metadata();
1118
1119 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1120 request.check_schema(&metadata).unwrap();
1121 }
1122
1123 #[test]
1124 fn test_column_type() {
1125 let rows = Rows {
1126 schema: vec![
1127 new_column_schema("ts", ColumnDataType::Int64, SemanticType::Timestamp),
1128 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1129 ],
1130 rows: vec![Row {
1131 values: vec![i64_value(1), i64_value(2)],
1132 }],
1133 };
1134 let metadata = new_region_metadata();
1135
1136 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1137 let err = request.check_schema(&metadata).unwrap_err();
1138 check_invalid_request(
1139 &err,
1140 "column ts expect type Timestamp(Millisecond(TimestampMillisecondType)), given: INT64(4)",
1141 );
1142 }
1143
1144 #[test]
1145 fn test_semantic_type() {
1146 let rows = Rows {
1147 schema: vec![
1148 new_column_schema(
1149 "ts",
1150 ColumnDataType::TimestampMillisecond,
1151 SemanticType::Tag,
1152 ),
1153 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1154 ],
1155 rows: vec![Row {
1156 values: vec![ts_ms_value(1), i64_value(2)],
1157 }],
1158 };
1159 let metadata = new_region_metadata();
1160
1161 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1162 let err = request.check_schema(&metadata).unwrap_err();
1163 check_invalid_request(&err, "column ts has semantic type Timestamp, given: TAG(0)");
1164 }
1165
1166 #[test]
1167 fn test_column_nullable() {
1168 let rows = Rows {
1169 schema: vec![
1170 new_column_schema(
1171 "ts",
1172 ColumnDataType::TimestampMillisecond,
1173 SemanticType::Timestamp,
1174 ),
1175 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1176 ],
1177 rows: vec![Row {
1178 values: vec![Value { value_data: None }, i64_value(2)],
1179 }],
1180 };
1181 let metadata = new_region_metadata();
1182
1183 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1184 let err = request.check_schema(&metadata).unwrap_err();
1185 check_invalid_request(&err, "column ts is not null but input has null");
1186 }
1187
1188 #[test]
1189 fn test_column_default() {
1190 let rows = Rows {
1191 schema: vec![new_column_schema(
1192 "k0",
1193 ColumnDataType::Int64,
1194 SemanticType::Tag,
1195 )],
1196 rows: vec![Row {
1197 values: vec![i64_value(1)],
1198 }],
1199 };
1200 let metadata = new_region_metadata();
1201
1202 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1203 let err = request.check_schema(&metadata).unwrap_err();
1204 check_invalid_request(&err, "missing column ts");
1205 }
1206
1207 #[test]
1208 fn test_unknown_column() {
1209 let rows = Rows {
1210 schema: vec![
1211 new_column_schema(
1212 "ts",
1213 ColumnDataType::TimestampMillisecond,
1214 SemanticType::Timestamp,
1215 ),
1216 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1217 new_column_schema("k1", ColumnDataType::Int64, SemanticType::Tag),
1218 ],
1219 rows: vec![Row {
1220 values: vec![ts_ms_value(1), i64_value(2), i64_value(3)],
1221 }],
1222 };
1223 let metadata = new_region_metadata();
1224
1225 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1226 let err = request.check_schema(&metadata).unwrap_err();
1227 check_invalid_request(&err, r#"unknown columns: ["k1"]"#);
1228 }
1229
1230 #[test]
1231 fn test_fill_impure_columns_err() {
1232 let rows = Rows {
1233 schema: vec![new_column_schema(
1234 "k0",
1235 ColumnDataType::Int64,
1236 SemanticType::Tag,
1237 )],
1238 rows: vec![Row {
1239 values: vec![i64_value(1)],
1240 }],
1241 };
1242 let metadata = {
1243 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1244 builder
1245 .push_column_metadata(ColumnMetadata {
1246 column_schema: datatypes::schema::ColumnSchema::new(
1247 "ts",
1248 ConcreteDataType::timestamp_millisecond_datatype(),
1249 false,
1250 )
1251 .with_default_constraint(Some(ColumnDefaultConstraint::Function(
1252 "now()".to_string(),
1253 )))
1254 .unwrap(),
1255 semantic_type: SemanticType::Timestamp,
1256 column_id: 1,
1257 })
1258 .push_column_metadata(ColumnMetadata {
1259 column_schema: datatypes::schema::ColumnSchema::new(
1260 "k0",
1261 ConcreteDataType::int64_datatype(),
1262 true,
1263 ),
1264 semantic_type: SemanticType::Tag,
1265 column_id: 2,
1266 })
1267 .primary_key(vec![2]);
1268 builder.build().unwrap()
1269 };
1270
1271 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1272 let err = request.check_schema(&metadata).unwrap_err();
1273 assert!(err.is_fill_default());
1274 assert!(
1275 request
1276 .fill_missing_columns(&metadata)
1277 .unwrap_err()
1278 .to_string()
1279 .contains("unexpected impure default value with region_id")
1280 );
1281 }
1282
1283 #[test]
1284 fn test_fill_missing_columns() {
1285 let rows = Rows {
1286 schema: vec![new_column_schema(
1287 "ts",
1288 ColumnDataType::TimestampMillisecond,
1289 SemanticType::Timestamp,
1290 )],
1291 rows: vec![Row {
1292 values: vec![ts_ms_value(1)],
1293 }],
1294 };
1295 let metadata = new_region_metadata();
1296
1297 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1298 let err = request.check_schema(&metadata).unwrap_err();
1299 assert!(err.is_fill_default());
1300 request.fill_missing_columns(&metadata).unwrap();
1301
1302 let expect_rows = Rows {
1303 schema: vec![new_column_schema(
1304 "ts",
1305 ColumnDataType::TimestampMillisecond,
1306 SemanticType::Timestamp,
1307 )],
1308 rows: vec![Row {
1309 values: vec![ts_ms_value(1)],
1310 }],
1311 };
1312 assert_eq!(expect_rows, request.rows);
1313 }
1314
1315 fn builder_with_ts_tag() -> RegionMetadataBuilder {
1316 let mut builder = RegionMetadataBuilder::new(RegionId::new(1, 1));
1317 builder
1318 .push_column_metadata(ColumnMetadata {
1319 column_schema: datatypes::schema::ColumnSchema::new(
1320 "ts",
1321 ConcreteDataType::timestamp_millisecond_datatype(),
1322 false,
1323 ),
1324 semantic_type: SemanticType::Timestamp,
1325 column_id: 1,
1326 })
1327 .push_column_metadata(ColumnMetadata {
1328 column_schema: datatypes::schema::ColumnSchema::new(
1329 "k0",
1330 ConcreteDataType::int64_datatype(),
1331 true,
1332 ),
1333 semantic_type: SemanticType::Tag,
1334 column_id: 2,
1335 })
1336 .primary_key(vec![2]);
1337 builder
1338 }
1339
1340 fn region_metadata_two_fields() -> RegionMetadata {
1341 let mut builder = builder_with_ts_tag();
1342 builder
1343 .push_column_metadata(ColumnMetadata {
1344 column_schema: datatypes::schema::ColumnSchema::new(
1345 "f0",
1346 ConcreteDataType::int64_datatype(),
1347 true,
1348 ),
1349 semantic_type: SemanticType::Field,
1350 column_id: 3,
1351 })
1352 .push_column_metadata(ColumnMetadata {
1354 column_schema: datatypes::schema::ColumnSchema::new(
1355 "f1",
1356 ConcreteDataType::int64_datatype(),
1357 false,
1358 )
1359 .with_default_constraint(Some(ColumnDefaultConstraint::Value(
1360 datatypes::value::Value::Int64(100),
1361 )))
1362 .unwrap(),
1363 semantic_type: SemanticType::Field,
1364 column_id: 4,
1365 });
1366 builder.build().unwrap()
1367 }
1368
1369 #[test]
1370 fn test_fill_missing_for_delete() {
1371 let rows = Rows {
1372 schema: vec![new_column_schema(
1373 "ts",
1374 ColumnDataType::TimestampMillisecond,
1375 SemanticType::Timestamp,
1376 )],
1377 rows: vec![Row {
1378 values: vec![ts_ms_value(1)],
1379 }],
1380 };
1381 let metadata = region_metadata_two_fields();
1382
1383 let mut request =
1384 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1385 let err = request.check_schema(&metadata).unwrap_err();
1386 check_invalid_request(&err, "delete requests need column k0");
1387 let err = request.fill_missing_columns(&metadata).unwrap_err();
1388 check_invalid_request(&err, "delete requests need column k0");
1389
1390 let rows = Rows {
1391 schema: vec![
1392 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1393 new_column_schema(
1394 "ts",
1395 ColumnDataType::TimestampMillisecond,
1396 SemanticType::Timestamp,
1397 ),
1398 ],
1399 rows: vec![Row {
1400 values: vec![i64_value(100), ts_ms_value(1)],
1401 }],
1402 };
1403 let mut request =
1404 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1405 let err = request.check_schema(&metadata).unwrap_err();
1406 assert!(err.is_fill_default());
1407 request.fill_missing_columns(&metadata).unwrap();
1408
1409 let expect_rows = Rows {
1410 schema: vec![
1411 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1412 new_column_schema(
1413 "ts",
1414 ColumnDataType::TimestampMillisecond,
1415 SemanticType::Timestamp,
1416 ),
1417 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1418 ],
1419 rows: vec![Row {
1421 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1422 }],
1423 };
1424 assert_eq!(expect_rows, request.rows);
1425 }
1426
1427 #[test]
1428 fn test_fill_missing_without_default_in_delete() {
1429 let mut builder = builder_with_ts_tag();
1430 builder
1431 .push_column_metadata(ColumnMetadata {
1433 column_schema: datatypes::schema::ColumnSchema::new(
1434 "f0",
1435 ConcreteDataType::int64_datatype(),
1436 true,
1437 ),
1438 semantic_type: SemanticType::Field,
1439 column_id: 3,
1440 })
1441 .push_column_metadata(ColumnMetadata {
1443 column_schema: datatypes::schema::ColumnSchema::new(
1444 "f1",
1445 ConcreteDataType::int64_datatype(),
1446 false,
1447 ),
1448 semantic_type: SemanticType::Field,
1449 column_id: 4,
1450 });
1451 let metadata = builder.build().unwrap();
1452
1453 let rows = Rows {
1454 schema: vec![
1455 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1456 new_column_schema(
1457 "ts",
1458 ColumnDataType::TimestampMillisecond,
1459 SemanticType::Timestamp,
1460 ),
1461 ],
1462 rows: vec![Row {
1464 values: vec![i64_value(100), ts_ms_value(1)],
1465 }],
1466 };
1467 let mut request =
1468 WriteRequest::new(RegionId::new(1, 1), OpType::Delete, rows, None).unwrap();
1469 let err = request.check_schema(&metadata).unwrap_err();
1470 assert!(err.is_fill_default());
1471 request.fill_missing_columns(&metadata).unwrap();
1472
1473 let expect_rows = Rows {
1474 schema: vec![
1475 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1476 new_column_schema(
1477 "ts",
1478 ColumnDataType::TimestampMillisecond,
1479 SemanticType::Timestamp,
1480 ),
1481 new_column_schema("f1", ColumnDataType::Int64, SemanticType::Field),
1482 ],
1483 rows: vec![Row {
1485 values: vec![i64_value(100), ts_ms_value(1), i64_value(0)],
1486 }],
1487 };
1488 assert_eq!(expect_rows, request.rows);
1489 }
1490
1491 #[test]
1492 fn test_no_default() {
1493 let rows = Rows {
1494 schema: vec![new_column_schema(
1495 "k0",
1496 ColumnDataType::Int64,
1497 SemanticType::Tag,
1498 )],
1499 rows: vec![Row {
1500 values: vec![i64_value(1)],
1501 }],
1502 };
1503 let metadata = new_region_metadata();
1504
1505 let mut request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1506 let err = request.fill_missing_columns(&metadata).unwrap_err();
1507 check_invalid_request(&err, "column ts does not have default value");
1508 }
1509
1510 #[test]
1511 fn test_missing_and_invalid() {
1512 let rows = Rows {
1514 schema: vec![
1515 new_column_schema("k0", ColumnDataType::Int64, SemanticType::Tag),
1516 new_column_schema(
1517 "ts",
1518 ColumnDataType::TimestampMillisecond,
1519 SemanticType::Timestamp,
1520 ),
1521 new_column_schema("f1", ColumnDataType::String, SemanticType::Field),
1522 ],
1523 rows: vec![Row {
1524 values: vec![
1525 i64_value(100),
1526 ts_ms_value(1),
1527 Value {
1528 value_data: Some(ValueData::StringValue("xxxxx".to_string())),
1529 },
1530 ],
1531 }],
1532 };
1533 let metadata = region_metadata_two_fields();
1534
1535 let request = WriteRequest::new(RegionId::new(1, 1), OpType::Put, rows, None).unwrap();
1536 let err = request.check_schema(&metadata).unwrap_err();
1537 check_invalid_request(
1538 &err,
1539 "column f1 expect type Int64(Int64Type), given: STRING(12)",
1540 );
1541 }
1542
1543 #[test]
1544 fn test_write_request_metadata() {
1545 let rows = Rows {
1546 schema: vec![
1547 new_column_schema("c0", ColumnDataType::Int64, SemanticType::Tag),
1548 new_column_schema("c1", ColumnDataType::Int64, SemanticType::Tag),
1549 ],
1550 rows: vec![Row {
1551 values: vec![i64_value(1), i64_value(2)],
1552 }],
1553 };
1554
1555 let metadata = Arc::new(new_region_metadata());
1556 let request = WriteRequest::new(
1557 RegionId::new(1, 1),
1558 OpType::Put,
1559 rows,
1560 Some(metadata.clone()),
1561 )
1562 .unwrap();
1563
1564 assert!(request.region_metadata.is_some());
1565 assert_eq!(
1566 request.region_metadata.unwrap().region_id,
1567 RegionId::new(1, 1)
1568 );
1569 }
1570}