Fauna v10 Python client driver (current)

Version: 2.3.0

Repository: fauna/fauna-python

Fauna’s Python client driver lets you run FQL queries from Python applications.

This guide shows how to set up the driver and use it to run FQL queries.

This driver can only be used with FQL v10. It’s not compatible with earlier versions of FQL. To use earlier FQL versions, use the faunadb package.

Supported Python versions

  • Python 3.9

  • Python 3.10

  • Python 3.11

  • Python 3.12

Supported cloud runtimes

Installation

The driver is available on PyPI. To install it, run:

pip install fauna

API reference

API reference documentation for the driver is available at https://fauna.github.io/fauna-python/.

Basic usage

The following application:

  • Initializes a client instance to connect to Fauna

  • Composes a basic FQL query using an fql string template

  • Runs the query using query()

from fauna import fql
from fauna.client import Client
from fauna.encoding import QuerySuccess
from fauna.errors import FaunaException

# Initialize the client to connect to Fauna
client = Client(secret='FAUNA_SECRET')

try:
    # Compose a query
    query = fql(
        """
        Product.sortedByPriceLowToHigh() {
            name,
            description,
            price
            }"""
    )

    # Run the query
    res: QuerySuccess = client.query(query)
    print(res.data)
except FaunaException as e:
    print(e)
finally:
    # Clean up any remaining resources
    client.close()

Connect to Fauna

Each Fauna query is an independently authenticated request to the Core HTTP API’s Query endpoint. You authenticate with Fauna using an authentication secret.

Get an authentication secret

Fauna supports several secret types. For testing, you can create a key, which is a type of secret:

  1. Log in to the Fauna Dashboard.

  2. On the Explorer page, create a database.

  3. In the database’s Keys tab, click Create Key.

  4. Choose a Role of server.

  5. Click Save.

  6. Copy the Key Secret. The secret is scoped to the database.

Initialize a client

To send query requests to Fauna, initialize a Client instance using a Fauna authentication secret:

client = Client(secret='FAUNA_SECRET')

If not specified, secret defaults to the FAUNA_SECRET environment variable. For other configuration options, see Client configuration.
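As a minimal sketch of failing fast when no secret is configured, the helper below checks an explicit value first and then the FAUNA_SECRET environment variable, mirroring the fallback described above. The helper name resolve_secret, its env parameter, and the error message are assumptions for illustration; none of them are part of the driver.

```python
import os
from typing import Mapping, Optional


def resolve_secret(explicit: Optional[str] = None,
                   env: Mapping[str, str] = os.environ) -> str:
    """Return the explicit secret if given, else FAUNA_SECRET from env.

    Hypothetical helper for illustration; it is not part of the driver.
    """
    secret = explicit or env.get('FAUNA_SECRET')
    if not secret:
        raise RuntimeError(
            'No Fauna secret found: pass one explicitly or set FAUNA_SECRET')
    return secret


# Example with a stand-in environment mapping
print(resolve_secret(env={'FAUNA_SECRET': 'secret-from-env'}))
```

You could then pass the result to the client, for example Client(secret=resolve_secret()), so a missing secret surfaces at startup rather than on the first query.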

Connect to a child database

A scoped key lets you use a parent database’s admin key to send query requests to its child databases.

For example, if you have an admin key for a parent database and want to connect to a child database named childDB, you can create a scoped key using the following format:

// Scoped key that impersonates an `admin` key for
// the `childDB` child database.
fn...:childDB:admin

You can then initialize a Client instance using the scoped key:

client = Client(secret='fn...:childDB:admin')
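The scoped key is plain string concatenation, so it can be built in code. The sketch below assumes the secret:database:role format shown above; the function name scoped_key and the placeholder secret are hypothetical and not part of the driver.

```python
def scoped_key(admin_secret: str, child_db: str, role: str = 'admin') -> str:
    """Build a scoped key in the `<secret>:<database>:<role>` format.

    Hypothetical helper for illustration; it is not part of the driver.
    """
    if not admin_secret or not child_db or not role:
        raise ValueError('admin_secret, child_db, and role are all required')
    return f'{admin_secret}:{child_db}:{role}'


# 'fn-parent-admin-key' is a placeholder, not a real secret
print(scoped_key('fn-parent-admin-key', 'childDB'))
```

The returned string can be passed as the secret argument when initializing a Client.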

Multiple connections

You can use a single client instance to run multiple asynchronous queries at once. The driver manages HTTP connections as needed. Your app doesn’t need to implement connection pools or other connection management strategies.

You can create multiple client instances to connect to Fauna using different credentials or client configurations.

AWS Lambda connections

AWS Lambda freezes, thaws, and reuses execution environments for Lambda functions. See Lambda execution environment.

When an execution environment is thawed, Lambda only runs the function’s handler code. Objects declared outside of the handler method remain initialized from before the freeze. Lambda doesn’t re-run initialization code outside the handler.

Fauna drivers keep socket connections that can time out during long freezes, causing ECONNRESET errors when thawed.

To prevent timeouts, create Fauna client connections inside function handlers. Fauna drivers use lightweight HTTP connections. You can create new connections for each request while maintaining good performance.
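One way to follow this advice is sketched below: the client is created and closed inside the handler on every invocation. To keep the sketch independent of the driver, the client comes from an injected client_factory callable and the work is done by an injected run callable; both names are assumptions for illustration.

```python
from typing import Any, Callable, Dict


def make_handler(client_factory: Callable[[], Any],
                 run: Callable[[Any, Dict[str, Any]], Any]):
    """Return a Lambda-style handler that creates a client per invocation.

    `client_factory` and `run` are hypothetical parameters for this sketch.
    """
    def handler(event: Dict[str, Any], context: Any = None):
        # Create the client inside the handler so a thawed environment
        # never reuses a connection opened before the freeze
        client = client_factory()
        try:
            return run(client, event)
        finally:
            # Release the client's resources before the handler returns
            client.close()

    return handler
```

With the driver, client_factory could be something like lambda: Client(), and run could call client.query() with a query built from the event.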

Run FQL queries

Use fql string templates to compose FQL queries. Run the queries using query():

query = fql("Product.sortedByPriceLowToHigh()")
client.query(query)

By default, query() uses query options from the Client configuration. You can pass options to query() to override these defaults. See Query options.

You can only compose FQL queries using string templates.

Variable interpolation

The driver supports queries with Python primitives, lists, and dicts.

Use ${} to pass native Python variables to fql queries as kwargs. You can escape a variable by prepending an additional $.

# Create a native Python var
collection_name = 'Product'

# Pass the var to an FQL query
query = fql('''
  let collection = Collection(${collection_name})
  collection.sortedByPriceLowToHigh()''',
  collection_name=collection_name)

client.query(query)

The driver encodes interpolated variables to an appropriate FQL type and uses the wire protocol to pass the query to the Core HTTP API’s Query endpoint. This helps prevent injection attacks.

Query composition

You can use variable interpolation to pass FQL string templates as query fragments to compose an FQL query:

# Create a reusable query fragment.
product = fql('Product.byName("pizza").first()')

# Use the fragment in another FQL query.
query = fql('''
  let product = ${product}
  product {
    name,
    price
  }
''', product=product)

client.query(query)

Pagination

Use paginate() to iterate through a Set that contains more than one page of results. paginate() accepts the same Query options as query().

# Adjust `pageSize()` size as needed.
query = fql('''
  Product.sortedByPriceLowToHigh()
    .pageSize(2)''')

pages = client.paginate(query)

for products in pages:
    for product in products:
        print(product)
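If page boundaries don't matter to your application, the nested loop can be collapsed into a single pass over items. The sketch below uses only the standard library and works on any iterable of pages; sample_pages is stand-in data, not real query output.

```python
from itertools import chain
from typing import Any, Iterable, Iterator


def iter_items(pages: Iterable[Iterable[Any]]) -> Iterator[Any]:
    """Lazily yield each item from an iterable of pages."""
    return chain.from_iterable(pages)


# Stand-in pages; with the driver, `pages` would come from client.paginate()
sample_pages = [[{'name': 'cups'}, {'name': 'limes'}], [{'name': 'pizza'}]]
for product in iter_items(sample_pages):
    print(product['name'])
```

Because chain.from_iterable is lazy, the next page is only requested from the underlying iterator once the current page is exhausted.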

Query stats

Successful query responses and ServiceError errors return query stats:

from fauna import fql
from fauna.client import Client
from fauna.errors import ServiceError

client = Client(secret='FAUNA_SECRET')

try:
    query = fql('"Hello world"')
    res = client.query(query)
    print(res.stats)
except ServiceError as e:
    if e.stats is not None:
        print(e.stats)
    # more error handling...

User-defined classes

Serialization and deserialization with user-defined classes is not supported.

When composing FQL queries, adapt your classes into dicts or lists. When instantiating classes from a query result, build them from the expected result.

class MyClass:
    def __init__(self, my_prop):
        self.my_prop = my_prop

    def to_dict(self):
        return { 'my_prop': self.my_prop }

    @staticmethod
    def from_result(obj):
        return MyClass(obj['my_prop'])
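A slightly fuller sketch of the same pattern, using a dataclass, is shown below. The Product class, its fields, and the result dict are assumptions for illustration; in practice the result would be a dict-like object returned by a query.

```python
from dataclasses import asdict, dataclass
from typing import Any, Dict


@dataclass
class Product:
    name: str
    price: int

    def to_dict(self) -> Dict[str, Any]:
        # Plain dict that can be passed to a query as an interpolated value
        return asdict(self)

    @staticmethod
    def from_result(obj: Dict[str, Any]) -> 'Product':
        # Pick only the expected fields; ignore any extra fields in the result
        return Product(name=obj['name'], price=obj['price'])


# Stand-in for a query result; the extra 'id' field is ignored
result = {'id': '123', 'name': 'pizza', 'price': 499}
product = Product.from_result(result)
print(product.to_dict())
```

Keeping the conversion in two small methods makes it clear which fields cross the boundary between your classes and FQL values.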

Client configuration

The Client instance comes with reasonable configuration defaults. We recommend using the defaults in most cases.

If needed, you can configure the client to override the defaults. This also lets you set default Query options.

from datetime import timedelta
from fauna.client import Client
from fauna.client.headers import Header
from fauna.client.endpoints import Endpoints

config = {
    # Configure the client
    'secret': 'FAUNA_SECRET',
    'endpoint': Endpoints.Default,
    'client_buffer_timeout': timedelta(seconds=5),
    'http_read_timeout': None,
    'http_write_timeout': timedelta(seconds=5),
    'http_connect_timeout': timedelta(seconds=5),
    'http_pool_timeout': timedelta(seconds=5),
    'http_idle_timeout': timedelta(seconds=5),
    'max_attempts': 3,
    'max_backoff': 20,

    # Set default query options
    'additional_headers': {'foo': 'bar'},
    'linearized': False,
    'max_contention_retries': 5,
    'query_tags': {'tag': 'value'},
    'query_timeout': timedelta(seconds=60),
    'typecheck': True,
}

client = Client(**config)

For supported parameters, see Client in the API reference.

Environment variables

If not specified, secret and endpoint default to the FAUNA_SECRET and FAUNA_ENDPOINT environment variables, respectively.

For example, if you set the following environment variables:

export FAUNA_SECRET=FAUNA_SECRET
export FAUNA_ENDPOINT=https://db.fauna.com/

You can initialize the client with a default configuration:

client = Client()

Retries

By default, the client automatically retries query requests that return a limit_exceeded error code. Retries use an exponential backoff.

Use the Client configuration's max_backoff parameter to set the maximum time between retries. Similarly, use max_attempts to set the maximum number of retry attempts.
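To build intuition for how the two parameters interact, the sketch below computes a capped exponential delay schedule. It is not the driver's actual algorithm: the base delay, the doubling rule, and the absence of random jitter are all assumptions made for illustration.

```python
from typing import List


def backoff_delays(max_attempts: int = 3,
                   max_backoff: float = 20.0,
                   base: float = 0.1) -> List[float]:
    """Return one wait time per retry attempt, doubling up to max_backoff.

    Illustrative only: `base` and the doubling rule are assumptions, and
    real implementations usually add random jitter.
    """
    return [min(base * (2 ** i), max_backoff) for i in range(max_attempts)]


print(backoff_delays(max_attempts=5, max_backoff=1.0, base=0.25))
```

In this model, max_attempts bounds how many waits occur and max_backoff bounds how long any single wait can grow.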

Query options

The Client configuration sets default query options for the following methods:

  • query()

  • paginate()

You can pass a QueryOptions object to override these defaults:

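from datetime import timedelta
from fauna import fql
from fauna.client import Client, QueryOptions

client = Client()

options = QueryOptions(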
    additional_headers={'foo': 'bar'},
    linearized=False,
    max_contention_retries=5,
    query_tags={'name': 'hello world query'},
    query_timeout=timedelta(seconds=60),
    traceparent='00-750efa5fb6a131eb2cf4db39f28366cb-000000000000000b-00',
    typecheck=True
)

client.query(fql('"Hello world"'), options)

For supported properties, see QueryOptions in the API reference.

Event Feeds

The driver supports Event Feeds. An Event Feed asynchronously polls an event source for events.

To use Event Feeds, you must have a Pro or Enterprise plan.

Request an Event Feed

To get an event source, append set.eventSource() or set.eventsOn() to a supported Set.

To get paginated events, pass the event source to feed():

from fauna import fql
from fauna.client import Client

client = Client()

response = client.query(fql('''
    let set = Product.all()
      {
        initialPage: set.pageSize(10),
        eventSource: set.eventSource()
      }
    '''))

initial_page = response.data['initialPage']
event_source = response.data['eventSource']

feed = client.feed(event_source)

If changes occur between the creation of the event source and the feed() request, the feed replays and emits any related events.

You can also pass a query that produces an event source directly to feed():

query = fql('Product.all().eventsOn(.price, .stock)')

feed = client.feed(query)

In most cases, you’ll get events after a specific start time or cursor.

Get events after a specific start time

When you first poll an event source using an Event Feed, you usually include a start_ts (start timestamp) in the FeedOptions object that’s passed to feed(). The request returns events that occurred after the specified timestamp (exclusive).

start_ts is an integer representing a time in microseconds since the Unix epoch:

from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import datetime, timedelta

client = Client()

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

options = FeedOptions(
    start_ts=start_ts
)

feed = client.feed(fql('Product.all().eventSource()'), options)

start_ts must be later than the creation time of the event source. The period between the request and the start_ts can’t exceed the history_days setting for the source Set’s collection. If history_days is 0 or unset, the period is limited to 15 minutes.
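The constraints above can be checked on the client before sending a request. The sketch below is a hypothetical helper (start_ts_for is not part of the driver) that converts "some time ago" into microseconds since the Unix epoch and rejects values outside the window implied by history_days. It assumes timezone-aware datetimes and treats the server as the final authority.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

# Fallback window described above when history_days is 0 or unset
DEFAULT_WINDOW = timedelta(minutes=15)
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def start_ts_for(age: timedelta,
                 history_days: int = 0,
                 now: Optional[datetime] = None) -> int:
    """Return microseconds since the Unix epoch for `now - age`.

    Hypothetical helper: raises ValueError if `age` exceeds the window.
    `now`, if given, must be timezone-aware.
    """
    window = timedelta(days=history_days) if history_days > 0 else DEFAULT_WINDOW
    if age > window:
        raise ValueError(f'start_ts is older than the allowed window of {window}')
    now = now or datetime.now(timezone.utc)
    # Integer division of timedeltas avoids float rounding on large timestamps
    return ((now - age) - EPOCH) // timedelta(microseconds=1)


print(start_ts_for(timedelta(minutes=10)))
```

The returned integer can be passed as start_ts in a FeedOptions object.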

Get events after a specific cursor

After the initial request, you usually get subsequent events using the cursor for the last page or event. To get events after a cursor (exclusive), include the cursor in the FeedOptions object that’s passed to feed():

from fauna import fql
from fauna.client import Client, FeedOptions

client = Client()

options = FeedOptions(
    # Cursor for a previous page
    cursor='gsGabc456'
)

feed = client.feed(fql('Product.all().eventSource()'), options)

Iterate on an Event Feed

feed() returns an iterator that emits pages of events. You can use a for loop to iterate through the pages:

query = fql('Product.all().eventsOn(.price, .stock)')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

options = FeedOptions(
    start_ts=start_ts
)

feed = client.feed(query, options)
for page in feed:
    print('Page stats: ', page.stats)

    for event in page:
        event_type = event['type']
        if event_type == 'add':
            # Do something on add
            print('Add event: ', event)
        elif event_type == 'update':
            # Do something on update
            print('Update event: ', event)
        elif event_type == 'remove':
            # Do something on remove
            print('Remove event: ', event)

The Event Feed iterator will stop once there are no more events to poll.
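The if/elif chain in the loop above can also be written as a dispatch table, which keeps the loop body short as the number of event types grows. The sketch below is one possible shape, not a driver feature: the dispatch function and the sample events are assumptions, and only the 'type' key is taken from the examples in this guide.

```python
from typing import Any, Callable, Dict, Iterable, List

Event = Dict[str, Any]


def dispatch(events: Iterable[Event],
             handlers: Dict[str, Callable[[Event], None]]) -> List[Event]:
    """Route each event to the handler for its 'type'.

    Returns events with no registered handler so callers can log them.
    """
    unhandled = []
    for event in events:
        handler = handlers.get(event['type'])
        if handler is None:
            unhandled.append(event)
        else:
            handler(event)
    return unhandled


# Stand-in events; with the driver, each page from feed() is iterable
sample_page = [{'type': 'add', 'data': {'name': 'pizza'}},
               {'type': 'remove', 'data': {'name': 'cups'}}]
seen = []
leftover = dispatch(sample_page, {'add': seen.append})
print(len(seen), len(leftover))
```

Returning the unhandled events, rather than silently dropping them, makes unexpected event types visible during development.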

Each page includes a top-level cursor. You can include the cursor in a FeedOptions object passed to feed() to poll for events after the cursor:

import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions

def process_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = FeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )

        feed = client.feed(query, options)

        for page in feed:
            for event in page:
                event_type = event['type']
                if event_type == 'add':
                    # Do something on add
                    print('Add event: ', event)
                elif event_type == 'update':
                    # Do something on update
                    print('Update event: ', event)
                elif event_type == 'remove':
                    # Do something on remove
                    print('Remove event: ', event)

            # Store the cursor of the last page
            cursor = page.cursor

        # Clear the start timestamp after the first request
        start_ts = None

        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)

client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

process_feed(client, query, start_ts=start_ts)

Alternatively, you can get events as a single, flat array:

import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions

def process_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = FeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )

        feed = client.feed(query, options)

        for event in feed.flatten():
            event_type = event['type']
            if event_type == 'add':
                # Do something on add
                print('Add event: ', event)
            elif event_type == 'update':
                # Do something on update
                print('Update event: ', event)
            elif event_type == 'remove':
                # Do something on remove
                print('Remove event: ', event)

            # Store the cursor of the last processed event
            cursor = event['cursor']

        # Clear the start timestamp after the first request
        start_ts = None

        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)

client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

process_feed(client, query, start_ts=start_ts)

If needed, you can store the cursor as a collection document. For an example, see the Event Feeds app.
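For local development, a simpler alternative to a collection document is a small JSON file. The sketch below swaps in that technique; the file layout and the function names save_cursor and load_cursor are assumptions for illustration, and a file on local disk is not suitable when several workers share one feed.

```python
import json
import os
import tempfile
from typing import Optional


def save_cursor(path: str, cursor: str) -> None:
    """Atomically write the cursor so a crash never leaves a partial file."""
    directory = os.path.dirname(os.path.abspath(path))
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, 'w') as tmp:
        json.dump({'cursor': cursor}, tmp)
    # os.replace swaps the file in a single step on the same filesystem
    os.replace(tmp_path, path)


def load_cursor(path: str) -> Optional[str]:
    """Return the saved cursor, or None if nothing has been saved yet."""
    try:
        with open(path) as f:
            return json.load(f)['cursor']
    except FileNotFoundError:
        return None
```

A polling loop could call load_cursor() once at startup and save_cursor() after each page is processed, so a restart resumes from the last stored position.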

Error handling

If a non-retryable error occurs when opening or processing an Event Feed, Fauna raises a FaunaException:

from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions
from fauna.errors import FaunaException

client = Client()

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

options = FeedOptions(
    start_ts=start_ts
)

feed = client.feed(fql(
    'Product.all().eventsOn(.price, .stock)'
), options)

for page in feed:
    try:
        for event in page:
            print(event)
            # ...
    except FaunaException as e:
        print('Error occurred while processing events: ', e)
        # The current event will be skipped

Each page’s cursor contains the cursor for the page’s last successfully processed event. If you’re using a loop to poll for changes, using the cursor will result in skipping any events that caused errors.

Event Feed options

The client configuration sets default options for the feed() method.

You can pass a FeedOptions object to override these defaults:

from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import timedelta

client = Client()

options = FeedOptions(
    max_attempts=3,
    max_backoff=20,
    query_timeout=timedelta(seconds=5),
    page_size=None,
    cursor=None,
    start_ts=None,
)

client.feed(fql('Product.all().eventSource()'), options)

For supported properties, see FeedOptions in the API reference.

Sample app

For a practical example that uses the Python driver with Event Feeds, check out the Event Feeds sample app.

Event Streaming

The driver supports Event Streaming.

Start a stream

To get an event source, append set.eventSource() or set.eventsOn() to a supported Set.

To stream the source’s events, pass the event source to stream():

import fauna

from fauna import fql
from fauna.client import Client, StreamOptions

client = Client()

response = client.query(fql('''
  let set = Product.all()
  {
    initialPage: set.pageSize(10),
    eventSource: set.eventSource()
  }
  '''))

initial_page = response.data['initialPage']
event_source = response.data['eventSource']

client.stream(event_source)

You can also pass a query that produces an event source directly to stream():

query = fql('Product.all().eventsOn(.price, .stock)')

client.stream(query)

Iterate on a stream

stream() returns an iterator that emits events as they occur. You can use a for loop to iterate through the events:

query = fql('Product.all().eventsOn(.price, .stock)')

with client.stream(query) as stream:
    for event in stream:
        event_type = event['type']
        if event_type == 'add':
            print('Add event: ', event)
            # ...
        elif event_type == 'update':
            print('Update event: ', event)
            # ...
        elif event_type == 'remove':
            print('Remove event: ', event)
            # ...

Close a stream

Use close() to close a stream:

query = fql('Product.all().eventsOn(.price, .stock)')

count = 0
with client.stream(query) as stream:
    for event in stream:
        print('Stream event', event)
        # ...
        count += 1

        if count == 2:
            stream.close()

Error handling

If a non-retryable error occurs when opening or processing a stream, Fauna raises a FaunaException:

import fauna

from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException

client = Client(secret='FAUNA_SECRET')

try:
    with client.stream(fql(
        'Product.all().eventsOn(.price, .stock)'
    )) as stream:
        for event in stream:
            print(event)
        # ...
except FaunaException as e:
    print('Error occurred with stream: ', e)

Stream options

The Client configuration sets default options for the stream() method.

You can pass a StreamOptions object to override these defaults:

options = StreamOptions(
    max_attempts=5,
    max_backoff=1,
    start_ts=1710968002310000,
    status_events=True
)

client.stream(fql('Product.all().eventSource()'), options)

For supported properties, see StreamOptions in the API reference.

Logging

Logging is handled using Python’s standard logging package under the fauna namespace. Logs include the HTTP request with body (excluding the Authorization header) and the full HTTP response.

To enable logging:

import logging
from fauna.client import Client
from fauna import fql

logging.basicConfig(
    level=logging.DEBUG
)
client = Client()
client.query(fql('42'))

For configuration options or to set specific log levels, see Python’s Logging HOWTO.
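Because the driver logs under the fauna namespace, you can raise verbosity for the driver alone and leave other libraries at their defaults. The sketch below uses only the standard logging module; the child logger name 'fauna.client' is used purely to illustrate level inheritance and is an assumption about how loggers are named.

```python
import logging

# Keep the root logger quiet so other libraries stay at WARNING
logging.basicConfig(level=logging.WARNING)

# Raise verbosity only for loggers under the 'fauna' namespace
fauna_logger = logging.getLogger('fauna')
fauna_logger.setLevel(logging.DEBUG)

# Child loggers such as 'fauna.client' inherit the level set above
print(logging.getLogger('fauna.client').getEffectiveLevel() == logging.DEBUG)
```

Since debug logs include request bodies and full responses, consider enabling this level only in development or for short troubleshooting sessions.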
