use std::collections::{HashMap, HashSet};
use std::fs::{File, OpenOptions, create_dir_all, remove_dir_all};
use std::io::{BufWriter, Write};
use std::path::{Path, PathBuf};

use common_telemetry::{info, warn};
use common_time::util::current_time_millis;
use snafu::ResultExt;

use crate::error::{self, Result};
use crate::translator::csv::CsvRecords;
use crate::utils::{
    get_gt_fuzz_dump_buffer_max_bytes, get_gt_fuzz_dump_dir, get_gt_fuzz_dump_suffix,
};
29
/// Describes one fuzz dump run. Persisted to `seed.meta` in the run
/// directory so a failing case can be reproduced from its seed and sizing
/// parameters; also used to derive the run directory name.
#[derive(Debug, Clone)]
pub struct CsvDumpMetadata {
    /// Name of the fuzz target being exercised.
    pub target: String,
    /// RNG seed for the run (recorded for reproduction).
    pub seed: u64,
    /// Number of fuzz actions in the run.
    pub actions: usize,
    /// Number of partitions used by the run.
    pub partitions: usize,
    /// Number of tables used by the run.
    pub tables: usize,
    /// Wall-clock start time in Unix milliseconds; captured at construction
    /// and embedded in the run directory name.
    pub started_at_unix_ms: i64,
}
46
47impl CsvDumpMetadata {
48 pub fn new(
50 target: impl Into<String>,
51 seed: u64,
52 actions: usize,
53 partitions: usize,
54 tables: usize,
55 ) -> Self {
56 Self {
57 target: target.into(),
58 seed,
59 actions,
60 partitions,
61 tables,
62 started_at_unix_ms: current_time_millis(),
63 }
64 }
65}
66
/// Buffered writer for fuzz-case CSV dumps.
///
/// Appended batches accumulate in memory and are flushed to per-table CSV
/// files under `run_dir` once the estimated buffered size reaches
/// `max_buffer_bytes` (or when `flush_all` is called).
#[derive(Debug)]
pub struct CsvDumpSession {
    /// Metadata for the run; written to `seed.meta` when the session is created.
    pub metadata: CsvDumpMetadata,
    /// Directory this session writes into, derived from the metadata.
    pub run_dir: PathBuf,
    /// Flush threshold, in estimated bytes of buffered record data.
    pub max_buffer_bytes: usize,
    // Batches waiting to be flushed to disk.
    records: Vec<CsvRecords>,
    // Size estimate of `records` (sums header and cell byte lengths only;
    // separators/newlines are not counted).
    buffered_bytes: usize,
    // Tables whose CSV header row has already been written.
    written_tables: HashSet<String>,
    // Full header set per table, captured on first `append` for that table.
    full_headers_by_table: HashMap<String, Vec<String>>,
}
81
82impl CsvDumpSession {
83 pub fn new(metadata: CsvDumpMetadata) -> Result<Self> {
85 Self::new_with_buffer_limit(metadata, get_gt_fuzz_dump_buffer_max_bytes())
86 }
87
88 pub fn new_with_buffer_limit(
90 metadata: CsvDumpMetadata,
91 max_buffer_bytes: usize,
92 ) -> Result<Self> {
93 let run_dir = build_run_dir(&metadata);
94 create_dir_all(&run_dir).context(error::CreateFileSnafu {
95 path: run_dir.to_string_lossy().to_string(),
96 })?;
97 write_seed_meta(&run_dir, &metadata)?;
98 info!(
99 "Create csv dump session, target: {}, run_dir: {}, max_buffer_bytes: {}",
100 metadata.target,
101 run_dir.display(),
102 max_buffer_bytes
103 );
104
105 Ok(Self {
106 metadata,
107 run_dir,
108 max_buffer_bytes,
109 records: Vec::new(),
110 buffered_bytes: 0,
111 written_tables: HashSet::new(),
112 full_headers_by_table: HashMap::new(),
113 })
114 }
115
116 pub fn append(&mut self, records: CsvRecords, full_headers: Vec<String>) -> Result<()> {
118 self.full_headers_by_table
119 .entry(records.table_name.clone())
120 .or_insert(full_headers);
121 self.buffered_bytes += estimate_csv_records_size(&records);
122 self.records.push(records);
123 if self.buffered_bytes >= self.max_buffer_bytes {
124 self.flush_buffered_records()?;
125 }
126 Ok(())
127 }
128
129 pub fn flush_all(&mut self) -> Result<()> {
131 self.flush_buffered_records()
132 }
133
134 pub fn cleanup_on_success(&self) -> std::io::Result<()> {
136 match remove_dir_all(&self.run_dir) {
137 Ok(_) => {
138 info!(
139 "Cleanup csv dump directory on success: {}",
140 self.run_dir.display()
141 );
142 Ok(())
143 }
144 Err(err) => {
145 warn!(
146 "Cleanup csv dump directory failed: {}, error: {:?}",
147 self.run_dir.display(),
148 err
149 );
150 Err(err)
151 }
152 }
153 }
154
155 fn flush_buffered_records(&mut self) -> Result<()> {
156 if self.records.is_empty() {
157 return Ok(());
158 }
159 for batch in &self.records {
160 write_batch_csv(
161 &self.run_dir,
162 batch,
163 &mut self.written_tables,
164 &self.full_headers_by_table,
165 )?;
166 }
167 self.records.clear();
168 self.buffered_bytes = 0;
169 Ok(())
170 }
171}
172
173fn write_seed_meta(run_dir: &Path, metadata: &CsvDumpMetadata) -> Result<()> {
174 let path = run_dir.join("seed.meta");
175 let mut file = File::create(&path).context(error::CreateFileSnafu {
176 path: path.to_string_lossy().to_string(),
177 })?;
178
179 let content = format!(
180 "target={}\nseed={}\nactions={}\npartitions={}\ntables={}\nstarted_at_unix_ms={}\n",
181 metadata.target,
182 metadata.seed,
183 metadata.actions,
184 metadata.partitions,
185 metadata.tables,
186 metadata.started_at_unix_ms,
187 );
188 file.write_all(content.as_bytes())
189 .context(error::WriteFileSnafu {
190 path: path.to_string_lossy().to_string(),
191 })
192}
193
194fn write_batch_csv(
195 run_dir: &Path,
196 batch: &CsvRecords,
197 written_tables: &mut HashSet<String>,
198 full_headers_by_table: &HashMap<String, Vec<String>>,
199) -> Result<()> {
200 let output_headers = full_headers_by_table
201 .get(&batch.table_name)
202 .cloned()
203 .unwrap_or_else(|| batch.headers.clone());
204 let file_name = format!("{}.table-data.csv", sanitize_file_name(&batch.table_name));
205 let path = run_dir.join(file_name);
206 let mut file = OpenOptions::new()
207 .create(true)
208 .append(true)
209 .open(&path)
210 .context(error::CreateFileSnafu {
211 path: path.to_string_lossy().to_string(),
212 })?;
213
214 if written_tables.insert(batch.table_name.clone()) {
215 file.write_all(join_line(&output_headers).as_bytes())
216 .context(error::WriteFileSnafu {
217 path: path.to_string_lossy().to_string(),
218 })?;
219 file.write_all(b"\n").context(error::WriteFileSnafu {
220 path: path.to_string_lossy().to_string(),
221 })?;
222 }
223
224 let header_index = batch
225 .headers
226 .iter()
227 .enumerate()
228 .map(|(idx, header)| (header.as_str(), idx))
229 .collect::<HashMap<_, _>>();
230
231 for record in &batch.records {
232 let aligned_values = output_headers
233 .iter()
234 .map(|header| {
235 header_index
236 .get(header.as_str())
237 .and_then(|idx| record.values.get(*idx))
238 .cloned()
239 .unwrap_or_default()
240 })
241 .collect::<Vec<_>>();
242 file.write_all(join_line(&aligned_values).as_bytes())
243 .context(error::WriteFileSnafu {
244 path: path.to_string_lossy().to_string(),
245 })?;
246 file.write_all(b"\n").context(error::WriteFileSnafu {
247 path: path.to_string_lossy().to_string(),
248 })?;
249 }
250
251 Ok(())
252}
253
254fn estimate_csv_records_size(records: &CsvRecords) -> usize {
255 let headers = records.headers.iter().map(String::len).sum::<usize>();
256 let rows = records
257 .records
258 .iter()
259 .flat_map(|record| record.values.iter())
260 .map(String::len)
261 .sum::<usize>();
262 headers + rows
263}
264
265fn join_line(cells: &[String]) -> String {
266 cells
267 .iter()
268 .map(|cell| escape_csv_cell(cell))
269 .collect::<Vec<_>>()
270 .join(",")
271}
272
/// Escapes a single CSV cell: a cell containing a comma, double quote, or
/// line break is wrapped in double quotes with embedded quotes doubled;
/// any other cell passes through unchanged.
fn escape_csv_cell(value: &str) -> String {
    let needs_quoting = value
        .chars()
        .any(|ch| matches!(ch, ',' | '"' | '\n' | '\r'));
    if !needs_quoting {
        return value.to_string();
    }

    let mut escaped = String::with_capacity(value.len() + 2);
    escaped.push('"');
    for ch in value.chars() {
        if ch == '"' {
            // RFC 4180 style: a literal quote becomes two quotes.
            escaped.push('"');
        }
        escaped.push(ch);
    }
    escaped.push('"');
    escaped
}
280
/// Maps a table name to a safe file-name component: ASCII alphanumerics,
/// `_`, and `-` are kept, every other character becomes `_`.
fn sanitize_file_name(raw: &str) -> String {
    raw.chars()
        .map(|ch| match ch {
            'a'..='z' | 'A'..='Z' | '0'..='9' | '_' | '-' => ch,
            _ => '_',
        })
        .collect()
}
292
293fn build_run_dir(metadata: &CsvDumpMetadata) -> PathBuf {
294 let base = PathBuf::from(get_gt_fuzz_dump_dir());
295 let suffix = get_gt_fuzz_dump_suffix();
296 let name = format!(
297 "{}_seed_{}_actions_{}_ts_{}{}",
298 metadata.target, metadata.seed, metadata.actions, metadata.started_at_unix_ms, suffix
299 );
300 base.join(name)
301}
302
#[cfg(test)]
mod tests {
    use super::{CsvDumpMetadata, CsvDumpSession};
    use crate::translator::csv::{CsvRecord, CsvRecords};

