ZeroMQ: Distributed Messaging
This library provides bindings to the ZeroMQ (or “0MQ”, or “ZMQ”) distributed messaging library.
(require zeromq) | package: zeromq-r-lib
This package is distributed under the GNU Lesser General Public License (LGPL). As a client of this library you must also comply with the libzmq license.
1 ZeroMQ Examples
This section contains examples of using this library adapted from the 0MQ Guide.
1.1 Hello World in ZeroMQ
This example is adapted from Ask and Ye Shall Receive, which illustrates REP-REQ communication.
Here is the “hello world” server:
> (define responder-thread
    (thread
     (lambda ()
       (define responder (zmq-socket 'rep))
       (zmq-bind responder "tcp://*:5555")
       (let loop ()
         (define msg (zmq-recv-string responder))
         (printf "Server received: ~s\n" msg)
         (zmq-send responder "World")
         (loop)))))
The responder socket could also have been created and bound to its address in a single call:
(define responder (zmq-socket 'rep #:bind "tcp://*:5555"))
Here is the “hello world” client:
> (define requester (zmq-socket 'req #:connect "tcp://localhost:5555"))
> (for ([request-number (in-range 3)])
    (zmq-send requester "Hello")
    (define response (zmq-recv-string requester))
    (printf "Client received ~s (#~s)\n" response request-number))
Server received: "Hello"
Client received "World" (#0)
Server received: "Hello"
Client received "World" (#1)
Server received: "Hello"
Client received "World" (#2)
> (zmq-close requester)
1.2 Weather Reporting in ZeroMQ
This example is adapted from Getting the Message Out, which illustrates PUB-SUB communication.
Here’s the weather update server:
> (define (zip->string zip)
    (~r zip #:min-width 5 #:pad-string "0"))
> (define (random-zip) (random 100000))
> (define publisher-thread
    (thread
     (lambda ()
       (define publisher (zmq-socket 'pub #:bind "tcp://*:5556"))
       (let loop ()
         (define zip (zip->string (random-zip)))
         (define temp (- (random 215) 80))
         (define rhumid (+ (random 50) 10))
         (zmq-send publisher (format "~a ~a ~a" zip temp rhumid))
         (loop)))))
Here is the weather client:
> (define subscriber (zmq-socket 'sub #:connect "tcp://localhost:5556"))
> (define myzip (zip->string (random-zip)))
> (printf "Subscribing to ZIP code ~a only\n" myzip)
Subscribing to ZIP code 10001 only
> (zmq-subscribe subscriber myzip)
> (define total-temp
    (for/sum ([update-number (in-range 10)])
      (define msg (zmq-recv-string subscriber))
      (define temp
        (let ([in (open-input-string msg)])
          (read in)
          (read in)))
      (printf "Client got temperature update #~s: ~s\n" update-number temp)
      temp))
Client got temperature update #0: 40
Client got temperature update #1: 53
Client got temperature update #2: 130
Client got temperature update #3: 22
Client got temperature update #4: -77
Client got temperature update #5: -4
Client got temperature update #6: 85
Client got temperature update #7: -2
Client got temperature update #8: 100
Client got temperature update #9: 96
> (printf "Average temperature for ZIP code ~s was ~s\n" myzip (~r (/ total-temp 10)))
Average temperature for ZIP code "10001" was "44.3"
> (zmq-close subscriber)
1.3 Divide and Conquer in ZeroMQ
This example is adapted from Divide and Conquer, which illustrates PUSH-PULL communication.
Here’s the ventilator:
; Task ventilator
; Binds PUSH socket to tcp://localhost:5557
; Sends batch of tasks to workers via that socket
> (define (ventilator go-sema)
    (define sender (zmq-socket 'push #:bind "tcp://*:5557"))
    (define sink (zmq-socket 'push #:connect "tcp://localhost:5558"))
    (semaphore-wait go-sema)
    (zmq-send sink "0") ; message 0 signals start of batch
    (define total-msec
      (for/fold ([total 0]) ([task-number (in-range 100)])
        (define workload (add1 (random 100)))
        (zmq-send sender (format "~s" workload))
        (+ total workload)))
    (printf "Total expected cost: ~s msec\n" total-msec)
    (zmq-close sender)
    (zmq-close sink))
Here are the workers:
; Task worker
; Connects PULL socket to tcp://localhost:5557
; Collects workloads from ventilator via that socket
; Connects PUSH socket to tcp://localhost:5558
; Sends results to sink via that socket
> (define (worker)
    (define receiver (zmq-socket 'pull #:connect "tcp://localhost:5557"))
    (define sender (zmq-socket 'push #:connect "tcp://localhost:5558"))
    (let loop ()
      (define s (zmq-recv-string receiver))
      (sleep (/ (read (open-input-string s)) 1000)) ; do the work
      (zmq-send sender "")
      (loop)))
Here is the sink:
; Task sink
; Binds PULL socket to tcp://localhost:5558
; Collects results from workers via that socket
> (define (sink)
    (define receiver (zmq-socket 'pull #:bind "tcp://*:5558"))
    (void (zmq-recv receiver)) ; start of batch
    (time (for ([task-number (in-range 100)])
            (void (zmq-recv receiver))))
    (zmq-close receiver))
Now we create a sink thread, a ventilator thread, and 10 worker threads. We give them a little time to connect to each other, then we start the task ventilator and wait for the sink to collect the results.
> (let ()
    (define go-sema (make-semaphore 0))
    (define sink-thread (thread sink))
    (define ventilator-thread (thread (lambda () (ventilator go-sema))))
    (define worker-threads (for/list ([i 10]) (thread worker)))
    ; Give the threads some time to connect...
    (begin (sleep 1) (semaphore-post go-sema))
    (void (sync sink-thread)))
Total expected cost: 5291 msec
cpu time: 292 real time: 623 gc time: 0
Note that to achieve the desired parallel speedup here, it’s important to give all of the worker threads time to connect their receiver sockets before the ventilator starts sending tasks.
2 ZeroMQ API
procedure
Added in version 1.1 of package zeromq-r-lib.
procedure
(zmq-version) →
(or/c (list/c exact-nonnegative-integer? exact-nonnegative-integer? exact-nonnegative-integer?) #f)
Added in version 1.1 of package zeromq-r-lib.
2.1 Managing ZeroMQ Sockets
procedure
(zmq-socket type
            [#:identity identity
             #:bind bind-endpoints
             #:connect connect-endpoints
             #:subscribe subscriptions]) → zmq-socket?
type : (or/c 'pair 'pub 'sub 'req 'rep 'dealer 'router 'pull 'push 'xpub 'xsub 'stream)
identity : (or/c bytes? #f) = #f
bind-endpoints : (or/c string? (listof string?)) = null
connect-endpoints : (or/c string? (listof string?)) = null
subscriptions : (or/c bytes? string? (listof (or/c bytes? string?))) = null
See the zmq_socket documentation for brief descriptions of the different types of sockets, and see the 0MQ Guide for more detailed explanations.
A ZeroMQ socket acts as a synchronizable event (evt?) that is ready when zmq-recv-message would receive a message without blocking; the synchronization result is the received message (zmq-message?). If the socket is closed, it is never ready for synchronization; use zmq-closed-evt to detect closed sockets.
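As a minimal sketch of using a socket as an event, the following waits up to one second for a message with sync/timeout. The port number 5560 and the one-second timeout are arbitrary choices for illustration; the ZeroMQ procedures used are the ones documented in this manual.

```racket
#lang racket/base
(require racket/match zeromq)

;; Port 5560 is an arbitrary choice for this sketch.
(define receiver (zmq-socket 'pull #:bind "tcp://*:5560"))
(define sender (zmq-socket 'push #:connect "tcp://localhost:5560"))

(zmq-send sender "ping")

;; The socket itself is the event; its synchronization result is a
;; zmq-message. sync/timeout returns #f if nothing arrives in time.
(define frames
  (match (sync/timeout 1 receiver)
    [(zmq-message fs) fs]
    [#f #f]))

(printf "Received frames: ~s\n" frames)

(zmq-close sender)
(zmq-close receiver)
```

Because the socket is an ordinary event, it can also be combined with other events (for example, a timer or zmq-closed-evt) in a single call to sync.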
Unlike libzmq, zmq-socket creates sockets with a short default “linger” period (ZMQ_LINGER), to avoid blocking the Racket VM when the underlying context is shut down. The linger period can be changed with zmq-set-option.
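The following sketch shows one way the linger period might be adjusted. The option symbol 'linger and its unit (milliseconds, as for ZMQ_LINGER in libzmq) are assumptions not confirmed by this manual, so the code first checks zmq-list-options and does nothing if the name is not listed.

```racket
#lang racket/base
(require zeromq)

(define s (zmq-socket 'push))

;; ASSUMPTION: the linger option is named 'linger and takes milliseconds.
;; Consult zmq-list-options for the names this library actually supports.
(define linger-supported?
  (and (memq 'linger (zmq-list-options 'set))
       (memq 'linger (zmq-list-options 'get))
       #t))

(when linger-supported?
  (zmq-set-option s 'linger 1000)
  (printf "linger: ~s\n" (zmq-get-option s 'linger)))

(zmq-close s)
```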
procedure
(zmq-socket? v) → boolean?
v : any/c
procedure
s : zmq-socket?
procedure
(zmq-closed? s) → boolean?
s : zmq-socket?
procedure
(zmq-closed-evt s) → evt?
s : zmq-socket?
procedure
(zmq-list-endpoints s mode) → (listof string?)
s : zmq-socket?
mode : (or/c 'bind 'connect)
procedure
(zmq-get-option s option) → (or/c exact-integer? bytes?)
s : zmq-socket?
option : symbol?
procedure
(zmq-set-option s option value) → void?
s : zmq-socket?
option : symbol?
value : (or/c exact-integer? bytes?)
procedure
(zmq-list-options mode) → (listof symbol?)
mode : (or/c 'get 'set)
procedure
(zmq-connect s endpoint ...) → void?
s : zmq-socket?
endpoint : string?
procedure
s : zmq-socket? endpoint : string?
See the transport documentation pages (tcp, pgm, ipc, inproc, vmci, udp) for more information about transports and their endpoint notations.
If endpoint refers to a filesystem path or network address, access is checked against (current-security-guard). This library cannot parse and check all endpoint formats supported by libzmq; if endpoint is not in a supported format, an exception is raised with the message “invalid endpoint or unsupported endpoint format.” The parsing and access control check can be skipped by using zmq-unsafe-connect or zmq-unsafe-bind instead.
procedure
(zmq-disconnect s endpoint ...) → void?
s : zmq-socket?
endpoint : string?
procedure
(zmq-unbind s endpoint ...) → void?
s : zmq-socket?
endpoint : string?
Note that in some cases endpoint must be more specific than the argument to zmq-bind or zmq-connect. For example, see the section labeled “Unbinding wild-card address from a socket” in zmq_tcp.
procedure
(zmq-subscribe s topic ...) → void?
s : zmq-socket?
topic : (or/c bytes? string?)
procedure
(zmq-unsubscribe s topic ...) → void?
s : zmq-socket?
topic : (or/c bytes? string?)
A topic matches a message if topic is a prefix of the message. The empty topic accepts all messages.
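The prefix rule can be illustrated in plain Racket, without any sockets. The helper topic-matches? below is hypothetical and not part of this library; it only mirrors the byte-wise prefix test described above.

```racket
#lang racket/base

;; Hypothetical helper (not provided by the zeromq library): returns #t
;; when `topic` is a byte-wise prefix of `payload`.
(define (topic-matches? topic payload)
  (define n (bytes-length topic))
  (and (<= n (bytes-length payload))
       (bytes=? topic (subbytes payload 0 n))))

(topic-matches? #"10001" #"10001 40 35")  ; #t: topic is a prefix
(topic-matches? #"1000" #"10001 40 35")   ; #t: shorter prefixes also match
(topic-matches? #"10002" #"10001 40 35")  ; #f: differs in the last byte
(topic-matches? #"" #"anything")          ; #t: the empty topic matches everything
```

The second case is worth noting when choosing topics: a subscription to a short topic also receives every message whose payload merely begins with those bytes.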
2.2 Sending and Receiving ZeroMQ Messages
A ZeroMQ message consists of one or more frames (represented by byte strings). The procedures in this library support sending and receiving only complete messages (as opposed to the frame-at-a-time operations in the libzmq C library).
procedure
(zmq-message? v) → boolean?
v : any/c
procedure & match pattern
(zmq-message frames) → zmq-message?
frames : (or/c bytes? string? (listof (or/c bytes? string?)))
When used as a match pattern, the frames subpattern is always matched against a list of byte strings.
> (define msg (zmq-message "hello world"))
> (match msg [(zmq-message frames) frames])
'(#"hello world")
In libzmq version 4.3.2, the draft (unstable) API has additional operations on messages to support the draft socket types; for example, a message used with a CLIENT or SERVER socket has a routing-id field. Support will be added to zmq-message when the corresponding draft APIs become stable.
Added in version 1.1 of package zeromq-r-lib.
procedure
(zmq-send-message s msg) → void?
s : zmq-socket?
msg : zmq-message?
Added in version 1.1 of package zeromq-r-lib.
procedure
(zmq-recv-message s) → zmq-message?
s : zmq-socket?
Added in version 1.1 of package zeromq-r-lib.
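As a rough sketch, a two-frame message can be sent and received as a unit with zmq-send-message and zmq-recv-message. The port number 5561 and the frame contents are arbitrary choices for illustration.

```racket
#lang racket/base
(require racket/match zeromq)

;; Port 5561 is an arbitrary choice for this sketch.
(define receiver (zmq-socket 'pull #:bind "tcp://*:5561"))
(define sender (zmq-socket 'push #:connect "tcp://localhost:5561"))

;; Strings are accepted as frames when constructing a message.
(zmq-send-message sender (zmq-message (list "header" "body")))

;; Blocks until the complete message (both frames) has arrived.
(define received (zmq-recv-message receiver))

;; The match pattern always sees the frames as byte strings.
(match received
  [(zmq-message (list header body))
   (printf "header: ~s, body: ~s\n" header body)])

(zmq-close sender)
(zmq-close receiver)
```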
procedure
s : zmq-socket? msg : (non-empty-listof (or/c bytes? string?))
procedure
s : zmq-socket?
procedure
(zmq-recv-string s) → string?
s : zmq-socket?
If a multi-frame message is received from s, an error is raised. (The message is still consumed.)
procedure
s : zmq-socket?
procedure
(zmq-proxy sock1 sock2 [#:capture capture #:other-evt other-evt]) → any
sock1 : zmq-socket?
sock2 : zmq-socket?
capture : (-> zmq-socket? zmq-message? any) = void
other-evt : evt? = never-evt
This procedure returns only when the proxy is finished, either because
one of the sockets is closed—
Added in version 1.1 of package zeromq-r-lib.
3 ZeroMQ Unsafe Functions
The functions provided by this module are unsafe.
(require zeromq/unsafe) | package: zeromq-r-lib
procedure
(zmq-unsafe-connect s endpoint ...) → void?
s : zmq-socket?
endpoint : string?
procedure
(zmq-unsafe-bind s endpoint ...) → void?
s : zmq-socket?
endpoint : string?
These functions are unsafe, not in the sense that misuse is likely to cause memory corruption, but in the sense that they do not respect the current security guard.
procedure
(zmq-unsafe-get-ctx) →
cpointer? any/c
The context is automatically destroyed when there are no references to it visible to the Racket GC; there is a reference in every socket created with it. In addition, the keepalive value holds a reference to the context, so as long as keepalive is visible to the Racket GC, the context will not be destroyed. (Currently keepalive is the same as the-ctx, but future versions of this library may implement context finalization differently.)
Added in version 1.2 of package zeromq-r-lib.
4 ZeroMQ Requirements
This library requires the libzmq foreign library to be installed in either the operating system’s default library search path or in Racket’s extended library search path (see get-lib-search-dirs).
On Linux, libzmq.so.5 is required. On Debian-based systems, it is available from the libzmq5 package. On RedHat-based systems, it is available from the zeromq package.
On macOS with Homebrew: Run brew install zeromq. The library will be installed in /usr/local/lib, which is in the operating system’s default search path.
On macOS with MacPorts: Install the zmq port. The library will be installed in /opt/local/lib, which is not in the operating system’s default search path. Manually copy or link the library into one of the directories returned by (get-lib-search-dirs).
On Windows, libzmq.dll is required. It is automatically installed via the zeromq-win32-{i386,x86_64} package.