diff --git a/src/skuld/http.clj b/src/skuld/http.clj index 4486914..e2c21c8 100644 --- a/src/skuld/http.clj +++ b/src/skuld/http.clj @@ -17,7 +17,7 @@ (defn- encode-bytes "Encode a bytes to the json generator." [^Bytes b ^JsonGenerator jg] - (.writeString jg (-> ^Bytes b .bytes b64/encode String. (.replaceAll "\\+" "-") (.replaceAll "/" "_")))) + (.writeString jg (-> ^Bytes b .bytes b64/encode String.))) ;; Custom Cheshire encoder for the Bytes type (add-encoder Bytes encode-bytes) @@ -62,7 +62,12 @@ (defn- b64->id "Coerces a base64-encoded id into a Bytes type." [^String b64-id] - (-> b64-id (.replaceAll "-" "+") (.replaceAll "_" "/" ) .getBytes b64/decode Bytes.)) + (-> b64-id (.replaceAll "-" "+") (.replaceAll "_" "/") .getBytes b64/decode Bytes.)) + +(defn- b64->bytes + "Coerce a base64-encoded value into a Bytes type." + [^String b64-str] + (-> b64-str .getBytes b64/decode Bytes.)) (defn- parse-int "Safely coerces a string into an integer. If the conversion is impossible, @@ -99,6 +104,18 @@ ret (node/complete! node msg)] (POST req (dissoc ret :responses)))) +(defn- update! + "Like `node/update!`, but wrapped around an HTTP request." + [node req id] + (let [id (b64->id id) + cid (-> req :body :cid) + lid (-> req :body :lid) + msg (-> req :body :msg b64->bytes) + w (-> req :query-params :w parse-int) + msg {:task-id id :claim-id cid :log-id lid :message msg :w w} + ret (node/update! node msg)] + (POST req (dissoc ret :responses)))) + (defn- count-tasks "Like `node/count-tasks`, but wrapped around an HTTP request." [node req] @@ -145,6 +162,7 @@ (condp route-matches req "/queue/count" (count-queue node req) "/tasks/claim" (claim! node req) + "/tasks/update/:id" :>> (fn [{:keys [id]}] (update! node req id)) "/tasks/complete/:id" :>> (fn [{:keys [id]}] (complete! node req id)) "/tasks/count" (count-tasks node req) "/tasks/enqueue" (enqueue! node req) diff --git a/src/skuld/node.clj b/src/skuld/node.clj index fad8048..8936580 100644 --- a/src/skuld/node.clj +++ b/src/skuld/node.clj @@ -352,6 +352,29 @@ (vnode node)) msg)) +(defn update-local! + [node msg] + (let [part (->> msg :task-id (partition-name node))] + (if-let [vnode (vnode node part)] + (do (vnode/update! vnode msg) + {:w 1}) + {:error (str "I don't have partition" part "for task" (:task-id msg))}))) + +(defn update! + [node msg] + (let [w (or (:w msg) 2) + responses (net/sync-req! (:net node) + (preflist node (:task-id msg)) + {:r w} + (merge msg {:type :update-local})) + acks (remove :error responses) + w' (reduce + (map :w acks))] + (if (<= w w') + {:w w'} + {:w w' + :error "not enough nodes acknowledged request for update" + :responses responses}))) + (defn complete-local! "Completes a given task on a local vnode." [node msg] @@ -431,6 +454,8 @@ :claim claim! :claim-local claim-local! :request-claim request-claim! + :update update! + :update-local update-local! :complete complete! :complete-local complete-local! :wipe wipe! diff --git a/src/skuld/task.clj b/src/skuld/task.clj index 0565c65..1b20190 100644 --- a/src/skuld/task.clj +++ b/src/skuld/task.clj @@ -9,11 +9,11 @@ {:start (long) milliseconds in linear time :end (long) milliseconds in linear time - :completed (long) milliseconds in linear time}" + :completed (long) milliseconds in linear time + :logs [...] a vector of logs}" (:refer-clojure :exclude [merge]) - (:use skuld.util) (:require [skuld.flake :as flake] - [skuld.util :refer [fress-read fress-write]]) + [skuld.util :refer [fress-read fress-write assocv]]) (:import com.aphyr.skuld.Bytes)) (def clock-skew-buffer @@ -33,7 +33,8 @@ (let [now (flake/linear-time)] {:start now :end (+ now dt) - :completed nil})) + :completed nil + :logs []})) (defn valid-claim? "Is a claim currently valid?" @@ -88,6 +89,11 @@ [task claim-idx t] (assoc-in task [:claims claim-idx :completed] t)) +(defn update + "Return a copy of the task with the log updated. Takes a claim index, log index + and message." + [task claim-idx log-idx message] + (assoc-in task [:claims claim-idx :logs log-idx] message)) (defn merge-by "Merges times by merge-fn" @@ -97,6 +103,19 @@ nil (apply merge-fn valid-times)))) +(defn merge-logs + "Merge logs vectors by picking the first non-nil value from each index" + [& logses] + (->> logses + (map count) + (apply max) + range + (mapv (fn [i] + (->> logses + (map #(nth % i nil)) + (keep identity) + first))))) + (defn merge-claims "Merges a collection of vectors of claims together." [claims] @@ -120,7 +139,9 @@ (:end claim)) :completed (merge-by min (:completed merged) - (:completed claim))} + (:completed claim)) + :logs (merge-logs (:logs merged) + (:logs claim))} claim) merged)) nil diff --git a/src/skuld/util.clj b/src/skuld/util.clj index c122562..01845a4 100644 --- a/src/skuld/util.clj +++ b/src/skuld/util.clj @@ -85,17 +85,17 @@ (list value)) vec)))) -(defn update - "Like update-in, but takes a single key. Given a map, a key, a function, and - args, updates the value of (get map key) to be (f current-value arg1 arg2 - ...)" - [m k f & args] - (assoc m k (apply f (get m k) args))) - -(defn update! - "Transient version of update" - [m k f & args] - (assoc! m k (apply f (get m k) args))) +;(defn update +; "Like update-in, but takes a single key. Given a map, a key, a function, and +; args, updates the value of (get map key) to be (f current-value arg1 arg2 +; ...)" +; [m k f & args] +; (assoc m k (apply f (get m k) args))) +; +;(defn update! +; "Transient version of update" +; [m k f & args] +; (assoc! m k (apply f (get m k) args))) (defmacro compare+ "Expands into a comparison between a and b on the basis of function f1, then diff --git a/src/skuld/vnode.clj b/src/skuld/vnode.clj index 9a0b053..3132b99 100644 --- a/src/skuld/vnode.clj +++ b/src/skuld/vnode.clj @@ -571,6 +571,18 @@ ; The ID in the queue did not exist in the database (recur vnode queue-name dt))))) +(defn update! + "Updates the given task in the specified claim. Msg should contain: + + :task-id The task identifier + :claim-id The claim index + :log-id The log index + :message The log message contents as a byte array" + [vnode {:keys [task-id claim-id log-id message] :as msg}] + (merge-task! vnode + (-> vnode + (get-task task-id) + (task/update claim-id log-id message)))) (defn complete! "Completes the given task in the specified claim. Msg should contain: diff --git a/test/skuld/node_test.clj b/test/skuld/node_test.clj index ba2bb54..56bc501 100644 --- a/test/skuld/node_test.clj +++ b/test/skuld/node_test.clj @@ -341,6 +341,20 @@ (is (= id id*)) (is (not= claims [])))))) + ;; Add a log message + (let [uri (str "http://127.0.0.1:13100/tasks/update/" id) + cid 0 + lid 0 + resp (http/post uri {:form-params {:cid cid :lid lid :message "status=10"} + :content-type :json + :as :json}) + content-type (get-in resp [:headers "content-type"]) + resp* (http/get (str "http://127.0.0.1:13100/tasks/" id "?r=3") + {:as :json}) + logs (-> resp* :body :task :claims (nth cid) :logs)] + (is (= 200 (:status resp))) + (is (= "application/json;charset=utf-8" content-type)) + (is (not (nil? logs)))) ;; Finally let's complete it (let [uri (str "http://127.0.0.1:13100/tasks/complete/" id) diff --git a/test/skuld/task_test.clj b/test/skuld/task_test.clj index 20e3243..fda3591 100644 --- a/test/skuld/task_test.clj +++ b/test/skuld/task_test.clj @@ -45,14 +45,26 @@ (let [t (task {:data :hi})] (is (= (merge (assoc t :claims [{:start 0 :end 1 :completed 100}]) (assoc t :claims [{:start 2 :end 4 :completed 50}])) - (assoc t :claims [{:start 0 :end 4 :completed 50}]))))) + (assoc t :claims [{:start 0 :end 4 :completed 50 :logs []}]))))) (testing "completed without start" (let [t (task {:data :hi})] (is (= (merge (assoc t :claims [{:start 0 :end 1 :completed 100}]) (assoc t :claims [{:start 2 :end 4}]) (assoc t :claims [{:completed 50}])) - (assoc t :claims [{:start 0 :end 4 :completed 50}])))))) + (assoc t :claims [{:start 0 :end 4 :completed 50 :logs []}]))))) + + (testing "a claim with different logs" + (let [t (task {:data :hi})] + (is (= (merge (assoc t :claims [{:start 0 :end 1 :logs ["one"]}]) + (assoc t :claims [{:start 0 :end 1 :logs [nil "two"]}])) + (assoc t :claims [{:start 0 :end 1 :completed nil :logs ["one" "two"]}]))))) + + (testing "a claim with some of the same logs" + (let [t (task {:data :hi})] + (is (= (merge (assoc t :claims [{:start 0 :end 1 :logs ["one"]}]) + (assoc t :claims [{:start 0 :end 1 :logs ["one" "two"]}])) + (assoc t :claims [{:start 0 :end 1 :completed nil :logs ["one" "two"]}])))))) (deftest claim-test