Отказоустойчивость.

May 30, 2008 16:10

Как мы знаем, в основе отказоустойчивости эрланговских приложений лежит способность потерпевшего бедствие процесса сообщить всем связанным с ним о причине своего крушения. На предложенном ранее хаскелевском дизайне межпроцессорной коммуникации тоже возможно подобное. Рассмотрим один из вариантов.

Сначало надо отметить, что у хаскель-процессов оформленных таким образом, после того, как сам процесс завершился, он не перестает быть доступным. Т. е. мы и дальше можем посылать ему сообщения и можем построить такую функцию-примитив, которая бы запускала новый процесс с очередью от старого, не потеряв при этом ни одного сообщения.

В отличие от эрланговских процессов, мы будем связывать их односторонней связью: если процесс имеет тип Process x (с типом входящих сообщений x), то процесс, который получает сообщение о крахе этого процесса будет иметь тип Process (Process x). В этом процессе-обработчике причину ошибки и очередь необработанных сообщений можно будет взять из самого сообщения типа Process x. Исходя из этого, функция, привязывающая один процесс другому будет выглядеть так:
link :: Process x -> Process (Process x) -> Procedure c () -- link x xx - связывает процесс x с процессом xx таким образом, что -- при крахе процесса x ссылка на него будет передана процессу xx в виде сообщенияСоответственно обратная операция:
unLink :: Process x -> Process (Process x) -> Procedure c ()

Также процесс может привязать к другому процессу себя самого.
linkMe :: Process (Process c) -> Procedure c ()
unLinkMe :: Process (Process c) -> Procedure c ()

Связанный процесс всегда при выходе сообщит об этом процессам обработчикам и следовательно в самом процессе после его завершения должна указываться причина:
data ExitReason = ExitSelf -- завершил себя сам | ExitRemote -- был завершен другим процессом | ExitWith Exception -- завершился с исключениемЧто соответствует способам завершения процесса:
  • exit :: Procedure c () - завершение процесса без исключения (ExitSelf).
  • exitWith :: Exception -> Procedure c () - завершение процесса с нужным исключением (ExitWith исключение).
  • kill :: Process c' -> Procedure c () - завершение стороннего процесса (ExitRemote, или в случае, если он завершает сам себя - ExitSelf).
Контролирующий процесс, получая сообщения о завершении привязанных к себе процессов, может узнать причину завершения тремя способами:
  • exitReason :: Process x -> Procedure c ExitReason - бесконечно ожидает пока процесс завершится, возвращает причину завершения.
  • exitReasonDelay :: Process x -> Int -> Procedure c (Maybe ExitReason) - ожидает заданное время, пока процесс завершится. Если процесс завершился, то возвращает Just причинаЗавершения. Если процесс за это время не завершился, то возвращает Nothing.
  • exitReasonNow :: Process x -> Procedure c (Maybe ExitReason) - Пытается немедленно узнать причину заершения процесса. Если процесс завершился, то возвращает Just причинаЗавершения. Если процесс в рабочем состоянии, то возвращает Nothing.
Ну и наконец, обработка исключений внутри процедуры осуществляется двумя знакомыми функциями:
  • catchProcedure :: Procedure c v -> (Exception -> Procedure c v) -> Procedure c v - выполняет процедуру из первого аргумента. Если там возникло исключение, то передает его процедуре-обработчику во втором аргументе.
  • tryProcedure :: Procedure c v -> Procedure c (Either Exception v) - если в процедуре возникло исключение, возвращает Left исключение. Если процедра отработала правильно, то возвращает Right результат.

