// tests_fuzz/utils/csv_dump_writer.rs

// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

15use std::collections::{HashMap, HashSet};
16use std::fs::{File, OpenOptions, create_dir_all, remove_dir_all};
17use std::io::Write;
18use std::path::{Path, PathBuf};
19
20use common_telemetry::{info, warn};
21use common_time::util::current_time_millis;
22use snafu::ResultExt;
23
24use crate::error::{self, Result};
25use crate::translator::csv::CsvRecords;
26use crate::utils::{
27    get_gt_fuzz_dump_buffer_max_bytes, get_gt_fuzz_dump_dir, get_gt_fuzz_dump_suffix,
28};
29
/// Metadata for one CSV dump session.
///
/// Serialized into the session's `seed.meta` file and used to name the
/// run directory (see `build_run_dir`).
#[derive(Debug, Clone)]
pub struct CsvDumpMetadata {
    /// Fuzz target name.
    pub target: String,
    /// Seed used by current fuzz input.
    pub seed: u64,
    /// Repartition action count.
    pub actions: usize,
    /// Initial partition count.
    pub partitions: usize,
    /// Logical table count.
    pub tables: usize,
    /// Session start time in unix milliseconds; also part of the run
    /// directory name, which keeps repeated runs distinct.
    pub started_at_unix_ms: i64,
}
46
47impl CsvDumpMetadata {
48    /// Builds dump metadata with current timestamp.
49    pub fn new(
50        target: impl Into<String>,
51        seed: u64,
52        actions: usize,
53        partitions: usize,
54        tables: usize,
55    ) -> Self {
56        Self {
57            target: target.into(),
58            seed,
59            actions,
60            partitions,
61            tables,
62            started_at_unix_ms: current_time_millis(),
63        }
64    }
65}
66
/// Session writer for staged CSV dump records.
#[derive(Debug)]
pub struct CsvDumpSession {
    /// Session metadata.
    pub metadata: CsvDumpMetadata,
    /// Session directory path.
    pub run_dir: PathBuf,
    /// Max in-memory buffer size before auto flush.
    pub max_buffer_bytes: usize,
    // Batches appended since the last flush.
    records: Vec<CsvRecords>,
    // Estimated byte size of `records` (header/cell bytes only; see
    // `estimate_csv_records_size`).
    buffered_bytes: usize,
    // Tables whose CSV header row has already been written this session.
    written_tables: HashSet<String>,
    // First full header list registered per table; flushed rows are
    // aligned to it.
    full_headers_by_table: HashMap<String, Vec<String>>,
}
81
82impl CsvDumpSession {
83    /// Creates session directory and writes seed metadata file.
84    pub fn new(metadata: CsvDumpMetadata) -> Result<Self> {
85        Self::new_with_buffer_limit(metadata, get_gt_fuzz_dump_buffer_max_bytes())
86    }
87
88    /// Creates session with a custom in-memory buffer limit.
89    pub fn new_with_buffer_limit(
90        metadata: CsvDumpMetadata,
91        max_buffer_bytes: usize,
92    ) -> Result<Self> {
93        let run_dir = build_run_dir(&metadata);
94        create_dir_all(&run_dir).context(error::CreateFileSnafu {
95            path: run_dir.to_string_lossy().to_string(),
96        })?;
97        write_seed_meta(&run_dir, &metadata)?;
98        info!(
99            "Create csv dump session, target: {}, run_dir: {}, max_buffer_bytes: {}",
100            metadata.target,
101            run_dir.display(),
102            max_buffer_bytes
103        );
104
105        Ok(Self {
106            metadata,
107            run_dir,
108            max_buffer_bytes,
109            records: Vec::new(),
110            buffered_bytes: 0,
111            written_tables: HashSet::new(),
112            full_headers_by_table: HashMap::new(),
113        })
114    }
115
116    /// Appends one table CSV records batch with full table headers.
117    pub fn append(&mut self, records: CsvRecords, full_headers: Vec<String>) -> Result<()> {
118        self.full_headers_by_table
119            .entry(records.table_name.clone())
120            .or_insert(full_headers);
121        self.buffered_bytes += estimate_csv_records_size(&records);
122        self.records.push(records);
123        if self.buffered_bytes >= self.max_buffer_bytes {
124            self.flush_buffered_records()?;
125        }
126        Ok(())
127    }
128
129    /// Flushes all appended batches to CSV files.
130    pub fn flush_all(&mut self) -> Result<()> {
131        self.flush_buffered_records()
132    }
133
134    /// Removes session directory after successful validation.
135    pub fn cleanup_on_success(&self) -> std::io::Result<()> {
136        match remove_dir_all(&self.run_dir) {
137            Ok(_) => {
138                info!(
139                    "Cleanup csv dump directory on success: {}",
140                    self.run_dir.display()
141                );
142                Ok(())
143            }
144            Err(err) => {
145                warn!(
146                    "Cleanup csv dump directory failed: {}, error: {:?}",
147                    self.run_dir.display(),
148                    err
149                );
150                Err(err)
151            }
152        }
153    }
154
155    fn flush_buffered_records(&mut self) -> Result<()> {
156        if self.records.is_empty() {
157            return Ok(());
158        }
159        for batch in &self.records {
160            write_batch_csv(
161                &self.run_dir,
162                batch,
163                &mut self.written_tables,
164                &self.full_headers_by_table,
165            )?;
166        }
167        self.records.clear();
168        self.buffered_bytes = 0;
169        Ok(())
170    }
171}
172
173fn write_seed_meta(run_dir: &Path, metadata: &CsvDumpMetadata) -> Result<()> {
174    let path = run_dir.join("seed.meta");
175    let mut file = File::create(&path).context(error::CreateFileSnafu {
176        path: path.to_string_lossy().to_string(),
177    })?;
178
179    let content = format!(
180        "target={}\nseed={}\nactions={}\npartitions={}\ntables={}\nstarted_at_unix_ms={}\n",
181        metadata.target,
182        metadata.seed,
183        metadata.actions,
184        metadata.partitions,
185        metadata.tables,
186        metadata.started_at_unix_ms,
187    );
188    file.write_all(content.as_bytes())
189        .context(error::WriteFileSnafu {
190            path: path.to_string_lossy().to_string(),
191        })
192}
193
194fn write_batch_csv(
195    run_dir: &Path,
196    batch: &CsvRecords,
197    written_tables: &mut HashSet<String>,
198    full_headers_by_table: &HashMap<String, Vec<String>>,
199) -> Result<()> {
200    let output_headers = full_headers_by_table
201        .get(&batch.table_name)
202        .cloned()
203        .unwrap_or_else(|| batch.headers.clone());
204    let file_name = format!("{}.table-data.csv", sanitize_file_name(&batch.table_name));
205    let path = run_dir.join(file_name);
206    let mut file = OpenOptions::new()
207        .create(true)
208        .append(true)
209        .open(&path)
210        .context(error::CreateFileSnafu {
211            path: path.to_string_lossy().to_string(),
212        })?;
213
214    if written_tables.insert(batch.table_name.clone()) {
215        file.write_all(join_line(&output_headers).as_bytes())
216            .context(error::WriteFileSnafu {
217                path: path.to_string_lossy().to_string(),
218            })?;
219        file.write_all(b"\n").context(error::WriteFileSnafu {
220            path: path.to_string_lossy().to_string(),
221        })?;
222    }
223
224    let header_index = batch
225        .headers
226        .iter()
227        .enumerate()
228        .map(|(idx, header)| (header.as_str(), idx))
229        .collect::<HashMap<_, _>>();
230
231    for record in &batch.records {
232        let aligned_values = output_headers
233            .iter()
234            .map(|header| {
235                header_index
236                    .get(header.as_str())
237                    .and_then(|idx| record.values.get(*idx))
238                    .cloned()
239                    .unwrap_or_default()
240            })
241            .collect::<Vec<_>>();
242        file.write_all(join_line(&aligned_values).as_bytes())
243            .context(error::WriteFileSnafu {
244                path: path.to_string_lossy().to_string(),
245            })?;
246        file.write_all(b"\n").context(error::WriteFileSnafu {
247            path: path.to_string_lossy().to_string(),
248        })?;
249    }
250
251    Ok(())
252}
253
254fn estimate_csv_records_size(records: &CsvRecords) -> usize {
255    let headers = records.headers.iter().map(String::len).sum::<usize>();
256    let rows = records
257        .records
258        .iter()
259        .flat_map(|record| record.values.iter())
260        .map(String::len)
261        .sum::<usize>();
262    headers + rows
263}
264
265fn join_line(cells: &[String]) -> String {
266    cells
267        .iter()
268        .map(|cell| escape_csv_cell(cell))
269        .collect::<Vec<_>>()
270        .join(",")
271}
272
/// Quotes a CSV cell when it contains a delimiter, quote, or line break,
/// doubling embedded quotes (RFC 4180 style); otherwise returns it as-is.
fn escape_csv_cell(value: &str) -> String {
    let needs_quoting = value.contains([',', '"', '\n', '\r']);
    if !needs_quoting {
        return value.to_string();
    }
    let mut escaped = String::with_capacity(value.len() + 2);
    escaped.push('"');
    for ch in value.chars() {
        if ch == '"' {
            // A literal quote is represented as two quotes inside a
            // quoted cell.
            escaped.push('"');
        }
        escaped.push(ch);
    }
    escaped.push('"');
    escaped
}
280
/// Replaces every character outside `[A-Za-z0-9_-]` with `_` so a table
/// name is safe to use as a file name component.
fn sanitize_file_name(raw: &str) -> String {
    raw.chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => ch,
            _ => '_',
        })
        .collect()
}
292
293fn build_run_dir(metadata: &CsvDumpMetadata) -> PathBuf {
294    let base = PathBuf::from(get_gt_fuzz_dump_dir());
295    let suffix = get_gt_fuzz_dump_suffix();
296    let name = format!(
297        "{}_seed_{}_actions_{}_ts_{}{}",
298        metadata.target, metadata.seed, metadata.actions, metadata.started_at_unix_ms, suffix
299    );
300    base.join(name)
301}
302
#[cfg(test)]
mod tests {
    use super::{CsvDumpMetadata, CsvDumpSession};
    use crate::translator::csv::{CsvRecord, CsvRecords};

