scclib
Stable Cloud Computing C++ Library
iostream.cc
Go to the documentation of this file.
1 /*
2 BSD 3-Clause License
3 
4 Copyright (c) 2022, Stable Cloud Computing, Inc.
5 
6 Redistribution and use in source and binary forms, with or without
7 modification, are permitted provided that the following conditions are met:
8 
9 1. Redistributions of source code must retain the above copyright notice, this
10  list of conditions and the following disclaimer.
11 
12 2. Redistributions in binary form must reproduce the above copyright notice,
13  this list of conditions and the following disclaimer in the documentation
14  and/or other materials provided with the distribution.
15 
16 3. Neither the name of the copyright holder nor the names of its
17  contributors may be used to endorse or promote products derived from
18  this software without specific prior written permission.
19 
20 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
21 AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
22 IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
23 DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
24 FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
25 DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
26 SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
27 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
28 OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
29 OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
30 */
31 #include <util/iostream.h>
32 #include <util/rwtimer.h>
33 #include <util/rwcounter.h>
34 #include <util/rwloopbuf.h>
35 #include <util/iopipeline.h>
36 #include <streambuf>
37 #include <iostream>
38 #include <system_error>
39 #include <stdexcept>
40 #include <algorithm>
41 
47 using namespace scc::util;
48 
49 class InStreambuf : public virtual std::streambuf
50 {
51  std::shared_ptr<Reader> m_shared;
52  Reader* m_reader;
53  std::vector<char> m_recv;
54  std::string m_fail;
55  size_t m_resize_recv;
56 
57  InStreambuf(const InStreambuf&) = delete;
58  void operator=(const InStreambuf&) = delete;
59 
60  size_t read()
61  {
62  if (!m_reader) throw std::runtime_error("InStreambuf not initialized");
63 
64  size_t len;
65 
66  try
67  {
68  m_fail.clear();
69  len = m_reader->read(m_recv.data(), m_recv.size());
70  }
71  catch (std::exception& ex)
72  {
73  m_fail = ex.what();
74  throw ex;
75  }
76 
77  return len;
78  }
79 
80 protected:
81 
82  /*
83  Ensures that at least one character is available in the input area by updating the pointers to the input area (if needed)
84  and reading more data in from the input sequence (if applicable).
85  */
86  virtual int_type underflow() // buffer is empty, fill it
87  {
88  if (m_resize_recv)
89  {
90  m_recv.resize(m_resize_recv);
91  m_resize_recv = 0;
92  }
93 
94  size_t len = read();
95 
96  if (!len)
97  {
98  return traits_type::eof(); // signal eof
99  }
100 
101  // set the get pointer to beginning, len data
102  setg((char*)m_recv.data(), (char*)m_recv.data(), (char*)m_recv.data() + len);
103 
104  return traits_type::to_int_type(m_recv[0]); // return the first char in the buffer
105  }
106 
107  /*
108  For input streams, this typically empties the get area and forces a re-read from the associated sequence to pick up recent changes
109 
110  For output streams, this typically results in writing the contents of the put area into the associated sequence,
111  i.e. flushing of the output buffer.
112  */
113  virtual int sync()
114  {
115  /*
116  the correct behavior here is to do nothing. Devices like files with a single file pointer would have different behavior,
117  but doing anything here messes up sockets & character rw buffer.
118  */
119  return 0;
120  }
121 
122 public:
123  InStreambuf(Reader& reader, size_t recv_buf_sz) : m_reader(&reader), m_resize_recv(0)
124  {
125  m_recv.resize(recv_buf_sz);
126 
127  // set the get pointer to beginning, no data
128  setg((char*)m_recv.data(), (char*)m_recv.data(), (char*)m_recv.data());
129  }
130  InStreambuf(const std::shared_ptr<Reader>& reader, size_t recv_buf_sz) : m_shared(reader), m_reader(m_shared.get()), m_resize_recv(0)
131  {
132  m_recv.resize(recv_buf_sz);
133 
134  // set the get pointer to beginning, no data
135  setg((char*)m_recv.data(), (char*)m_recv.data(), (char*)m_recv.data());
136  }
137  ~InStreambuf() {}
138 
139  void read_reset(const std::shared_ptr<Reader>& reader)
140  {
141  m_shared = reader;
142  m_reader = m_shared.get();
143  }
144 
145  std::shared_ptr<Reader> read_shared() const
146  {
147  return m_shared;
148  }
149 
150  size_t recvbuf_size() const
151  {
152  if (m_resize_recv) return m_resize_recv; // in case we have not underflowed yet
153 
154  return m_recv.size();
155  }
156 
157  void recvbuf_size(size_t sz)
158  {
159  if (sz == 0) throw std::runtime_error("input stream empty buffer");
160 
161  m_resize_recv = sz; // will resize the next time we underflow (after all the buffered data is consumed)
162  // this ensures that the buffer can be extended or reduced safely regardless of buffer contents
163  }
164 
165  std::string recv_fail() const
166  {
167  return m_fail;
168  }
169  void clear_recv_fail()
170  {
171  m_fail.clear();
172  }
173 };
174 
175 class OutStreambuf : public virtual std::streambuf
176 {
177  std::shared_ptr<Writer> m_shared;
178  Writer* m_writer;
179  std::vector<char> m_send;
180  std::string m_fail;
181 
182  OutStreambuf(const OutStreambuf&) = delete;
183  void operator=(const OutStreambuf&) = delete;
184 
185  bool send(size_t len)
186  {
187  if (!m_writer) throw std::runtime_error("OutStreambuf not initialized");
188 
189  if (!len) return true;
190 
191  size_t left = len;
192  char* loc = (char*)m_send.data();
193 
194  while (left)
195  {
196  size_t sent;
197 
198  try
199  {
200  m_fail.clear();
201  sent = m_writer->write(loc, left);
202 
203  if (sent == 0)
204  {
205  return false; // eof condition
206  }
207  }
208  catch (std::exception& ex)
209  {
210  m_fail = ex.what();
211  throw ex;
212  }
213 
214  left -= sent;
215  loc += sent;
216  }
217 
218  // set put area to the full buffer
219  setp((char*)m_send.data(), (char*)m_send.data() + m_send.size() - 1);
220 
221  return true;
222  }
223 
224 protected:
225 
226  /*
227  Ensures that there is space at the put area for at least one character by saving some initial
228  subsequence of characters starting at pbase() to the output sequence and updating the pointers to the put area (if needed).
229  */
230  virtual int_type overflow(int_type c) // buffer is full, send it
231  {
232  if (traits_type::eq_int_type(traits_type::eof(), c)) // got an eof, don't send anything
233  {
234  return c;
235  }
236 
237  traits_type::assign(*pptr(), c); // store the last character in the buffer
238 
239  if (!send(m_send.size()))
240  {
241  return traits_type::eof();
242  }
243 
244  return c;
245  }
246 
247  /*
248  For input streams, this typically empties the get area and forces a re-read from the associated sequence to pick up recent changes
249 
250  For output streams, this typically results in writing the contents of the put area into the associated sequence,
251  i.e. flushing of the output buffer.
252  */
253  virtual int sync()
254  {
255  // flush the buffer
256  size_t len = pptr() - pbase();
257  if (!send(len))
258  {
259  return -1;
260  }
261  return 0;
262  }
263 
264 public:
265  OutStreambuf(Writer& writer, size_t send_buf_sz) : m_writer(&writer)
266  {
267  m_send.resize(send_buf_sz);
268 
269  // set put area to full buffer
270  setp((char*)m_send.data(), (char*)m_send.data() + m_send.size() - 1);
271  }
272  OutStreambuf(const std::shared_ptr<Writer>& writer, size_t send_buf_sz) : m_shared(writer), m_writer(m_shared.get())
273  {
274  m_send.resize(send_buf_sz);
275 
276  // set put area to full buffer
277  setp((char*)m_send.data(), (char*)m_send.data() + m_send.size() - 1);
278  }
279  ~OutStreambuf() {}
280 
281  void write_reset(const std::shared_ptr<Writer>& writer)
282  {
283  m_shared = writer;
284  m_writer = m_shared.get();
285  }
286 
287  std::shared_ptr<Writer> write_shared() const
288  {
289  return m_shared;
290  }
291 
292  size_t sendbuf_size() const
293  {
294  return m_send.size();
295  }
296 
297  void sendbuf_size(size_t sz)
298  {
299  if (sz == 0) throw std::runtime_error("output stream empty buffer");
300 
301  sync(); // send any partial data
302 
303  m_send.resize(sz);
304 
305  // set put area to full buffer
306  setp((char*)m_send.data(), (char*)m_send.data() + m_send.size() - 1);
307  }
308 
309  std::string send_fail() const
310  {
311  return m_fail;
312  }
313  void clear_send_fail()
314  {
315  m_fail.clear();
316  }
317 };
318 
319 class IoStreambuf : public InStreambuf, public OutStreambuf
320 {
321 
322 public:
323  IoStreambuf(Reader& reader, Writer& writer, size_t recv_buf_sz, size_t send_buf_sz)
324  : InStreambuf(reader, recv_buf_sz), OutStreambuf(writer, send_buf_sz)
325  {
326  }
327  IoStreambuf(const std::shared_ptr<Reader>& reader, const std::shared_ptr<Writer>& writer, size_t recv_buf_sz, size_t send_buf_sz)
328  : InStreambuf(reader, recv_buf_sz), OutStreambuf(writer, send_buf_sz)
329  {
330  }
331  ~IoStreambuf() {}
332 
333  virtual int sync()
334  {
335  if (InStreambuf::sync() == -1) return -1;
336  if (OutStreambuf::sync() == -1) return -1;
337  return 0;
338  }
339 };
340 
341 InStream::InStream(Reader& reader, size_t recv_buf_sz)
342 {
343  if (recv_buf_sz == 0) throw std::runtime_error("InStream empty buffer");
344 
345  rdbuf(new InStreambuf(reader, recv_buf_sz));
346 }
347 
348 InStream::InStream(const std::shared_ptr<Reader>& reader, size_t recv_buf_sz)
349 {
350  if (!reader) throw std::runtime_error("InStream null pointer");
351 
352  if (recv_buf_sz == 0) throw std::runtime_error("InStream empty buffer");
353 
354  rdbuf(new InStreambuf(reader, recv_buf_sz));
355 }
356 
357 InStream::~InStream()
358 {
359  delete rdbuf();
360 }
361 
362 void InStream::read_reset(const std::shared_ptr<Reader>& reader)
363 {
364  if (!reader) throw std::runtime_error("InStream reset with null shared");
365 
366  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
367  if (sb == nullptr)
368  {
369  throw std::runtime_error("invalid streambuffer");
370  }
371  sb->read_reset(reader);
372 }
373 
374 std::shared_ptr<Reader> InStream::read_shared() const
375 {
376  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
377  if (sb == nullptr)
378  {
379  throw std::runtime_error("invalid streambuffer");
380  }
381  return sb->read_shared();
382 }
383 
385 {
386  rdbuf(other.rdbuf());
387  other.rdbuf(nullptr);
388 }
389 
391 {
392  rdbuf(other.rdbuf());
393  other.rdbuf(nullptr);
394  return *this;
395 }
396 
398 {
399  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
400  if (sb == nullptr)
401  {
402  throw std::runtime_error("invalid streambuffer");
403  }
404  return sb->recvbuf_size();
405 }
406 
407 void InStream::recvbuf_size(size_t sz)
408 {
409  if (sz == 0) throw std::runtime_error("InStream empty buffer");
410 
411  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
412  if (sb == nullptr)
413  {
414  throw std::runtime_error("invalid streambuffer");
415  }
416  sb->recvbuf_size(sz);
417 }
418 
419 std::string InStream::recv_fail() const
420 {
421  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
422  if (sb == nullptr)
423  {
424  throw std::runtime_error("invalid streambuffer");
425  }
426  return sb->recv_fail();
427 }
428 
429 void InStream::clear(std::ios::iostate state)
430 {
431  std::istream::clear(state);
432 
433  if (state != std::ios::goodbit) return; // only clear if the stream is reset
434 
435  InStreambuf* sb = dynamic_cast<InStreambuf*>(rdbuf());
436  if (sb == nullptr)
437  {
438  throw std::runtime_error("invalid streambuffer");
439  }
440  sb->clear_recv_fail();
441 }
442 
443 OutStream::OutStream(Writer& writer, size_t send_buf_sz)
444 {
445  if (send_buf_sz == 0) throw std::runtime_error("OutStream empty buffer");
446 
447  rdbuf(new OutStreambuf(writer, send_buf_sz));
448 }
449 
450 OutStream::OutStream(const std::shared_ptr<Writer>& writer, size_t send_buf_sz)
451 {
452  if (!writer) throw std::runtime_error("OutStream null pointer");
453 
454  if (send_buf_sz == 0) throw std::runtime_error("OutStream empty buffer");
455 
456  rdbuf(new OutStreambuf(writer, send_buf_sz));
457 }
458 
459 OutStream::~OutStream()
460 {
461  delete rdbuf();
462 }
463 
464 void OutStream::write_reset(const std::shared_ptr<Writer>& writer)
465 {
466  if (!writer) throw std::runtime_error("OutStream reset with null shared");
467 
468  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
469  if (sb == nullptr)
470  {
471  throw std::runtime_error("invalid streambuffer");
472  }
473  sb->write_reset(writer);
474 }
475 
476 std::shared_ptr<Writer> OutStream::write_shared() const
477 {
478  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
479  if (sb == nullptr)
480  {
481  throw std::runtime_error("invalid streambuffer");
482  }
483  return sb->write_shared();
484 }
485 
487 {
488  rdbuf(other.rdbuf());
489  other.rdbuf(nullptr);
490 }
491 
493 {
494  rdbuf(other.rdbuf());
495  other.rdbuf(nullptr);
496  return *this;
497 }
498 
500 {
501  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
502  if (sb == nullptr)
503  {
504  throw std::runtime_error("invalid streambuffer");
505  }
506  return sb->sendbuf_size();
507 }
508 
509 void OutStream::sendbuf_size(size_t sz)
510 {
511  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
512  if (sb == nullptr)
513  {
514  throw std::runtime_error("invalid streambuffer");
515  }
516  sb->sendbuf_size(sz);
517 }
518 
519 std::string OutStream::send_fail() const
520 {
521  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
522  if (sb == nullptr)
523  {
524  throw std::runtime_error("invalid streambuffer");
525  }
526  return sb->send_fail();
527 }
528 
529 void OutStream::clear(std::ios::iostate state)
530 {
531  std::ostream::clear(state);
532 
533  if (state != std::ios::goodbit) return; // only clear if the stream is reset
534 
535  OutStreambuf* sb = dynamic_cast<OutStreambuf*>(rdbuf());
536  if (sb == nullptr)
537  {
538  throw std::runtime_error("invalid streambuffer");
539  }
540  sb->clear_send_fail();
541 }
542 
543 IoStream::IoStream(Reader& reader, Writer& writer, size_t recv_buf_sz, size_t send_buf_sz)
544 {
545  if (send_buf_sz == 0 || recv_buf_sz == 0) throw std::runtime_error("IoStream empty buffer");
546 
547  rdbuf(new IoStreambuf(reader, writer, recv_buf_sz, send_buf_sz));
548 }
549 
550 IoStream::IoStream(const std::shared_ptr<Reader>& reader, const std::shared_ptr<Writer>& writer, size_t recv_buf_sz, size_t send_buf_sz)
551 {
552  if (!reader || !writer) throw std::runtime_error("IoStream null pointer(s)");
553 
554  if (send_buf_sz == 0 || recv_buf_sz == 0) throw std::runtime_error("IoStream empty buffer");
555 
556  rdbuf(new IoStreambuf(reader, writer, recv_buf_sz, send_buf_sz));
557 }
558 
559 IoStream::~IoStream()
560 {
561  delete rdbuf();
562 }
563 
564 void IoStream::read_reset(const std::shared_ptr<Reader>& reader)
565 {
566  if (!reader) throw std::runtime_error("IoStream read reset with null shared");
567 
568  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
569  if (sb == nullptr)
570  {
571  throw std::runtime_error("invalid streambuffer");
572  }
573  sb->read_reset(reader);
574 }
575 
576 void IoStream::write_reset(const std::shared_ptr<Writer>& writer)
577 {
578  if (!writer) throw std::runtime_error("IoStream write reset with null shared");
579 
580  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
581  if (sb == nullptr)
582  {
583  throw std::runtime_error("invalid streambuffer");
584  }
585  sb->write_reset(writer);
586 }
587 
588 std::shared_ptr<Reader> IoStream::read_shared() const
589 {
590  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
591  if (sb == nullptr)
592  {
593  throw std::runtime_error("invalid streambuffer");
594  }
595  return sb->read_shared();
596 }
597 
598 std::shared_ptr<Writer> IoStream::write_shared() const
599 {
600  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
601  if (sb == nullptr)
602  {
603  throw std::runtime_error("invalid streambuffer");
604  }
605  return sb->write_shared();
606 }
607 
609 {
610  rdbuf(other.rdbuf());
611  other.rdbuf(nullptr);
612 }
613 
615 {
616  rdbuf(other.rdbuf());
617  other.rdbuf(nullptr);
618  return *this;
619 }
620 
622 {
623  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
624  if (sb == nullptr)
625  {
626  throw std::runtime_error("invalid streambuffer");
627  }
628  return sb->recvbuf_size();
629 }
630 
632 {
633  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
634  if (sb == nullptr)
635  {
636  throw std::runtime_error("invalid streambuffer");
637  }
638  return sb->sendbuf_size();
639 }
640 
641 void IoStream::recvbuf_size(size_t sz)
642 {
643  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
644  if (sb == nullptr)
645  {
646  throw std::runtime_error("invalid streambuffer");
647  }
648  sb->recvbuf_size(sz);
649 }
650 
651 void IoStream::sendbuf_size(size_t sz)
652 {
653  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
654  if (sb == nullptr)
655  {
656  throw std::runtime_error("invalid streambuffer");
657  }
658  sb->sendbuf_size(sz);
659 }
660 
661 std::string IoStream::recv_fail() const
662 {
663  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
664  if (sb == nullptr)
665  {
666  throw std::runtime_error("invalid streambuffer");
667  }
668  return sb->recv_fail();
669 }
670 
671 std::string IoStream::send_fail() const
672 {
673  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
674  if (sb == nullptr)
675  {
676  throw std::runtime_error("invalid streambuffer");
677  }
678  return sb->send_fail();
679 }
680 
681 void IoStream::clear(std::ios::iostate state)
682 {
683  std::iostream::clear(state);
684 
685  if (state != std::ios::goodbit) return; // only clear if the stream is reset
686 
687  IoStreambuf* sb = dynamic_cast<IoStreambuf*>(rdbuf());
688  if (sb == nullptr)
689  {
690  throw std::runtime_error("invalid streambuffer");
691  }
692  sb->clear_send_fail();
693  sb->clear_recv_fail();
694 }
695 
699 
700 ReadTimer::ReadTimer() : m_reader(nullptr), m_ticks{0}, m_calls{0}
701 {
702 }
703 
704 ReadTimer::ReadTimer(Reader& r) : m_reader(nullptr), m_ticks{0}, m_calls{0}
705 {
706  read_reset(r);
707 }
708 
709 ReadTimer::ReadTimer(const std::shared_ptr<Reader>& r) : m_reader(nullptr), m_ticks{0}, m_calls{0}
710 {
711  read_reset(r);
712 }
713 
714 size_t ReadTimer::read(void* loc, size_t len)
715 {
716  if (!m_reader) return 0;
717 
718  auto t1 = std::chrono::high_resolution_clock::now();
719  size_t rd = m_reader->read(loc, len);
720  auto t2 = std::chrono::high_resolution_clock::now();
721  m_ticks += std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count();
722  m_calls++;
723  return rd;
724 }
725 
727 {
728  m_shared.reset();
729  m_reader = &r;
730  m_ticks = 0;
731  m_calls = 0;
732 }
733 
734 void ReadTimer::read_reset(const std::shared_ptr<Reader>& r)
735 {
736  if (!r)
737  {
738  throw std::runtime_error("ReadTimer reset with null pointer");
739  }
740  m_shared = r;
741  m_reader = m_shared.get();
742  m_ticks = 0;
743  m_calls = 0;
744 }
745 
746 WriteTimer::WriteTimer() : m_writer(0), m_ticks{0}, m_calls{0}
747 {
748 }
749 
750 WriteTimer::WriteTimer(Writer& w) : m_writer(0), m_ticks{0}, m_calls{0}
751 {
752  write_reset(w);
753 }
754 
755 WriteTimer::WriteTimer(const std::shared_ptr<Writer>& w) : m_writer(0), m_ticks{0}, m_calls{0}
756 {
757  write_reset(w);
758 }
759 
760 size_t WriteTimer::write(const void* loc, size_t len)
761 {
762  if (!m_writer) return 0;
763 
764  auto t1 = std::chrono::high_resolution_clock::now();
765  size_t wr = m_writer->write(loc, len);
766  auto t2 = std::chrono::high_resolution_clock::now();
767  m_ticks += std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count();
768  m_calls++;
769  return wr;
770 }
771 
773 {
774  m_shared.reset();
775  m_writer = &w;
776  m_ticks = 0;
777  m_calls = 0;
778 }
779 void WriteTimer::write_reset(const std::shared_ptr<Writer>& w)
780 {
781  if (!w)
782  {
783  throw std::runtime_error("WriteTimer reset with null pointer");
784  }
785  m_shared = w;
786  m_writer = m_shared.get();
787  m_ticks = 0;
788  m_calls = 0;
789 }
790 
792 {
793 }
794 
796 {
797 }
798 
799 RwTimer::RwTimer(const std::shared_ptr<Reader>& r, const std::shared_ptr<Writer>& w) : ReadTimer(r), WriteTimer(w)
800 {
801 }
802 
806 
807 ReadCounter::ReadCounter() : m_reader(nullptr), m_count{0}, m_calls{0}
808 {
809 }
810 
811 ReadCounter::ReadCounter(Reader& r) : m_reader(nullptr), m_count{0}, m_calls{0}
812 {
813  read_reset(r);
814 }
815 
816 ReadCounter::ReadCounter(const std::shared_ptr<Reader>& r) : m_reader(nullptr), m_count{0}, m_calls{0}
817 {
818  read_reset(r);
819 }
820 
821 size_t ReadCounter::read(void* loc, size_t len)
822 {
823  if (!m_reader) return 0;
824 
825  size_t rd = m_reader->read(loc, len);
826  m_count += rd;
827  m_calls++;
828  return rd;
829 }
830 
832 {
833  m_shared.reset();
834  m_reader = &r;
835  m_count = 0;
836  m_calls = 0;
837 }
838 
839 void ReadCounter::read_reset(const std::shared_ptr<Reader>& r)
840 {
841  if (!r)
842  {
843  throw std::runtime_error("ReadCounter reset with null pointer");
844  }
845  m_shared = r;
846  m_reader = m_shared.get();
847  m_count = 0;
848  m_calls = 0;
849 }
850 
851 WriteCounter::WriteCounter() : m_writer(nullptr), m_count{0}, m_calls{0}
852 {
853 }
854 
855 WriteCounter::WriteCounter(Writer& w) : m_writer(nullptr), m_count{0}, m_calls{0}
856 {
857  write_reset(w);
858 }
859 
860 WriteCounter::WriteCounter(const std::shared_ptr<Writer>& w) : m_writer(nullptr), m_count{0}, m_calls{0}
861 {
862  write_reset(w);
863 }
864 
865 size_t WriteCounter::write(const void* loc, size_t len)
866 {
867  if (!m_writer) return 0;
868 
869  size_t wr = m_writer->write(loc, len);
870  m_count += wr;
871  m_calls++;
872  return wr;
873 }
874 
876 {
877  m_shared.reset();
878  m_writer = &w;
879  m_count = 0;
880  m_calls = 0;
881 }
882 
883 void WriteCounter::write_reset(const std::shared_ptr<Writer>& w)
884 {
885  if (!w)
886  {
887  throw std::runtime_error("WriteCounter reset with null pointer");
888  }
889  m_shared = w;
890  m_writer = m_shared.get();
891  m_count = 0;
892  m_calls = 0;
893 }
894 
896 {
897 }
898 
900 {
901 }
902 
903 RwCounter::RwCounter(const std::shared_ptr<Reader>& r, const std::shared_ptr<Writer>& w) : ReadCounter(r), WriteCounter(w)
904 {
905 }
906 
910 
912 {
913 }
914 
918 
919 InChain::InChain(const std::shared_ptr<Reader>& r)
920  : rd_base(r)
921 {}
922 
923 void InChain::rd_del(const std::shared_ptr<PipelineReader>& r)
924 {
925  auto it = std::find_if(rd_chain.begin(), rd_chain.end(), [&r](auto& v) { return v == r; });
926 
927  if (it != rd_chain.end()) rd_chain.erase(it);
928 
929  rd_fix_chain();
930 }
931 
932 std::shared_ptr<Reader> InChain::rd_fix_chain()
933 {
934  if (rd_chain.empty())
935  {
936  return rd_base;
937  }
938 
939  std::shared_ptr<PipelineReader> last;
940 
941  for (auto it = rd_chain.rbegin(); it != rd_chain.rend(); ++it)
942  {
943  if (it == rd_chain.rbegin()) (*it)->read_reset(rd_base); // last points to base
944  else (*it)->read_reset(last); // points to next in pipeline
945 
946  last = *it;
947  }
948  return last; // last is first
949 }
950 
951 OutChain::OutChain(const std::shared_ptr<Writer>& w)
952  : wr_base(w)
953 {}
954 
955 void OutChain::wr_del(const std::shared_ptr<PipelineWriter>& w)
956 {
957  auto it = std::find_if(wr_chain.begin(), wr_chain.end(), [&w](auto& v) { return v == w; });
958 
959  if (it != wr_chain.end()) wr_chain.erase(it);
960 
961  wr_fix_chain();
962 }
963 
964 std::shared_ptr<Writer> OutChain::wr_fix_chain()
965 {
966  if (wr_chain.empty())
967  {
968  return wr_base;
969  }
970 
971  std::shared_ptr<PipelineWriter> last;
972 
973  for (auto it = wr_chain.rbegin(); it != wr_chain.rend(); ++it)
974  {
975  if (it == wr_chain.rbegin()) (*it)->write_reset(wr_base); // last points to base
976  else (*it)->write_reset(last); // points to next in pipeline
977 
978  last = *it;
979  }
980  return last; // last is first
981 }
982 
983 InPipeline::InPipeline(const std::shared_ptr<Reader>& r, size_t rd_bufsz) : InChain(r), InStream(r, rd_bufsz)
984 {}
985 
986 std::shared_ptr<Reader> InPipeline::rd_fix_chain()
987 {
988  auto first = InChain::rd_fix_chain();
989  read_reset(first); // reset my stream class to the first in the chain
990  return first;
991 }
992 
993 OutPipeline::OutPipeline(const std::shared_ptr<Writer>& w, size_t wr_bufsz) : OutChain(w), OutStream(w, wr_bufsz)
994 {}
995 
996 std::shared_ptr<Writer> OutPipeline::wr_fix_chain()
997 {
998  auto first = OutChain::wr_fix_chain();
999  write_reset(first); // reset my stream class to the first in the chain
1000  return first;
1001 }
1002 
1003 IoPipeline::IoPipeline(const std::shared_ptr<Reader>& r, const std::shared_ptr<Writer>& w, size_t rdbuf, size_t wrbuf)
1004  : InChain(r), OutChain(w), IoStream(r, w, rdbuf, wrbuf)
1005 {}
1006 
1007 std::shared_ptr<Reader> IoPipeline::rd_fix_chain()
1008 {
1009  auto first = InChain::rd_fix_chain();
1010  read_reset(first); // reset my stream class to the first in the chain
1011  return first;
1012 }
1013 
1014 std::shared_ptr<Writer> IoPipeline::wr_fix_chain()
1015 {
1016  auto first = OutChain::wr_fix_chain();
1017  write_reset(first); // reset my stream class to the first in the chain
1018  return first;
1019 }
Input stream wrapper for reader.
Definition: iostream.h:59
virtual size_t recvbuf_size() const
Size of receive buffer.
Definition: iostream.cc:397
InStream & operator=(const InStream &)=delete
Copy assign not allowed.
InStream()=delete
No default construct.
virtual std::string recv_fail() const
Failure message from the input stream.
Definition: iostream.cc:419
Input/output stream wrapper for reader/writer.
Definition: iostream.h:157
virtual std::string send_fail() const
Failure message from outstream.
Definition: iostream.cc:671
size_t recvbuf_size() const
Size of receive buffer.
Definition: iostream.cc:621
IoStream()=delete
No default construct.
virtual std::string recv_fail() const
Failure message from instream.
Definition: iostream.cc:661
IoStream & operator=(const IoStream &)=delete
Copy assign not allowed.
size_t sendbuf_size() const
Size of send buffer.
Definition: iostream.cc:631
Output stream wrapper for writer.
Definition: iostream.h:108
size_t sendbuf_size() const
Size of send buffer.
Definition: iostream.cc:499
OutStream()=delete
No default construct.
OutStream & operator=(const OutStream &)=delete
Copy assign not allowed.
virtual std::string send_fail() const
Failure message from outstream.
Definition: iostream.cc:519
Adds byte count to a read stream.
Definition: rwcounter.h:59
ReadCounter()
Reads will return 0 until reset.
Definition: iostream.cc:807
void read_reset(Reader &)
Reset the chained reader.
Definition: iostream.cc:831
virtual size_t read(void *, size_t)
Read interface.
Definition: iostream.cc:821
Adds timer to a read stream.
Definition: rwtimer.h:61
ReadTimer()
Reads return 0 until reset.
Definition: iostream.cc:700
virtual size_t read(void *, size_t)
Read and update time over underlying read.
Definition: iostream.cc:714
void read_reset(Reader &)
Reset the chained reader.
Definition: iostream.cc:726
RwCounter()
Reads and writes will return 0 until reset.
Definition: iostream.cc:895
RwLoopBuffer()
rw loop buffer
Definition: iostream.cc:911
RwTimer()
Reads and writes return 0 until reset.
Definition: iostream.cc:791
Adds byte count to a write stream.
Definition: rwcounter.h:97
void write_reset(Writer &)
Reset the chained writer.
Definition: iostream.cc:875
virtual size_t write(const void *, size_t)
Write interface.
Definition: iostream.cc:865
WriteCounter()
Writes will return 0 until reset.
Definition: iostream.cc:851
Adds timer to a write stream.
Definition: rwtimer.h:100
void write_reset(Writer &)
Reset the chained writer.
Definition: iostream.cc:772
virtual size_t write(const void *, size_t)
Write and update time over underlying write.
Definition: iostream.cc:760
WriteTimer()
Writes return 0 until reset.
Definition: iostream.cc:746
Input/output streaming pipeline.
Base input/output stream classes.
Read/write counter.
Loopback read/write buffer.
Read/write timer.
Chain of readers base class.
Definition: iopipeline.h:56
void rd_del(const std::shared_ptr< PipelineReader > &)
Delete a reader from the chain.
Definition: iostream.cc:923
virtual std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:932
InChain(const std::shared_ptr< Reader > &)
Create with base reader and read buffer size.
Definition: iostream.cc:919
std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:986
InPipeline(const std::shared_ptr< Reader > &, size_t=1024)
Create with base reader and read buffer size.
Definition: iostream.cc:983
std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:1014
IoPipeline(const std::shared_ptr< Reader > &, const std::shared_ptr< Writer > &, size_t=1024, size_t=1024)
Create with base reader/writer and buffer sizes.
Definition: iostream.cc:1003
std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:1007
Chain of writers base class.
Definition: iopipeline.h:94
void wr_del(const std::shared_ptr< PipelineWriter > &)
Delete a writer from the chain.
Definition: iostream.cc:955
OutChain(const std::shared_ptr< Writer > &)
Create with base writer and write buffer size.
Definition: iostream.cc:951
virtual std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:964
std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Definition: iostream.cc:996
OutPipeline(const std::shared_ptr< Writer > &, size_t=1024)
Create with base writer and write buffer size.
Definition: iostream.cc:993
Interface class for objects which can be read.
Definition: iobase.h:67
virtual size_t read(void *, size_t)=0
Read interface.
Interface class for objects which can be written.
Definition: iobase.h:86
virtual size_t write(const void *, size_t)=0
Write interface.