Commit 35a94ed9 authored by Bjørnar Hansen's avatar Bjørnar Hansen Committed by Bjørnar Hansen
Browse files

Open source release.

parents
.cabal-sandbox/
dist/
cabal.sandbox.config
*.swp
*.hi
*.o
Copyright (c) 2015, Tingtun AS, http://tingtun.no
All rights reserved.
Redistribution and use in source and binary forms, with or without modification, are permitted provided that the following conditions are met:
1. Redistributions of source code must retain the above copyright notice, this list of conditions and the following disclaimer.
2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and the following disclaimer in the documentation and/or other materials provided with the distribution.
3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or promote products derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR
BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import Distribution.Simple
main = defaultMain
{-# LANGUAGE PackageImports #-}
module Main where
import Control.Concurrent (forkIO, threadDelay)
import Control.Monad
import "monads-tf" Control.Monad.Error
import Data.Monoid
import Network.TTRPC
import System.Log.Logging
ping :: Conn -> IO String
ping = method "ping"
ping' :: Conn -> Method () String
ping' = method "ping"
add :: Conn -> Int -> Int -> IO Int
add = method "adder"
lateGreeter :: Conn -> String -> IO ()
lateGreeter = method "lateGreeter"
longRunningJob :: Conn -> String -> Method String String
longRunningJob = method "longRunningJob"
faulty :: Conn -> ErrorT RpcError IO ()
faulty = method "faulty"
faulty' :: Conn -> Method () ()
faulty' = method "faulty"
badCallType :: Conn -> String -> ErrorT RpcError IO String
badCallType = method "ping"
badReturnType :: Conn -> ErrorT RpcError IO Int
badReturnType = method "ping"
callbackHandler :: String -> Noreply (IO ())
callbackHandler s = return $ putStrLn s
allocate :: Conn -> Int -> Method String Bool
allocate = method "allocate"
deallocate :: Conn -> Int -> Method String Bool
deallocate = method "deallocate"
main2 = withLogging "client-test.log" $ do
scr <- newServerCallbacksRef "tcp://localhost:5556"
putStrLn "Firing up server for callbacks."
ctx <- context
forkIO $ server (Just scr) Nothing (Just ctx) 1 "tcp://*:5556"
(serve "callbackHandler" callbackHandler)
s <- connection ctx "tcp://127.0.0.1:5555"
putStrLn "Pinging. Result:"
print =<< ping s
j <- asyncCall' scr $ allocate s 10
monitor j
threadDelay 4000000
j' <- asyncCall' scr $ deallocate s 10
monitor j'
monitor job = loop
where loop = do status <- getAsyncStatus' job
putStrLn $ "- Status: " ++ show status
done <- tryGetCallResult' job
case done of
Just r -> putStrLn $ "- Returned: " ++ show r
Nothing -> do putStrLn "- Not done."
threadDelay 500000
loop
main = withLogging "client-test.log" $ do
scr <- newServerCallbacksRef "tcp://localhost:5556"
putStrLn "Firing up server for callbacks."
ctx <- context
forkIO $ server (Just scr) Nothing (Just ctx) 1 "tcp://*:5556"
(serve "callbackHandler" callbackHandler)
s <- connection ctx "tcp://127.0.0.1:5555"
putStrLn "Pinging. Result:"
print =<< ping s
putStrLn "Adding 15 and 27. Result:"
print =<< add s 15 27
putStrLn "Firing a greeter, check server log."
lateGreeter s "World!"
putStrLn "Done."
putStrLn "Firing a long running job asynchronously."
j <- asyncCall' scr $ longRunningJob s "doomsday"
putStrLn "Running monitoring loop."
let loop = do status <- getAsyncStatus' j
putStrLn $ "- Status: " ++ show status
done <- tryGetCallResult' j
case done of
Just r -> putStrLn $ "- Returned: " ++ r
Nothing -> do putStrLn "- Not done."
threadDelay 500000
loop
loop
putStrLn "Sending an asynchronous job to a method on our server."
callbackTo' "tcp://localhost:5556/callbackHandler" $ ping' s
putStrLn "Done, you should see a pong soon."
putStrLn "Sending an asynchronous faulty job to our server."
callbackTo' "tcp://localhost:5556/ignoredMethodName" $ faulty' s
putStrLn "Done, you should see an error logged soon."
threadDelay 500000
putStr "Running bad method in ErrorT\n- "
r <- runErrorT $ faulty s
print r
putStr "Calling non-existent server in ErrorT\n- "
s' <- connection ctx "tcp://localhost:5557"
r <- runErrorT $ faulty s'
print r
putStr "Calling with wrong type in ErrorT\n- "
r <- runErrorT $ badCallType s "error!"
print r
putStr "Calling with wrong return type in ErrorT\n- "
r <- runErrorT $ badReturnType s
print r
putStrLn "Done."
module Main where
import Control.Monad
import Data.Monoid
import Data.IORef
import Control.Concurrent
import Text.Printf
import Control.Monad.IO.Class
import Network.TTRPC
import System.Log.Logging
ping :: RPC () String
ping = return "pong"
adder :: Int -> Int -> RPC () Int
adder x y = return (x + y)
lateGreeter :: String -> Noreply (IO ())
lateGreeter s = return $ do
threadDelay 5000000
putStrLn $ "Hello, " ++ s
longRunningJob :: String -> RPC String String
longRunningJob mission = do
updateStatus $ printf "Firing up rocket for mission %s." mission
liftIO $ threadDelay 1000000
updateStatus "Callibrating thrust vectors."
liftIO $ threadDelay 1000000
updateStatus "Launching doomsday device."
liftIO $ threadDelay 2000000
updateStatus "Sending confirmation signal."
liftIO $ threadDelay 500000
return $ printf "Mission %s successfull." mission
faulty :: RPC () ()
faulty = error "The world no longer exists."
allocateLimitedResource :: IORef Int -> Int -> RPC String Bool
allocateLimitedResource ref howMuch = do
liftBase $ threadDelay 10000000
r <- liftBase $ atomicModifyIORef' ref (\n -> (n + howMuch, n + howMuch))
updateStatus ("Allocated " ++ show howMuch ++ "; " ++ show r ++ " units in use.")
installCallbackErrorHandler $ do
putStrLn "There was an error during the callback sequence, running error handler"
atomicModifyIORef' ref (\n -> (n - howMuch, ()))
return True
deallocateLimitedResource :: IORef Int -> Int -> RPC String Bool
deallocateLimitedResource ref howMuch = do
liftBase $ threadDelay 1000000
r <- liftBase $ atomicModifyIORef' ref (\n -> (n - howMuch, n - howMuch))
updateStatus ("Deallocated "++ show howMuch ++ "; " ++ show r ++ " units in use.")
return True
serverMethods :: IORef Int -> Methods
serverMethods ref = serve "ping" ping
<> serve "adder" adder
<> serve "lateGreeter" lateGreeter
<> serve "longRunningJob" longRunningJob
<> serve "faulty" faulty
<> serve "allocate" (allocateLimitedResource ref)
<> serve "deallocate" (deallocateLimitedResource ref)
main :: IO ()
main = do
ref <- newIORef 0
void . forkIO . forever $ do
threadDelay 1000000
readIORef ref >>= print
withLogging "server-test.log" $ server Nothing Nothing Nothing 5 "tcp://127.0.0.1:5555" (serverMethods ref)
module Network.TTRPC
( module Network.TTRPC.Client
, module Network.TTRPC.Server
) where
import Network.TTRPC.Client
import Network.TTRPC.Server
{-# LANGUAGE CPP #-}
{-# LANGUAGE ExistentialQuantification #-}
{-# LANGUAGE FlexibleContexts #-}
{-# LANGUAGE FlexibleInstances #-}
{-# LANGUAGE MultiParamTypeClasses #-}
{-# LANGUAGE OverloadedStrings #-}
{-# LANGUAGE PackageImports #-}
{-# LANGUAGE RecordWildCards #-}
{-# LANGUAGE ScopedTypeVariables #-}
{-# LANGUAGE TypeFamilies #-}
{-# LANGUAGE DeriveGeneric #-}
{-# LANGUAGE StandaloneDeriving #-}
module Network.TTRPC.Client
(
-- * Common for client and server
module Network.TTRPC.Common
-- * Creating a method
, Callable
, Method
, method
-- * Creating a connection to a server
, Conn(..)
, connection
-- ** ZMQ re-exports
, Z.withContext, Z.context
-- * Synchronous calls
, call
, call'
-- * Asynchronous calls
, AsyncCall(..)
, asyncCall
, asyncCall'
, callbackTo
, callbackTo'
, getCallResult
, getCallResult'
, tryGetCallResult
, tryGetCallResult'
, getStatusById
, getStatusById'
, getAsyncStatus
, getAsyncStatus'
, storeCallbackRef
-- * YMMV
, rawCall
) where
import Control.Applicative
import Control.Concurrent.MVar
import Control.Exception
import Control.Monad.Base
import "monads-tf" Control.Monad.Error
import Data.ByteString.Lazy (toStrict)
import qualified Data.ByteString.Lazy as BL
import qualified Data.ByteString.Lazy.Char8 as BL8
import qualified Data.Map as M
import Data.MessagePack hiding (get)
import Data.Monoid
import qualified Data.UUID.V4 as UUID
import qualified System.ZMQ4 as Z
import Data.Binary
import GHC.Generics (Generic)
import qualified Data.Set as Set
import Data.Set (Set)
import Data.Foldable (for_)
import Network.TTRPC.Common
#ifndef NODEBUG
import System.Log.Logging
#endif
-- Flag to enable debug logging
#ifndef NODEBUG
debugLog = logDebug
#else
debugLog _ = return ()
#endif
{-# INLINE debugLog #-}
-- | A connection to a server.
data Conn = Conn
{ connSocket :: MVar (Maybe (Z.Socket Z.Req))
, connAddress :: String
, connCtx :: Z.Context
}
instance Eq Conn where
Conn x _ _ == Conn y _ _ = x == y
-- | Create a connection to a server from a ZMQ context and a connection
-- string.
--
-- This doesn't immediately create a connected socket. One socket is created
-- and connected on demand by the caller. Only one call can be executed at
-- a time per connection.
connection :: MonadBase IO m => Z.Context -> String -> m Conn
connection ctx addr = Conn <$> liftBase (newMVar Nothing)
<*> return addr
<*> return ctx
class Callable a where
extendCall :: Conn -> String -> [Object] -> a
instance (OBJECT a, Callable e) => Callable (a -> e) where
extendCall toServer name args arg =
extendCall toServer name (toObject arg:args)
instance OBJECT ret => Callable (ErrorT RpcError IO ret) where
extendCall toServer name args =
call (Method toServer name (reverse args))
instance OBJECT ret => Callable (IO ret) where
extendCall toServer name args =
call' (Method toServer name (reverse args))
instance (OBJECT status, OBJECT ret) => Callable (Method status ret) where
extendCall toServer name args =
Method toServer name (reverse args)
-- | Something that can be called, either synchronously or asynchronously.
data Method status ret = Method Conn String [Object]
-- | Create a method.
--
-- Examples:
--
-- > add :: Conn -> Int -> Int -> ErrorT RpcError IO Int
-- > add = method "add"
--
-- > add' :: Conn -> Int -> Int -> IO Int
-- > add' = method "add"
--
-- > doJob :: Conn -> String -> Method JobStatus JobResult
-- > doJob = method "doJob"
method :: Callable e
=> String -- ^ Method name.
-> Conn -- ^ Server socket.
-> e -- ^ Arguments and eventual return type.
method name toServer = extendCall toServer name []
-- | Synchronously call a method.
call :: (MonadBase IO m, OBJECT ret)
=> Method status ret
-> ErrorT RpcError m ret
call (Method conn name args) =
fromReplyM =<< rawCall (ObjectArray ([ toObject ("call" :: String)
, toObject name
] ++ args))
conn
-- | Synchronously call a method, throwing exceptions on errors.
call' :: (MonadBase IO m, OBJECT ret) => Method status ret -> m ret
call' = deError . call
deError :: Monad m => ErrorT RpcError m ret -> m ret
deError m = do
x <- runErrorT m
case x of
Left e -> error (show e)
Right o -> return o
data AsyncCall status ret = AsyncCall
{ asyncCallId :: !String -- ^ XXX: As far as I can tell this should be
-- of type UUID. It certainly is a UUID string.
, asyncCallConn :: !Conn
, asyncCallMVar :: !(MVar (Either RpcError Object))
}
storeCallbackRef :: ServerCallbacksRef -> AsyncCall status ret -> IO ()
storeCallbackRef scr AsyncCall{..} = do
mv' <- mkWeakMVar asyncCallMVar finalizer
modifyMVar_ (scrMap scr) $ \m -> do
debugLog $ "inserting " ++ asyncCallId ++ " in server callbacks"
return $ M.insert asyncCallId mv' m
where finalizer =
modifyMVar_ (scrMap scr) $ \m -> do
debugLog $ "garbage collecting and deleting " ++ asyncCallId ++ " from server callbacks"
return $ M.delete asyncCallId m
-- | Call a server asynchronously.
asyncCall :: MonadBase IO m
=> ServerCallbacksRef
-> Method status ret
-> ErrorT RpcError m (AsyncCall status ret)
asyncCall ref (Method conn name args) = do
id' <- show <$> liftBase UUID.nextRandom
mv <- liftBase newEmptyMVar
let acall = AsyncCall id' conn mv
liftBase $ storeCallbackRef ref acall
r <- rawCall (ObjectArray ([ ObjectRAW "callback"
, toObject id'
, toObject $ scrUrl ref
, ObjectRAW "call"
, toObject name
] ++ args))
conn
case r of
Nothing ->
throwError TimeoutError
Just (ObjectArray (ObjectRAW "noreply":[])) ->
return acall
Just r'@(ObjectArray (ObjectRAW "error":_)) ->
throwError $ fromObject r'
Just r' ->
throwError . ProtocolError 6 $ "invalid async reply, " ++ show r'
-- | 'asyncCall', throwing errors on failures.
asyncCall' :: MonadBase IO m
=> ServerCallbacksRef
-> Method status ret
-> m (AsyncCall status ret)
asyncCall' = (deError .) . asyncCall
-- | Sending the reply to a different server.
callbackTo :: MonadBase IO m
=> String -- ^ The server to send to, \"tcp:\/\/name:port\/method\".
-> Method status ret
-> ErrorT RpcError m String
-- ^ The ticket ID of the request, e.g. for checking the status
-- with 'getStatusById'.
callbackTo url (Method conn name args) = do
id' <- show <$> liftBase UUID.nextRandom
r <- rawCall (ObjectArray ([ ObjectRAW "callback"
, toObject id'
, toObject url
, ObjectRAW "call"
, toObject name
] ++ args))
conn
case r of
Nothing ->
throwError TimeoutError
Just (ObjectArray (ObjectRAW "noreply":[])) ->
return id'
Just r'@(ObjectArray (ObjectRAW "error":_)) ->
throwError $ fromObject r'
Just r' ->
throwError . ProtocolError 6 $ "invalid async reply, " ++ show r'
-- | 'callbackTo', throwing errors on failure.
callbackTo' :: MonadBase IO m => String -> Method status ret -> m String
callbackTo' = (deError .) . callbackTo
-- | Send a raw 'Object' to the 'Conn', maybe returning an Object in reply.
--
-- Retries three times. If your call takes longer than one second to execute,
-- you're going to have a bad time.
--
-- If 'Nothing' it probably timed out.
rawCall :: MonadBase IO m
=> Object
-> Conn
-> m (Maybe Object)
rawCall out (Conn sockref addr ctx) = do
liftBase $ do
debugLog $ "client: sending "
++ toString out
++ " to: "
++ show addr
mo <- go 0
debugLog $ "client: got reply "
++ show (toString <$> mo)
++ " from: "
++ show addr
return mo
where
go :: Int -> IO (Maybe Object)
go n
| n >= 3 = return Nothing
| otherwise = do
m <- runMod `onException`
(Nothing <$ modifyMVar_ sockref (const $ return Nothing))
case m of
Nothing -> go (n+1)
Just x -> return (Just x)
runMod = modifyMVar sockref $ \msock -> do
sock <- case msock of
Nothing -> do sock <- Z.socket ctx Z.Req
Z.connect sock addr
return sock
Just s -> return s
Z.send sock [] . toStrict $ pack out
[evs] <- Z.poll 1000 [Z.Sock sock [Z.In] Nothing]
case evs of
(Z.In:_) -> do
x <- unpack <$> Z.receive sock
return (Just sock, Just x)
_ -> do Z.setLinger (Z.restrict (0::Int)) sock
Z.close sock
return (Nothing, Nothing)
-- | Get the result of the call, if any.
--
-- This will wait until the server has gotten either an error or a reply,
-- and return this. If the server gets taken down or crashes, this will
-- hang forever. See 'tryGetCallResult' and 'getAsyncStatus'.
getCallResult :: (MonadBase IO m, OBJECT ret)
=> AsyncCall status ret
-> ErrorT RpcError m ret
getCallResult acall = do
r <- liftBase . takeMVar $ asyncCallMVar acall
case r of
Left e -> throwError e
Right o -> fromReplyM (Just o)
-- | 'getCallResult', throwing errors on exceptions.
getCallResult' :: (MonadBase IO m, OBJECT ret) => AsyncCall status ret -> m ret
getCallResult' = deError . getCallResult
-- | Non-blocking 'getCallResult'.
tryGetCallResult :: (MonadBase IO m, OBJECT ret)
=> AsyncCall status ret
-> ErrorT RpcError m (Maybe ret)
tryGetCallResult acall = do
r <- liftBase . tryTakeMVar $ asyncCallMVar acall
case r of
Nothing -> return Nothing
Just (Left e) -> throwError e
Just (Right o) -> fromReplyM (Just o)
-- | 'tryGetCallResult', throwing errors on exceptions.
tryGetCallResult' :: (MonadBase IO m, OBJECT ret)
=> AsyncCall status ret
-> m (Maybe ret)
tryGetCallResult' = deError . tryGetCallResult
-- | Get the status of a job running on the server.
--
getStatusById :: (MonadBase IO m, Monoid status, OBJECT status)
=> String -- ^ The job ID to check the status of.
-> Conn
-> ErrorT RpcError m (Maybe status)
-- ^ Nothing if it's no longer on the server.
getStatusById id' conn = do
obj <- rawCall (ObjectArray [ ObjectRAW "checkStatus"
, toObject id'])
conn
case obj of
Just (ObjectArray [ObjectRAW "missing"]) ->
return Nothing
Just (ObjectArray [ObjectRAW "noreply"]) ->
return (Just mempty)
_ -> Just <$> fromReplyM obj
-- | 'getStatusById', throwing errors on exceptions.
getStatusById' :: (MonadBase IO m, Monoid status, OBJECT status)
=> String
-> Conn
-> m (Maybe status)
getStatusById' = (deError .) . getStatusById
-- | Get the status of a job running on the server by the given 'AsyncCall'
-- object. See 'getStatusById'.
getAsyncStatus :: (MonadBase IO m, Monoid status, OBJECT status)
=> AsyncCall status ret
-> ErrorT RpcError m (Maybe status)
getAsyncStatus AsyncCall {..} = getStatusById asyncCallId asyncCallConn
-- | 'getAsyncStatus', throwing errors on exceptions.
getAsyncStatus' :: (MonadBase IO m, Monoid status, OBJECT status)
=> AsyncCall status ret
-> m (Maybe status)
getAsyncStatus' = deError . getAsyncStatus
fromReplyM :: (Functor m, Monad m, OBJECT ret)
=> Maybe Object
-> ErrorT RpcError m ret
fromReplyM o =
case o of
Nothing ->