fn start_worker(
key: BatchKey,
worker_tx: Sender<WorkerCommand>,
workers: Arc<DashMap<BatchKey, PendingWorker>>,
rx: Receiver<WorkerCommand>,
shutdown: Sender<()>,
partition_manager: PartitionRuleManagerRef,
node_manager: NodeManagerRef,
catalog_manager: CatalogManagerRef,
flush_interval: Duration,
worker_idle_timeout: Duration,
max_batch_rows: usize,
flush_semaphore: Arc<Semaphore>,
)