Track changes with Event Feeds and Event Streaming
Reference: Event Feeds and Event Streaming reference
An event stream tracks changes to a specific Set of documents or document fields, defined by an FQL query, in the database. The stream emits an event whenever a tracked change occurs.
You can consume an event stream in two ways:
- Event Feeds: Asynchronous requests that poll the stream for paginated events.
- Event Streaming: A real-time subscription that pushes events from the stream to your application using an open connection to Fauna.
This guide covers how to create event streams and consume them using Event Feeds or Event Streaming.
Use cases
Some use cases where tracking changes using Event Feeds or Event Streaming is recommended:
- Change data capture (CDC): Mirror an application’s state across multiple systems. For example, you can synchronize your Fauna database with a data warehouse for analytics. CDC can also be used for auditing purposes, enabling you to track and record every change made to your database over time.
- Maintain "live" application state: Client apps or browsers are made aware immediately as data changes. Users can track live e-commerce inventory, auction status, and productivity workflow state.
- Brokered communications: Documents and collections can be used as message channels between multiple parties. A chat application is a good example of such a case.
- Publish-subscribe (pub-sub): Update external services and data stores, or queue asynchronous work. For example, an application client can insert events from Fauna into a Kafka topic that is consumed by several downstream consumers or data stores, such as a full-text search engine.
Create a stream
Define a stream
To create a stream for a Set of documents, append the set.toStream() method to a query that returns the Set.
For example, an e-commerce application manages a catalog of products in a collection named Product. The definition of this collection, which is used in this guide, is as follows:
collection Product {
  *: Any

  index byType {
    terms [.type]
    values [.name, .price]
  }
}
A business requirement of the application may be to take some administrative action when a change occurs in the Product collection. In other words, we want to emit an event when a document is added, removed, or changed in the Product collection. Because event streams are based on FQL, creating a stream that tracks these changes is simply a matter of initializing the stream with a query that captures the target Set:
Product.all().toStream()
Notice that the base query Product.all() is a full collection scan, which, when issued as a normal query, returns all documents in a collection. That same query may be used to define the Set of documents to track for an event stream. To create a stream, append the set.toStream() function to the query that defines the Set to be streamed. The set.toStream() function returns a stream token, which is used to start and consume the stream.
Filter
Some use cases only require events for changes to a specific subset of documents in a collection. Because an event stream is based on an FQL query, it is easy to filter, server-side, for the desired Set of documents. Targeting your desired Set of documents with a stream query reduces data on the wire and eliminates the need to filter out undesired events in your client application.
Consider the example requirement of our e-commerce application: it’s necessary to receive events for products of the type book that are less than 100_00 in price. The following stream query satisfies that requirement:
Product.where( .type == 'book' && .price < 100_00).toStream()
The stream query specifies the exact Set of documents to track. The stream only emits events when a change occurs in the Set of documents defined by the stream query, such as when a document is added, deleted, or changed. For example, the previous stream would emit an event for each of the following database operations:
Product.create({
  name: "Music as History",
  type: "book",
  price: 16_89,
  stock: 20
})

let p = Product.byType('book')
  .where(.name == "Music as History").first()!
p.update({ stock: p.stock + 10 })

Product.byType('book')
  .where(.name == "Music as History").first()!
  .update({ price: 110_00 })
Notice that the last example write operation, which increased the price of the book titled Music as History to 110_00, still emits an event. The change moved the book out of the Set of documents tracked by the stream. Although an event was emitted when the document left the Set, any further changes to that document will no longer emit events, because it no longer meets the criteria of the stream query.
Track changes on specific fields
In addition to specifying the Set of documents for which a stream emits events, you can also define the Set of fields in a document that will emit an event. Consider the case where the application needs to receive an event whenever the price of a product of a target type changes. An example scenario: a user is browsing for products of type electronics, and the UI of the web application needs to be updated when the price of a product the user is browsing changes. The following stream query serves this requirement:
Product.byType('electronics').changesOn(.price)
With this query, the stream will only emit events whenever the price of a Product of type electronics changes.
Track changes to a single document
In some use cases, an event stream for a single document is appropriate. For instance, consider that a user has placed a given product in their shopping cart. While the product is in the cart, the stock may drop as other users purchase the same product. A requirement of the application is that the user be notified that the stock of a product is changing while it is in their cart. The following query would meet this requirement but is inefficient:
Product.where(.id == "390896421523423744").toStream()
The above stream is highly inefficient because it tracks changes to the entire Product collection but only emits events for one document. You can optimize this use case by creating a singleton Set containing the single product:
Set.single(Product.byId("390896421523423744")).toStream()
Optimize stream performance with indexes
Just as we strongly recommend the use of indexed queries for production workloads, we also recommend the use of indexes to increase the performance of stream queries. While an unindexed stream query is allowed, your stream queries will be more performant and cost-effective when an index is used to serve them.
Notice that many of the preceding stream query examples used the byType() index in our Product collection definition at the top of this page. This index uses the type field as the index term, and the name and price fields as the index values. The index has been designed to suit the query patterns we’re tracking. Consider again the stream defined as:
Product.where( .type == 'book' && .price < 100_00).toStream()
It would be far more efficient in cost and time if it were modified, as follows, to use the byType() index:
Product.byType('book').where( .price < 100_00).toStream()
There is a trade-off between streams on a collection versus streams on an index: index streams only emit events when fields in their configured terms or values change. So, while the use of an index increases the efficiency of a stream query, users need to design their indexes such that they fit the change events they need. For example, given the stream query above, the following write operation would not emit an event:
let p = Product.byType('book')
  .where(.name == "Music as History").first()!
p.update({ stock: p.stock - 1 })
In this example, even though the book Music as History meets the query criteria of the indexed stream query, none of the fields covered by the index changed as a result of the write operation. The stock field did change, but it is not a field that is covered by the index.
The need to have fields covered by the index extends to the use of set.changesOn() as well. Any field you wish to emit an event on in an index stream query must be covered by the index. The following example query would not be accepted by Fauna as a legal stream request, because the stock field is neither a term nor a value of the byType() index.
Product.byType('book').where(.price < 100_00).changesOn(.stock)
Projecting only the data you need into the stream
By default, add and update events project the entire document that was modified, even when an index was used. In cases where it’s unnecessary to track changes to the entire document, you can add a projection to the stream query to suit your needs. Consider this example stream query, which projects only the name and price fields:
Product.byType('book').where( .price < 100_00).toStream(){name, price}
Only the document’s name and price fields will be returned in events in this stream:
{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 8000
  },
  "txn_ts": 1710456655930000,
  "cursor": "gsGabc123",
  "stats": {
    ...
  }
}
Consume streams using Fauna’s client drivers
Applications typically consume streams using a Fauna client driver. The drivers can consume a stream using an asynchronous Event Feed or a real-time Event Streaming subscription.
Event Feeds
To use Event Feeds, you must have a Pro or Enterprise plan.
The following Fauna client drivers support Event Feeds:
Example
With the Python driver, use change_feed() to define a stream and return the stream’s paginated events.
To get the first page of events, you typically specify a start_ts (start timestamp) in the ChangeFeedOptions object passed to the initial change_feed() request.
Each page of events includes a top-level cursor. In subsequent requests, you can provide this cursor instead of a start_ts in the ChangeFeedOptions object passed to change_feed(). This polls for events after the cursor (exclusive):
import time
from datetime import datetime, timedelta

from fauna import fql
from fauna.client import Client, ChangeFeedOptions

def process_change_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = ChangeFeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )
        feed = client.change_feed(query, options)
        for page in feed:
            for event in page:
                event_type = event['type']
                if event_type == 'add':
                    # Do something on add
                    print('Add event: ', event)
                elif event_type == 'update':
                    # Do something on update
                    print('Update event: ', event)
                elif event_type == 'remove':
                    # Do something on remove
                    print('Remove event: ', event)
            # Store the cursor of the last page
            cursor = page.cursor
        # Clear the start timestamp after the first request
        start_ts = None
        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)

client = Client()
query = fql('Product.where(.type == "book" && .price < 100_00).toStream()')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

process_change_feed(client, query, start_ts=start_ts)
Event Feeds sample app
The Event Feeds sample app shows how you can use Event Feeds to track changes to a database. The app uses an AWS Lambda function to send events for related changes to another service.
See Event Feeds sample app
Event Streaming
The following Fauna client drivers support real-time Event Streaming:
Example
With the JavaScript driver, you use the stream() function to define and subscribe to a stream in real time. Take the following example, where our earlier stream query is used to subscribe to the stream:
import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`Product.where(.type == 'book' && .price < 100_00).toStream()`;
const stream = client.stream(booksQuery);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
        // Do something on add
        console.log(event.data);
        break;
      case "update":
        // Do something on update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Event Streaming sample app
The Event Streaming sample app shows how you can use Event Streaming to build a real-time chat app. You can use it as a starting point for your own app.
See Event Streaming sample app
Event types
Write operations in the database generate events in the stream, based on the type of write that occurred. The event types are:
- Add: Occurs whenever a new document is created that matches the stream query’s criteria, or whenever an existing document is updated so that it matches the stream query’s criteria. Essentially, add events occur whenever a document enters the Set defined by the stream query.
- Remove: Occurs when a document leaves the Set defined by the stream query. This can happen when a document that meets the stream query’s criteria is deleted, or when a document is updated so that it no longer meets the stream query’s criteria. Event streams don’t emit remove events for documents deleted due to an expired TTL; such documents are deleted lazily upon expiration.
- Update: Occurs when a field is updated in a document that meets the stream query criteria.
- Status: Occurs periodically and serves to update the subscribing client, similar to a keep-alive, during periods when events are either not occurring on the database or are filtered out of the stream server-side. Event Feeds don’t receive or include status events.
- Error: Occurs when a stream can no longer be consumed. For potential causes, see the reference docs.
Your application code will need to handle the occurrence of any of these events.
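For example, event handling can be centralized in a small dispatcher that routes each event type to its own handler. The following sketch is illustrative application code, not a driver API; handleEvent and the handler names are assumed names:

```javascript
// Route a stream event to the matching handler. The keys of `handlers`
// mirror the five event types described above; any unrecognized type
// falls through to `unknown`. Illustrative sketch, not a driver API.
function handleEvent(event, handlers) {
  switch (event.type) {
    case "add":    return handlers.add(event.data);
    case "update": return handlers.update(event.data);
    case "remove": return handlers.remove(event.data);
    case "status": return handlers.status(event); // keep-alive; no document payload
    case "error":  return handlers.error(event);  // stream can no longer be consumed
    default:       return handlers.unknown(event);
  }
}

// Example usage: count the document events seen.
const counts = { add: 0, update: 0, remove: 0 };
const handlers = {
  add: () => counts.add++,
  update: () => counts.update++,
  remove: () => counts.remove++,
  status: () => {},
  error: (e) => { throw new Error(`stream error: ${JSON.stringify(e)}`); },
  unknown: () => {},
};
handleEvent({ type: "add", data: { name: "The Fraud" } }, handlers);
```

A dispatcher like this keeps the stream-consuming loop small and makes it harder to forget the status and error cases.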
Event shapes
Events sent through the stream include a field named type, which identifies the type of event that has occurred. This is a string with a value of add, remove, update, status, or error, as described above.
The data field includes the entire document by default. If the stream query includes a projection, the data field includes only the fields defined in the projection. FQL values in the data field are encoded using the tagged format.
For example, for the stream query Product.byType('book').where(.price < 100_00).toStream(), creating the following document:
Product.create({
  type: 'book',
  name: 'The Fraud',
  stock: 55,
  price: 23_00
})
Will generate the following event:
{
  "type": "add",
  "data": {
    "coll": {
      "name": "Product"
    },
    "id": "392372948586987553",
    "ts": {
      "isoString": "2099-03-14T22:20:53.520Z"
    },
    "type": "book",
    "name": "The Fraud",
    "stock": 55,
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 1,
    "rate_limits_hit": []
  }
}
Had the stream query included the following projection of the name and price fields: Product.byType('book').where(.price < 100_00).toStream(){ name, price }, the event would have had the following shape:
{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 0,
    "rate_limits_hit": []
  }
}
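The drivers decode the tagged format into native values for you. If you consume the HTTP API directly, typed values arrive wrapped in single-key objects such as {"@int": "2300"}. The following is a minimal decoding sketch under that assumption; it covers only a subset of tags, and decodeTagged is an illustrative name, not a driver function:

```javascript
// Decode a value in Fauna's tagged wire format into plain JS values.
// Handles a subset of tags (@int, @long, @double, @time, @date) as an
// illustration; the drivers implement the full format.
function decodeTagged(value) {
  if (Array.isArray(value)) return value.map(decodeTagged);
  if (value === null || typeof value !== "object") return value;
  const keys = Object.keys(value);
  if (keys.length === 1) {
    const [tag] = keys;
    switch (tag) {
      case "@int":
      case "@long":
      case "@double":
        return Number(value[tag]);
      case "@time":
      case "@date":
        return new Date(value[tag]);
    }
  }
  // Plain object: decode each field recursively.
  const out = {};
  for (const [k, v] of Object.entries(value)) out[k] = decodeTagged(v);
  return out;
}
```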
Stream snapshots
In some use cases, it’s necessary to first load a Set of documents into the application that you then wish to keep updated with changes occurring in the database. Take the use case of presenting a leader board of a multiplayer gaming application as an example. As each player logs on to our multiplayer game, the leader board should be loaded into each player’s UI and kept up to date with player rankings and positions as each player’s score changes.
To serve such use cases, you can both query for a Set of documents and consume a stream for the same Set of documents, all in the same operation. Consider the following code snippet, which builds on our earlier examples. The following code queries for the Set of products of type book that are less than 100_00 in price, and uses Event Streaming to subscribe to an event stream on that same Set:
import { Client, fql } from "fauna";

const client = new Client();

const booksQuery = fql`
  let products = Product.where(.type == 'book' && .price < 100_00)
  {
    products: products,
    streamToken: products.toStream()
  }`;

const response = await client.query(booksQuery);
const { products, streamToken } = response.data;

// Paginate through all matching products
for await (const product of client.paginate(products)) {
  console.log(product);
}

const stream = client.stream(streamToken);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Notice in the FQL query string that the Set of Product documents returned by the query is assigned to the variable products. That Set is returned to the client using a projection to the products field of the object returned by the query operation. The set.toStream() function is called on the same products Set, and that stream is projected into the streamToken field of the query result object. The application code then iterates over the initial seed Set of documents returned by the query and awaits new events on that Set.
Limitations
- Operation limits apply to event streams.
- While processing events, Fauna runs one query per transaction.
- A stream can’t replay or process more than 128 events at a time. If a stream has more than 128 events to process, Fauna closes the stream with an error event.
- You can’t create streams for:
  - An index for a system collection.
  - A set that combines documents from multiple collections.
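Because a stream can be closed with an error event, for example after exceeding the 128-event limit, a long-running consumer should expect to resubscribe. The following sketch is illustrative application logic, not a driver API: subscribe is an assumed caller-supplied async function that consumes a stream until it fails, and the wrapper retries it with exponential backoff:

```javascript
// Resubscribe with exponential backoff when a stream fails. `subscribe`
// is a caller-supplied async function (hypothetical, not a driver API)
// that consumes events until the stream errors, at which point it throws.
// Returns the number of attempts made when `subscribe` completes normally.
async function consumeWithRetry(subscribe, maxAttempts = 5, baseDelayMs = 100) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      await subscribe();
      return attempt; // subscribe returned normally: stop retrying
    } catch (err) {
      if (attempt === maxAttempts) throw err; // give up after the last attempt
      // Exponential backoff: baseDelayMs, 2x, 4x, ...
      const delay = baseDelayMs * 2 ** (attempt - 1);
      await new Promise((resolve) => setTimeout(resolve, delay));
    }
  }
}
```

In practice, subscribe would re-run the stream query (or restart from a stored cursor, where supported) so the new subscription picks up close to where the failed one stopped.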