Refs

Refs是用来协调对于一个或者多个binding的并发修改的。这个协调机制是利用 Software Transactional Memory (STM)来实现的。 Refs指定在一个事务里面修改。

STM在某些方面跟数据库的事务很像。在一个STM事务里面做的修改只有在事务提交之后别的线程才能看到。这实现了ACID里面的A和I。Validation函数是的对Ref的修改与跟它相关的其它的值是一致的(consistent), 也就实现了C。

要想你的代码在一个事务里面执行, 那么要把你的代码包在宏 dosync 的体内。当在一个事务里面对值进行修改,被改的其实是一个私有的、线程内的、直到事务提交才会被别的线程看到的一快内存。

如果到事务结束的时候也没有异常抛出的话, 那么这个事务会顺利的提交, 在事务里面所作的改变也就可以被别的线程看到了。

如果在事务里面有一个异常抛出,包括validation函数抛出的异常,那么这个事务会被回滚,事务里面对值做的修改也就会撤销。

如果在一个事务里面,我们要对一个Ref进行修改,但是发现从我们的事务开始之后,已经有别的线程对这个Ref做了改动(冲突了), 那么当前事务里面的改动会被撤销,然后从dosync的开头重试。那到底什么时候会检测到冲突, 什么时候会进行重试, 这个是没有保证的, 唯一保证的是clojure为检测到冲突,并且会进行重试。

要在事务里面执行的代码一定要是没有副作用的,这一点非常重要,因为前面提到的,事务可能会跟别的事务事务冲突,然后重试, 如果有副作用的话,那么出来的结果就不对了。不过要执行有副作用的代码也是可能的, 可以把这个方法调用包装给Agent, 然后这个方法会被hold住直到事务成功提交,然后执行一次。如果事务失败那么就不会执行。

ref 函数可以创建一个 Ref 对象。下面的例子代码创建一个Ref并且得到它的引用。

(def <em>name</em> (ref <em>value</em>))

dosync 宏用来包裹一个事务 -- 从它对应的左括号开始,到它对应的右括号结束。在事务里面我们用 ref-set 来改变一个Ref的值并且返回这个值。你不能在事务之外调用这个函数,否则会抛出 IllegalStateException 异常。 看例子:

(dosync
  ...
  (ref-set <em>name</em> <em>new-value</em>)
  ...)

如果你要赋的新值是基于旧的值的话,那么就需要三个步骤了:

  1. deference 这个 Ref 来获得它的旧值
  2. 计算新值
  3. 设置新值

altercommute 函数在一个操作里面完成这三个步骤。 alter 函数是用来操作那些必须以特定顺序进行的修改。而 commute 函数则是要来操作那些修改顺序不是很重要 -- 可以同时进行的修改。 跟 ref-set , 一样, 它们只能在一个事务里面调用。它们都接受一个 "update 函数" 做为参数, 以及一些额外的参数来计算新的值。这个函数会被传递这个Ref在线程内的当前的值以及一些额外的参数(如果有的话)。当我们要赋的新的值是基于旧的值计算出来的时候, 那么我们鼓励使用 altercommute 而不是 ref-set .

比如,我们想给一个Ref: counter 加一, 我们可以用 inc 函数来实现:

(dosync
  ...
  (alter counter inc)
  ; or as
  (commute counter inc)
  ...)

如果 alter 试图修改的 Ref 在当前事务开始之后被别的事务改变了,那么当前事务会进行重试。而同样的情况下 commute 不会进行重试。它会以事务内的当前值进行计算。这会获得比较好的性能(因为不进行重试)。但是要记住的是 commute 函数只有在多个线程对Ref的修改顺序不重要的时候才能使用。

如果一个事务提交了, 那么对于 commute 函数还会有一些额外的事情发生。对于每一个 commute 调用, Ref 的值会被下面的调用结果重置:

(apply <em>update-function</em> <em>last-committed-value-of-ref</em> <em>args</em>)

注意,这个update-function会被传递这个Ref最后被提交的值, 这个值可能是另外一个、在我们当前事务开始之后才开始的事务。

使用 commute 而不是 alter 是一种优化。只要对Ref进行更新的顺序不会影响到这个Ref的最终的值。

然后看一个使用了 Refs 和 Atoms (后面会介绍)的例子。这个例子涉及到银行账户以及账户之间的交易。首先我们定义一下数据模型。

