Jobsched: Running jobs with multiple Racket instances

Laurent Orseau

 (require jobsched) package: levintreesearch_cm

Jobsched is a job scheduler with a server-worker architecture on a single machine, where the server dispatches a number of jobs to the workers.

All workers are identical and are able to process any job assigned by the server. The workers are independent racket programs, each with its own Racket instance. See Comparison with other Racket parallelism mechanisms for more information.

The workflow is as follows:
  1. The server (a Racket program) is started.

  2. N jobs are added to the job queue in the server. A job is specified as read-able data.

  3. The server starts K workers (independent Racket programs).

  4. Each worker signals to the server on its output port that it is ready to receive jobs, and listens for events on its input port.

  5. As soon as a worker is ready, the server sends it one of the remaining jobs.

  6. When a worker has finished a job, it sends the result to the server, and waits for another job.

  7. When the server receives the result of a job, it processes this result and sends a new job (if any is remaining) to the worker.

  8. When all N jobs are finished, the server finishes.
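
The exchange in steps 4 to 7 can be illustrated with a toy model that runs inside a single Racket process. This is only a sketch under stated assumptions: the names toy-worker and toy-server and the 'ready and 'stop messages are hypothetical, a thread and two pipes stand in for a separate Racket instance, and jobsched's actual protocol may well differ.

```racket
#lang racket/base
;; Toy model of the ready / job / result exchange over ports.
;; NOT jobsched's wire format: every message shape here is an assumption.

;; Worker side: announce readiness, then answer jobs until 'stop arrives.
(define (toy-worker run in out)
  (write 'ready out) (newline out) (flush-output out)
  (let loop ()
    (define msg (read in))
    (unless (or (eof-object? msg) (eq? msg 'stop))
      (write (run msg) out) (newline out) (flush-output out)
      (loop))))

;; Server side: a single worker, jobs dispatched one at a time.
(define (toy-server run data-list)
  (define-values (worker-in server-out) (make-pipe)) ; server -> worker
  (define-values (server-in worker-out) (make-pipe)) ; worker -> server
  (define th (thread (λ () (toy-worker run worker-in worker-out))))
  ;; Wait for the ready signal before sending anything.
  (unless (eq? (read server-in) 'ready)
    (error 'toy-server "worker did not signal readiness"))
  (define results
    (for/list ([data (in-list data-list)])
      (write data server-out) (newline server-out) (flush-output server-out)
      (read server-in)))
  ;; No jobs left: ask the worker to stop and wait for it.
  (write 'stop server-out) (newline server-out) (flush-output server-out)
  (thread-wait th)
  results)

(toy-server (λ (data) (apply + data)) '((1 2) (3 4))) ; evaluates to '(3 7)
```

Because both directions carry plain written data, anything sent this way has to survive write followed by read, which is why jobs and results are required to be readable.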

If your machine has K cores and sufficient memory, jobsched can run K workers in parallel and achieve a speedup of up to a factor of K. Jobsched has been used successfully with more than 120 workers in parallel and more than 100 000 fast-paced jobs dispatched among them.

1 Example

Here is a simple example of a server-worker architecture. The first file contains the definition of the server:

"adder-server.rkt"

#lang racket
(require jobsched
         racket/runtime-path)
 
(define-runtime-path worker-file "adder-worker.rkt")
 
;; List of job data.
(define data-list
  (for/list ([num1 (in-range 10)] [num2 (in-range 10)])
    (list num1 num2)))
 
;; What to do once a result is received.
(define (process-result data result)
  (match data
    [(list num1 num2)
     (printf "~a + ~a = ~a\n" num1 num2 result)]))
 
;; Start the server and wait for the results.
(start-simple-server #:worker-file worker-file
                     #:data-list data-list
                     #:process-result process-result)
And the second file contains the definition of the worker:

"adder-worker.rkt"

#lang racket
(define (run-worker data)
  (match data
    [(list num1 num2) (+ num1 num2)]
    [_ 0]))
 
(module+ worker
  (require jobsched)
  (start-simple-worker run-worker))

All definitions exported by the various modules below are also exported by jobsched.

2 Simple server / worker

procedure

(start-simple-worker run [#:silent? silent?]) → void?

  run : (-> readable? readable?)
  silent? : any/c = #f
Like start-worker except that run accepts the data of the job rather than the job. This masks the job object.

Note that start-simple-worker can be used with start-simple-server, server-start and scheduler-start.

IMPORTANT: See the remarks for start-worker.
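
The relation between the two worker entry points can be pictured as wrapping a data-level procedure into a job-level one. The sketch below is an illustration only, not jobsched's implementation: the job struct is a local stand-in with the documented fields so that the code runs without jobsched, and on-job-data is a hypothetical helper name.

```racket
#lang racket/base
;; Local stand-in mirroring the documented fields of jobsched's job struct.
(struct job (index cost data start-ms stop-ms) #:transparent)

;; Wrap a procedure that expects job data into one that expects a job.
(define (on-job-data run)
  (λ (jb) (run (job-data jb))))

;; A data-level procedure, as one would pass to start-simple-worker.
(define (add-pair data) (apply + data))

;; The wrapped version accepts a job, as start-worker's run-job would.
((on-job-data add-pair) (job 0 0 '(1 2) 0 0)) ; evaluates to 3
```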

procedure

(start-simple-server #:worker-file worker-file    
  #:data-list data-list    
  #:process-result process-result    
  [#:submod-name submod-name    
  #:n-workers n-workers]) → void?
  worker-file : path-string?
  data-list : (listof readable?)
  process-result : (procedure-arity-includes/c 2)
  submod-name : symbol? = 'worker
  n-workers : integer?
   = (min (length data-list) (processor-count))
Creates and starts a server, like scheduler-start, but hiding the scheduler and the job objects. The worker is assumed to be in the racket file worker-file in the submodule submod-name. See start-simple-worker.

The workers will receive one element of data-list at a time, and return a result to be processed by process-result.

The server starts n-workers workers in separate OS processes. Refer to scheduler-start for close-workers?.

Note: In contrast to scheduler-start, the simple server does not allow adding more jobs while it is running.

start-simple-server is essentially a combination of make-server, server-start and server-close.

Try the following example with:
racket -l- jobsched/examples/server-worker-simple

"server-worker-simple.rkt"

#lang racket
(require jobsched)
 
;=== Worker ===;
 
(define (worker-run data)
  (match data
    [(list x y)
     (* x y)]))
 
(module+ worker
  (start-simple-worker worker-run))
 
;=== Server ===;
 
(module+ main
 
  (define (process-result data result)
    (printf "~a × ~a = ~a\n" (first data) (second data) result))
 
  (define data-list
    (for*/list ([x 5] [y 5]) (list x y)))
 
  (start-simple-server
   #:worker-file (this-file)
   #:data-list data-list
   #:process-result process-result))
 

3 Server

procedure

(make-server #:worker-file worker-file    
  [#:submod-name submod-name    
  #:n-workers n-workers]) → scheduler?
  worker-file : path-string?
  submod-name : symbol? = 'worker
  n-workers : (or/c #f integer?) = #f
Returns a new scheduler. See start-simple-server. If n-workers is not #f, then the workers are started asynchronously.

procedure

(server-start sched    
  #:data-list data-list    
  #:process-result process-result    
  [#:n-workers n-workers    
  #:close-workers? close-workers?]) → void?
  sched : scheduler?
  data-list : (listof readable?)
  process-result : (procedure-arity-includes/c 2)
  n-workers : integer?
   = (min (length data-list) (processor-count))
  close-workers? : any/c = #false
Starts an existing scheduler (likely made with make-server). See start-simple-server. If close-workers? is #f (the default), the workers are not closed when all jobs are done, so that they can be re-used for subsequent calls of server-start.

procedure

(server-close sched) → void?

  sched : scheduler?
Asks all workers of sched to exit gracefully, and blocks until all workers have exited.
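
The three procedures of this section can be combined to process several batches with the same workers. The following is a minimal sketch based only on the signatures documented above; the file layout (worker and server in one file, located with this-file) follows the server-worker-simple example, and the data and printing are made up for illustration.

```racket
#lang racket
(require jobsched)

;=== Worker ===;

(define (worker-run data)
  (* data data))

(module+ worker
  (start-simple-worker worker-run))

;=== Server ===;

(module+ main
  (define (process-result data result)
    (printf "~a squared is ~a\n" data result))

  ;; Create the scheduler once; workers live in the worker submodule.
  (define sched (make-server #:worker-file (this-file)))

  ;; First batch. With #:close-workers? #f the workers stay alive.
  (server-start sched
                #:data-list '(1 2 3)
                #:process-result process-result
                #:close-workers? #f)

  ;; Second batch, intended to re-use the workers started above.
  (server-start sched
                #:data-list '(4 5 6)
                #:process-result process-result
                #:close-workers? #f)

  ;; Ask the workers to exit and wait for them.
  (server-close sched))
```

Keeping the workers alive between batches avoids paying the start-up cost of the Racket instances more than once.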

4 Job

The bindings in this section are also exported by jobsched.

struct

(struct job (index cost data start-ms stop-ms)
    #:transparent)
  index : nonnegative-integer?
  cost : number?
  data : readable?
  start-ms : nonnegative-integer?
  stop-ms : nonnegative-integer?
Workers and the server usually use this structure only to read information about a job.

The cost field is used by the priority queue of the server.

The data field contains readable? (in the sense of racket/fasl) information that is sent to the worker for processing.

The fields start-ms and stop-ms are set automatically by the server when a job is sent to a worker and when the result is received.
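
As an illustration of how these fields might be read, the sketch below computes how long a job took. It uses a local stand-in struct with the documented fields so that it runs without jobsched, assumes that start-ms and stop-ms are measured on the same millisecond clock, and job-elapsed-ms is a hypothetical helper name.

```racket
#lang racket/base
;; Local stand-in mirroring the documented fields of jobsched's job struct.
(struct job (index cost data start-ms stop-ms) #:transparent)

;; Elapsed time between dispatch and reception of the result, in milliseconds.
;; Assumes both timestamps come from the same clock.
(define (job-elapsed-ms jb)
  (- (job-stop-ms jb) (job-start-ms jb)))

(job-elapsed-ms (job 0 0 '(1 2) 1000 1250)) ; evaluates to 250
```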

5 Server

The bindings in this section are also exported by jobsched.

procedure

(scheduler? v) → boolean?

  v : any/c

procedure

(worker? v) → boolean?

  v : any/c

procedure

(make-scheduler make-worker-command) → scheduler?

  make-worker-command : (-> nonnegative-integer? list?)
Returns a scheduler which will use make-worker-command to start the workers’ Racket processes.

See also make-racket-cmd.

procedure

(scheduler-add-job! sched    
  #:data data    
  [#:cost cost]) → void?
  sched : scheduler?
  data : readable?
  cost : number? = 0
Adds a job to the scheduler’s queue. A job can be added either before starting the scheduler, or while it is running, from the #:before-start and #:after-stop callbacks of scheduler-start.

The data will be sent to the worker, which receives it on its input port and can access it via job-data.

The cost is used for ordering the job in the priority queue, which is ordered by minimum cost.
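
The effect of ordering by minimum cost can be sketched without jobsched. In the snippet below, queued jobs are represented as hypothetical (cost . data) pairs and dispatch-order is an illustrative helper; the scheduler's real queue is not a sorted list, and how it breaks ties between equal costs is not specified here.

```racket
#lang racket/base
;; Hypothetical stand-in for queued jobs: (cost . data) pairs.
(define queued '((3 . "expensive") (0 . "free") (1 . "cheap")))

;; Data in the order a minimum-cost priority queue would release it.
(define (dispatch-order jobs)
  (map cdr (sort jobs < #:key car)))

(dispatch-order queued) ; evaluates to '("free" "cheap" "expensive")
```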

scheduler-n-queued-jobs returns the number of jobs pending in the queue.

scheduler-n-active-jobs returns the number of jobs that have started and not yet finished.

procedure

(scheduler-start sched    
  [n-workers    
  #:before-start before-start    
  #:after-stop after-stop    
  #:close-workers? close-workers?]) → void?
  sched : scheduler?
  n-workers : (or/c #f exact-nonnegative-integer?) = #f
  before-start : (-> scheduler? job? any) = void
  after-stop : (-> scheduler? job? readable? any) = void
  close-workers? : any/c = #t
Starts a scheduler, making sure that n-workers racket worker instances are running on the same machine.

If close-workers? is not #f, then all worker instances are closed when returning from the function call, and the workers normally terminate gracefully. If close-workers? is #f, workers are not closed when returning from the call, so they can be re-used for a subsequent call to scheduler-start without starting the workers (and the corresponding racket instances) again. If n-workers is #f, the number of running worker instances is not changed, and previous running instances are re-used. If there are already more running worker instances than n-workers, some workers are killed to match n-workers.

The callback before-start is called before a job is sent to a worker. The callback after-stop is called when a job is finished and the result is received from the worker. Both callbacks can be used to add new jobs to the queue, using scheduler-add-job!.

See an example of using scheduler-start in examples/server-worker.
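
For orientation, here is a minimal single-file sketch that uses only the signatures documented in this section. It is not the examples/server-worker program: the doubling task and the rule for adding follow-up jobs are made up, and the argument passed to make-worker-command is assumed (not confirmed by the text above) to be the worker's index.

```racket
#lang racket
(require jobsched)

;=== Worker ===;

;; Receives the whole job; the payload is read with job-data.
(define (run-job jb)
  (* 2 (job-data jb)))

(module+ worker
  (start-worker run-job))

;=== Server ===;

(module+ main
  ;; Assumption: the nonnegative integer argument is the worker's index.
  (define (make-worker-command worker-index)
    (make-racket-cmd (this-file) #:submod 'worker))

  (define sched (make-scheduler make-worker-command))

  ;; Jobs can be queued before the scheduler starts.
  (scheduler-add-job! sched #:data 1 #:cost 1)

  ;; Called when a result comes back; may queue follow-up jobs.
  (define (after-stop sched jb result)
    (printf "job ~a: ~a doubled is ~a\n" (job-index jb) (job-data jb) result)
    (when (< result 20)
      (scheduler-add-job! sched #:data result #:cost result)))

  ;; Run with 2 workers; workers are closed on return by default.
  (scheduler-start sched 2 #:after-stop after-stop))
```

The follow-up rule stops adding jobs once a result reaches 20, so the queue eventually drains and scheduler-start can return.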

procedure

(processor-count) → nonnegative-integer?

Re-exported from racket/future.

6 Worker

The bindings in this section are also exported by jobsched.

procedure

(start-worker run-job [#:silent? silent?]) → void?

  run-job : (-> job? readable?)
  silent? : any/c = #f
Starts a worker which waits for jobs. Each time a job is received, the run-job procedure is called. The data of the job can be retrieved with (job-data job). If silent? is not #f, all output of run-job to its output port is suppressed—the error port remains untouched.

See example at the top.

IMPORTANT: Jobsched uses the input/output ports of the programs for communication. Any output produced before start-worker is called is processed by the server, which is waiting for a ready signal from the worker. Avoid any output before start-worker.
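
One possible way to keep diagnostic messages off the output port is to send them to the error port instead. The helper below is a sketch and not part of jobsched; call-with-output-to-error-port is a hypothetical name, and whether worker error output is visible in your setup is an assumption to check.

```racket
#lang racket/base
;; Run thunk with the current output port redirected to the current error
;; port, so that printf and display inside thunk do not reach the output port.
(define (call-with-output-to-error-port thunk)
  (parameterize ([current-output-port (current-error-port)])
    (thunk)))

;; Example: a start-up message that ends up on the error port.
(call-with-output-to-error-port
 (λ () (printf "worker initialising\n")))
```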

Also note that run-job must return a readable? value, and that (void) is not readable.
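
A quick way to check a candidate return value is to test whether it survives a write followed by a read. This is a sketch under an assumption: round-trips? is a hypothetical helper, and jobsched's own readable? contract may accept a somewhat different set of values.

```racket
#lang racket/base
(require racket/port)

;; #t if v can be written and read back as an equal value, #f otherwise.
(define (round-trips? v)
  (define text (with-output-to-string (λ () (write v))))
  (with-handlers ([exn:fail:read? (λ (e) #f)])
    (equal? v (read (open-input-string text)))))

(round-trips? (list 1 "two" 'three)) ; #t
(round-trips? (void))                ; #f, since (void) is written as #<void>
```

If run-job has nothing meaningful to return, returning a plain value such as #t or a symbol avoids the problem.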

7 Utilities

The bindings in this section are also exported by jobsched.

procedure

(make-racket-cmd path-to-prog 
  [#:submod submod 
  #:errortrace? errortrace?] 
  args ...) → (listof path-string?)
  path-to-prog : path-string?
  submod : (or/c symbol? #f) = #f
  errortrace? : any/c = #f
  args : path-string?
Creates a command line to call the racket program path-to-prog. If submod is specified, the corresponding submodule is called instead. (For example, I like to use a worker submodule.) By default, the main submodule is used if available, or the main function if available. The additional command-line arguments args are passed to the program, which may choose to parse them. Note that path? arguments are turned automatically into strings by Racket’s primitives.

syntax

(this-file)

’Returns’ the path-string of the enclosing file, or #f if there is no enclosing file.

8 Comparison with other Racket parallelism mechanisms

Futures can make use of many cores and can share memory, but are limited in the kind of operations they can handle without blocking. Futures are well suited for numerical applications where garbage collection can be greatly reduced, which makes it possible to make the most of the CPUs without requiring multiple Racket instances.

Places allow more free-form Racket computation, can make use of multiple cores and can share memory, but are still constrained by a single garbage collector, which prevents them from making full use of all the cores.

Distributed Places create separate Racket instances like jobsched but can also spawn workers on remote machines. When all workers are on the same machine, jobsched may be simpler to use.