Write throughput scaling

A common challenge for database operations is responding to high-throughput requests. High-volume, concurrent updates to individual documents frequently lead to contention within the database, with many connected clients trying to read from and write to a document simultaneously. Fortunately, there are schema design techniques to mitigate this problem. When your data access patterns involve a high volume of writes to a single document, one mitigation strategy is to aggregate write operations into "chunks", or batches, and execute them at planned intervals, thus reducing write contention.

Example use case: real-time tracking of bytes uploaded

The following section outlines a strategy for throughput scaling with a schema design pattern that reduces write contention. This schema design is applicable to many use cases and is based on two design patterns, called Event Sourcing and Command Query Responsibility Segregation, which were developed for use in high-performance banking applications.

This schema design:

  • Avoids contention caused by rapid writes to a single document.

  • Is a "write-once, read-many" pattern that avoids updates to indexes (and in Fauna’s case, unbounded history growth).

This example considers a collection that tracks data processing by individual users who perform compute operations with a Lambda/Worker (Edge Function). A field named bytesUploaded keeps a running count of total bytes processed by all Workers. A sufficiently high volume of Worker activity could quickly lead to database contention as multiple clients write to the bytesUploaded field simultaneously.

Data collection

Rather than updating the bytesUploaded field every time a new update is available, a function might insert each new update as a single document in a separate collection named eventsLog. This avoids contention, because the function is creating new documents rather than updating an existing one. Here’s an example of what such a document could look like:

{
  "ref": Ref(Collection("eventsLog"), "1"),
  "ts": 1630107062410000,
  "data": {
    "user": Ref(Collection("User"), "123"),
    "bytesUploaded": 278
  }
}

The user field contains a Reference to the user who performed the compute operation, and the bytesUploaded field records the result of the operation.
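The append-only idea can be sketched outside the database. The following is a minimal in-memory model in plain Python, not Fauna API code: the events_log list stands in for the eventsLog collection, and the record_upload helper and the generated id values are assumptions made for illustration.

```python
import itertools
import time

events_log = []            # stand-in for the eventsLog collection
_ids = itertools.count(1)  # stand-in for database-generated document IDs


def record_upload(user_id, bytes_uploaded, ts=None):
    """Append one new event; no existing entry is ever updated."""
    event = {
        "id": next(_ids),
        # Timestamp in microseconds, like the ts field of a document.
        "ts": ts if ts is not None else time.time_ns() // 1_000,
        "user": user_id,
        "bytesUploaded": bytes_uploaded,
    }
    events_log.append(event)
    return event


record_upload("123", 278, ts=1630107062410000)
record_upload("123", 512)
print(len(events_log))  # 2
```

Because every call only appends, concurrent writers in this model never compete for the same record, which is the property the eventsLog collection relies on.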

[Figure: data aggregation illustration]

Aggregation

To aggregate the accumulated data, you could read all the prior eventsLog documents and perform the aggregation, but that would be prohibitively expensive in both time and operations. There’s a better way.

Command Query Responsibility Segregation (CQRS) can help. Implement another Worker that reads through the eventsLog documents and stores the aggregations as a set of incremental snapshots. The first time the job runs, the Worker must read through all the existing eventsLog documents at once. Subsequent jobs only need to read the documents created since the previous snapshot.

[Figure: data aggregation illustration]

Each snapshot stores the aggregate value at specific, incremental points in time. An example of the data in a snapshot would look like this:

{
  "ref": Ref(Collection("snapshots"), "2"),
  "ts": 1630107063800010,
  "data": {
    "user": Ref(Collection("User"), "123"),
    "runningBytesUploaded": 1163,
    "lastReadTS": 1630107062480000,
  }
}

The snapshot contains the user Reference, the aggregated number of bytes uploaded, and the timestamp (lastReadTS) of the last eventsLog document included in the aggregate.
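As a rough illustration of incremental snapshots, here is a minimal in-memory sketch in plain Python rather than FQL. The make_snapshot helper and the small integer timestamps are assumptions for the example. The sketch filters a list, whereas a real Worker would use an index so that it reads only the newer documents.

```python
def make_snapshot(previous, events, user):
    """Fold events newer than the previous snapshot into a new snapshot."""
    total = previous["runningBytesUploaded"] if previous else 0
    last_ts = previous["lastReadTS"] if previous else 0
    new_events = [e for e in events if e["user"] == user and e["ts"] > last_ts]
    if not new_events:
        return previous  # nothing new to fold in
    return {
        "user": user,
        "runningBytesUploaded": total + sum(e["bytesUploaded"] for e in new_events),
        "lastReadTS": max(e["ts"] for e in new_events),
    }


events = [
    {"ts": 100, "user": "123", "bytesUploaded": 278},
    {"ts": 200, "user": "123", "bytesUploaded": 512},
]
first = make_snapshot(None, events, "123")    # first run reads every event
events.append({"ts": 300, "user": "123", "bytesUploaded": 100})
second = make_snapshot(first, events, "123")  # later runs add only newer events
print(first["runningBytesUploaded"], second["runningBytesUploaded"])  # 790 890
```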

Any time a Worker needs a consistent, current view of total bytes uploaded, it first queries for the latest snapshot, then reads all eventsLog documents created since the lastReadTS timestamp. Optionally, the current snapshot’s value could be written back to the User document for easier data retrieval.

Fetch the latest state

Getting the latest state follows the same process: read the latest snapshot, then add the eventsLog entries created since that snapshot was made, as illustrated below:

[Figure: data aggregation illustration]

The Workers don’t have to replay the entire log, so getting the latest aggregated value is much faster.
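The read path can be sketched the same way. This is a minimal in-memory model in plain Python, not a Fauna query: the current_total helper is hypothetical, the snapshot values are taken from the example snapshot above, and the second event (40 bytes) is invented for illustration.

```python
def current_total(snapshot, events, user):
    """Latest snapshot value plus the events recorded after it."""
    base = snapshot["runningBytesUploaded"] if snapshot else 0
    last_ts = snapshot["lastReadTS"] if snapshot else 0
    tail = [
        e["bytesUploaded"]
        for e in events
        if e["user"] == user and e["ts"] > last_ts
    ]
    return base + sum(tail)


snapshot = {
    "user": "123",
    "runningBytesUploaded": 1163,
    "lastReadTS": 1630107062480000,
}
events = [
    # Older than lastReadTS: already counted in the snapshot.
    {"ts": 1630107062410000, "user": "123", "bytesUploaded": 278},
    # Newer than lastReadTS (hypothetical event): added on top.
    {"ts": 1630107062990000, "user": "123", "bytesUploaded": 40},
]
print(current_total(snapshot, events, "123"))  # 1203
```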

Possible implementation strategies:

  • Use a cron job (or equivalent) to create new snapshots at a given interval.

  • Invoke the aggregating Worker when a client makes a request for "bytes uploaded" (with polling intervals implemented client-side).

Fauna Query Language implementation notes

Let’s take a look at how you can use FQL to retrieve documents created after a certain date-time.

First we need an index that includes the ts (timestamp) value which is part of every Fauna document. The following command creates an index which sorts the eventsLog collection by timestamp from oldest to newest:

CreateIndex({
  name: "events_log_sort_by_ts",
  source: Collection("eventsLog"),
  values: [
    { field: ["ts"] },
    { field: ["data", "user"] },
    { field: ["data", "bytesUploaded"] }
  ]
})

Then you can use the after parameter with the Paginate function to return all eventsLog documents inserted after a certain date-time:

Paginate(
  Match(Index("events_log_sort_by_ts")),
  {
    after: 1630107062480001,
  }
)

The above FQL query is roughly equivalent to the SQL statement SELECT ts, user, bytesUploaded FROM eventsLog WHERE ts >= @datetimevalue ORDER BY ts.

Notes:

  • The preceding query uses the after parameter because the filtering is done with a pagination cursor, which sets where reading starts in the sorted index, rather than with a value comparison.

  • after is inclusive: it returns entries starting at the given value. That’s why one microsecond is added to the lastReadTS value. Without it, the last eventsLog document covered by the previous snapshot would be read again by the next snapshot and counted twice.
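The effect of an inclusive cursor can be modeled in a few lines. The following plain-Python sketch is not the Paginate implementation: the paginate_after helper is a hypothetical stand-in that returns every timestamp greater than or equal to the cursor, and the third timestamp is invented for the example.

```python
from bisect import bisect_left


def paginate_after(sorted_ts, after):
    """Model of an inclusive cursor: return entries with ts >= after."""
    return sorted_ts[bisect_left(sorted_ts, after):]


timestamps = [1630107062410000, 1630107062480000, 1630107062990000]
last_read_ts = 1630107062480000

# Passing lastReadTS unchanged returns the already-counted entry again.
print(paginate_after(timestamps, last_read_ts))
# [1630107062480000, 1630107062990000]

# Adding one microsecond returns only strictly newer entries.
print(paginate_after(timestamps, last_read_ts + 1))
# [1630107062990000]
```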

For ease of use and repeatability, the above command should be a user-defined function (UDF) so you can pass a value to the after parameter.

The command shown below creates a new UDF that reads the events_log_sort_by_ts index for all documents added after a given timestamp. Because after is inclusive, the UDF adds one microsecond to the timestamp it receives.

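CreateFunction({
  name: "findLatestEvents",
  body: Query(
    Lambda(
      "lastSnapshotDateTime",
      Paginate(
        Match(Index("events_log_sort_by_ts")),
        {
          // ts values are integers (microseconds since the Unix epoch),
          // so use Add rather than TimeAdd, which expects a Time or Date.
          after: Add(Var("lastSnapshotDateTime"), 1)
        }
      )
    )
  )
})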

You can use the new UDF with the Call function:

Call("findLatestEvents", 1639768939970000)

The output should return the timestamp, user reference, and number of bytes uploaded for each document it finds:

{
  before: [1639768939970000],
  data: [
    [1639768939970000, Ref(Collection("User"), "318252577924842048"), 512],
    [1639768949105000, Ref(Collection("User"), "318252577924842048"), 2782],
    [1639768961000000, Ref(Collection("User"), "318252577924842048"), 722],
    [1639768969695000, Ref(Collection("User"), "318252577924842048"), 8830]
  ]
}

The results show a before field even though we passed an after parameter to the Paginate function. This is because we’re looking at the final page of the result set from our index query, and there’s nothing after the last displayed item. If you wanted to navigate to an earlier page of the result set, you could use the returned before value to get there. For more information about navigating through the pages of a result set, see Paginate.

Summary

The solution outlined in this tutorial consists of three separate functions that are each responsible for executing their own, very specific database task:

  • One Function/Worker that inserts new records into the eventsLog collection and does nothing else.

  • One Function/Worker that runs as part of a cron job (or other task-scheduling system) and generates snapshots, and nothing else.

  • One Function/Worker that gets the latest state by loading the latest snapshot and reading the newest eventsLog entries.

The solution described here has the added benefit of being easy to extend. If there’s a need in the future to aggregate on a new field, you can add Functions/Workers to perform that specific aggregation. You can safely add new types of aggregation Functions/Workers, as long as they don’t interfere with the existing Functions/Workers.
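One way to picture adding a new aggregation is that each aggregator keeps its own snapshot over the same event log. The sketch below is an in-memory model in plain Python, not Fauna code: the fold helper, the simplified snapshot shape, and the filesUploaded field are all hypothetical.

```python
def fold(snapshot, events, field):
    """Advance one aggregator's snapshot over events it has not yet read."""
    new = [e for e in events if e["ts"] > snapshot["lastReadTS"]]
    return {
        "total": snapshot["total"] + sum(e.get(field, 0) for e in new),
        "lastReadTS": max([snapshot["lastReadTS"]] + [e["ts"] for e in new]),
    }


events = [
    {"ts": 100, "bytesUploaded": 278, "filesUploaded": 1},
    {"ts": 200, "bytesUploaded": 512, "filesUploaded": 3},
]
empty = {"total": 0, "lastReadTS": 0}

# Each aggregator reads the shared log but writes only its own snapshot.
bytes_snapshot = fold(empty, events, "bytesUploaded")
files_snapshot = fold(empty, events, "filesUploaded")  # hypothetical new field
print(bytes_snapshot["total"], files_snapshot["total"])  # 790 4
```

Since the aggregators share nothing but the read-only log, adding the second one does not change the results of the first.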
