Documentation
Fundamentals
Subscriptions

Subscriptions

Subscriptions are long-lasting GraphQL read operations that can update their result whenever a particular server-side event occurs. Subscriptions are most commonly used to pushed updated results from the server to subscribing clients.

Resolving a subscription

Resolvers for Subscription differ from resolvers for fields of other types. Specifically, Subscription resolvers require to return an implementation of EventStream.

Resolver.swift
struct Resolver {
    // Example using an async stream generator
    func hello(_: Context, _: NoArguments) async -> EventStream<String> {
        let stream = AsyncStream { con in
            for word in ["Hello", "Bonjour", "Ciao"] {
                con.yield(word)
            }
            con.finish()
        }
        return stream.toEventStream()
    }
 
    // Example using pubsub
    func hello(_: Context, _: NoArguments) async -> EventStream<String> {
        pubsub.asyncStream(String.self, for: "HELLO_MESSAGE").toEventStrem()
    }
}

For the subscription to work under Pioneer the resolver function must return an EventStream of type:

AsyncEventStream

Swift 5.5 brought in a reactive stream like feature in the form of a protocol named AsyncSequence.

Pioneer provide an implementation of EventStream named AsyncEventStream that takes a generic AsyncSequence. This mean you can create an event stream using this class from any AsyncSequence.

let eventStream: EventStream<Int> = AsyncEventStream(from: MyIntAsyncSequence())

Extensions for AsyncSequence

Converting can be done as well with using the extended method for all AsyncSequence. This method also allow the AsyncEventStream to emit an initial and/or an ending value.

let eventStream = AsyncStream<Int>(...)
    .toEventStream()
 
// Initial value before any stream values
let eventStream1 = AsyncStream<Int>(...)
    .toEventStream(initialValue: 0)
 
// End value after stream finishes (excluding termination and value is lazily loaded; hence the function there)
let eventStream2 = AsyncStream<Int>(...)
    .toEventStream(initialValue: 0, endValue: { 10 })

Termination callback

By default, AsyncEventStream will cancel the task consuming the provided AsyncSequence when converting to an AsyncStream of a different type. For something like AsyncStream, this cancellation will trigger its termination callback so resources can be deallocated and prevent memory leaks of any kind.

However, a custom AsyncSequence might have a different trigger and approach in termination. Hence, .toEventStream alllow an explicit termination callback when converting to EventStream.

In the termination callback, you are provided with an enum that specify the two cases where termination can occur.

let sequence = MyAsyncSequence()
 
let eventStream = sequence.toEventStream(
    onTermination: { termination in
        if case .cancelled = termination {
            sequence.cancel()
        }
    }
)

Cases where stream is no longer consumed / stopped and termination will require to be triggered:

  1. Stream ended itself
  2. Client send a explicit stop request to end the subscription (might be before stream ended)
  3. Client disconnect and implicitly stop any running subscription
⚡️

Termination callback can be implicitly inferred built-in AsyncSequence and ones created by a PubSub.

AsyncPubSub

Pioneer provide an in memory publish-subscribe (Pub/Sub) model named, AsyncPubSub (opens in a new tab), to concurrent safely track events and update all active subscribers.

AsyncPubSub (opens in a new tab) conforms to PubSub (opens in a new tab) which enables your server code to both publish events to a particular topic/trigger/string and listen for events associated with a particular topic.

import struct Pioneer.AsyncPubSub
 
let pubsub = AsyncPubSub()

Publishing an event

You can publish an event using the .publish (opens in a new tab) method:

await pubsub.publish(
    for: "POST_CREATED", 
    payload: Post(by: "Jeff Bezos", content: "How much money do I have?")
)
  • The first parameter is the trigger of the event you're publishing to, as a string.
    • You don't need to register a trigger name before publishing to it.
  • The second parameter is the payload associated with the event.

As an example, let's say our GraphQL API supports a createPost mutation. A basic resolver for that might look this:

struct Resolver {
    func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
        let post = Post(args: args)
        try await post.create(on: ctx.req.db)
        return post
    }
}
Schema

The example schema in Graphiti (opens in a new tab):

Mutation {
    Field("createPost", at: Resolver.createPost) {
        Arguments("author", at: \.author)
        Arguments("content", at: \.content)
    }
}

The example schema in GraphQL SDL:

type Mutation {
  createPost(author: String, content: String): Post!
}

After we successfully persist the new post into the database, we can publish it to the pubsub as an event.

