jackdaw.test.transports.kafka
build-record
(build-record m)
Builds a Kafka producer record and assocs it onto the message map `m`
close-consumer
(close-consumer consumer)
consumer
(consumer kafka-config topic-metadata deserializers)
Creates an asynchronous Kafka Consumer of all topics defined in the
supplied `topic-metadata`.
Puts all messages on the channel in the returned response. It is the
responsibility of the caller to arrange for the channel to be read by
some other process.
Must be closed with `close-consumer` when no longer required.
deliver-ack
(deliver-ack ack)
Delivers the `ack` promise with the result of attempting to write to Kafka. The
default command-handler waits for this before moving on to the next command, so
the test response may indicate the success/failure of each write command.
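The promise-based acknowledgement described above can be sketched with plain
Clojure promises. This is an illustration only: `fake-send!`, `write-and-wait`
and the result maps are hypothetical stand-ins, not part of jackdaw, and no
Kafka broker is involved.

```clojure
;; Hypothetical stand-in for a write to Kafka: does its work on another
;; thread and delivers the outcome to the `ack` promise when it finishes.
(defn fake-send!
  [ack record]
  (let [^Runnable task
        (fn []
          (try
            (when-not (:topic record)
              (throw (ex-info "missing topic" {:record record})))
            (deliver ack {:status :ok :topic (:topic record)})
            (catch Exception e
              (deliver ack {:status :error :message (.getMessage e)}))))]
    (.start (Thread. task))
    ack))

;; A command handler can block on the ack (with a timeout) before moving
;; on to the next command, so each write reports success or failure.
(defn write-and-wait
  [record]
  (let [ack (promise)]
    (fake-send! ack record)
    (deref ack 1000 {:status :timeout})))
```

Blocking on the promise with a timeout keeps a failed or stalled write from
hanging the whole test run.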
load-assignments
(load-assignments consumer)
mk-consumer-record
(mk-consumer-record consumer-record)
Converts the ConsumerRecord returned from consuming a Kafka record into Clojure data
mk-producer-record
(mk-producer-record {:keys [topic-name]} value)
(mk-producer-record {:keys [topic-name]} key value)
(mk-producer-record {:keys [topic-name]} partition key value)
(mk-producer-record {:keys [topic-name]} partition timestamp key value)
Creates a Kafka ProducerRecord for use with `send!`.
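The four arities above add optional fields one at a time (key, then partition,
then timestamp). A minimal sketch of the same multi-arity pattern in plain
Clojure follows. It builds a map rather than a real `ProducerRecord` so that it
runs without the Kafka client on the classpath; `record-map` is a hypothetical
name used only for illustration.

```clojure
;; Hypothetical stand-in for mk-producer-record: shorter arities delegate
;; to the full one, passing nil for the fields they do not accept.
(defn record-map
  ([topic value]
   (record-map topic nil nil nil value))
  ([topic key value]
   (record-map topic nil nil key value))
  ([topic partition key value]
   (record-map topic partition nil key value))
  ([{:keys [topic-name]} partition timestamp key value]
   {:topic     topic-name
    :partition partition
    :timestamp timestamp
    :key       key
    :value     value}))
```

Going by the signatures listed above, a call to the real function would take
the same shape, for example `(mk-producer-record {:topic-name "orders"} "k" "v")`.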
poller
(poller messages)
Returns a function that takes a consumer and puts any messages retrieved
by polling it onto the supplied `messages` channel
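The "function that closes over the channel" shape described above can be
sketched without Kafka or core.async. Here a
`java.util.concurrent.LinkedBlockingQueue` stands in for the `messages` channel
and `:poll-fn` is a hypothetical replacement for polling a real consumer;
neither reflects how jackdaw itself is implemented.

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical sketch: returns a function that takes a consumer-like map,
;; calls its :poll-fn, and puts every record it returns onto `messages`.
(defn poller-sketch
  [^LinkedBlockingQueue messages]
  (fn [consumer]
    (let [records ((:poll-fn consumer))]
      (doseq [r records]
        (.put messages r))
      ;; Return how many records were forwarded by this poll.
      (count records))))
```

Because the returned function only needs the consumer, the same poller can be
called repeatedly from a polling loop while some other process reads from the
queue.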
producer
(producer kafka-config topic-config serializers)
Creates an asynchronous Kafka producer to be used by a test-machine for
injecting test messages
seek-to-end
(seek-to-end consumer & topic-partitions)
Seeks to the end of all the partitions assigned to the given consumer
and returns the updated consumer
subscribe
(subscribe consumer topic-config)
Subscribes `consumer` to the specified topics
`consumer` should be a kafka consumer
`topic-config` should be a sequence of topic-metadata maps
subscription
(subscription kafka-config topic-collection)
Subscribes to `topic-collection` and seeks to the end of all partitions. This
is usually what you want in a testing context: the test you are running now
should ignore any records left behind by previous tests.