majordomo2
(require majordomo2)    package: majordomo2
1 Introduction
majordomo2 is a task manager. It obsoletes the original majordomo package.
Major features include:
Restart tasks that fail
Carry state across restarts
Automatic parallelization on request
Pre-process results
Sort results after pre-processing
Post-process results
2 Demonstration
The provided functions and data types are defined in the API section, but here are examples of practical usage.
2.1 Simple Example
> (define (build-email-text msg names)
    (for/list ([name names])
      (format "Dear ~a: ~a" name msg)))
> (define names '("fred" "barney" "betty" "wilma" "bam-bam"))
> (define jarvis (start-majordomo))
> (define result-channel (add-task jarvis build-email-text "hi there" names))
> (channel? result-channel)
#t
> (define result (sync result-channel))
> (task? result)
#t
> (pretty-print (task.data result))
'("Dear fred: hi there"
"Dear barney: hi there"
"Dear betty: hi there"
"Dear wilma: hi there"
"Dear bam-bam: hi there")
; stop-majordomo shuts down all tasks that were added to that instance. This
; shuts down the instance custodian which will shut down the custodian for all
; the tasks.
> (stop-majordomo jarvis)
2.2 Sorting Task Results
> (define jarvis (start-majordomo))
> (pretty-print
   ; Do the same thing as we did in the prior section, but get the results back
   ; sorted.
   (task.data (sync (add-task jarvis build-email-text "hi there" names
                              #:sort-op string<?))))
'("Dear bam-bam: hi there"
"Dear barney: hi there"
"Dear betty: hi there"
"Dear fred: hi there"
"Dear wilma: hi there")
> (define (mock-network-function . ip-addrs)
    ; In real life, this would (e.g.) connect to the specified IP address,
    ; send a message, and return whether or not it succeeded. For purposes of
    ; this demonstration we'll have it return an arbitrary status.
    ;
    ; Note that tasks include a unique ID that is being returned. This is
    ; autogenerated if not provided.
    (for/list ([addr ip-addrs]
               [status (in-cycle '(success failure timeout))])
      (list (task.id (current-task)) addr status)))
> (pretty-print
   ; This shows the baseline return value from mock-network-function before we do
   ; any sorting.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))
'((task-13479 "4.4.4.4" success)
(task-13479 "8.8.8.8" failure)
(task-13479 "172.67.188.90" timeout)
(task-13479 "104.21.48.235" success))
> (pretty-print
   ; Now we'll sort the results based on their status. NB: sort-cache-keys? is
   ; unnecessary here since the key is cheap to calculate, but it's included for
   ; sake of demonstration.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"
                              #:sort-op symbol<?
                              #:sort-key last
                              #:sort-cache-keys? #t))))
'((task-13480 "8.8.8.8" failure)
(task-13480 "4.4.4.4" success)
(task-13480 "104.21.48.235" success)
(task-13480 "172.67.188.90" timeout))
> (stop-majordomo jarvis)
2.3 Pre- and Post-Processing
> (define jarvis (start-majordomo))
> (pretty-print
   ; This is a contrived and overly fancy example, but it demonstrates the
   ; functionality. We'll generate the strings, append some text to the end after
   ; they've been generated, sort the strings by their length, and then put it all
   ; into titlecase.
   (task.data (sync (add-task jarvis build-email-text "hi there" names
                              #:sort-op <
                              #:sort-key string-length
                              #:pre (curry map (curryr string-append ", my friend."))
                              #:post (curry map string-titlecase)))))
'("Dear Fred: Hi There, My Friend."
"Dear Betty: Hi There, My Friend."
"Dear Wilma: Hi There, My Friend."
"Dear Barney: Hi There, My Friend."
"Dear Bam-Bam: Hi There, My Friend.")
> (stop-majordomo jarvis)
2.4 Filtering Task Results
If you’re going to use pre/post processing and/or sorting, it’s a good idea to ensure that all elements in your data set are appropriate. To make this easy you can filter the results before preprocessing, sorting, and postprocessing happen in order to get rid of bad results.
> (define jarvis (start-majordomo))
> (define (simple)
    (success '(1 2 3 4 5 6 #f 'error)))
; Filtering the results happens before pre-processing
> (define result
    (sync (add-task jarvis simple
                    #:filter (and/c number? even?)
                    #:pre (λ (lst) (if (memf odd? lst) (raise 'failed) lst))
                    #:sort-op >)))
> (task.data result)
'(6 4 2)
> (stop-majordomo jarvis)
2.5 Restarting
When a task fails, either because it threw an error or simply timed out, it will be restarted if there are retries left. By default there will be 3 retries (meaning a total of 4 attempts to run the task), but you can use the #:retries keyword to specify how many you want. The argument must be a natural number or +inf.0.
> (define jarvis (start-majordomo))
> (define (failing-func)
    (displayln "inside failing-func")
    (raise-arguments-error 'failing-func "I don't feel so good"))
> (pretty-print
   ; If a function raises an error then the result will contain the value raised.
   ; By default it will restart 3 times, but in this example we don't want it to
   ; restart at all so we will specify 0 retries.
   (format "Data after failure was: ~a"
           (task.data (sync (add-task jarvis failing-func #:retries 0)))))
inside failing-func
"Data after failure was: #(struct:exn:fail:contract failing-func: I don't feel so good #<continuation-mark-set>)"
> (define (func-times-out . args)
    ; Sometimes an action will time out without explicitly throwing an error. If
    ; so then if it has retries left it will be restarted again with all of its
    ; original arguments. We can use the 'data' value in (current-task) to carry
    ; state across the restart.
    (define state (task.data (current-task)))
    (match state
      [(? hash-empty?)
       (displayln "Initial start. Hello!")
       (update-data (hash-set* state 'remaining-args args 'results '()))
       (displayln "Intentional timeout to demonstrate restart.")
       (sync never-evt)]
      [(hash-table ('remaining-args (list current remaining ...)))
       (displayln (format "Marking ~a as processed..." current))
       (update-data (hash-set* state
                               'remaining-args remaining
                               'results (cons current (hash-ref state 'results '()))))
       (displayln "Intentional timeout to demonstrate restart.")
       (sync never-evt)])
    (displayln "Goodbye."))
> (let ([result (sync (add-task jarvis func-times-out 1 2 3 4 5 6
                                #:keepalive 0.1 ; max time to complete/keepalive
                                #:post (λ (h)
                                         (hash-set h 'results
                                                   (reverse (hash-ref h 'results '()))))))])
    ; Be sure to use struct* in your match pattern instead of struct. The
    ; task struct contains private fields that are not documented
    ; or provided. Use the struct-plus-plus reflection API if you
    ; really need to dig out the full field list.
    (match-define (struct* task ([status status] [data data])) result)
    (displayln (format "Final status was: ~v" status))
    (displayln (format "Final data was: ~v" data)))
Initial start. Hello!
Intentional timeout to demonstrate restart.
Marking 1 as processed...
Intentional timeout to demonstrate restart.
Marking 2 as processed...
Intentional timeout to demonstrate restart.
Marking 3 as processed...
Intentional timeout to demonstrate restart.
Final status was: 'timeout
Final data was: '#hash((remaining-args . (4 5 6)) (results . (1 2 3)))
> (define (long-running-task)
    ; If a task is going to take a long time, it can periodically send a keepalive
    ; so that the manager knows not to restart it. You can use update-data,
    ; success, failure, or keepalive for that.
    (for ([i 10])
      (sleep 0.1)
      (keepalive))
    (success 'finished))
> (let ([result (sync (add-task jarvis long-running-task
                                #:retries 0
                                #:keepalive 0.25))])
    ; In this case we are saying that the task has failed if the manager doesn't
    ; hear from it after 0.25 seconds.
    (displayln (format "Final status of long-running task was: ~v" (task.status result)))
    (displayln (format "Final data of long-running task was: ~v" (task.data result))))
Final status of long-running task was: 'success
Final data of long-running task was: 'finished
> (let ([result (sync (add-task jarvis long-running-task
                                #:retries 0
                                #:keepalive 0.05))])
    ; Here we let the task timeout.
    (displayln (format "Final status of long-running task was: ~v" (task.status result)))
    (displayln (format "Final data of long-running task was: ~v" (task.data result))))
Final status of long-running task was: 'timeout
Final data of long-running task was: '#hash()
> (stop-majordomo jarvis)
2.6 Parallelization
> (define jarvis (start-majordomo))
> (pretty-print
   ; Run the task in series to start with in order to show the baseline data. (We
   ; already did this in a previous section but it's useful to repeat it here.)
   ; Note that the task ID is the same throughout.
   (task.data (sync (add-task jarvis mock-network-function
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))
'((task-13523 "4.4.4.4" success)
(task-13523 "8.8.8.8" failure)
(task-13523 "172.67.188.90" timeout)
(task-13523 "104.21.48.235" success))
> (pretty-print
   ; Parallelize the task such that each argument in the list is farmed out to a
   ; separate sub task and the results are compiled back together. NB: Because of
   ; how mock-network-function is defined, this results in an extra layer of
   ; listing, as we're getting multiple lists each containing the results of
   ; processing a single argument instead of one list containing the results of
   ; processing each argument in turn. See below for how to handle this. Note
   ; that the task ID is different for each entry.
   (task.data (sync (add-task jarvis mock-network-function
                              #:parallel? #t
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))
'(#<task> #<task> #<task> #<task>)
> (pretty-print
   ; Same as above, but we'll append the sublists together in order to get back to
   ; the original results. Note that once again the task ID is different for each
   ; entry.
   (task.data (sync (add-task jarvis mock-network-function
                              #:parallel? #t
                              #:post (lambda (tasks)
                                       (apply append (map task.data tasks)))
                              "4.4.4.4" "8.8.8.8" "172.67.188.90" "104.21.48.235"))))
'((task-13529 "4.4.4.4" success)
(task-13530 "8.8.8.8" success)
(task-13531 "172.67.188.90" success)
(task-13532 "104.21.48.235" success))
> (stop-majordomo jarvis)
2.7 Unwrapping
Often you’ll have a function that wants individual arguments but you have the arguments as a list, because you generated them using a map, got them from a database using query-rows, or similar. You can save yourself an unnecessary apply by using the #:unwrap? argument.
> (define jarvis (start-majordomo))
> (define args (for/list ([i 5]) i))
; This fails because we need to pass 0 1 2 3 4 separately instead of as a list
> (pretty-print (task.data (sync (add-task jarvis + args))))
(exn:fail:contract
"+: contract violation\n expected: number?\n given: '(0 1 2 3 4)"
#<continuation-mark-set>)
; This works because we are telling the task to unwrap the list into separate args
> (task.data (sync (add-task jarvis + args #:unwrap? #t)))
10
2.8 Flattening
Sometimes, usually when running in parallel, you’ll have a task where the data field ends up containing one or more tasks, each of which might also be nested, and what you actually want is the data from those subtasks. You could dig that out on the far end or you could have the system do it for you.
> (define jarvis (start-majordomo))
> (define (make-task x) (task++ #:data x))
> (let* ([jarvis (start-majordomo)]
         [without-flatten (sync (add-task jarvis make-task 0 1 2 #:parallel? #t))]
         [nested (sync (add-task jarvis make-task
                                 (for/list ([i 3]) (make-task i))
                                 #:unwrap? #t
                                 #:parallel? #t))]
         [with-flatten (sync (add-task jarvis make-task
                                       0 1 2 (make-task (make-task 3))
                                       #:parallel? #t
                                       #:flatten-nested-tasks? #t))])
    (printf "data from without-flatten: ~v\n" (task.data without-flatten))
    (printf "data from nested tasks in without-flatten: ~v\n"
            (map task.data (task.data without-flatten)))
    (printf "data via explicit call to flatten-nested-tasks: ~v\n"
            (task.data (flatten-nested-tasks without-flatten)))
    ;
    ; Data from nested tasks gets lifted regardless of nesting depth
    (printf "data via #:flatten-nested-tasks?: ~v\n" (task.data with-flatten))
    (stop-majordomo jarvis))
data from without-flatten: '(#<task> #<task> #<task>)
data from nested tasks in without-flatten: '(#<task> #<task> #<task>)
data via explicit call to flatten-nested-tasks: '(0 1 2)
data via #:flatten-nested-tasks?: '(0 1 2 3)
Note that flattening loses the id and status fields of the subtasks. The status of the returned task will be 'mixed iff the subtasks had differing status codes and at least one of them had status of either 'success or 'mixed.
2.9 Convenience Functions
Some operations are common enough that it’s worth having a short form to avoid boilerplate.
; A task that simply returns a specified value. Generates its own majordomo
> (task.data (sync (task-return-value 'hello)))
'hello
; Simplify adding task, syncing for result, and fetching data
> (get-task-data (start-majordomo) add1 7)
8
3 API
procedure
(start-majordomo [#:max-workers max-workers]) → majordomo?
max-workers : (or/c +inf.0 exact-positive-integer?) = +inf.0
(stop-majordomo) → void?
procedure
(majordomo.id instance) → any/c
instance : majordomo
(majordomo-id instance) → any/c
instance : majordomo
procedure
(majordomo? val) → boolean?
val : any/c
procedure
(task-status/c val) → boolean?
val : (or/c 'success 'failure 'unspecified 'timeout 'mixed)
task constructor
id : symbol? = (gensym "task-")
status : task-status/c = 'unspecified
data : any/c = (hash)
procedure
(task.status the-task) → symbol?
the-task : task?
(task-status the-task) → symbol?
the-task : task?
procedure
procedure
(is-success? the-task) → boolean?
the-task : task?
(is-not-success? the-task) → boolean?
the-task : task?
(is-failure? the-task) → boolean?
the-task : task?
(is-timeout? the-task) → boolean?
the-task : task?
(is-unspecified-status? the-task) → boolean?
the-task : task?
procedure
(set-task-status the-task val) → task?
the-task : task?
val : task-status/c
parameter
(current-task) → task?
(current-task t) → void?
t : task?
= #f
procedure
arg : any/c = the-unsupplied-arg
Set the status field of current-task to 'success. Tell the manager that the worker has completed. This will cause the manager to send the value of current-task to the customer.
procedure
arg : any/c = the-unsupplied-arg
Set the status field of current-task to 'failure. Tell the manager that the worker has completed. This will cause the manager to send the value of current-task to the customer.
procedure
(update-data val) → void?
val : any/c
3.1 Task Data
The current-task parameter holds a task structure that can be used to carry state across restarts, such as which arguments have already been processed. This matters because the task is restarted with the same arguments every time. The data field is also useful for returning a value from the action. See Task Results for details.
There are three functions that an action can call to manipulate the contents of the current-task struct:
(update-data val) will both update the data field of the current-task and send a keepalive to the manager indicating that the worker is still running and does not need to be restarted.
(success) or (success arg) will set the status field of the current-task to 'success and tell the manager that the worker has successfully completed. If an argument was provided then the data field will be updated to that value.
(failure) or (failure arg) will set the status field of the current-task to 'failure and tell the manager that the worker has completed but failed. If an argument was provided then the data field will be updated to that value.
If none of these functions is ever called then the data field will be set as follows:
If the action completes without error, (success val) will be called, where val is the return value of the action.
If the action raises an error, (failure e) will be called, where e is the raised value.
If the action uses up all its retries and then times out, the data field is left undisturbed and the status is set to 'timeout.
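As a small illustration of the rules above, the following sketch runs two actions and collects the resulting status and data. The helper name status+data and the action names are made up for this example; everything else (start-majordomo, add-task, failure, task.status, task.data, stop-majordomo) is used as described in this document. Retries are set to 0 so that the explicit failure is reported without any restart.

```racket
#lang racket
(require majordomo2)

;; Returns normally, so the manager records (success 'plain-return) for us.
(define (returns-normally) 'plain-return)

;; Reports failure explicitly and supplies the value for the data field.
(define (reports-failure) (failure 'bad-input))

(define jarvis (start-majordomo))

;; status+data is a hypothetical helper: run one action, return (list status data).
(define (status+data action)
  (define t (sync (add-task jarvis action #:retries 0)))
  (list (task.status t) (task.data t)))

(define outcomes (map status+data (list returns-normally reports-failure)))
(stop-majordomo jarvis)
```

Here outcomes holds one (status data) pair per action, which makes it easy to compare an implicit success with an explicit failure.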
3.2 Task Results
add-task returns a channel. When the task finishes, the content of the current-task parameter is placed onto the channel. The customer may retrieve the struct (via sync, channel-get, etc) and examine the status and data fields in order to determine how the task completed and what the final result was.
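A minimal sketch of the customer side, assuming you would rather not block forever on the channel: sync/timeout (from Racket itself) returns #f if nothing arrives in time. The 10-second limit and the name summary are arbitrary choices for this example.

```racket
#lang racket
(require majordomo2)

(define jarvis (start-majordomo))
(define ch (add-task jarvis + 1 2 3))

;; Wait up to 10 seconds for the finished task struct; #f means nothing arrived.
(define result (sync/timeout 10 ch))

;; Inspect the status and data fields to decide what happened.
(define summary
  (cond
    [(not result) 'no-result-yet]
    [(is-success? result) (list 'ok (task.data result))]
    [else (list (task.status result) (task.data result))]))

(stop-majordomo jarvis)
```

Checking the status before using the data avoids treating a raised exception or a stale state hash as a real result.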
4 Running tasks
Tasks are created inside, and managed by, a majordomo instance.
procedure
(add-task jarvis
          action
          arg ...
          [#:keepalive keepalive-time
           #:retries retries
           #:parallel? parallel?
           #:unwrap? unwrap?
           #:flatten-nested-tasks? flatten-nested-tasks?
           #:filter filter-func
           #:pre pre
           #:sort-op sort-op
           #:sort-key sort-key
           #:sort-cache-keys? cache-keys?
           #:post post]) → channel?
jarvis : majordomo?
action : (unconstrained-domain-> any/c)
arg : any/c
keepalive-time : (and/c real? (not/c negative?)) = 5
retries : (or/c natural-number/c +inf.0) = 3
parallel? : boolean? = #f
unwrap? : boolean? = #f
flatten-nested-tasks? : boolean? = #f
filter-func : (or/c #f procedure?) = #f
pre : procedure? = identity
sort-op : (or/c #f (-> any/c any/c any/c)) = #f
sort-key : (-> any/c any/c) = identity
cache-keys? : boolean? = #f
post : procedure? = identity
Two threads are created, a worker and a manager. The worker does (apply action args). (See Parallel Processing for an exception.) The manager does a sync/timeout keepalive-time on the worker. If the worker times out then the worker thread is killed and a new thread is started using the same arguments as the original and the most recent value of current-task, thereby allowing state to be carried over from one attempt to the next. The keepalive timer resets whenever the worker does any of the following:
Terminate (either naturally or by raising an error)
Call keepalive
Call update-data
Call success
Call failure
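The manager/worker arrangement described above can be sketched in plain Racket. This is only an illustration of the idea, not majordomo2's implementation: run-with-restarts, its ping argument, and the returned values are all hypothetical names, and errors raised by the worker are not handled here.

```racket
#lang racket/base
;; Hypothetical sketch of a keepalive-driven restart loop (not majordomo2's code).
;; `action` receives a `ping` thunk that it calls to reset the keepalive timer.
(define (run-with-restarts action keepalive-time retries)
  (let loop ([retries-left retries])
    (define pings (make-channel))      ; keepalive messages from the worker
    (define result-ch (make-channel))  ; final value from the worker
    (define worker
      (thread
       (λ () (channel-put result-ch
                          (action (λ () (channel-put pings 'alive)))))))
    (let wait ()
      ;; Each pass through `wait` starts a fresh keepalive window.
      (define msg
        (sync/timeout keepalive-time
                      (handle-evt pings (λ (_) 'ping))
                      (handle-evt result-ch (λ (v) (list v)))))
      (cond
        [(eq? msg 'ping) (wait)]                    ; worker is alive, keep waiting
        [(pair? msg) (values 'success (car msg))]   ; worker finished
        [else                                       ; #f: the window expired
         (kill-thread worker)
         (if (zero? retries-left)
             (values 'timeout #f)
             (loop (sub1 retries-left)))]))))

;; A worker that pings well inside the keepalive window finishes normally.
(define-values (status data)
  (run-with-restarts (λ (ping) (for ([i 3]) (sleep 0.02) (ping)) 'finished)
                     0.5 0))

;; A worker that never pings is started retries + 1 times, then abandoned.
(define attempts 0)
(define-values (status2 data2)
  (run-with-restarts (λ (ping) (set! attempts (add1 attempts)) (sync never-evt))
                     0.1 2))
```

The second call shows the retries + 1 rule: with 2 retries the worker is started three times before the loop gives up with a timeout status.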
Arguments are as follows:
keepalive-time The duration within which the worker must either terminate or notify the manager that it’s still running.
retries The number of times that the manager should restart the worker before giving up. (Note: This counts retries, not maximum attempts. The maximum number of times your action will be started is retries + 1, with the +1 being the initial attempt.)
parallel? Whether the action should be run in parallel. See Parallel Processing.
unwrap? Whether the arguments should be used as-is or one layer of listing should be removed. Saves you the trouble of currying in an apply. See Unwrapping.
flatten-nested-tasks? If your action procedure produces a task that contains tasks you can have the system automatically lift the subtasks out and combine their data into the top-level task so that you don’t have to dig it out on the far end. You will lose the id and status fields of the subtasks if you do this. See Flattening.
filter Filter the data after flattening (if any) and before passing it to the preprocessing function. See Filtering Task Results.
pre Pre-processes the results of the action. The default preprocessor is identity. This happens after flattening and before sorting.
sort-op, sort-key, sort-cache-keys? Whether and how to sort the results of the action. They are passed as the (respectively) less-than?, #:key extract-key and #:cache-keys? cache-keys? arguments to a sort call. Sorting happens after preprocessing and before postprocessing.
post Postprocesses the results of the action immediately before returning them. The default is identity.
Obviously, if the results of your function are not a list, either leave sort-op as #f so that it doesn’t try to sort and fail, or else use #:pre list to make it a list before sorting is applied.
Consolidating previous information, the pipeline goes:
flatten > filter > preprocess > sort > postprocess
Each step in the pipeline is optional.
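The order of operations can be mimicked on a plain list without any task machinery. The sketch below is an assumption-laden stand-in, not the library's code: run-pipeline is a hypothetical helper, flattening is left out, and the filter is assumed to be applied as a predicate via Racket's filter. The sort call receives the keyword arguments in the way the argument descriptions above say they are passed along.

```racket
#lang racket
;; Hypothetical helper mirroring the documented order:
;; (flatten) > filter > preprocess > sort > postprocess.
(define (run-pipeline data
                      #:filter [filter-func #f]
                      #:pre [pre identity]
                      #:sort-op [sort-op #f]
                      #:sort-key [sort-key identity]
                      #:sort-cache-keys? [cache-keys? #f]
                      #:post [post identity])
  (let* ([data (if filter-func (filter filter-func data) data)] ; drop bad items
         [data (pre data)]                                       ; preprocess
         [data (if sort-op                                       ; sort only on request
                   (sort data sort-op #:key sort-key #:cache-keys? cache-keys?)
                   data)])
    (post data)))                                                ; postprocess last

(define processed
  (run-pipeline '(3 "x" 1 2)
                #:filter number?
                #:pre (curry map add1)
                #:sort-op >
                #:post (curry map number->string)))
;; processed is '("4" "3" "2")
```

Because the filter runs first, the string "x" never reaches add1, and because sorting runs before postprocessing, the comparison is done on numbers rather than on their string forms.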
procedure
(task-return-value val) → channel?
val : any/c
Equivalent to (add-task (start-majordomo) identity val).
procedure
(get-task-data jarvis
               action
               arg ...
               [#:keepalive keepalive-time
                #:retries retries
                #:parallel? parallel?
                #:unwrap? unwrap?
                #:flatten-nested-tasks? flatten-nested-tasks?
                #:filter filter-func
                #:pre pre
                #:sort-op sort-op
                #:sort-key sort-key
                #:sort-cache-keys? cache-keys?
                #:post post]) → any/c
jarvis : majordomo?
action : (unconstrained-domain-> any/c)
arg : any/c
keepalive-time : (and/c real? (not/c negative?)) = 5
retries : (or/c natural-number/c +inf.0) = 3
parallel? : boolean? = #f
unwrap? : boolean? = #f
flatten-nested-tasks? : boolean? = #f
filter-func : (or/c #f procedure?) = #f
pre : procedure? = identity
sort-op : (or/c #f (-> any/c any/c any/c)) = #f
sort-key : (-> any/c any/c) = identity
cache-keys? : boolean? = #f
post : procedure? = identity
Equivalent to (task.data (sync (add-task jarvis action arg ... keyword-args))), so the result is the task's data rather than a channel.
The arguments and keywords all have the same meaning as add-task.
5 Parallel Processing
As shown in the Parallelization demonstration, tasks can be parallelized simply by passing #:parallel? #t. In this case add-task will call itself recursively, creating subtasks and passing each of them one of the arguments in turn. The main task will then wait for the subtasks to complete, aggregate the subtasks into a list, and treat that list as normal by running it through some or all of flattening, filtering, preprocessing, sorting, and postprocessing. The subtasks will be run with the same #:keepalive and #:retries values as the main task but everything else will be the default, meaning that all preprocessing and sorting will happen in the main task.
Caveats:
The items in the main task’s data field will be task structs, not the data values in those task structs.
Obviously, your subtasks are running in separate threads. Their current-task will not be the same as the one in the original task.
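The first caveat is easy to trip over, so here is a brief sketch of it using add1 as the action. The names main-task, raw, all-tasks?, and values-only are chosen for this example only; the #:post function follows the same approach as the Parallelization demonstration.

```racket
#lang racket
(require majordomo2)

(define jarvis (start-majordomo))

;; Each of 1, 2, 3 goes to its own subtask; the main task collects the subtasks.
(define main-task (sync (add-task jarvis add1 1 2 3 #:parallel? #t)))
(define raw (task.data main-task))

;; The collected items are task structs, not numbers.
(define all-tasks? (andmap task? raw))

;; Pull the data out of each subtask in the main task's postprocessing step.
(define values-only
  (get-task-data jarvis add1 1 2 3
                 #:parallel? #t
                 #:post (λ (subtasks) (map task.data subtasks))))

(stop-majordomo jarvis)
```

Doing the extraction in #:post keeps the calling code free of task structs; #:flatten-nested-tasks? is the other documented way to get at the subtask data.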
6 Notes
The task structure is defined via the struct-plus-plus module, giving it a keyword constructor, dotted accessors, reflection data, etc. As stated above, not all accessors are exported, so if you need to use match on a struct, use struct* instead of struct.
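A short sketch of the struct* advice, assuming only the status and data fields are of interest. It uses task-return-value from the Convenience Functions section to obtain a finished task; the name extracted is arbitrary.

```racket
#lang racket
(require majordomo2)

;; task-return-value generates its own majordomo and yields a result channel.
(define result (sync (task-return-value 'hello)))

;; struct* matches by field name, so undocumented fields can be ignored.
(define extracted
  (match result
    [(struct* task ([status status] [data data]))
     (list status data)]))
```

A positional struct pattern would need one sub-pattern per field, including the private ones, which is why struct* is the safer choice here.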