struct Resolver {
    func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
        let post = Post(args: args)
        try await post.create(on: ctx.req.db)
        await pubsub.publish(for: "POST_CREATED", post)
        return post
    }
}

Next, we can listen for this event in our Subscription resolver.

Listening for events

An AsyncStream asynchronously iterate over events, and if that stream comes from a PubSub (opens in a new tab), it will be associated with a particular trigger and will receive the events published under that trigger.

You can create an AsyncStream by calling the .asyncStream (opens in a new tab) method and passing in a the event trigger that this stream should listen for and the type.

pubsub.asyncStream(Post.self, for: "POST_CREATED");

Which would looke this in a subscription resolver:

Resolver.swift
struct Resolver {
    func postCreated(ctx: Context, _: NoArguments) async -> EventStream<Post> {
        ctx.pubsub.asyncStream(Post.self, for: "POST_CREATED").toEventStrem()
    }
}

Custom Pub/Sub

As mentioned before, AsyncPubSub (opens in a new tab) is an in memory pub-sub implementation that is limited to a single server instance, which may become an issue on production environments where there are multiple distributed server instances.

In which case, you likely want to either use or implement a custom pub-sub system that is backed by an external datastore.

PubSub as protocol

🧩

Pub/Sub implementation conform to this protocol is enforced to have the same API to AsyncPubSub (opens in a new tab), which make easy to switch between.

However, it is not necessary to use PubSub for your subscription resolver and to build a custom Pub/Sub implementation.

Pioneer exported the PubSub (opens in a new tab) protocol which allow different implementation with the same API AsyncPubSub (opens in a new tab) notably implementation backed by popular event-publishing systems (i.e. Redis) with similar API which allow user of this library to prototype with the in memory AsyncPubSub and easily migrate to a distributed PubSub implementation without very little changes.

The basic rules to implement A PubSub (opens in a new tab) are as follow:

Conformance

The method .asyncStream (opens in a new tab) should return an AsyncStream for a single subscriber where it can be unsubscribed without closing the topic entirely.

  • The type of DataType should conform to Sendable and Decodabble to help make sure it is safe to pass around and be able to decoded if necessary (since it is likely to come from a network call).
  • Recommended to create a new AsyncStream on each method call.
  • If you are having trouble with broadcasting a publisher to multiple consumer/subscriber, recommended taking a look at Broadcast.

The method .publish (opens in a new tab) should publish events to all subscriber that associated with the trigger.

  • The DataType conform to Sendable and Encodable to help make sure it is safe to pass around and be able to encoded if necessary (since it is likely to be send on a network call).

The method .close (opens in a new tab) should dispose and shutdown all subscriber that associated with the trigger.

⚠️

The implementation should be free of data races and be working safely under asynchronous scopes.

If you are having trouble with data-race safe state management, recommended use Swift's Actor (opens in a new tab).

Broadcast

Additionally, common client libraries for popular event-publishing systems usually only provide a function that to subscribe to a specific publisher, but

  • No option of unsubscribing without closing the publisher entirely
  • Only allow 1 subscriber for each publisher / channel
    • Usually because subscription is its own new network connection and multiple of those can be resource intensive.

In this case, the actor, Broadcast (opens in a new tab), is provided which can broadcast any events from a publisher to multiple different downstream where each downstream share the same upstream and can be unsubscribed / disposed (to prevent leaks) without closing the upstream and publisher.

Broadcast (opens in a new tab) provide the methods:

Essentially, it will be applied on an event publisher to create multiple downstream(s) (opens in a new tab) and handle distribution of events, where:

Example
let broadcast = Broadcast<Event>()
receiveSubscriptionFromExternalPublisher(
    ...,
    onMessage: { msg async in
        let event = convertToEvent(msg)
        await broadcast.publish(event)
    },
    onFinished: { reason async in
        await broadcast.close()
    }
)
 
// All of these downstream are getting all messages from the upstream
let downstream0 = await broadcast.downstream().stream
let downstream1 = await broadcast.downstream().stream
let downstream2 = await broadcast.downstream().stream
let downstream3 = await broadcast.downstream().stream
 
sendToExternalPublisher(..., msg: SomeMessage())
 
// Dispose a downstream without effecting the others
let task3 = Task {
    for await msg in downstream3 {
        // ...
    }
}
 
task3.cancel()
 
 
// Shutdown all downstreams
closeExternalPublisher(...)
await broadcast.close()

Redis Example

As an example, say we want to build a redis backed PubSub (opens in a new tab).

⛔️

This is only meant to be an example to give a better idea on how to implement a custom implementation that conform to PubSub and utilize Broadcast.

