Предупреждение
Требуется некоторое знание Хаскеля. На вопросы - отвечу, либо отошлю к документации.
Для меня важнее показать, как можно строить что-то своё, чем познакомить с обширной стандартной библиотекой. Особенно в свете того, что она имеет тенденцию меняться со временем. Поэтому я не стесняюсь переопределять те или иные функции из стандартной библиотеки или изобретать велосипеды в виде похожих на стандартные типов данных.
Я считаю, что легкость создания своих инструментов означает легкость применения чужих. Это, конечно, верно далеко не всегда, но обратное неверно совершенно.Предистория
Отдел в ИТМиВТ, в который я поступил на работу, занимался высокопроизводительными вычислениями. Если кто не знает, то это, в основном, линейная алгебра во всех её ипостасях - решение систем линейных уравнений, задач наименьших квадратов и на собственные значения и прочее в том же духе.
Архитектура вычислителя оказалась не очень удачной, поэтому встал вопрос, как же можно исправить положение дел. В результате нам всем пришлось изучать методы построения логических схем и как-то их проверять не только на предмет правильности работы, но и на другие свойства, например, общая задержка вычислений.
Как-то так получилось, что с вопросами проверки схем стали обращаться ко мне. Наверное, я вёл себя слишком самоуверенно в разговорах об этом вопросе, к тому же я проболтался, что для Хаскеля есть несколько библиотек для моделирования схем. Но это уже неважно, главное, что большую часть прошлого, 2006, года мне пришлось заниматься моделированием аппаратуры, большей частью, на достаточно высоком уровне. Об этом и пойдёт рассказ ниже.Как работает процессор
И не только процессор.
Вот, как происходит сложение чисел с плавающей точкой:
- CMP: Сперва мы сравниваем экспоненты чисел
- SHR:Затем мантиссу числа с меньшей экспонентой сдвигаем вправо
- ADD:Складываем обе мантиссы
- NRM:Нормальизуем мантиссы
Итого - 4 этапа. Каждый из них достаточно длителен.
Если мы будем пытаться выполнять все их за один такт процессора, то это будет очень неэффективно. Процессор же выполняет и гораздо более короткие операции, например, целочисленное сложение и вычитание, и они встречаются не менее часто. Тогда применяют приём под названием конвейеризация - каждый этап назначают одному устройству, выполняющему простую операцию за один такт и устройства объединяют в цепочку. Получается вот, что:
CMP: 1 2 - 3 4 - 5 (прочерк - это простой устройства, когда нет операндов)
SHR: 1 2 - 3 4 - 5
ADD: 1 2 - 3 4 - 5
NRM: 1 2 - 3 4 - 5
Результат: 1 2 - 3 4 - 5
Такты: 1 2 3 4 5 6 7 8 9 10 11За 11 тактов мы получили 5 результатов, а могли бы получить и все 7. Средняя загрузка устройства составила 5/7 - из-за двух "пузырей"-простоев. Такты у нас короткие, поэтому результаты мы можем получать с высокой частотой. Главное - не допускать пузырей, поддерживать загрузку устройства.
По идее, мы можем отдельные сложные этапы порезать ещё сильнее, например, поделить на два этапа CMP, SHR и ADD. Тогда конвейер станет семистадийным и его частоту можно увеличить практически вдвое. К сожалению, это приводит к большей вероятности появления пузырей на входе и большему влиянию задержки в конвейере на выполнение всей группы операций целиком. Выяснить наилучший вариант реализации той или иной схемы можно только путем моделирования на реальных задачах, если, конечно, вы не
Ричард Фейнман.Разные точки зрения
На отдельный блок компьютера можно смотреть с разных точек зрения. Для примера снова возьмем сложение двух чисел с плавающей запятой.
Для обычного программиста главное, чтобы он работал в соответствии со стандартом
IEEE-754. Это самый простой взгляд на вещи и с точки зрения проектирования системы наименее интересный.
Для программиста компилятора уже становится важным, сколько тактов процессора занимает операция сложения с ПТ и на какие устройства она влияет. Ему приходится рассчитывать расположение команд, а соответствие стандарту отходит на очень далёкий план.
Для инженера-схемотехника важно и соответствие стандарту, и количество тактов процессора на операцию, и взаимодействие с другими устройствами.
Для инженера-схемотехника транзисторного уровня снова неважен стандарт, ему важно, чтобы чего не напортачить в переданной ему на оптимизацию схеме устройства.
Описание устройства сложения для двух последних инженеров очень многословное и занимает очень много времени и сил. Точка зрения обоих программистов слишком поверхностна в разной степени. Для правильного проектирования системы в целом нам необходимо не вдаваться в сколько-нибудь тонкие детали (не рассматривать схему подробно), но учитывать её основные параметры.
Поэтому высокоуровневое моделирование рассматривает систему с положения между программистом компилятора и инженером схемотехником. Оно знает, как работает участок системы, но реализует его неприменимым к разработке схемы образом (например, оно может считать буфер бесконечным - для выяснения его загрузки).Библиотеки моделирования
Всего их три штуки:
Hawk,
Lava и
Hydra.
Только Hawk может быть использован для высокоуровневого моделирования. Lava и Hydra предназначены для описания именно схем отдельных устройств. Однако, Hawk оказался достаточно устаревшим и немного сильно сложным, поэтому пришлось написать своё.
Вот оно:
module S where
data S a = S a (S a)
toList (S a s) = a:toList s
instance Show a => Show (S a) where
show s = (++"...") $ tail $ init $ show $ take 20 $ toList s
constS c = let s = S c s in s
delayS x s = S x s
fromListS c [] = constS c
fromListS c (x:xs) = delayS x $ fromListS c xs
mapS f ~(S x xs) = S (f x) $ mapS f xs
unzipS s = (mapS fst s,mapS snd s)
unzipS3 abcs = (mapS (\(a,b,c) -> a) abcs,mapS (\(a,b,c) -> b) abcs, mapS (\(a,b,c) -> c) abcs)
zipS ~(S a as) ~(S b bs) = S (a,b) $ zipS as bs
zipWithS f ~(S a as) ~(S b bs) = S (f a b) $ zipWithS f as bs
loopS :: st -> (st -> i -> (st,o)) -> S i -> S o
loopS s0 f is = xs
where
(ss,xs) = unzipS $ mapS (uncurry f) $ zipS (delayS s0 ss) isАга, это ядро системы моделирования. loopS - это
автомат Мили в общем виде.
Пользоваться этим доастаточно просто - надо собрать управляющую функцию, сконструировать начальное состояние, а затем подставит всё это в loopS.
zipS и unzipS используются для коммутации потоков.Первый пример
Снова это будет сложение с плавающей точкой, но на этот раз в составе схемы для вычисления скалярного произведения. Это часто встречающаяся операция при умножении матриц (и не только). Вот её код на языке Си:
float dotp(float*a,float*b,int binc,int len) {
float sum = 0;
int i;
for(i=0;iУмножение элементов строки и столбца порождает поток аргументов для накапливающей суммы. Сумму с накоплением для конвейеризованного устройства можно реализовать с помощью одного регистра (идея схемы Влада Балина,
gaperton):
+-----+
| R |<-+-------------+
+-----+ | |
/\ | \/ +--------+ |
| +------>| |-+--> O
I ---+-------->| |
+--------+R - дополнительный регистр, - схема суммирования, I - вход (сюда поступают данные от умножителя), O - выход, он завёрнут на второй вход сумматора и на вход регистра.
Правила работы схемы:
- Если на входе I и выходе O есть значения, то посылаем на суммирование пару (I,O).
- Если на входе I или на выходе O есть данные и в регистре R присутствуют данные, то посылаем на суммирование пару (I или O,R)
- Если на входе I или на выходе O есть данные и в регистре R нет данных, то на вход сумматора не подаётся ничего, в регистр R записывается данное с одного из входов.
- Схема простаивает.
Вот её реализация:
module SVB where -- Сумматор Влада Балина
import S
-- Полезная функция (она есть в библиотечном модуле Data.Maybe, но для
-- наглядности помещена здесь).
isJust (Just _) = True
isJust _ = False
-------------------------------------------------------------------------------
-- Блок управления устройством (диспетчеризация данных).
singlesumcontrolf r (i,o) = (r',(hasdata,addop))
where
(r',addop) = case (i,o,r) of
-- rule 1 - combine inputs.
(Just ((),x),Just ((),y),r) -> (r,Just ((),x,y))
-- rule 1 - combine one of inputs and register
(Just ((),x),_ ,Just ((),y))
-> (Nothing,Just ((),x,y))
(_ ,Just ((),x),Just ((),y))
-> (Nothing,Just ((),x,y))
-- rule 2 - remember one of inputs
(Just x,_ ,_) -> (Just x,Nothing)
(_ ,Just x,_) -> (Just x,Nothing)
-- rule 3 - bubble
(_,_,r) -> (r,Nothing)
hasdata = isJust i || isJust o
singlesumcontrol is os = loopS Nothing singlesumcontrolf (zipS is os)
-------------------------------------------------------------------------------
-- Конвейеризованная сумма.
pipelinedsumf pipeline input = (pipeline',(r,load))
where
sum = case input of
Just (tag,x,y) -> Just (tag,x+y)
Nothing -> Nothing
r = last pipeline
pipeline' = sum:init pipeline
load = length $ filter isJust pipeline'
pipelinedsum pipelinelen is =
loopS (replicate pipelinelen Nothing) pipelinedsumf is
-------------------------------------------------------------------------------
-- Потоковое конвейеризованное суммирование с параметром - длиной конвейера.
singlesum pipelinelen is = zipS sumresults $ zipS load hasdata
where
(hasdata,suminputs) = unzipS $
singlesumcontrol is sumresults
(sumresults,load) = unzipS $
pipelinedsum pipelinelen suminputs
-- Проверка работы с разными параметрами.
runsinglesum pipelinelen portionlen =
length $
takeWhile notstopped $
toList $
singlesum pipelinelen $
fromListS Nothing $ replicate portionlen $ Just ((),1)
where
notstopped (_,(load,hasdata)) = hasdata || load /= 0
Запустив ghci SVB.hs можно погонять runsinglesum с разными параметрами. Задержка подсчёта суммы порции растёт линейно в зависимости от длины конвейеры (pipelinelen).
Можно построить сумматор для нескольких входных потоков сразу.
Мы расширим запоминающее устройство до 2*количество_потоков ячеек памяти, снабдим входные данные тэгом-номером потока и усложним управление:
- На вход пришло сразу два данных. Если они из одного потока, то мы отправляем их на суммирование. Если они из разных потоков, то мы смотрим, есть ли для какого-либо из них пара в хранилище. Если есть, то отправляем пару на суммирование, соответствующее данное - на хранение. Данное из другого потока добавляем к уже хранившимся
- Если на вход пришло только одно данное, то проверяем наличие пары только для него.
- Если данных на входе нет, то пытаемся отыскать пару и отправить на суммирование.
Это не самая оптимальная схема, здесь можно обойтись всего (количество_потоков) ячеек памяти, но она сложнее.
-- Продолжение SVB.hs
-------------------------------------------------------------------------------
-- Блок управления устройством для нескольких входных потоков.
---------------------------------------
-- вспомогательные функции.
-- Заменить n-th элемент списка на заданный.
-- changen 2 (+30) [1,2,3,4,5] -> [1,2,33,4,5]
-- changen 10 (const 10) [1,2,3,4,5] -> ошибка.
changen 0 f (x:xs) = f x:xs
changen n f (x:xs) = x:changen (n-1) f xs
changen _ _ _ = error "Trying to change size of list with changen."
-- В один из регистров хранилища мы помещаем x, другой дополняем y.
updatepair storetag x btag y store =
changen storetag (const [x]) $
changen btag (y:) store
-- Один из регистров мы очищаем, другой дополняем.
updateclear cleartag btag y store =
changen cleartag (const []) $
changen btag (y:) store
-- Один из регистров просто очищаем.
clearreg tag store = changen tag (const []) store
-- Просто дополняем один из регистров хранилища.
appendreg tag x store = changen tag (x:) store
-- Добавляем к двум регистрам сразу.
appendregs atag x btag y store = appendreg atag x $ appendreg btag y store
-- Сохраняем значения в регистр.
storereg tag x store = changen tag (const [x]) store
multisumcontrolf store (i,o) = (store',(hasdata,addop))
where
hasdata = isJust i || isJust o
findanypair acc n (x:xs) = case x of
[u,v] -> (reverse acc++[]:xs,Just (n,u,v))
x -> findanypair (x:acc) (n+1) xs
findanypair acc _ [] = (reverse acc,Nothing)
(store',addop) = case (i,o) of
(Just (atag,x),Just (btag,y))
| atag == btag -> (store,Just (atag,x,y))
| otherwise -> case (store !! atag,store !! btag) of
([u,v],_) -> (updatepair atag x btag y store,Just (atag,u,v))
(_,[u,v]) -> (updatepair btag y atag x store,Just (btag,u,v))
([u],_) -> (updateclear atag btag y store,Just (atag,u,x))
(_,[v]) -> (updateclear btag atag x store,Just (atag,v,y))
_ -> (appendregs atag x btag y store,Nothing)
(Just (atag,x),_) -> singleinput atag x
(_,Just (btag,y)) -> singleinput btag y
_ -> findanypair [] 0 store
singleinput tag x = case store !! tag of
[u,v] -> (storereg tag x store,Just (tag,u,v))
[u] -> (clearreg tag store,Just (tag,u,x))
[] -> (appendreg tag x store,Nothing)
multisumcontrol nchannels is os =
loopS (replicate nchannels []) multisumcontrolf (zipS is os)
-------------------------------------------------------------------------------
-- Многопоточный конвейеризованный сумматор.
multisum nchannels pipelinelen is = zipS sumresults $ zipS load hasdata
where
(hasdata,suminputs) = unzipS $
multisumcontrol nchannels is sumresults
(sumresults,load) = unzipS $
pipelinedsum pipelinelen suminputs
-- Функции комбинирования входных потоков данных.
combineseq a = concat a -- один поток идёт следом за другим.
-- сперва идут все первые элементы из всех потоков, затем все вторые и тп.
combineinterleave a = concat $ transpose a
where
transpose ([]:_) = []
transpose ls = map head ls:transpose (map tail ls)
-- Проверка работы с разными параметрами.
runmultisum nchannels pipelinelen combinefunc portionlen =
length $
takeWhile notstopped $
toList $
multisum nchannels pipelinelen $
fromListS Nothing $ combinefunc portions
where
notstopped (_,(load,hasdata)) = hasdata || load /= 0
portions = map (\x -> replicate portionlen $ Just (x,x*10+1)) [0..nchannels-1]Снова можно погонять код на предмет изучения работы схемы. На этот раз можно комбинировать уже три параметра: количество каналов, способ комбинирования и длину конвейера. Да, и длину порции, сам собой.
Для любых двух других параметров combineinterleave дает выигрыш по сравнению с другим вариантом подачи элементов. Возможно, есть что-то ещё?