scclib
Stable Cloud Computing C++ Library
All Classes Files Functions Variables Typedefs Enumerations Enumerator Modules Pages
iohelper.cc
Go to the documentation of this file.
1 /*
2 BSD 3-Clause License
3 
4 Copyright (c) 2022, Stable Cloud Computing, Inc.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions are met:
8 
9 1. Redistributions of source code must retain the above copyright notice, this
10  list of conditions and the following disclaimer.
11 
12 2. Redistributions in binary form must reproduce the above copyright notice,
13  this list of conditions and the following disclaimer in the documentation
14  and/or other materials provided with the distribution.
15 
16 3. Neither the name of the copyright holder nor the names of its
17  contributors may be used to endorse or promote products derived from
18  this software without specific prior written permission.
19 
20 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31 #include <util/rwloopbuf.h>
32 #include <util/rwcounter.h>
33 #include <util/rwtimer.h>
34 #include <util/iopipeline.h>
35 #include <gtest/gtest.h>
36 #include <iostream>
37 #include <string>
38 #include <thread>
39 #include <util/iostream.h>
40 
47 using std::cout;
48 using std::endl;
49 using std::string;
50 using std::shared_ptr;
51 using scc::util::Reader;
52 using scc::util::Writer;
53 //using scc::util::ReadTimer; // TBD: untested
54 //using scc::util::WriteTimer; // TBD: untested
55 using scc::util::RwTimer;
64 
65 
66 static string val = "This is a test of the emergency RwLoopBuffer system!";
67 #define BUF_SZ 1024
68 static char buf[BUF_SZ];
69 
70 TEST(RwLoopBuffer, sanity)
71 {
72  RwLoopBuffer rw(val);
73  ASSERT_EQ(rw.str(), val);
74 
75  rw = val;
76  ASSERT_EQ(rw.read(buf, BUF_SZ), val.size());
77  ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
78 
79  rw.clear();
80  ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
81  ASSERT_EQ(rw.str(), val);
82 }
83 
84 TEST(RwLoopBuffer, read_empty)
85 {
86  RwLoopBuffer rw;
87  ASSERT_EQ(rw.read(buf, 0), 0);
88  ASSERT_EQ(rw.read(buf, BUF_SZ/2), 0);
89  ASSERT_EQ(rw.read(buf, BUF_SZ), 0);
90 }
91 
92 TEST(RwLoopBuffer, Write_all_read_0)
93 {
94  RwLoopBuffer rw;
95  ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
96  ASSERT_EQ(rw.read(buf, 0), 0);
97 }
98 
99 TEST(RwLoopBuffer, Write_all_read_max)
100 {
101  cout << "val.size()=" << val.size() << endl;
102  RwLoopBuffer rw;
103 
104  cout << "init readloc=" << rw.read_loc() << " writeloc=" << rw.write_loc() << endl;
105  ASSERT_EQ(rw.read_loc(), 0);
106  ASSERT_EQ(rw.write_loc(), 0);
107 
108  ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
109  cout << "after write readloc=" << rw.read_loc() << " writeloc=" << rw.write_loc() << endl;
110  ASSERT_EQ(rw.read_loc(), 0);
111  ASSERT_EQ(rw.write_loc(), val.size());
112 
113  ASSERT_EQ(rw.read(buf, BUF_SZ), val.size());
114  cout << "after read readloc=" << rw.read_loc() << " writeloc=" << rw.write_loc() << endl;
115  ASSERT_EQ(rw.read_loc(), val.size());
116  ASSERT_EQ(rw.write_loc(), val.size());
117 
118  ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
119 
120  ASSERT_EQ(rw.read(buf, BUF_SZ), 0); // can't read any more
121 }
122 
123 TEST(RwLoopBuffer, Write_chunks_read_chunks)
124 {
125  RwLoopBuffer rw;
126 
127  auto ptr = (char*)val.data();
128  int sz = val.size();
129  do
130  {
131  int towrite = sz < 7 ? sz : 7;
132  ASSERT_EQ(rw.write(ptr, towrite), towrite);
133  ptr += towrite;
134  sz -= towrite;
135  }
136  while (sz);
137 
138  ptr = buf;
139  sz = val.size();
140  do
141  {
142  auto got = rw.read(ptr, 11);
143  ASSERT_GT(got, 0);
144  ASSERT_LE(got, 11);
145  ptr += got;
146  sz -= got;
147  }
148  while (sz);
149 
150  ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
151 }
152 
153 TEST(RwLoopBuffer, clear)
154 {
155  RwLoopBuffer rw;
156  ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
157  rw.clear();
158  ASSERT_EQ(rw.read(buf, BUF_SZ), 0);
159 }
160 
161 TEST(RwCounter, counter_and_loop_buffer)
162 {
163  RwLoopBuffer rwb;
164  RwCounter rw(rwb, rwb); // now provides read() and write() for the buffer
165 
166  ASSERT_EQ(rw.read_count(), 0);
167  ASSERT_EQ(rw.write_count(), 0);
168 
169  ASSERT_EQ(rw.write(val.data(), val.size()), val.size());
170  ASSERT_EQ(rw.read_count(), 0);
171  ASSERT_EQ(rw.write_count(), val.size());
172 
173  ASSERT_EQ(rw.read(buf, BUF_SZ), val.size());
174  ASSERT_EQ(memcmp(buf, val.data(), val.size()), 0);
175  ASSERT_EQ(rw.read_count(), val.size());
176  ASSERT_EQ(rw.write_count(), val.size());
177 
178  rw.read_count_reset();
179  ASSERT_EQ(rw.read_count(), 0);
180  ASSERT_EQ(rw.write_count(), val.size());
181 
182  rw.write_count_reset();
183  ASSERT_EQ(rw.read_count(), 0);
184  ASSERT_EQ(rw.write_count(), 0);
185 }
186 
187 TEST(IoHelpers, counter_timer)
188 {
190  using std::chrono::milliseconds;
191  using std::chrono::duration_cast;
192 
193  struct Slow : public Reader, public Writer
194  {
195  Slow() {}
196  virtual ~Slow() {}
197  size_t read(void*, size_t l)
198  {
199  std::this_thread::sleep_for(milliseconds(50));
200  return l;
201  }
202  size_t write(const void*, size_t l)
203  {
204  std::this_thread::sleep_for(milliseconds(100));
205  return l;
206  }
207  };
208 
209  Slow base;
210  RwCounter c(base, base);
211  RwTimer t(c, c);
212  t.read(0, 50);
213  t.write(0, 100);
214  cout << "read timer=" << t.read_dur().count() << endl;
215  cout << "write timer=" << t.write_dur().count() << endl;
216  cout << "read count=" << c.read_count() << endl;
217  cout << "write count=" << c.write_count() << endl;
218 
219  ASSERT_EQ(duration_cast<milliseconds>(t.read_dur()).count(), 50);
220  ASSERT_EQ(duration_cast<milliseconds>(t.write_dur()).count(), 100);
221  ASSERT_EQ(c.read_count(), 50);
222  ASSERT_EQ(c.write_count(), 100);
224 }
225 
226 TEST(IoHelpers, chained_test)
227 {
229  shared_ptr<RwLoopBuffer> rbuf(new RwLoopBuffer);
230 
231  shared_ptr<ReadCounter> rc(new ReadCounter(rbuf));
232 
233  shared_ptr<RwLoopBuffer> wbuf(new RwLoopBuffer);
234 
235  shared_ptr<WriteCounter> wc(new WriteCounter(wbuf));
236 
237  InStream in(rc);
238  OutStream out(wc);
239 
240  ASSERT_EQ(rc.use_count(), 2);
241  ASSERT_EQ(wc.use_count(), 2);
242  ASSERT_EQ(rbuf.use_count(), 2);
243  ASSERT_EQ(wbuf.use_count(), 2);
244 
245  string test("this is a test!");
246 
247  for (char ch : test) out.put(ch); // write to wbuf
248  out.flush();
249 
250  cout << "wc count=" << wc->write_count() << endl;
251  cout << "wbuf size=" << wbuf->size() << " idx=" << wbuf->idx() << " str=" << wbuf->str() << endl;
252 
253  ASSERT_EQ(test, wbuf->str());
254  ASSERT_EQ(wc->write_count(), test.size());
255 
256  rbuf->set(test);
257 
258  string got;
259  for (char ch; in.get(ch);) got.push_back(ch); // read from rbuf
260 
261  cout << "rc count=" << rc->read_count() << endl;
262  cout << "rbuf size=" << rbuf->size() << " idx=" << rbuf->idx() << " str=" << rbuf->str() << endl;
263 
264  ASSERT_EQ(test, got);
265  ASSERT_EQ(rc->read_count(), test.size());
266 
267  shared_ptr<RwLoopBuffer> wbuf2(new RwLoopBuffer);
268  wc->write_reset(wbuf2); // reset the write counter to send to a new buffer
269  ASSERT_EQ(wc->write_count(), 0);
270 
271  ASSERT_EQ(wbuf2.use_count(), 2);
272 
273  wbuf.reset(); // safe to get rid of this now
274  ASSERT_EQ(wbuf.use_count(), 0);
275 
276  for (char ch : test) out.put(ch); // write to rbuf2
277  out.flush();
278 
279  cout << "wc count=" << wc->write_count() << endl;
280  cout << "wbuf2 size=" << wbuf2->size() << " idx=" << wbuf2->idx() << " str=" << wbuf2->str() << endl;
281 
282  ASSERT_EQ(test, wbuf2->str());
283  ASSERT_EQ(wc->write_count(), test.size());
285 }
286 
287 static string twain =
288  "One can survive everything, nowadays, except death, and live down everything except a good reputation.\n"
289  "One should always play fairly when one has the winning cards.\n"
290  "Patriotism is the virtue of the vicious.\n"
291  "Selfishness is not living as one wishes to live, it is asking others to live as one wishes to live.";
292 
293 struct IoPipelineTester : public testing::Test
294 {
295  shared_ptr<RwLoopBuffer> buf;
296  shared_ptr<IoPipeline> io;
297  std::string got;
298 
299  IoPipelineTester()
300  {
301  buf.reset(new RwLoopBuffer);
302  io.reset(new IoPipeline(buf, buf));
303  }
304  virtual ~IoPipelineTester() {}
305 
306  void read(unsigned n = UINT32_MAX)
307  {
308  char ch;
309  for (unsigned i = 0; i < n && io->get(ch); i++) got.push_back(ch);
310  }
311  void write(unsigned n = UINT32_MAX, bool flush=true)
312  {
313  for (unsigned i = 0; i < n && i < twain.size() && io->put(twain[i]); i++) {}
314 
315  if (flush) io->flush();
316  }
317  void pr(const string& msg)
318  {
319  cout << msg << endl;
320  cout << "base rd ptr=" << (long)(Reader*)io->rd_base.get() << endl;
321  cout << "base wr ptr=" << (long)(Writer*)io->wr_base.get() << endl;
322  cout << "strm rd inp=" << (long)(Reader*)io->read_shared().get() << endl;
323  cout << "strm wr inp=" << (long)(Writer*)io->write_shared().get() << endl;
324  }
325  void pr_chain()
326  {
327  cout << "read chain:" << endl;
328  int n = 0;
329  for (auto& i : io->rd_chain)
330  {
331  cout << " ["<<n<<"] ptr=" << (long)(Reader*)i.get() << endl;
332  cout << " ["<<n<<"] inp=" << (long)(Reader*)i->read_shared().get() << endl;
333  n++;
334  }
335  cout << "write chain:" << endl;
336  n = 0;
337  for (auto& i : io->wr_chain)
338  {
339  cout << " ["<<n<<"] ptr=" << (long)(Writer*)i.get() << endl;
340  cout << " ["<<n<<"] inp=" << (long)(Writer*)i->write_shared().get() << endl;
341  n++;
342  }
343  }
344 
345  void ver(void* a, void* b)
346  {
347  ASSERT_EQ(a, b);
348  }
349 };
350 
351 TEST_F(IoPipelineTester, basic)
352 {
353  pr("*init");
354  write();
355  pr("*after write");
356  ASSERT_EQ(buf->str(), twain);
357  read();
358  pr("*after read");
359  ASSERT_EQ(got, twain);
360  ver(io->rd_base.get(), (Reader*)buf.get());
361  ver(io->wr_base.get(), (Writer*)buf.get());
362  ver(io->rd_base.get(), io->read_shared().get());
363  ver(io->wr_base.get(), io->write_shared().get());
364 }
365 
366 TEST_F(IoPipelineTester, replace_base)
367 {
368  write();
369  ASSERT_EQ(buf->str(), twain);
370  read();
371  pr("*after read");
372  ASSERT_EQ(got, twain);
373 
374  shared_ptr<RwLoopBuffer> buf2(new RwLoopBuffer);
375  io->rd_base = buf2;
376  io->rd_fix_chain();
377  io->wr_base = buf2;
378  io->wr_fix_chain();
379 
380  io->clear();
381  got.clear();
382  write();
383  read();
384  pr("*after read to buf2");
385  ASSERT_EQ(got, twain);
386 
387  ver(io->rd_base.get(), (Reader*)buf2.get());
388  ver(io->wr_base.get(), (Writer*)buf2.get());
389  ver(io->rd_base.get(), io->read_shared().get());
390  ver(io->wr_base.get(), io->write_shared().get());
391 }
392 
393 TEST_F(IoPipelineTester, add_and_delete)
394 {
395  shared_ptr<RwCounter> x(new RwCounter(buf, buf));
396  shared_ptr<RwTimer> y(new RwTimer(buf, buf));
397 
398  io->rw_add_back(x, x);
399  io->rw_add_back(y, y);
400 
401  pr("*added two elements to chain");
402  pr_chain();
403 
404  write();
405  read();
406  cout << "*after read" << endl;
407  cout << "count calls r=" << x->read_calls() << " w=" << x->write_calls() << endl;
408  cout << "time calls r=" << y->read_calls() << " w=" << y->write_calls() << endl;
409 
410  ASSERT_EQ(x->read_calls(), 2); // one for the eof
411  ASSERT_EQ(x->write_calls(), 1);
412  ASSERT_EQ(y->read_calls(), 2);
413  ASSERT_EQ(y->write_calls(), 1);
414 
415  ASSERT_EQ(got, twain);
416 
417  ASSERT_EQ(io->rd_chain.size(), 2);
418  ver(io->rd_base.get(), (Reader*)buf.get());
419  ver(io->rd_chain.back()->read_shared().get(), (Reader*)io->rd_base.get());
420  ver(io->rd_chain.front()->read_shared().get(), (Reader*)io->rd_chain.back().get());
421  ver(io->read_shared().get(), (Reader*)io->rd_chain.front().get());
422 
423  ASSERT_EQ(io->wr_chain.size(), 2);
424  ver(io->wr_base.get(), (Writer*)buf.get());
425  ver(io->wr_chain.back()->write_shared().get(), (Writer*)io->wr_base.get());
426  ver(io->wr_chain.front()->write_shared().get(), (Writer*)io->wr_chain.back().get());
427  ver(io->write_shared().get(), (Writer*)io->wr_chain.front().get());
428 
429  io->rd_del(x);
430  io->wr_del(y);
431  pr("*remove a reader and writer");
432  pr_chain();
433 
434  x->read_calls_reset();
435  x->write_calls_reset();
436  y->read_calls_reset();
437  y->write_calls_reset();
438  ASSERT_EQ(x->read_calls(), 0);
439  ASSERT_EQ(x->write_calls(), 0);
440  ASSERT_EQ(y->read_calls(), 0);
441  ASSERT_EQ(y->write_calls(), 0);
442 
443  io->clear();
444  buf->clear();
445  got.clear();
446  write();
447  read();
448 
449  ASSERT_EQ(got, twain);
450 
451  cout << "*after remove" << endl;
452  cout << "count calls r=" << x->read_calls() << " w=" << x->write_calls() << endl;
453  cout << "time calls r=" << y->read_calls() << " w=" << y->write_calls() << endl;
454 
455  ASSERT_EQ(x->read_calls(), 0);
456  ASSERT_EQ(x->write_calls(), 1);
457  ASSERT_EQ(y->read_calls(), 2);
458  ASSERT_EQ(y->write_calls(), 0);
459  ASSERT_EQ(io->rd_chain.size(), 1);
460  ASSERT_EQ(io->wr_chain.size(), 1);
461 
462  io->wr_del(x);
463  io->rd_del(y);
464  pr("*remove all");
465  pr_chain();
466 
467  x->read_calls_reset();
468  x->write_calls_reset();
469  y->read_calls_reset();
470  y->write_calls_reset();
471 
472  io->clear();
473  buf->clear();
474  got.clear();
475  write();
476  read();
477 
478  ASSERT_EQ(got, twain);
479 
480  cout << "*after remove all" << endl;
481  cout << "count calls r=" << x->read_calls() << " w=" << x->write_calls() << endl;
482  cout << "time calls r=" << y->read_calls() << " w=" << y->write_calls() << endl;
483 
484  ASSERT_EQ(x->read_calls(), 0);
485  ASSERT_EQ(x->write_calls(), 0);
486  ASSERT_EQ(y->read_calls(), 0);
487  ASSERT_EQ(y->write_calls(), 0);
488  ASSERT_EQ(io->rd_chain.size(), 0);
489  ASSERT_EQ(io->wr_chain.size(), 0);
490 
491  x.reset();
492  y.reset();
493 
494  ASSERT_EQ(x.use_count(), 0);
495  ASSERT_EQ(y.use_count(), 0);
496 }
Input stream wrapper for reader.
Definition: iostream.h:59
Output stream wrapper for writer.
Definition: iostream.h:108
Adds byte count to a read stream.
Definition: rwcounter.h:59
virtual size_t read(void *, size_t)
Read and update time over underlying read.
Definition: iostream.cc:714
Adds byte count to a read/write stream.
Definition: rwcounter.h:135
Loopback read/write stream buffer.
Definition: rwloopbuf.h:57
size_t const read_loc()
Return the current read location.
Definition: rwloopbuf.h:124
void clear()
Empty and reset.
Definition: rwloopbuf.h:85
size_t write(const void *loc, size_t len)
Writer write.
Definition: rwloopbuf.h:130
size_t const write_loc()
Return the current write location.
Definition: rwloopbuf.h:137
size_t read(void *loc, size_t len)
Reader read.
Definition: rwloopbuf.h:111
Adds byte count to a read/write stream.
Definition: rwtimer.h:139
Adds byte count to a write stream.
Definition: rwcounter.h:97
virtual size_t write(const void *, size_t)
Write and update time over underlying write.
Definition: iostream.cc:760
TEST(IoHelpers, counter_timer)
Definition: iohelper.cc:187
Input/output streaming pipeline.
Base input/output stream classes.
Read/write counter.
Loopback read/write buffer.
Read/write timer.
Input/output stream with pipeline of readers and writers.
Definition: iopipeline.h:163
Pipeline reader to carry out processing in a pipeline (chain of readers).
Definition: iobase.h:76
Interface class for objects which can be read.
Definition: iobase.h:67
virtual size_t read(void *, size_t)=0
Read interface.
Interface class for objects which can be written.
Definition: iobase.h:86
virtual size_t write(const void *, size_t)=0
Write interface.