Track changes with Event Feeds and Event Streaming

Reference: Event Feeds and Event Streaming reference

An event stream tracks changes to a specific Set of documents or document fields, defined by an FQL query, in the database. The stream emits an event whenever a tracked change occurs.

You can consume an event stream in two ways:

  • Event Feeds: Asynchronous requests that poll the stream for paginated events.

  • Event Streaming: A real-time subscription that pushes events from the stream to your application using an open connection to Fauna.

This guide covers how to create event streams and consume them using Event Feeds or Event Streaming.

Use cases

Some use cases where tracking changes using Event Feeds or Event Streaming is recommended:

  • Change data capture (CDC): Mirroring an application’s state across multiple systems. For example, you can synchronize your Fauna database with a data warehouse for analytics. CDC can also be used for auditing purposes, enabling you to track and record every change made to your database over time.

  • Maintain "live" application state: Client apps or browsers are made aware immediately as data changes. Users can track live e-commerce inventory, auction status, and productivity workflow state.

  • Brokered communications: Documents and collections can be used as message channels between multiple parties. A chat application is a good example of such a case.

  • Publish-subscribe (pub-sub): Use cases include updating external services and data stores, and asynchronous work queuing. For example, an application client can integrate with a message queue, inserting events from Fauna into a Kafka topic that is consumed by several downstream consumers or data stores, such as a full-text search engine.

Create a stream

Define a stream

To create a stream for a Set of documents, append the set.toStream() method to a query that returns the Set.

For example, an e-commerce application manages a catalog of products in a collection named Product. The definition of this collection, which is used in this guide, is as follows:

collection Product {
  *: Any

  index byType {
      terms [ .type ]
      values [ .name, .price ]
  }
}

A business requirement of the application may be to take some administrative action when a change occurs in the Product collection. In other words, we want to emit an event when a document is added, removed, or changed in the Product collection. Event streams are based on FQL, so creating a stream that tracks these changes is simply a matter of initializing the stream with a query that captures the target Set:

Product.all().toStream()

Notice that the base query Product.all() is a full collection scan, which, when issued as a normal query, returns all documents in the collection. The same query may be used to define the Set of documents to track for an event stream. To create a stream, append the set.toStream() method to the query that defines the Set to be streamed. The set.toStream() method returns a stream token, which is used to start and consume the stream.

Filter

Some use cases only require events for changes to a specific subset of documents in a collection. Because an event stream is based on an FQL query, it is easy to filter, server-side, for the Set of desired documents. Targeting your desired Set of documents with a stream query reduces data on the wire and eliminates the need to filter out undesired events in your client application.

Consider an example requirement for our e-commerce application: receive events for products of the type book that are less than 100_00 in price. The following stream query satisfies that requirement:

Product.where( .type == 'book' && .price < 100_00).toStream()

The stream query specifies the exact Set of documents to track. The stream only emits events when a change occurs in the Set of documents defined by the stream query, such as when a document is added, deleted, or changed. For example, the previous stream would emit an event for each of the following database operations:

Product.create( {
  name: "Music as History",
  type: "book",
  price: 16_89,
  stock: 20
})

let p = Product.byType('book')
 .where(.name == "Music as History" ).first()!
p.update({ stock: p.stock + 10 })

Product.byType('book')
 .where(.name == "Music as History" ).first()!
 .update({ price: 110_00 })

Notice that the last example write operation, which increased the price of the book titled Music as History to 110_00, still emits an event. The change caused the book to leave the Set of documents tracked by the stream. Although an event was emitted when the document left the Set, subsequent changes to that document no longer emit events, because the document no longer meets the criteria of the stream query.
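The add, update, and remove semantics above boil down to a membership check against the tracked Set. The following is a minimal, illustrative sketch (not driver code): matches() is a hypothetical stand-in for the stream query's predicate, and classify_event() shows how a write maps to an event type.

```python
def matches(doc):
    """Hypothetical stand-in for the stream query's criteria:
    books under 100_00 in price."""
    return doc["type"] == "book" and doc["price"] < 100_00

def classify_event(before, after):
    """Classify a write by how it moves a document relative to the tracked Set.

    `before`/`after` are the document versions before and after the write
    (None when the document doesn't exist on that side).
    """
    was_in = before is not None and matches(before)
    now_in = after is not None and matches(after)
    if not was_in and now_in:
        return "add"      # document entered the Set
    if was_in and not now_in:
        return "remove"   # document left the Set (including deletes)
    if was_in and now_in:
        return "update"   # document changed within the Set
    return None           # change outside the Set: no event

book = {"type": "book", "name": "Music as History", "price": 16_89, "stock": 20}
print(classify_event(None, book))                       # add
print(classify_event(book, {**book, "stock": 30}))      # update
print(classify_event(book, {**book, "price": 110_00}))  # remove
```

The last call mirrors the price increase above: the document leaves the Set, so a remove event is emitted and later changes to it go unseen.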

Track changes on specific fields

In addition to specifying which Set of documents a stream emits events for, users may also define the Set of fields in a document that emit an event. Consider the case where the application needs to receive an event whenever the price of a product of a target type changes. An example scenario: a user is browsing products of type electronics, and the UI of the web application needs to be updated when the price of a product the user is browsing changes. The following stream query serves this requirement.

Product.byType('electronics').changesOn(.price)

With this query, the stream will only emit events whenever the price of a Product of type electronics changes.

Track changes to a single document

In some use cases, an event stream for a single document is appropriate. For instance, consider that a user has placed a given product in their shopping cart. While the product is in the cart, the stock may drop as other users purchase the same product. A requirement of the application is that the user be notified when the stock of a product changes while it is in their cart. The following query would meet this requirement but is inefficient:

Product.where(.id == "390896421523423744").toStream()

The above stream is highly inefficient: it tracks changes across the entire Product collection but only emits events for one document. Users can optimize this use case by creating a singleton Set containing the single product:

Set.single(Product.byId("390896421523423744")).toStream()

Optimize stream performance with indexes

Just as we strongly recommend the use of indexed queries for production workloads, we also recommend the use of indexes to increase the performance of stream queries. While an unindexed stream query is allowed, your stream queries will be more performant and cost-effective when an index is used to serve them.

Notice that many of the preceding stream query examples used the byType() index from the Product collection definition at the top of this page. This index uses the type field as the index term, and the name and price fields as the index values. This index has been designed to suit the query patterns we’re tracking.

Recall the stream defined earlier:

Product.where( .type == 'book' && .price < 100_00).toStream()

It would be far more efficient, in both cost and time, if modified as follows to use the byType() index:

Product.byType('book').where( .price < 100_00).toStream()

There is a trade-off between streams on a collection versus streams on an index: index streams only emit events when fields in their configured terms or values change. So, while the use of an index increases the efficiency of a stream query, users need to design their indexes such that they fit the change events they need. For example, given the stream query above, the following write operation would not emit an event:

let p = Product.byType('book')
 .where(.name == "Music as History" ).first()!

p.update({ stock: p.stock - 1 })

In this example, even though the book Music as History meets the query criteria of the indexed stream query, none of the fields covered by the index changed as a result of the write operation. The stock field did change, but stock is not a field covered by the index.

The need to have fields covered by the index extends to the use of set.changesOn() as well. Any field users wish to emit events on in an index stream query must be covered by the index. Fauna would reject the following query as an invalid stream request, because the stock field is neither a term nor a value of the byType() index.

Product.byType('book').where(.price < 100_00).changesOn(.stock)
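The covered-field rule can be expressed as a simple check: every field named in changesOn() must appear among the serving index's terms or values. The following sketch is illustrative only (the function and data shapes are not a Fauna API):

```python
# Illustrative check of the covered-field rule; not a Fauna API.
# An index "covers" its terms and values; changesOn() may only name those.

def changes_on_allowed(index, changes_on_fields):
    """Return True if every changesOn() field is a term or value of the index."""
    covered = set(index["terms"]) | set(index["values"])
    return all(field in covered for field in changes_on_fields)

# Mirrors the byType index from the Product collection definition above.
by_type = {"terms": ["type"], "values": ["name", "price"]}

print(changes_on_allowed(by_type, ["price"]))  # True: price is an index value
print(changes_on_allowed(by_type, ["stock"]))  # False: stock isn't covered
```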

Project only the data you need into the stream

By default, add and update events project the entire document that was modified, even when an index was used. In cases where it’s unnecessary to track changes to the entire document, users may add a projection to the stream query to suit their needs. Consider this example stream query, which projects only the name and price fields:

Product.byType('book').where( .price < 100_00).toStream(){name, price}

Only the document’s name and price fields will be returned in events in this stream:

{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 8000
  },
  "txn_ts": 1710456655930000,
  "cursor": "gsGabc123",
  "stats": {
    ...
  }
}

Consume streams using Fauna’s client drivers

Applications typically consume streams using a Fauna client driver. The drivers can consume a stream using an asynchronous Event Feed or a real-time Event Streaming subscription.

Event Feeds

To use Event Feeds, you must have a Pro or Enterprise plan.

The following Fauna client drivers support Event Feeds:

Example

With the Python driver, use change_feed() to define a stream and return the stream’s paginated events.

To get the first page of events, you typically specify a start_ts (start timestamp) in the ChangeFeedOptions object passed to the initial change_feed() request.

Each page of events includes a top-level cursor. In subsequent requests, you can provide this cursor instead of a start_ts in the ChangeFeedOptions object passed to change_feed(). This polls for events after the cursor (exclusive):

import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, ChangeFeedOptions

def process_change_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = ChangeFeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )

        feed = client.change_feed(query, options)

        for page in feed:
            for event in page:
                event_type = event['type']
                if event_type == 'add':
                    # Do something on add
                    print('Add event: ', event)
                elif event_type == 'update':
                    # Do something on update
                    print('Update event: ', event)
                elif event_type == 'remove':
                    # Do something on remove
                    print('Remove event: ', event)

            # Store the cursor of the last page
            cursor = page.cursor

        # Clear the start timestamp after the first request
        start_ts = None

        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)

client = Client()
query = fql('Product.where(.type == "book" && .price < 100_00).toStream()')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

process_change_feed(client, query, start_ts=start_ts)

Event Feeds sample app

The Event Feeds sample app shows how you can use Event Feeds to track changes to a database. The app uses an AWS Lambda function to send events for related changes to another service.

See Event Feeds sample app

Event Streaming

The following Fauna client drivers support real-time Event Streaming:

Example

With the JavaScript driver, you use the stream() function to define and subscribe to a stream in real time. Take the following example, where our earlier stream query is used to subscribe to the stream:

import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`Product.where(.type == 'book' && .price < 100_00).toStream()`;
const stream = client.stream(booksQuery);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
        // Do something on add
        console.log(event.data);
        break;
      case "update":
        // Do something on update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}

Event Streaming sample app

The Event Streaming sample app shows how you can use Event Streaming to build a real-time chat app. You can use it as a starting point for your own app.

See Event Streaming sample app

Event types

Write operations in the database generate events in the stream, based on the type of write that occurred. The event types are:

  • Add: Occurs whenever a new document is created that matches the stream query’s criteria, or whenever an existing document is updated so that it matches the stream query’s criteria. Essentially, add events occur whenever a document enters the Set defined by the stream query.

  • Remove: Occurs when a document leaves the Set defined by the stream query. This can happen when a document that meets the stream query’s criteria is deleted. This can also happen when a document is updated to no longer meet the stream query’s criteria.

    Event streams don’t emit remove events for documents deleted due to an expired TTL. Such documents are deleted lazily upon expiration.

  • Update: Occurs when a document that meets the stream query’s criteria is updated and still meets the criteria after the update.

  • Status: Occurs periodically and serves as a keep-alive for the subscribing client during periods when events are either not occurring in the database or are filtered out of the stream server-side.

    Event Feeds don’t receive or include status events.

  • Error: Occurs when a stream can no longer be consumed. For potential causes, see the reference docs.

Your application code will need to handle the occurrence of any of these events.
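A minimal dispatcher covering all five event types might look like the following sketch. The event dicts and handler names here are simplified illustrations, not driver objects:

```python
# Illustrative sketch of handling every event type; a real application
# would receive the events from a Fauna client driver.

def handle_event(event, on_change, on_status=None):
    """Route a stream event to the appropriate handler."""
    etype = event["type"]
    if etype in ("add", "update", "remove"):
        on_change(etype, event["data"])
    elif etype == "status":
        # Keep-alive: nothing to apply, but useful for liveness checks.
        if on_status:
            on_status(event)
    elif etype == "error":
        # The stream can no longer be consumed; surface the failure.
        raise RuntimeError(f"stream error: {event.get('error')}")
    else:
        raise ValueError(f"unknown event type: {etype}")

seen = []
handle_event({"type": "add", "data": {"id": "1"}},
             lambda t, d: seen.append((t, d["id"])))
handle_event({"type": "status"},
              lambda t, d: seen.append((t, d)))
print(seen)  # [('add', '1')]
```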

Event shapes

Events sent through the stream include a field named type, which identifies the type of event that has occurred. This will be a string with a value of add, remove, update, status, or error as described above.

The data field includes the entire document by default. If the stream query includes a projection, the data field includes only the fields defined in the projection. FQL values in the data field are encoded using the tagged format.
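The client drivers decode the tagged format for you. For illustration only, here is a simplified decoder covering a few common tags (@int, @long, @double); the real format covers more types, and the tag coverage here is deliberately partial:

```python
# Simplified, illustrative decoder for a few tags in Fauna's tagged wire
# format. Tag coverage is partial and for demonstration only; the drivers
# handle full decoding for you.

def decode(value):
    """Recursively replace tagged wrappers like {"@int": "55"} with values."""
    if isinstance(value, dict):
        if len(value) == 1:
            (tag, inner), = value.items()
            if tag in ("@int", "@long"):
                return int(inner)
            if tag == "@double":
                return float(inner)
        return {key: decode(val) for key, val in value.items()}
    if isinstance(value, list):
        return [decode(item) for item in value]
    return value

raw = {"name": "The Fraud", "stock": {"@int": "55"}, "price": {"@int": "2300"}}
print(decode(raw))  # {'name': 'The Fraud', 'stock': 55, 'price': 2300}
```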

For example, for the stream query Product.byType('book').where(.price < 100_00).toStream(), creating the following document:

Product.create({
  type: 'book',
  name: 'The Fraud',
  stock: 55,
  price: 23_00
})

generates the following event:

{
  "type": "add",
  "data": {
    "coll": {
      "name": "Product"
    },
    "id": "392372948586987553",
    "ts": {
      "isoString": "2099-03-14T22:20:53.520Z"
    },
    "type": "book",
    "name": "The Fraud",
    "stock": 55,
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 1,
    "rate_limits_hit": []
  }
}

Had the stream query included a projection of the name and price fields, as in Product.byType('book').where(.price < 100_00).toStream(){ name, price }, the add event would have had the following shape:

{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 0,
    "rate_limits_hit": []
  }
}

Stream snapshots

In some use cases, it’s necessary to first load a Set of documents into the application that you then wish to keep updated with changes occurring in the database. Take the use case of presenting a leader board of a multiplayer gaming application as an example. As each player logs on to our multiplayer game, the leader board should be loaded into each player’s UI and kept up to date with player rankings and positions as each player’s score changes.

To serve such use cases, you can both query for a Set of documents and consume a stream for the same Set of documents, all in the same operation. Consider the following code snippet, which builds on our earlier examples. The following code queries for the Set of products of type book that are less than 100_00 in price. It uses Event Streaming to subscribe to an event stream on that same Set:

import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`
  let products = Product.where( .type == 'book' && .price < 100_00)
  {
    products: products,
    streamToken: products.toStream()
  }`;
const response = await client.query(booksQuery);
const { products, streamToken } = response.data;

// Paginate through all matching products
for await (const product of client.paginate(products)) {
  console.log(product);
}

const stream = client.stream(streamToken);
try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}

Notice in the FQL query string that the Set of Product documents returned by the query is assigned to the variable products. That Set is returned to the client in the products field of the object returned by the query. The set.toStream() function is called on the same products Set, and the resulting stream token is projected into the streamToken field of the query result. The application code then iterates over the initial seed Set of documents returned by the query and awaits new events on that Set.
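The snapshot-plus-stream pattern amounts to seeding a local state store from the query results and applying each event to it. The following is a minimal sketch (event shapes are simplified; a real app would read them from a driver):

```python
# Sketch of keeping a local snapshot current by applying stream events.
# Event dicts are simplified illustrations, not driver objects.

def apply_event(state, event):
    """Apply one stream event to a dict of documents keyed by id."""
    if event["type"] in ("add", "update"):
        state[event["data"]["id"]] = event["data"]
    elif event["type"] == "remove":
        state.pop(event["data"]["id"], None)
    return state

# Seed with the initial query results, then apply events as they arrive.
state = {"101": {"id": "101", "name": "The Fraud", "price": 2300}}
apply_event(state, {"type": "update",
                    "data": {"id": "101", "name": "The Fraud", "price": 2100}})
apply_event(state, {"type": "remove", "data": {"id": "101"}})
print(state)  # {}
```

Because add and update both upsert into the local store, the handler stays correct even if an add arrives for a document already present in the snapshot.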

Limitations

  • Operation limits apply to event streams.

  • While processing events, Fauna runs one query per transaction.

  • A stream can’t replay or process more than 128 events at a time. If a stream has more than 128 events to process, Fauna closes the stream with an error event.

  • You can’t create streams for:
