Learn you some Clojure

Mar 07, 2012 13:03

Recently I started a small project to help me learn Clojure. It’s a chat bot with a pluggable architecture and, at the moment, a single Campfire adapter. Luckily, the task turned out to be broad enough that while implementing it I touched almost every aspect of Clojure.

Today I’m going to walk through the final codebase, discussing the interesting parts and exploring the beautiful moments. I hope this will be useful for folks who are interested in Clojure: what it is capable of, its basic concepts, and some a-little-less-trivial examples. I will not discuss low-level details of language syntax, so if that’s a problem, you’d better start somewhere else.

The original idea came from GitHub’s Hubot, and one of the challenges was to work around its limitations. Although the main goal of this project was educational, it has everything needed to be used as a chat bot too. The final source code is available on GitHub: https://github.com/tonsky/katybot.
core.clj
We start at core.clj, where the robot contract is defined:

(ns katybot.core)

(def version "Katybot-clj/0.3")
(defn new-robot [] {})

(defmulti listen :receptor)
(defmulti say (fn [robot msg] (:receptor robot)))
(defmulti say-img (fn [robot url] (:receptor robot)))
(defmulti user (fn [robot user-id] (:receptor robot)))
(defmulti users :receptor)
(defmulti shutdown :receptor)
(defmulti memorize (fn [robot k v] (:memory robot)))
(defmulti recall (fn [robot k] (:memory robot)))
(defmulti consider (fn [robot event] (:brain robot)))

I’m using multimethods here because I want to build a robot object from interchangeable parts, combining different receptor, memory and brain implementations into one object. The robot object is just a hash-map whose :receptor, :memory and :brain keys indicate which implementation a method call should be dispatched to. The closest analogue in traditional programming would be multiple inheritance, but with multimethods I can assemble and even switch implementations at runtime.
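To make the dispatch concrete, here is a minimal sketch of switching an implementation at runtime. The ::fixed-memory and ::empty-memory implementations are hypothetical stand-ins invented for this illustration; they are not part of Katybot.

```clojure
;; Same shape as the contract above: dispatch on the :memory key of the robot map.
(defmulti recall (fn [robot k] (:memory robot)))

;; Two hypothetical implementations, for illustration only.
(defmethod recall ::fixed-memory [robot k] 42)
(defmethod recall ::empty-memory [robot k] nil)

(def robot {:memory ::fixed-memory})

;; Dispatches to the ::fixed-memory method.
(recall robot :answer)

;; "Switching" an implementation is just changing a key in the map.
(recall (assoc robot :memory ::empty-memory) :answer)
```

Because the robot is a plain map, no class hierarchy has to be touched to swap a part.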
atom_memory.clj
Let’s move to the easiest part of the robot: its memory. atom_memory.clj is a very straightforward in-memory implementation based on Clojure’s atoms. An atom holds independent, non-transactional state that is changed by applying functions to it:

(defn +atom-memory [robot]
  (assoc robot :memory ::atom-memory ::storage (atom {})))

(defmethod memorize ::atom-memory [robot k v]
  (swap! (::storage robot) assoc k v)
  robot)

(defmethod recall ::atom-memory [robot k]
  (get @(::storage robot) k))

You can also see namespaced keywords here (the reader expands ::kw to :current-ns/kw). This is how I store additional data structures in the robot’s body while avoiding collisions between map keys.
file_memory.clj
File-based memory has a slightly more sophisticated implementation:

(defn +file-memory [robot file]
  (assoc robot
    :memory ::file-memory
    ::agent (agent {:file file :memory (read-file file)})))

(defmethod memorize ::file-memory [robot k v]
  (send (::agent robot) store k v)
  robot)

(defmethod recall ::file-memory [robot k]
  (get-in @(::agent robot) [:memory k]))

Here I use another Clojure concurrency primitive, the agent, to avoid simultaneous reads and writes to the file when the robot’s memory is accessed from different threads. Each agent has a queue of actions that it processes sequentially, one at a time, so changes to its state never run in parallel.
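A minimal sketch of that serialization, independent of Katybot: several threads send actions to one agent, and the agent applies them one by one. The counter agent is a hypothetical example, not something from the project.

```clojure
(def counter (agent 0))

;; Ten threads each send ten `inc` actions; sends return immediately.
(let [senders (doall (for [_ (range 10)]
                       (future (dotimes [_ 10] (send counter inc)))))]
  ;; Wait until every sender thread has finished queueing its actions.
  (doseq [f senders] @f))

;; Block until all actions queued so far have been applied.
(await counter)

@counter ; → 100
```

No update is lost even though no explicit lock is taken, which is the property the file-based memory relies on.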

(defn- read-file [file]
  (if (.exists (clojure.java.io/as-file file))
    (read-string (slurp file))
    {}))

(defn- store [state k v]
  (let [{file :file, memory :memory} state
        updated-memory (assoc memory k v)]
    (spit file updated-memory)
    (assoc state :memory updated-memory)))

Another neat feature used here is Clojure’s built-in ability to serialize and deserialize native data structures. I just dump the memory state to a file using spit, then read and parse it back using slurp and read-string.
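The round trip can be sketched in a few lines. Two things here are my assumptions and differ from the code above: the data is written with pr-str to make the readable printing explicit, and it is read back with clojure.edn/read-string (available in newer Clojure versions), which only reads data and does not evaluate anything.

```clojure
(require '[clojure.edn :as edn])

;; A temporary file stands in for "robot.memory".
(def memory-file (java.io.File/createTempFile "memory" ".edn"))

(def memory {:greeting "hi", :seen #{1 2 3}, :nested {:n [1 2.5]}})

;; Dump the map as text...
(spit memory-file (pr-str memory))

;; ...and parse it back into an equal data structure.
(def restored (edn/read-string (slurp memory-file)))

(= memory restored) ; → true
```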

That’s it for the simple parts of the robot; we’re now moving on to something more interesting.
brain.clj
The brain in Katybot is the unit that executes commands: the logic behind the scenes that finds answers to the questions asked.

> Katy, calc me 2^100
... brain gets used ...
< 2^100 = 1.2676506 × 10^30

I needed an extensible and dynamic way to write commands for my robot, so I ended up with a namespace-wide atom containing the set of all defined commands:

(defonce reflexes (atom #{}))

(defn register-reflex [name]
  (swap! reflexes conj (resolve name)))

defonce means the var’s value won’t be replaced when we reload the file. (resolve name) translates the symbol name to the corresponding var, the place where the function’s body is stored. If at some point in the future I redefine the function with defn (to fix a bug, for example), the new body will be assigned to the very same var, so through @reflexes the robot will always see the latest function evaluated.
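The difference between holding a function and holding its var can be sketched as follows. The greet function is a hypothetical example; redefining it with a second defn imitates reloading a file.

```clojure
(defn greet [] "hello")

(def by-value [greet])             ; captures the current function object
(def by-var   [(resolve 'greet)])  ; captures the var #'greet instead

;; Redefine the function, as happens when a source file is reloaded.
(defn greet [] "hello, fixed")

;; The stored function object is still the old one...
((first by-value)) ; → "hello"

;; ...while calling through the var picks up the new body.
((first by-var))   ; → "hello, fixed"
```

Vars are themselves callable, which is why a set of vars can be used as a set of functions.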

Using this, we can register any function as a reflex:

(register-reflex 'fn-name)

But the vast majority of commands follow the very same pattern:

  1. check if incoming message is a :command message;
  2. match message body against some regex;
  3. if match succeeded, use capturing groups as arguments and do something;
  4. return flag indicating whether a command was processed or not.
For such purposes, I wrote a macro which, given regex and function body, generates a function with the checks described above and also registers this function as a reflex:

(defmacro defcommand [& body]
  (let [[name doc attr-map re [robot event groups] body]
        (consume [symbol? string? map? regex? vector?] body)]
    `(do
       (defn ~name
         ~(merge {:doc doc} attr-map)
         [~robot {type# :type text# :text :as event#}]
         (if (= type# :command)
           (when-let [~groups (re-find ~re text#)]
             (let [~event event#]
               ~@body
               :answered))))
       (register-reflex '~name))))

The generated function takes two arguments and binds the regex match result to a third name (groups) for the inner body. I also tried my best to preserve destructuring in the argument declaration:

(defcommand on-translate
  #"translate from ([a-z]{2}) to ([a-z]{2}) (.*)"
  [robot {:keys [user-id] :as event} [_ from to phrase]]
  ...)

consume is my own function for parsing a list of forms with gaps, based on their expected types. See utils.clj.
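The real consume lives in utils.clj and is not shown in this post, so the following is only a guess at how such a function could work: walk a list of predicates, take the next form when it matches, leave a nil gap when it does not, and return the leftover forms as the last element. The regex? helper is likewise an assumption.

```clojure
;; Hypothetical helper: true for compiled regex literals such as #"ping".
(defn regex? [x]
  (instance? java.util.regex.Pattern x))

;; Hypothetical sketch of consume, not the version from utils.clj.
(defn consume [preds forms]
  (loop [preds preds, forms forms, acc []]
    (if-let [[p & more] (seq preds)]
      (if (and (seq forms) (p (first forms)))
        (recur more (rest forms) (conj acc (first forms))) ; form matches: take it
        (recur more forms (conj acc nil)))                 ; no match: leave a gap
      (conj acc (seq forms)))))                            ; the rest is the body

;; Docstring and attribute map are omitted here, so they come back as nil.
(consume [symbol? string? map? regex? vector?]
         '(on-ping #"ping" [robot event groups] (say robot "pong")))
```

With this shape, the destructuring at the top of defcommand gets one slot per expected form plus the remaining body.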

Other parts of brain.clj are not particularly interesting. In consider-event I check whether a text event was a command addressed to my robot or just chat between users. You can also write a reflex for raw events using the defreflex macro (similar to defcommand; see usage examples in etiquette.clj).
repl.clj
Thanks to Clojure’s dynamic nature, it is possible to write functions for loading code, reloading code, importing namespaces, etc., all at runtime.

Command scripts are all stored in separate files in the reflexes/ directory, and are quite simple themselves. To locate and reload them, I keep a set of shortcut functions in repl.clj:

(defn reload-robot []
  (doseq [module ["utils" "core" "brain" "campfire" "console" "atom_memory" "file_memory"]]
    (fyi "Loading " module)
    (load module)
    (use (-> (str "katybot." module) (str/replace "_" "-") symbol))))

(defn- load-dir [dir]
  (fyi "Loading " dir ":")
  (doseq [f (->> (file-seq (clojure.java.io/as-file dir))
                 (filter #(-> (.getName %) (.endsWith ".clj")))
                 (sort-by #(.getParent %)))]
    (fyi " " f)
    (load-file (.getCanonicalPath f))))

(defn reload-reflexes []
  (load-dir "reflexes"))

reload-reflexes can be evaluated while the robot is running because it contributes to the global katybot.brain/reflexes atom. The next time an event happens, the robot will operate on the updated set of reflexes. This way you can add new reflexes or fix those already loaded.

Functionality that Clojure itself doesn’t provide, like file manipulation, is easily accessible via Java interop (all the (.dot …) forms).

You can even define a function to reload its own source file:

(defn reload-all []
  (fyi "Loading repl")
  (load "repl")
  (use 'katybot.repl))

console.clj
A receptor is the robot unit that talks to users. Receptors are Katybot’s ears and lips.

katybot.console is a simple receptor implementation that reads commands from stdin and prints results to stdout. It is a good example of what is required to implement a custom receptor.

Nothing really interesting here, although you can see colorized output and the use of a (promise) as a stop condition.

(defn +console-receptor [robot]
  (assoc robot
    :receptor ::console-receptor
    ::running (promise)))

(defmethod listen ::console-receptor [robot]
  (printf "\u001b[1;32m> ") (flush)
  (doseq [line (line-seq (java.io.BufferedReader. *in*))
          :let [event {:type :text, :text line, :user-id 0, :timestamp (java.util.Date.)}
                action (consider robot event)]
          :while (not (str/blank? line))
          :while (not (realized? (::running robot)))]
    (printf "\u001b[1;32m> ") (flush))
  (printf "\u001b[m") (flush))

(defmethod shutdown ::console-receptor [robot]
  (deliver (::running robot) false))

Promises are another of Clojure’s concurrency primitives: they allow one thread to block until another delivers a result to it. In my particular case I could also have used an atom, because I do not need this “block until” ability of promises, but I decided to try something else.
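A minimal sketch of a promise used as a one-shot stop flag, outside of any receptor. The stop var is a hypothetical name for this illustration.

```clojure
(def stop (promise))

;; Nothing delivered yet.
(realized? stop)            ; → false

;; deref with a timeout avoids blocking forever on an undelivered promise.
(deref stop 10 :timed-out)  ; → :timed-out

;; Some other part of the program signals shutdown.
(deliver stop :bye)

(realized? stop)            ; → true
@stop                       ; → :bye

;; A promise can be delivered only once; later deliveries are ignored.
(deliver stop :again)
@stop                       ; → :bye
```

The console receptor only ever asks realized?, which is why an atom holding a boolean would have worked there as well.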
campfire_api.clj
Before we see how the Campfire receptor is implemented, we have to look at the Campfire API library. It’s not complete, just enough for the chat bot to do its basic functions; however, it’s the most complicated part of the whole robot.

First, there’s the protocol, so other API implementations could be provided:

(defprotocol CampfireAPI
  (join [_ room])
  (say [_ room msg])
  (listen [_ room msg-callback])
  (stop-listening [_ room])
  (leave [_ room])
  (user-info [_ user])
  (user-me [_])
  (room-info [_ room]))

Protocols are Clojure’s approach to polymorphism and the expression problem. They are a lot like interfaces but, unlike interfaces, they are separate from the classes implementing them, so it’s possible to write protocol implementations for existing classes, for example.
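As a small illustration of that last point, here is a sketch that implements a protocol for classes that already exist. The Describable protocol is hypothetical and has nothing to do with the Campfire API.

```clojure
;; A hypothetical protocol, defined long after String and Date were written.
(defprotocol Describable
  (describe [x]))

;; Existing classes gain an implementation without being modified or wrapped.
(extend-protocol Describable
  String
  (describe [s] (str "a string of " (count s) " chars"))
  java.util.Date
  (describe [d] (str "a date, epoch ms " (.getTime d))))

(describe "katy")              ; → "a string of 4 chars"
(describe (java.util.Date. 0)) ; → "a date, epoch ms 0"
```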

Next, there’s a bunch of dynamic vars:

(def ^:dynamic *check-interval* 5000)
(def ^:dynamic *stuck-timeout* 10000)
(def ^:dynamic *headers* {
  "Content-Type" "application/json; charset=utf-8"
  "Accept" "application/json"})
(def ^:dynamic *user-agent* "katybot.campfire-api")
(def ^:dynamic *debug* false)

They are a lot like global variables, but they can be temporarily rebound inside the body of a binding macro. This makes them very handy for implementing rarely needed options without cluttering the main contract:

(binding [api/*debug* true
          api/*user-agent* "Katybot-clj/0.3"]
  (api/listen api room))

Next, in campfire-async-api we build a dynamic object that captures local bindings just as any closure does and implements our protocol, all inside the function’s body:

(defn campfire-async-api [account token]
  (let [...
        connections (ref {})]
    (reify
      CampfireAPI

      (listen [api room msg-callback]
        (dosync
          (stop-listening api room)
          (let [agnt (room-agent api (create-client) room msg-callback)]
            (alter connections assoc room agnt)
            agnt)))

      (stop-listening [_ room]
        (dosync
          (when-let [old-agnt (@connections room)]
            (send old-agnt finish)
            (alter connections dissoc room)
            old-agnt)))

      ...)))

We build a connections map, wrapped in a ref, that contains all active room → agent mappings. We use a ref and dosync instead of an atom because several concurrency-sensitive parts are involved (the ref itself and the agents in the map): we read and write the ref and send actions to the agents, and all of it should happen at once or not at all. Atoms can’t give us such a guarantee because they are not integrated with the STM, but refs and agents are: actions sent inside a transaction are held until it commits.
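The all-or-nothing behaviour can be sketched without any network code. The rooms ref and the audit agent below are hypothetical names; the second transaction is aborted on purpose by throwing an exception.

```clojure
(def rooms (ref {}))
(def audit (agent []))

;; A transaction that commits: the ref changes and the send goes through.
(dosync
  (alter rooms assoc "lobby" :listening)
  (send audit conj [:registered "lobby"]))

;; A transaction that aborts: neither the ref change nor the send takes effect.
(try
  (dosync
    (alter rooms assoc "attic" :listening)
    (send audit conj [:registered "attic"])
    (throw (ex-info "abort the transaction" {})))
  (catch Exception _ :aborted))

;; Wait for any actions that were actually dispatched.
(await audit)

@rooms ; → {"lobby" :listening}
@audit ; → [[:registered "lobby"]]
```

With an atom in place of the ref, the send in the aborted block would have been dispatched immediately and could not be taken back.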


So, for each room we are listening to, there’s an agent. For reading Campfire’s stream I’ve chosen http.async.client by neotyk. It seems to be the most popular Clojure library for speaking HTTP at the moment. Using an asynchronous client is also a good exercise for Clojure’s concurrency facilities.

From prior implementation experience I knew that a Campfire connection could get stuck or be dropped after a few hours of work. Campfire sends special “keep-alive” events every 3 seconds to inform the client that everything is OK. It’s the responsibility of the API to handle these situations transparently.

So, I decided to use agent as a state machine, switching between :init, :listening, :broken and :finished phases:

(defn- touch [state phase]
  (if (= :finished (:phase state))
    state
    (assoc state :phase phase, :last-accessed (now))))

(defn connect [state]
  (let [{:keys [room api client msg-callback]} state
        url (format "https://streaming.campfirenow.com/room/%s/live.json" room)]
    (join api room) ; just in case we were kicked
    (binding [httpr/*default-callbacks* (merge httpr/*default-callbacks*
                                          {:completed (partial done-callback *agent*)
                                           :error (partial err-callback *agent*)})]
      (-> state
          (touch :listening)
          (assoc :resp (httpc/request-stream client :get url (partial part-callback msg-callback *agent*)))))))

(defn disconnect [state]
  (if (= (:phase state) :listening)
    (httpc/cancel (:resp state)))
  state)

(defn reconnect [state]
  (-> state
      disconnect
      connect))

(defn finish [state]
  (with-open [client (:client state)]
    (-> state
        disconnect
        (assoc :phase :finished))))

The (touch ...) function is what I use for state transitions because, first, I needed to be sure I would never escape from :finished and, second, I needed to update the :last-accessed time.

In connect you can see me setting up httpc/request-stream callbacks via dynamic var bindings; that’s where I learned the trick.

Since all of the transitions are just functions that take the agent’s state and return a new state, it is very easy to combine and reuse them, as I did in reconnect and finish.

Two other features of Clojure agents that came in handy are error handlers and watches. Watches are notified whenever the agent’s state changes. I use them to debug the agent switching phases:
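Stripped of the HTTP calls, the composition idea can be sketched with plain maps. The simplified step-* functions below are hypothetical: they only change :phase and do not talk to any server.

```clojure
;; Same guard as touch above: once :finished, the state never changes again.
(defn step [state phase]
  (if (= :finished (:phase state))
    state
    (assoc state :phase phase)))

;; Hypothetical, side-effect-free stand-ins for disconnect and connect.
(defn step-disconnect [state] (step state :init))
(defn step-connect    [state] (step state :listening))

;; Transitions are ordinary functions, so they compose like any others.
(def step-reconnect (comp step-connect step-disconnect))

(step-reconnect {:phase :broken})   ; → {:phase :listening}
(step-reconnect {:phase :finished}) ; → {:phase :finished}

;; The same function can be sent to an agent as an action.
(def machine (agent {:phase :broken}))
(send machine step-reconnect)
(await machine)
@machine ; → {:phase :listening}
```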

(defn- logger [room _ agnt old-state new-state]
  (when (not= (:phase old-state) (:phase new-state))
    (debug new-state "logger" (:phase old-state) " -> " (:phase new-state))))

(add-watch :logger-watcher (partial logger room))

The error handler is called when an exception is thrown while the agent is processing an action. Its task is to repair the agent’s state and restart it. In my case all scheduled actions will be cleared from the agent’s queue (:clear-actions true), the exception will be printed and the agent will be restarted in the :broken state:

(defn- doctor [agnt e]
  (omg! "Exception in room-agent " (:room @agnt) ": " e)
  (send agnt touch :broken))

(let [agnt (agent {...}
             :error-handler doctor
             :clear-actions true)])

And to top it all off, I have a separate thread scheduled via java.util.Timer (see utils.clj) that monitors the agent’s :last-accessed time and schedules a reconnect if the agent has been inactive for too long:

(defn- watchman [agnt]
  (let [{:keys [phase resp last-accessed] :as state} @agnt
        delay (- (now) last-accessed)]
    (cond
      (= phase :finished) nil ; stopping watchman
      (> delay *stuck-timeout*) (do
                                  (send agnt reconnect)
                                  :continue)
      :else :continue)))

(schedule (partial watchman agnt) *check-interval*)

So, there are four threads interacting with each other (the API caller thread, the agent, the watchman and the httpc callbacks thread) without any locks involved. Developing such a configuration was easy enough, and I hope it is still pretty understandable. Of course, there’s no heavy load, but for my task Clojure’s concurrency facilities turned out to be a perfect match, way better than anything I’ve tried before.
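The schedule helper itself is in utils.clj and is not reproduced in this post. As a rough guess at its shape, the sketch below runs a function periodically on a java.util.Timer and stops as soon as the function returns nil, which matches how watchman signals that it is done. The name schedule-sketch and all details are assumptions.

```clojure
;; Hypothetical sketch, not the schedule function from utils.clj.
(defn schedule-sketch [f interval-ms]
  (let [timer (java.util.Timer. true) ; daemon thread, will not keep the JVM alive
        task  (proxy [java.util.TimerTask] []
                (run []
                  ;; A nil result means "stop"; anything else means "continue".
                  (when-not (f)
                    (.cancel timer))))]
    (.schedule timer task (long interval-ms) (long interval-ms))
    timer))

;; Usage: tick three times, then stop by returning nil.
(def ticks (atom 0))
(schedule-sketch #(when (< (swap! ticks inc) 3) :continue) 20)
```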
campfire.clj
campfire.clj is a pretty straightforward implementation of receptor using campfire-api. Its main task is to translate events from Campfire format to robot format. We can look at some interesting control flow and data manipulation primitives:

(defn- user-from-campfire [user]
  (change-keys user :avatar_url :avatar))

(defn type-from-campfire [type]
  (case type
    "TextMessage" :text
    "EnterMessage" :join
    "LeaveMessage" :leave
    type))

(defn- item-from-campfire [item]
  (-> item
      (change-keys
        :body :text
        :user_id :user-id
        :created_at :timestamp)
      (update-in [:timestamp] #(.parse (java.text.SimpleDateFormat. "yyyy/MM/dd HH:mm:ss Z") %))
      (update-in [:type] type-from-campfire)))

(defmethod users ::campfire-receptor [{api ::api, room ::room}]
  (let [room (api/room-info api room)]
    (into {}
      (for [u (:users room)]
        [(:id u) (user-from-campfire u)]))))

Here change-keys is defined in utils.clj as follows:

(defn change-keys [m & {:as keys}]
  (reduce
    (fn [acc [old new]]
      (-> acc
          (assoc new (m old))
          (dissoc old)))
    m keys))

project.clj
The build tools and REPL I used were provided by the Leiningen project. The repl.clj helpers were nicely integrated into it via:

:repl-init katybot.repl

When I run lein repl, it automatically loads the katybot.repl namespace, which loads and imports the rest of my project.
utils.clj

What’s left is utils.clj, where I have log levels named right:

(defn log [& msg]
  (let [ts (-> (java.text.SimpleDateFormat. "MMM dd HH:mm:ss")
               (.format (java.util.Date.)))]
    (println (apply str "\u001b[1;30m" ts "\u001b[m " msg))))
(defn btw [& msg] (log "\u001b[1;30m" (apply str msg) "\u001b[m"))
(defn fyi [& msg] (log "\u001b[1;32m" (apply str msg) "\u001b[m"))
(defn omg! [& msg] (log "\u001b[1;31m" (apply str msg) "\u001b[m"))

Also, it turned out to be very handy to use different colors for different log levels. No, not on Windows, sorry.
Conclusion
It was an exciting ride, even though I got some things right only on the second or even third attempt. Clojure’s approach to state management, its dynamic features, concurrency primitives and rich standard library made development easy and the resulting product robust. Clojure is definitely my language of choice, and I can’t wait to try ClojureScript and Emacs/Swank next.

Any questions about this project or its implementation details, and any comments on how this code could be improved or made more idiomatic, are welcome.

And, if you’ve read till here, thank you.

(defn listen-console []
  (-> (new-robot)
      (+file-memory "robot.memory")
      (+global-brain ["/" "Katy" "Kate"])
      (+console-receptor)
      (listen)))

en, clojure, fp
