Event Streaming

Event Streaming is a feature that notifies a client application of database changes as they occur. A subscribing application can use streaming to track changes without running ongoing polling queries. With streaming, client applications subscribe to changes on a target set of documents in the database identified by an FQL query.

Some use cases where event streaming is recommended:

  • Maintain "live" application state: Client apps or browsers are made aware immediately as data changes. Users can track live e-commerce inventory, auction status, and productivity workflow state.

  • Brokered communications: Documents and collections can be used as message channels between multiple parties. A chat application is a good example of such a case.

  • Publish-subscribe (pub-sub): Update external services and data stores, or queue asynchronous work. For example, an application client can forward event notifications from Fauna to a message queue, such as a Kafka topic, where they are consumed by downstream consumers or data stores, such as a full-text search engine.

Supported drivers

Event Streaming is supported in the latest beta releases of the JavaScript and Python drivers.

Create a stream

Define a stream

To create a stream for a set of documents, append the toStream() method to a query that returns the set.

For example, an e-commerce application manages a catalog of products in a collection named Product. The definition of this collection, which is used in this guide, is as follows:

collection Product {
    index byType {
        terms [ .type ]
        values [ .name, .price ]
    }
}

A business requirement of the application may be to take some administrative action when a change occurs in the Product collection. In other words, we want to get a notification when a document is added, removed, or changed in the Product collection. Because Fauna streaming is based on FQL, creating a stream that captures these notifications is a matter of initializing the stream with a query that defines the target set.

Product.all().toStream()

Notice that the base query Product.all() is a full collection scan, which when issued as a normal query returns all documents in a collection. That same query may be used to define the set of documents to be subscribed to. To create a stream, append the toStream() function to the query that defines the set to be streamed. The toStream() function returns a stream token, which is used to start the stream subscription.

Filter

Some use cases only require notifications for changes to a specific subset of documents in a collection. Because Fauna streaming is based on FQL queries, it is easy to filter, server-side, for that set of desired documents. Targeting only the desired set of documents with a stream query reduces data on the wire, and eliminates the need to filter out undesired events in your client application.

Consider the example requirement of our e-commerce application. It’s necessary to receive notifications on products of the category book that are less than 100 in price. The following stream query satisfies that requirement:

Product.where( .type == 'book' && .price < 100).toStream()

The stream query specifies the exact set of documents that will be subscribed to. Event notifications will be sent on the stream only when a change occurs in the set of documents defined by the stream query, such as when a document is added, deleted, or changed. For example, each of the following database operations would trigger a notification on the stream defined above.

Product.create( {
  name: "Music as History",
  type: "book",
  price: 16.89,
  quantity: 20
})

let p = Product.byType('book')
 .where(.name == "Music as History" ).first()!
p.update({ quantity: p.quantity + 10 })

Product.byType('book')
 .where(.name == "Music as History" ).first()!
 .update({ price: 110 })

Notice that the last example write operation, which increased the price of the book titled Music as History to 110, still triggers a notification event on the stream. This is because the change caused the document to leave the set that the stream is subscribed to. After that notification is sent, further changes to the document no longer trigger event notifications, because the document no longer meets the criteria of the stream query.

Track changes on specific fields

In addition to specifying the set of documents that a stream is subscribed to, users may also define the set of fields in a document that will trigger a notification. Consider the case where the application needs to receive a notification whenever the price of a product in a target category changes. For example, a user is browsing for products of type electronics, and the UI of the web application needs to be updated when the price of a product the user is browsing changes. The following stream query meets this requirement.

Product.byType('electronics').changesOn(.price)

With this stream query, the subscribing client will only receive notifications whenever the price of a Product of type electronics changes.

Stream changes on a single document

In some use cases, streaming on a single document is appropriate. For instance, consider that a user has placed a given product in their shopping cart. While the product is in the cart, the available quantity may drop as other users purchase the same product. A requirement of the application is that the user be notified when the quantity of a product changes while it is in their cart. The following query would meet this requirement but is inefficient.

Product.where(.id == 390896421523423744).toStream()

The above stream is highly inefficient because it watches for changes in the entire Product collection but only produces events for one document. Users can optimize this use case by creating a singleton set that contains only that product.

Set.single(Product.byId("390896421523423744")).toStream()

Optimize stream performance with indexes

Just as we strongly recommend the use of indexed queries for production workloads, we also recommend the use of indexes to increase the performance of stream queries. While an unindexed streaming query is allowed, your stream queries will be more performant and cost-effective when an index is used to serve them.

Notice that many of the preceding stream query examples used the byType index in our Product collection definition at the top of this page. This index uses the type field as the index term, and the name and price fields as the index values. It is designed to suit the query patterns we’re streaming on.

For example, consider a stream defined as:

Product.where( .type == 'book' && .price < 100).toStream()

This stream would be far more efficient in cost and time if it were modified, as follows, to use the byType index:

Product.byType('book').where( .price < 100 ).toStream()

There is a trade-off between streams on a collection versus streams on an index: index streams only produce events when fields in their configured terms or values change. So, while the use of an index increases the efficiency of a stream query, users need to design their indexes such that they fit the change events they need to be subscribed to. For example, given the stream query above, the following write operation would not trigger a notification.

let p = Product.byType('book')
 .where(.name == "Music as History" ).first()!

p.update({ quantity: p.quantity - 1 })

In this example, even though the book Music as History meets the criteria of the indexed stream query, none of the fields covered by the index changed as a result of the write operation. The quantity field did change, but it is neither a term nor a value of the byType index.

The need to have fields covered by the index extends to the use of changesOn() as well. Any field passed to changesOn() in an index stream query must be covered by the index. The following example query would not be accepted by Fauna as a legal stream request, because the quantity field is neither a term nor a value of the byType index.

Product.byType('book').where(.price < 100 ).changesOn(.quantity)
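
The coverage rule described above can be modeled in a few lines of plain JavaScript. This is only an illustrative sketch, not how Fauna implements indexes. The byType object and the helper names coveredFields, wouldNotify, and checkChangesOn are hypothetical, and the sketch ignores set membership (for example, the price filter) to focus on field coverage.

```javascript
// Hypothetical model of the byType index from the Product collection definition.
const byType = { terms: ["type"], values: ["name", "price"] };

// Fields covered by an index: its terms plus its values.
function coveredFields(index) {
  return new Set([...index.terms, ...index.values]);
}

// True if a write changed at least one field covered by the index.
function wouldNotify(index, before, after) {
  for (const field of coveredFields(index)) {
    if (before[field] !== after[field]) return true;
  }
  return false;
}

// Throws if a changesOn() field list names a field the index does not cover.
function checkChangesOn(index, fields) {
  const covered = coveredFields(index);
  const missing = fields.filter((field) => !covered.has(field));
  if (missing.length > 0) {
    throw new Error(`Fields not covered by index: ${missing.join(", ")}`);
  }
}

const book = { type: "book", name: "Music as History", price: 16.89, quantity: 30 };

// A quantity-only change touches no covered field; a price change does.
console.log(wouldNotify(byType, book, { ...book, quantity: 29 }));
console.log(wouldNotify(byType, book, { ...book, price: 14.99 }));

try {
  checkChangesOn(byType, ["quantity"]);
} catch (error) {
  console.log(error.message);
}
```

In this model, adding quantity to the index values would make both the quantity update and the changesOn(.quantity) request pass the checks, which mirrors the design advice above: shape the index around the change events you need.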

Projecting only the data you need into the stream

By default, add and update events project the entire document that was modified, even when an index was used. In cases where it’s unnecessary to stream the entire document, users may add a projection to the stream query to suit their needs. Consider this example stream query, which projects only the name and price fields.

Product.byType('book').where( .price < 100 ).toStream(){name, price}

Only the document’s name and price fields will be returned in events on this stream.

{
  type: 'add',
  data: {
    name: 'The Fraud',
    price: 80
  },
  txn_ts: 1710456655930000,
  stats: {
    ...
  }
}
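
As a rough mental model, the projection behaves like picking the named fields out of each document before it is placed in the event's data field. The plain JavaScript sketch below mimics that effect. The project helper is hypothetical and is not part of the Fauna driver, and the sketch assumes that a field missing from the document is reported as null.

```javascript
// Keep only the named fields of a document.
// Assumption of this sketch: a field the document lacks is reported as null.
function project(doc, fields) {
  return Object.fromEntries(fields.map((field) => [field, doc[field] ?? null]));
}

// Sample document for illustration only.
const fullDocument = {
  id: "392372948586987553",
  type: "book",
  name: "The Fraud",
  quantity: 55,
  price: 80,
};

// Only name and price survive the projection.
console.log(project(fullDocument, ["name", "price"]));
```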

Consume streams using Fauna’s client drivers

Applications consume streams with the use of a Fauna client driver. Use the Client.stream() function to initialize the stream. Take the following example, where our earlier stream query is used to subscribe to the stream.

import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`Product.where(.type == 'book' && .price < 100).toStream()`;
const stream = client.stream(booksQuery);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
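
One way to keep event-handling logic testable is to separate it from the client. The sketch below is an illustration under stated assumptions, not a driver feature: consumeEvents and the handlers object are hypothetical names, and the function accepts any async iterable that yields objects with a type field. The mock generator stands in for the stream object used in the for await loop above, and its events are made up.

```javascript
// Dispatch each event to a handler keyed by event.type.
// Event types with no registered handler (for example, status) are skipped.
async function consumeEvents(stream, handlers) {
  let handled = 0;
  for await (const event of stream) {
    const handler = handlers[event.type];
    if (handler) {
      handler(event);
      handled += 1;
    }
  }
  return handled; // number of events that reached a handler
}

// Mock stream standing in for a real subscription; the events are made up.
async function* mockStream() {
  yield { type: "add", data: { name: "The Fraud", price: 80 } };
  yield { type: "status" };
  yield { type: "remove", data: { name: "The Fraud", price: 110 } };
}

const seen = [];
const handlers = {
  add: (event) => seen.push(`add:${event.data.name}`),
  update: (event) => seen.push(`update:${event.data.name}`),
  remove: (event) => seen.push(`remove:${event.data.name}`),
};

consumeEvents(mockStream(), handlers).then((count) => {
  console.log(count, seen);
});
```

Because the function only depends on async iteration, the same handlers can be exercised in unit tests with a mock and reused unchanged against a live stream.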

Event types

Write operations in the database generate different types of notifications in the stream, depending on the kind of write that occurred. You can see in the preceding JavaScript example that the various streaming events are handled with a switch statement. The event types are:

  • Add: Occurs whenever a new document is created that matches the stream query’s criteria, or whenever an existing document is updated so that it now matches the criteria. Essentially, add events occur whenever a document enters the set defined by the stream query.

  • Remove: Occurs when a document leaves the set defined by the stream query. This can happen when a document that meets the stream query’s criteria is deleted. This can also happen when a document is updated to no longer meet the stream query’s criteria.

  • Update: Occurs when a field is updated in a document that meets the stream query’s criteria.

  • Status: Occurs periodically, and serves to update the subscribing client, similar to a keep-alive, during periods when events are either not occurring on the database, or when events are filtered out of the stream server-side.

Your application code will need to handle the occurrence of any of these events.
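
The membership rules behind the add, remove, and update event types can be summarized in a small sketch. It models the stream query as a plain predicate and is not Fauna's implementation. The names classifyWrite and inSet are hypothetical, and status events are omitted because they are not tied to a write.

```javascript
// Predicate standing in for Product.where(.type == 'book' && .price < 100).
const inSet = (doc) => doc !== null && doc.type === "book" && doc.price < 100;

// before/after are the document versions around a write; null means "does not exist".
function classifyWrite(before, after, predicate) {
  const wasIn = predicate(before);
  const isIn = predicate(after);
  if (!wasIn && isIn) return "add"; // document entered the set
  if (wasIn && !isIn) return "remove"; // document was deleted or no longer matches
  if (wasIn && isIn) return "update"; // document changed but stayed in the set
  return null; // the write happened outside the set: no event
}

// The three writes from the filter example earlier on this page.
const created = { type: "book", name: "Music as History", price: 16.89, quantity: 20 };
const restocked = { ...created, quantity: 30 };
const repriced = { ...restocked, price: 110 };

console.log(classifyWrite(null, created, inSet)); // add
console.log(classifyWrite(created, restocked, inSet)); // update
console.log(classifyWrite(restocked, repriced, inSet)); // remove
console.log(classifyWrite(repriced, { ...repriced, quantity: 5 }, inSet)); // null
```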

Event shapes

Events sent through the stream include a field named type, which identifies the type of event that has occurred. This will be a string with a value of status, add, remove, or update, as described above. The data field will include the entire document by default. In the case that there is a projection in the stream query, then the data field will include only the fields defined in the projection.

For example, given the stream query Product.byType('book').where(.price < 100).toStream(), creating the following document:

Product.createData({
  type: 'book',
  name: 'The Fraud',
  quantity: "55",
  price: 23.0
})

generates the following notification:

{
  type: 'add',
  data: Document {
    coll: Module { name: 'Product' },
    id: '392372948586987553',
    ts: TimeStub { isoString: '2024-03-14T22:20:53.520Z' },
    type: 'book',
    name: 'The Fraud',
    quantity: '55',
    price: 23
  },
  txn_ts: 1710454853520000,
  stats: {
    read_ops: 1,
    storage_bytes_read: 109,
    compute_ops: 1,
    processing_time_ms: 1,
    rate_limits_hit: []
  }
}

If the stream query had instead included a projection of the name and price fields, as in Product.byType('book').where(.price < 100).toStream(){ name, price }, the add event would have had the following shape:

{
  type: 'add',
  data: { name: 'The Fraud', price: 23 },
  txn_ts: 1710454853520000,
  stats: {
    read_ops: 1,
    storage_bytes_read: 109,
    compute_ops: 1,
    processing_time_ms: 0,
    rate_limits_hit: []
  }
}

Stream snapshots

In some use cases, it’s necessary to first load a set of documents into the application that you then wish to keep updated with changes occurring in the database. Take the use case of presenting a leader board of a multiplayer gaming application as an example. As each player logs on to our multiplayer game, the leader board should be loaded into each player’s UI and kept up to date with player rankings and positions as each player’s score changes.

To serve such use cases, Fauna streaming allows you to both query for a set of documents, and open a stream on that same set of documents, all in the same operation. Consider the following code snippet, which builds on our earlier examples. The following code queries for the set of products of type book that are less than 100 in price. It starts an event stream on that same set.

import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`
  let products = Product.where( .type == 'book' && .price < 100 )
  {
    products: products,
    streamToken: products.toStream()
  }`;
const response = await client.query(booksQuery);
const { products, streamToken } = response.data;

// Paginate through all matching products.
// flatten() yields individual documents rather than whole pages.
for await (const product of client.paginate(products).flatten()) {
  console.log(product);
}

const stream = client.stream(streamToken);
try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}

Notice in the FQL query string that the set of Product documents returned by the query is assigned to the variable products. That set is returned to the client in the products field of the object returned by the query. The toStream() function is called on the same products set, and the resulting stream token is returned in the streamToken field of the query result object. The application code then iterates over the initial seed set of documents returned by the query and then waits for new events on that set.
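
On the client, the seed set and the subsequent events are typically merged into one local view. The following is a minimal sketch of that merge, assuming each event's data field carries the document id (as in the unprojected event shape shown earlier). The seedState and applyEvent helpers and the sample documents are hypothetical, and the sketch runs without a Fauna connection.

```javascript
// Build the local view of the set, keyed by document id.
function seedState(documents) {
  return new Map(documents.map((doc) => [doc.id, doc]));
}

// Apply one stream event to the local view and return it.
function applyEvent(state, event) {
  switch (event.type) {
    case "add":
    case "update":
      state.set(event.data.id, event.data); // insert or overwrite
      break;
    case "remove":
      state.delete(event.data.id); // document left the set
      break;
    default:
      break; // ignore status and unknown event types
  }
  return state;
}

// Seed from the initial query results, then fold in events as they arrive.
const state = seedState([
  { id: "1", name: "The Fraud", price: 23 },
  { id: "2", name: "Music as History", price: 16.89 },
]);
applyEvent(state, { type: "update", data: { id: "1", name: "The Fraud", price: 19 } });
applyEvent(state, { type: "remove", data: { id: "2", name: "Music as History", price: 110 } });
applyEvent(state, { type: "add", data: { id: "3", name: "Example Title", price: 42 } });

console.log([...state.values()]);
```

Keying the view by id keeps each event application a constant-time operation and makes add and update interchangeable on the client, which matches how the switch statement above groups them.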
