На днях в качестве эксперимента реализовал на OCaml идею оптимизирующих парсер-комбинаторов.
На одном ядре 2.33 GHz и тестовой
задачке с рсдн получилось 35 МБ в секунду, что существенно быстрее обычных парсер-комбинаторов на Окамле и даже вроде бы быстрее чем С++ / Boost.Spirit.
Но сначала небольшое введение.
1. Обычные.
Парсер-комбинаторы - это широко известная в узких кругах техника создания парсеров - функций разбора строк (или других последовательностей данных) на содержащиеся в них данные с учетом их грамматики. Например, в тестовой задачке строка содержала последовательность действительных чисел, которые нужно было прочитать и просуммировать, не пользуясь готовой функцией перевода строки в число. На менее выразительных языках задача парсинга обычно решается кодогенерацией - по описанию грамматики специальная утилита генерирует исходник парсера. Функциональные же языки позволяют построить парсер динамически, используя простейшие функции и комбинаторы для синтеза по правилам грамматики сложных парсеров из простых. Думаю, для каждого адепта функционального программирования написание своей библиотеки парсер-комбинаторов - такая же обязательная программа, как для новичка в Haskell написание туториала по монадам.
Классический вариант парсер-комбинаторов на Окамле может выглядеть так.
Парсером будет функция, которая получает на вход список неразобранных символов и в случае успеха возвращает разобранное значение и неразобранный остаток списка, а в случае неудачи возвращает признак неудачи, оставляя неразобранный список нетронутым.
Тип результата будет выглядеть так:
type 'a parse_result = Parsed of 'a * char list | Failed;;
Сначала опишем примитивный парсер, который умеет разбирать один символ, удовлетворяющий заданному предикату.
let p_pred p s =
match s with
| [] -> Failed
| h::t -> if p h then Parsed(h, t) else Failed;;Функция p_pred берет предикат p и список символов s. Если список пуст, разбор заканчивается неудачей. Если не пуст, то если символ удовлетворяет предикату, возвращаем символ и остаток списка, иначе опять неудача. Используя карринг, с помощью p_pred можно получить парсер, умеющий разбирать заданную букву или, например, произвольную цифру.
let p_char c = p_pred ((=) c);;
let isdigit c = c>='0' && c<='9';;
let p_digit = p_pred isdigit;;
Теперь появляются комбинаторы, которые из примитивных парсеров составляют более сложные. Опишем оператор последовательности (аналог ab в регулярных выражениях):
let (>>>) p1 p2 s =
match p1 s with
| Parsed(_, s2) -> p2 s2
| Failed -> Failed;;Парсер a >>> b отрабатывает успешно, если сначала успешно отработает а, а потом b. Если один из них не разобрал свою часть строки, то и вся комбинация возвращает неудачу.
Теперь оператор альтернативы (a|b в регулярных выражениях):
let (|||) p1 p2 s =
match p1 s with
| Parsed _ as ok -> ok
| Failed -> p2 s;; Парсер a ||| b отрабатывает успешно, если успешно отработал a или b.
Опциональный парсер (аналог a? в регулярных выражениях):
let p_opt defval p s =
match p s with
| Parsed _ as ok -> ok
| Failed -> Parsed(defval, s);;Если его аргумент отработал успешно, возвращается его результат, в противном случае все равно возвращается успех с заданным значением по умолчанию и исходным списком символов.
Комбинатор повторения (аналог a*):
let p_many prs s =
let rec loop st =
match prs st with
| Parsed(_, s') -> loop s'
| Failed -> Parsed((), st) in
loop s;;Применяет парсер-аргумент сколько сможет раз. Результат разбора каждой итерации выкидывается. Если результат нужен, то вот вариант с функцией накопления результата:
let p_manyf prs f v0 s =
let rec loop v st =
match prs st with
| Parsed(x, s') -> loop (f v x) s'
| Failed -> Parsed(v, st) in
loop v0 s;;Здесь v0 - начальное значение, которое изменяется функцией f на каждой успешной итерации. Например, чтобы разобрать целое положительное число, его можно использовать так:
let mkInt v x = v * 10 + int_of_char x - 48;;
let p_int = p_manyf p_digit mkInt 0;;Оператор последовательности выкидывал результат первого парсера и возвращал результат второго. Но что делать, если результат первого тоже важен? Тут можно задействовать монады. Опишем два вспомогательных парсера:
let return x s = Parsed(x, s);;
let fail s = Failed;;return x - парсер, возвращающий значение x, не трогая переданный ему список символов. fail - парсер, не разбирающий ничего. Теперь оператор монадного сочленения парсеров:
let (>>=) p1 f s =
match p1 s with
| Parsed(x, s2) -> f x s2
| Failed -> Failed;;Его правый аргумент f - не парсер, а функция, которая получает значение из успешного результата отработки парсера p1 и возвращает новый парсер, который тут же и применяется. Таким образом получается последовательное применение двух парсеров, где поведение и результат второго зависят от значения, разобранного первым. Например, разбор знакового целого числа может быть сделан так:
let p_int =
p_opt 1 (p_char '-' >>= fun _-> return (-1)) >>= fun sign ->
p_manyf p_digit mkInt 0 >>= fun n -> return (sign * n );;Если полученная на входе строка начинается на минус, то успешно отрабатывает p_char '-', его результат (символ -) заменяется на -1 и возвращается в качестве значения p_opt, а если минуса не было, то возвращается значение по умолчанию 1. Данное значение сохраняется как sign. Потом разбираются цифры, из которых получается целое число n. Результат разбора цифр умножается на sign и полученное значение считается результатом всего разбора p_int.
Данный набор комбинаторов позволяет описывать довольно сложные грамматики и одновременно действия по разбору. Например, я его успешно использовал для разбора сообщений от сервера в соревновании
Sapka'09, а также в утилите, которой раскрашен OCaml-код в этом посте.
Однако скорость у такого решения не слишком высока. Непосредственно разбор на 2.33 гигагерцовом проце проистекает со сравнительно неплохой скоростью 23 МБ/с, но это в случае когда уже есть список символов. А вот конвертирование строки в список символов происходит медленно, и если его тоже учитывать, то общая скорость падает до 7.8 МБ/с. В прошлом году я делал нечто похожее не на списках, а на Enum'aх, получалось 20 МБ/с на менее красивом демонадизированном варианте.
2. Быстрые.
Классические парсер-комбинаторы чуть менее чем полностью состоят из замыканий, вызовов функций и выделений памяти под промежуточные результаты, занимаясь при этом довольно простым делом. По сути они реализуют метод рекурсивного спуска с откатами. Более того, поскольку рекурсивные правила (содержащие сами себя) в рамках этой схемы Окамл определить не дает (или это только у меня не получилось? Update: позже
все получилось, рекурсивные правила на Окамле описывать можно), выходит, что мы имеем дело с регулярными грамматиками, а процесс разбора можно описать несложным конечным автоматом. Впрочем, добавив стек в автомат, можно реализовать и рекурсию, расширив класс допустимых грамматик. Простейший парсер, разбирающий один символ, можно представить конечным автоматом с таким графом состояний:
Сначала он находится в состоянии in, и если символ подходящий, то переходит в состояние ok, возвращая успешный результат и остаток списка, а если символ не подходит, то автомат переходит в состояние fail, не изменяя переданного ему списка. И вообще, любой парсер имеет такой общий вид: у него есть одно входное состояние и два выходных - успех и неудача.
Графы состояний автоматов парсер-комбинаторов можно получить, соединяя разным образом графы автоматов их аргументов. Результат тоже всегда имеет одно входное состояние и два выходных, поэтому подходит для дальнейшего манипулирования другими комбинаторами. Вот так выглядят описанные выше комбинаторы последовательности, альтернативы, опциональности и повторения:
Таким образом, сложные парсеры могут быть реализованы как конечные автоматы, собранные по простым правилам из КА элементарных парсеров. И можно описать комбинаторы, строящие такие автоматы. Синтезированный ими граф можно упростить и превратить в таблицу, по которой функция-парсер будет работать без лишних накладных расходов. Единственное затруднение - как описать семантические действия по созданию разобранных значений. В классическом подходе использовались замыкания, и разобранные ранее значения можно было использовать позже для синтеза более сложных. Однако результатом комбинаторов была монолитная функция, которую преобразовать и упростить уже нельзя. Возможно, у проблемы есть элегантное решение, но я пока поступил просто: семантические действия передают друг другу данные через изменяемые переменные, доступные им всем.
В графе КА парсера есть три вида переходов: а) встречен подходящий символ, б) ветка else и в) безусловный переход. Каждый переход может сопровождаться каким-нибудь действием. Действия на переходах по символу получают этот символ в качестве аргумента, другие действия ничего не получают и работают исключительно с описанным снаружи состоянием. Тип переходов можно описать так:
type move_kind =
| Symb of char * char | Else | Always
| SymbA of char * char * (int -> unit) | ElseA of (unit -> unit) | AlwaysA of (unit -> unit);;
Граф КА можно описать набором вершин, каждая из которых может иметь несколько переходов из себя в другие вершины. Переход описывается парой (тип перехода, номер вершины назначения). Храня наборы переходов и вершин в массивах, элементарные графы, разбирающие один символ, можно описать так:
let p_char c = [| [| Symb(c,c), 1; Else, 2 |]; [||]; [||] |];;
let p_range c1 c2 = [| [| Symb(c1,c2), 1; Else, 2 |]; [||]; [||] |];;
let f_char c f = [| [| SymbA(c,c,f), 1; Else, 2 |]; [||]; [||] |];;
let f_range c1 c2 f = [| [| SymbA(c1,c2,f), 1; Else, 2 |]; [||]; [||] |];;
p_char х успешно разбирает символ х. p_range описывает допустимый набор символов, например цифры или буквы. Их аналоги, начинающиеся с f_, делают все то же, но при этом содержат некие семантические действия, которые будут выполнены при успешном разборе символа. Эти графы имеют три вершины - in, ok и fail - и два перехода из нулевой (in) в другие. Поскольку все КА парсеров имеют эти три состояния, можно условиться, что первые три вершины любого графа у нас будут обозначать in, ok и fail. Каждый синтезируемый из других граф будет иметь эту внешнюю рамку из таких трех состояний, причем из нового состояния in будет безусловный переход на начальное состояние первого подграфа. Функция frame создает такую рамку. Функция relocate делает из заданного графа подграф, сдвигая номера упоминаемых в нем вершин. Функция connect создает подграф и соединяет его выходные состояния ok и fail с заданными состояниями нового графа. Используя эти вспомогательные функции несложно описать графы для комбинаторов последовательности и опциональности, соединяя подграфы по приведенным выше схемам.
let frame kind = [| [| kind, 3; |]; [||]; [||] |];;
let relocate p delta =
Array.map (Array.map (fun (kind, idx) -> (kind, idx + delta) )) p;;
let connect p delta ok_state fail_state =
let res = relocate p delta in res.(1) <- [| Always, ok_state |]; res.(2) <- [| Always, fail_state |]; res;;
let (>>>) p1 p2 =
let b_start = 3 + Array.length p1 in
Array.concat [ frame Always; connect p1 3 b_start 2; connect p2 b_start 1 2];;
let p_opt p =
Array.append (frame Always) (connect p 3 1 1);;Аналогично строятся и остальные комбинаторы - альтернативы, повторения и т.д. Их аналоги с префиксом f_ запоминают в графе семантические действия.
let f_opt p f =
let res = Array.append (frame Always) (connect p 3 1 1) in
res.(5) <- [| AlwaysA f, 1 |];
res;;
let (|||) p1 p2 =
let b_start = 3 + Array.length p1 in
Array.concat [(frame Always); (connect p1 3 1 b_start); (connect p2 b_start 1 2)];;
let p_star p =
Array.append (frame Always) (connect p 3 3 1);;
let f_star initf p =
Array.append (frame (AlwaysA initf)) (connect p 3 3 1);;
let p_plus p = p >>> p_star p;;
let f_plus initf p =
let res = p >>> p_star p in
res.(0).(0) <- (AlwaysA initf, 3); res;;
let (>>=) p f =
let res = Array.append (frame Always) (connect p 3 1 2) in
res.(4) <- [| AlwaysA f, 1 |];
res;;
Имея такие комбинаторы, можно строить графы конечных автоматов для сложных парсеров. Например, разбор вещественного числа из исходной задачки может выглядеть так:
let sign = ref 0.0;;
let ivalue = ref 0;;
let fr = ref 0.0;;
let fv = ref 0.0;;
let p_float =
f_opt (f_char '-' (fun _->sign:=-1.0)) (fun _->sign:=1.0)
>>> f_star (fun _->ivalue:=0) (f_range '0' '9' (fun c->ivalue:=!ivalue*10+c-48))
>>> p_char ','
>>> f_star (fun _->fv:=0.0; fr:=0.1)
(f_range '0' '9' (fun c-> fv:=!fv +. (float_of_int (c-48)) *. !fr; fr := !fr *. 0.1));;
В результате получается довольно развесистый граф. Но его можно сильно упростить, убрав все состояния, содержащие только безусловный переход в другое.
let rec simplify p =
let unconditional =
Array.enum p |> Enum.mapi (fun i node ->
if i=0 then None else
try Some(i, Array.find (function Always,_->true | _->false) node) with Not_found -> None )
|> Enum.filter_map identity in
match Enum.peek unconditional with
| None -> p
| Some (from, (uncond_kind, goto)) ->
let goto' = if goto < from then goto else goto-1 in
let adjust_move (kind, idx) = (kind, if idx < from then idx else if idx=from then goto' else idx-1) in
let p' = p |> Array.enum |> Enum.mapi (fun i node-> (i,node)) |> Enum.filter_map (fun (i,node) ->
if i = from then None else Some( Array.map adjust_move node ))
|> Array.of_enum in
simplify p';;
Дальше граф можно превратить в таблицу, где в каждом состоянии для каждого возможного входного символа будет указано действие - принять символ (перейти дальше по строке) или не трогать, выполнить какое-то семантическое действие или не выполнять - и номер состояния, в которое следует перейти из данного.
type move_action = Consume | Keep | ConsumeA of (int -> unit) | KeepA of (unit -> unit);;
let tabulate p =
p |> Array.map (fun node->
let tmp = Array.make 257 (Keep, -1) in
let defm = Array.fold_left (fun def (kind, idx) ->
match kind with
| Symb(c1, c2) -> let m = (Consume, idx) in for i = int_of_char c1 to int_of_char c2 do tmp.(i)<-m done; def
| SymbA(c1, c2, f) -> let m = (ConsumeA f, idx) in for i = int_of_char c1 to int_of_char c2 do tmp.(i)<-m done; def
| Else
| Always -> Keep, idx
| AlwaysA f -> KeepA f, idx
| ElseA f -> KeepA f, idx
) (Keep, -1) node in
tmp |> Array.map (fun (k, idx) -> if idx >=0 then k,idx else defm) );;
Эту таблицу можно еще оптимизировать. Если при встрече какого-то символа не совершается никаких действий и просто идет переход в другое состояние, можно проследить по таблице какое же действие в конце концов будет выполнено для этого символа и это действие поставить сразу в рассматриваемую ячейку, таким образом сэкономив на переходах. В итоге ни одна смена состояния не будет совершаться в холостую, все они будут сопровождаться действиями.
let optimize tbl =
let rec optimove ch (action, next_state) =
if action=Keep && next_state!=1 && next_state!=2 then optimove ch tbl.(next_state).(ch)
else action, next_state in
Array.mapi (fun state tab -> if state!=1 && state!=2 then Array.mapi optimove tab else tab) tbl;;
Имея такую оптимизированную таблицу, можно уже получить готовый быстрый парсер, который будет действовать по этой таблице и в конце концов выполнять либо действие, соответствующее успеху всего парсера, либо соответствующее неудаче. Конец строки считается особым символом с номером 256.
let execute success fail tbl str =
let len = String.length str in
let rec run state i =
if state = 1 then success () else
if state = 2 then fail () else
let ch = if ithen int_of_char str.[i] else 256 in
match tbl.(state).(ch) with
| Consume, next_state -> run next_state (i+1)
| Keep, next_state -> run next_state i
| ConsumeA f, next_state -> f ch; run next_state (i+1)
| KeepA f, next_state -> f (); run next_state i in
run 0 0;;
let prepare success fail p =
p |> simplify |> tabulate |> optimize |> execute success fail;;
Функция prepare берет исходный граф, построенный нашими комбинаторами, упрощает его, превращает в таблицу, оптимизирует ее и создает функцию-парсер, принимающую на вход строку и в зависимости от результата разбора выполняющую одно из заданных действий. Например, решение исходной задачки про суммирование вещественных чисел может выглядеть так:
let sum = ref 0.0;;
let p_floats =
f_star (fun _->sum:=0.0)
(p_star (p_char ' ') >>> p_float >>= fun _->sum := !sum +. !sign *. (float_of_int !ivalue +. !fv));;
let parse =
p_floats |> prepare
(fun ()->Printf.printf "Ok! Result = %f" !sum)
(fun ()->print_string "fail!");;
Решение, может, и не слишком красивое, зато работает быстро.
Если есть идеи, как сделать его красивее и удобнее, буду рад услышать.