mito2/flush.rs

1// Copyright 2023 Greptime Team
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Flush related utilities and structs.
16
17use std::collections::HashMap;
18use std::num::NonZeroU64;
19use std::sync::Arc;
20use std::sync::atomic::{AtomicUsize, Ordering};
21use std::time::Instant;
22
23use common_telemetry::{debug, error, info, trace};
24use datatypes::arrow::datatypes::SchemaRef;
25use either::Either;
26use partition::expr::PartitionExpr;
27use smallvec::{SmallVec, smallvec};
28use snafu::ResultExt;
29use store_api::storage::RegionId;
30use strum::IntoStaticStr;
31use tokio::sync::{Semaphore, mpsc, watch};
32
33use crate::access_layer::{
34    AccessLayerRef, Metrics, OperationType, SstInfoArray, SstWriteRequest, WriteType,
35};
36use crate::cache::CacheManagerRef;
37use crate::config::MitoConfig;
38use crate::error::{
39    Error, FlushRegionSnafu, InvalidPartitionExprSnafu, JoinSnafu, RegionClosedSnafu,
40    RegionDroppedSnafu, RegionTruncatedSnafu, Result,
41};
42use crate::manifest::action::{RegionEdit, RegionMetaAction, RegionMetaActionList};
43use crate::memtable::{BoxedRecordBatchIterator, EncodedRange, IterBuilder, MemtableRanges};
44use crate::metrics::{
45    FLUSH_BYTES_TOTAL, FLUSH_ELAPSED, FLUSH_FAILURE_TOTAL, FLUSH_REQUESTS_TOTAL,
46    INFLIGHT_FLUSH_COUNT,
47};
48use crate::read::dedup::{DedupReader, LastNonNull, LastRow};
49use crate::read::flat_dedup::{FlatDedupIterator, FlatLastNonNull, FlatLastRow};
50use crate::read::flat_merge::FlatMergeIterator;
51use crate::read::merge::MergeReaderBuilder;
52use crate::read::scan_region::PredicateGroup;
53use crate::read::{FlatSource, Source};
54use crate::region::options::{IndexOptions, MergeMode, RegionOptions};
55use crate::region::version::{VersionControlData, VersionControlRef, VersionRef};
56use crate::region::{ManifestContextRef, RegionLeaderState, RegionRoleState};
57use crate::request::{
58    BackgroundNotify, FlushFailed, FlushFinished, OptionOutputTx, OutputTx, SenderBulkRequest,
59    SenderDdlRequest, SenderWriteRequest, WorkerRequest, WorkerRequestWithTime,
60};
61use crate::schedule::scheduler::{Job, SchedulerRef};
62use crate::sst::file::FileMeta;
63use crate::sst::parquet::{DEFAULT_READ_BATCH_SIZE, DEFAULT_ROW_GROUP_SIZE, SstInfo, WriteOptions};
64use crate::sst::{FlatSchemaOptions, to_flat_sst_arrow_schema};
65use crate::worker::WorkerListener;
66
67/// Global write buffer (memtable) manager.
68///
/// Tracks write buffer (memtable) usage and decides whether the engine needs to flush.
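///
/// A minimal sketch of the intended call sequence, using the default
/// [WriteBufferManagerImpl] (sizes here are made up for illustration):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1024 * 1024);
/// manager.reserve_mem(4096);       // a write buffers 4 KiB in a mutable memtable
/// manager.schedule_free_mem(4096); // the memtable is frozen and queued for flush
/// manager.free_mem(4096);          // the flushed memtable has been dropped
/// assert_eq!(0, manager.memory_usage());
/// ```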
70pub trait WriteBufferManager: Send + Sync + std::fmt::Debug {
    /// Returns whether to trigger a flush of the engine.
72    fn should_flush_engine(&self) -> bool;
73
74    /// Returns whether to stall write requests.
75    fn should_stall(&self) -> bool;
76
77    /// Reserves `mem` bytes.
78    fn reserve_mem(&self, mem: usize);
79
80    /// Tells the manager we are freeing `mem` bytes.
81    ///
82    /// We are in the process of freeing `mem` bytes, so it is not considered
83    /// when checking the soft limit.
84    fn schedule_free_mem(&self, mem: usize);
85
86    /// We have freed `mem` bytes.
87    fn free_mem(&self, mem: usize);
88
89    /// Returns the total memory used by memtables.
90    fn memory_usage(&self) -> usize;
91}
92
93pub type WriteBufferManagerRef = Arc<dyn WriteBufferManager>;
94
95/// Default [WriteBufferManager] implementation.
96///
97/// Inspired by RocksDB's WriteBufferManager.
98/// <https://github.com/facebook/rocksdb/blob/main/include/rocksdb/write_buffer_manager.h>
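///
/// Half of `global_write_buffer_size` is reserved as the mutable limit. A rough
/// sketch of the thresholds (numbers are illustrative only):
///
/// ```ignore
/// let manager = WriteBufferManagerImpl::new(1000); // mutable limit = 500
/// manager.reserve_mem(600);
/// assert!(manager.should_flush_engine()); // mutable usage exceeds the mutable limit
/// assert!(!manager.should_stall());       // total usage (600) is still below 1000
/// ```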
99#[derive(Debug)]
100pub struct WriteBufferManagerImpl {
101    /// Write buffer size for the engine.
102    global_write_buffer_size: usize,
103    /// Mutable memtable memory size limit.
104    mutable_limit: usize,
    /// Memory in use (e.g. used by mutable and immutable memtables).
106    memory_used: AtomicUsize,
    /// Memory that hasn't been scheduled to be freed (e.g. used by mutable memtables).
108    memory_active: AtomicUsize,
109    /// Optional notifier.
110    /// The manager can wake up the worker once we free the write buffer.
111    notifier: Option<watch::Sender<()>>,
112}
113
114impl WriteBufferManagerImpl {
115    /// Returns a new manager with specific `global_write_buffer_size`.
116    pub fn new(global_write_buffer_size: usize) -> Self {
117        Self {
118            global_write_buffer_size,
119            mutable_limit: Self::get_mutable_limit(global_write_buffer_size),
120            memory_used: AtomicUsize::new(0),
121            memory_active: AtomicUsize::new(0),
122            notifier: None,
123        }
124    }
125
126    /// Attaches a notifier to the manager.
127    pub fn with_notifier(mut self, notifier: watch::Sender<()>) -> Self {
128        self.notifier = Some(notifier);
129        self
130    }
131
132    /// Returns memory usage of mutable memtables.
133    pub fn mutable_usage(&self) -> usize {
134        self.memory_active.load(Ordering::Relaxed)
135    }
136
137    /// Returns the size limit for mutable memtables.
138    fn get_mutable_limit(global_write_buffer_size: usize) -> usize {
        // Reserves half of the write buffer for mutable memtables.
140        global_write_buffer_size / 2
141    }
142}
143
144impl WriteBufferManager for WriteBufferManagerImpl {
145    fn should_flush_engine(&self) -> bool {
146        let mutable_memtable_memory_usage = self.memory_active.load(Ordering::Relaxed);
147        if mutable_memtable_memory_usage > self.mutable_limit {
148            debug!(
149                "Engine should flush (over mutable limit), mutable_usage: {}, memory_usage: {}, mutable_limit: {}, global_limit: {}",
150                mutable_memtable_memory_usage,
151                self.memory_usage(),
152                self.mutable_limit,
153                self.global_write_buffer_size,
154            );
155            return true;
156        }
157
158        let memory_usage = self.memory_used.load(Ordering::Relaxed);
        // If the memory usage exceeds the buffer size, we trigger a more aggressive
        // flush. But if more than half of the memory is already being flushed,
        // triggering more flushes may not help, so we hold off instead.
162        if memory_usage >= self.global_write_buffer_size {
163            if mutable_memtable_memory_usage >= self.global_write_buffer_size / 2 {
164                debug!(
165                    "Engine should flush (over total limit), memory_usage: {}, global_write_buffer_size: {}, \
166                 mutable_usage: {}.",
167                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
168                );
169                return true;
170            } else {
171                trace!(
172                    "Engine won't flush, memory_usage: {}, global_write_buffer_size: {}, mutable_usage: {}.",
173                    memory_usage, self.global_write_buffer_size, mutable_memtable_memory_usage
174                );
175            }
176        }
177
178        false
179    }
180
181    fn should_stall(&self) -> bool {
182        self.memory_usage() >= self.global_write_buffer_size
183    }
184
185    fn reserve_mem(&self, mem: usize) {
186        self.memory_used.fetch_add(mem, Ordering::Relaxed);
187        self.memory_active.fetch_add(mem, Ordering::Relaxed);
188    }
189
190    fn schedule_free_mem(&self, mem: usize) {
191        self.memory_active.fetch_sub(mem, Ordering::Relaxed);
192    }
193
194    fn free_mem(&self, mem: usize) {
195        self.memory_used.fetch_sub(mem, Ordering::Relaxed);
196        if let Some(notifier) = &self.notifier {
197            // Notifies the worker after the memory usage is decreased. When we drop the memtable
198            // outside of the worker, the worker may still stall requests because the memory usage
199            // is not updated. So we need to notify the worker to handle stalled requests again.
200            let _ = notifier.send(());
201        }
202    }
203
204    fn memory_usage(&self) -> usize {
205        self.memory_used.load(Ordering::Relaxed)
206    }
207}
208
/// Reason for a flush task.
210#[derive(Debug, IntoStaticStr)]
211pub enum FlushReason {
212    /// Other reasons.
213    Others,
214    /// Engine reaches flush threshold.
215    EngineFull,
216    /// Manual flush.
217    Manual,
218    /// Flush to alter table.
219    Alter,
220    /// Flush periodically.
221    Periodically,
222    /// Flush memtable during downgrading state.
223    Downgrading,
224}
225
226impl FlushReason {
    /// Gets the flush reason as a static string.
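    ///
    /// With the derived `IntoStaticStr` this is the variant name (e.g. `EngineFull`),
    /// which is used as the label value for flush metrics and in flush logs.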
228    fn as_str(&self) -> &'static str {
229        self.into()
230    }
231}
232
233/// Task to flush a region.
234pub(crate) struct RegionFlushTask {
235    /// Region to flush.
236    pub(crate) region_id: RegionId,
237    /// Reason to flush.
238    pub(crate) reason: FlushReason,
239    /// Flush result senders.
240    pub(crate) senders: Vec<OutputTx>,
241    /// Request sender to notify the worker.
242    pub(crate) request_sender: mpsc::Sender<WorkerRequestWithTime>,
243
244    pub(crate) access_layer: AccessLayerRef,
245    pub(crate) listener: WorkerListener,
246    pub(crate) engine_config: Arc<MitoConfig>,
247    pub(crate) row_group_size: Option<usize>,
248    pub(crate) cache_manager: CacheManagerRef,
249    pub(crate) manifest_ctx: ManifestContextRef,
250
251    /// Index options for the region.
252    pub(crate) index_options: IndexOptions,
253    /// Semaphore to control flush concurrency.
254    pub(crate) flush_semaphore: Arc<Semaphore>,
255}
256
257impl RegionFlushTask {
    /// Pushes the sender if it is not `None`.
259    pub(crate) fn push_sender(&mut self, mut sender: OptionOutputTx) {
260        if let Some(sender) = sender.take_inner() {
261            self.senders.push(sender);
262        }
263    }
264
    /// Consumes the task and notifies the senders that the job succeeded.
266    fn on_success(self) {
267        for sender in self.senders {
268            sender.send(Ok(0));
269        }
270    }
271
    /// Sends the flush error to waiters.
273    fn on_failure(&mut self, err: Arc<Error>) {
274        for sender in self.senders.drain(..) {
275            sender.send(Err(err.clone()).context(FlushRegionSnafu {
276                region_id: self.region_id,
277            }));
278        }
279    }
280
281    /// Converts the flush task into a background job.
282    ///
283    /// We must call this in the region worker.
284    fn into_flush_job(mut self, version_control: &VersionControlRef) -> Job {
        // Gets a version of this region before creating the job to capture the current
        // WAL entry id, sequence and immutable memtables.
287        let version_data = version_control.current();
288
289        Box::pin(async move {
290            INFLIGHT_FLUSH_COUNT.inc();
291            self.do_flush(version_data).await;
292            INFLIGHT_FLUSH_COUNT.dec();
293        })
294    }
295
296    /// Runs the flush task.
297    async fn do_flush(&mut self, version_data: VersionControlData) {
298        let timer = FLUSH_ELAPSED.with_label_values(&["total"]).start_timer();
299        self.listener.on_flush_begin(self.region_id).await;
300
301        let worker_request = match self.flush_memtables(&version_data).await {
302            Ok(edit) => {
303                let memtables_to_remove = version_data
304                    .version
305                    .memtables
306                    .immutables()
307                    .iter()
308                    .map(|m| m.id())
309                    .collect();
310                let flush_finished = FlushFinished {
311                    region_id: self.region_id,
312                    // The last entry has been flushed.
313                    flushed_entry_id: version_data.last_entry_id,
314                    senders: std::mem::take(&mut self.senders),
315                    _timer: timer,
316                    edit,
317                    memtables_to_remove,
318                };
319                WorkerRequest::Background {
320                    region_id: self.region_id,
321                    notify: BackgroundNotify::FlushFinished(flush_finished),
322                }
323            }
324            Err(e) => {
325                error!(e; "Failed to flush region {}", self.region_id);
326                // Discard the timer.
327                timer.stop_and_discard();
328
329                let err = Arc::new(e);
330                self.on_failure(err.clone());
331                WorkerRequest::Background {
332                    region_id: self.region_id,
333                    notify: BackgroundNotify::FlushFailed(FlushFailed { err }),
334                }
335            }
336        };
337        self.send_worker_request(worker_request).await;
338    }
339
340    /// Flushes memtables to level 0 SSTs and updates the manifest.
341    /// Returns the [RegionEdit] to apply.
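    ///
    /// A sketch of the steps below: write each immutable memtable to level 0 SST
    /// files, build a [RegionEdit] from the written files and the flushed entry id
    /// and sequence, then persist the edit to the manifest before returning it.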
342    async fn flush_memtables(&self, version_data: &VersionControlData) -> Result<RegionEdit> {
        // We must use the immutable memtables list and entry ids from the `version_data`
        // for consistency, as others might have already modified the version in the `version_control`.
345        let version = &version_data.version;
346        let timer = FLUSH_ELAPSED
347            .with_label_values(&["flush_memtables"])
348            .start_timer();
349
350        let mut write_opts = WriteOptions {
351            write_buffer_size: self.engine_config.sst_write_buffer_size,
352            ..Default::default()
353        };
354        if let Some(row_group_size) = self.row_group_size {
355            write_opts.row_group_size = row_group_size;
356        }
357
358        let DoFlushMemtablesResult {
359            file_metas,
360            flushed_bytes,
361            series_count,
362            flush_metrics,
363        } = self.do_flush_memtables(version, write_opts).await?;
364
365        if !file_metas.is_empty() {
366            FLUSH_BYTES_TOTAL.inc_by(flushed_bytes);
367        }
368
369        let mut file_ids = Vec::with_capacity(file_metas.len());
370        let mut total_rows = 0;
371        let mut total_bytes = 0;
372        for meta in &file_metas {
373            file_ids.push(meta.file_id);
374            total_rows += meta.num_rows;
375            total_bytes += meta.file_size;
376        }
377        info!(
            "Successfully flushed memtables, region: {}, reason: {}, files: {:?}, series count: {}, total_rows: {}, total_bytes: {}, cost: {:?}, metrics: {:?}",
379            self.region_id,
380            self.reason.as_str(),
381            file_ids,
382            series_count,
383            total_rows,
384            total_bytes,
385            timer.stop_and_record(),
386            flush_metrics,
387        );
388        flush_metrics.observe();
389
390        let edit = RegionEdit {
391            files_to_add: file_metas,
392            files_to_remove: Vec::new(),
393            timestamp_ms: Some(chrono::Utc::now().timestamp_millis()),
394            compaction_time_window: None,
395            // The last entry has been flushed.
396            flushed_entry_id: Some(version_data.last_entry_id),
397            flushed_sequence: Some(version_data.committed_sequence),
398            committed_sequence: None,
399        };
400        info!("Applying {edit:?} to region {}", self.region_id);
401
402        let action_list = RegionMetaActionList::with_action(RegionMetaAction::Edit(edit.clone()));
403
404        let expected_state = if matches!(self.reason, FlushReason::Downgrading) {
405            RegionLeaderState::Downgrading
406        } else {
407            // Check if region is in staging mode
408            let current_state = self.manifest_ctx.current_state();
409            if current_state == RegionRoleState::Leader(RegionLeaderState::Staging) {
410                RegionLeaderState::Staging
411            } else {
412                RegionLeaderState::Writable
413            }
414        };
415        // We will leak files if the manifest update fails, but we ignore them for simplicity. We can
416        // add a cleanup job to remove them later.
417        let version = self
418            .manifest_ctx
419            .update_manifest(expected_state, action_list)
420            .await?;
421        info!(
            "Successfully updated manifest version to {version}, region: {}, reason: {}",
423            self.region_id,
424            self.reason.as_str()
425        );
426
427        Ok(edit)
428    }
429
430    async fn do_flush_memtables(
431        &self,
432        version: &VersionRef,
433        write_opts: WriteOptions,
434    ) -> Result<DoFlushMemtablesResult> {
435        let memtables = version.memtables.immutables();
436        let mut file_metas = Vec::with_capacity(memtables.len());
437        let mut flushed_bytes = 0;
438        let mut series_count = 0;
        // Converts the partition expression once, before the loop, instead of re-parsing it per file.
440        let partition_expr = match &version.metadata.partition_expr {
441            None => None,
442            Some(json_expr) if json_expr.is_empty() => None,
443            Some(json_str) => partition::expr::PartitionExpr::from_json_str(json_str)
444                .with_context(|_| InvalidPartitionExprSnafu { expr: json_str })?,
445        };
446        let mut flush_metrics = Metrics::new(WriteType::Flush);
447        for mem in memtables {
448            if mem.is_empty() {
449                // Skip empty memtables.
450                continue;
451            }
452
            // Compacts the memtable first; this waits for the background compaction to finish.
454            let compact_start = std::time::Instant::now();
455            if let Err(e) = mem.compact(true) {
456                common_telemetry::error!(e; "Failed to compact memtable before flush");
457            }
458            let compact_cost = compact_start.elapsed();
459            flush_metrics.compact_memtable += compact_cost;
460
461            // Sets `for_flush` flag to true.
462            let mem_ranges = mem.ranges(None, PredicateGroup::default(), None, true)?;
463            let num_mem_ranges = mem_ranges.ranges.len();
464            let num_mem_rows = mem_ranges.stats.num_rows();
465            let memtable_id = mem.id();
            // Increases the series count for each mem range. We assume each mem range has different series,
            // so the counter may report more series than the actual series count.
468            series_count += mem_ranges.stats.series_count();
469
470            if mem_ranges.is_record_batch() {
471                let flush_start = Instant::now();
472                let FlushFlatMemResult {
473                    num_encoded,
474                    max_sequence,
475                    num_sources,
476                    results,
477                } = self
478                    .flush_flat_mem_ranges(version, &write_opts, mem_ranges)
479                    .await?;
480                for (source_idx, result) in results.into_iter().enumerate() {
481                    let (ssts_written, metrics) = result?;
482                    if ssts_written.is_empty() {
483                        // No data written.
484                        continue;
485                    }
486
487                    common_telemetry::debug!(
                        "Region {} flushed memtable {}, source {}/{}, metrics: {:?}",
489                        self.region_id,
490                        memtable_id,
491                        source_idx,
492                        num_sources,
493                        metrics
494                    );
495
496                    flush_metrics = flush_metrics.merge(metrics);
497
498                    file_metas.extend(ssts_written.into_iter().map(|sst_info| {
499                        flushed_bytes += sst_info.file_size;
500                        Self::new_file_meta(
501                            self.region_id,
502                            max_sequence,
503                            sst_info,
504                            partition_expr.clone(),
505                        )
506                    }));
507                }
508
509                common_telemetry::debug!(
                    "Region {} flushed {} sources for memtable {}, num_mem_ranges: {}, num_encoded: {}, num_rows: {}, flush_cost: {:?}, compact_cost: {:?}",
511                    self.region_id,
512                    num_sources,
513                    memtable_id,
514                    num_mem_ranges,
515                    num_encoded,
516                    num_mem_rows,
517                    flush_start.elapsed(),
518                    compact_cost,
519                );
520            } else {
521                let max_sequence = mem_ranges.stats.max_sequence();
522                let source = memtable_source(mem_ranges, &version.options).await?;
523
524                // Flush to level 0.
525                let source = Either::Left(source);
526                let write_request = self.new_write_request(version, max_sequence, source);
527
528                let (ssts_written, metrics) = self
529                    .access_layer
530                    .write_sst(write_request, &write_opts, WriteType::Flush)
531                    .await?;
532                if ssts_written.is_empty() {
533                    // No data written.
534                    continue;
535                }
536
537                common_telemetry::debug!(
                    "Region {} flushed one memtable, num_mem_ranges: {}, num_rows: {}, metrics: {:?}",
539                    self.region_id,
540                    num_mem_ranges,
541                    num_mem_rows,
542                    metrics
543                );
544
545                flush_metrics = flush_metrics.merge(metrics);
546
547                file_metas.extend(ssts_written.into_iter().map(|sst_info| {
548                    flushed_bytes += sst_info.file_size;
549                    Self::new_file_meta(
550                        self.region_id,
551                        max_sequence,
552                        sst_info,
553                        partition_expr.clone(),
554                    )
555                }));
556            };
557        }
558
559        Ok(DoFlushMemtablesResult {
560            file_metas,
561            flushed_bytes,
562            series_count,
563            flush_metrics,
564        })
565    }
566
567    async fn flush_flat_mem_ranges(
568        &self,
569        version: &VersionRef,
570        write_opts: &WriteOptions,
571        mem_ranges: MemtableRanges,
572    ) -> Result<FlushFlatMemResult> {
573        let batch_schema = to_flat_sst_arrow_schema(
574            &version.metadata,
575            &FlatSchemaOptions::from_encoding(version.metadata.primary_key_encoding),
576        );
577        let flat_sources = memtable_flat_sources(
578            batch_schema,
579            mem_ranges,
580            &version.options,
581            version.metadata.primary_key.len(),
582        )?;
583        let mut tasks = Vec::with_capacity(flat_sources.encoded.len() + flat_sources.sources.len());
584        let num_encoded = flat_sources.encoded.len();
585        let max_sequence = flat_sources.max_sequence;
586        for source in flat_sources.sources {
587            let source = Either::Right(source);
588            let write_request = self.new_write_request(version, max_sequence, source);
589            let access_layer = self.access_layer.clone();
590            let write_opts = write_opts.clone();
591            let semaphore = self.flush_semaphore.clone();
592            let task = common_runtime::spawn_global(async move {
593                let _permit = semaphore.acquire().await.unwrap();
594                access_layer
595                    .write_sst(write_request, &write_opts, WriteType::Flush)
596                    .await
597            });
598            tasks.push(task);
599        }
600        for encoded in flat_sources.encoded {
601            let access_layer = self.access_layer.clone();
602            let cache_manager = self.cache_manager.clone();
603            let region_id = version.metadata.region_id;
604            let semaphore = self.flush_semaphore.clone();
605            let task = common_runtime::spawn_global(async move {
606                let _permit = semaphore.acquire().await.unwrap();
607                let metrics = access_layer
608                    .put_sst(&encoded.data, region_id, &encoded.sst_info, &cache_manager)
609                    .await?;
610                Ok((smallvec![encoded.sst_info], metrics))
611            });
612            tasks.push(task);
613        }
614        let num_sources = tasks.len();
615        let results = futures::future::try_join_all(tasks)
616            .await
617            .context(JoinSnafu)?;
618        Ok(FlushFlatMemResult {
619            num_encoded,
620            max_sequence,
621            num_sources,
622            results,
623        })
624    }
625
626    fn new_file_meta(
627        region_id: RegionId,
628        max_sequence: u64,
629        sst_info: SstInfo,
630        partition_expr: Option<PartitionExpr>,
631    ) -> FileMeta {
632        FileMeta {
633            region_id,
634            file_id: sst_info.file_id,
635            time_range: sst_info.time_range,
636            level: 0,
637            file_size: sst_info.file_size,
638            available_indexes: sst_info.index_metadata.build_available_indexes(),
639            index_file_size: sst_info.index_metadata.file_size,
640            num_rows: sst_info.num_rows as u64,
641            num_row_groups: sst_info.num_row_groups,
642            sequence: NonZeroU64::new(max_sequence),
643            partition_expr,
644        }
645    }
646
647    fn new_write_request(
648        &self,
649        version: &VersionRef,
650        max_sequence: u64,
651        source: Either<Source, FlatSource>,
652    ) -> SstWriteRequest {
653        SstWriteRequest {
654            op_type: OperationType::Flush,
655            metadata: version.metadata.clone(),
656            source,
657            cache_manager: self.cache_manager.clone(),
658            storage: version.options.storage.clone(),
659            max_sequence: Some(max_sequence),
660            index_options: self.index_options.clone(),
661            index_config: self.engine_config.index.clone(),
662            inverted_index_config: self.engine_config.inverted_index.clone(),
663            fulltext_index_config: self.engine_config.fulltext_index.clone(),
664            bloom_filter_index_config: self.engine_config.bloom_filter_index.clone(),
665        }
666    }
667
    /// Notifies the worker of the flush job status.
669    pub(crate) async fn send_worker_request(&self, request: WorkerRequest) {
670        if let Err(e) = self
671            .request_sender
672            .send(WorkerRequestWithTime::new(request))
673            .await
674        {
675            error!(
676                "Failed to notify flush job status for region {}, request: {:?}",
677                self.region_id, e.0
678            );
679        }
680    }
681
    /// Merges two flush tasks.
683    fn merge(&mut self, mut other: RegionFlushTask) {
684        assert_eq!(self.region_id, other.region_id);
685        // Now we only merge senders. They share the same flush reason.
686        self.senders.append(&mut other.senders);
687    }
688}
689
690struct FlushFlatMemResult {
691    num_encoded: usize,
692    max_sequence: u64,
693    num_sources: usize,
694    results: Vec<Result<(SstInfoArray, Metrics)>>,
695}
696
697struct DoFlushMemtablesResult {
698    file_metas: Vec<FileMeta>,
699    flushed_bytes: u64,
700    series_count: usize,
701    flush_metrics: Metrics,
702}
703
/// Returns a [Source] for the given memtable ranges.
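///
/// With a single range this is a plain [Source::Iter]; with multiple ranges the
/// iterators are merged and, unless `append_mode` is enabled, deduplicated
/// according to the region's [MergeMode].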
705async fn memtable_source(mem_ranges: MemtableRanges, options: &RegionOptions) -> Result<Source> {
706    let source = if mem_ranges.ranges.len() == 1 {
707        let only_range = mem_ranges.ranges.into_values().next().unwrap();
708        let iter = only_range.build_iter()?;
709        Source::Iter(iter)
710    } else {
        // TODO(hl): a workaround since the sync version of MergeReader is WIP.
712        let sources = mem_ranges
713            .ranges
714            .into_values()
715            .map(|r| r.build_iter().map(Source::Iter))
716            .collect::<Result<Vec<_>>>()?;
717        let merge_reader = MergeReaderBuilder::from_sources(sources).build().await?;
718        let maybe_dedup = if options.append_mode {
719            // no dedup in append mode
720            Box::new(merge_reader) as _
721        } else {
722            // dedup according to merge mode
723            match options.merge_mode.unwrap_or(MergeMode::LastRow) {
724                MergeMode::LastRow => {
725                    Box::new(DedupReader::new(merge_reader, LastRow::new(false))) as _
726                }
727                MergeMode::LastNonNull => {
728                    Box::new(DedupReader::new(merge_reader, LastNonNull::new(false))) as _
729                }
730            }
731        };
732        Source::Reader(maybe_dedup)
733    };
734    Ok(source)
735}
736
737struct FlatSources {
738    max_sequence: u64,
739    sources: SmallVec<[FlatSource; 4]>,
740    encoded: SmallVec<[EncodedRange; 4]>,
741}
742
// TODO(yingwen): Flush into multiple files in parallel.
/// Returns the max sequence and [FlatSource]s for the given memtable ranges.
745fn memtable_flat_sources(
746    schema: SchemaRef,
747    mem_ranges: MemtableRanges,
748    options: &RegionOptions,
749    field_column_start: usize,
750) -> Result<FlatSources> {
751    let MemtableRanges { ranges, stats } = mem_ranges;
752    let max_sequence = stats.max_sequence();
753    let mut flat_sources = FlatSources {
754        max_sequence,
755        sources: SmallVec::new(),
756        encoded: SmallVec::new(),
757    };
758
759    if ranges.len() == 1 {
760        let only_range = ranges.into_values().next().unwrap();
761        if let Some(encoded) = only_range.encoded() {
762            flat_sources.encoded.push(encoded);
763        } else {
764            let iter = only_range.build_record_batch_iter(None)?;
765            flat_sources.sources.push(FlatSource::Iter(iter));
766        };
767    } else {
768        let min_flush_rows = stats.num_rows / 8;
769        let min_flush_rows = min_flush_rows.max(DEFAULT_ROW_GROUP_SIZE);
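        // Merges small ranges into larger sources: a new source is cut once the
        // accumulated rows exceed `min_flush_rows`, which caps the number of merged
        // sources at roughly 8 (or fewer when `DEFAULT_ROW_GROUP_SIZE` dominates).
        // For example, with 800_000 rows in total, each source except possibly the
        // last holds at least max(100_000, DEFAULT_ROW_GROUP_SIZE) rows.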
770        let mut last_iter_rows = 0;
771        let num_ranges = ranges.len();
772        let mut input_iters = Vec::with_capacity(num_ranges);
773        for (_range_id, range) in ranges {
774            if let Some(encoded) = range.encoded() {
775                flat_sources.encoded.push(encoded);
776                continue;
777            }
778
779            let iter = range.build_record_batch_iter(None)?;
780            input_iters.push(iter);
781            last_iter_rows += range.num_rows();
782
783            if last_iter_rows > min_flush_rows {
784                let maybe_dedup = merge_and_dedup(
785                    &schema,
786                    options,
787                    field_column_start,
788                    std::mem::replace(&mut input_iters, Vec::with_capacity(num_ranges)),
789                )?;
790
791                flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
792                last_iter_rows = 0;
793            }
794        }
795
796        // Handle remaining iters.
797        if !input_iters.is_empty() {
798            let maybe_dedup = merge_and_dedup(&schema, options, field_column_start, input_iters)?;
799
800            flat_sources.sources.push(FlatSource::Iter(maybe_dedup));
801        }
802    }
803
804    Ok(flat_sources)
805}
806
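/// Merges the input iterators and applies dedup based on the region options:
/// `append_mode` keeps duplicates, [MergeMode::LastRow] keeps the latest row per key,
/// and [MergeMode::LastNonNull] keeps the latest non-null value for each field.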
807fn merge_and_dedup(
808    schema: &SchemaRef,
809    options: &RegionOptions,
810    field_column_start: usize,
811    input_iters: Vec<BoxedRecordBatchIterator>,
812) -> Result<BoxedRecordBatchIterator> {
813    let merge_iter = FlatMergeIterator::new(schema.clone(), input_iters, DEFAULT_READ_BATCH_SIZE)?;
814    let maybe_dedup = if options.append_mode {
815        // No dedup in append mode
816        Box::new(merge_iter) as _
817    } else {
818        // Dedup according to merge mode.
819        match options.merge_mode() {
820            MergeMode::LastRow => {
821                Box::new(FlatDedupIterator::new(merge_iter, FlatLastRow::new(false))) as _
822            }
823            MergeMode::LastNonNull => Box::new(FlatDedupIterator::new(
824                merge_iter,
825                FlatLastNonNull::new(field_column_start, false),
826            )) as _,
827        }
828    };
829    Ok(maybe_dedup)
830}
831
832/// Manages background flushes of a worker.
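///
/// Only one flush job runs per region at a time; later tasks for the same region are
/// merged into a pending task. A rough sketch of the expected flow, using the methods
/// defined below:
///
/// ```ignore
/// scheduler.schedule_flush(region_id, &version_control, task)?;
/// // ... the background flush job finishes ...
/// let pending_requests = scheduler.on_flush_success(region_id);
/// // schedule_next_flush() is invoked internally to pick up any pending task.
/// ```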
833pub(crate) struct FlushScheduler {
    /// Tracks regions that need to flush.
835    region_status: HashMap<RegionId, FlushStatus>,
836    /// Background job scheduler.
837    scheduler: SchedulerRef,
838}
839
840impl FlushScheduler {
841    /// Creates a new flush scheduler.
842    pub(crate) fn new(scheduler: SchedulerRef) -> FlushScheduler {
843        FlushScheduler {
844            region_status: HashMap::new(),
845            scheduler,
846        }
847    }
848
    /// Returns true if the region has already requested a flush.
850    pub(crate) fn is_flush_requested(&self, region_id: RegionId) -> bool {
851        self.region_status.contains_key(&region_id)
852    }
853
854    /// Schedules a flush `task` for specific `region`.
855    pub(crate) fn schedule_flush(
856        &mut self,
857        region_id: RegionId,
858        version_control: &VersionControlRef,
859        task: RegionFlushTask,
860    ) -> Result<()> {
861        debug_assert_eq!(region_id, task.region_id);
862
863        let version = version_control.current().version;
864        if version.memtables.is_empty() {
865            debug_assert!(!self.region_status.contains_key(&region_id));
866            // The region has nothing to flush.
867            task.on_success();
868            return Ok(());
869        }
870
871        // Don't increase the counter if a region has nothing to flush.
872        FLUSH_REQUESTS_TOTAL
873            .with_label_values(&[task.reason.as_str()])
874            .inc();
875
876        // Add this region to status map.
877        let flush_status = self
878            .region_status
879            .entry(region_id)
880            .or_insert_with(|| FlushStatus::new(region_id, version_control.clone()));
881        // Checks whether we can flush the region now.
882        if flush_status.flushing {
883            // There is already a flush job running.
884            flush_status.merge_task(task);
885            return Ok(());
886        }
887
888        // TODO(yingwen): We can merge with pending and execute directly.
        // If there is already a pending task, merge this task into it.
890        if flush_status.pending_task.is_some() {
891            flush_status.merge_task(task);
892            return Ok(());
893        }
894
895        // Now we can flush the region directly.
896        if let Err(e) = version_control.freeze_mutable() {
897            error!(e; "Failed to freeze the mutable memtable for region {}", region_id);
898
899            // Remove from region status if we can't freeze the mutable memtable.
900            self.region_status.remove(&region_id);
901            return Err(e);
902        }
903        // Submit a flush job.
904        let job = task.into_flush_job(version_control);
905        if let Err(e) = self.scheduler.schedule(job) {
            // If the scheduler returns an error, senders in the job will be dropped and
            // waiters will get recv errors.
908            error!(e; "Failed to schedule flush job for region {}", region_id);
909
910            // Remove from region status if we can't submit the task.
911            self.region_status.remove(&region_id);
912            return Err(e);
913        }
914
915        flush_status.flushing = true;
916
917        Ok(())
918    }
919
920    /// Notifies the scheduler that the flush job is finished.
921    ///
922    /// Returns all pending requests if the region doesn't need to flush again.
923    pub(crate) fn on_flush_success(
924        &mut self,
925        region_id: RegionId,
926    ) -> Option<(
927        Vec<SenderDdlRequest>,
928        Vec<SenderWriteRequest>,
929        Vec<SenderBulkRequest>,
930    )> {
931        let flush_status = self.region_status.get_mut(&region_id)?;
932
        // This region no longer has a running flush job.
934        flush_status.flushing = false;
935
936        let pending_requests = if flush_status.pending_task.is_none() {
937            // The region doesn't have any pending flush task.
938            // Safety: The flush status must exist.
939            let flush_status = self.region_status.remove(&region_id).unwrap();
940            Some((
941                flush_status.pending_ddls,
942                flush_status.pending_writes,
943                flush_status.pending_bulk_writes,
944            ))
945        } else {
946            let version_data = flush_status.version_control.current();
947            if version_data.version.memtables.is_empty() {
948                // The region has nothing to flush, we also need to remove it from the status.
949                // Safety: The pending task is not None.
950                let task = flush_status.pending_task.take().unwrap();
951                // The region has nothing to flush. We can notify pending task.
952                task.on_success();
953                // `schedule_next_flush()` may pick up the same region to flush, so we must remove
954                // it from the status to avoid leaking pending requests.
955                // Safety: The flush status must exist.
956                let flush_status = self.region_status.remove(&region_id).unwrap();
957                Some((
958                    flush_status.pending_ddls,
959                    flush_status.pending_writes,
960                    flush_status.pending_bulk_writes,
961                ))
962            } else {
963                // We can flush the region again, keep it in the region status.
964                None
965            }
966        };
967
968        // Schedule next flush job.
969        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Flush of region {} succeeded, but failed to schedule the next flush", region_id);
971        }
972
973        pending_requests
974    }
975
976    /// Notifies the scheduler that the flush job is failed.
977    pub(crate) fn on_flush_failed(&mut self, region_id: RegionId, err: Arc<Error>) {
        error!(err; "Region {} failed to flush, cancelling all pending tasks", region_id);
979
980        FLUSH_FAILURE_TOTAL.inc();
981
982        // Remove this region.
983        let Some(flush_status) = self.region_status.remove(&region_id) else {
984            return;
985        };
986
987        // Fast fail: cancels all pending tasks and sends error to their waiters.
988        flush_status.on_failure(err);
989
990        // Still tries to schedule a new flush.
991        if let Err(e) = self.schedule_next_flush() {
            error!(e; "Failed to schedule next flush after flush of region {} failed", region_id);
993        }
994    }
995
996    /// Notifies the scheduler that the region is dropped.
997    pub(crate) fn on_region_dropped(&mut self, region_id: RegionId) {
998        self.remove_region_on_failure(
999            region_id,
1000            Arc::new(RegionDroppedSnafu { region_id }.build()),
1001        );
1002    }
1003
1004    /// Notifies the scheduler that the region is closed.
1005    pub(crate) fn on_region_closed(&mut self, region_id: RegionId) {
1006        self.remove_region_on_failure(region_id, Arc::new(RegionClosedSnafu { region_id }.build()));
1007    }
1008
1009    /// Notifies the scheduler that the region is truncated.
1010    pub(crate) fn on_region_truncated(&mut self, region_id: RegionId) {
1011        self.remove_region_on_failure(
1012            region_id,
1013            Arc::new(RegionTruncatedSnafu { region_id }.build()),
1014        );
1015    }
1016
1017    fn remove_region_on_failure(&mut self, region_id: RegionId, err: Arc<Error>) {
1018        // Remove this region.
1019        let Some(flush_status) = self.region_status.remove(&region_id) else {
1020            return;
1021        };
1022
1023        // Notifies all pending tasks.
1024        flush_status.on_failure(err);
1025    }
1026
    /// Adds a DDL request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
1031    pub(crate) fn add_ddl_request_to_pending(&mut self, request: SenderDdlRequest) {
1032        let status = self.region_status.get_mut(&request.region_id).unwrap();
1033        status.pending_ddls.push(request);
1034    }
1035
    /// Adds a write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
1040    pub(crate) fn add_write_request_to_pending(&mut self, request: SenderWriteRequest) {
1041        let status = self
1042            .region_status
1043            .get_mut(&request.request.region_id)
1044            .unwrap();
1045        status.pending_writes.push(request);
1046    }
1047
    /// Adds a bulk write request to the pending queue.
    ///
    /// # Panics
    /// Panics if the region hasn't requested a flush.
1052    pub(crate) fn add_bulk_request_to_pending(&mut self, request: SenderBulkRequest) {
1053        let status = self.region_status.get_mut(&request.region_id).unwrap();
1054        status.pending_bulk_writes.push(request);
1055    }
1056
1057    /// Returns true if the region has pending DDLs.
1058    pub(crate) fn has_pending_ddls(&self, region_id: RegionId) -> bool {
1059        self.region_status
1060            .get(&region_id)
1061            .map(|status| !status.pending_ddls.is_empty())
1062            .unwrap_or(false)
1063    }
1064
    /// Schedules a new flush task when the scheduler can submit the next task.
1066    pub(crate) fn schedule_next_flush(&mut self) -> Result<()> {
1067        debug_assert!(
1068            self.region_status
1069                .values()
1070                .all(|status| status.flushing || status.pending_task.is_some())
1071        );
1072
        // Gets the first region with a pending task from the status map.
1074        let Some(flush_status) = self
1075            .region_status
1076            .values_mut()
1077            .find(|status| status.pending_task.is_some())
1078        else {
1079            return Ok(());
1080        };
1081        debug_assert!(!flush_status.flushing);
1082        let task = flush_status.pending_task.take().unwrap();
1083        let region_id = flush_status.region_id;
1084        let version_control = flush_status.version_control.clone();
1085
1086        self.schedule_flush(region_id, &version_control, task)
1087    }
1088}
1089
1090impl Drop for FlushScheduler {
1091    fn drop(&mut self) {
1092        for (region_id, flush_status) in self.region_status.drain() {
1093            // We are shutting down so notify all pending tasks.
1094            flush_status.on_failure(Arc::new(RegionClosedSnafu { region_id }.build()));
1095        }
1096    }
1097}
1098
1099/// Flush status of a region scheduled by the [FlushScheduler].
1100///
1101/// Tracks running and pending flush tasks and all pending requests of a region.
1102struct FlushStatus {
1103    /// Current region.
1104    region_id: RegionId,
1105    /// Version control of the region.
1106    version_control: VersionControlRef,
1107    /// There is a flush task running.
1108    ///
    /// It is possible that a region is not flushing but has a pending task if the
    /// scheduler hasn't scheduled this region yet.
1111    flushing: bool,
1112    /// Task waiting for next flush.
1113    pending_task: Option<RegionFlushTask>,
1114    /// Pending ddl requests.
1115    pending_ddls: Vec<SenderDdlRequest>,
1116    /// Requests waiting to write after altering the region.
1117    pending_writes: Vec<SenderWriteRequest>,
1118    /// Bulk requests waiting to write after altering the region.
1119    pending_bulk_writes: Vec<SenderBulkRequest>,
1120}
1121
1122impl FlushStatus {
1123    fn new(region_id: RegionId, version_control: VersionControlRef) -> FlushStatus {
1124        FlushStatus {
1125            region_id,
1126            version_control,
1127            flushing: false,
1128            pending_task: None,
1129            pending_ddls: Vec::new(),
1130            pending_writes: Vec::new(),
1131            pending_bulk_writes: Vec::new(),
1132        }
1133    }
1134
    /// Merges the task into the pending task.
1136    fn merge_task(&mut self, task: RegionFlushTask) {
1137        if let Some(pending) = &mut self.pending_task {
1138            pending.merge(task);
1139        } else {
1140            self.pending_task = Some(task);
1141        }
1142    }
1143
1144    fn on_failure(self, err: Arc<Error>) {
1145        if let Some(mut task) = self.pending_task {
1146            task.on_failure(err.clone());
1147        }
1148        for ddl in self.pending_ddls {
1149            ddl.sender.send(Err(err.clone()).context(FlushRegionSnafu {
1150                region_id: self.region_id,
1151            }));
1152        }
1153        for write_req in self.pending_writes {
1154            write_req
1155                .sender
1156                .send(Err(err.clone()).context(FlushRegionSnafu {
1157                    region_id: self.region_id,
1158                }));
1159        }
1160    }
1161}
1162
1163#[cfg(test)]
1164mod tests {
1165    use tokio::sync::oneshot;
1166
1167    use super::*;
1168    use crate::cache::CacheManager;
1169    use crate::memtable::time_series::TimeSeriesMemtableBuilder;
1170    use crate::test_util::scheduler_util::{SchedulerEnv, VecScheduler};
1171    use crate::test_util::version_util::{VersionControlBuilder, write_rows_to_version};
1172
1173    #[test]
1174    fn test_get_mutable_limit() {
1175        assert_eq!(4, WriteBufferManagerImpl::get_mutable_limit(8));
1176        assert_eq!(5, WriteBufferManagerImpl::get_mutable_limit(10));
1177        assert_eq!(32, WriteBufferManagerImpl::get_mutable_limit(64));
1178        assert_eq!(0, WriteBufferManagerImpl::get_mutable_limit(0));
1179    }
1180
1181    #[test]
1182    fn test_over_mutable_limit() {
1183        // Mutable limit is 500.
1184        let manager = WriteBufferManagerImpl::new(1000);
1185        manager.reserve_mem(400);
1186        assert!(!manager.should_flush_engine());
1187        assert!(!manager.should_stall());
1188
1189        // More than mutable limit.
1190        manager.reserve_mem(400);
1191        assert!(manager.should_flush_engine());
1192
1193        // Freezes mutable.
1194        manager.schedule_free_mem(400);
1195        assert!(!manager.should_flush_engine());
1196        assert_eq!(800, manager.memory_used.load(Ordering::Relaxed));
1197        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1198
1199        // Releases immutable.
1200        manager.free_mem(400);
1201        assert_eq!(400, manager.memory_used.load(Ordering::Relaxed));
1202        assert_eq!(400, manager.memory_active.load(Ordering::Relaxed));
1203    }
1204
1205    #[test]
1206    fn test_over_global() {
1207        // Mutable limit is 500.
1208        let manager = WriteBufferManagerImpl::new(1000);
1209        manager.reserve_mem(1100);
1210        assert!(manager.should_stall());
1211        // Global usage is still 1100.
1212        manager.schedule_free_mem(200);
1213        assert!(manager.should_flush_engine());
1214
1215        // More than global limit, but mutable (1100-200-450=450) is not enough (< 500).
1216        manager.schedule_free_mem(450);
1217        assert!(!manager.should_flush_engine());
1218
1219        // Now mutable is enough.
1220        manager.reserve_mem(50);
1221        assert!(manager.should_flush_engine());
1222        manager.reserve_mem(100);
1223        assert!(manager.should_flush_engine());
1224    }
1225
1226    #[test]
1227    fn test_manager_notify() {
1228        let (sender, receiver) = watch::channel(());
1229        let manager = WriteBufferManagerImpl::new(1000).with_notifier(sender);
1230        manager.reserve_mem(500);
1231        assert!(!receiver.has_changed().unwrap());
1232        manager.schedule_free_mem(500);
1233        assert!(!receiver.has_changed().unwrap());
1234        manager.free_mem(500);
1235        assert!(receiver.has_changed().unwrap());
1236    }
1237
1238    #[tokio::test]
1239    async fn test_schedule_empty() {
1240        let env = SchedulerEnv::new().await;
1241        let (tx, _rx) = mpsc::channel(4);
1242        let mut scheduler = env.mock_flush_scheduler();
1243        let builder = VersionControlBuilder::new();
1244
1245        let version_control = Arc::new(builder.build());
1246        let (output_tx, output_rx) = oneshot::channel();
1247        let mut task = RegionFlushTask {
1248            region_id: builder.region_id(),
1249            reason: FlushReason::Others,
1250            senders: Vec::new(),
1251            request_sender: tx,
1252            access_layer: env.access_layer.clone(),
1253            listener: WorkerListener::default(),
1254            engine_config: Arc::new(MitoConfig::default()),
1255            row_group_size: None,
1256            cache_manager: Arc::new(CacheManager::default()),
1257            manifest_ctx: env
1258                .mock_manifest_context(version_control.current().version.metadata.clone())
1259                .await,
1260            index_options: IndexOptions::default(),
1261            flush_semaphore: Arc::new(Semaphore::new(2)),
1262        };
1263        task.push_sender(OptionOutputTx::from(output_tx));
1264        scheduler
1265            .schedule_flush(builder.region_id(), &version_control, task)
1266            .unwrap();
1267        assert!(scheduler.region_status.is_empty());
1268        let output = output_rx.await.unwrap().unwrap();
1269        assert_eq!(output, 0);
1270        assert!(scheduler.region_status.is_empty());
1271    }
1272
1273    #[tokio::test]
1274    async fn test_schedule_pending_request() {
1275        let job_scheduler = Arc::new(VecScheduler::default());
1276        let env = SchedulerEnv::new().await.scheduler(job_scheduler.clone());
1277        let (tx, _rx) = mpsc::channel(4);
1278        let mut scheduler = env.mock_flush_scheduler();
1279        let mut builder = VersionControlBuilder::new();
1280        // Overwrites the empty memtable builder.
1281        builder.set_memtable_builder(Arc::new(TimeSeriesMemtableBuilder::default()));
1282        let version_control = Arc::new(builder.build());
1283        // Writes data to the memtable so it is not empty.
1284        let version_data = version_control.current();
1285        write_rows_to_version(&version_data.version, "host0", 0, 10);
1286        let manifest_ctx = env
1287            .mock_manifest_context(version_data.version.metadata.clone())
1288            .await;
1289        // Creates 3 tasks.
1290        let mut tasks: Vec<_> = (0..3)
1291            .map(|_| RegionFlushTask {
1292                region_id: builder.region_id(),
1293                reason: FlushReason::Others,
1294                senders: Vec::new(),
1295                request_sender: tx.clone(),
1296                access_layer: env.access_layer.clone(),
1297                listener: WorkerListener::default(),
1298                engine_config: Arc::new(MitoConfig::default()),
1299                row_group_size: None,
1300                cache_manager: Arc::new(CacheManager::default()),
1301                manifest_ctx: manifest_ctx.clone(),
1302                index_options: IndexOptions::default(),
1303                flush_semaphore: Arc::new(Semaphore::new(2)),
1304            })
1305            .collect();
1306        // Schedule first task.
1307        let task = tasks.pop().unwrap();
1308        scheduler
1309            .schedule_flush(builder.region_id(), &version_control, task)
1310            .unwrap();
1311        // Should schedule 1 flush.
1312        assert_eq!(1, scheduler.region_status.len());
1313        assert_eq!(1, job_scheduler.num_jobs());
1314        // Check the new version.
1315        let version_data = version_control.current();
1316        assert_eq!(0, version_data.version.memtables.immutables()[0].id());
1317        // Schedule remaining tasks.
1318        let output_rxs: Vec<_> = tasks
1319            .into_iter()
1320            .map(|mut task| {
1321                let (output_tx, output_rx) = oneshot::channel();
1322                task.push_sender(OptionOutputTx::from(output_tx));
1323                scheduler
1324                    .schedule_flush(builder.region_id(), &version_control, task)
1325                    .unwrap();
1326                output_rx
1327            })
1328            .collect();
1329        // Assumes the flush job is finished.
1330        version_control.apply_edit(
1331            Some(RegionEdit {
1332                files_to_add: Vec::new(),
1333                files_to_remove: Vec::new(),
1334                timestamp_ms: None,
1335                compaction_time_window: None,
1336                flushed_entry_id: None,
1337                flushed_sequence: None,
1338                committed_sequence: None,
1339            }),
1340            &[0],
1341            builder.file_purger(),
1342        );
1343        scheduler.on_flush_success(builder.region_id());
1344        // No new flush task.
1345        assert_eq!(1, job_scheduler.num_jobs());
1346        // The flush status is cleared.
1347        assert!(scheduler.region_status.is_empty());
1348        for output_rx in output_rxs {
1349            let output = output_rx.await.unwrap().unwrap();
1350            assert_eq!(output, 0);
1351        }
1352    }
1353}