fs2-queues

Publishing Data

Publishing data is made using a QueuePublisher. You can acquire one through a QueueClient by using the publish() method. A QueuePublisher is associated to a specific queue, which is provided when creating the publisher.

The publisher also requires a data serializer upon creation for the type of data you want to publish to it.

import cats.effect.IO

import com.commercetools.queue.{QueueClient, QueuePublisher}

def client: QueueClient[IO] = ???

// returns a publisher for the queue named `my-queue`,
// which can publish messages of type String
def publisher: QueuePublisher[IO, String] =
  client.publish[String]("my-queue")

Pipe a stream through the publisher sink

The QueuePublisher abstraction provides a sink() pipe, through which you can make your publishing source stream go. The pipe takes a parameter allowing for batching publications.

import fs2.{Pipe, Stream}

val input: Stream[IO, (String, Map[String, String])] = ???

// messages are published in batch of 10
val publicationSink: Pipe[IO, (String, Map[String, String]), Nothing] = publisher.sink(batchSize = 10)

// pipe the message producing stream through the publication sink
input.through(publicationSink)

Several Streams can safely publish to the same sink concurrently, so you can reuse the publicationSink variable.

Explicit publish

If you are integrating the library with an existing code base that performs explicit publications to the queue, you can access the QueuePusher lower level API, which exposes ways to publish a single message or a single batch. This abstraction comes in handy when the messages you produce do not come from a Stream, otherwise you should prefer the sink() pipe presented above.

A QueuePusher is accessed as a Resource as it usually implies using a connection pool. When the resource is released, the pools will be disposed properly.

publisher.pusher.use { queuePusher =>
  val produceMessages: IO[List[(String, Map[String, String])]] = ???

  // produce a batch
  produceMessages
    .flatMap { messages =>
      // push the batch
      queuePusher.push(messages, None)
    }
    // repeat forever
    .foreverM
}

Make sure that IOs publishing to the queuePusher do not outlive the use scope, otherwise you will be using a closed resource after the use block returns. If you need to spawn background fibers using the queuePusher, you can for instance use a Supervisor whose lifetime is nested within the queuePusher one.

import cats.effect.std.Supervisor

publisher.pusher.use { queuePusher =>
  val produceMessages: IO[List[(String, Map[String, String])]] = ???

  // create a supervisor that waits for supervised spawn fibers
  // to finish before being released
  Supervisor[IO](await = true).use { supervisor =>
    // produce a batch
    produceMessages
      .flatMap { messages =>
        // push the batch in the background
        supervisor.supervise(queuePusher.push(messages, None)).void
      }
  }
}