Skip to main content

ssg/
stream.rs

1// Copyright © 2023 - 2026 Static Site Generator (SSG). All rights reserved.
2// SPDX-License-Identifier: Apache-2.0 OR MIT
3
4//! High-performance streaming file processor.
5//!
6//! Provides constant-memory file processing for workloads from 1K to 50K+
7//! files. All I/O uses fixed-size buffers — memory usage does not grow
8//! with file size or transaction count.
9//!
10//! # Performance targets
11//!
12//! - Time to first result: < 2 ms
13//! - Throughput: >= 50,000 files/second
14//! - Memory: constant O(1) per file via streaming
15//!
16//! # Architecture
17//!
18//! Files are processed through a pipeline of `StreamProcessor` stages.
19//! Each stage reads from a buffered input, transforms in a fixed-size
20//! buffer, and writes to a buffered output. No file is ever fully loaded
21//! into memory unless it fits within the buffer size.
22
23use anyhow::{Context, Result};
24use std::fs::{self, File};
25use std::io::{BufReader, BufWriter, Read, Write};
26use std::path::{Path, PathBuf};
27use std::time::Instant;
28
/// Size, in bytes, of every fixed I/O buffer in this module (8 KiB).
///
/// The same value sizes the `BufReader`/`BufWriter` wrappers and the
/// stack-allocated scratch arrays used by `stream_copy` and
/// `stream_hash`, so per-file memory stays constant.
pub const STREAM_BUFFER_SIZE: usize = 8 * 1024;
32
/// Upper bound on the number of files gathered for a single batch.
///
/// The directory walker stops collecting once this many files have been
/// found, which caps the length of the in-memory path list (per Power of
/// Ten Rule 2).
pub const MAX_BATCH_SIZE: usize = 100_000;
36
/// Summary statistics for one run of `process_batch`.
#[derive(Debug, Clone, Copy)]
pub struct BatchResult {
    /// How many files the processor was applied to successfully.
    pub files_processed: usize,
    /// Sum of the source file sizes in bytes, taken from filesystem
    /// metadata (a file whose metadata cannot be read contributes 0).
    pub bytes_read: u64,
    /// Sum of the byte counts returned by the processor.
    pub bytes_written: u64,
    /// Elapsed wall-clock time for the whole batch, in milliseconds.
    pub duration_ms: f64,
    /// Files handled per second. May be `f64::INFINITY` if the measured
    /// duration is zero.
    pub throughput: f64,
}
51
52/// Copies a single file using buffered streaming I/O.
53///
54/// Reads and writes in `STREAM_BUFFER_SIZE` chunks. Memory usage is
55/// constant regardless of file size — a 1 KB file and a 1 GB file
56/// use the same buffer.
57///
58/// # Errors
59///
60/// Returns an error if the source cannot be read or the destination
61/// cannot be written.
62pub fn stream_copy(src: &Path, dst: &Path) -> Result<u64> {
63    let file_in = File::open(src)
64        .with_context(|| format!("cannot open {}", src.display()))?;
65    let file_out = File::create(dst)
66        .with_context(|| format!("cannot create {}", dst.display()))?;
67
68    let mut reader = BufReader::with_capacity(STREAM_BUFFER_SIZE, file_in);
69    let mut writer = BufWriter::with_capacity(STREAM_BUFFER_SIZE, file_out);
70
71    let mut buf = [0u8; STREAM_BUFFER_SIZE];
72    let mut total: u64 = 0;
73
74    loop {
75        let n = reader
76            .read(&mut buf)
77            .with_context(|| format!("read error: {}", src.display()))?;
78        if n == 0 {
79            break;
80        }
81        writer
82            .write_all(&buf[..n])
83            .with_context(|| format!("write error: {}", dst.display()))?;
84        total += n as u64;
85    }
86
87    writer
88        .flush()
89        .with_context(|| format!("flush error: {}", dst.display()))?;
90
91    Ok(total)
92}
93
94/// Hashes a file using streaming I/O with constant memory.
95///
96/// Reads in `STREAM_BUFFER_SIZE` chunks and feeds each chunk to a
97/// `DefaultHasher`. Never loads the entire file into memory.
98///
99/// Returns a 16-character hex fingerprint.
100pub fn stream_hash(path: &Path) -> Result<String> {
101    use std::hash::{DefaultHasher, Hasher};
102
103    let file = File::open(path)
104        .with_context(|| format!("cannot open {}", path.display()))?;
105    let mut reader = BufReader::with_capacity(STREAM_BUFFER_SIZE, file);
106    let mut hasher = DefaultHasher::new();
107    let mut buf = [0u8; STREAM_BUFFER_SIZE];
108
109    loop {
110        let n = reader
111            .read(&mut buf)
112            .with_context(|| format!("read error: {}", path.display()))?;
113        if n == 0 {
114            break;
115        }
116        hasher.write(&buf[..n]);
117    }
118
119    Ok(format!("{:016x}", hasher.finish()))
120}
121
122/// Processes a batch of files through a streaming pipeline.
123///
124/// Applies `processor` to each file in `src_dir`, writing results to
125/// `dst_dir`. Processes files sequentially with constant memory. For
126/// parallel processing, use `process_batch_parallel`.
127///
128/// # Errors
129///
130/// Returns an error if any file cannot be read, processed, or written.
131/// Processing stops at the first error.
132pub fn process_batch<F>(
133    src_dir: &Path,
134    dst_dir: &Path,
135    processor: F,
136) -> Result<BatchResult>
137where
138    F: Fn(&Path, &Path) -> Result<u64>,
139{
140    let start = Instant::now();
141
142    fs::create_dir_all(dst_dir)
143        .with_context(|| format!("cannot create {}", dst_dir.display()))?;
144
145    let entries: Vec<PathBuf> = collect_files_bounded(src_dir)?;
146    let mut bytes_read: u64 = 0;
147    let mut bytes_written: u64 = 0;
148    let mut count: usize = 0;
149
150    for src_path in &entries {
151        let rel = src_path
152            .strip_prefix(src_dir)
153            .with_context(|| "strip_prefix failed")?;
154        let dst_path = dst_dir.join(rel);
155
156        if let Some(parent) = dst_path.parent() {
157            fs::create_dir_all(parent)?;
158        }
159
160        let src_size = fs::metadata(src_path).map_or(0, |m| m.len());
161        let written = processor(src_path, &dst_path)?;
162
163        bytes_read += src_size;
164        bytes_written += written;
165        count += 1;
166    }
167
168    let elapsed = start.elapsed();
169    let duration_ms = elapsed.as_secs_f64() * 1000.0;
170    let throughput = if duration_ms > 0.0 {
171        count as f64 / elapsed.as_secs_f64()
172    } else {
173        f64::INFINITY
174    };
175
176    Ok(BatchResult {
177        files_processed: count,
178        bytes_read,
179        bytes_written,
180        duration_ms,
181        throughput,
182    })
183}
184
185/// Collects files from a directory with a bounded iteration count.
186///
187/// Returns at most `MAX_BATCH_SIZE` files. Uses iterative traversal
188/// (no recursion) with depth tracking.
189fn collect_files_bounded(dir: &Path) -> Result<Vec<PathBuf>> {
190    collect_files_bounded_with_limit(dir, MAX_BATCH_SIZE)
191}
192
193/// Inner walker accepting an explicit limit.
194///
195/// Extracted so unit tests can exercise the saturation `break`
196/// branches without allocating `MAX_BATCH_SIZE` (100k) files on disk.
197fn collect_files_bounded_with_limit(
198    dir: &Path,
199    limit: usize,
200) -> Result<Vec<PathBuf>> {
201    let mut files = Vec::new();
202    let mut stack = vec![dir.to_path_buf()];
203    let mut iterations: usize = 0;
204
205    while let Some(current) = stack.pop() {
206        if iterations >= limit {
207            break;
208        }
209
210        let entries = fs::read_dir(&current)
211            .with_context(|| format!("cannot read {}", current.display()))?;
212
213        for entry in entries {
214            let path = entry?.path();
215            if path.is_dir() {
216                stack.push(path);
217            } else {
218                files.push(path);
219                iterations += 1;
220                if iterations >= limit {
221                    break;
222                }
223            }
224        }
225    }
226
227    Ok(files)
228}
229
230/// Processes a file by reading line-by-line with constant memory.
231///
232/// Calls `line_fn` for each line. The line buffer is reused across
233/// iterations — memory does not grow with file length.
234///
235/// # Errors
236///
237/// Returns an error if the file cannot be read.
238pub fn stream_lines<F>(path: &Path, mut line_fn: F) -> Result<usize>
239where
240    F: FnMut(usize, &str) -> Result<()>,
241{
242    use std::io::BufRead;
243
244    let file = File::open(path)
245        .with_context(|| format!("cannot open {}", path.display()))?;
246    let reader = BufReader::with_capacity(STREAM_BUFFER_SIZE, file);
247    let mut count: usize = 0;
248
249    for line in reader.lines() {
250        let line =
251            line.with_context(|| format!("read error at line {count}"))?;
252        line_fn(count, &line)?;
253        count += 1;
254    }
255
256    Ok(count)
257}
258
/// Measures the overhead of the batch pipeline using a plain copy.
///
/// Creates `n` 64-byte files in a temporary directory, then runs them
/// through `process_batch` with `stream_copy` as the processor. The
/// returned `BatchResult` carries the measured duration and throughput;
/// creating the input files happens before `process_batch` starts its
/// timer. The temporary directory is removed when the function returns.
///
/// # Errors
///
/// Returns an error if the temporary directory or its files cannot be
/// created, or if the batch itself fails.
#[cfg(any(test, feature = "benchmark"))]
pub fn benchmark_throughput(n: usize) -> Result<BatchResult> {
    let scratch = tempfile::tempdir().context("cannot create temp dir")?;
    let (src, dst) = (scratch.path().join("src"), scratch.path().join("dst"));
    fs::create_dir_all(&src)?;

    // 32 repetitions of a two-byte pattern: 64 bytes per file.
    let payload = "a]".repeat(32);
    for i in 0..n {
        let file_path = src.join(format!("f{i}.txt"));
        fs::write(file_path, &payload)?;
    }

    process_batch(&src, &dst, stream_copy)
}
277
#[cfg(test)]
// Test code unwraps freely for brevity; relax the corresponding lints.
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod tests {
    use super::*;
    use tempfile::tempdir;

    // -----------------------------------------------------------------
    // stream_copy — basic behaviour
    // -----------------------------------------------------------------

    #[test]
    fn test_stream_copy_small_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src.txt");
        let dst = tmp.path().join("dst.txt");
        fs::write(&src, "hello world")?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 11);
        assert_eq!(fs::read_to_string(&dst)?, "hello world");
        Ok(())
    }

    #[test]
    fn test_stream_copy_large_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("large.bin");
        let dst = tmp.path().join("large_copy.bin");

        // 1 MB file — larger than STREAM_BUFFER_SIZE
        let data = vec![0xABu8; 1024 * 1024];
        fs::write(&src, &data)?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 1024 * 1024);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn test_stream_copy_empty_file() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("empty.txt");
        let dst = tmp.path().join("empty_copy.txt");
        fs::write(&src, "")?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 0);
        // The destination must still be created, and must be empty.
        assert!(fs::read(&dst)?.is_empty());
        Ok(())
    }

    // -----------------------------------------------------------------
    // stream_hash — basic behaviour
    // -----------------------------------------------------------------

    #[test]
    fn test_stream_hash_deterministic() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("test.txt");
        fs::write(&path, "consistent content")?;

        let h1 = stream_hash(&path)?;
        let h2 = stream_hash(&path)?;
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 16);
        Ok(())
    }

    #[test]
    fn test_stream_hash_differs_for_different_content() -> Result<()> {
        let tmp = tempdir()?;
        let a = tmp.path().join("a.txt");
        let b = tmp.path().join("b.txt");
        fs::write(&a, "content a")?;
        fs::write(&b, "content b")?;

        assert_ne!(stream_hash(&a)?, stream_hash(&b)?);
        Ok(())
    }

    #[test]
    fn test_stream_hash_large_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("big.bin");
        fs::write(&path, vec![0u8; 100_000])?;

        let hash = stream_hash(&path)?;
        assert_eq!(hash.len(), 16);
        Ok(())
    }

    // -----------------------------------------------------------------
    // process_batch — basic behaviour
    // -----------------------------------------------------------------

    #[test]
    fn test_process_batch_copies_files() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;

        for i in 0..10 {
            fs::write(src.join(format!("f{i}.txt")), format!("data {i}"))?;
        }

        let result = process_batch(&src, &dst, stream_copy)?;
        assert_eq!(result.files_processed, 10);
        assert!(result.bytes_written > 0);
        assert!(result.throughput > 0.0);
        Ok(())
    }

    #[test]
    fn test_process_batch_empty_directory() -> Result<()> {
        // Arrange — source directory with no files
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;

        // Act
        let result = process_batch(&src, &dst, stream_copy)?;

        // Assert
        assert_eq!(result.files_processed, 0);
        assert_eq!(result.bytes_read, 0);
        assert_eq!(result.bytes_written, 0);
        Ok(())
    }

    #[test]
    fn test_process_batch_nested_dirs() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(src.join("sub/deep"))?;
        fs::write(src.join("root.txt"), "root")?;
        fs::write(src.join("sub/mid.txt"), "mid")?;
        fs::write(src.join("sub/deep/leaf.txt"), "leaf")?;

        let result = process_batch(&src, &dst, stream_copy)?;
        assert_eq!(result.files_processed, 3);
        assert_eq!(fs::read_to_string(dst.join("sub/deep/leaf.txt"))?, "leaf");
        Ok(())
    }

    // -----------------------------------------------------------------
    // stream_lines — basic behaviour
    // -----------------------------------------------------------------

    #[test]
    fn test_stream_lines_counts_correctly() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("lines.txt");
        fs::write(&path, "line1\nline2\nline3\n")?;

        let count = stream_lines(&path, |_i, _line| Ok(()))?;
        assert_eq!(count, 3);
        Ok(())
    }

    #[test]
    fn test_stream_lines_provides_content() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("data.txt");
        fs::write(&path, "alpha\nbeta\ngamma")?;

        let mut collected = Vec::new();
        let _ = stream_lines(&path, |_i, line| {
            collected.push(line.to_string());
            Ok(())
        })?;
        assert_eq!(collected, vec!["alpha", "beta", "gamma"]);
        Ok(())
    }

    // -----------------------------------------------------------------
    // collect_files_bounded — limit handling
    // -----------------------------------------------------------------

    #[test]
    fn test_collect_files_bounded_respects_limit() -> Result<()> {
        let tmp = tempdir()?;
        // MAX_BATCH_SIZE is 100_000, far above 50: every file must be
        // returned. The saturation paths are covered by the
        // `collect_files_bounded_with_limit_*` tests below.
        for i in 0..50 {
            fs::write(tmp.path().join(format!("f{i}.txt")), "x")?;
        }
        let files = collect_files_bounded(tmp.path())?;
        assert_eq!(files.len(), 50);
        Ok(())
    }

    #[test]
    fn collect_files_bounded_with_limit_breaks_on_outer_loop_saturation(
    ) -> Result<()> {
        // Two subdirectories with three files each and a limit of two.
        // Whichever subdirectory is visited first already saturates the
        // limit, so the walker must stop without descending into the
        // one still waiting on the stack.
        let tmp = tempdir()?;
        let a = tmp.path().join("a");
        let b = tmp.path().join("b");
        fs::create_dir_all(&a)?;
        fs::create_dir_all(&b)?;
        for i in 0..3 {
            fs::write(a.join(format!("f{i}.txt")), "x")?;
            fs::write(b.join(format!("f{i}.txt")), "x")?;
        }

        let files = collect_files_bounded_with_limit(tmp.path(), 2)?;
        // The cap is exact regardless of directory listing order.
        assert_eq!(files.len(), 2);
        Ok(())
    }

    #[test]
    fn collect_files_bounded_with_limit_breaks_on_inner_loop_saturation(
    ) -> Result<()> {
        // A single directory holds more files than the limit, so the
        // cap is reached part-way through one directory listing.
        let tmp = tempdir()?;
        for i in 0..10 {
            fs::write(tmp.path().join(format!("f{i}.txt")), "x")?;
        }
        let files = collect_files_bounded_with_limit(tmp.path(), 3)?;
        assert_eq!(files.len(), 3);
        Ok(())
    }

    // -----------------------------------------------------------------
    // benchmark_throughput / BatchResult — smoke tests
    // -----------------------------------------------------------------

    #[test]
    fn test_benchmark_throughput_runs() -> Result<()> {
        let result = benchmark_throughput(100)?;
        assert_eq!(result.files_processed, 100);
        assert!(
            result.throughput.is_finite() && result.throughput > 0.0,
            "invalid throughput: {}",
            result.throughput
        );
        println!(
            "Benchmark: {} files in {:.2} ms ({:.0} files/sec)",
            result.files_processed, result.duration_ms, result.throughput
        );
        Ok(())
    }

    #[test]
    fn test_batch_result_fields() {
        let r = BatchResult {
            files_processed: 10,
            bytes_read: 1000,
            bytes_written: 900,
            duration_ms: 1.5,
            throughput: 6666.0,
        };
        assert_eq!(r.files_processed, 10);
        assert!(r.throughput > 0.0);
    }

    // -----------------------------------------------------------------
    // Error paths for missing inputs
    // -----------------------------------------------------------------

    #[test]
    fn test_stream_copy_nonexistent_source() {
        // Opening the source fails before the destination is created,
        // so nothing is left behind in the shared temp directory.
        let dst = std::env::temp_dir().join("ssg_stream_copy_out");
        let result =
            stream_copy(Path::new("/definitely-does-not-exist-ssg"), &dst);
        assert!(result.is_err());
    }

    #[test]
    fn test_stream_hash_nonexistent() {
        let result = stream_hash(Path::new("/nonexistent"));
        assert!(result.is_err());
    }

    #[test]
    fn test_stream_lines_empty_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("empty.txt");
        fs::write(&path, "")?;

        let count = stream_lines(&path, |_i, _line| Ok(()))?;
        assert_eq!(count, 0);
        Ok(())
    }

    // -----------------------------------------------------------------
    // Buffer-boundary and content-equality checks
    // -----------------------------------------------------------------

    #[test]
    fn stream_copy_exact_buffer_size_file() -> Result<()> {
        // Arrange
        let tmp = tempdir()?;
        let src = tmp.path().join("exact.bin");
        let dst = tmp.path().join("exact_copy.bin");
        let data = vec![0xCDu8; STREAM_BUFFER_SIZE];
        fs::write(&src, &data)?;

        // Act
        let bytes = stream_copy(&src, &dst)?;

        // Assert
        assert_eq!(bytes, STREAM_BUFFER_SIZE as u64);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn stream_hash_empty_file() -> Result<()> {
        // Arrange
        let tmp = tempdir()?;
        let path = tmp.path().join("empty.bin");
        fs::write(&path, b"")?;

        // Act
        let h1 = stream_hash(&path)?;
        let h2 = stream_hash(&path)?;

        // Assert
        assert_eq!(h1, h2, "hash of empty file must be deterministic");
        assert_eq!(h1.len(), 16);
        Ok(())
    }

    #[test]
    fn stream_hash_same_content_same_hash() -> Result<()> {
        // Arrange
        let tmp = tempdir()?;
        let a = tmp.path().join("file_a.txt");
        let b = tmp.path().join("file_b.txt");
        let content = "identical content in both files";
        fs::write(&a, content)?;
        fs::write(&b, content)?;

        // Act
        let hash_a = stream_hash(&a)?;
        let hash_b = stream_hash(&b)?;

        // Assert
        assert_eq!(hash_a, hash_b, "same content must produce same hash");
        Ok(())
    }

    #[test]
    fn stream_lines_binary_content() -> Result<()> {
        // Arrange — file with no newline characters
        let tmp = tempdir()?;
        let path = tmp.path().join("binary.bin");
        fs::write(&path, "no-newlines-here")?;

        // Act
        let mut lines_seen = Vec::new();
        let count = stream_lines(&path, |_i, line| {
            lines_seen.push(line.to_string());
            Ok(())
        })?;

        // Assert — single line, no newline splitting
        assert_eq!(count, 1);
        assert_eq!(lines_seen, vec!["no-newlines-here"]);
        Ok(())
    }

    // -----------------------------------------------------------------
    // stream_copy — additional edge cases
    // -----------------------------------------------------------------

    #[test]
    fn stream_copy_file_just_over_buffer_boundary() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("over.bin");
        let dst = tmp.path().join("over_copy.bin");
        // One byte beyond buffer size forces two reads.
        let data = vec![0xEFu8; STREAM_BUFFER_SIZE + 1];
        fs::write(&src, &data)?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, (STREAM_BUFFER_SIZE + 1) as u64);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn stream_copy_file_just_under_buffer_boundary() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("under.bin");
        let dst = tmp.path().join("under_copy.bin");
        let data = vec![0xAAu8; STREAM_BUFFER_SIZE - 1];
        fs::write(&src, &data)?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, (STREAM_BUFFER_SIZE - 1) as u64);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn stream_copy_multiple_of_buffer_size() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("multi.bin");
        let dst = tmp.path().join("multi_copy.bin");
        let data = vec![0xBBu8; STREAM_BUFFER_SIZE * 3];
        fs::write(&src, &data)?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, (STREAM_BUFFER_SIZE * 3) as u64);
        assert_eq!(fs::read(&dst)?, data);
        Ok(())
    }

    #[test]
    fn stream_copy_single_byte() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("one.bin");
        let dst = tmp.path().join("one_copy.bin");
        fs::write(&src, [0x42])?;

        let bytes = stream_copy(&src, &dst)?;
        assert_eq!(bytes, 1);
        assert_eq!(fs::read(&dst)?, vec![0x42]);
        Ok(())
    }

    #[test]
    fn stream_copy_dst_parent_does_not_exist() {
        let tmp = tempdir().unwrap();
        let src = tmp.path().join("src.txt");
        fs::write(&src, "data").unwrap();
        let dst = tmp.path().join("no/such/parent/out.txt");

        let err = stream_copy(&src, &dst);
        assert!(err.is_err());
    }

    // -----------------------------------------------------------------
    // stream_hash — additional edge cases
    // -----------------------------------------------------------------

    #[test]
    fn stream_hash_multi_chunk_file() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("multi_chunk.bin");
        // Force multiple read iterations
        let data = vec![0xCCu8; STREAM_BUFFER_SIZE * 2 + 100];
        fs::write(&path, &data)?;

        let h1 = stream_hash(&path)?;
        let h2 = stream_hash(&path)?;
        assert_eq!(h1, h2);
        assert_eq!(h1.len(), 16);
        Ok(())
    }

    #[test]
    fn stream_hash_exact_buffer_boundary() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("exact_buf.bin");
        let data = vec![0xDDu8; STREAM_BUFFER_SIZE];
        fs::write(&path, &data)?;

        let hash = stream_hash(&path)?;
        assert_eq!(hash.len(), 16);
        Ok(())
    }

    // -----------------------------------------------------------------
    // stream_lines — additional edge cases
    // -----------------------------------------------------------------

    #[test]
    fn stream_lines_callback_error_propagates() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("err.txt");
        fs::write(&path, "line1\nline2\nline3\n")?;

        let result = stream_lines(&path, |i, _line| {
            if i == 1 {
                anyhow::bail!("stop at line 1");
            }
            Ok(())
        });

        assert!(result.is_err());
        let msg = format!("{}", result.unwrap_err());
        assert!(msg.contains("stop at line 1"));
        Ok(())
    }

    #[test]
    fn stream_lines_nonexistent_file() {
        let result = stream_lines(Path::new("/nonexistent_ssg"), |_, _| Ok(()));
        assert!(result.is_err());
    }

    #[test]
    fn stream_lines_line_index_is_zero_based() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("indexed.txt");
        fs::write(&path, "a\nb\nc")?;

        let mut indices = Vec::new();
        let _ = stream_lines(&path, |i, _| {
            indices.push(i);
            Ok(())
        })?;
        assert_eq!(indices, vec![0, 1, 2]);
        Ok(())
    }

    #[test]
    fn stream_lines_trailing_newline_does_not_create_extra_line() -> Result<()>
    {
        let tmp = tempdir()?;
        let path = tmp.path().join("trailing.txt");
        fs::write(&path, "a\nb\n")?;

        let count = stream_lines(&path, |_, _| Ok(()))?;
        assert_eq!(count, 2);
        Ok(())
    }

    #[test]
    fn stream_lines_many_lines() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("many.txt");
        let mut content = String::new();
        for i in 0..1000 {
            content.push_str(&format!("line {i}\n"));
        }
        fs::write(&path, &content)?;

        let count = stream_lines(&path, |_, _| Ok(()))?;
        assert_eq!(count, 1000);
        Ok(())
    }

    #[test]
    fn stream_lines_strips_crlf_terminators() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("crlf.txt");
        fs::write(&path, "a\r\nb\r\n")?;

        let mut seen = Vec::new();
        let count = stream_lines(&path, |_, line| {
            seen.push(line.to_string());
            Ok(())
        })?;
        assert_eq!(count, 2);
        assert_eq!(seen, vec!["a", "b"]);
        Ok(())
    }

    #[test]
    fn stream_lines_invalid_utf8_is_an_error() -> Result<()> {
        let tmp = tempdir()?;
        let path = tmp.path().join("bad_utf8.txt");
        fs::write(&path, [0xFFu8, 0xFE, b'\n'])?;

        let result = stream_lines(&path, |_, _| Ok(()));
        assert!(result.is_err());
        Ok(())
    }

    // -----------------------------------------------------------------
    // process_batch — additional edge cases
    // -----------------------------------------------------------------

    #[test]
    fn process_batch_nonexistent_src_dir() {
        let tmp = tempdir().unwrap();
        let result = process_batch(
            &tmp.path().join("no-such-dir"),
            &tmp.path().join("dst"),
            stream_copy,
        );
        assert!(result.is_err());
    }

    #[test]
    fn process_batch_processor_error_stops_batch() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;
        fs::write(src.join("a.txt"), "hello")?;

        let result = process_batch(&src, &dst, |_s, _d| {
            anyhow::bail!("processor error")
        });
        assert!(result.is_err());
        Ok(())
    }

    #[test]
    fn process_batch_throughput_finite_for_fast_run() -> Result<()> {
        let tmp = tempdir()?;
        let src = tmp.path().join("src");
        let dst = tmp.path().join("dst");
        fs::create_dir_all(&src)?;
        for i in 0..5 {
            fs::write(src.join(format!("f{i}.txt")), "x")?;
        }

        let result = process_batch(&src, &dst, stream_copy)?;
        assert_eq!(result.files_processed, 5);
        assert!(result.duration_ms >= 0.0);
        // A non-empty batch must never report zero, negative or NaN
        // throughput, however fast it ran.
        assert!(result.throughput > 0.0);
        Ok(())
    }

    // -----------------------------------------------------------------
    // collect_files_bounded_with_limit — additional edge cases
    // -----------------------------------------------------------------

    #[test]
    fn collect_files_bounded_with_limit_zero() -> Result<()> {
        let tmp = tempdir()?;
        fs::write(tmp.path().join("a.txt"), "x")?;

        let files = collect_files_bounded_with_limit(tmp.path(), 0)?;
        assert!(files.is_empty());
        Ok(())
    }

    #[test]
    fn collect_files_bounded_with_limit_exact() -> Result<()> {
        let tmp = tempdir()?;
        for i in 0..5 {
            fs::write(tmp.path().join(format!("f{i}.txt")), "x")?;
        }

        let files = collect_files_bounded_with_limit(tmp.path(), 5)?;
        assert_eq!(files.len(), 5);
        Ok(())
    }

    #[test]
    fn collect_files_bounded_with_limit_deeply_nested() -> Result<()> {
        let tmp = tempdir()?;
        let deep = tmp.path().join("a/b/c/d/e");
        fs::create_dir_all(&deep)?;
        fs::write(deep.join("leaf.txt"), "deep")?;
        fs::write(tmp.path().join("root.txt"), "root")?;

        let files = collect_files_bounded(tmp.path())?;
        assert_eq!(files.len(), 2);
        Ok(())
    }

    #[test]
    fn collect_files_bounded_empty_dir() -> Result<()> {
        let tmp = tempdir()?;
        let files = collect_files_bounded(tmp.path())?;
        assert!(files.is_empty());
        Ok(())
    }

    #[test]
    fn collect_files_bounded_nonexistent_dir() {
        let result =
            collect_files_bounded(Path::new("/nonexistent_ssg_walker"));
        assert!(result.is_err());
    }

    // -----------------------------------------------------------------
    // BatchResult — Clone / Copy / Debug
    // -----------------------------------------------------------------

    #[test]
    fn batch_result_clone_and_debug() {
        let r = BatchResult {
            files_processed: 5,
            bytes_read: 500,
            bytes_written: 400,
            duration_ms: 2.0,
            throughput: 2500.0,
        };
        // `r` stays usable after the assignment because the type is Copy.
        let r2 = r;
        assert_eq!(r.files_processed, r2.files_processed);
        assert_eq!(format!("{r:?}"), format!("{r2:?}"));
    }

    // -----------------------------------------------------------------
    // benchmark_throughput — edge cases
    // -----------------------------------------------------------------

    #[test]
    fn benchmark_throughput_zero_files() -> Result<()> {
        let result = benchmark_throughput(0)?;
        assert_eq!(result.files_processed, 0);
        Ok(())
    }

    #[test]
    fn benchmark_throughput_single_file() -> Result<()> {
        let result = benchmark_throughput(1)?;
        assert_eq!(result.files_processed, 1);
        Ok(())
    }

    // -----------------------------------------------------------------
    // Constants — sanity checks
    // -----------------------------------------------------------------

    #[test]
    fn constants_are_sensible() {
        assert_eq!(STREAM_BUFFER_SIZE, 8192);
        assert_eq!(MAX_BATCH_SIZE, 100_000);
    }
}
951
#[cfg(test)]
#[allow(clippy::unwrap_used, clippy::expect_used)]
mod proptests {
    use super::*;
    use proptest::prelude::*;

    proptest! {
        #![proptest_config(ProptestConfig::with_cases(1000))]

        /// For arbitrary byte content (0 to 4095 bytes), hashing the
        /// same file twice must produce identical fingerprints.
        #[test]
        fn stream_hash_deterministic(data in proptest::collection::vec(any::<u8>(), 0..4096)) {
            let scratch = tempfile::tempdir().unwrap();
            let input = scratch.path().join("input.bin");
            fs::write(&input, &data).unwrap();

            let first = stream_hash(&input).unwrap();
            let second = stream_hash(&input).unwrap();
            prop_assert_eq!(first, second);
        }
    }
}