;;;; Parallel programming utilities for SBCL. ;;;; Copyright (c) 2008,2009 Andy Hefner ;;;; Permission is hereby granted, free of charge, to any person obtaining ;;;; a copy of this software and associated documentation files (the ;;;; "Software"), to deal in the Software without restriction, including ;;;; without limitation the rights to use, copy, modify, merge, publish, ;;;; distribute, sublicense, and/or sell copies of the Software, and to ;;;; permit persons to whom the Software is furnished to do so, subject ;;;; to the following conditions: ;;;; The above copyright notice and this permission notice shall be ;;;; included in all copies or substantial portions of the Software. ;;;; THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, ;;;; EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES ;;;; OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND ;;;; NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT ;;;; HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, ;;;; WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING ;;;; FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR ;;;; OTHER DEALINGS IN THE SOFTWARE.
(defpackage :hq
  (:use :common-lisp)
  (:export #:message-queue
           #:make-message-queue
           #:message-queue-mutex
           #:message-queue-waitqueue
           #:message-queue-head
           #:message-queue-tail
           #:message-queue-push
           #:message-queue-push*
           #:message-queue-add
           #:message-queue-add*
           #:message-queue-pop
           #:message-queue-pop-n-nonblock
           #:message-queue-pop-n-blocking

           #:make-syncvar
           #:syncvar
           #:syncvar-full-p
           #:syncvar-mutex
           #:syncvar-waitqueue
           #:syncvar-generator
           #:syncvar-get
           #:syncvar-put
           #:delay
           #:force

           #:number-of-processors

           #:*worker-debug-stream*
           #:execute-job
           #:job-restartable-p
           #:note-job-failure
           #:ensure-workers
           #:get-work-queue
           #:shutdown-workers
           ;; The function defined in this file is ADD-WORKER-BINDING
           ;; (singular).  The plural symbol was the only one exported
           ;; historically; it is kept so existing references to it
           ;; still read, but it names no function.
           #:add-worker-binding
           #:add-worker-bindings
           #:define-thread-variable
           #:declare-thread-variable

           #:parallel-chunks
           #:timed-call))

(in-package :hq)

;;;; Message queues

;;; Using message-queue-add / message-queue-pop yields a queue.
;;; Using message-queue-push / message-queue-pop yields a stack.

;;; HEAD is the list of pending objects and TAIL is its last cons (both
;;; NIL when the queue is empty).  All access to HEAD and TAIL is done
;;; while holding MUTEX; WAITQUEUE is notified whenever objects arrive.
(defstruct message-queue
  (mutex (sb-thread:make-mutex))
  (waitqueue (sb-thread:make-waitqueue))
  (head nil)
  (tail nil))

(declaim (inline message-queue-push
                 message-queue-add
                 message-queue-pop
                 %message-queue-pop-n))

(defun message-queue-push (queue object)
  "Push 'object' on the head of 'queue'"
  (declare (optimize (speed 3))
           (type message-queue queue))
  (sb-thread:with-mutex ((message-queue-mutex queue))
    (push object (message-queue-head queue))
    ;; The queue was empty: the new cons is also the tail.
    (unless (message-queue-tail queue)
      (setf (message-queue-tail queue) (message-queue-head queue)))
    (sb-thread:condition-notify (message-queue-waitqueue queue)))
  (values))

(defun message-queue-push* (queue list)
  "Push a list of objects onto the head of the queue.
The conses of LIST are reused (its last cdr is modified)."
  (declare (optimize (speed 3))
           (type message-queue queue)
           (type list list))
  (when list
    (sb-thread:with-mutex ((message-queue-mutex queue))
      (let ((tail (last list)))
        (unless (message-queue-tail queue)
          (setf (message-queue-tail queue) tail))
        (setf (cdr tail) (message-queue-head queue)
              (message-queue-head queue) list)
        (sb-thread:condition-broadcast (message-queue-waitqueue queue)))))
  (values))

(defun message-queue-add (queue object)
  "Enqueue 'object' at end of 'queue'"
  (declare (optimize (speed 3))
           (type message-queue queue))
  (sb-thread:with-mutex ((message-queue-mutex queue))
    (let ((new-tail (cons object nil)))
      (when (message-queue-tail queue)
        (setf (cdr (message-queue-tail queue)) new-tail))
      (setf (message-queue-tail queue) new-tail
            (message-queue-head queue) (or (message-queue-head queue) new-tail)))
    (sb-thread:condition-notify (message-queue-waitqueue queue))
    (values)))

(defun message-queue-add* (queue list)
  "Enqueue a list of objects.
The conses of LIST become part of the queue."
  (declare (optimize (speed 3))
           (type message-queue queue)
           (type list list))
  (when list
    (sb-thread:with-mutex ((message-queue-mutex queue))
      (when (message-queue-tail queue)
        (setf (cdr (message-queue-tail queue)) list))
      (setf (message-queue-tail queue) (last list)
            (message-queue-head queue) (or (message-queue-head queue) list))
      (sb-thread:condition-broadcast (message-queue-waitqueue queue))))
  (values))

;;; Detach and return a list of up to N objects from the head of QUEUE
;;; (NIL when N is not positive or the queue is empty).  Does no
;;; locking; the callers in this file hold the queue mutex.
(defun %message-queue-pop-n (queue n)
  (declare (optimize (speed 3))
           (type fixnum n)
           (type message-queue queue))
  (and (> n 0)
       (let* ((head (message-queue-head queue))
              (tail (nthcdr (1- n) head)))
         (setf (message-queue-head queue) (cdr tail))
         ;; Nothing remains after the popped segment: queue is now empty.
         (unless (cdr tail)
           (setf (message-queue-tail queue) nil))
         ;; Cut the popped segment loose from the rest of the queue.
         (when tail
           (setf (cdr tail) nil))
         head)))

(defun message-queue-pop (queue)
  "Pop an object from the head of the queue, blocking until one is available."
  (declare (optimize (speed 3))
           (type message-queue queue))
  (sb-thread:with-mutex ((message-queue-mutex queue))
    (loop
      (when (message-queue-head queue)
        (return (car (%message-queue-pop-n queue 1))))
      (sb-thread:condition-wait (message-queue-waitqueue queue)
                                (message-queue-mutex queue)))))

(defun message-queue-pop-n-nonblock (queue n)
  "Pop up to N objects from the queue, without blocking."
  (declare (optimize (speed 3))
           (type message-queue queue)
           (type fixnum n))
  (sb-thread:with-mutex ((message-queue-mutex queue))
    (%message-queue-pop-n queue n)))

(defun message-queue-pop-n-blocking (queue n)
  "Pop between one and N objects from the queue, blocking until at
least one item is available. Returns NIL if N is not positive."
  (declare (optimize (speed 3))
           (type message-queue queue)
           (type fixnum n))
  (and (> n 0)
       (cons (message-queue-pop queue)
             (message-queue-pop-n-nonblock queue (1- n)))))

;;; Synchronous variable, a write-once storage cell not particularly
;;; similar to the SyncVar of CML. A syncvar is either empty or
;;; full. The syncvar-put operation stores a value in an empty
;;; syncvar. The syncvar-get operation retrieves the value of a full
;;; syncvar, blocking until the variable becomes full. A generator
;;; function allows the syncvar structure to also support delay and
;;; force operators for lazy computations.

(defstruct syncvar
  full-p
  %value
  (mutex (sb-thread:make-mutex))
  waitqueue
  generator)

(defun syncvar-get (var)
  "Return the value of VAR. If VAR is empty and has a generator, call
the generator to compute the value; otherwise block until a value is
stored with SYNCVAR-PUT."
  (declare (type syncvar var))
  ;; Getting the value of a syncvar:
  ;; If the 'full' flag is true, the value has already been computed and
  ;; stored, and we may safely obtain it immediately. If not, the value
  ;; has either not been computed, or is currently being computed.
  ;; Syncvars are write-once, so there's no possible race condition
  ;; between querying the full flag and (when true) retrieving the value.
  (if (syncvar-full-p var)
      (syncvar-%value var)
      (sb-thread:with-mutex ((syncvar-mutex var))
        ;; In either case, we must acquire the mutex. Having done so, we must
        ;; check the 'full' flag again, in case the value was deposited between
        ;; our first check and when we acquired the mutex.
        (cond
          ((syncvar-full-p var) (syncvar-%value var))
          ;; If a generator function is present, compute the value while
          ;; holding the mutex and return it.
          ((syncvar-generator var)
           (prog1 (setf (syncvar-%value var) (funcall (syncvar-generator var)))
             (setf (syncvar-generator var) nil
                   (syncvar-full-p var) t)))
          ;; Otherwise, there is no generator, so this must be a synchronous
          ;; (rather than lazy) value. Wait on the condition variable until
          ;; a value arrives. Create a waitqueue if one does not already
          ;; exist.
          (t
           (unless (syncvar-waitqueue var)
             (setf (syncvar-waitqueue var) (sb-thread:make-waitqueue)))
           (loop until (syncvar-full-p var)
                 do (sb-thread:condition-wait (syncvar-waitqueue var)
                                              (syncvar-mutex var)))
           (syncvar-%value var))))))

(defun syncvar-put (var value)
  "Store VALUE in the empty syncvar VAR and wake any threads waiting in
SYNCVAR-GET. Signals an error if VAR is already full. Returns VALUE."
  (declare (type syncvar var))
  (sb-thread:with-mutex ((syncvar-mutex var))
    (when (syncvar-full-p var)
      (error "Synchronous variable ~W is already full." var))
    (prog1 (setf (syncvar-%value var) value)
      (setf (syncvar-full-p var) t)
      ;; A waitqueue exists only if some reader created one while waiting.
      (let ((waitqueue (syncvar-waitqueue var)))
        (when waitqueue
          (setf (syncvar-waitqueue var) nil)
          (sb-thread:condition-broadcast waitqueue)))))
  value)

(defmacro delay (&body body)
  "Return a syncvar whose value is computed by evaluating BODY the first
time it is requested with FORCE or SYNCVAR-GET."
  `(make-syncvar :generator (lambda () ,@body)))

(defun force (arg)
  "Return the value of the syncvar ARG (see SYNCVAR-GET)."
  (etypecase arg
    (syncvar (syncvar-get arg))))

;;; Detection of number of processors

(defvar *num-cpus* nil)

(defun number-of-processors ()
  "Return the number of processors, computing and caching it on first use."
  (or *num-cpus*
      (setf *num-cpus* (%number-of-processors))))

;;; Count the lines of /proc/cpuinfo that begin with "processor<TAB>".
;;; Always returns at least 1: a result of zero (file missing, or no
;;; matching lines) would create no worker threads, and every job
;;; submitted afterwards would wait forever.
(defun %number-of-processors ()
  (let ((tag (format nil "processor~A" #\Tab)))
    (flet ((processor-line-p (line)
             ;; MISMATCH returns NIL when LINE is exactly TAG, and
             ;; (length tag) when TAG is a proper prefix of LINE.
             (let ((diff (mismatch tag line)))
               (or (null diff) (= diff (length tag))))))
      (max 1
           (with-open-file (in "/proc/cpuinfo" :if-does-not-exist nil)
             (if in
                 (loop as line = (read-line in nil nil)
                       while line
                       count (processor-line-p line))
                 0))))))

;;;; Worker threads fed from a central job queue

(defvar *workers* nil)
(defvar *work-queue* nil)
(defparameter *worker-debug-stream* *trace-output*)
(defvar *worker-environment* (cons nil nil))

(defgeneric execute-job (job)
  (:documentation "Run JOB. Functions are called with no arguments.")
  (:method ((job function)) (funcall job)))

(defgeneric job-restartable-p (job)
  (:documentation "True if the RETRY-JOB restart should be offered for JOB.")
  (:method ((job function)) t)
  (:method (job) nil))

(defgeneric note-job-failure (job)
  (:documentation "Called when JOB is abandoned via the ABANDON-JOB restart.")
  (:method (job)
    (format *worker-debug-stream* "~&Job ~A was abandoned.~%" job)
    (values)))

;;; Create the work queue (unless one already exists) and start one
;;; worker thread per processor.  Returns the work queue.
(defun %initialize-workers ()
  (let* ((work-queue (setf *work-queue* (or *work-queue* (make-message-queue))))
         (workers (loop repeat (number-of-processors)
                        collect (sb-thread:make-thread
                                 (lambda ()
                                   (worker-thread-top-level work-queue))))))
    (setf *workers* workers)
    work-queue))

(defun guarded-execute-job (job)
  "Execute JOB with RETRY-JOB and ABANDON-JOB restarts established.
RETRY-JOB is offered only when JOB-RESTARTABLE-P is true of JOB."
  (tagbody
   try
     (restart-case (execute-job job)
       (retry-job ()
         :report "Retry this job"
         :test (lambda (c) (declare (ignore c)) (job-restartable-p job))
         (go try))
       (abandon-job ()
         :report "Abandon this job"
         (note-job-failure job)
         (go done)))
   done))

(defun worker-thread-top-level (work-queue)
  (worker-work work-queue *worker-environment*))

(defun worker-work (work-queue environment &optional job)
  "Worker main loop. ENVIRONMENT is a tail of the worker environment
list. The binding held in its first cons (if any) is established with
PROGV; then either the remaining bindings are processed recursively or,
at the end of the list, jobs are popped from WORK-QUEUE and executed.
After each pop the loop checks whether the list has grown, and if so
recurses into the new entries so they are bound before the popped job
runs. JOB, if given, is a job that has been popped but not yet run. The
job :EXIT terminates the worker, which re-queues :EXIT for the others."
  (destructuring-bind (binding . rest) environment
    (flet ((serve ()
             (if rest
                 (worker-work work-queue rest job)
                 (loop
                   (when (eql job :exit)
                     (when *worker-debug-stream*
                       (format *worker-debug-stream* "~&Thread exit.~%"))
                     (message-queue-add work-queue :exit)
                     (return))
                   (when job (guarded-execute-job job))
                   (setf job (message-queue-pop work-queue))
                   ;; REST was captured on entry; re-read the cdr to see
                   ;; bindings appended since then.  The binding of this
                   ;; cons is already in effect, so continue with the
                   ;; new entries only.
                   (when (rest environment)
                     (return (worker-work work-queue (rest environment) job)))))))
      ;; Every non-NIL binding is established here, including the one in
      ;; the last cons of the list.  (Binding only when more entries
      ;; followed left the most recently added variable unbound in the
      ;; workers until yet another binding was registered.)
      (if binding
          (destructuring-bind (symbol . initfn) binding
            ;;(format *worker-debug-stream* "~&binding ~A~%" symbol)
            (progv (list symbol) (list (funcall initfn))
              (serve)))
          (serve)))))

(defun ensure-workers ()
  "Return the work queue, starting the worker threads if necessary."
  (or *work-queue* (%initialize-workers)))

(defun get-work-queue ()
  "Return the work queue, starting the worker threads if necessary."
  (ensure-workers))

(defun shutdown-workers ()
  "Ask the workers to exit by pushing :EXIT on the head of the work
queue, then forget the queue. Does not wait for the threads to finish."
  (when *work-queue*
    (message-queue-push (get-work-queue) :exit))
  (setf *work-queue* nil))

;;;; Thread-local variables --
;;;;
;;;; It is sometimes useful to guarantee that each worker thread will
;;;; have its own binding of a special variable which persists beyond
;;;; the extent of a single job, e.g. temporary buffers which cannot
;;;; be shared between threads but can be reused between jobs.  For
;;;; this reason there exists the notion of the worker environment,
;;;; represented as a list containing pairs of symbols and
;;;; initialization functions. Between jobs, the main worker loop will
;;;; check for additions to this list of bindings, and bind the
;;;; variables to the result of calling the initialization function
;;;; using PROGV. New bindings must be NCONC'd to the tail of the list
;;;; in order for running workers to see them.

;;;; Two utilities are available to extend the environment:

(defun add-worker-binding (symbol init-function)
  "SYMBOL will be bound to the result of invoking INIT-FUNCTION with no
   arguments in each worker thread before any subsequent jobs are
   executed."
  (nconc *worker-environment* (list (cons symbol init-function)))
  (values))

(defmacro define-thread-variable (name &optional global-value worker-initform documentation)
  "DEFVAR NAME with GLOBAL-VALUE (and DOCUMENTATION, if given), and
arrange for each worker thread to bind NAME to the value of
WORKER-INITFORM."
  `(progn
     (defvar ,name ,global-value ,@(and documentation (list documentation)))
     (eval-when (:load-toplevel :execute)
       (add-worker-binding ',name (lambda () ,worker-initform)))))

(defmacro declare-thread-variable (name worker-initform)
  "Arrange for each worker thread to bind the variable NAME to the value
of WORKER-INITFORM. NAME is not defined by this macro."
  `(eval-when (:load-toplevel :execute)
     (add-worker-binding ',name (lambda () ,worker-initform))))

;;;; Utilities

(defun parallel-chunks (start end function
                        &key (min-chunk-size 1) (max-chunk-size nil) (debug nil))
  "Spawn jobs which will call 'function' with subranges between start
   and end. Function should take two arguments, start and end
   indices. It may be called multiple times, simultaneously, and in no
   particular order. Use this to do something resembling a parallel map.

   Blocks until every chunk has been processed, then returns T, or
   signals an error if FUNCTION signalled an error in any chunk. An
   empty range (END <= START) returns T at once without calling
   FUNCTION."
  (let ((length (- end start)))
    ;; With an empty range no job would ever fill the completion
    ;; syncvar, and waiting on it would block forever.
    (when (<= length 0)
      (return-from parallel-chunks t))
    (let* ((chunk-size
             ;; Clamp to at least 1: a zero step would make the chunking
             ;; loop below never advance.
             (max 1
                  (min (or max-chunk-size length)
                       (max min-chunk-size
                            (round length (* (number-of-processors) 8))))))
           (error-occured nil)
           (num-chunks 0)
           (num-chunks-lock (sb-thread:make-mutex))
           (completion-port (make-syncvar)))
      (when debug
        (format *trace-output*
                "~&parallel-chunks: chunk-size=~A, total chunks=~A~%"
                chunk-size (ceiling length chunk-size)))
      ;; Might it be useful to randomize the chunk sizes, to avoid bad
      ;; cache effects?
      (message-queue-add*
       (get-work-queue)
       (loop for chunk-start from start below end by chunk-size
             as chunk-end = (min end (+ chunk-start chunk-size))
             do (incf num-chunks)
             collect (let ((extent-start chunk-start)
                           (extent-end chunk-end))
                       (lambda ()
                         (unwind-protect
                              (handler-case (funcall function extent-start extent-end)
                                (error () (setf error-occured t)))
                           ;; The job that finishes last reports the
                           ;; overall outcome.
                           (sb-thread:with-mutex (num-chunks-lock)
                             (decf num-chunks)
                             (assert (>= num-chunks 0))
                             (when (zerop num-chunks)
                               (syncvar-put completion-port
                                            (if error-occured :error :complete)))))))))
      ;; (format *trace-output* "~&Waiting for completion.~%")
      (ecase (syncvar-get completion-port)
        (:error (error "The job was not completed."))
        (:complete t)))))

(defun parallel-chunks-test (size &key (num-runs 10))
  "Time NUM-RUNS parallel fills of an array of SIZE elements, then the
same number of serial fills. Returns the two lists of timings."
  (let ((array (make-array size)))
    (flet ((fn (start end)
             (loop for i from start below end
                   do (assert (null (aref array i)))
                      ;; Very silly.
                      (setf (aref array i) (* (sin i) (sin i)))))
           (reset ()
             (loop for i from 0 below size
                   do (setf (aref array i) nil)))
           (check ()
             #+NIL
             (loop for i from (1- size) downto 0
                   do (assert (= i (aref array i))))))
      (values
       (time (loop repeat num-runs
                   collect (prog2 (reset)
                               (timed-call (lambda () (parallel-chunks 0 size #'fn)))
                             (check))))
       (time (loop repeat num-runs
                   collect (prog2 (reset)
                               (timed-call (lambda () (fn 0 size)))
                             (check))))))))

;;; message queue test

(defun message-queue-test (&key (num-workers 4) (num-ints 1000)
                                (push-batch-size 1) (pop-batch-size 1))
  "Hand the integers below NUM-INTS to NUM-WORKERS consumer threads
through a message queue and assert that each integer was received
exactly once."
  (let* ((terminate-vector (make-array num-workers :initial-element nil))
         (exit-sentinel :exit)
         (running t)
         (stream *standard-output*)
         (result-vectors
           (loop for i from 0 below num-workers
                 collect (make-array num-ints :element-type 'bit :initial-element 0)))
         (queue (make-message-queue))
         (threads
           (loop for id from 0
                 for vector in result-vectors
                 collect
                 (sb-thread:make-thread
                  (let ((id id)
                        (vector vector))
                    (lambda ()
                      (loop named outer-loop
                            as next-messages = (if (= 1 pop-batch-size)
                                                   (list (message-queue-pop queue))
                                                   (message-queue-pop-n-blocking
                                                    queue (1+ (random pop-batch-size))))
                            do (loop as next = (pop next-messages)
                                     while next
                                     do (cond
                                          ((eql next exit-sentinel)
                                           (format stream "~&~A EXIT?~%" id)
                                           (assert (not running))
                                           (setf (elt terminate-vector id) t)
                                           ;; If we popped multiple messages, and one of them was the exit token,
                                           ;; push the unhandled ones back before we exit.
                                           (when next-messages
                                             (message-queue-add* queue next-messages))
                                           ;; Pass the exit sentinel along to the next thread
                                           (message-queue-add queue next)
                                           (return-from outer-loop))
                                          (t
                                           (assert (zerop (elt vector next)))
                                           ;;(format stream "~&popped ~A~%" next)
                                           (setf (elt vector next) 1)))))))))))
    (format t "~&Handing out numbers:~%")
    (time
     (loop with i = 0
           while (< i num-ints)
           as batch = (loop with batch-size = (min push-batch-size (- num-ints i))
                            for j from i below (+ i batch-size)
                            collect j
                            finally (incf i batch-size))
           do
           ;;(print batch)
           (if (= push-batch-size 1)
               (if (zerop (random 2))
                   (message-queue-push queue (first batch))
                   (message-queue-add queue (first batch)))
               (if (zerop (random 2))
                   (message-queue-push* queue batch)
                   (message-queue-add* queue batch)))))
    (format t "~&Waiting for threads to finish:~%")
    (setf running nil)
    (message-queue-add queue exit-sentinel)
    (loop for spins from 0
          until (every #'identity terminate-vector)
          finally (format t "~&Spun ~A times waiting for threads to finish.~%" spins))
    (assert (eql exit-sentinel (message-queue-pop queue)))
    (assert (null (message-queue-head queue)))
    (assert (null (message-queue-tail queue)))
    (map nil #'sb-thread:join-thread threads)
    (format t "~&Joined threads.~%")
    (format t "~&Work distribution: ~A~%"
            (mapcar (lambda (v) (count 1 v)) result-vectors))
    (loop for i from 0 below num-ints
          do (assert (= 1 (reduce #'+ result-vectors
                                  :key (lambda (v) (elt v i))))))
    (format t "~&Results look good.~%")))

(defun timed-call (function)
  "Call FUNCTION with no arguments. Returns two values: the elapsed
run time in seconds (measured with GET-INTERNAL-RUN-TIME) as a float,
and the list of values FUNCTION returned."
  (let* ((start (get-internal-run-time))
         (results (multiple-value-list (funcall function))))
    (values (float (/ (- (get-internal-run-time) start)
                      internal-time-units-per-second))
            results)))

(defun queue-benchmark (&optional (size 10000000))
  "Measure and print single-threaded enqueue, dequeue and push rates for
SIZE operations each. Returns enqueue, push and dequeue rates."
  (let* ((queue (make-message-queue))
         (enqueue-speed
           (time (/ size (timed-call
                          (lambda ()
                            (dotimes (i size)
                              (message-queue-add queue i)))))))
         (dequeue-speed
           (time (/ size (timed-call
                          (lambda ()
                            (dotimes (i size)
                              (message-queue-pop queue)))))))
         (push-speed
           (time (/ size (timed-call
                          (lambda ()
                            (dotimes (i size)
                              (message-queue-push queue i))))))))
    (format t "~&Enqueue speed: ~A/sec~%" (round enqueue-speed))
    (format t "~&   Push speed: ~A/sec~%" (round push-speed))
    (format t "~&Dequeue speed: ~A/sec~%" (round dequeue-speed))
    (values enqueue-speed push-speed dequeue-speed)))

;;; Time ten million acquire/release cycles on an uncontended mutex.
(defun cons-foo ()
  (let ((queue (make-message-queue)))
    (time (dotimes (i 10000000)
            (sb-thread:with-mutex ((message-queue-mutex queue))
              'do-nothing)))))

;;; vector-push-extend thread safety experiment

#+NIL
(dotimes (i 100)
  (let ((vector (make-array 1 :initial-element 666 :adjustable t :fill-pointer t))
        (running t)
        (num-tests 0))
    (sb-thread:make-thread
     (lambda ()
       (loop while running
             do (incf num-tests)
                (assert (= 666 (elt vector (1- (fill-pointer vector))))))))
    (sb-thread:join-thread
     (sb-thread:make-thread
      (lambda ()
        (dotimes (i 1000000)
          (vector-push-extend 666 vector))
        (setf running nil))))
    num-tests))