    /// Builds a batch with a single record for the given table.
    fn one_row_batch(table: &str, headers: &[&str], values: &[&str]) -> CsvRecords {
        CsvRecords {
            table_name: table.to_string(),
            headers: headers.iter().map(|h| h.to_string()).collect(),
            records: vec![CsvRecord {
                values: values.iter().map(|v| v.to_string()).collect(),
            }],
        }
    }

    /// Converts a header slice into the owned form `append` expects.
    fn owned(headers: &[&str]) -> Vec<String> {
        headers.iter().map(|h| h.to_string()).collect()
    }

    #[test]
    fn test_create_session_and_flush() {
        let meta = CsvDumpMetadata::new("fuzz_case", 1, 2, 3, 4);
        let mut session = CsvDumpSession::new_with_buffer_limit(meta, 1024).unwrap();
        session
            .append(
                one_row_batch("metric-a", &["host", "value"], &["web-1", "10"]),
                owned(&["host", "value"]),
            )
            .unwrap();
        session.flush_all().unwrap();

        assert!(session.run_dir.exists());
        assert!(session.run_dir.join("seed.meta").exists());
        assert!(session.run_dir.join("metric-a.table-data.csv").exists());
    }

    #[test]
    fn test_auto_flush_on_buffer_limit() {
        // A one-byte limit forces an automatic flush on the first append.
        let meta = CsvDumpMetadata::new("fuzz_case", 5, 2, 3, 4);
        let mut session = CsvDumpSession::new_with_buffer_limit(meta, 1).unwrap();
        session
            .append(
                one_row_batch("metric-b", &["host"], &["web-2"]),
                owned(&["host"]),
            )
            .unwrap();

        assert!(session.run_dir.join("metric-b.table-data.csv").exists());
        assert_eq!(session.buffered_bytes, 0);
    }

    #[test]
    fn test_flush_with_partial_headers_uses_full_headers() {
        // The batch lacks the `idc` column; the flushed file must still
        // carry the full header and an empty cell for the missing column.
        let meta = CsvDumpMetadata::new("fuzz_case", 7, 2, 3, 4);
        let mut session = CsvDumpSession::new_with_buffer_limit(meta, 1024).unwrap();
        session
            .append(
                one_row_batch("metric-c", &["host", "value"], &["web-3", "12"]),
                owned(&["host", "idc", "value"]),
            )
            .unwrap();
        session.flush_all().unwrap();

        let content =
            std::fs::read_to_string(session.run_dir.join("metric-c.table-data.csv")).unwrap();
        let mut lines = content.lines();
        assert_eq!(lines.next().unwrap(), "host,idc,value");
        assert_eq!(lines.next().unwrap(), "web-3,,12");
    }
}
381        assert_eq!(lines.next().unwrap(), "web-3,,12");
382    }
383}