(ns com.ociweb.bank)

; Assume the only account data that can change is its balance.
(defstruct account-struct :id  wner :balance-ref)

; We need to be able to add and delete accounts to and from a map.
; We want it to be sorted so we can easily
; find the highest account number
; for the purpose of assigning the next one.
(def account-map-ref (ref (sorted-map)))

下面的函数建立一个新的帐户,并且把它存入帐户的map, ? 然后返回它。

(defn open-account
  "creates a new account, stores it in the account map and returns it"
  [owner]
  (dosync ; required because a Ref is being changed
    (let [account-map @account-map-ref
     last-entry (last account-map)
     ; The id for the new account is one higher than the last one.
     id (if last-entry (inc (key last-entry)) 1)
     ; Create the new account with a zero starting balance.
     account (struct account-struct id owner (ref 0))]
  ; Add the new account to the map of accounts.
  (alter account-map-ref assoc id account)
  ; Return the account that was just created.
  account)))

下面的函数支持从一个账户里面存/取钱。

(defn deposit [account amount]
  "adds money to an account; can be a negative amount"
  (dosync ; required because a Ref is being changed
    (Thread/sleep 50) ; simulate a long-running operation
    (let [owner (account  wner)
          balance-ref (account :balance-ref)
          type (if (pos? amount) "deposit" "withdraw")
          direction (if (pos? amount) "to" "from")
          abs-amount (Math/abs amount)]
      (if (>= (+ @balance-ref amount) 0) ; sufficient balance?
        (do
          (alter balance-ref + amount)
          (println (str type "ing") abs-amount direction owner))
        (throw (IllegalArgumentException.
                 (str "insufficient balance for " owner
                      " to withdraw " abs-amount)))))))

(defn withdraw
  "removes money from an account"
  [account amount]
  ; A withdrawal is like a negative deposit.
  (deposit account (- amount)))

下面是函数支持把钱从一个账户转到另外一个账户。由 dosync 所开始的事务保证转账要么成功要么失败,而不会出现中间状态。

(defn transfer [from-account to-account amount]
  (dosync
    (println "transferring" amount
     "from" (from-account  wner)
     "to" (to-account  wner))
    (withdraw from-account amount)
    (deposit to-account amount)))

下面的函数支持查询账户的状态。由 dosync 所开始的事务保证事务之间的一致性。比如把不会报告一个转账了一半的金额。

(defn- report-1 ; a private function
  "prints information about a single account"
  [account]
  ; This assumes it is being called from within
  ; the transaction started in report.
  (let [balance-ref (account :balance-ref)]
    (println "balance for" (account  wner) "is" @balance-ref)))

(defn report
  "prints information about any number of accounts"
  [& accounts]
  (dosync (doseq [account accounts] (report-1 account))))

上面的代码没有去处理线程启动时候可能抛出的异常。相反,我们在当前线程给他们定义了一个异常处理器。

; Set a default uncaught exception handler
; to handle exceptions not caught in other threads.
(Thread/setDefaultUncaughtExceptionHandler
  (proxy [Thread$UncaughtExceptionHandler] []
    (uncaughtException [thread throwable]
      ; Just print the message in the exception.
      (println (.. throwable .getCause .getMessage)))))

现在我们可以调用上面的函数了。

(let [a1 (open-account "Mark")
   a2 (open-account "Tami")
   thread (Thread. #(transfer a1 a2 50))]
  (try
  (deposit a1 100)
  (deposit a2 200)

  ; There are sufficient funds in Mark's account at this point
  ; to transfer $50 to Tami's account.
  (.start thread) ; will sleep in deposit function twice!

  ; Unfortunately, due to the time it takes to complete the transfer
  ; (simulated with sleep calls), the next call will complete first.
  (withdraw a1 75)

  ; Now there are insufficient funds in Mark's account
  ; to complete the transfer.

  (.join thread) ; wait for thread to finish
  (report a1 a2)
  (catch IllegalArgumentException e
   (println (.getMessage e) "in main thread"))))

上面代码的输出是这样的:

depositing 100 to Mark
depositing 200 to Tami
transferring 50 from Mark to Tami
withdrawing 75 from Mark
transferring 50 from Mark to Tami (a retry)
insufficient balance for Mark to withdraw 50
balance for Mark is 25
balance for Tami is 200