️🚀

A package, PioneerRedisPubSub (opens in a new tab), provide a Redis implemention of PubSub where it has been optimised and tested.

Here we create an example implementation of PubSub using Redis, that utilize Redis channel for Pub/Sub. We also make use of Broadcast to not open multiple connection and use the 1 redis subscription connection for all GraphQL subscription of the same topic.

RedisPubSub.swift
import Foundation
import NIOFoundationCompat
import class Pioneer.Broadcast
import struct Redis.RedisChannelName
import protocol Pioneer.PubSub
import protocol Redis.RedisClient
 
struct RedisPubSub: PubSub {
 
    // MARK: - Actor for distribution
    actor Dispatcher {
        private let redis: RedisClient
        private var broadcasting: [String: Broadcast<Data>] = [:]
 
        init(_ redis: RedisClient) {
            self.redis = redis
        }
 
        /// Get a downstream from the broadcast for the channel given
        func downstream(to channel: String) async throws -> AsyncStream<Data> {
            let broadcast = try await subscribe(to: channel)
            let downstream = await broadcast.downstream()
            return downstream.stream
        }
 
        /// Get the broadcast for the channel if exist, otherwise make a new one
        private func subscribe(to channel: String) async throws -> Broadcast<Data> {
            if let broadcast = broadcasting[channel] {
                return broadcast
            }
            let broadcast = Broadcast<Data>()
            broadcasting[channel] = broadcast
            try await apply(from: .init(channel), to: broadcast)
            return broadcast
        }
 
        /// Apply broadcasting to the Redis channel subscription
        private func apply(from channel: RedisChannelName, to broadcast: Broadcast<Data>) async throws {
            do {
                try await redis.subscribe(
                    to: channel,
                    messageReceiver: { _, msg in
                        guard case .bulkString(.some(let buffer)) = msg else { return }
                        let data = Data(buffer: buffer)
                        Task {
                            await broadcast.publish(data)
                        }
                    },
                    onUnsubscribe: { _, _ in
                        Task {
                            await broadcast.close()
                        }
                    }
                )
                .get()
            } catch {
              await broadcast.close()
              throw error
            }
        }
 
        /// Pubblish the data (which is RESPValueConvertible) to the specific redis channel
        func publish(for channel: String, _ value: Data) async throws {
            let _ = try await redis.publish(value, to: .init(channel)).get()
        }
 
        /// Close the redis channel subscription and all of the downstreams
        func close(for channel: String) async throws {
            try await redis.unsubscribe(from: .init(channel)).get()
            await broadcasting[channel]?.close()
        }
    }
 
    // MARK: -- Protocol required methods
 
    public func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type = DataType.self, for trigger: String) -> AsyncThrowingStream<DataType, Error> {
        AsyncThrowingStream<DataType, Error> { con in
            let task = Task {
                do {
                    let stream = try await dispatcher.downstream(to: trigger)
                    for await data in stream {
                        do {
                            let event = try JSONDecoder().decode(DataType.self, data)
                            con.yield(event)
                        } catch {
                            con.finish(throwing: error)
                        }
                    }
                    con.finish()
                } catch {
                    con.finish(throwing: error)
                }
            }
            con.onTermination = { @Sendable _ in
                task.cancel()
            }
        }
    }
 
    public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async throws {
        let data = try JSONEncoder().encode(payload)
        try await dispatcher.publish(for: trigger, data)
    }
 
    public func close(for trigger: String) async throws {
        try await dispatcher.close(for: trigger)
    }
 
    // MARK: - Properties
 
    private let dispatcher: Dispatcher
 
    public init(_ redis: RedisClient) {
        self.dispatcher = .init(redis)
    }
}

Now we can have the Resolver to have a property pubsub of type PubSub (opens in a new tab) instead of AsyncPubSub (opens in a new tab), while still being able to use AsyncPubSub (opens in a new tab) during development.

Message.swift
struct Message: Sendable, Codable { ... }
Resolver.swift
struct Resolver {
    let pubsub: PubSub = app.environment.isRelease ? RedisPubSub(app.redis) : AsyncPubSub()
 
    func create(ctx: Context, _: NoArguments) async throws -> Message {
        let message = ...
        try await pubsub.publish(message)
        return message
    }
 
    func onCreate(ctx: Context, _: NoArguments) async -> EventStream<Message> {
        pubsub.asyncStream(Message.self, for: "message-create").toEventStream()
    }
}

So now, if we can use the RedisPubSub on a production environment.

Last updated on October 22, 2023