{-#OPTIONS -fglasgow-exts#-}
module Process where

import Control.Monad.State
import Concurrent
import Control.Exception as Exception
import Data.List

-- причина завершения процесса:
data ExitReason
        = ExitSelf -- завершил себя сам
        | ExitRemote -- был завершен другим процессом
        | ExitWith Exception -- был завершен в результате исключения
                deriving Show

-- это содержит процесс
-- и это протягивается в качестве состояния в процедуре
data Base c
        = Base (Chan c) (MVar ExitReason) (MVar [Chan (Process c)])
        | Base' (Chan c) (MVar ExitReason) (MVar [Chan (Process c)]) -- когда действует catchProcedure
        -- мы оборачиваем в MVar, то что должно быть доступно и процессу и процедуре
        -- в Process всегда конструктор Base

-- ссылка на параллено запущенную процедуру
data Process c = Process {
        processThread :: ThreadId,
        processBase :: Base c
}

-- тип процедуры
-- по сути - монада состояния с состоянием Base
newtype Procedure c v = Procedure {
        runProcedure :: Base c -> IO (v, Base c)
}

instance Monad (Procedure c) where
        return v = Procedure $ \base -> return (v, base)
        p >>= f = Procedure $ \base -> do
                ~(v, base') <- runProcedure p base
                runProcedure (f v) base'
        fail s = Procedure $ \_ -> fail s

instance Functor (Procedure c) where
        fmap f p = Procedure $ \base -> do
                ~(v, base') <- runProcedure p base
                return (f v, base')

instance MonadState (Base c) (Procedure c) where
        get = Procedure $ \base -> return (base, base)
        put base = Procedure $ \_ -> return ((), base)

instance MonadIO (Procedure c) where
        liftIO io = Procedure $ inProc  where
                -- если находится внутри catch блока
                inProc base@(Base' _ _ _) = do
                        r <- io
                        return (r, base)
                        -- тут нельзя рассылать сообщения
                        -- и нельзя инициализировать причину выхода.

-- если находится вне catch-блока
                inProc base@(Base _ e _) = do -- выполняем io
                        r <- Exception.catch io $ \ex -> do -- а если произошло исключение, то
                                putMVar e $ ExitWith ex -- инициализируем переменную причины
                                exitSender base -- рассылаем сообщения
                                throw ex -- генерируем исключение вновь
                        return (r, base)

-- разные примитивы
baseList :: Base c -> MVar [Chan (Process c)]
baseList (Base' _ _ l) = l
baseList (Base _ _ l) = l

baseExitReason :: Base c -> MVar ExitReason
baseExitReason (Base' _ e _) = e
baseExitReason (Base _ e _) = e

baseChan :: Base c -> Chan c
baseChan (Base' c _ _) = c
baseChan (Base c _ _) = c

setBase (Base' c e l) = Base c e l
setBase b = b

setBase' (Base c e l) = Base' c e l
setBase' b = b

-- отсылает сообщения связанным процессам
exitSender :: Base c -> IO ()
exitSender base = withMVar (baseList base) $ \lst -> do
        me <- myThreadId
        mapM_ (\c -> writeChan c $ Process me base) lst

-- Убийство без порождения исключения.
-- процесс, которого убивают, отсылает сообщения связанным процессам
kill :: Process c' -> Procedure c ()
kill (Process thread base@(Base _ e _)) = liftIO $ do
        me <- myThreadId
        putMVar e $ if me == thread then ExitSelf else ExitRemote
                -- если процесс уничтожает себя, то прична ExitSelf
                -- если процесс уничтожает другой процесс, то причина ExitRemote
        exitSender base -- сообщает всем процессам о своем завершении
        killThread thread -- уничтожает процесс

-- для exit и exitWith не важно, работают они внутри catchProcedure или нет
-- все равно они рассылают сообщения и инициализируют переменную причины

-- Выход из процедуры без исключения
exit :: Procedure c ()
exit = Procedure $ \base -> do
        let e = baseExitReason base
        putMVar e ExitSelf -- инициализировать причину ExitSelf
        exitSender base -- сообщить процессам, что процедура завершается
        me <- myThreadId
        killThread me -- убить себя
        return ((), base) -- об стену

-- выход из процедуры с порождением исключения
exitWith :: Exception -> Procedure c ()
exitWith ex = Procedure $ \base -> do
        let e = baseExitReason base
        putMVar e $ ExitWith ex -- инициализирует причину исключением
        exitSender base -- рассылает процессам сообщения
        me <- myThreadId
        killThread me -- убивает себя
        return ((), base) -- об стену

-- Эти функции наблюдают за процессом:
-- ожидает пока указанный процесс завершится и возвращает причину его завершения:
exitReason :: Process x -> Procedure c ExitReason
exitReason (Process _ (Base _ e _)) = liftIO $ readMVar e

-- ожидает заданное время пока процесс завершится
-- Если процесс завершился возвращает Just причинаЗавершения
-- если процесс не завершился, возвращает Nothing
exitReasonDelay :: Process x -> Int -> Procedure c (Maybe ExitReason)
exitReasonDelay (Process _ (Base _ e _)) = liftIO . readMVarDelay e

-- пытается немедленно узнать причину завершения процесса
-- Если процесс завершился возвращает Just причинаЗавершения
-- если процесс не завершился, возвращает Nothing
exitReasonNow :: Process x -> Procedure c (Maybe ExitReason)
exitReasonNow (Process _ (Base _ e _)) = liftIO $ readMVarNow e
--------

runProc :: Procedure c v -> IO (v, Base c)
runProc p = do
        c <- newChan
        e <- newEmptyMVar
        l <- newMVar []
        runProcedure p $ Base c e l

evalProcedure :: Procedure c v -> Base c -> IO v
evalProcedure p base = do
        ~(v, _) <- runProcedure p base
        return v

-- для запуска процедуры из IO-действия
evalProc :: Procedure c v -> IO v
evalProc p = do
        ~(v, _) <- runProc p
        return v

-- запускают процедуру параллено, возвращая ссылку на процесс.
spawn, spawnOS :: Procedure c' () -> Procedure c (Process c')
spawn p = do
        c <- liftIO newChan
        e <- liftIO newEmptyMVar
        l <- liftIO $ newMVar []
        thread <- liftIO $ forkIO $ evalProcedure p $ Base c e l       
        return $ Process thread $ Base c e l

-- для тяжелых процессов
spawnOS p = do
        c <- liftIO newChan
        e <- liftIO newEmptyMVar
        l <- liftIO $ newMVar []
        thread <- liftIO $ forkOS $ evalProcedure p $ Base c e l
        return $ Process thread $ Base c e l

-- возвращает ссылку на себя
self :: Procedure c (Process c)
self = do
        base <- get
        thread <- liftIO myThreadId
        return $ Process thread $ setBase base

--  пауза
delay :: Int -> Procedure c ()
delay = liftIO . threadDelay

-- Messages
recv :: Procedure c c
recv = do
        base <- get
        liftIO $ readChan $ baseChan base

recvDelay :: Int -> Procedure c (Maybe c)
recvDelay n = do
        base <- get
        liftIO $ readChanDelay (baseChan base) n

recvNow :: Procedure c (Maybe c)
recvNow = do
        base <- get
        liftIO $ readChanNow $ baseChan base

-- отправка сообщений стороннему процессу
send, send' :: Process c' -> c' -> Procedure c ()
send (Process _ (Base c _ _)) = liftIO . writeChan c
send' (Process _ (Base c _ _)) = liftIO . unGetChan c

-- отправка сообщений себе
sendMe, sendMe' :: c -> Procedure c ()
sendMe msg = do
        base <- get
        liftIO $ writeChan (baseChan base) msg
sendMe' msg = do
        base <- get
        liftIO $ unGetChan (baseChan base) msg

-- cвязывание
link :: Process x -> Process (Process x) -> Procedure c ()
link (Process _ (Base _ _ l)) (Process _ (Base c _ _))
        = liftIO $ modifyMVar_ l $ \l' -> return $ if c `elem` l' then l' else c: l'

unLink :: Process x -> Process (Process x) -> Procedure c ()
unLink (Process _ (Base _ _ l)) (Process _ (Base c _ _))
        = liftIO $ modifyMVar_ l $ \l' -> return $ delete c l'

linkMe :: Process (Process c) -> Procedure c ()
linkMe p = do
        me <- self
        link me p

unLinkMe :: Process (Process c) -> Procedure c ()
unLinkMe p = do
        me <- self
        unLink me p

-- обработка исключения внутри процедуры
catchProcedure :: Procedure c v -> (Exception -> Procedure c v) -> Procedure c v
catchProcedure p pEx = Procedure $ \base -> do
                result <- Exception.catch (evalProcedure p $ setBase' base) $ \ex -> evalProcedure (pEx ex) base
                return (result, base)

tryProcedure :: Procedure c v -> Procedure c (Either Exception v)
tryProcedure a = catchProcedure (Right `liftM` a) (return . Left)Syhi-подсветка кода

Как уже подметили тип Procedure c v релизован как монада состояния (является экземпляром MonadState). Поскольку он же является экземляром MonadIO, функцию proc я упразнил, а вместо неё нужно применять liftIO. К функциям приема сообщений добавилась recvNow :: Procedure c (Maybe c). К функциям отправки прибавились send', sendMe, sendMe'. Не сложно догадаться что они делают.

В модуле используется Concurrent.hs в чистом виде. В способе migmit разобрался и думаю его использование нормальным прядком вещей (принимая во внимание, что мы отвечаем на изврат библиотеки извратом исходника). И вообще жопа - тоже нужный орган.

Пример использования в следующем посте...

отказоустойчивость, concurrent, haskell, обработка ошибок

Previous post Next post
Up