Track changes with Event Feeds and Event Streaming
Reference: Event Feeds and Event Streaming reference
An event source tracks changes to a specific Set of documents or document fields, defined by an FQL query, in the database. The event source emits an event whenever a tracked change occurs.
You can consume an event source in two ways:
- Event Feeds: Asynchronous requests that poll the event source for paginated events.
- Event Streaming: A real-time subscription that pushes events from the event source to your application using an open connection to Fauna.
This guide covers how to create event sources and consume them using Event Feeds or Event Streaming.
Use cases
Some use cases where tracking changes using Event Feeds or Event Streaming is recommended:
- Change data capture (CDC): Mirroring an application’s state across multiple systems. For example, you can synchronize your Fauna database with a data warehouse for analytics. CDC can also be used for auditing purposes, enabling you to track and record every change made to your database over time.
- Maintain "live" application state: Client apps or browsers are made aware immediately as data changes. Users can track live e-commerce inventory, auction status, and productivity workflow state.
- Brokered communications: Documents and collections can be used as message channels between multiple parties. A chat application is a good example of such a case.
- Publish-subscribe (pub-sub): Includes updating external services and data stores, and asynchronous work queuing. An application client can integrate with a message queue, inserting events from Fauna into a Kafka topic consumed by several downstream consumers or data stores, such as a full-text search engine.
Create an event source
Define an event source
To create an event source for a Set of documents, append the set.eventSource() method to a query that returns the Set.
For example, an e-commerce application manages a catalog of products in a collection named Product. The definition of this collection, which is used in this guide, is as follows:
collection Product {
  *: Any

  index byType {
    terms [.type]
    values [.name, .price]
  }
}
A business requirement of the application may be to take some administrative action when a change occurs in the Product collection. In other words, we want to emit an event when a document is added, removed, or changed in the Product collection. Event sources are based on FQL, so creating an event source that tracks these changes is simply a matter of initializing the event source with a query that captures the target Set.
Product.all().eventSource()
Notice that the base query Product.all() is a full collection scan, which when issued as a normal query returns all documents in a collection. That same query may be used to define the Set of documents to track for an event source. To create an event source, append the set.eventSource() function to the query that defines the Set to track. The set.eventSource() function returns an event source, which is used to consume events.
Filter
Some use cases only require events for changes to a specific subset of documents in a collection. Because an event source is based on an FQL query, it is easy to filter, server-side, for that Set of desired documents. Targeting your desired Set of documents with an event source query reduces data on the wire and eliminates the need to filter out undesired events in your client application.
Consider an example requirement of our e-commerce application: it’s necessary to receive events for products of the type book that are less than 100_00 in price. The following event source query satisfies that requirement:
Product.where(.type == 'book' && .price < 100_00).eventSource()
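The 100_00 literal uses FQL’s underscore digit separator, so prices in this guide are stored in cents. Purely for illustration (the real filtering happens server-side in Fauna), the membership rule of the query above can be mirrored in Python, which shares the same underscore syntax:

```python
# Client-side mirror of the event source filter, for illustration only.
# 100_00 is the integer 10000, i.e. a price of $100.00 stored in cents.

def in_tracked_set(doc: dict) -> bool:
    """True if a document belongs to the tracked Set:
    type == 'book' and price < 100_00 (cents)."""
    return doc.get("type") == "book" and doc.get("price", 0) < 100_00

assert 100_00 == 10000  # underscore is a digit separator, not arithmetic
assert in_tracked_set({"type": "book", "price": 16_89})
assert not in_tracked_set({"type": "book", "price": 110_00})
assert not in_tracked_set({"type": "toy", "price": 5_00})
```

Writes that move a document across this boundary are exactly the ones that produce add and remove events, as the examples below show.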
The event source query specifies the exact Set of documents to track. The event source only emits events when a change occurs in the Set of documents defined by the event source query, such as when a document is added, deleted, or changed. For example, the previous event source would emit an event for each of the following database operations:
Product.create({
  name: "Music as History",
  type: "book",
  price: 16_89,
  stock: 20
})
let p = Product.byType('book')
  .where(.name == "Music as History").first()!
p.update({ stock: p.stock + 10 })

Product.byType('book')
  .where(.name == "Music as History").first()!
  .update({ price: 110_00 })
Notice that the last example write operation, which increased the price of the book titled Music as History to 110_00, still emits an event. The book was changed in a way that moved it out of the Set of documents tracked by the event source. Although an event was emitted when the document left the Set, any subsequent changes to that document will no longer emit events, because it no longer meets the criteria of the event source query.
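A common way to consume these semantics is to maintain a local mirror of the tracked Set and apply events to it. Because a document that stops matching the query arrives as a remove event, the mirror needs no special casing for it. The sketch below is illustrative, not driver code, and assumes remove events still carry the document’s id in their data payload:

```python
# Illustrative sketch: keep a local mirror of the tracked Set in sync by
# applying events. Assumes event["data"] includes the document id, even
# for "remove" events (an assumption for this sketch, not a guarantee).

def apply_event(mirror: dict, event: dict) -> None:
    doc_id = event["data"]["id"]
    if event["type"] in ("add", "update"):
        mirror[doc_id] = event["data"]       # document entered or changed in the Set
    elif event["type"] == "remove":
        mirror.pop(doc_id, None)             # document left the Set

mirror = {}
apply_event(mirror, {"type": "add", "data": {"id": "1", "price": 16_89}})
apply_event(mirror, {"type": "remove", "data": {"id": "1", "price": 110_00}})
assert mirror == {}  # the price increase removed the book from the Set
```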
Track changes on specific fields
In addition to specifying which Set of documents an event source emits events for, users may also define the Set of fields in a document that will emit an event. Consider the case where the application needs to receive an event whenever the price of a product of a target type changes. For example, a user is browsing products of type electronics, and the UI of the web application needs to be updated when the price of a product the user is browsing changes. The following event source query serves this requirement:
Product.byType('electronics').eventsOn(.price)
With this query, the event source will only emit events whenever the price of a Product of type electronics changes.
Track changes to a single document
In some use cases, an event source for a single document is appropriate. For instance, consider a user who has placed a given product in their shopping cart. While the product is in the cart, the stock may drop as other users purchase the same product. A requirement of the application is that the user be notified when the stock of a product changes while it is in their cart. The following query would meet this requirement but is inefficient:
Product.where(.id == "390896421523423744").eventSource()
The above event source is highly inefficient: it tracks changes in the entire Product collection but only emits events for one document. Users can optimize this use case by creating a singleton Set of a single product:
Set.single(Product.byId("390896421523423744")).eventSource()
Optimize event source performance with indexes
Just as we strongly recommend the use of indexed queries for production workloads, we also recommend the use of indexes to increase the performance of event source queries. While an unindexed event source query is allowed, your event source queries will be more performant and cost-effective when an index is used to serve them.
Notice that many of the preceding event source query examples used the byType() index from the Product collection definition at the top of this page. This index uses the type field as the index term, and the name and price fields as index values. The index has been designed to suit the query patterns we’re tracking.
Therefore, rather than defining the event source as:
Product.where( .type == 'book' && .price < 100_00).eventSource()
It would be far more efficient, in cost and time, to modify it as follows to use the byType() index:
Product.byType('book').where( .price < 100_00).eventSource()
There is a trade-off between event sources on a collection versus event sources on an index: index event sources only emit events when fields in their configured terms or values change. So, while the use of an index increases the efficiency of an event source query, users need to design their indexes such that they fit the change events they need. For example, given the event source query above, the following write operation would not emit an event:
let p = Product.byType('book')
  .where(.name == "Music as History").first()!
p.update({ stock: p.stock - 1 })
In this example, even though the book Music as History meets the query criteria of the indexed event source query, none of the fields covered by the index changed as a result of the write operation. The stock field did change, but it is not covered by the index.
The need to have fields covered by the index extends to the use of set.eventsOn() as well. Any field users wish to emit an event on in an index event source query must be covered by the index. The following example query would be rejected by Fauna, because the stock field is neither a term nor a value of the byType() index:
Product.byType('book').where(.price < 100_00).eventsOn(.stock)
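The coverage rule can be modeled in a few lines of Python. This is an illustrative approximation of the semantics described above, not Fauna internals: a write emits an event on an index source only if it touches a covered field, and every eventsOn() field must itself be covered.

```python
# Illustrative model of index coverage for the byType index defined at
# the top of this guide: terms [.type], values [.name, .price].

BYTYPE_TERMS = {"type"}
BYTYPE_VALUES = {"name", "price"}
COVERED = BYTYPE_TERMS | BYTYPE_VALUES

def emits_event(changed_fields: set) -> bool:
    """Would a write touching these fields emit an event on the index source?"""
    return bool(changed_fields & COVERED)

def events_on_is_legal(fields: set) -> bool:
    """eventsOn() fields must all be covered by the index."""
    return fields <= COVERED

assert not emits_event({"stock"})        # the stock-only update above: no event
assert emits_event({"price"})            # price is an index value: event
assert not events_on_is_legal({"stock"}) # the rejected query above
assert events_on_is_legal({"price"})
```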
Projecting only the data you need into the event source
By default, add and update events project the entire document that was modified, even when an index was used. In cases where it’s unnecessary to track changes to the entire document, users may add a projection to the event source query to suit their needs. Consider this example event source query, which projects only the name and price fields:
Product.byType('book').where( .price < 100_00).eventSource(){name, price}
Only the document’s name and price fields will be returned in events from this event source:
{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 8000
  },
  "txn_ts": 1710456655930000,
  "cursor": "gsGabc123",
  "stats": {
    ...
  }
}
Consume event sources using Fauna’s client drivers
Applications typically consume event sources using a Fauna client driver. The drivers can consume an event source as an asynchronous Event Feed or as a real-time Event Stream.
Event Feeds
To use Event Feeds, you must have a Pro or Enterprise plan.
The following Fauna client drivers support Event Feeds:
Example
With the Python driver, use feed() to define an event source and return the event source’s paginated events. To get the first page of events, you typically specify a start_ts (start timestamp) in the FeedOptions object passed to the initial feed() request. Each page of events includes a top-level cursor. In subsequent requests, you can provide this cursor instead of a start_ts in the FeedOptions object passed to feed(). This polls for events after the cursor (exclusive):
import time
from datetime import datetime, timedelta

from fauna import fql
from fauna.client import Client, FeedOptions

def process_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = FeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )
        feed = client.feed(query, options)
        for page in feed:
            for event in page:
                event_type = event['type']
                if event_type == 'add':
                    # Do something on add
                    print('Add event: ', event)
                elif event_type == 'update':
                    # Do something on update
                    print('Update event: ', event)
                elif event_type == 'remove':
                    # Do something on remove
                    print('Remove event: ', event)
            # Store the cursor of the last page
            cursor = page.cursor
        # Clear the start timestamp after the first request
        start_ts = None
        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)

client = Client()
query = fql('Product.where(.type == "book" && .price < 100_00).eventSource()')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

process_feed(client, query, start_ts=start_ts)
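In a long-running worker, it is useful to persist the last cursor so a restarted process resumes where it left off instead of re-specifying a start_ts. A minimal sketch follows; the file name and JSON layout are illustrative choices, not part of the driver:

```python
# Sketch: persist the last feed cursor between polling runs.
import json
import os
import tempfile

CURSOR_FILE = os.path.join(tempfile.gettempdir(), "product_feed_cursor.json")

def load_cursor():
    """Return the saved cursor, or None on a first run or unreadable file."""
    try:
        with open(CURSOR_FILE) as f:
            return json.load(f)["cursor"]
    except (FileNotFoundError, KeyError, json.JSONDecodeError):
        return None

def save_cursor(cursor: str) -> None:
    """Persist the cursor after processing a page of events."""
    with open(CURSOR_FILE, "w") as f:
        json.dump({"cursor": cursor}, f)

save_cursor("gsGabc123")
assert load_cursor() == "gsGabc123"
```

A worker would call load_cursor() at startup, pass the result as the cursor in FeedOptions when it is not None, and call save_cursor(page.cursor) after each processed page.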
Event Feeds sample app
The Event Feeds sample app shows how you can use Event Feeds to track changes to a database. The app uses an AWS Lambda function to send events for related changes to another service.
See Event Feeds sample app
Event Streaming
The following Fauna client drivers support real-time Event Streaming:
Example
With the JavaScript driver, use the stream() function to define and subscribe to an event source in real time:
import { Client, fql } from "fauna";

const client = new Client();
const query = fql`Product.where(.type == 'book' && .price < 100_00).eventSource()`;

const stream = client.stream(query);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
        // Do something on add
        console.log(event.data);
        break;
      case "update":
        // Do something on update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Event Streaming sample app
The Event Streaming sample app shows how you can use Event Streaming to build a real-time chat app. You can use it as a starting point for your own app.
See Event Streaming sample app
Event types
Write operations in the database generate events in the event source, typed according to the kind of write that occurred. The event types are:
- Add: Occurs whenever a new document is created that matches the event source query’s criteria, or whenever an updated document newly matches the criteria. Essentially, add events occur whenever a document enters the Set defined by the event source query.
- Remove: Occurs when a document leaves the Set defined by the event source query. This can happen when a document that meets the event source query’s criteria is deleted, or when a document is updated so it no longer meets the criteria. Event sources don’t emit remove events for documents deleted due to an expired TTL; such documents are deleted lazily upon expiration.
- Update: Occurs when a field is updated in a document that meets the event source query’s criteria.
- Status: Occurs periodically and serves to update the subscribing client, similar to a keep-alive, during periods when events are either not occurring on the database or are filtered out of the event source server-side. Event Feeds don’t receive or include status events.
- Error: Occurs when an event source can no longer be consumed. For potential causes, see the reference docs.
Your application code will need to handle the occurrence of any of these events.
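One way to structure that handling is a dispatch table mapping event types to handlers. The handler names and return values below are illustrative, not prescribed by the drivers:

```python
# Illustrative dispatch of event types to handlers. Unknown types raise
# so that a new or mis-parsed event is noticed rather than silently dropped.

def on_add(e):    return f"added {e['data']}"
def on_update(e): return f"updated {e['data']}"
def on_remove(e): return f"removed {e['data']}"
def on_status(e): return "keep-alive"
def on_error(e):  raise RuntimeError(f"event source closed: {e}")

HANDLERS = {
    "add": on_add,
    "update": on_update,
    "remove": on_remove,
    "status": on_status,
    "error": on_error,
}

def dispatch(event: dict):
    handler = HANDLERS.get(event["type"])
    if handler is None:
        raise ValueError(f"unknown event type: {event['type']}")
    return handler(event)

assert dispatch({"type": "status"}) == "keep-alive"
```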
Event shapes
Events emitted by the event source include a field named type, which identifies the type of event that has occurred. This is a string with a value of add, remove, update, status, or error, as described above. The data field includes the entire document by default. If the event source query contains a projection, the data field includes only the fields defined in the projection. FQL values in the data field are encoded using the tagged format.
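For illustration, a minimal decoder for two common tagged-format wrappers might look like the following. Only the @int and @double tags are handled here; the full tagged format defines more tags, so treat this as a sketch rather than a complete implementation:

```python
# Sketch of decoding tagged-format values: type-tagged wrappers like
# {"@int": "2300"} are unwrapped to native Python values; everything
# else is passed through recursively.

def decode(value):
    if isinstance(value, dict):
        if "@int" in value:
            return int(value["@int"])
        if "@double" in value:
            return float(value["@double"])
        return {k: decode(v) for k, v in value.items()}
    if isinstance(value, list):
        return [decode(v) for v in value]
    return value

assert decode({"price": {"@int": "2300"}}) == {"price": 2300}
assert decode({"name": "The Fraud"}) == {"name": "The Fraud"}
```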
For example, for the event source query:
Product.byType('book')
.where(.price < 100_00)
.eventSource()
Then creating the following document:
Product.create({
  type: 'book',
  name: 'The Fraud',
  stock: 55,
  price: 23_00
})
will generate the following event:
{
  "type": "add",
  "data": {
    "coll": {
      "name": "Product"
    },
    "id": "392372948586987553",
    "ts": {
      "isoString": "2099-03-14T22:20:53.520Z"
    },
    "type": "book",
    "name": "The Fraud",
    "stock": 55,
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 1,
    "rate_limits_hit": []
  }
}
Had the event source query included the following projection of the name and price fields:
Product.byType('book')
  .where(.price < 100_00)
  .eventSource() {
    name,
    price
  }
The add event would have had the following shape:
{
  "type": "add",
  "data": {
    "name": "The Fraud",
    "price": 2300
  },
  "txn_ts": 1710454853520000,
  "cursor": "gsGghu789",
  "stats": {
    "read_ops": 1,
    "storage_bytes_read": 109,
    "compute_ops": 1,
    "processing_time_ms": 0,
    "rate_limits_hit": []
  }
}
Event source snapshots
In some use cases, it’s necessary to first load a Set of documents into the application that you then wish to keep updated with changes occurring in the database. Take the use case of presenting a leader board of a multiplayer gaming application as an example. As each player logs on to our multiplayer game, the leader board should be loaded into each player’s UI and kept up to date with player rankings and positions as each player’s score changes.
To serve such use cases, you can both query for a Set of documents and consume an event source for the same Set of documents, all in the same operation:
import { Client, fql } from "fauna";

const client = new Client();
const query = fql`
  let products = Product.where(.type == 'book' && .price < 100_00)
  {
    products: products,
    eventSource: products.eventSource()
  }`;

const response = await client.query(query);
const { products, eventSource } = response.data;

// Paginate through all matching products
for await (const product of client.paginate(products)) {
  console.log(product);
}

const stream = client.stream(eventSource);

try {
  for await (const event of stream) {
    switch (event.type) {
      case "add":
      case "update":
        // Do something on add or update
        console.log(event.data);
        break;
      case "remove":
        // Do something on remove
        console.log(event.data);
        break;
    }
  }
} catch (error) {
  console.log(error);
}
Notice in the FQL query string that the Set of Product documents returned by the query is assigned to the variable products. That Set is returned to the client through a projection into the products field of the object returned by the query. The set.eventSource() function is called on the same products Set, and the resulting event source is projected into the eventSource field of the query result object. The application code then iterates over the initial seed Set of documents returned by the query and awaits new events on that Set.
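The ordering of that pattern can be sketched in plain Python: seed a local mirror from the initial query results, then apply events on top. Because the event source is created in the same query as the snapshot, no changes fall in the gap between the two. This sketch is illustrative only and assumes remove events carry the document id:

```python
# Illustrative snapshot-plus-events ordering: seed first, then apply.

def seed_then_apply(initial_docs, events):
    mirror = {d["id"]: d for d in initial_docs}
    for e in events:
        if e["type"] in ("add", "update"):
            mirror[e["data"]["id"]] = e["data"]
        elif e["type"] == "remove":
            mirror.pop(e["data"]["id"], None)
    return mirror

docs = [{"id": "1", "name": "The Fraud", "price": 23_00}]
events = [{"type": "update",
           "data": {"id": "1", "name": "The Fraud", "price": 21_00}}]
assert seed_then_apply(docs, events)["1"]["price"] == 21_00
```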
Limitations
- Operation limits apply to event sources.
- While processing events, Fauna runs one query per transaction.
- An event source can’t replay or process more than 128 events at a time. If an event source has more than 128 events to process, Fauna closes the event source with an error event.
- You can’t create event sources for:
  - An index for a system collection.
  - A Set that combines documents from one or more collections.