/*
 * call-seq:
 *    conn.wait_for_notify( [ timeout ] ) -> String
 *    conn.wait_for_notify( [ timeout ] ) { |event, pid| block }
 *
 * Blocks while waiting for notification(s), or until the optional
 * _timeout_ is reached, whichever comes first.  _timeout_ is
 * measured in seconds and can be fractional.
 *
 * Returns +nil+ if _timeout_ is reached, the name of the NOTIFY
 * event otherwise.  If used in block form, passes the name of the
 * NOTIFY +event+ and the generating +pid+ into the block.
 * 
 */
static VALUE
pgconn_wait_for_notify(int argc, VALUE *argv, VALUE self)
{
        PGconn *conn = get_pgconn( self );
        PGnotify *notification;
        int sd = PQsocket( conn );
        int ret;
        struct timeval timeout;
        struct timeval *ptimeout = NULL;
        VALUE timeout_in, relname = Qnil, be_pid = Qnil, extra = Qnil;
        double timeout_sec;
        fd_set sd_rset;
#ifdef _WIN32
        fd_set crt_sd_rset;
#endif

        if ( sd < 0 )
                rb_bug( "PQsocket(conn): couldn't fetch the connection's socket!" );

        if ( rb_scan_args(argc, argv, "01", &timeout_in) == 1 ) {
                timeout_sec = NUM2DBL( timeout_in );
                timeout.tv_sec = (long)timeout_sec;
                timeout.tv_usec = (long)( (timeout_sec - (long)timeout_sec) * 1e6 );
                ptimeout = &timeout;
        }

        /* Check for notifications */
        while ( (notification = PQnotifies(conn)) == NULL ) {
                FD_ZERO( &sd_rset );
                FD_SET( sd, &sd_rset );

#ifdef _WIN32
                create_crt_fd(&sd_rset, &crt_sd_rset);
#endif

                /* Wait for the socket to become readable before checking again */
                ret = rb_thread_select( sd+1, &sd_rset, NULL, NULL, ptimeout );

#ifdef _WIN32
                cleanup_crt_fd(&sd_rset, &crt_sd_rset);
#endif

                if ( ret < 0 )
                        rb_sys_fail( 0 );

                /* Return nil if the select timed out */
                if ( ret == 0 ) return Qnil;

                /* Read the socket */
                if ( (ret = PQconsumeInput(conn)) != 1 )
                        rb_raise( rb_ePGError, "PQconsumeInput == %d: %s", ret, PQerrorMessage(conn) );
        }

        relname = rb_tainted_str_new2( notification->relname );
        be_pid = INT2NUM( notification->be_pid );
#ifdef HAVE_ST_NOTIFY_EXTRA
        extra = rb_str_new2( notification->extra );
#endif
        PQfreemem( notification );

        if ( rb_block_given_p() )
                rb_yield_values( 3, relname, be_pid, extra );

        return relname;
}