Subscriptions
Subscriptions are long-lasting GraphQL read operations that can update their result whenever a particular server-side event occurs. Subscriptions are most commonly used to pushed updated results from the server to subscribing clients.
Resolving a subscription
Resolvers for Subscription differ from resolvers for fields of other types. Specifically, Subscription resolvers require to return an implementation of EventStream
.
struct Resolver {
// Example using an async stream generator
func hello(_: Context, _: NoArguments) async -> EventStream<String> {
let stream = AsyncStream { con in
for word in ["Hello", "Bonjour", "Ciao"] {
con.yield(word)
}
con.finish()
}
return stream.toEventStream()
}
// Example using pubsub
func hello(_: Context, _: NoArguments) async -> EventStream<String> {
pubsub.asyncStream(String.self, for: "HELLO_MESSAGE").toEventStrem()
}
}
For the subscription to work under Pioneer the resolver function must return an EventStream
of type:
- AsyncEventStream (opens in a new tab)
Can be from any
AsyncSequence
, a standard protocol in Swift 5.5 for asynchronous, sequential, iterated elements - ConcurrentEventStream (opens in a new tab)
Must be from a
AsyncThrowingStream
AsyncEventStream
Swift 5.5 brought in a reactive stream like feature in the form of a protocol named AsyncSequence
.
Pioneer provide an implementation of EventStream
named AsyncEventStream that takes a generic AsyncSequence
. This mean you can create an event stream using this class from any AsyncSequence
.
let eventStream: EventStream<Int> = AsyncEventStream(from: MyIntAsyncSequence())
Extensions for AsyncSequence
Converting can be done as well with using the extended method for all AsyncSequence
. This method also allow the AsyncEventStream to emit an initial and/or an ending value.
let eventStream = AsyncStream<Int>(...)
.toEventStream()
// Initial value before any stream values
let eventStream1 = AsyncStream<Int>(...)
.toEventStream(initialValue: 0)
// End value after stream finishes (excluding termination and value is lazily loaded; hence the function there)
let eventStream2 = AsyncStream<Int>(...)
.toEventStream(initialValue: 0, endValue: { 10 })
Termination callback
By default, AsyncEventStream will cancel the task consuming the provided AsyncSequence
when converting to an AsyncStream
of a different type. For something like AsyncStream
, this cancellation will trigger its termination callback so resources can be deallocated and prevent memory leaks of any kind.
However, a custom AsyncSequence
might have a different trigger and approach in termination. Hence, .toEventStream
alllow an explicit termination callback when converting to EventStream
.
In the termination callback, you are provided with an enum that specify the two cases where termination can occur.
let sequence = MyAsyncSequence()
let eventStream = sequence.toEventStream(
onTermination: { termination in
if case .cancelled = termination {
sequence.cancel()
}
}
)
Cases where stream is no longer consumed / stopped and termination will require to be triggered:
- Stream ended itself
- Client send a explicit stop request to end the subscription (might be before stream ended)
- Client disconnect and implicitly stop any running subscription
Termination callback can be implicitly inferred built-in AsyncSequence
and ones created by a PubSub.
AsyncPubSub
Pioneer provide an in memory publish-subscribe (Pub/Sub) model named, AsyncPubSub (opens in a new tab), to concurrent safely track events and update all active subscribers.
AsyncPubSub (opens in a new tab) conforms to PubSub (opens in a new tab) which enables your server code to both publish events to a particular topic/trigger/string and listen for events associated with a particular topic.
import struct Pioneer.AsyncPubSub
let pubsub = AsyncPubSub()
Publishing an event
You can publish an event using the .publish (opens in a new tab) method:
await pubsub.publish(
for: "POST_CREATED",
payload: Post(by: "Jeff Bezos", content: "How much money do I have?")
)
- The first parameter is the trigger of the event you're publishing to, as a string.
- You don't need to register a trigger name before publishing to it.
- The second parameter is the payload associated with the event.
As an example, let's say our GraphQL API supports a createPost
mutation. A basic resolver for that might look this:
struct Resolver {
func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
let post = Post(args: args)
try await post.create(on: ctx.req.db)
return post
}
}
Schema
The example schema in Graphiti (opens in a new tab):
Mutation {
Field("createPost", at: Resolver.createPost) {
Arguments("author", at: \.author)
Arguments("content", at: \.content)
}
}
The example schema in GraphQL SDL:
type Mutation {
createPost(author: String, content: String): Post!
}
After we successfully persist the new post into the database, we can publish it to the pubsub as an event.
struct Resolver {
func createPost(ctx: Context, args: CreatePostArgs) async throws -> Post {
let post = Post(args: args)
try await post.create(on: ctx.req.db)
await pubsub.publish(for: "POST_CREATED", post)
return post
}
}
Next, we can listen for this event in our Subscription
resolver.
Listening for events
An AsyncStream
asynchronously iterate over events, and if that stream comes from a PubSub (opens in a new tab), it will be associated with a particular trigger and will receive the events published under that trigger.
You can create an AsyncStream
by calling the .asyncStream (opens in a new tab) method and passing in a the event trigger that this stream should listen for and the type.
pubsub.asyncStream(Post.self, for: "POST_CREATED");
Which would looke this in a subscription resolver:
struct Resolver {
func postCreated(ctx: Context, _: NoArguments) async -> EventStream<Post> {
ctx.pubsub.asyncStream(Post.self, for: "POST_CREATED").toEventStrem()
}
}
Custom Pub/Sub
As mentioned before, AsyncPubSub (opens in a new tab) is an in memory pub-sub implementation that is limited to a single server instance, which may become an issue on production environments where there are multiple distributed server instances.
In which case, you likely want to either use or implement a custom pub-sub system that is backed by an external datastore.
PubSub as protocol
Pub/Sub implementation conform to this protocol is enforced to have the same API to AsyncPubSub (opens in a new tab), which make easy to switch between.
However, it is not necessary to use PubSub for your subscription resolver and to build a custom Pub/Sub implementation.
Pioneer exported the PubSub (opens in a new tab) protocol which allow different implementation with the same API AsyncPubSub (opens in a new tab) notably implementation backed by popular event-publishing systems (i.e. Redis) with similar API which allow user of this library to prototype with the in memory AsyncPubSub and easily migrate to a distributed PubSub implementation without very little changes.
The basic rules to implement A PubSub (opens in a new tab) are as follow:
Conformance
The method .asyncStream (opens in a new tab) should return an AsyncStream
for a single subscriber where it can be unsubscribed without closing the topic entirely.
- The type of
DataType
should conform toSendable
andDecodabble
to help make sure it is safe to pass around and be able to decoded if necessary (since it is likely to come from a network call). - Recommended to create a new
AsyncStream
on each method call. - If you are having trouble with broadcasting a publisher to multiple consumer/subscriber, recommended taking a look at Broadcast.
The method .publish (opens in a new tab) should publish events to all subscriber that associated with the trigger.
- The
DataType
conform toSendable
andEncodable
to help make sure it is safe to pass around and be able to encoded if necessary (since it is likely to be send on a network call).
The method .close (opens in a new tab) should dispose and shutdown all subscriber that associated with the trigger.
The implementation should be free of data races and be working safely under asynchronous scopes.
If you are having trouble with data-race safe state management, recommended use Swift's Actor (opens in a new tab).
Broadcast
Additionally, common client libraries for popular event-publishing systems usually only provide a function that to subscribe to a specific publisher, but
- No option of unsubscribing without closing the publisher entirely
- Only allow 1 subscriber for each publisher / channel
- Usually because subscription is its own new network connection and multiple of those can be resource intensive.
In this case, the actor, Broadcast (opens in a new tab), is provided which can broadcast any events from a publisher to multiple different downstream where each downstream share the same upstream and can be unsubscribed / disposed (to prevent leaks) without closing the upstream and publisher.
Broadcast (opens in a new tab) provide the methods:
- .downstream (opens in a new tab) to create a new subscriber stream that will receive events broadcasted
- .publish (opens in a new tab) to broadcast the events to all subscriber
- .close (opens in a new tab) to close the broadcast and shutdown all subscriber
Essentially, it will be applied on an event publisher to create multiple downstream(s) (opens in a new tab) and handle distribution of events, where:
-
Different consumer can subscribe to the same upstream and all of them get the same messages
Usually to prevent making multiple subscription might be resource intensive
-
Downstream(s) (opens in a new tab) can be disposed, stopped, or cancelled individually to prevent leaks
Disposed by cancelling
Task
used to consume it -
Closing any downstream(s) (opens in a new tab) does not close other downstream(s) (opens in a new tab), broadcast, and the upstream
Other downstream(s) (opens in a new tab) will continue receiving broadcasted events
-
Closing broadcast dispose all downstream(s) (opens in a new tab), but not necessarily the upstream
Example
let broadcast = Broadcast<Event>()
receiveSubscriptionFromExternalPublisher(
...,
onMessage: { msg async in
let event = convertToEvent(msg)
await broadcast.publish(event)
},
onFinished: { reason async in
await broadcast.close()
}
)
// All of these downstream are getting all messages from the upstream
let downstream0 = await broadcast.downstream().stream
let downstream1 = await broadcast.downstream().stream
let downstream2 = await broadcast.downstream().stream
let downstream3 = await broadcast.downstream().stream
sendToExternalPublisher(..., msg: SomeMessage())
// Dispose a downstream without effecting the others
let task3 = Task {
for await msg in downstream3 {
// ...
}
}
task3.cancel()
// Shutdown all downstreams
closeExternalPublisher(...)
await broadcast.close()
Redis Example
As an example, say we want to build a redis backed PubSub (opens in a new tab).
A package, PioneerRedisPubSub (opens in a new tab), provide a Redis implemention of PubSub where it has been optimised and tested.
Here we create an example implementation of PubSub using Redis, that utilize Redis channel for Pub/Sub. We also make use of Broadcast to not open multiple connection and use the 1 redis subscription connection for all GraphQL subscription of the same topic.
import Foundation
import NIOFoundationCompat
import class Pioneer.Broadcast
import struct Redis.RedisChannelName
import protocol Pioneer.PubSub
import protocol Redis.RedisClient
struct RedisPubSub: PubSub {
// MARK: - Actor for distribution
actor Dispatcher {
private let redis: RedisClient
private var broadcasting: [String: Broadcast<Data>] = [:]
init(_ redis: RedisClient) {
self.redis = redis
}
/// Get a downstream from the broadcast for the channel given
func downstream(to channel: String) async throws -> AsyncStream<Data> {
let broadcast = try await subscribe(to: channel)
let downstream = await broadcast.downstream()
return downstream.stream
}
/// Get the broadcast for the channel if exist, otherwise make a new one
private func subscribe(to channel: String) async throws -> Broadcast<Data> {
if let broadcast = broadcasting[channel] {
return broadcast
}
let broadcast = Broadcast<Data>()
broadcasting[channel] = broadcast
try await apply(from: .init(channel), to: broadcast)
return broadcast
}
/// Apply broadcasting to the Redis channel subscription
private func apply(from channel: RedisChannelName, to broadcast: Broadcast<Data>) async throws {
do {
try await redis.subscribe(
to: channel,
messageReceiver: { _, msg in
guard case .bulkString(.some(let buffer)) = msg else { return }
let data = Data(buffer: buffer)
Task {
await broadcast.publish(data)
}
},
onUnsubscribe: { _, _ in
Task {
await broadcast.close()
}
}
)
.get()
} catch {
await broadcast.close()
throw error
}
}
/// Pubblish the data (which is RESPValueConvertible) to the specific redis channel
func publish(for channel: String, _ value: Data) async throws {
let _ = try await redis.publish(value, to: .init(channel)).get()
}
/// Close the redis channel subscription and all of the downstreams
func close(for channel: String) async throws {
try await redis.unsubscribe(from: .init(channel)).get()
await broadcasting[channel]?.close()
}
}
// MARK: -- Protocol required methods
public func asyncStream<DataType: Sendable & Decodable>(_ type: DataType.Type = DataType.self, for trigger: String) -> AsyncThrowingStream<DataType, Error> {
AsyncThrowingStream<DataType, Error> { con in
let task = Task {
do {
let stream = try await dispatcher.downstream(to: trigger)
for await data in stream {
do {
let event = try JSONDecoder().decode(DataType.self, data)
con.yield(event)
} catch {
con.finish(throwing: error)
}
}
con.finish()
} catch {
con.finish(throwing: error)
}
}
con.onTermination = { @Sendable _ in
task.cancel()
}
}
}
public func publish<DataType: Sendable & Encodable>(for trigger: String, payload: DataType) async throws {
let data = try JSONEncoder().encode(payload)
try await dispatcher.publish(for: trigger, data)
}
public func close(for trigger: String) async throws {
try await dispatcher.close(for: trigger)
}
// MARK: - Properties
private let dispatcher: Dispatcher
public init(_ redis: RedisClient) {
self.dispatcher = .init(redis)
}
}
Now we can have the Resolver to have a property pubsub
of type PubSub (opens in a new tab) instead of AsyncPubSub (opens in a new tab), while still being able to use AsyncPubSub (opens in a new tab) during development.
struct Message: Sendable, Codable { ... }
struct Resolver {
let pubsub: PubSub = app.environment.isRelease ? RedisPubSub(app.redis) : AsyncPubSub()
func create(ctx: Context, _: NoArguments) async throws -> Message {
let message = ...
try await pubsub.publish(message)
return message
}
func onCreate(ctx: Context, _: NoArguments) async -> EventStream<Message> {
pubsub.asyncStream(Message.self, for: "message-create").toEventStream()
}
}
So now, if we can use the RedisPubSub
on a production environment.