Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions src/skuld/http.clj
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
(defn- encode-bytes
"Encode a bytes to the json generator."
[^Bytes b ^JsonGenerator jg]
(.writeString jg (-> ^Bytes b .bytes b64/encode String. (.replaceAll "\\+" "-") (.replaceAll "/" "_"))))
(.writeString jg (-> ^Bytes b .bytes b64/encode String.)))

;; Custom Cheshire encoder for the Bytes type
(add-encoder Bytes encode-bytes)
Expand Down Expand Up @@ -62,7 +62,12 @@
(defn- b64->id
"Coerces a base64-encoded id into a Bytes type."
[^String b64-id]
(-> b64-id (.replaceAll "-" "+") (.replaceAll "_" "/" ) .getBytes b64/decode Bytes.))
(-> b64-id (.replaceAll "-" "+") (.replaceAll "_" "/") .getBytes b64/decode Bytes.))

(defn- b64->bytes
"Coerce a base64-encoded value into a Bytes type."
[^String b64-str]
(-> b64-str .getBytes b64/decode Bytes.))

(defn- parse-int
"Safely coerces a string into an integer. If the conversion is impossible,
Expand Down Expand Up @@ -99,6 +104,18 @@
ret (node/complete! node msg)]
(POST req (dissoc ret :responses))))

(defn- update!
"Like `node/update!`, but wrapped around an HTTP request."
[node req id]
(let [id (b64->id id)
cid (-> req :body :cid)
lid (-> req :body :lid)
msg (-> req :body :msg b64->bytes)
w (-> req :query-params :w parse-int)
msg {:task-id id :claim-id cid :log-id lid :message msg :w w}
ret (node/update! node msg)]
(POST req (dissoc ret :responses))))

(defn- count-tasks
"Like `node/count-tasks`, but wrapped around an HTTP request."
[node req]
Expand Down Expand Up @@ -145,6 +162,7 @@
(condp route-matches req
"/queue/count" (count-queue node req)
"/tasks/claim" (claim! node req)
"/tasks/update/:id" :>> (fn [{:keys [id]}] (update! node req id))
"/tasks/complete/:id" :>> (fn [{:keys [id]}] (complete! node req id))
"/tasks/count" (count-tasks node req)
"/tasks/enqueue" (enqueue! node req)
Expand Down
25 changes: 25 additions & 0 deletions src/skuld/node.clj
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,29 @@
(vnode node))
msg))

(defn update-local!
[node msg]
(let [part (->> msg :task-id (partition-name node))]
(if-let [vnode (vnode node part)]
(do (vnode/update! vnode msg)
{:w 1})
{:error (str "I don't have partition" part "for task" (:task-id msg))})))

(defn update!
[node msg]
(let [w (or (:w msg) 2)
responses (net/sync-req! (:net node)
(preflist node (:task-id msg))
{:r w}
(merge msg {:type :update-local}))
acks (remove :error responses)
w' (reduce + (map :w acks))]
(if (<= w w')
{:w w'}
{:w w'
:error "not enough nodes acknowledged request for update"
:responses responses})))

(defn complete-local!
"Completes a given task on a local vnode."
[node msg]
Expand Down Expand Up @@ -431,6 +454,8 @@
:claim claim!
:claim-local claim-local!
:request-claim request-claim!
:update update!
:update-local update-local!
:complete complete!
:complete-local complete-local!
:wipe wipe!
Expand Down
31 changes: 26 additions & 5 deletions src/skuld/task.clj
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@

{:start (long) milliseconds in linear time
:end (long) milliseconds in linear time
:completed (long) milliseconds in linear time}"
:completed (long) milliseconds in linear time
:logs [...] a vector of logs}"
(:refer-clojure :exclude [merge])
(:use skuld.util)
(:require [skuld.flake :as flake]
[skuld.util :refer [fress-read fress-write]])
[skuld.util :refer [fress-read fress-write assocv]])
(:import com.aphyr.skuld.Bytes))

(def clock-skew-buffer
Expand All @@ -33,7 +33,8 @@
(let [now (flake/linear-time)]
{:start now
:end (+ now dt)
:completed nil}))
:completed nil
:logs []}))

(defn valid-claim?
"Is a claim currently valid?"
Expand Down Expand Up @@ -88,6 +89,11 @@
[task claim-idx t]
(assoc-in task [:claims claim-idx :completed] t))

(defn update
"Return a copy of the task with the log updated. Takes a claim index, log index
and message."
[task claim-idx log-idx message]
(assoc-in task [:claims claim-idx :logs log-idx] message))

(defn merge-by
"Merges times by merge-fn"
Expand All @@ -97,6 +103,19 @@
nil
(apply merge-fn valid-times))))

