Implement random writing

Finally, the functional program. Random bytes are now collected by the
main thread at the start of each iteration. Worker threads repeatedly
write the data to disk. On each iteration, the size of the data is
doubled, up to a maximum size of 32 megabytes. When the program
terminates, the generated files are removed.
master
Dustin 2017-09-07 22:28:00 -05:00
parent 5b022693b0
commit 1a32606a74
3 changed files with 146 additions and 17 deletions

61
Cargo.lock generated
View File

@ -3,6 +3,8 @@ name = "iostress"
version = "0.1.0"
dependencies = [
"clap 2.26.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
"tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
@ -20,6 +22,11 @@ dependencies = [
"winapi 0.2.8 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "bitflags"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "bitflags"
version = "0.9.1"
@ -41,6 +48,19 @@ dependencies = [
"vec_map 0.8.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "conv"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "custom_derive"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "kernel32-sys"
version = "0.2.2"
@ -55,11 +75,45 @@ name = "libc"
version = "0.2.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "magenta"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)",
"magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "magenta-sys"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "rand"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)",
"magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "strsim"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
[[package]]
name = "tempdir"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "term_size"
version = "0.3.0"
@ -107,11 +161,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
[metadata]
"checksum ansi_term 0.9.0 (registry+https://github.com/rust-lang/crates.io-index)" = "23ac7c30002a5accbf7e8987d0632fa6de155b7c3d39d0067317a391e00a2ef6"
"checksum atty 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "d912da0db7fa85514874458ca3651fe2cddace8d0b0505571dbdcd41ab490159"
"checksum bitflags 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "aad18937a628ec6abcd26d1489012cc0e18c21798210f491af69ded9b881106d"
"checksum bitflags 0.9.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4efd02e230a02e18f92fc2735f44597385ed02ad8f831e7c1c1156ee5e1ab3a5"
"checksum clap 2.26.0 (registry+https://github.com/rust-lang/crates.io-index)" = "2267a8fdd4dce6956ba6649e130f62fb279026e5e84b92aa939ac8f85ce3f9f0"
"checksum conv 0.3.3 (registry+https://github.com/rust-lang/crates.io-index)" = "78ff10625fd0ac447827aa30ea8b861fead473bb60aeb73af6c1c58caf0d1299"
"checksum custom_derive 0.1.7 (registry+https://github.com/rust-lang/crates.io-index)" = "ef8ae57c4978a2acd8b869ce6b9ca1dfe817bff704c220209fdef2c0b75a01b9"
"checksum kernel32-sys 0.2.2 (registry+https://github.com/rust-lang/crates.io-index)" = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
"checksum libc 0.2.29 (registry+https://github.com/rust-lang/crates.io-index)" = "8a014d9226c2cc402676fbe9ea2e15dd5222cd1dd57f576b5b283178c944a264"
"checksum magenta 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "4bf0336886480e671965f794bc9b6fce88503563013d1bfb7a502c81fe3ac527"
"checksum magenta-sys 0.1.1 (registry+https://github.com/rust-lang/crates.io-index)" = "40d014c7011ac470ae28e2f76a02bfea4a8480f73e701353b49ad7a8d75f4699"
"checksum rand 0.3.16 (registry+https://github.com/rust-lang/crates.io-index)" = "eb250fd207a4729c976794d03db689c9be1d634ab5a1c9da9492a13d8fecbcdf"
"checksum strsim 0.6.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b4d15c810519a91cf877e7e36e63fe068815c678181439f2f29e2562147c3694"
"checksum tempdir 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "87974a6f5c1dfb344d733055601650059a3363de2a6104819293baff662132d6"
"checksum term_size 0.3.0 (registry+https://github.com/rust-lang/crates.io-index)" = "e2b6b55df3198cc93372e85dd2ed817f0e38ce8cc0f22eb32391bfad9c4bf209"
"checksum textwrap 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)" = "f728584ea33b0ad19318e20557cb0a39097751dbb07171419673502f848c7af6"
"checksum unicode-segmentation 1.2.0 (registry+https://github.com/rust-lang/crates.io-index)" = "a8083c594e02b8ae1654ae26f0ade5158b119bd88ad0e8227a5d8fcd72407946"

View File

@ -5,3 +5,5 @@ authors = ["Dustin C. Hatch <dustin.hatch@firemon.com>"]
[dependencies]
clap = "2.26"
rand = "0.3"
tempdir = "0.3"

View File

@ -1,16 +1,29 @@
extern crate clap;
extern crate rand;
extern crate tempdir;
use rand::Rng;
use std::env;
use std::fs;
use std::io::prelude::*;
use std::io;
use std::process;
use std::sync;
use std::thread;
const MAX_DATA_SIZE: usize = 33554432;
struct Args {
num_threads: usize,
passes: usize,
iterations: usize,
}
struct IOStressContext {
iterations: usize,
data: sync::RwLock<Vec<u8>>,
barrier: sync::Barrier,
lock: sync::Mutex<usize>,
@ -19,9 +32,10 @@ struct IOStressContext {
impl IOStressContext {
fn new(t: usize) -> IOStressContext {
fn new(t: usize, u: usize) -> IOStressContext {
IOStressContext {
data: sync::RwLock::new(Vec::with_capacity(4096)),
iterations: u,
data: sync::RwLock::new(Vec::with_capacity(MAX_DATA_SIZE)),
barrier: sync::Barrier::new(t + 1),
lock: sync::Mutex::new(0),
cond: sync::Condvar::new(),
@ -41,6 +55,20 @@ fn parse_args() -> Args {
.value_name("COUNT")
.default_value("1")
.help("Number of simultaneous tests")
.takes_value(true))
.arg(clap::Arg::with_name("passes")
.short("p")
.long("passes")
.value_name("COUNT")
.default_value("17")
.help("Number of passes to make, increasing size each time")
.takes_value(true))
.arg(clap::Arg::with_name("iterations")
.short("n")
.long("iterations")
.value_name("COUNT")
.default_value("10")
.help("Number of iterations at each data size")
.takes_value(true));
let matches = app.get_matches();
@ -52,10 +80,30 @@ fn parse_args() -> Args {
process::exit(2);
},
},
passes: match matches.value_of("passes").unwrap().parse() {
Ok(n) => n,
Err(_) => {
eprintln!("Invalid pass count");
process::exit(2);
},
},
iterations: match matches.value_of("iterations").unwrap().parse() {
Ok(n) => n,
Err(_) => {
eprintln!("Invalid iteration count");
process::exit(2);
},
},
}
}
fn write_data(filename: &str, data: &[u8]) -> io::Result<usize> {
let mut f = fs::File::create(filename)?;
f.write(data)
}
fn worker_thread(ctx: sync::Arc<IOStressContext>) {
let thread = thread::current();
let name = thread.name().unwrap();
@ -64,25 +112,29 @@ fn worker_thread(ctx: sync::Arc<IOStressContext>) {
{
let mut ctr = ctx.lock.lock().unwrap();
while *ctr <= prev_ctr {
println!("{} waiting on condition", name);
ctr = ctx.cond.wait(ctr).unwrap();
}
println!("{} woke up, counter: {} -> {}", name, prev_ctr, *ctr);
prev_ctr = *ctr;
}
{
let data = ctx.data.read().unwrap();
// Here is where the actual processing would occur
println!("{} data: {:?}", name, *data);
if (*data).len() == 0 {
println!("{} done!", name);
break;
}
for i in 0..ctx.iterations {
let filename = format!("{}-{}.{}", name, prev_ctr, i);
match write_data(&filename, &(*data)) {
Ok(f) => f,
Err(_) => {
eprintln!("{} failed to write data", name);
break;
},
};
}
}
println!("{} waiting at barrier", name);
ctx.barrier.wait();
}
println!("{} waiting at final barrier", name);
ctx.barrier.wait();
println!("{} terminating", name);
}
@ -91,7 +143,15 @@ fn worker_thread(ctx: sync::Arc<IOStressContext>) {
fn main() {
let args = parse_args();
let ctx = sync::Arc::new(IOStressContext::new(args.num_threads));
let mut rng = rand::OsRng::new().unwrap();
let pwd = env::current_dir().unwrap();
let tmpdir = tempdir::TempDir::new_in(".", "iostress").unwrap();
env::set_current_dir(tmpdir.path()).unwrap();
let ctx = sync::Arc::new(
IOStressContext::new(args.num_threads, args.iterations)
);
let mut threads = Vec::with_capacity(args.num_threads);
for i in 0..args.num_threads {
let c = ctx.clone();
@ -99,28 +159,34 @@ fn main() {
threads.push(t.spawn(move || worker_thread(c)).unwrap());
}
for i in 0..5 {
let mut size: usize = 512;
for i in 0..args.passes + 1 {
{
println!("Collecting random data");
let mut noise = vec![0u8; size];
rng.fill_bytes(&mut noise);
let mut data = ctx.data.write().unwrap();
if i == 4 {
(*data).clear();
} else {
// Here is where data would be fetched and sent to the workers
(*data).push(i + 1);
if i < args.passes {
(*data).extend(noise.iter().cloned());
}
}
{
let mut ctr = ctx.lock.lock().unwrap();
*ctr += 1;
println!("Notifying threads");
ctx.cond.notify_all();
}
println!("Waiting for threads to finish iteration {}", i + 1);
ctx.barrier.wait();
size <<= 1;
if size > MAX_DATA_SIZE {
size = 512;
}
}
for thread in threads {
println!("waiting for {}", thread.thread().name().unwrap());
thread.join().unwrap();
}
env::set_current_dir(pwd).unwrap();
}