fs2-queues

Receiving Data

Receiving data is achieved through a QueueSubscriber. You can acquire one through a QueueClient by using the subscribe() method. A QueueSubscriber is associated with a specific queue, whose name is provided when creating the subscriber.

The subscriber also requires a data deserializer upon creation, to deserialize the message payload received from the queue.

import cats.effect.IO

import com.commercetools.queue.{QueueClient, QueueSubscriber}

def client: QueueClient[IO] = ???

// returns a subscriber for the queue named `my-queue`,
// which can receive messages of type String
def subscriber: QueueSubscriber[IO, String] =
  client.subscribe[String]("my-queue")
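To subscribe with a payload type other than String, a Deserializer instance for that type must be in scope. As a sketch only (the exact shape of the Deserializer type class, including the method name below, is an assumption here; check the library sources for the actual trait), a hand-rolled instance might look like this:

```scala
import cats.syntax.either._

import com.commercetools.queue.Deserializer

// hypothetical payload type for illustration
case class Event(id: Int)

implicit val eventDeserializer: Deserializer[Event] =
  new Deserializer[Event] {
    // assumed single abstract method turning the raw payload
    // into Either[Throwable, T]; verify against the actual trait
    def deserialize(payload: String): Either[Throwable, Event] =
      Either.catchNonFatal(Event(payload.trim.toInt))
  }
```

With such an instance in scope, `client.subscribe[Event]("my-queue")` would produce a `QueueSubscriber[IO, Event]`.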

When a message is received by a subscriber, it is locked (or leased) for an amount of time configured at the queue level (see more on the Managing Queues page). This means that only one subscriber receives (and can process) a given message within this amount of time. A message needs to be settled once processed (or if processing fails) to ensure it is either removed from the queue or made available again. This is part of the message control flow.

In the following sections, we explain what kind of control flow handling the library provides.

Processors

The QueueSubscriber abstraction provides a processWithAutoAck() method, which automatically handles the control flow part for you. You only need to provide the processing function, allowing you to focus on your business logic.

The payload is effectful as it performs data deserialization, which can fail when a message payload is malformed. You can access the raw data by using rawPayload.

The deserialized data is memoized so that subsequent accesses to it are not recomputed.

import scala.concurrent.duration._

subscriber.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message =>
  message.payload.flatMap { payload =>
    IO.println(s"Received $payload").as(message.messageId)
  }
}
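Since deserialization is the part that can fail, one possible pattern is to fall back to rawPayload when the deserialized payload is unavailable. A sketch (it assumes rawPayload exposes the message data as a plain String, which should be verified against the library; the fallback strategy itself is just an illustration):

```scala
subscriber.processWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message =>
  message.payload.attempt.flatMap {
    case Right(payload) =>
      IO.println(s"Received $payload")
    case Left(error) =>
      // deserialization failed: log the raw (non-deserialized) data instead
      IO.println(s"Malformed payload '${message.rawPayload}': ${error.getMessage}")
  }.as(message.messageId)
}
```

Note that handling the failure yourself via attempt means the message is still considered successfully processed and will be acked.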

The processing function receives a Message, which gives access to the content and some metadata of the received message.

The result is a Stream of the processing results, emitted in the order the messages were received. Only successfully processed messages are emitted downstream. The stream fails upon the first processing failure.

The processWithAutoAck method performs automatic acking/nacking for you depending on the processing outcome. It comes in handy to implement at-least-once delivery strategies, releasing the message to be reprocessed by another subscriber in case of error.

If you wish to implement a stream that does not fail upon error, you can use the attemptProcessWithAutoAck() method, which emits the results of the processing as an Either[Throwable, T]. The resulting stream does not fail if some processing fails. Otherwise it behaves the same as the stream above.
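For instance, a sketch that logs failures downstream while keeping the stream alive (the logging is just a placeholder for your own error handling):

```scala
import scala.concurrent.duration._

subscriber
  .attemptProcessWithAutoAck(batchSize = 10, waitingTime = 20.seconds) { message =>
    message.payload.flatMap { payload =>
      IO.println(s"Received $payload").as(message.messageId)
    }
  }
  .evalMap {
    // both outcomes are emitted; the stream keeps running on failures
    case Right(messageId) => IO.println(s"Processed $messageId")
    case Left(error)      => IO.println(s"Processing failed: ${error.getMessage}")
  }
```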

Raw message stream

If you want more fine-grained control over the message flow, you can resort to the messages() stream available on the QueueSubscriber. The stream emits a MessageContext for each received message, giving access to the message content and metadata, as well as to the message control methods.

Using this stream, you can implement your processing strategy.

When consuming it, you need to explicitly settle each message (either by acking or nacking it) once processed. Failing to do so results in the message being made available to other subscribers once the lock expires.

It is up to you to ensure that all control flow paths lead to explicit settlement of the messages you received.

The recommendation is to only resort to this stream if you need to implement complex custom control flow for messages.

A simplified version of the processWithAutoAck method described above could be implemented this way.

The real implementation is more complex and more efficient, ensuring that all successfully processed messages are actually emitted downstream.

import cats.effect.Outcome

import scala.concurrent.duration._

subscriber
  .messages(batchSize = 10, waitingTime = 20.seconds)
  .evalMap { context =>
    context.payload.flatMap { payload =>
      IO.println(s"Received $payload")
        .as(context.messageId)
    }
    .guaranteeCase {
      case Outcome.Succeeded(_) => context.ack()
      case _ => context.nack()
    }
  }

MessageContext control flow

There are three different methods that can be used to control the message lifecycle from the subscriber point of view:

  1. MessageContext.ack() acknowledges the message and marks it as successfully processed in the queue system. It will be permanently removed and no other subscriber will ever receive it.
  2. MessageContext.nack() marks the message as not processed, releasing the lock in the queue system. It becomes available again for other subscribers to receive and process.
  3. MessageContext.extendLock() extends the currently owned lock by the duration configured at the queue level. This can be called as many times as you want, as long as you still own the lock. As long as the lock is extended, the message will not be distributed to any other subscriber by the queue system.
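For instance, when processing might exceed the lock duration, one way to keep ownership is to renew the lock concurrently while the work runs. A sketch (longProcessing and the 10-second renewal interval are placeholders; the interval must be shorter than the queue's configured lock duration):

```scala
import cats.effect.{IO, Outcome}

import scala.concurrent.duration._

// placeholder for some processing that may take longer than the lock
def longProcessing(payload: String): IO[Unit] = ???

subscriber
  .messages(batchSize = 10, waitingTime = 20.seconds)
  .evalMap { context =>
    context.payload
      .flatMap { payload =>
        IO.race(
          longProcessing(payload),
          // renew the lock periodically; this loop never completes on its
          // own and is cancelled when the processing side finishes
          (IO.sleep(10.seconds) *> context.extendLock()).foreverM
        ).void
      }
      .guaranteeCase {
        case Outcome.Succeeded(_) => context.ack()
        case _                    => context.nack()
      }
  }
```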

Explicit pull

If you are integrating this library with an existing code base that performs explicit pulls from the queue, you can access the QueuePuller lower level API, which exposes ways to pull batches of messages. This abstraction comes in handy when your processing code is based on a callback approach and is not implemented as a Stream; otherwise, you should prefer the streams presented above.

A QueuePuller is accessed as a Resource as it usually implies using a connection pool. When the resource is released, the pool will be disposed of properly.

import cats.effect.Outcome

import scala.concurrent.duration._

import cats.syntax.foldable._

subscriber.puller.use { queuePuller =>

  queuePuller
    .pullBatch(batchSize = 10, waitingTime = 20.seconds)
    .flatMap { chunk =>
      chunk.traverse_ { context =>
        context.payload.flatMap { payload =>
          IO.println(s"Received $payload")
        }.guaranteeCase {
          case Outcome.Succeeded(_) => context.ack()
          case _ => context.nack()
        }
      }
    }

}

Make sure that IOs pulling from the queuePuller do not outlive the use scope; otherwise you will be using a closed resource after the use block returns. If you need to spawn background fibers using the queuePuller, you can, for instance, use a Supervisor whose lifetime is nested within the queuePuller's.

import cats.effect.Outcome
import cats.effect.std.Supervisor

import scala.concurrent.duration._

import cats.syntax.foldable._

subscriber.puller.use { queuePuller =>
  // create a supervisor that waits for supervised spawned fibers
  // to finish before being released
  Supervisor[IO](await = true).use { supervisor =>
    queuePuller
      .pullBatch(batchSize = 10, waitingTime = 20.seconds)
      .flatMap { chunk =>
        chunk.traverse_ { context =>
          supervisor.supervise {
            context.payload.flatMap { payload =>
              IO.println(s"Received $payload")
            }
            .guaranteeCase {
              case Outcome.Succeeded(_) => context.ack()
              case _ => context.nack()
            }
          }
        }
      }
  }
}