Event Feeds sample app

This sample application demonstrates how to use Event Feeds to return paginated events from a collection. The application uses Fauna’s Python driver to retrieve the events.

The sample app uses an AWS Lambda function to poll for events every 10 minutes. It then sends any fetched events to another service.

Prerequisites

  • Knowledge of Python

  • Knowledge of FQL and Event Feeds

  • Basic understanding of AWS Lambda serverless functions

  • AWS account

  • AWS SAM CLI installed

Learning Objectives

  • How to use Event Feeds in a serverless application

Overview of Event Feeds

An event stream tracks changes to a specific Set of documents or document fields, defined by an FQL query, in the database. Event Feeds are asynchronous requests that poll a stream for paginated events.

Imagine you have an e-commerce operation. Every time a product is low in stock, you want to notify your team. You can use an Event Feed to track the stock levels of your products. When the stock level of a product falls below a certain threshold, the change appears as an event in the feed, and your application can forward it to your warehouse team. If you poll the feed every 10 minutes, each poll returns the events from the previous 10 minutes.

Configure Fauna

Create a Fauna database

Before you start, you need to create a Fauna database and collection. For this sample application, you can use the demo data provided by Fauna.

  1. Log in to the Fauna Dashboard.

  2. In the Dashboard, create a database with the Demo data option enabled. Then navigate to the database in the Dashboard.

  3. In the upper left pane of the Dashboard’s Explorer page, click the demo database, and click the Keys tab.

  4. Click Create Key.

  5. Choose a Role of server.

  6. Click Save.

  7. Copy the Key Secret and save it. You’ll use it later.

Create a collection to track Event Feed cursors

Each time you poll an Event Feed, you get a top-level page cursor. You can use this cursor to get the next set of events. You can store returned cursors in a collection.

  1. Create a collection named Cursor.

    collection Cursor {
      name: String
      value: String?
    
      index byName {
        terms [.name]
      }
    }
  2. Create a document in the Cursor collection with the name ProductInventory.

    Cursor.create({name: "ProductInventory"})
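
The cursor bookkeeping described above can be sketched without a database. This is a minimal illustration, not Fauna's API: `InMemoryCursorStore`, `poll_once`, and `fake_fetch` are hypothetical names, and the `fetch_page` callable stands in for an Event Feed request. It shows the pattern only: load the saved cursor, fetch the next page from that point, then save the new cursor.

```python
from typing import Callable, Dict, List, Optional, Tuple

# Hypothetical stand-in for the Cursor collection: maps a feed name to its
# last saved cursor. In the sample app this state lives in Fauna instead.
class InMemoryCursorStore:
    def __init__(self) -> None:
        self._cursors: Dict[str, Optional[str]] = {}

    def load(self, name: str) -> Optional[str]:
        # Returns None when no cursor has been saved yet.
        return self._cursors.get(name)

    def save(self, name: str, value: str) -> None:
        self._cursors[name] = value


# A page is modelled as (events, cursor). fetch_page is an assumed callable
# that takes the previous cursor (or None) and returns the next page.
Page = Tuple[List[dict], str]


def poll_once(store: InMemoryCursorStore, name: str,
              fetch_page: Callable[[Optional[str]], Page]) -> List[dict]:
    previous = store.load(name)
    events, cursor = fetch_page(previous)
    # Persist the cursor only after the page was fetched successfully.
    store.save(name, cursor)
    return events


def fake_fetch(cursor: Optional[str]) -> Page:
    # Toy feed: cursors are stringified offsets into a fixed event list.
    all_events = [{"type": "add"}, {"type": "update"}, {"type": "remove"}]
    start = int(cursor) if cursor is not None else 0
    end = min(start + 2, len(all_events))
    return all_events[start:end], str(end)


store = InMemoryCursorStore()
first = poll_once(store, "ProductInventory", fake_fetch)
second = poll_once(store, "ProductInventory", fake_fetch)
```

Because the second call starts from the cursor saved by the first, it returns only the events the first call did not see. The Lambda function relies on the same idea across invocations, with the Cursor collection as the store.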

Create a serverless project

  1. Run the following command to create a new serverless project:

    sam init
  2. Choose option 1, AWS Quick Start Templates, and then choose the python3.10 runtime.

  3. Choose the hello_world template.

  4. The folder structure of the project will look like this:

    sam-python-function/
    ├── README.md
    ├── events
    │   └── event.json
    ├── hello_world
    │   ├── __init__.py
    │   ├── app.py              # Main Lambda handler file
    │   └── requirements.txt    # Dependencies for your Lambda function
    ├── template.yaml           # SAM template file
    └── tests
        └── unit
            └── test_handler.py

Write code for the serverless function

  1. Navigate to the project directory and open the hello_world/requirements.txt file.

  2. Add the Fauna driver as a dependency:

    fauna
  3. Navigate inside the Lambda function directory and run the following command to install the dependencies:

    cd hello_world
    pip install -r requirements.txt -t .
  4. Next, add the following code to the hello_world/app.py file:

    import json
    from datetime import datetime, timedelta
    
    from fauna import fql
    from fauna.client import Client, ChangeFeedOptions
    
    def lambda_handler(e, context):
    
      client = Client()
    
      # Get the previous feed cursor if it exists
      options = None
    
      cursor_data = client.query(fql('Cursor.byName("ProductInventory").first()'))
      cursor = cursor_data.data.value if cursor_data.data.value else None
    
      if cursor is None:
        # No saved cursor: capture all events from the previous 10 minutes
        ten_minutes_ago = datetime.now() - timedelta(minutes=10)
        # Convert to microseconds since the Unix epoch
        start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
    
        options = ChangeFeedOptions(
          start_ts=start_ts
        )
      else:
        # Resume from the saved cursor. This assumes ChangeFeedOptions
        # accepts a `cursor` argument; verify against your driver version.
        options = ChangeFeedOptions(
          cursor=cursor
        )
    
      feed = client.change_feed(fql('Product.where(.stock < 25).toStream()'), options)
    
      for page in feed:
        for event in page:
          event_type = event['type']
          if event_type == 'add':
            # Do something on add
            print('Add event: ', event)
          elif event_type == 'update':
            # Make an API call to another service on event
            # (i.e. email or slack notification)
            print('Update event: ', event)
          elif event_type == 'remove':
            # Do something on remove
            print('Remove event: ', event)
    
        # Store the cursor of the last page
        cursor = page.cursor
        # Store the cursor in the database
        cursor_update = client.query(fql('''
          Cursor.byName("ProductInventory").first()!.update({
            value: ${cursor}
          })
        ''', cursor=cursor))
    
        print(f'Cursor updated: {cursor}')
    
      return {
        "statusCode": 200,
        "body": json.dumps({
            "message": "Event feed processed successfully"
        })
      }
  5. Save the file.

  6. Open the template.yaml file. This file defines the AWS resources that your serverless application will use.

  7. Add the following code to the template.yaml file:

    AWSTemplateFormatVersion: '2010-09-09'
    Transform: AWS::Serverless-2016-10-31
    Description: >
      sam-python-function
    
    Globals:
      Function:
        Timeout: 3
    
    Resources:
      ChangeFeedFunction:
        Type: AWS::Serverless::Function
        Properties:
          CodeUri: hello_world/
          Handler: app.lambda_handler
          Runtime: python3.10
          Events:
            # Schedule to run every 10 minutes
            ChangeFeedFunctionSchedule:
              Type: Schedule
              Properties:
                Schedule: rate(10 minutes)  # AWS CloudWatch Events schedule
                Name: "ChangeFeedEvery10Minutes"  # Optional: Name of the rule
                Description: "Trigger hello_world Lambda every 10 minutes"
  8. Save the file.

  9. Change directory to the root of your project and run the following command to deploy the serverless function:

    sam deploy --guided
  10. Follow the prompts to deploy the function.

  11. Once the function is deployed, you can view the logs in the AWS CloudWatch console.
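
The `if`/`elif` chain in the handler can also be factored into a small routing function that is testable without Fauna or AWS. The following is a sketch under stated assumptions: `route_events` and `notify_low_stock` are hypothetical names, and the sample events are made-up dictionaries that assume each event carries a `type` and a `data` field holding the product document.

```python
from typing import Callable, Dict, Iterable, List

# Hypothetical handler type: any callable that accepts one event dict.
Handler = Callable[[dict], None]


def route_events(events: Iterable[dict], handlers: Dict[str, Handler]) -> int:
    """Call the handler registered for each event's type; return the count handled."""
    handled = 0
    for event in events:
        handler = handlers.get(event.get("type"))
        if handler is None:
            # Event types without a registered handler are skipped.
            continue
        handler(event)
        handled += 1
    return handled


# Example: collect low-stock notifications instead of calling a real service.
notifications: List[str] = []


def notify_low_stock(event: dict) -> None:
    product = event["data"]
    notifications.append(f"{product['name']} is low: {product['stock']} left")


# Made-up events for illustration only.
sample_events = [
    {"type": "add", "data": {"name": "limes", "stock": 20}},
    {"type": "update", "data": {"name": "pizza", "stock": 10}},
    {"type": "status", "data": {}},
]
count = route_events(sample_events, {"add": notify_low_stock,
                                     "update": notify_low_stock})
```

Keeping the routing separate from the polling loop lets you swap `notify_low_stock` for an email or Slack call later without touching the feed logic.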

Add Fauna key to the Lambda function environment

Add the Fauna key you generated earlier to the Lambda function environment using the AWS CLI.

  1. Run the following command to add the FAUNA_SECRET environment variable to the Lambda function:

    aws lambda update-function-configuration \
      --function-name <your-function-name> \
      --environment Variables="{FAUNA_SECRET=<YOUR_SECRET>}"
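
The handler calls `Client()` with no arguments, so it depends on `FAUNA_SECRET` being present in the function's environment. As a hedged sketch, a small check like the following could fail fast with a clear message when the variable is missing. `require_fauna_secret` is a hypothetical helper, not part of the Fauna driver.

```python
import os
from typing import Mapping


def require_fauna_secret(env: Mapping[str, str] = os.environ) -> str:
    """Return the FAUNA_SECRET value or raise a descriptive error."""
    secret = env.get("FAUNA_SECRET", "").strip()
    if not secret:
        raise RuntimeError(
            "FAUNA_SECRET is not set; add it to the Lambda function environment."
        )
    return secret
```

One option is to call it at the top of `lambda_handler`, before creating the client, so that a misconfigured deployment shows an explicit error in the CloudWatch logs.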

Testing

The Lambda function will now automatically trigger every 10 minutes. The function will poll for events that happened in the past 10 minutes. You can verify this in the CloudWatch Logs.
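
The 10-minute polling window depends on converting "10 minutes ago" into microseconds since the Unix epoch. The sketch below isolates that calculation so it can be checked with a fixed clock. `window_start_micros` is a hypothetical helper name, and using timezone-aware UTC datetimes is a design choice of this sketch rather than something the sample app requires.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional


def window_start_micros(minutes: int, now: Optional[datetime] = None) -> int:
    """Microseconds since the Unix epoch for `minutes` before `now` (UTC)."""
    if now is None:
        now = datetime.now(timezone.utc)
    start = now - timedelta(minutes=minutes)
    # Integer division of timedeltas avoids float rounding.
    epoch = datetime(1970, 1, 1, tzinfo=timezone.utc)
    return (start - epoch) // timedelta(microseconds=1)


# Fixed clock for a reproducible check: 2024-01-01 00:10:00 UTC.
fixed_now = datetime(2024, 1, 1, 0, 10, tzinfo=timezone.utc)
start_ts = window_start_micros(10, now=fixed_now)
```

With the fixed clock above, `start_ts` corresponds to 2024-01-01 00:00:00 UTC expressed in microseconds.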

Verify the Event Feed

Run the following FQL query to update the stock of a product to less than 25:

    Product.all().first()!.update({ stock: 10 })

After running the query, you should see the event in the CloudWatch logs the next time the Lambda function runs.

[Image: CloudWatch logs showing the Event Feed]

Cleanup

To avoid incurring charges, delete the resources you created in this tutorial.

  1. Run the following command to delete the resources:

    sam delete
