Как мы знаем, в основе отказоустойчивости эрланговских приложений лежит способность потерпевшего бедствие процесса сообщить всем связанным с ним о причине своего крушения. На
предложенном ранее хаскелевском дизайне межпроцессорной коммуникации тоже возможно подобное. Рассмотрим один из вариантов.
Сначало надо отметить, что у хаскель-процессов оформленных таким образом, после того, как сам процесс завершился, он не перестает быть доступным. Т. е. мы и дальше можем посылать ему сообщения и можем построить такую функцию-примитив, которая бы запускала новый процесс с очередью от старого, не потеряв при этом ни одного сообщения.
В отличие от эрланговских процессов, мы будем связывать их односторонней связью: если процесс имеет тип Process x (с типом входящих сообщений x), то процесс, который получает сообщение о крахе этого процесса будет иметь тип Process (Process x). В этом процессе-обработчике причину ошибки и очередь необработанных сообщений можно будет взять из самого сообщения типа Process x. Исходя из этого, функция, привязывающая один процесс другому будет выглядеть так:
link :: Process x -> Process (Process x) -> Procedure c ()
-- link x xx - связывает процесс x с процессом xx таким образом, что
-- при крахе процесса x ссылка на него будет передана процессу xx в виде сообщенияСоответственно обратная операция:
unLink :: Process x -> Process (Process x) -> Procedure c ()
Также процесс может привязать к другому процессу себя самого.
linkMe :: Process (Process c) -> Procedure c ()
unLinkMe :: Process (Process c) -> Procedure c ()
Связанный процесс всегда при выходе сообщит об этом процессам обработчикам и следовательно в самом процессе после его завершения должна указываться причина:
data ExitReason
= ExitSelf -- завершил себя сам
| ExitRemote -- был завершен другим процессом
| ExitWith Exception -- завершился с исключениемЧто соответствует способам завершения процесса:
- exit :: Procedure c () - завершение процесса без исключения (ExitSelf).
- exitWith :: Exception -> Procedure c () - завершение процесса с нужным исключением (ExitWith исключение).
- kill :: Process c' -> Procedure c () - завершение стороннего процесса (ExitRemote, или в случае, если он завершает сам себя - ExitSelf).
Контролирующий процесс, получая сообщения о завершении привязанных к себе процессов, может узнать причину завершения тремя способами:
- exitReason :: Process x -> Procedure c ExitReason - бесконечно ожидает пока процесс завершится, возвращает причину завершения.
- exitReasonDelay :: Process x -> Int -> Procedure c (Maybe ExitReason) - ожидает заданное время, пока процесс завершится. Если процесс завершился, то возвращает Just причинаЗавершения. Если процесс за это время не завершился, то возвращает Nothing.
- exitReasonNow :: Process x -> Procedure c (Maybe ExitReason) - Пытается немедленно узнать причину заершения процесса. Если процесс завершился, то возвращает Just причинаЗавершения. Если процесс в рабочем состоянии, то возвращает Nothing.
Ну и наконец, обработка исключений внутри процедуры осуществляется двумя знакомыми функциями:
- catchProcedure :: Procedure c v -> (Exception -> Procedure c v) -> Procedure c v - выполняет процедуру из первого аргумента. Если там возникло исключение, то передает его процедуре-обработчику во втором аргументе.
- tryProcedure :: Procedure c v -> Procedure c (Either Exception v) - если в процедуре возникло исключение, возвращает Left исключение. Если процедра отработала правильно, то возвращает Right результат.
{-#OPTIONS -fglasgow-exts#-}
module Process where
import Control.Monad.State
import Concurrent
import Control.Exception as Exception
import Data.List
-- причина завершения процесса:
data ExitReason
= ExitSelf -- завершил себя сам
| ExitRemote -- был завершен другим процессом
| ExitWith Exception -- был завершен в результате исключения
deriving Show
-- это содержит процесс
-- и это протягивается в качестве состояния в процедуре
data Base c
= Base (Chan c) (MVar ExitReason) (MVar [Chan (Process c)])
| Base' (Chan c) (MVar ExitReason) (MVar [Chan (Process c)]) -- когда действует catchProcedure
-- мы оборачиваем в MVar, то что должно быть доступно и процессу и процедуре
-- в Process всегда конструктор Base
-- ссылка на параллено запущенную процедуру
data Process c = Process {
processThread :: ThreadId,
processBase :: Base c
}
-- тип процедуры
-- по сути - монада состояния с состоянием Base
newtype Procedure c v = Procedure {
runProcedure :: Base c -> IO (v, Base c)
}
instance Monad (Procedure c) where
return v = Procedure $ \base -> return (v, base)
p >>= f = Procedure $ \base -> do
~(v, base') <- runProcedure p base
runProcedure (f v) base'
fail s = Procedure $ \_ -> fail s
instance Functor (Procedure c) where
fmap f p = Procedure $ \base -> do
~(v, base') <- runProcedure p base
return (f v, base')
instance MonadState (Base c) (Procedure c) where
get = Procedure $ \base -> return (base, base)
put base = Procedure $ \_ -> return ((), base)
instance MonadIO (Procedure c) where
liftIO io = Procedure $ inProc where
-- если находится внутри catch блока
inProc base@(Base' _ _ _) = do
r <- io
return (r, base)
-- тут нельзя рассылать сообщения
-- и нельзя инициализировать причину выхода.
-- если находится вне catch-блока
inProc base@(Base _ e _) = do -- выполняем io
r <- Exception.catch io $ \ex -> do -- а если произошло исключение, то
putMVar e $ ExitWith ex -- инициализируем переменную причины
exitSender base -- рассылаем сообщения
throw ex -- генерируем исключение вновь
return (r, base)
-- разные примитивы
baseList :: Base c -> MVar [Chan (Process c)]
baseList (Base' _ _ l) = l
baseList (Base _ _ l) = l
baseExitReason :: Base c -> MVar ExitReason
baseExitReason (Base' _ e _) = e
baseExitReason (Base _ e _) = e
baseChan :: Base c -> Chan c
baseChan (Base' c _ _) = c
baseChan (Base c _ _) = c
setBase (Base' c e l) = Base c e l
setBase b = b
setBase' (Base c e l) = Base' c e l
setBase' b = b
-- отсылает сообщения связанным процессам
exitSender :: Base c -> IO ()
exitSender base = withMVar (baseList base) $ \lst -> do
me <- myThreadId
mapM_ (\c -> writeChan c $ Process me base) lst
-- Убийство без порождения исключения.
-- процесс, которого убивают, отсылает сообщения связанным процессам
kill :: Process c' -> Procedure c ()
kill (Process thread base@(Base _ e _)) = liftIO $ do
me <- myThreadId
putMVar e $ if me == thread then ExitSelf else ExitRemote
-- если процесс уничтожает себя, то прична ExitSelf
-- если процесс уничтожает другой процесс, то причина ExitRemote
exitSender base -- сообщает всем процессам о своем завершении
killThread thread -- уничтожает процесс
-- для exit и exitWith не важно, работают они внутри catchProcedure или нет
-- все равно они рассылают сообщения и инициализируют переменную причины
-- Выход из процедуры без исключения
exit :: Procedure c ()
exit = Procedure $ \base -> do
let e = baseExitReason base
putMVar e ExitSelf -- инициализировать причину ExitSelf
exitSender base -- сообщить процессам, что процедура завершается
me <- myThreadId
killThread me -- убить себя
return ((), base) -- об стену
-- выход из процедуры с порождением исключения
exitWith :: Exception -> Procedure c ()
exitWith ex = Procedure $ \base -> do
let e = baseExitReason base
putMVar e $ ExitWith ex -- инициализирует причину исключением
exitSender base -- рассылает процессам сообщения
me <- myThreadId
killThread me -- убивает себя
return ((), base) -- об стену
-- Эти функции наблюдают за процессом:
-- ожидает пока указанный процесс завершится и возвращает причину его завершения:
exitReason :: Process x -> Procedure c ExitReason
exitReason (Process _ (Base _ e _)) = liftIO $ readMVar e
-- ожидает заданное время пока процесс завершится
-- Если процесс завершился возвращает Just причинаЗавершения
-- если процесс не завершился, возвращает Nothing
exitReasonDelay :: Process x -> Int -> Procedure c (Maybe ExitReason)
exitReasonDelay (Process _ (Base _ e _)) = liftIO . readMVarDelay e
-- пытается немедленно узнать причину завершения процесса
-- Если процесс завершился возвращает Just причинаЗавершения
-- если процесс не завершился, возвращает Nothing
exitReasonNow :: Process x -> Procedure c (Maybe ExitReason)
exitReasonNow (Process _ (Base _ e _)) = liftIO $ readMVarNow e
--------
runProc :: Procedure c v -> IO (v, Base c)
runProc p = do
c <- newChan
e <- newEmptyMVar
l <- newMVar []
runProcedure p $ Base c e l
evalProcedure :: Procedure c v -> Base c -> IO v
evalProcedure p base = do
~(v, _) <- runProcedure p base
return v
-- для запуска процедуры из IO-действия
evalProc :: Procedure c v -> IO v
evalProc p = do
~(v, _) <- runProc p
return v
-- запускают процедуру параллено, возвращая ссылку на процесс.
spawn, spawnOS :: Procedure c' () -> Procedure c (Process c')
spawn p = do
c <- liftIO newChan
e <- liftIO newEmptyMVar
l <- liftIO $ newMVar []
thread <- liftIO $ forkIO $ evalProcedure p $ Base c e l
return $ Process thread $ Base c e l
-- для тяжелых процессов
spawnOS p = do
c <- liftIO newChan
e <- liftIO newEmptyMVar
l <- liftIO $ newMVar []
thread <- liftIO $ forkOS $ evalProcedure p $ Base c e l
return $ Process thread $ Base c e l
-- возвращает ссылку на себя
self :: Procedure c (Process c)
self = do
base <- get
thread <- liftIO myThreadId
return $ Process thread $ setBase base
-- пауза
delay :: Int -> Procedure c ()
delay = liftIO . threadDelay
-- Messages
recv :: Procedure c c
recv = do
base <- get
liftIO $ readChan $ baseChan base
recvDelay :: Int -> Procedure c (Maybe c)
recvDelay n = do
base <- get
liftIO $ readChanDelay (baseChan base) n
recvNow :: Procedure c (Maybe c)
recvNow = do
base <- get
liftIO $ readChanNow $ baseChan base
-- отправка сообщений стороннему процессу
send, send' :: Process c' -> c' -> Procedure c ()
send (Process _ (Base c _ _)) = liftIO . writeChan c
send' (Process _ (Base c _ _)) = liftIO . unGetChan c
-- отправка сообщений себе
sendMe, sendMe' :: c -> Procedure c ()
sendMe msg = do
base <- get
liftIO $ writeChan (baseChan base) msg
sendMe' msg = do
base <- get
liftIO $ unGetChan (baseChan base) msg
-- cвязывание
link :: Process x -> Process (Process x) -> Procedure c ()
link (Process _ (Base _ _ l)) (Process _ (Base c _ _))
= liftIO $ modifyMVar_ l $ \l' -> return $ if c `elem` l' then l' else c: l'
unLink :: Process x -> Process (Process x) -> Procedure c ()
unLink (Process _ (Base _ _ l)) (Process _ (Base c _ _))
= liftIO $ modifyMVar_ l $ \l' -> return $ delete c l'
linkMe :: Process (Process c) -> Procedure c ()
linkMe p = do
me <- self
link me p
unLinkMe :: Process (Process c) -> Procedure c ()
unLinkMe p = do
me <- self
unLink me p
-- обработка исключения внутри процедуры
catchProcedure :: Procedure c v -> (Exception -> Procedure c v) -> Procedure c v
catchProcedure p pEx = Procedure $ \base -> do
result <- Exception.catch (evalProcedure p $ setBase' base) $ \ex -> evalProcedure (pEx ex) base
return (result, base)
tryProcedure :: Procedure c v -> Procedure c (Either Exception v)
tryProcedure a = catchProcedure (Right `liftM` a) (return . Left)
Syhi-подсветка кода Как уже подметили тип Procedure c v релизован как монада состояния (является экземпляром MonadState). Поскольку он же является экземляром MonadIO, функцию proc я упразнил, а вместо неё нужно применять liftIO. К функциям приема сообщений добавилась recvNow :: Procedure c (Maybe c). К функциям отправки прибавились send', sendMe, sendMe'. Не сложно догадаться что они делают.
В модуле используется
Concurrent.hs в чистом виде. В
способе migmit разобрался и думаю его использование нормальным прядком вещей (принимая во внимание, что мы отвечаем на изврат библиотеки извратом исходника). И вообще жопа - тоже нужный орган.
Пример использования в
следующем посте...