38 #include <system_error>
47 using namespace scc::util;
49 class InStreambuf :
public virtual std::streambuf
51 std::shared_ptr<Reader> m_shared;
53 std::vector<char> m_recv;
57 InStreambuf(
const InStreambuf&) =
delete;
58 void operator=(
const InStreambuf&) =
delete;
62 if (!m_reader)
throw std::runtime_error(
"InStreambuf not initialized");
69 len = m_reader->
read(m_recv.data(), m_recv.size());
71 catch (std::exception& ex)
86 virtual int_type underflow()
90 m_recv.resize(m_resize_recv);
98 return traits_type::eof();
102 setg((
char*)m_recv.data(), (
char*)m_recv.data(), (
char*)m_recv.data() + len);
104 return traits_type::to_int_type(m_recv[0]);
123 InStreambuf(
Reader& reader,
size_t recv_buf_sz) : m_reader(&reader), m_resize_recv(0)
125 m_recv.resize(recv_buf_sz);
128 setg((
char*)m_recv.data(), (
char*)m_recv.data(), (
char*)m_recv.data());
130 InStreambuf(
const std::shared_ptr<Reader>& reader,
size_t recv_buf_sz) : m_shared(reader), m_reader(m_shared.get()), m_resize_recv(0)
132 m_recv.resize(recv_buf_sz);
135 setg((
char*)m_recv.data(), (
char*)m_recv.data(), (
char*)m_recv.data());
139 void read_reset(
const std::shared_ptr<Reader>& reader)
142 m_reader = m_shared.get();
145 std::shared_ptr<Reader> read_shared()
const
150 size_t recvbuf_size()
const
152 if (m_resize_recv)
return m_resize_recv;
154 return m_recv.size();
157 void recvbuf_size(
size_t sz)
159 if (sz == 0)
throw std::runtime_error(
"input stream empty buffer");
165 std::string recv_fail()
const
169 void clear_recv_fail()
175 class OutStreambuf :
public virtual std::streambuf
177 std::shared_ptr<Writer> m_shared;
179 std::vector<char> m_send;
182 OutStreambuf(
const OutStreambuf&) =
delete;
183 void operator=(
const OutStreambuf&) =
delete;
185 bool send(
size_t len)
187 if (!m_writer)
throw std::runtime_error(
"OutStreambuf not initialized");
189 if (!len)
return true;
192 char* loc = (
char*)m_send.data();
201 sent = m_writer->
write(loc, left);
208 catch (std::exception& ex)
219 setp((
char*)m_send.data(), (
char*)m_send.data() + m_send.size() - 1);
230 virtual int_type overflow(int_type c)
232 if (traits_type::eq_int_type(traits_type::eof(), c))
237 traits_type::assign(*pptr(), c);
239 if (!send(m_send.size()))
241 return traits_type::eof();
256 size_t len = pptr() - pbase();
265 OutStreambuf(
Writer& writer,
size_t send_buf_sz) : m_writer(&writer)
267 m_send.resize(send_buf_sz);
270 setp((
char*)m_send.data(), (
char*)m_send.data() + m_send.size() - 1);
272 OutStreambuf(
const std::shared_ptr<Writer>& writer,
size_t send_buf_sz) : m_shared(writer), m_writer(m_shared.get())
274 m_send.resize(send_buf_sz);
277 setp((
char*)m_send.data(), (
char*)m_send.data() + m_send.size() - 1);
281 void write_reset(
const std::shared_ptr<Writer>& writer)
284 m_writer = m_shared.get();
287 std::shared_ptr<Writer> write_shared()
const
292 size_t sendbuf_size()
const
294 return m_send.size();
297 void sendbuf_size(
size_t sz)
299 if (sz == 0)
throw std::runtime_error(
"output stream empty buffer");
306 setp((
char*)m_send.data(), (
char*)m_send.data() + m_send.size() - 1);
309 std::string send_fail()
const
313 void clear_send_fail()
319 class IoStreambuf :
public InStreambuf,
public OutStreambuf
323 IoStreambuf(
Reader& reader,
Writer& writer,
size_t recv_buf_sz,
size_t send_buf_sz)
324 : InStreambuf(reader, recv_buf_sz), OutStreambuf(writer, send_buf_sz)
327 IoStreambuf(
const std::shared_ptr<Reader>& reader,
const std::shared_ptr<Writer>& writer,
size_t recv_buf_sz,
size_t send_buf_sz)
328 : InStreambuf(reader, recv_buf_sz), OutStreambuf(writer, send_buf_sz)
335 if (InStreambuf::sync() == -1)
return -1;
336 if (OutStreambuf::sync() == -1)
return -1;
343 if (recv_buf_sz == 0)
throw std::runtime_error(
"InStream empty buffer");
345 rdbuf(
new InStreambuf(reader, recv_buf_sz));
350 if (!reader)
throw std::runtime_error(
"InStream null pointer");
352 if (recv_buf_sz == 0)
throw std::runtime_error(
"InStream empty buffer");
354 rdbuf(
new InStreambuf(reader, recv_buf_sz));
357 InStream::~InStream()
362 void InStream::read_reset(
const std::shared_ptr<Reader>& reader)
364 if (!reader)
throw std::runtime_error(
"InStream reset with null shared");
366 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
369 throw std::runtime_error(
"invalid streambuffer");
371 sb->read_reset(reader);
374 std::shared_ptr<Reader> InStream::read_shared()
const
376 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
379 throw std::runtime_error(
"invalid streambuffer");
381 return sb->read_shared();
386 rdbuf(other.rdbuf());
387 other.rdbuf(
nullptr);
392 rdbuf(other.rdbuf());
393 other.rdbuf(
nullptr);
399 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
402 throw std::runtime_error(
"invalid streambuffer");
404 return sb->recvbuf_size();
409 if (sz == 0)
throw std::runtime_error(
"InStream empty buffer");
411 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
414 throw std::runtime_error(
"invalid streambuffer");
416 sb->recvbuf_size(sz);
421 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
424 throw std::runtime_error(
"invalid streambuffer");
426 return sb->recv_fail();
429 void InStream::clear(std::ios::iostate state)
431 std::istream::clear(state);
433 if (state != std::ios::goodbit)
return;
435 InStreambuf* sb =
dynamic_cast<InStreambuf*
>(rdbuf());
438 throw std::runtime_error(
"invalid streambuffer");
440 sb->clear_recv_fail();
445 if (send_buf_sz == 0)
throw std::runtime_error(
"OutStream empty buffer");
447 rdbuf(
new OutStreambuf(writer, send_buf_sz));
452 if (!writer)
throw std::runtime_error(
"OutStream null pointer");
454 if (send_buf_sz == 0)
throw std::runtime_error(
"OutStream empty buffer");
456 rdbuf(
new OutStreambuf(writer, send_buf_sz));
459 OutStream::~OutStream()
464 void OutStream::write_reset(
const std::shared_ptr<Writer>& writer)
466 if (!writer)
throw std::runtime_error(
"OutStream reset with null shared");
468 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
471 throw std::runtime_error(
"invalid streambuffer");
473 sb->write_reset(writer);
476 std::shared_ptr<Writer> OutStream::write_shared()
const
478 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
481 throw std::runtime_error(
"invalid streambuffer");
483 return sb->write_shared();
488 rdbuf(other.rdbuf());
489 other.rdbuf(
nullptr);
494 rdbuf(other.rdbuf());
495 other.rdbuf(
nullptr);
501 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
504 throw std::runtime_error(
"invalid streambuffer");
506 return sb->sendbuf_size();
511 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
514 throw std::runtime_error(
"invalid streambuffer");
516 sb->sendbuf_size(sz);
521 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
524 throw std::runtime_error(
"invalid streambuffer");
526 return sb->send_fail();
529 void OutStream::clear(std::ios::iostate state)
531 std::ostream::clear(state);
533 if (state != std::ios::goodbit)
return;
535 OutStreambuf* sb =
dynamic_cast<OutStreambuf*
>(rdbuf());
538 throw std::runtime_error(
"invalid streambuffer");
540 sb->clear_send_fail();
545 if (send_buf_sz == 0 || recv_buf_sz == 0)
throw std::runtime_error(
"IoStream empty buffer");
547 rdbuf(
new IoStreambuf(reader, writer, recv_buf_sz, send_buf_sz));
550 IoStream::IoStream(
const std::shared_ptr<Reader>& reader,
const std::shared_ptr<Writer>& writer,
size_t recv_buf_sz,
size_t send_buf_sz)
552 if (!reader || !writer)
throw std::runtime_error(
"IoStream null pointer(s)");
554 if (send_buf_sz == 0 || recv_buf_sz == 0)
throw std::runtime_error(
"IoStream empty buffer");
556 rdbuf(
new IoStreambuf(reader, writer, recv_buf_sz, send_buf_sz));
559 IoStream::~IoStream()
564 void IoStream::read_reset(
const std::shared_ptr<Reader>& reader)
566 if (!reader)
throw std::runtime_error(
"IoStream read reset with null shared");
568 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
571 throw std::runtime_error(
"invalid streambuffer");
573 sb->read_reset(reader);
576 void IoStream::write_reset(
const std::shared_ptr<Writer>& writer)
578 if (!writer)
throw std::runtime_error(
"IoStream write reset with null shared");
580 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
583 throw std::runtime_error(
"invalid streambuffer");
585 sb->write_reset(writer);
588 std::shared_ptr<Reader> IoStream::read_shared()
const
590 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
593 throw std::runtime_error(
"invalid streambuffer");
595 return sb->read_shared();
598 std::shared_ptr<Writer> IoStream::write_shared()
const
600 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
603 throw std::runtime_error(
"invalid streambuffer");
605 return sb->write_shared();
610 rdbuf(other.rdbuf());
611 other.rdbuf(
nullptr);
616 rdbuf(other.rdbuf());
617 other.rdbuf(
nullptr);
623 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
626 throw std::runtime_error(
"invalid streambuffer");
628 return sb->recvbuf_size();
633 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
636 throw std::runtime_error(
"invalid streambuffer");
638 return sb->sendbuf_size();
643 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
646 throw std::runtime_error(
"invalid streambuffer");
648 sb->recvbuf_size(sz);
653 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
656 throw std::runtime_error(
"invalid streambuffer");
658 sb->sendbuf_size(sz);
663 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
666 throw std::runtime_error(
"invalid streambuffer");
668 return sb->recv_fail();
673 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
676 throw std::runtime_error(
"invalid streambuffer");
678 return sb->send_fail();
681 void IoStream::clear(std::ios::iostate state)
683 std::iostream::clear(state);
685 if (state != std::ios::goodbit)
return;
687 IoStreambuf* sb =
dynamic_cast<IoStreambuf*
>(rdbuf());
690 throw std::runtime_error(
"invalid streambuffer");
692 sb->clear_send_fail();
693 sb->clear_recv_fail();
716 if (!m_reader)
return 0;
718 auto t1 = std::chrono::high_resolution_clock::now();
719 size_t rd = m_reader->
read(loc, len);
720 auto t2 = std::chrono::high_resolution_clock::now();
721 m_ticks += std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count();
738 throw std::runtime_error(
"ReadTimer reset with null pointer");
741 m_reader = m_shared.get();
762 if (!m_writer)
return 0;
764 auto t1 = std::chrono::high_resolution_clock::now();
765 size_t wr = m_writer->
write(loc, len);
766 auto t2 = std::chrono::high_resolution_clock::now();
767 m_ticks += std::chrono::duration_cast<std::chrono::nanoseconds>(t2-t1).count();
783 throw std::runtime_error(
"WriteTimer reset with null pointer");
786 m_writer = m_shared.get();
823 if (!m_reader)
return 0;
825 size_t rd = m_reader->
read(loc, len);
843 throw std::runtime_error(
"ReadCounter reset with null pointer");
846 m_reader = m_shared.get();
867 if (!m_writer)
return 0;
869 size_t wr = m_writer->
write(loc, len);
887 throw std::runtime_error(
"WriteCounter reset with null pointer");
890 m_writer = m_shared.get();
925 auto it = std::find_if(rd_chain.begin(), rd_chain.end(), [&r](
auto& v) { return v == r; });
927 if (it != rd_chain.end()) rd_chain.erase(it);
934 if (rd_chain.empty())
939 std::shared_ptr<PipelineReader> last;
941 for (
auto it = rd_chain.rbegin(); it != rd_chain.rend(); ++it)
943 if (it == rd_chain.rbegin()) (*it)->read_reset(rd_base);
944 else (*it)->read_reset(last);
957 auto it = std::find_if(wr_chain.begin(), wr_chain.end(), [&w](
auto& v) { return v == w; });
959 if (it != wr_chain.end()) wr_chain.erase(it);
966 if (wr_chain.empty())
971 std::shared_ptr<PipelineWriter> last;
973 for (
auto it = wr_chain.rbegin(); it != wr_chain.rend(); ++it)
975 if (it == wr_chain.rbegin()) (*it)->write_reset(wr_base);
976 else (*it)->write_reset(last);
Input stream wrapper for reader.
virtual size_t recvbuf_size() const
Size of receive buffer.
InStream & operator=(const InStream &)=delete
Copy assign not allowed.
InStream()=delete
No default construct.
virtual std::string recv_fail() const
Failure message from the input stream.
Input/output stream wrapper for reader/writer.
virtual std::string send_fail() const
Failure message from outstream.
size_t recvbuf_size() const
Size of receive buffer.
IoStream()=delete
No default construct.
virtual std::string recv_fail() const
Failure message from instream.
IoStream & operator=(const IoStream &)=delete
Copy assign not allowed.
size_t sendbuf_size() const
Size of send buffer.
Output stream wrapper for writer.
size_t sendbuf_size() const
Size of send buffer.
OutStream()=delete
No default construct.
OutStream & operator=(const OutStream &)=delete
Copy assign not allowed.
virtual std::string send_fail() const
Failure message from outstream.
Adds byte count to a read stream.
ReadCounter()
Reads will return 0 until reset.
void read_reset(Reader &)
Reset the chained reader.
virtual size_t read(void *, size_t)
Read interface.
Adds timer to a read stream.
ReadTimer()
Reads return 0 until reset.
virtual size_t read(void *, size_t)
Read and update time over underlying read.
void read_reset(Reader &)
Reset the chained reader.
RwCounter()
Reads and writes will return 0 until reset.
RwLoopBuffer()
rw loop buffer
RwTimer()
Reads and writes return 0 until reset.
Adds byte count to a write stream.
void write_reset(Writer &)
Reset the chained writer.
virtual size_t write(const void *, size_t)
Write interface.
WriteCounter()
Writes will return 0 until reset.
Adds timer to a write stream.
void write_reset(Writer &)
Reset the chained writer.
virtual size_t write(const void *, size_t)
Write and update time over underlying write.
WriteTimer()
Writes return 0 until reset.
Input/output streaming pipeline.
Base input/output stream classes.
Loopback read/write buffer.
Chain of readers base class.
void rd_del(const std::shared_ptr< PipelineReader > &)
Delete a reader from the chain.
virtual std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
InChain(const std::shared_ptr< Reader > &)
Create with base reader and read buffer size.
std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
InPipeline(const std::shared_ptr< Reader > &, size_t=1024)
Create with base reader and read buffer size.
std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
IoPipeline(const std::shared_ptr< Reader > &, const std::shared_ptr< Writer > &, size_t=1024, size_t=1024)
Create with base reader/writer and buffer sizes.
std::shared_ptr< Reader > rd_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
Chain of writers base class.
void wr_del(const std::shared_ptr< PipelineWriter > &)
Delete a writer from the chain.
OutChain(const std::shared_ptr< Writer > &)
Create with base writer and write buffer size.
virtual std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
std::shared_ptr< Writer > wr_fix_chain()
Fix the chain, and return the pointer that should be pointed to by the stream.
OutPipeline(const std::shared_ptr< Writer > &, size_t=1024)
Create with base writer and write buffer size.
Interface class for objects which can be read.
virtual size_t read(void *, size_t)=0
Read interface.
Interface class for objects which can be written.
virtual size_t write(const void *, size_t)=0
Write interface.