WvStreams
wvstream.h
1 /* -*- Mode: C++ -*-
2  * Worldvisions Weaver Software:
3  * Copyright (C) 1997-2002 Net Integration Technologies, Inc.
4  *
5  * Provides basic streaming I/O support.
6  */
7 #ifndef __WVSTREAM_H
8 #define __WVSTREAM_H
9 
10 #include "iwvstream.h"
11 #include "wvtimeutils.h"
12 #include "wvstreamsdebugger.h"
13 #include <errno.h>
14 #include <limits.h>
15 #include "wvattrs.h"
16 
// WvStream: the base stream class of this header ("Provides basic streaming
// I/O support"), derived from IWvStream.
//
// NOTE(review): this text is a generated source listing.  Each line carries
// the header's own line number as a prefix, and the gaps in that numbering
// show that doc comments and some declaration lines are not visible here.
// Consult the real wvstream.h before changing any code.
24 class WvStream: public IWvStream
25 {
26  IMPLEMENT_IOBJECT(WvStream);
27 
    // Backing storage for wsname()/set_wsname(), wsid(), and
    // getattr()/setattr() respectively (see those accessors below).
28  WvString my_wsname;
29  WSID my_wsid;
30  WvAttrs attrs;
31 public:
    // NOTE(review): the next five listing lines are empty; whatever was
    // declared on them in the original header is not visible here.
37 
43 
46 
49 
55 
    // State flags.  NOTE(review): presumably maintained by noread(),
    // nowrite() and close() -- confirm in the implementation file.
57  bool stop_read, stop_write, closed;
58 
60  WvStream();
61  virtual ~WvStream();
62 
70  virtual void close();
71 
    // Error reporting.  The numeric overload is virtual and defined
    // elsewhere; the string overload forwards to WvErrorBase::seterr(), and
    // the format overload builds a WvString and calls the string overload.
73  virtual void seterr(int _errnum);
74  void seterr(WvStringParm specialerr)
75  { WvErrorBase::seterr(specialerr); }
76  void seterr(WVSTRING_FORMAT_DECL)
77  { seterr(WvString(WVSTRING_FORMAT_CALL)); }
78 
80  virtual bool isok() const;
81 
    // Buffered read/write entry points; declared here, defined elsewhere.
    // Each comes in a raw-pointer form and a WvBuf form.
83  virtual size_t read(void *buf, size_t count);
84 
94  virtual size_t read(WvBuf &outbuf, size_t count);
95 
101  virtual void unread(WvBuf &outbuf, size_t count);
102 
109  virtual size_t write(const void *buf, size_t count);
110 
    // Note the default count is INT_MAX (from <limits.h>), not SIZE_MAX.
118  virtual size_t write(WvBuf &inbuf, size_t count = INT_MAX);
119 
    // Stores the requested limit in max_outbuf_size.
129  void outbuf_limit(size_t size)
130  { max_outbuf_size = size; }
131 
132  virtual void noread();
133  virtual void nowrite();
134  virtual void maybe_autoclose();
135 
136  virtual bool isreadable();
137  virtual bool iswritable();
138 
    // Unbuffered read hook.  The base implementation ignores its arguments
    // and always reports 0 bytes read.
146  virtual size_t uread(void *buf, size_t count)
147  { return 0; /* basic WvStream doesn't actually do anything! */ }
148 
    // Unbuffered write hook.  The base implementation ignores the data and
    // reports all `count` bytes as written.
156  virtual size_t uwrite(const void *buf, size_t count)
157  { return count; /* basic WvStream doesn't actually do anything! */ }
158 
    // getline(): thin wrapper that forwards to blocking_getline() with the
    // same arguments.  Note that `separator` is a char here but an int in
    // blocking_getline().
175  char *getline(time_t wait_msec = 0,
176  char separator = '\n', int readahead = 1024)
177  {
178  return blocking_getline(wait_msec, separator, readahead);
179  }
180 
    // Overload for int timeouts: converts wait_msec to time_t and forwards
    // to the time_t overload above.
182  char *getline(int wait_msec,
183  char separator = '\n', int readahead = 1024)
184  {
185  return getline(time_t(wait_msec), separator, readahead);
186  }
187 
    // Overload for double timeouts: converts wait_msec to time_t and
    // forwards to the time_t overload.  NOTE(review): if time_t is an
    // integer type the fractional part is dropped by the conversion.
189  char *getline(double wait_msec,
190  char separator = '\n', int readahead = 1024)
191  {
192  return getline(time_t(wait_msec), separator, readahead);
193  }
194 
195 private:
    // Private overloads with no body visible here.  NOTE(review): presumably
    // they exist so that a call passing a char or bool as the first argument
    // (e.g. a separator where the timeout belongs) fails to compile instead
    // of silently converting to a timeout -- confirm against the original
    // header's comment.
200  char *getline(char, int i = 0);
201  char *getline(bool, int i = 0);
202 public:
203 
    // Line readers declared here and defined elsewhere.  getline() above is
    // implemented in terms of blocking_getline().
215  char *blocking_getline(time_t wait_msec, int separator = '\n',
216  int readahead = 1024);
217 
222  char *continue_getline(time_t wait_msec, int separator = '\n',
223  int readahead = 1024);
224 
    // Stores count in queue_min (documented below as "minimum bytes to
    // read()").
232  void queuemin(size_t count)
233  { queue_min = count; }
234 
239  void drain();
240 
    // Turns delayed output on or off: records the setting in
    // outbuf_delayed_flush and sets want_to_flush to the opposite value.
246  void delay_output(bool is_delayed)
247  {
248  outbuf_delayed_flush = is_delayed;
249  want_to_flush = !is_delayed;
250  }
251 
    // Stores the setting in is_auto_flush.
258  void auto_flush(bool is_automatic)
259  { is_auto_flush = is_automatic; }
260 
267  virtual bool flush(time_t msec_timeout);
268 
269  virtual bool should_flush();
270 
    // Note: takes an int timeout, unlike flush() above which takes time_t.
277  void flush_then_close(int msec_timeout);
278 
300  virtual void pre_select(SelectInfo &si);
301 
    // Runs pre_select(si) with si.wants temporarily replaced by r, then
    // restores the caller's original si.wants.
306  void pre_select(SelectInfo &si, const SelectRequest &r)
307  {
308  SelectRequest oldwant = si.wants;
309  si.wants = r;
310  pre_select(si);
311  si.wants = oldwant;
312  }
313 
    // NOTE(review): the declaration line for the following body is missing
    // from this listing.  The body simply forwards to pre_select(si, r).
319  { pre_select(si, r); }
320 
333  virtual bool post_select(SelectInfo &si);
334 
    // NOTE(review): the declaration line for the following body is missing
    // from this listing.  The body forwards to post_select(si, r) and
    // returns its result.
340  { return post_select(si, r); }
341 
    // NOTE(review): the declaration line for the following body is missing
    // from this listing; presumably it is the two-argument post_select()
    // overload called just above -- confirm.  The body mirrors the
    // two-argument pre_select(): it swaps r into si.wants, calls
    // post_select(si), restores si.wants and returns the result.
347  {
348  SelectRequest oldwant = si.wants;
349  si.wants = r;
350  bool val = post_select(si);
351  si.wants = oldwant;
352  return val;
353  }
354 
    // select() with only a timeout: calls _select() with readable, writable
    // and isexcept all false and forceable true (parameter names as in the
    // private _select() declaration below).
376  bool select(time_t msec_timeout)
377  { return _select(msec_timeout, false, false, false, true); }
378 
    // Calls select(msec_timeout) and, only if it returns true, callback().
    // The default timeout argument is -1.
391  void runonce(time_t msec_timeout = -1)
392  { if (select(msec_timeout)) callback(); }
393 
    // select() with explicit conditions: passes the flags through to
    // _select() with forceable false.
415  bool select(time_t msec_timeout,
416  bool readable, bool writable, bool isex = false)
417  { return _select(msec_timeout, readable, writable, isex, false); }
418 
425 
434  void force_select(bool readable, bool writable, bool isexception = false);
435 
440  void undo_force_select(bool readable, bool writable,
441  bool isexception = false);
442 
460  bool continue_select(time_t msec_timeout);
461 
468 
473  virtual const WvAddr *src() const;
474 
    // Callback registration.  The set*callback() functions take and return
    // an IWvStreamCallback.  NOTE(review): presumably the return value is
    // the previously installed callback -- confirm in the implementation.
479  void setcallback(IWvStreamCallback _callfunc);
480 
482  IWvStreamCallback setreadcallback(IWvStreamCallback _callback);
483 
485  IWvStreamCallback setwritecallback(IWvStreamCallback _callback);
486 
489  IWvStreamCallback setexceptcallback(IWvStreamCallback _callback);
490 
492  IWvStreamCallback setclosecallback(IWvStreamCallback _callback);
493 
499  void autoforward(WvStream &s);
500 
502  void noautoforward();
503  static void autoforward_callback(WvStream &input, WvStream &output);
504 
508  void *_callwrap(void *);
509 
513  void _callback();
514 
519  virtual void callback();
520 
525  void alarm(time_t msec_timeout);
526 
532  time_t alarm_remaining();
533 
    // String convenience writers.  write(s) sends s.len() bytes starting at
    // s.cstr(); print(s) and operator()(s) are aliases for write(s).
538  size_t write(WvStringParm s)
539  { return write(s.cstr(), s.len()); }
540  size_t print(WvStringParm s)
541  { return write(s); }
542  size_t operator() (WvStringParm s)
543  { return write(s); }
544 
    // Formatted variants: build a WvString from the format arguments and
    // write it.
546  size_t print(WVSTRING_FORMAT_DECL)
547  { return write(WvString(WVSTRING_FORMAT_CALL)); }
548  size_t operator() (WVSTRING_FORMAT_DECL)
549  { return write(WvString(WVSTRING_FORMAT_CALL)); }
550 
    // Accessors for the stream's name (my_wsname).  The format overload
    // builds a WvString and calls the string overload.
551  const char *wsname() const
552  { return my_wsname; }
553  void set_wsname(WvStringParm wsname)
554  { my_wsname = wsname; }
555  void set_wsname(WVSTRING_FORMAT_DECL)
556  { set_wsname(WvString(WVSTRING_FORMAT_CALL)); }
557 
    // Always returns the literal "WvStream" in this class.
558  const char *wstype() const { return "WvStream"; }
559 
    // wsid() returns my_wsid; find_by_wsid() is declared here and defined
    // elsewhere.
560  WSID wsid() const { return my_wsid; }
561  static IWvStream *find_by_wsid(WSID wsid);
562 
    // Looks the attribute up in attrs (see the protected setattr()).
563  virtual WvString getattr(WvStringParm name) const
564  { return attrs.get(name); }
565 
566  // ridiculous hackery for now so that the wvstream unit test can poke
567  // around in the insides of WvStream. Eventually, inbuf will go away
568  // from the base WvStream class, so nothing like this will be needed.
    // These helpers only exist when __WVSTREAM_UNIT_TEST is defined.  They
    // expose the used() sizes of outbuf/inbuf and let a test put a string
    // directly into inbuf.
569 #ifdef __WVSTREAM_UNIT_TEST
570 public:
571  size_t outbuf_used()
572  { return outbuf.used(); }
573  size_t inbuf_used()
574  { return inbuf.used(); }
575  void inbuf_putstr(WvStringParm t)
576  { inbuf.putstr(t); }
577 #endif
578 
579 protected:
    // Stores an attribute in attrs; the public getattr() reads it back.
580  void setattr(WvStringParm name, WvStringParm value)
581  { attrs.set(name, value); }
582  // builds the SelectInfo data structure (runs pre_select)
583  // NOTE(review): this function is declared void, so it returns nothing;
584  // compare _process_selectinfo() below, which does return a bool.
585  // all of the fields are filled in with new values
586  // si.msec_timeout contains the time until the next alarm expires
587  void _build_selectinfo(SelectInfo &si, time_t msec_timeout,
588  bool readable, bool writable, bool isexcept,
589  bool forceable);
590 
591  // runs the actual select() function over the given
592  // SelectInfo data structure, returns the number of descriptors
593  // in the set, and sets the error code if a problem occurs
594  int _do_select(SelectInfo &si);
595 
596  // processes the SelectInfo data structure (runs post_select)
597  // returns true if there are callbacks to be dispatched
598  bool _process_selectinfo(SelectInfo &si, bool forceable);
599 
600  // tries to empty the output buffer if the stream is writable
601  // not quite the same as flush() since it merely empties the output
602  // buffer asynchronously whereas flush() might have other semantics
603  // also handles autoclose (eg. after flush)
604  bool flush_outbuf(time_t msec_timeout);
605 
606  // called once flush() has emptied outbuf to ensure that any other
607  // internal stream buffers actually do get flushed before it returns
608  virtual bool flush_internal(time_t msec_timeout);
609 
610  // the real implementations for these are actually in WvFDStream, which
611  // is where they belong. But IWvStream needs them to exist for now, so
612  // it's a hack. In standard WvStream they return -1.
613  virtual int getrfd() const;
614  virtual int getwfd() const;
615 
616  // FIXME: this one is so bad, I'm not touching it. Quick hack to
617  // make it work anyway.
618  friend class WvHTTPClientProxyStream;
619 
    // Input and output buffers (exposed to unit tests via the
    // __WVSTREAM_UNIT_TEST helpers above).
620  WvDynBuf inbuf, outbuf;
621 
    // Callback storage.  NOTE(review): presumably callfunc is what
    // setcallback() installs and readcb/writecb/exceptcb/closecb are what
    // the matching set*callback() functions install -- confirm.
622  IWvStreamCallback callfunc;
623  wv::function<void*(void*)> call_ctx;
624 
625  IWvStreamCallback readcb, writecb, exceptcb, closecb;
626 
    // Written by outbuf_limit(), delay_output() and auto_flush()
    // respectively.
627  size_t max_outbuf_size;
628  bool outbuf_delayed_flush;
629  bool is_auto_flush;
630 
631  // Used to guard against excessive flushing when output is delayed
    // (delay_output() sets this to the opposite of its argument).
632  bool want_to_flush;
633 
634  // Used to ensure we don't flush recursively.
635  bool is_flushing;
636 
637  size_t queue_min; // minimum bytes to read()
638  time_t autoclose_time; // close eventually, even if output is queued
639  WvTime alarm_time; // select() returns true at this time
640  WvTime last_alarm_check; // last time we checked the alarm_remaining
641 
    // Virtual hook with an empty body in this class.
652  virtual void execute()
653  { }
654 
655  // every call to select() selects on the globalstream.
656  static WvStream *globalstream;
657 
    // Static helpers for the streams debugger; each receives the command
    // string and a WvStreamsDebugger::ResultCallback.  Declared here and
    // defined elsewhere.
658  static void debugger_streams_display_header(WvStringParm cmd,
659  WvStreamsDebugger::ResultCallback result_cb);
660  static void debugger_streams_display_one_stream(WvStream *s,
661  WvStringParm cmd,
662  WvStreamsDebugger::ResultCallback result_cb);
663  static void debugger_streams_maybe_display_one_stream(WvStream *s,
664  WvStringParm cmd,
665  const WvStringList &args,
666  WvStreamsDebugger::ResultCallback result_cb);
667 
668 private:
    // Common implementation behind both public select() overloads, which
    // differ only in the flags they pass (see above).
670  bool _select(time_t msec_timeout,
671  bool readable, bool writable, bool isexcept,
672  bool forceable);
673 
674  void legacy_callback();
675 
    // Copy constructor and copy assignment are declared private with no
    // body visible here.  NOTE(review): presumably this is to forbid
    // copying a stream -- confirm that no definition exists.
677  WvStream(const WvStream &s);
678  WvStream& operator= (const WvStream &s);
679  static void add_debugger_commands();
680 
    // Debugger command handlers.  NOTE(review): judging by their names they
    // back "streams" and "close" debugger commands -- confirm where
    // add_debugger_commands() registers them.
681  static WvString debugger_streams_run_cb(WvStringParm cmd,
682  WvStringList &args,
683  WvStreamsDebugger::ResultCallback result_cb, void *);
684  static WvString debugger_close_run_cb(WvStringParm cmd,
685  WvStringList &args,
686  WvStreamsDebugger::ResultCallback result_cb, void *);
687 };
688 
// Global stream pointers for the standard file descriptors.  They are only
// declared here (extern); the definitions live in another translation unit.
695 extern WvStream *wvcon; // tied stdin and stdout stream
696 extern WvStream *wvin; // stdin stream
697 extern WvStream *wvout; // stdout stream
698 extern WvStream *wverr; // stderr stream
699 
700 #endif // __WVSTREAM_H