Fauna v10 Python client driver (current)
Version: 2.3.0 | Repository: fauna/fauna-python |
---|
Fauna’s Python client driver lets you run FQL queries from Python applications.
This guide shows how to set up the driver and use it to run FQL queries.
This driver can only be used with FQL v10. It’s not compatible with earlier versions of FQL. To use earlier FQL versions, use the faunadb package. |
Supported cloud runtimes
-
AWS Lambda (See AWS Lambda connections)
-
Vercel Functions
Installation
The driver is available on PyPI. To install it, run:
pip install fauna
API reference
API reference documentation for the driver is available at https://fauna.github.io/fauna-python/.
Basic usage
The following application:
-
Initializes a client instance to connect to Fauna
-
Composes a basic FQL query using an
fql
string template -
Runs the query using
query()
from fauna import fql
from fauna.client import Client
from fauna.encoding import QuerySuccess
from fauna.errors import FaunaException
# Initialize the client to connect to Fauna
client = Client(secret='FAUNA_SECRET')
try:
# Compose a query
query = fql(
"""
Product.sortedByPriceLowToHigh() {
name,
description,
price
}"""
)
# Run the query
res: QuerySuccess = client.query(query)
print(res.data)
except FaunaException as e:
print(e)
finally:
# Clean up any remaining resources
client.close()
Connect to Fauna
Each Fauna query is an independently authenticated request to the Core HTTP API’s Query endpoint. You authenticate with Fauna using an authentication secret.
Get an authentication secret
Fauna supports several secret types. For testing, you can create a key, which is a type of secret:
-
Log in to the Fauna Dashboard.
-
On the Explorer page, create a database.
-
In the database’s Keys tab, click Create Key.
-
Choose a Role of server.
-
Click Save.
-
Copy the Key Secret. The secret is scoped to the database.
Initialize a client
To send query requests to Fauna, initialize a Client
instance using a Fauna
authentication secret:
client = Client(secret='FAUNA_SECRET')
If not specified, secret
defaults to the FAUNA_SECRET
environment variable.
For other configuration options, see Client configuration.
Connect to a child database
A scoped key lets you use a parent database’s admin key to send query requests to its child databases.
For example, if you have an admin key for a parent database and want to
connect to a child database named childDB
, you can create a scoped key using
the following format:
// Scoped key that impersonates an `admin` key for
// the `childDB` child database.
fn...:childDB:admin
You can then initialize a Client
instance using the scoped key:
client = Client(secret='fn...:childDB:admin')
Multiple connections
You can use a single client instance to run multiple asynchronous queries at once. The driver manages HTTP connections as needed. Your app doesn’t need to implement connection pools or other connection management strategies.
You can create multiple client instances to connect to Fauna using different credentials or client configurations.
AWS Lambda connections
AWS Lambda freezes, thaws, and reuses execution environments for Lambda functions. See Lambda execution environment.
When an execution environment is thawed, Lambda only runs the function’s handler code. Objects declared outside of the handler method remain initialized from before the freeze. Lambda doesn’t re-run initialization code outside the handler.
Fauna drivers keep socket connections that can time out during long freezes,
causing ECONNRESET
errors when thawed.
To prevent timeouts, create Fauna client connections inside function handlers. Fauna drivers use lightweight HTTP connections. You can create new connections for each request while maintaining good performance.
Run FQL queries
Use fql
string templates to compose FQL queries. Run the queries using
query()
:
query = fql("Product.sortedByPriceLowToHigh()")
client.query(query)
By default, query()
uses query options from the Client configuration. You
can pass options to query()
to override these defaults. See Query options.
You can only compose FQL queries using string templates.
Variable interpolation
The driver supports queries with Python primitives, lists, and dicts.
Use ${}
to pass native Python variables to fql
queries as
kwargs. You can escape a variable by prepending an additional $
.
# Create a native Python var
collection_name = 'Product'
# Pass the var to an FQL query
query = fql('''
let collection = Collection(${collection_name})
collection.sortedByPriceLowToHigh()''',
collection_name=collection_name)
client.query(query);
The driver encodes interpolated variables to an appropriate FQL type and uses the wire protocol to pass the query to the Core HTTP API’s Query endpoint. This helps prevent injection attacks.
Query composition
You can use variable interpolation to pass FQL string templates as query fragments to compose an FQL query:
# Create a reusable query fragment.
product = fql('Product.byName("pizza").first()')
# Use the fragment in another FQL query.
query = fql(f'''
let product = {product}
product {{
name,
price
}}
''')
client.query(query)
Pagination
Use paginate()
to iterate through a Set that contains more than one page of
results. paginate()
accepts the same Query options as query()
.
# Adjust `pageSize()` size as needed.
query = fql('''
Product.sortedByPriceLowToHigh()
.pageSize(2)''')
pages = client.paginate(query);
for products in pages:
for product in products:
print(products)
Query stats
Successful query responses and ServiceError
errors return
query stats:
from fauna import fql
from fauna.client import Client
from fauna.errors import ServiceError
client = Client(secret='FAUNA_SECRET')
try:
query = fql('"Hello world"')
res = client.query(query)
print(res.stats)
except ServiceError as e:
if e.stats is not None:
print(e.stats)
# more error handling...
User-defined classes
Serialization and deserialization with user-defined classes is not supported.
When composing FQL queries, adapt your classes into dicts or lists. When instantiating classes from a query result, build them from the expected result.
class MyClass:
def __init__ (self, my_prop):
self.my_prop = my_prop
def to_dict(self):
return { 'my_prop': self.my_prop }
@static_method
def from_result(obj):
return MyClass(obj['my_prop'])
Client configuration
The Client
instance comes with reasonable configuration defaults. We recommend
using the defaults in most cases.
If needed, you can configure the client to override the defaults. This also lets you set default Query options.
from datetime import timedelta
from fauna.client import Client
from fauna.client.headers import Header
from fauna.client.endpoints import Endpoints
config = {
# Configure the client
'secret': 'FAUNA_SECRET',
'endpoint': Endpoints.Default,
'client_buffer_timeout': timedelta(seconds=5),
'http_read_timeout': None,
'http_write_timeout': timedelta(seconds=5),
'http_connect_timeout': timedelta(seconds=5),
'http_pool_timeout': timedelta(seconds=5),
'http_idle_timeout': timedelta(seconds=5),
'max_attempts': 3,
'max_backoff': 20,
# Set default query options
'additional_headers': {'foo': 'bar'},
'linearized': False,
'max_contention_retries': 5,
'query_tags': {'tag': 'value'},
'query_timeout': timedelta(seconds=60),
'typecheck': True,
}
client = Client(**config)
For supported parameters, see Client in the API reference.
Environment variables
By default, secret
and endpoint
default to the respective FAUNA_SECRET
and
FAUNA_ENDPOINT
environment variables.
For example, if you set the following environment variables:
export FAUNA_SECRET=FAUNA_SECRET
export FAUNA_ENDPOINT=https://db.fauna.com/
You can initialize the client with a default configuration:
client = Client()
Retries
By default, the client automatically retries query requests that return a
limit_exceeded
[error code. Retries
use an exponential backoff.
Use the Client configuration's max_backoff
parameter to set the maximum
time between retries. Similarly, use max_attempts
to set the maximum number of
retry attempts.
Query options
The Client configuration sets default query options for the following methods:
-
query()
-
paginate()
You can pass a QueryOptions
object to override these defaults:
options = QueryOptions(
additional_headers={'foo': 'bar'},
linearized=False,
max_contention_retries=5,
query_tags={'name': 'hello world query'},
query_timeout=timedelta(seconds=60),
traceparent='00-750efa5fb6a131eb2cf4db39f28366cb-000000000000000b-00',
typecheck=True
)
client.query(fql('"Hello world"'), options)
For supported properties, see QueryOptions in the API reference.
Event Feeds
The driver supports Event Feeds. An Event Feed asynchronously polls an event source for events.
To use Event Feeds, you must have a Pro or Enterprise plan.
Request an Event Feed
To get an event source, append
set.eventSource()
or
set.eventsOn()
to a
supported Set.
To get paginated events, pass the event source to feed()
:
from fauna import fql
from fauna.client import Client
client = Client()
response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
eventSource: set.eventSource()
}
'''))
initial_page = response.data['initialPage']
event_source = response.data['eventSource']
feed = client.feed(event_source)
If changes occur between the creation of the event source and the feed()
request, the feed replays and emits any related events.
You can also pass a query that produces an event source directly to
feed()
:
query = fql('Product.all().eventsOn(.price, .stock)')
feed = client.feed(query)
In most cases, you’ll get events after a specific start time or cursor.
Get events after a specific start time
When you first poll an event source using an Event Feed, you usually include a
start_ts
(start timestamp) in the FeedOptions
object that’s passed to feed()
. The request returns events that
occurred after the specified timestamp (exclusive).
start_ts
is an integer representing a time in microseconds since the Unix
epoch:
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import datetime, timedelta
client = Client()
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
options = FeedOptions(
start_ts=start_ts
)
feed = client.feed(fql('Product.all().eventSource()'), options)
start_ts
must be later than the creation time of the event source. The
period between the request and the start_ts
can’t exceed the history_days
setting for the source Set’s collection. If history_days
is 0
or unset, the
period is limited to 15 minutes.
Get events after a specific cursor
After the initial request, you usually get subsequent events using the
cursor for the last page or event.
To get events after a cursor (exclusive), include the cursor
in the
FeedOptions
object that’s passed to
feed()
:
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import datetime, timedelta
client = Client()
options = FeedOptions(
# Cursor for a previous page
cursor='gsGabc456'
)
feed = client.feed(fql('Product.all().eventSource()'), options)
Iterate on an Event Feed
feed()
returns an iterator that emits pages of events. You can use a
for loop to iterate through the pages:
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
options = FeedOptions(
start_ts=start_ts
)
feed = client.feed(query, options)
for page in feed:
print('Page stats: ', page.stats)
for event in page:
eventType = event['type']
if (eventType == 'add'):
# Do something on add
print('Add event: ', event)
elif (eventType == 'update'):
# Do something on update
print('Update event: ', event)
elif (eventType == 'remove'):
# Do something on remove
print('Remove event: ', event)
The Event Feed iterator will stop once there are no more events to poll.
Each page includes a top-level cursor
. You can include the cursor in a
FeedOptions
object passed to feed()
to
poll for events after the cursor:
import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions
def process_feed(client, query, start_ts=None, sleep_time=300):
cursor = None
while True:
options = FeedOptions(
start_ts=start_ts if cursor is None else None,
cursor=cursor,
)
feed = client.feed(query, options)
for page in feed:
for event in page:
event_type = event['type']
if event_type == 'add':
# Do something on add
print('Add event: ', event)
elif event_type == 'update':
# Do something on update
print('Update event: ', event)
elif event_type == 'remove':
# Do something on remove
print('Remove event: ', event)
# Store the cursor of the last page
cursor = page.cursor
# Clear the start timestamp after the first request
start_ts = None
print(f"Sleeping for {sleep_time} seconds...")
time.sleep(sleep_time)
client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
process_feed(client, query, start_ts=start_ts)
Alternatively, you can get events as a single, flat array:
import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions
def process_feed(client, query, start_ts=None, sleep_time=300):
cursor = None
while True:
options = FeedOptions(
start_ts=start_ts if cursor is None else None,
cursor=cursor,
)
feed = client.feed(query, options)
for event in feed.flatten():
event_type = event['type']
if event_type == 'add':
# Do something on add
print('Add event: ', event)
elif event_type == 'update':
# Do something on update
print('Update event: ', event)
elif event_type == 'remove':
# Do something on remove
print('Remove event: ', event)
# Store the cursor of the last page
cursor = event['cursor']
# Clear the start timestamp after the first request
start_ts = None
print(f"Sleeping for {sleep_time} seconds...")
time.sleep(sleep_time)
client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
process_feed(client, query, start_ts=start_ts)
If needed, you can store the cursor as a collection document. For an example, see the Event Feeds app.
Error handling
If a non-retryable error occurs when opening or processing an Event Feed, Fauna
raises a FaunaException
:
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException
client = Client()
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
options = FeedOptions(
start_ts=start_ts
)
feed = client.feed(fql(
'Product.all().eventsOn(.price, .stock)'
), options)
for page in feed:
try:
for event in page:
print(event)
# ...
except FaunaException as e:
print('error ocurred with event processing: ', e)
# The current event will be skipped
Each page’s cursor
contains the cursor for the page’s last successfully
processed event. If you’re using a loop to poll for changes, using the
cursor will result in skipping any events that caused errors.
Event Feed options
The client configuration sets default options for the feed()
method.
You can pass a FeedOptions
object to override these defaults:
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import timedelta
client = Client()
options = FeedOptions(
max_attempts=3,
max_backoff=20,
query_timeout=timedelta(seconds=5),
page_size=None,
cursor=None,
start_ts=None,
)
client.feed(fql('Product.all().eventSource()'), options)
For supported properties, see FeedOptions in the API reference.
Sample app
For a practical example that uses the Python driver with Event Feeds, check out the Event Feeds sample app.
Event Streaming
The driver supports Event Streaming.
Start a stream
To get an event source, append
set.eventSource()
or
set.eventsOn()
to a
supported Set.
To stream the source’s events, pass the event source to stream()
:
import fauna
from fauna import fql
from fauna.client import Client, StreamOptions
client = Client()
response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
eventSource: set.eventSource()
}
'''))
initial_page = response.data['initialPage']
event_source = response.data['eventSource']
client.stream(event_source)
You can also pass a query that produces an event source directly to stream()
:
query = fql('Product.all().eventsOn(.price, .stock)')
client.stream(query)
Iterate on a stream
stream()
returns an iterator that emits events as they occur.
You can use a generator expression to iterate through the events:
query = fql('Product.all().eventsOn(.price, .stock)')
with client.stream(query) as stream:
for event in stream:
eventType = event['type']
if (eventType == 'add'):
print('Add event: ', event)
## ...
elif (eventType == 'update'):
print('Update event: ', event)
## ...
elif (eventType == 'remove'):
print('Remove event: ', event)
## ...
Close a stream
Use close()
to close a stream:
query = fql('Product.all().eventsOn(.price, .stock)')
count = 0
with client.stream(query) as stream:
for event in stream:
print('Stream event', event)
# ...
count+=1
if (count == 2):
stream.close()
Error handling
If a non-retryable error occurs when opening or processing a stream, Fauna
raises a FaunaException
:
import fauna
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException
client = Client(secret='FAUNA_SECRET')
try:
with client.stream(fql(
'Product.all().eventsOn(.price, .stock)'
)) as stream:
for event in stream:
print(event)
# ...
except FaunaException as e:
print('error ocurred with stream: ', e)
Stream options
The Client configuration sets default options for the
stream()
method.
You can pass a StreamOptions
object to override these defaults:
options = StreamOptions(
max_attempts=5,
max_backoff=1,
start_ts=1710968002310000,
status_events=True
)
client.stream(fql('Product.all().eventSource()'), options)
For supported properties, see StreamOptions in the API reference.
Logging
Logging is handled using Python’s standard logging
package under the fauna
namespace. Logs include the HTTP request with body (excluding the
Authorization
header) and the full HTTP response.
To enable logging:
import logging
from fauna.client import Client
from fauna import fql
logging.basicConfig(
level=logging.DEBUG
)
client = Client()
client.query(fql('42'))
For configuration options or to set specific log levels, see Python’s Logging HOWTO.
Is this article helpful?
Tell Fauna how the article can be improved:
Visit Fauna's forums
or email docs@fauna.com
Thank you for your feedback!