module System.ZMQ3 (
-- * Type definitions
Size
, Context
, Socket
, Flag (SendMore)
, Switch (..)
, Timeout
, Event (..)
, EventType (..)
, EventMsg (..)
, Poll (..)
-- ** Socket type classes
, SocketType
, Sender
, Receiver
, Subscriber
-- ** Socket type tags
, Pair(..)
, Pub(..)
, Sub(..)
, XPub(..)
, XSub(..)
, Req(..)
, Rep(..)
, Dealer(..)
, Router(..)
, XReq
, XRep
, Pull(..)
, Push(..)
-- * General operations
, withContext
, withSocket
, bind
, unbind
, connect
, send
, send'
, sendMulti
, receive
, receiveMulti
, version
, monitor
, poll
, System.ZMQ3.subscribe
, System.ZMQ3.unsubscribe
-- * Context options (read)
, ioThreads
, maxSockets
-- * Context options (write)
, setIoThreads
, setMaxSockets
-- * Socket options (read)
, System.ZMQ3.affinity
, System.ZMQ3.backlog
, System.ZMQ3.delayAttachOnConnect
, System.ZMQ3.events
, System.ZMQ3.fileDescriptor
, System.ZMQ3.identity
, System.ZMQ3.ipv4Only
, System.ZMQ3.lastEndpoint
, System.ZMQ3.linger
, System.ZMQ3.maxMessageSize
, System.ZMQ3.mcastHops
, System.ZMQ3.moreToReceive
, System.ZMQ3.rate
, System.ZMQ3.receiveBuffer
, System.ZMQ3.receiveHighWM
, System.ZMQ3.receiveTimeout
, System.ZMQ3.reconnectInterval
, System.ZMQ3.reconnectIntervalMax
, System.ZMQ3.recoveryInterval
, System.ZMQ3.sendBuffer
, System.ZMQ3.sendHighWM
, System.ZMQ3.sendTimeout
, System.ZMQ3.tcpKeepAlive
, System.ZMQ3.tcpKeepAliveCount
, System.ZMQ3.tcpKeepAliveIdle
, System.ZMQ3.tcpKeepAliveInterval
-- * Socket options (write)
, setAffinity
, setBacklog
, setDelayAttachOnConnect
, setIdentity
, setIpv4Only
, setLinger
, setMaxMessageSize
, setMcastHops
, setRate
, setReceiveBuffer
, setReceiveHighWM
, setReceiveTimeout
, setReconnectInterval
, setReconnectIntervalMax
, setRecoveryInterval
, setRouterMandatory
, setSendBuffer
, setSendHighWM
, setSendTimeout
, setTcpAcceptFilter
, setTcpKeepAlive
, setTcpKeepAliveCount
, setTcpKeepAliveIdle
, setTcpKeepAliveInterval
, setXPubVerbose
-- * Restricted values (re-exported from "Data.Restricted")
, Data.Restricted.restrict
, Data.Restricted.toRestricted
-- * Error handling
, ZMQError
, errno
, source
, message
-- * Manual context and socket management, blocking helpers
, init
, term
, context
, destroy
, socket
, close
, waitRead
, waitWrite
-- * Proxy
, proxy
) where
import Prelude hiding (init)
import Control.Applicative
import Control.Exception
import Control.Monad (unless, void)
import Control.Monad.IO.Class
import Data.List (intersect, foldl')
import Data.List.NonEmpty (NonEmpty)
import Data.Restricted
import Foreign hiding (throwIf, throwIf_, throwIfNull, void)
import Foreign.C.String
import Foreign.C.Types (CInt, CShort)
import System.Posix.Types (Fd(..))
import System.ZMQ3.Base
import System.ZMQ3.Internal
import System.ZMQ3.Error
import qualified Data.ByteString as SB
import qualified Data.ByteString.Lazy as LB
import qualified Data.List.NonEmpty as S
import qualified Prelude as P
import qualified System.ZMQ3.Base as B
import GHC.Conc (threadWaitRead, threadWaitWrite)
-- | Socket type tag whose 'SocketType' instance maps to 'pair'.
-- Has 'Sender' and 'Receiver' instances.
data Pair = Pair
-- | Socket type tag whose 'SocketType' instance maps to 'pub'.
-- Has a 'Sender' instance only.
data Pub = Pub
-- | Socket type tag whose 'SocketType' instance maps to 'sub'.
-- Has 'Subscriber' and 'Receiver' instances.
data Sub = Sub
-- | Socket type tag whose 'SocketType' instance maps to 'xpub'.
-- Has 'Sender' and 'Receiver' instances.
data XPub = XPub
-- | Socket type tag whose 'SocketType' instance maps to 'xsub'.
-- Has 'Sender' and 'Receiver' instances.
data XSub = XSub
-- | Socket type tag whose 'SocketType' instance maps to 'request'.
-- Has 'Sender' and 'Receiver' instances.
data Req = Req
-- | Socket type tag whose 'SocketType' instance maps to 'response'.
-- Has 'Sender' and 'Receiver' instances.
data Rep = Rep
-- | Socket type tag whose 'SocketType' instance maps to 'dealer'.
-- Has 'Sender' and 'Receiver' instances.
data Dealer = Dealer
-- | Alias for 'Dealer'.
type XReq = Dealer
-- | Socket type tag whose 'SocketType' instance maps to 'router'.
-- Has 'Sender' and 'Receiver' instances.
data Router = Router
-- | Alias for 'Router'.
type XRep = Router
-- | Socket type tag whose 'SocketType' instance maps to 'pull'.
-- Has a 'Receiver' instance only.
data Pull = Pull
-- | Socket type tag whose 'SocketType' instance maps to 'push'.
-- Has a 'Sender' instance only.
data Push = Push
-- | Marker class (no methods) for socket types accepted by 'subscribe'
-- and 'unsubscribe'.
class Subscriber a
-- | Marker class (no methods) for socket types accepted by 'send',
-- @send'@ and 'sendMulti'.
class Sender a
-- | Marker class (no methods) for socket types accepted by 'receive'
-- and 'receiveMulti'.
class Receiver a
-- Pair: sends and receives.
instance SocketType Pair where
    zmqSocketType _ = pair
instance Sender Pair
instance Receiver Pair

-- Pub: sends only.
instance SocketType Pub where
    zmqSocketType _ = pub
instance Sender Pub

-- Sub: subscribes and receives.
instance SocketType Sub where
    zmqSocketType _ = sub
instance Subscriber Sub
instance Receiver Sub

-- XPub: sends and receives.
instance SocketType XPub where
    zmqSocketType _ = xpub
instance Sender XPub
instance Receiver XPub

-- XSub: sends and receives.
instance SocketType XSub where
    zmqSocketType _ = xsub
instance Sender XSub
instance Receiver XSub

-- Req: sends and receives.
instance SocketType Req where
    zmqSocketType _ = request
instance Sender Req
instance Receiver Req

-- Rep: sends and receives.
instance SocketType Rep where
    zmqSocketType _ = response
instance Sender Rep
instance Receiver Rep

-- Dealer (also reachable through the XReq alias): sends and receives.
instance SocketType Dealer where
    zmqSocketType _ = dealer
instance Sender Dealer
instance Receiver Dealer

-- Router (also reachable through the XRep alias): sends and receives.
instance SocketType Router where
    zmqSocketType _ = router
instance Sender Router
instance Receiver Router

-- Pull: receives only.
instance SocketType Pull where
    zmqSocketType _ = pull
instance Receiver Pull

-- Push: sends only.
instance SocketType Push where
    zmqSocketType _ = push
instance Sender Push
-- | Readiness conditions returned by 'events' and used as the interest set
-- and result set of 'poll'.
data Event
    = In   -- ^ Translated to and from 'pollIn'.
    | Out  -- ^ Translated to and from 'pollOut'.
    | Err  -- ^ Translated to and from 'pollerr'.
    deriving (Eq, Ord, Read, Show)
-- | One item handed to 'poll': the thing to watch, the events of interest,
-- and an optional callback that receives the events reported for it.
data Poll m where
    -- | Watch a 0MQ 'Socket'.
    Sock :: Socket s
         -> [Event]
         -> Maybe ([Event] -> m ())
         -> Poll m
    -- | Watch a plain file descriptor.
    File :: Fd
         -> [Event]
         -> Maybe ([Event] -> m ())
         -> Poll m
-- | Call 'c_zmq_version' and return the three numbers it writes out as a
-- @(major, minor, patch)@ triple.
version :: IO (Int, Int, Int)
version =
    with 0 $ \majorPtr ->
    with 0 $ \minorPtr ->
    with 0 $ \patchPtr -> do
        c_zmq_version majorPtr minorPtr patchPtr
        major <- peek majorPtr
        minor <- peek minorPtr
        patch <- peek patchPtr
        return (fromIntegral major, fromIntegral minor, fromIntegral patch)
-- | Create a new 'Context' (see 'context') and set its number of I/O
-- threads to @n@ with 'setIoThreads'.
--
-- The context is acquired through 'bracketOnError': if 'setIoThreads'
-- throws, the freshly created context is handed to 'destroy' before the
-- exception propagates, instead of being leaked.
init :: Size -> IO Context
init n = bracketOnError context destroy $ \c -> do
    setIoThreads n c
    return c
-- | Create a new 'Context' from the result of 'c_zmq_ctx_new', which is
-- checked by 'throwIfNull' under the label @init@.
context :: IO Context
context = fmap Context (throwIfNull "init" c_zmq_ctx_new)
-- | Synonym for 'destroy'.
term :: Context -> IO ()
term ctx = destroy ctx
-- | Pass the context's underlying handle to 'c_zmq_ctx_destroy'. The call is
-- wrapped in 'throwIfMinus1Retry_' under the label @term@.
destroy :: Context -> IO ()
destroy ctx = throwIfMinus1Retry_ "term" (c_zmq_ctx_destroy (_ctx ctx))
-- | Run an action with a fresh 'Context'. The context is created with
-- 'c_zmq_ctx_new' and released with 'c_zmq_ctx_destroy' through 'bracket',
-- so the release also runs when the action throws.
withContext :: (Context -> IO a) -> IO a
withContext act = bracket acquire release (act . Context)
  where
    acquire = throwIfNull "withContext (new)" c_zmq_ctx_new
    release rawCtx =
        throwIfMinus1Retry_ "withContext (destroy)" (c_zmq_ctx_destroy rawCtx)
-- | Run an action with a socket of the given type. The socket is opened
-- with 'socket' and closed with 'close' through 'bracket'.
withSocket :: SocketType a => Context -> a -> (Socket a -> IO b) -> IO b
withSocket ctx ty body = bracket (socket ctx ty) close body
-- | Create a socket of the given type in the given context, using
-- 'mkSocketRepr'. The caller is responsible for calling 'close'.
socket :: SocketType a => Context -> a -> IO (Socket a)
socket ctx ty = fmap Socket (mkSocketRepr ty ctx)
-- | Close a socket by passing its representation to 'closeSock'.
close :: Socket a -> IO ()
close sock = closeSock (_socketRepr sock)
-- | Set the 'B.subscribe' option of a 'Subscriber' socket to the given bytes.
subscribe :: Subscriber a => Socket a -> SB.ByteString -> IO ()
subscribe sock bytes = setByteStringOpt sock B.subscribe bytes
-- | Set the 'B.unsubscribe' option of a 'Subscriber' socket to the given
-- bytes.
unsubscribe :: Subscriber a => Socket a -> SB.ByteString -> IO ()
unsubscribe sock bytes = setByteStringOpt sock B.unsubscribe bytes
-- | Read the 'B.events' socket option and decode it with 'toEvents'.
events :: Socket a -> IO [Event]
events sock = fmap toEvents (getIntOpt sock B.events 0)
-- | Read the 'B.filedesc' socket option and wrap it as an 'Fd'.
fileDescriptor :: Socket a -> IO Fd
fileDescriptor sock = do
    raw <- getInt32Option B.filedesc sock
    return (Fd (fromIntegral raw))
-- | 'True' when the 'B.receiveMore' socket option currently reads as @1@.
moreToReceive :: Socket a -> IO Bool
moreToReceive sock = do
    flag <- getInt32Option B.receiveMore sock
    return (flag == 1)
-- | Read the '_ioThreads' context option with 'ctxIntOption'.
ioThreads :: Context -> IO Word
ioThreads ctx = ctxIntOption "ioThreads" _ioThreads ctx
-- | Read the '_maxSockets' context option with 'ctxIntOption'.
maxSockets :: Context -> IO Word
maxSockets ctx = ctxIntOption "maxSockets" _maxSockets ctx
-- | Read the 'B.identity' socket option as a strict 'SB.ByteString'.
identity :: Socket a -> IO SB.ByteString
identity sock = sock `getByteStringOpt` B.identity
-- | Read the 'B.affinity' socket option as a 'Word64'.
affinity :: Socket a -> IO Word64
affinity sock = getIntOpt sock B.affinity initial
  where
    initial = 0
-- | Read the 'B.maxMessageSize' socket option as an 'Int64'.
maxMessageSize :: Socket a -> IO Int64
maxMessageSize sock = getIntOpt sock B.maxMessageSize initial
  where
    initial = 0
-- | 'True' when the 'B.ipv4Only' socket option currently reads as @1@.
ipv4Only :: Socket a -> IO Bool
ipv4Only sock = do
    flag <- getInt32Option B.ipv4Only sock
    return (flag == 1)
-- | Read the 'B.backlog' socket option with 'getInt32Option'.
backlog :: Socket a -> IO Int
backlog sock = getInt32Option B.backlog sock
-- | 'True' when the 'B.delayAttachOnConnect' socket option currently reads
-- as @1@.
delayAttachOnConnect :: Socket a -> IO Bool
delayAttachOnConnect sock = do
    flag <- getInt32Option B.delayAttachOnConnect sock
    return (flag == 1)
-- | Read the 'B.linger' socket option with 'getInt32Option'.
linger :: Socket a -> IO Int
linger sock = getInt32Option B.linger sock
-- | Read the 'B.lastEndpoint' socket option as a 'String'.
lastEndpoint :: Socket a -> IO String
lastEndpoint sock = sock `getStrOpt` B.lastEndpoint
-- | Read the 'B.rate' socket option with 'getInt32Option'.
rate :: Socket a -> IO Int
rate sock = getInt32Option B.rate sock
-- | Read the 'B.receiveBuf' socket option with 'getInt32Option'.
receiveBuffer :: Socket a -> IO Int
receiveBuffer sock = getInt32Option B.receiveBuf sock
-- | Read the 'B.reconnectIVL' socket option with 'getInt32Option'.
reconnectInterval :: Socket a -> IO Int
reconnectInterval sock = getInt32Option B.reconnectIVL sock
-- | Read the 'B.reconnectIVLMax' socket option with 'getInt32Option'.
reconnectIntervalMax :: Socket a -> IO Int
reconnectIntervalMax sock = getInt32Option B.reconnectIVLMax sock
-- | Read the 'B.recoveryIVL' socket option with 'getInt32Option'.
recoveryInterval :: Socket a -> IO Int
recoveryInterval sock = getInt32Option B.recoveryIVL sock
-- | Read the 'B.sendBuf' socket option with 'getInt32Option'.
sendBuffer :: Socket a -> IO Int
sendBuffer sock = getInt32Option B.sendBuf sock
-- | Read the 'B.mcastHops' socket option with 'getInt32Option'.
mcastHops :: Socket a -> IO Int
mcastHops sock = getInt32Option B.mcastHops sock
-- | Read the 'B.receiveHighWM' socket option with 'getInt32Option'.
receiveHighWM :: Socket a -> IO Int
receiveHighWM sock = getInt32Option B.receiveHighWM sock
-- | Read the 'B.receiveTimeout' socket option with 'getInt32Option'.
receiveTimeout :: Socket a -> IO Int
receiveTimeout sock = getInt32Option B.receiveTimeout sock
-- | Read the 'B.sendTimeout' socket option with 'getInt32Option'.
sendTimeout :: Socket a -> IO Int
sendTimeout sock = getInt32Option B.sendTimeout sock
-- | Read the 'B.sendHighWM' socket option with 'getInt32Option'.
sendHighWM :: Socket a -> IO Int
sendHighWM sock = getInt32Option B.sendHighWM sock
-- | Read the 'B.tcpKeepAlive' socket option and convert it with 'toSwitch'.
-- Throws (via 'throwError') when 'toSwitch' yields 'Nothing'.
tcpKeepAlive :: Socket a -> IO Switch
tcpKeepAlive s = do
    raw <- getInt32Option B.tcpKeepAlive s
    case toSwitch raw of
        Just sw -> return sw
        Nothing -> throwError "Invalid value for ZMQ_TCP_KEEPALIVE"
-- | Read the 'B.tcpKeepAliveCount' socket option with 'getInt32Option'.
tcpKeepAliveCount :: Socket a -> IO Int
tcpKeepAliveCount sock = getInt32Option B.tcpKeepAliveCount sock
-- | Read the 'B.tcpKeepAliveIdle' socket option with 'getInt32Option'.
tcpKeepAliveIdle :: Socket a -> IO Int
tcpKeepAliveIdle sock = getInt32Option B.tcpKeepAliveIdle sock
-- | Read the 'B.tcpKeepAliveInterval' socket option with 'getInt32Option'.
tcpKeepAliveInterval :: Socket a -> IO Int
tcpKeepAliveInterval sock = getInt32Option B.tcpKeepAliveInterval sock
-- | Set the '_ioThreads' context option with 'setCtxIntOption'.
setIoThreads :: Word -> Context -> IO ()
setIoThreads n ctx = setCtxIntOption "ioThreads" _ioThreads n ctx
-- | Set the '_maxSockets' context option with 'setCtxIntOption'.
setMaxSockets :: Word -> Context -> IO ()
setMaxSockets n ctx = setCtxIntOption "maxSockets" _maxSockets n ctx
-- | Set the 'B.identity' socket option to the bytes carried by the
-- restricted value (type-level bounds @N1@ and @N254@).
setIdentity :: Restricted N1 N254 SB.ByteString -> Socket a -> IO ()
setIdentity x s = setByteStringOpt s B.identity bytes
  where
    bytes = rvalue x
-- | Set the 'B.affinity' socket option to the given 'Word64'.
setAffinity :: Word64 -> Socket a -> IO ()
setAffinity mask sock = setIntOpt sock B.affinity mask
-- | Set the 'B.delayAttachOnConnect' socket option from a 'Bool'
-- (converted with 'bool2cint').
setDelayAttachOnConnect :: Bool -> Socket a -> IO ()
setDelayAttachOnConnect enabled sock =
    setIntOpt sock B.delayAttachOnConnect $ bool2cint enabled
-- | Set the 'B.maxMessageSize' socket option. The restricted value
-- (type-level bounds @Nneg1@ and @Int64@) is converted to 'Int64' first.
setMaxMessageSize :: Integral i => Restricted Nneg1 Int64 i -> Socket a -> IO ()
setMaxMessageSize x s = setIntOpt s B.maxMessageSize size
  where
    size :: Int64
    size = fromIntegral (rvalue x)
-- | Set the 'B.ipv4Only' socket option from a 'Bool' (converted with
-- 'bool2cint').
setIpv4Only :: Bool -> Socket a -> IO ()
setIpv4Only enabled sock = setIntOpt sock B.ipv4Only $ bool2cint enabled
-- | Set the 'B.linger' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setLinger :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setLinger val sock = setInt32OptFromRestricted B.linger val sock
-- | Set the 'B.receiveTimeout' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setReceiveTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setReceiveTimeout val sock = setInt32OptFromRestricted B.receiveTimeout val sock
-- | Set the 'B.routerMandatory' option of a 'Router' socket from a 'Bool'
-- (converted with 'bool2cint').
setRouterMandatory :: Bool -> Socket Router -> IO ()
setRouterMandatory enabled sock =
    setIntOpt sock B.routerMandatory $ bool2cint enabled
-- | Set the 'B.sendTimeout' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setSendTimeout :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setSendTimeout val sock = setInt32OptFromRestricted B.sendTimeout val sock
-- | Set the 'B.rate' socket option from a restricted value
-- (type-level bounds @N1@ and @Int32@).
setRate :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()
setRate val sock = setInt32OptFromRestricted B.rate val sock
-- | Set the 'B.mcastHops' socket option from a restricted value
-- (type-level bounds @N1@ and @Int32@).
setMcastHops :: Integral i => Restricted N1 Int32 i -> Socket a -> IO ()
setMcastHops val sock = setInt32OptFromRestricted B.mcastHops val sock
-- | Set the 'B.backlog' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setBacklog :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setBacklog val sock = setInt32OptFromRestricted B.backlog val sock
-- | Set the 'B.receiveBuf' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setReceiveBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setReceiveBuffer val sock = setInt32OptFromRestricted B.receiveBuf val sock
-- | Set the 'B.reconnectIVL' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setReconnectInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setReconnectInterval val sock = setInt32OptFromRestricted B.reconnectIVL val sock
-- | Set the 'B.reconnectIVLMax' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setReconnectIntervalMax :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setReconnectIntervalMax val sock =
    setInt32OptFromRestricted B.reconnectIVLMax val sock
-- | Set the 'B.sendBuf' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setSendBuffer :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setSendBuffer val sock = setInt32OptFromRestricted B.sendBuf val sock
-- | Set the 'B.recoveryIVL' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setRecoveryInterval :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setRecoveryInterval val sock = setInt32OptFromRestricted B.recoveryIVL val sock
-- | Set the 'B.receiveHighWM' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setReceiveHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setReceiveHighWM val sock = setInt32OptFromRestricted B.receiveHighWM val sock
-- | Set the 'B.sendHighWM' socket option from a restricted value
-- (type-level bounds @N0@ and @Int32@).
setSendHighWM :: Integral i => Restricted N0 Int32 i -> Socket a -> IO ()
setSendHighWM val sock = setInt32OptFromRestricted B.sendHighWM val sock
-- | Set the 'tcpAcceptFilter' socket option.
--
-- With @Just bytes@ the option is set to those bytes. With 'Nothing',
-- 'c_zmq_setsockopt' is called with a null pointer and length 0.
setTcpAcceptFilter :: Maybe SB.ByteString -> Socket a -> IO ()
setTcpAcceptFilter mfilter sock = case mfilter of
    Just dat -> setByteStringOpt sock tcpAcceptFilter dat
    Nothing  -> onSocket "setTcpAcceptFilter" sock $ \s ->
        throwIfMinus1Retry_ "setStrOpt"
            (c_zmq_setsockopt s (optVal tcpAcceptFilter) nullPtr 0)
-- | Set the 'B.tcpKeepAlive' socket option from a 'Switch', converted to a
-- 'CInt' with 'fromSwitch'.
setTcpKeepAlive :: Switch -> Socket a -> IO ()
setTcpKeepAlive x s = setIntOpt s B.tcpKeepAlive raw
  where
    raw :: CInt
    raw = fromSwitch x
-- | Set the 'B.tcpKeepAliveCount' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setTcpKeepAliveCount :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setTcpKeepAliveCount val sock =
    setInt32OptFromRestricted B.tcpKeepAliveCount val sock
-- | Set the 'B.tcpKeepAliveIdle' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setTcpKeepAliveIdle :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setTcpKeepAliveIdle val sock =
    setInt32OptFromRestricted B.tcpKeepAliveIdle val sock
-- | Set the 'B.tcpKeepAliveInterval' socket option from a restricted value
-- (type-level bounds @Nneg1@ and @Int32@).
setTcpKeepAliveInterval :: Integral i => Restricted Nneg1 Int32 i -> Socket a -> IO ()
setTcpKeepAliveInterval val sock =
    setInt32OptFromRestricted B.tcpKeepAliveInterval val sock
-- | Set the 'B.xpubVerbose' option of an 'XPub' socket from a 'Bool'
-- (converted with 'bool2cint').
setXPubVerbose :: Bool -> Socket XPub -> IO ()
setXPubVerbose enabled sock = setIntOpt sock B.xpubVerbose $ bool2cint enabled
-- | Bind the socket to the given endpoint string with 'c_zmq_bind'.
-- The call is wrapped in 'throwIfMinus1Retry_' under the label @bind@.
bind :: Socket a -> String -> IO ()
bind sock str = onSocket "bind" sock $ \s ->
    throwIfMinus1Retry_ "bind" (withCString str (c_zmq_bind s))
-- | Unbind the socket from the given endpoint string with 'c_zmq_unbind'.
-- The call is wrapped in 'throwIfMinus1Retry_' under the label @unbind@.
unbind :: Socket a -> String -> IO ()
unbind sock str = onSocket "unbind" sock $ \s ->
    throwIfMinus1Retry_ "unbind" (withCString str (c_zmq_unbind s))
-- | Connect the socket to the given endpoint string with 'c_zmq_connect'.
-- The call is wrapped in 'throwIfMinus1Retry_' under the label @connect@.
connect :: Socket a -> String -> IO ()
connect sock str = onSocket "connect" sock $ \s ->
    throwIfMinus1Retry_ "connect" (withCString str (c_zmq_connect s))
-- | Send a strict 'SB.ByteString' as one message with the given flags.
--
-- 'DontWait' is always added to the flags. The call goes through 'retry'
-- with 'waitWrite' on this socket as the wait action. The message built by
-- 'messageOf' is released with 'messageClose' in all cases.
send :: Sender a => Socket a -> [Flag] -> SB.ByteString -> IO ()
send sock fls val = bracket (messageOf val) messageClose transmit
  where
    sendFlags = combineFlags (DontWait : fls)
    transmit msg = onSocket "send" sock $ \raw ->
        retry "send" (waitWrite sock) (c_zmq_sendmsg raw (msgPtr msg) sendFlags)
-- | Variant of 'send' for lazy 'LB.ByteString' payloads.
--
-- The message is built with 'messageOfLazy'. 'DontWait' is always added to
-- the flags, and the call goes through 'retry' with 'waitWrite' on this
-- socket as the wait action.
send' :: Sender a => Socket a -> [Flag] -> LB.ByteString -> IO ()
send' sock fls val = bracket (messageOfLazy val) messageClose transmit
  where
    sendFlags = combineFlags (DontWait : fls)
    transmit msg = onSocket "send'" sock $ \raw ->
        retry "send'" (waitWrite sock) (c_zmq_sendmsg raw (msgPtr msg) sendFlags)
-- | Send a non-empty list of parts as one multi-part message. Every part
-- except the last is sent with 'SendMore'; the last is sent with no flags.
sendMulti :: Sender a => Socket a -> NonEmpty SB.ByteString -> IO ()
sendMulti sock msgs =
    mapM_ (send sock [SendMore]) leading >> send sock [] final
  where
    leading = S.init msgs
    final   = S.last msgs
-- | Receive one message part and return a copy of its payload as a strict
-- 'SB.ByteString'.
--
-- The receive is issued with 'dontWait' and goes through 'retry' with
-- 'waitRead' on this socket as the wait action. The message is released
-- with 'messageClose' in all cases.
receive :: Receiver a => Socket a -> IO (SB.ByteString)
receive sock = bracket messageInit messageClose $ \msg ->
    onSocket "receive" sock $ \raw -> do
        let frame = msgPtr msg
        retry "receive" (waitRead sock)
            (c_zmq_recvmsg raw frame (flagVal dontWait))
        payload <- c_zmq_msg_data frame
        len     <- c_zmq_msg_size frame
        SB.packCStringLen (payload, fromIntegral len)
-- | Receive all parts of a multi-part message: call 'receive' repeatedly
-- until 'moreToReceive' is 'False', and return the parts in arrival order.
receiveMulti :: Receiver a => Socket a -> IO [SB.ByteString]
receiveMulti sock = go []
  where
    -- Parts are accumulated in reverse and flipped once at the end.
    go parts = do
        part <- receive sock
        more <- moreToReceive sock
        let parts' = part : parts
        if more
            then go parts'
            else return (reverse parts')
-- | Call 'c_zmq_socket_monitor' on the socket with the given endpoint
-- string and the event types combined by 'events2cint'.
socketMonitor :: [EventType] -> String -> Socket a -> IO ()
socketMonitor es addr soc =
    onSocket "socketMonitor" soc $ \raw ->
        withCString addr $ \cAddr ->
            throwIfMinus1_ "zmq_socket_monitor"
                (c_zmq_socket_monitor raw cAddr mask)
  where
    mask = events2cint es
-- | Start monitoring a socket for the given event types.
--
-- An @inproc://@ endpoint is derived from the monitored socket's underlying
-- handle. Monitoring is registered with 'socketMonitor', and a fresh 'Pair'
-- socket is connected to that endpoint.
--
-- The returned function is the consumer:
--
-- * called with 'True', it receives the next event and returns it as
--   @Just@ an 'EventMsg';
-- * called with 'False', it releases the message buffer, closes the 'Pair'
--   socket, and returns 'Nothing'.
--
-- The 'Pair' socket is acquired with 'bracketOnError', so it is closed if
-- 'socketMonitor', 'connect' or 'messageInit' throws during setup.
monitor :: [EventType] -> Context -> Socket a -> IO (Bool -> IO (Maybe EventMsg))
monitor es ctx sock = do
    let addr = "inproc://" ++ show (_socket . _socketRepr $ sock)
    bracketOnError (socket ctx Pair) close $ \s -> do
        socketMonitor es addr sock
        connect s addr
        next s <$> messageInit
  where
    -- Shutdown: the socket is closed even if releasing the message fails.
    next soc m False = (messageClose m `finally` close soc) >> return Nothing
    -- Receive one event and decode its address, data and tag fields from
    -- the raw message payload.
    next soc m True  = onSocket "recv" soc $ \s -> do
        retry "recv" (waitRead soc) $ c_zmq_recvmsg s (msgPtr m) (flagVal dontWait)
        ptr <- c_zmq_msg_data (msgPtr m)
        str <- peekByteOff ptr zmqEventAddrOffset >>= SB.packCString
        dat <- peekByteOff ptr zmqEventDataOffset :: IO CInt
        tag <- peek ptr :: IO CInt
        return . Just $ eventMessage str dat (ZMQEventType tag)
-- | Poll the given sockets and file descriptors with 'c_zmq_poll', using
-- the given timeout.
--
-- The result has one entry per input item, in the same order, holding the
-- events reported for that item. If an item carries a callback and at
-- least one reported event is among the events it asked for, the callback
-- runs with the full list of reported events. An empty input list returns
-- immediately without polling.
poll :: MonadIO m => Timeout -> [Poll m] -> m [[Event]]
poll _ [] = return []
poll to desc = do
    polled <- liftIO . withArray (map toZMQPoll desc) $ \items -> do
        throwIfMinus1Retry_ "poll" $
            c_zmq_poll items (fromIntegral count) (fromIntegral to)
        peekArray count items
    mapM fromZMQPoll (zip desc polled)
  where
    count = length desc

    -- Build the low-level poll item: sockets go in the socket field with
    -- fd 0, file descriptors go in the fd field with a null socket.
    toZMQPoll :: MonadIO m => Poll m -> ZMQPoll
    toZMQPoll (Sock (Socket (SocketRepr s _)) e _) =
        ZMQPoll s 0 (combine (map fromEvent e)) 0
    toZMQPoll (File (Fd s) e _) =
        ZMQPoll nullPtr (fromIntegral s) (combine (map fromEvent e)) 0

    -- Decode the reported events and fire the callback when they overlap
    -- with the requested ones.
    fromZMQPoll :: MonadIO m => (Poll m, ZMQPoll) -> m [Event]
    fromZMQPoll (item, polledItem) = do
        let fired = toEvents . fromIntegral . pRevents $ polledItem
        let (wanted, callback) = case item of
                (Sock _ x g) -> (x, g)
                (File _ x g) -> (x, g)
        unless (null (fired `intersect` wanted)) $
            case callback of
                Nothing -> return ()
                Just k  -> k fired
        return fired

    fromEvent :: Event -> CShort
    fromEvent In  = fromIntegral (pollVal pollIn)
    fromEvent Out = fromIntegral (pollVal pollOut)
    fromEvent Err = fromIntegral (pollVal pollerr)
-- | Decode an event bit mask into a list of 'Event's by testing the
-- 'pollIn', 'pollOut' and 'pollerr' bits. Set events appear in the order
-- 'Err', 'Out', 'In'.
toEvents :: Word32 -> [Event]
toEvents bits = [ ev | (ev, flag) <- table, isSet flag ]
  where
    table = [(Err, pollerr), (Out, pollOut), (In, pollIn)]
    isSet flag = bits .&. fromIntegral (pollVal flag) /= 0
-- | Argument-reordering wrapper around 'throwIfMinus1RetryMayBlock_': takes
-- the label, then the wait action, then the action to run.
retry :: String -> IO () -> IO CInt -> IO ()
retry label onBlock action = throwIfMinus1RetryMayBlock_ label action onBlock
-- | Wait until the socket's 'B.events' option has the given poll bit set.
--
-- Each round reads the socket's file descriptor ('B.filedesc'), runs the
-- supplied blocking action on it, then re-reads 'B.events'. It repeats
-- until the bit is set.
wait' :: (Fd -> IO ()) -> ZMQPollEvent -> Socket a -> IO ()
wait' blockOn wanted sock = loop
  where
    loop = do
        fd <- getIntOpt sock B.filedesc 0
        blockOn (Fd fd)
        pending <- getInt32Option B.events sock
        unless (ready pending) loop
    ready mask = mask .&. fromIntegral (pollVal wanted) /= 0
-- | Wait with 'threadWaitRead' on the socket's file descriptor until the
-- socket reports 'pollIn'.
waitRead :: Socket a -> IO ()
waitRead sock = wait' threadWaitRead pollIn sock
-- | Wait with 'threadWaitWrite' on the socket's file descriptor until the
-- socket reports 'pollOut'.
waitWrite :: Socket a -> IO ()
waitWrite sock = wait' threadWaitWrite pollOut sock
-- | Run 'c_zmq_proxy' between a front and a back socket, with an optional
-- capture socket (a null pointer is passed when it is 'Nothing').
-- The result of 'c_zmq_proxy' is discarded.
proxy :: Socket a -> Socket b -> Maybe (Socket c) -> IO ()
proxy front back capture =
    onSocket "proxy-front" front $ \frontend ->
        onSocket "proxy-back" back $ \backend ->
            void $ c_zmq_proxy frontend backend captureSock
  where
    captureSock = case capture of
        Nothing -> nullPtr
        Just s  -> _socket (_socketRepr s)