(defn merge-logs
"Merge logs vectors by picking the first non-nil value from each index"
[& logses]
(->> logses
(map count)
(apply max)
range
(mapv (fn [i]
(->> logses
(map #(nth % i nil))
(keep identity)
first)))))

(defn merge-claims
"Merges a collection of vectors of claims together."
[claims]
Expand All @@ -120,7 +139,9 @@
(:end claim))
:completed (merge-by min
(:completed merged)
(:completed claim))}
(:completed claim))
:logs (merge-logs (:logs merged)
(:logs claim))}
claim)
merged))
nil
Expand Down
22 changes: 11 additions & 11 deletions src/skuld/util.clj
Original file line number Diff line number Diff line change
Expand Up @@ -85,17 +85,17 @@
(list value))
vec))))

(defn update
"Like update-in, but takes a single key. Given a map, a key, a function, and
args, updates the value of (get map key) to be (f current-value arg1 arg2
...)"
[m k f & args]
(assoc m k (apply f (get m k) args)))

(defn update!
"Transient version of update"
[m k f & args]
(assoc! m k (apply f (get m k) args)))
;(defn update
; "Like update-in, but takes a single key. Given a map, a key, a function, and
; args, updates the value of (get map key) to be (f current-value arg1 arg2
; ...)"
; [m k f & args]
; (assoc m k (apply f (get m k) args)))
;
;(defn update!
; "Transient version of update"
; [m k f & args]
; (assoc! m k (apply f (get m k) args)))

(defmacro compare+
"Expands into a comparison between a and b on the basis of function f1, then
Expand Down
12 changes: 12 additions & 0 deletions src/skuld/vnode.clj
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,18 @@
; The ID in the queue did not exist in the database
(recur vnode queue-name dt)))))

(defn update!
"Updates the given task in the specified claim. Msg should contain:

:task-id The task identifier
:claim-id The claim index
:log-id The log index
:message The log message contents as a byte array"
[vnode {:keys [task-id claim-id log-id message] :as msg}]
(merge-task! vnode
(-> vnode
(get-task task-id)
(task/update claim-id log-id message))))

(defn complete!
"Completes the given task in the specified claim. Msg should contain:
Expand Down
14 changes: 14 additions & 0 deletions test/skuld/node_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,20 @@
(is (= id id*))
(is (not= claims []))))))

;; Add a log message
(let [uri (str "http://127.0.0.1:13100/tasks/update/" id)
cid 0
lid 0
resp (http/post uri {:form-params {:cid cid :lid lid :message "status=10"}
:content-type :json
:as :json})
content-type (get-in resp [:headers "content-type"])
resp* (http/get (str "http://127.0.0.1:13100/tasks/" id "?r=3")
{:as :json})
logs (-> resp* :body :task :claims (nth cid) :logs)]
(is (= 200 (:status resp)))
(is (= "application/json;charset=utf-8" content-type))
(is (not (nil? logs))))

;; Finally let's complete it
(let [uri (str "http://127.0.0.1:13100/tasks/complete/" id)
Expand Down
16 changes: 14 additions & 2 deletions test/skuld/task_test.clj
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,26 @@
(let [t (task {:data :hi})]
(is (= (merge (assoc t :claims [{:start 0 :end 1 :completed 100}])
(assoc t :claims [{:start 2 :end 4 :completed 50}]))
(assoc t :claims [{:start 0 :end 4 :completed 50}])))))
(assoc t :claims [{:start 0 :end 4 :completed 50 :logs []}])))))

(testing "completed without start"
(let [t (task {:data :hi})]
(is (= (merge (assoc t :claims [{:start 0 :end 1 :completed 100}])
(assoc t :claims [{:start 2 :end 4}])
(assoc t :claims [{:completed 50}]))
(assoc t :claims [{:start 0 :end 4 :completed 50}]))))))
(assoc t :claims [{:start 0 :end 4 :completed 50 :logs []}])))))

(testing "a claim with different logs"
(let [t (task {:data :hi})]
(is (= (merge (assoc t :claims [{:start 0 :end 1 :logs ["one"]}])
(assoc t :claims [{:start 0 :end 1 :logs [nil "two"]}]))
(assoc t :claims [{:start 0 :end 1 :completed nil :logs ["one" "two"]}])))))

(testing "a claim with some of the same logs"
(let [t (task {:data :hi})]
(is (= (merge (assoc t :claims [{:start 0 :end 1 :logs ["one"]}])
(assoc t :claims [{:start 0 :end 1 :logs ["one" "two"]}]))
(assoc t :claims [{:start 0 :end 1 :completed nil :logs ["one" "two"]}]))))))


(deftest claim-test
Expand Down