35 #include <gtest/gtest.h>
50 using std::shared_ptr;
// Reference payload for the RwLoopBuffer tests: it is written into the buffer
// with rw.write(val.data(), val.size()) and the bytes read back (or rw.str())
// are compared against it.
66 static string val =
"This is a test of the emergency RwLoopBuffer system!";
// Scratch destination for rw.read(buf, ...) calls; its contents are checked
// with memcmp() against val. BUF_SZ is defined outside this view.
68 static char buf[BUF_SZ];
73 ASSERT_EQ(rw.str(), val);
76 ASSERT_EQ(rw.read(buf, BUF_SZ), val.size());
77 ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
80 ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
81 ASSERT_EQ(rw.str(), val);
87 ASSERT_EQ(rw.
read(buf, 0), 0);
88 ASSERT_EQ(rw.
read(buf, BUF_SZ/2), 0);
89 ASSERT_EQ(rw.
read(buf, BUF_SZ), 0);
95 ASSERT_EQ(rw.
write(val.data(), val.size()), val.size());
96 ASSERT_EQ(rw.
read(buf, 0), 0);
101 cout <<
"val.size()=" << val.size() << endl;
104 cout <<
"init readloc=" << rw.
read_loc() <<
" writeloc=" << rw.
write_loc() << endl;
108 ASSERT_EQ(rw.
write(val.data(), val.size()), val.size());
109 cout <<
"after write readloc=" << rw.
read_loc() <<
" writeloc=" << rw.
write_loc() << endl;
113 ASSERT_EQ(rw.
read(buf, BUF_SZ), val.size());
114 cout <<
"after read readloc=" << rw.
read_loc() <<
" writeloc=" << rw.
write_loc() << endl;
115 ASSERT_EQ(rw.
read_loc(), val.size());
118 ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
120 ASSERT_EQ(rw.
read(buf, BUF_SZ), 0);
127 auto ptr = (
char*)val.data();
131 int towrite = sz < 7 ? sz : 7;
132 ASSERT_EQ(rw.
write(ptr, towrite), towrite);
142 auto got = rw.
read(ptr, 11);
150 ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
156 ASSERT_EQ(rw.
write(val.data(), val.size()), val.size());
158 ASSERT_EQ(rw.
read(buf, BUF_SZ), 0);
166 ASSERT_EQ(rw.read_count(), 0);
167 ASSERT_EQ(rw.write_count(), 0);
169 ASSERT_EQ(rw.
write(val.data(), val.size()), val.size());
170 ASSERT_EQ(rw.read_count(), 0);
171 ASSERT_EQ(rw.write_count(), val.size());
173 ASSERT_EQ(rw.
read(buf, BUF_SZ), val.size());
174 ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
175 ASSERT_EQ(rw.read_count(), val.size());
176 ASSERT_EQ(rw.write_count(), val.size());
178 rw.read_count_reset();
179 ASSERT_EQ(rw.read_count(), 0);
180 ASSERT_EQ(rw.write_count(), val.size());
182 rw.write_count_reset();
183 ASSERT_EQ(rw.read_count(), 0);
184 ASSERT_EQ(rw.write_count(), 0);
190 using std::chrono::milliseconds;
191 using std::chrono::duration_cast;
197 size_t read(
void*,
size_t l)
199 std::this_thread::sleep_for(milliseconds(50));
202 size_t write(
const void*,
size_t l)
204 std::this_thread::sleep_for(milliseconds(100));
214 cout <<
"read timer=" << t.read_dur().count() << endl;
215 cout <<
"write timer=" << t.write_dur().count() << endl;
216 cout <<
"read count=" << c.read_count() << endl;
217 cout <<
"write count=" << c.write_count() << endl;
// The stub read()/write() above delay with sleep_for(50 ms / 100 ms), which
// only guarantees a *minimum* blocking time. Scheduling overshoot can push the
// measured duration past the nominal value, so check a lower bound rather
// than exact equality to keep the test from failing spuriously under load.
219 ASSERT_GE(duration_cast<milliseconds>(t.read_dur()).count(), 50);
220 ASSERT_GE(duration_cast<milliseconds>(t.write_dur()).count(), 100);
221 ASSERT_EQ(c.read_count(), 50);
222 ASSERT_EQ(c.write_count(), 100);
240 ASSERT_EQ(rc.use_count(), 2);
241 ASSERT_EQ(wc.use_count(), 2);
242 ASSERT_EQ(rbuf.use_count(), 2);
243 ASSERT_EQ(wbuf.use_count(), 2);
245 string test(
"this is a test!");
247 for (
char ch : test) out.put(ch);
250 cout <<
"wc count=" << wc->write_count() << endl;
251 cout <<
"wbuf size=" << wbuf->size() <<
" idx=" << wbuf->idx() <<
" str=" << wbuf->str() << endl;
253 ASSERT_EQ(test, wbuf->str());
254 ASSERT_EQ(wc->write_count(), test.size());
259 for (
char ch; in.get(ch);) got.push_back(ch);
261 cout <<
"rc count=" << rc->read_count() << endl;
262 cout <<
"rbuf size=" << rbuf->size() <<
" idx=" << rbuf->idx() <<
" str=" << rbuf->str() << endl;
264 ASSERT_EQ(test, got);
265 ASSERT_EQ(rc->read_count(), test.size());
268 wc->write_reset(wbuf2);
269 ASSERT_EQ(wc->write_count(), 0);
271 ASSERT_EQ(wbuf2.use_count(), 2);
274 ASSERT_EQ(wbuf.use_count(), 0);
276 for (
char ch : test) out.put(ch);
279 cout <<
"wc count=" << wc->write_count() << endl;
280 cout <<
"wbuf2 size=" << wbuf2->size() <<
" idx=" << wbuf2->idx() <<
" str=" << wbuf2->str() << endl;
282 ASSERT_EQ(test, wbuf2->str());
283 ASSERT_EQ(wc->write_count(), test.size());
// Multi-line text fixture for the IoPipelineTester tests: write() pushes it
// through the pipeline one character at a time (io->put(twain[i])) and the
// tests compare what arrives (buf->str(), got) against it.
// NOTE(review): the quotations look like Oscar Wilde's rather than Mark
// Twain's; the identifier is left as-is because other code refers to it.
287 static string twain =
288 "One can survive everything, nowadays, except death, and live down everything except a good reputation.\n"
289 "One should always play fairly when one has the winning cards.\n"
290 "Patriotism is the virtue of the vicious.\n"
291 "Selfishness is not living as one wishes to live, it is asking others to live as one wishes to live.";
293 struct IoPipelineTester :
public testing::Test
295 shared_ptr<RwLoopBuffer> buf;
296 shared_ptr<IoPipeline> io;
304 virtual ~IoPipelineTester() {}
306 void read(
unsigned n = UINT32_MAX)
309 for (
unsigned i = 0; i < n && io->get(ch); i++) got.push_back(ch);
311 void write(
unsigned n = UINT32_MAX,
bool flush=
true)
313 for (
unsigned i = 0; i < n && i < twain.size() && io->put(twain[i]); i++) {}
315 if (flush) io->flush();
317 void pr(
const string& msg)
320 cout <<
"base rd ptr=" << (long)(
Reader*)io->rd_base.get() << endl;
321 cout <<
"base wr ptr=" << (long)(
Writer*)io->wr_base.get() << endl;
322 cout <<
"strm rd inp=" << (long)(
Reader*)io->read_shared().get() << endl;
323 cout <<
"strm wr inp=" << (long)(
Writer*)io->write_shared().get() << endl;
327 cout <<
"read chain:" << endl;
329 for (
auto& i : io->rd_chain)
331 cout <<
" ["<<n<<
"] ptr=" << (long)(
Reader*)i.get() << endl;
332 cout <<
" ["<<n<<
"] inp=" << (long)(
Reader*)i->read_shared().get() << endl;
335 cout <<
"write chain:" << endl;
337 for (
auto& i : io->wr_chain)
339 cout <<
" ["<<n<<
"] ptr=" << (long)(
Writer*)i.get() << endl;
340 cout <<
" ["<<n<<
"] inp=" << (long)(
Writer*)i->write_shared().get() << endl;
345 void ver(
void* a,
void* b)
351 TEST_F(IoPipelineTester, basic)
356 ASSERT_EQ(buf->str(), twain);
359 ASSERT_EQ(got, twain);
360 ver(io->rd_base.get(), (
Reader*)buf.get());
361 ver(io->wr_base.get(), (
Writer*)buf.get());
362 ver(io->rd_base.get(), io->read_shared().get());
363 ver(io->wr_base.get(), io->write_shared().get());
366 TEST_F(IoPipelineTester, replace_base)
369 ASSERT_EQ(buf->str(), twain);
372 ASSERT_EQ(got, twain);
384 pr(
"*after read to buf2");
385 ASSERT_EQ(got, twain);
387 ver(io->rd_base.get(), (
Reader*)buf2.get());
388 ver(io->wr_base.get(), (
Writer*)buf2.get());
389 ver(io->rd_base.get(), io->read_shared().get());
390 ver(io->wr_base.get(), io->write_shared().get());
393 TEST_F(IoPipelineTester, add_and_delete)
395 shared_ptr<RwCounter> x(
new RwCounter(buf, buf));
396 shared_ptr<RwTimer> y(
new RwTimer(buf, buf));
398 io->rw_add_back(x, x);
399 io->rw_add_back(y, y);
401 pr(
"*added two elements to chain");
406 cout <<
"*after read" << endl;
407 cout <<
"count calls r=" << x->read_calls() <<
" w=" << x->write_calls() << endl;
408 cout <<
"time calls r=" << y->read_calls() <<
" w=" << y->write_calls() << endl;
410 ASSERT_EQ(x->read_calls(), 2);
411 ASSERT_EQ(x->write_calls(), 1);
412 ASSERT_EQ(y->read_calls(), 2);
413 ASSERT_EQ(y->write_calls(), 1);
415 ASSERT_EQ(got, twain);
417 ASSERT_EQ(io->rd_chain.size(), 2);
418 ver(io->rd_base.get(), (
Reader*)buf.get());
419 ver(io->rd_chain.back()->read_shared().get(), (
Reader*)io->rd_base.get());
420 ver(io->rd_chain.front()->read_shared().get(), (
Reader*)io->rd_chain.back().get());
421 ver(io->read_shared().get(), (
Reader*)io->rd_chain.front().get());
423 ASSERT_EQ(io->wr_chain.size(), 2);
424 ver(io->wr_base.get(), (
Writer*)buf.get());
425 ver(io->wr_chain.back()->write_shared().get(), (
Writer*)io->wr_base.get());
426 ver(io->wr_chain.front()->write_shared().get(), (
Writer*)io->wr_chain.back().get());
427 ver(io->write_shared().get(), (
Writer*)io->wr_chain.front().get());
431 pr(
"*remove a reader and writer");
434 x->read_calls_reset();
435 x->write_calls_reset();
436 y->read_calls_reset();
437 y->write_calls_reset();
438 ASSERT_EQ(x->read_calls(), 0);
439 ASSERT_EQ(x->write_calls(), 0);
440 ASSERT_EQ(y->read_calls(), 0);
441 ASSERT_EQ(y->write_calls(), 0);
449 ASSERT_EQ(got, twain);
451 cout <<
"*after remove" << endl;
452 cout <<
"count calls r=" << x->read_calls() <<
" w=" << x->write_calls() << endl;
453 cout <<
"time calls r=" << y->read_calls() <<
" w=" << y->write_calls() << endl;
455 ASSERT_EQ(x->read_calls(), 0);
456 ASSERT_EQ(x->write_calls(), 1);
457 ASSERT_EQ(y->read_calls(), 2);
458 ASSERT_EQ(y->write_calls(), 0);
459 ASSERT_EQ(io->rd_chain.size(), 1);
460 ASSERT_EQ(io->wr_chain.size(), 1);
467 x->read_calls_reset();
468 x->write_calls_reset();
469 y->read_calls_reset();
470 y->write_calls_reset();
478 ASSERT_EQ(got, twain);
480 cout <<
"*after remove all" << endl;
481 cout <<
"count calls r=" << x->read_calls() <<
" w=" << x->write_calls() << endl;
482 cout <<
"time calls r=" << y->read_calls() <<
" w=" << y->write_calls() << endl;
484 ASSERT_EQ(x->read_calls(), 0);
485 ASSERT_EQ(x->write_calls(), 0);
486 ASSERT_EQ(y->read_calls(), 0);
487 ASSERT_EQ(y->write_calls(), 0);
488 ASSERT_EQ(io->rd_chain.size(), 0);
489 ASSERT_EQ(io->wr_chain.size(), 0);
494 ASSERT_EQ(x.use_count(), 0);
495 ASSERT_EQ(y.use_count(), 0);
Input stream wrapper for reader.
Output stream wrapper for writer.
Adds byte count to a read stream.
virtual size_t read(void *, size_t)
Read and update time over underlying read.
Adds byte count to a read/write stream.
Loopback read/write stream buffer.
size_t const read_loc()
Return the current read location.
void clear()
Empty and reset.
size_t write(const void *loc, size_t len)
Writer write.
size_t const write_loc()
Return the current write location.
size_t read(void *loc, size_t len)
Reader read.
Adds byte count to a read/write stream.
Adds byte count to a write stream.
virtual size_t write(const void *, size_t)
Write and update time over underlying write.
TEST(IoHelpers, counter_timer)
Input/output streaming pipeline.
Base input/output stream classes.
Loopback read/write buffer.
Input/output stream with pipeline of readers and writers.
Pipeline reader to carry out processing in a pipeline (chain of readers).
Interface class for objects which can be read.
virtual size_t read(void *, size_t)=0
Read interface.
Interface class for objects which can be written.
virtual size_t write(const void *, size_t)=0
Write interface.