    // NOTE(review): these tests write real directories under the path
    // returned by `get_gt_fuzz_dump_dir()` and never remove them — confirm
    // the configured dump dir is a scratch/temp location in CI.

    // An explicit `flush_all` must materialize `seed.meta` and the
    // per-table CSV file inside the run directory.
    #[test]
    fn test_create_session_and_flush() {
        let mut session = CsvDumpSession::new_with_buffer_limit(
            CsvDumpMetadata::new("fuzz_case", 1, 2, 3, 4),
            1024,
        )
        .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-a".to_string(),
                    headers: vec!["host".to_string(), "value".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-1".to_string(), "10".to_string()],
                    }],
                },
                vec!["host".to_string(), "value".to_string()],
            )
            .unwrap();
        session.flush_all().unwrap();

        assert!(session.run_dir.exists());
        assert!(session.run_dir.join("seed.meta").exists());
        assert!(session.run_dir.join("metric-a.table-data.csv").exists());
    }

    // With a 1-byte limit a single `append` must trigger an automatic flush
    // (file exists, buffer accounting reset to zero).
    #[test]
    fn test_auto_flush_on_buffer_limit() {
        let mut session =
            CsvDumpSession::new_with_buffer_limit(CsvDumpMetadata::new("fuzz_case", 5, 2, 3, 4), 1)
                .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-b".to_string(),
                    headers: vec!["host".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-2".to_string()],
                    }],
                },
                vec!["host".to_string()],
            )
            .unwrap();

        assert!(session.run_dir.join("metric-b.table-data.csv").exists());
        assert_eq!(session.buffered_bytes, 0);
    }

    // A batch carrying only a subset of the full headers must be written
    // against the full header set, with missing columns emitted as empty
    // cells (here the absent "idc" column).
    #[test]
    fn test_flush_with_partial_headers_uses_full_headers() {
        let mut session = CsvDumpSession::new_with_buffer_limit(
            CsvDumpMetadata::new("fuzz_case", 7, 2, 3, 4),
            1024,
        )
        .unwrap();
        session
            .append(
                CsvRecords {
                    table_name: "metric-c".to_string(),
                    headers: vec!["host".to_string(), "value".to_string()],
                    records: vec![CsvRecord {
                        values: vec!["web-3".to_string(), "12".to_string()],
                    }],
                },
                vec!["host".to_string(), "idc".to_string(), "value".to_string()],
            )
            .unwrap();
        session.flush_all().unwrap();

        let file =
            std::fs::read_to_string(session.run_dir.join("metric-c.table-data.csv")).unwrap();
        let mut lines = file.lines();
        assert_eq!(lines.next().unwrap(), "host,idc,value");
        assert_eq!(lines.next().unwrap(), "web-3,,12");
    }
}