# Fauna v10 Python client driver (current)
| Version: 2.4.0 | Repository: fauna/fauna-python |
| --- | --- |
Fauna’s Python client driver lets you run FQL queries from Python applications.
This guide shows how to set up the driver and use it to run FQL queries.
## [](#supported-python-versions)Supported Python versions
* Python 3.9
* Python 3.10
* Python 3.11
* Python 3.12
## [](#supported-cloud-runtimes)Supported cloud runtimes
* AWS Lambda (See [AWS Lambda connections](#aws-lambda-connections))
* Vercel Functions
## [](#installation)Installation
The driver is available on [PyPI](https://pypi.org/project/fauna/). To install it, run:
```bash
pip install fauna
```
## [](#api-reference)API reference
API reference documentation for the driver is available at [https://fauna.github.io/fauna-python/](https://fauna.github.io/fauna-python/).
## [](#sample-app)Sample app
For a practical example, check out the [Python sample app](https://github.com/fauna/python-sample-app).
This sample app is an e-commerce application that uses Python3, Flask, and the Fauna Python driver. The source code includes comments highlighting best practices for using the driver and composing FQL queries.
## [](#basic-usage)Basic usage
The following application:
* Initializes a client instance to connect to Fauna
* Composes a basic FQL query using an `fql` string template
* Runs the query using `query()`
```python
from fauna import fql
from fauna.client import Client
from fauna.encoding import QuerySuccess
from fauna.errors import FaunaException
# Initialize the client to connect to Fauna
client = Client(secret='FAUNA_SECRET')
try:
    # Compose a query
    query = fql(
        """
        Product.sortedByPriceLowToHigh() {
            name,
            description,
            price
        }"""
    )

    # Run the query
    res: QuerySuccess = client.query(query)
    print(res.data)
except FaunaException as e:
    print(e)
finally:
    # Clean up any remaining resources
    client.close()
```
## [](#connect-to-fauna)Connect to Fauna
Each Fauna query is an independently authenticated request to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). You authenticate with Fauna using an [authentication secret](../../../learn/security/authentication/#secrets).
### [](#get-an-authentication-secret)Get an authentication secret
Fauna supports several [secret types](../../../learn/security/authentication/#secret-types). For testing, you can create a [key](../../../learn/security/keys/), which is a type of secret:
1. Log in to the [Fauna Dashboard](https://dashboard.fauna.com/).
2. On the **Explorer** page, create a database.
3. In the database’s **Keys** tab, click **Create Key**.
4. Choose a **Role** of **server**.
5. Click **Save**.
6. Copy the **Key Secret**. The secret is scoped to the database.
### [](#initialize-a-client)Initialize a client
To send query requests to Fauna, initialize a `Client` instance using a Fauna authentication secret:
```python
client = Client(secret='FAUNA_SECRET')
```
If not specified, `secret` defaults to the `FAUNA_SECRET` environment variable. For other configuration options, see [Client configuration](#config).
### [](#connect-to-a-child-database)Connect to a child database
A [scoped key](../../../learn/security/keys/#scoped-keys) lets you use a parent database’s admin key to send query requests to its child databases.
For example, if you have an admin key for a parent database and want to connect to a child database named `childDB`, you can create a scoped key using the following format:
```
// Scoped key that impersonates an `admin` key for
// the `childDB` child database.
fn...:childDB:admin
```
You can then initialize a `Client` instance using the scoped key:
```python
client = Client(secret='fn...:childDB:admin')
```
### [](#multiple-connections)Multiple connections
You can use a single client instance to run multiple asynchronous queries at once. The driver manages HTTP connections as needed. Your app doesn’t need to implement connection pools or other connection management strategies.
You can create multiple client instances to connect to Fauna using different credentials or client configurations.
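For example, a minimal sketch (the secrets shown are placeholders) that initializes two clients with different credentials and configurations:
```python
from datetime import timedelta
from fauna.client import Client

# Client for one database.
sales_client = Client(secret='FAUNA_SECRET_FOR_SALES_DB')

# A second client with different credentials and a longer query timeout.
inventory_client = Client(
    secret='FAUNA_SECRET_FOR_INVENTORY_DB',
    query_timeout=timedelta(seconds=30),
)
```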
### [](#aws-lambda-connections)AWS Lambda connections
AWS Lambda freezes, thaws, and reuses execution environments for Lambda functions. See [Lambda execution environment](https://docs.aws.amazon.com/lambda/latest/dg/running-lambda-code.html).
When an execution environment is thawed, Lambda only runs the function’s handler code. Objects declared outside of the handler method remain initialized from before the freeze. Lambda doesn’t re-run initialization code outside the handler.
Fauna drivers keep socket connections that can time out during long freezes, causing `ECONNRESET` errors when thawed.
To prevent timeouts, create Fauna client connections inside function handlers. Fauna drivers use lightweight HTTP connections. You can create new connections for each request while maintaining good performance.
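For example, a minimal handler sketch; the query and response shape are illustrative:
```python
from fauna import fql
from fauna.client import Client

def handler(event, context):
    # Create the client inside the handler so a thawed execution
    # environment doesn't reuse a stale connection.
    client = Client()
    try:
        res = client.query(fql('Product.all().count()'))
        return {'statusCode': 200, 'body': str(res.data)}
    finally:
        client.close()
```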
## [](#run-fql-queries)Run FQL queries
Use `fql` string templates to compose FQL queries. Run the queries using `query()`:
```python
query = fql("Product.sortedByPriceLowToHigh()")
client.query(query)
```
By default, `query()` uses query options from the [Client configuration](#config). You can pass options to `query()` to override these defaults. See [Query options](#query-opts).
You can only compose FQL queries using string templates.
### [](#var)Variable interpolation
The driver supports queries with Python primitives, lists, and dicts.
Use `${}` to pass native Python variables to `fql` queries as kwargs. You can escape a variable by prepending an additional `$`.
```python
# Create a native Python var
collection_name = 'Product'
# Pass the var to an FQL query
query = fql('''
let collection = Collection(${collection_name})
collection.sortedByPriceLowToHigh()''',
collection_name=collection_name)
client.query(query)
```
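For example, a small sketch of escaping: the extra `$` stops the driver from interpolating, so the placeholder passes through to Fauna, where it's evaluated as FQL's own string interpolation:
```python
# `$${name}` is escaped: the driver sends the literal text `${name}`
# to Fauna, which evaluates it as FQL string interpolation.
query = fql('''
    let name = "pizza"
    "Product: $${name}"
''')
client.query(query)
```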
The driver encodes interpolated variables to an appropriate [FQL type](../../../reference/fql/types/) and uses the [wire protocol](../../../reference/http/reference/wire-protocol/) to pass the query to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). This helps prevent injection attacks.
### [](#query-composition)Query composition
You can use variable interpolation to pass FQL string templates as query fragments to compose an FQL query:
```python
# Create a reusable query fragment.
product = fql('Product.byName("pizza").first()')
# Use the fragment in another FQL query.
query = fql(f'''
let product = {product}
product {{
name,
price
}}
''')
client.query(query)
```
### [](#pagination)Pagination
Use `paginate()` to iterate through a Set that contains more than one page of results. `paginate()` accepts the same [Query options](#query-opts) as `query()`.
```python
# Adjust `pageSize()` size as needed.
query = fql('''
Product.sortedByPriceLowToHigh()
.pageSize(2)''')
pages = client.paginate(query)
for products in pages:
    for product in products:
        print(product)
```
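If you'd rather not handle pages yourself, the iterator returned by `paginate()` also exposes a `flatten()` method that yields each item individually, fetching additional pages as needed:
```python
query = fql('''
    Product.sortedByPriceLowToHigh()
        .pageSize(2)''')

# `flatten()` yields each document individually instead of
# yielding whole pages.
for product in client.paginate(query).flatten():
    print(product)
```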
### [](#query-stats)Query stats
Successful query responses and `ServiceError` errors return [query stats](../../../reference/http/reference/query-stats/):
```python
from fauna import fql
from fauna.client import Client
from fauna.errors import ServiceError
client = Client(secret='FAUNA_SECRET')
try:
    query = fql('"Hello world"')
    res = client.query(query)
    print(res.stats)
except ServiceError as e:
    if e.stats is not None:
        print(e.stats)
    # more error handling...
```
### [](#user-defined-classes)User-defined classes
Serialization and deserialization with user-defined classes are not supported.
When composing FQL queries, adapt your classes into dicts or lists. When instantiating classes from a query result, build them from the expected result.
```python
class MyClass:
    def __init__(self, my_prop):
        self.my_prop = my_prop

    # Adapt the instance into a dict for use in an FQL query.
    def to_dict(self):
        return {'my_prop': self.my_prop}

    # Build an instance from an expected query result.
    @staticmethod
    def from_result(obj):
        return MyClass(obj['my_prop'])
```
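For example, a sketch of both directions, assuming a hypothetical `MyCollection` collection:
```python
# Adapt the instance into a dict before interpolating it into a query.
# `MyCollection` is hypothetical; adapt it to your schema.
instance = MyClass('hello')
query = fql(
    'MyCollection.create(${data}) { my_prop }',
    data=instance.to_dict()
)

# Rebuild an instance from the expected result shape.
res = client.query(query)
restored = MyClass.from_result(res.data)
```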
## [](#config)Client configuration
The `Client` instance comes with reasonable configuration defaults. We recommend using the defaults in most cases.
If needed, you can configure the client to override the defaults. This also lets you set default [Query options](#query-opts).
```python
from datetime import timedelta
from fauna.client import Client
from fauna.client.headers import Header
from fauna.client.endpoints import Endpoints
config = {
    # Configure the client
    'secret': 'FAUNA_SECRET',
    'endpoint': Endpoints.Default,
    'client_buffer_timeout': timedelta(seconds=5),
    'http_read_timeout': None,
    'http_write_timeout': timedelta(seconds=5),
    'http_connect_timeout': timedelta(seconds=5),
    'http_pool_timeout': timedelta(seconds=5),
    'http_idle_timeout': timedelta(seconds=5),
    'max_attempts': 3,
    'max_backoff': 20,
    # Set default query options
    'additional_headers': {'foo': 'bar'},
    'linearized': False,
    'max_contention_retries': 5,
    'query_tags': {'tag': 'value'},
    'query_timeout': timedelta(seconds=60),
    'typecheck': True,
}
client = Client(**config)
```
For supported parameters, see [Client](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#Client) in the API reference.
### [](#environment-variables)Environment variables
If not set, `secret` and `endpoint` default to the `FAUNA_SECRET` and `FAUNA_ENDPOINT` environment variables, respectively.
For example, if you set the following environment variables:
```bash
export FAUNA_SECRET=FAUNA_SECRET
export FAUNA_ENDPOINT=https://db.fauna.com/
```
You can initialize the client with a default configuration:
```python
client = Client()
```
### [](#retries)Retries
By default, the client automatically retries query requests that return a `limit_exceeded` [error code](../../../reference/http/reference/errors/). Retries use an exponential backoff.
Use the [Client configuration](#config)'s `max_backoff` parameter to set the maximum time between retries. Similarly, use `max_attempts` to set the maximum number of retry attempts.
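For example, a client that makes at most five attempts per query, waiting no more than 10 seconds between retries:
```python
from fauna.client import Client

client = Client(
    max_attempts=5,   # Up to five total attempts per query
    max_backoff=10,   # Cap the exponential backoff at 10 seconds
)
```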
## [](#query-opts)Query options
The [Client configuration](#config) sets default query options for the following methods:
* `query()`
* `paginate()`
You can pass a `QueryOptions` object to override these defaults:
```python
from datetime import timedelta

from fauna import fql
from fauna.client import QueryOptions

options = QueryOptions(
    additional_headers={'foo': 'bar'},
    linearized=False,
    max_contention_retries=5,
    query_tags={'name': 'hello world query'},
    query_timeout=timedelta(seconds=60),
    traceparent='00-750efa5fb6a131eb2cf4db39f28366cb-000000000000000b-00',
    typecheck=True
)

client.query(fql('"Hello world"'), options)
```
For supported properties, see [QueryOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#QueryOptions) in the API reference.
## [](#event-feeds)Event feeds
The driver supports [event feeds](../../../learn/cdc/#event-feeds). An event feed asynchronously polls an [event source](../../../learn/cdc/) for events.
To use event feeds, you must have a Pro or Enterprise plan.
### [](#request-an-event-feed)Request an event feed
To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets).
To get paginated events, pass the event source to `feed()`:
```python
from fauna import fql
from fauna.client import Client
client = Client()
response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
eventSource: set.eventSource()
}
'''))
initial_page = response.data['initialPage']
event_source = response.data['eventSource']
feed = client.feed(event_source)
```
If changes occur between the creation of the event source and the `feed()` request, the feed replays and emits any related events.
You can also pass a query that produces an event source directly to `feed()`:
```python
query = fql('Product.all().eventsOn(.price, .stock)')
feed = client.feed(query)
```
In most cases, you’ll get events after a specific [start time](#start-time) or [cursor](#cursor).
#### [](#start-time)Get events after a specific start time
When you first poll an event source using an event feed, you usually include a `start_ts` (start timestamp) in the [`FeedOptions` object](#event-feed-opts) that’s passed to `feed()`. The request returns events that occurred after the specified timestamp (exclusive).
`start_ts` is an integer representing a time in microseconds since the Unix epoch:
```python
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import datetime, timedelta
client = Client()
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
# Convert to microseconds
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
options = FeedOptions(
    start_ts=start_ts
)
feed = client.feed(fql('Product.all().eventSource()'), options)
```
#### [](#cursor)Get events after a specific cursor
After the initial request, you usually get subsequent events using the [cursor](../../../learn/cdc/#cursor) for the last page or event. To get events after a cursor (exclusive), include the `cursor` in the [`FeedOptions` object](#event-feed-opts) that’s passed to `feed()`:
```python
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import datetime, timedelta
client = Client()
options = FeedOptions(
    # Cursor for a previous page
    cursor='gsGabc456'
)
feed = client.feed(fql('Product.all().eventSource()'), options)
```
### [](#loop)Iterate on an event feed
`feed()` returns an iterator that emits pages of events. You can use a for loop to iterate through the pages:
```python
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
options = FeedOptions(
    start_ts=start_ts
)
feed = client.feed(query, options)
for page in feed:
    print('Page stats: ', page.stats)

    for event in page:
        event_type = event['type']
        if event_type == 'add':
            # Do something on add
            print('Add event: ', event)
        elif event_type == 'update':
            # Do something on update
            print('Update event: ', event)
        elif event_type == 'remove':
            # Do something on remove
            print('Remove event: ', event)
```
The event feed iterator will stop once there are no more events to poll.
Each page includes a top-level `cursor`. You can include the cursor in a [`FeedOptions` object](#event-feed-opts) passed to `feed()` to poll for events after the cursor:
```python
import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions
def process_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = FeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )
        feed = client.feed(query, options)
        for page in feed:
            for event in page:
                event_type = event['type']
                if event_type == 'add':
                    # Do something on add
                    print('Add event: ', event)
                elif event_type == 'update':
                    # Do something on update
                    print('Update event: ', event)
                elif event_type == 'remove':
                    # Do something on remove
                    print('Remove event: ', event)
            # Store the cursor of the last page
            cursor = page.cursor
        # Clear the start timestamp after the first request
        start_ts = None
        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)
client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
process_feed(client, query, start_ts=start_ts)
```
Alternatively, you can get events as a single, flat array:
```python
import time
from datetime import datetime, timedelta
from fauna import fql
from fauna.client import Client, FeedOptions
def process_feed(client, query, start_ts=None, sleep_time=300):
    cursor = None
    while True:
        options = FeedOptions(
            start_ts=start_ts if cursor is None else None,
            cursor=cursor,
        )
        feed = client.feed(query, options)
        for event in feed.flatten():
            event_type = event['type']
            if event_type == 'add':
                # Do something on add
                print('Add event: ', event)
            elif event_type == 'update':
                # Do something on update
                print('Update event: ', event)
            elif event_type == 'remove':
                # Do something on remove
                print('Remove event: ', event)
            # Store the cursor of the last event
            cursor = event['cursor']
        # Clear the start timestamp after the first request
        start_ts = None
        print(f"Sleeping for {sleep_time} seconds...")
        time.sleep(sleep_time)
client = Client()
query = fql('Product.all().eventsOn(.price, .stock)')
# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)
process_feed(client, query, start_ts=start_ts)
```
If needed, you can store the cursor as a collection document. For an example, see the [event feeds app](../../sample-apps/event-feeds/).
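For example, a minimal sketch, assuming a hypothetical `Cursor` collection with a `byName()` index; adapt both to your schema:
```python
# Fetch the stored cursor, if any. `Cursor` and `byName()` are
# hypothetical names used for illustration.
res = client.query(fql('''
    let doc = Cursor.byName("product-feed").first()
    if (doc != null) doc!.value else null
'''))
cursor = res.data

# ... poll the event feed using `cursor` ...

# Persist the cursor of the last processed page for the next run.
client.query(fql(
    'Cursor.byName("product-feed").first()!.update({ value: ${value} })',
    value=page.cursor
))
```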
### [](#error-handling)Error handling
If a non-retryable error occurs when opening or processing an event feed, Fauna raises a `FaunaException`:
```python
from datetime import datetime, timedelta

from fauna import fql
from fauna.client import Client, FeedOptions
from fauna.errors import FaunaException

client = Client()

# Calculate timestamp for 10 minutes ago
ten_minutes_ago = datetime.now() - timedelta(minutes=10)
start_ts = int(ten_minutes_ago.timestamp() * 1_000_000)

options = FeedOptions(
    start_ts=start_ts
)

feed = client.feed(fql(
    'Product.all().eventsOn(.price, .stock)'
), options)

for page in feed:
    try:
        for event in page:
            print(event)
            # ...
    except FaunaException as e:
        print('error occurred with event processing: ', e)
        # The current event will be skipped
```
Each page’s `cursor` contains the cursor of the page’s last successfully processed event. If you’re using a [loop to poll for changes](#loop), resuming from this cursor skips any events that caused errors.
### [](#event-feed-opts)Event feed options
The client configuration sets default options for the `feed()` method.
You can pass a `FeedOptions` object to override these defaults:
```python
from fauna import fql
from fauna.client import Client, FeedOptions
from datetime import timedelta
client = Client()
options = FeedOptions(
    max_attempts=3,
    max_backoff=20,
    query_timeout=timedelta(seconds=5),
    page_size=None,
    cursor=None,
    start_ts=None,
)
client.feed(fql('Product.all().eventSource()'), options)
```
For supported properties, see [FeedOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#FeedOptions) in the API reference.
### [](#sample-app-2)Sample app
For a practical example that uses the Python driver with event feeds, check out the [event feeds sample app](../../sample-apps/event-feeds/).
## [](#event-streaming)Event streams
The driver supports [event streams](../../../learn/cdc/).
### [](#start-a-stream)Start a stream
To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets).
To stream the source’s events, pass the event source to `stream()`:
```python
import fauna
from fauna import fql
from fauna.client import Client, StreamOptions
client = Client()
response = client.query(fql('''
let set = Product.all()
{
initialPage: set.pageSize(10),
eventSource: set.eventSource()
}
'''))
initial_page = response.data['initialPage']
event_source = response.data['eventSource']
client.stream(event_source)
```
You can also pass a query that produces an event source directly to `stream()`:
```python
query = fql('Product.all().eventsOn(.price, .stock)')
client.stream(query)
```
### [](#iterate-on-a-stream)Iterate on a stream
`stream()` returns an iterator that emits events as they occur. You can use a `for` loop to iterate through the events:
```python
query = fql('Product.all().eventsOn(.price, .stock)')
with client.stream(query) as stream:
    for event in stream:
        event_type = event['type']
        if event_type == 'add':
            print('Add event: ', event)
            # ...
        elif event_type == 'update':
            print('Update event: ', event)
            # ...
        elif event_type == 'remove':
            print('Remove event: ', event)
            # ...
```
### [](#close-a-stream)Close a stream
Use `close()` to close a stream:
```python
query = fql('Product.all().eventsOn(.price, .stock)')
count = 0
with client.stream(query) as stream:
    for event in stream:
        print('Stream event', event)
        # ...
        count += 1
        if count == 2:
            stream.close()
```
### [](#error-handling-2)Error handling
If a non-retryable error occurs when opening or processing a stream, Fauna raises a `FaunaException`:
```python
import fauna
from fauna import fql
from fauna.client import Client
from fauna.errors import FaunaException
client = Client(secret='FAUNA_SECRET')
try:
    with client.stream(fql(
        'Product.all().eventsOn(.price, .stock)'
    )) as stream:
        for event in stream:
            print(event)
            # ...
except FaunaException as e:
    print('error occurred with stream: ', e)
```
### [](#stream-options)Stream options
The [Client configuration](#config) sets default options for the `stream()` method.
You can pass a `StreamOptions` object to override these defaults:
```python
options = StreamOptions(
    max_attempts=5,
    max_backoff=1,
    start_ts=1710968002310000,
    status_events=True
)
client.stream(fql('Product.all().eventSource()'), options)
```
For supported properties, see [StreamOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#StreamOptions) in the API reference.
## [](#debug-logging)Debug logging
Logging is handled using Python’s standard `logging` package under the `fauna` namespace. Logs include the HTTP request with body (excluding the `Authorization` header) and the full HTTP response.
To enable logging:
```python
import logging
from fauna.client import Client
from fauna import fql
logging.basicConfig(
    level=logging.DEBUG
)
client = Client()
client.query(fql('42'))
```
For configuration options or to set specific log levels, see Python’s [Logging HOWTO](https://docs.python.org/3/howto/logging.html).
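For example, one way to keep your application at `INFO` while enabling debug output for the driver only:
```python
import logging

# Application-wide default level.
logging.basicConfig(level=logging.INFO)

# Enable verbose output only for the `fauna` namespace.
logging.getLogger('fauna').setLevel(logging.DEBUG)
```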
# Python driver source code
# Files
## File: fauna/client/__init__.py
````python
from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator
from .endpoints import Endpoints
from .headers import Header
````
## File: fauna/client/client.py
````python
import logging
from dataclasses import dataclass
from datetime import timedelta
from typing import Any, Dict, Iterator, Mapping, Optional, Union, List
import fauna
from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header
from fauna.client.retryable import Retryable
from fauna.client.utils import _Environment, LastTxnTs
from fauna.encoding import FaunaEncoder, FaunaDecoder
from fauna.encoding import QuerySuccess, QueryTags, QueryStats
from fauna.errors import FaunaError, ClientError, ProtocolError, \
RetryableFaunaException, NetworkError
from fauna.http.http_client import HTTPClient
from fauna.query import EventSource, Query, Page, fql
logger = logging.getLogger("fauna")
DefaultHttpConnectTimeout = timedelta(seconds=5)
DefaultHttpReadTimeout: Optional[timedelta] = None
DefaultHttpWriteTimeout = timedelta(seconds=5)
DefaultHttpPoolTimeout = timedelta(seconds=5)
DefaultIdleConnectionTimeout = timedelta(seconds=5)
DefaultQueryTimeout = timedelta(seconds=5)
DefaultClientBufferTimeout = timedelta(seconds=5)
DefaultMaxConnections = 20
DefaultMaxIdleConnections = 20
@dataclass
class QueryOptions:
"""
A dataclass representing options available for a query.
* linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
* max_contention_retries - The max number of times to retry the query if contention is encountered.
* query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed.
* query_tags - Tags to associate with the query. See `logging `_
* traceparent - A traceparent to associate with the query. See `logging `_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header
* typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration.
* additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary.
"""
linearized: Optional[bool] = None
max_contention_retries: Optional[int] = None
query_timeout: Optional[timedelta] = DefaultQueryTimeout
query_tags: Optional[Mapping[str, str]] = None
traceparent: Optional[str] = None
typecheck: Optional[bool] = None
additional_headers: Optional[Dict[str, str]] = None
@dataclass
class StreamOptions:
"""
A dataclass representing options available for a stream.
* max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after
the timestamp.
* cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
* status_events - Indicates if stream should include status events. Status events are periodic events that
update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics
about the cost of maintaining the stream other than the cost of the received events.
"""
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
status_events: bool = False
@dataclass
class FeedOptions:
"""
A dataclass representing options available for an event feed.
* max_attempts - The maximum number of times to attempt an event feed query when a retryable exception is thrown.
* max_backoff - The maximum backoff in seconds for an individual retry.
* query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events.
* start_ts - The starting timestamp of the event feed, exclusive. If set, Fauna will return events starting after
the timestamp.
* cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor.
* page_size - Maximum number of events returned per page. Must be in the
range 1 to 16000 (inclusive). Defaults to 16.
"""
max_attempts: Optional[int] = None
max_backoff: Optional[int] = None
query_timeout: Optional[timedelta] = None
page_size: Optional[int] = None
start_ts: Optional[int] = None
cursor: Optional[str] = None
class Client:
def __init__(
self,
endpoint: Optional[str] = None,
secret: Optional[str] = None,
http_client: Optional[HTTPClient] = None,
query_tags: Optional[Mapping[str, str]] = None,
linearized: Optional[bool] = None,
max_contention_retries: Optional[int] = None,
typecheck: Optional[bool] = None,
additional_headers: Optional[Dict[str, str]] = None,
query_timeout: Optional[timedelta] = DefaultQueryTimeout,
client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout,
http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout,
http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout,
http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout,
http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout,
http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout,
max_attempts: int = 3,
max_backoff: int = 20,
):
"""Initializes a Client.
:param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable.
:param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable.
:param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`.
:param query_tags: Tags to associate with the query. See `logging `_
:param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized.
:param max_contention_retries: The max number of times to retry the query if contention is encountered.
:param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration.
:param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary.
:param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`.
:param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error.
:param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`.
:param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`.
:param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`.
:param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`.
:param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`.
:param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3.
:param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20.
"""
self._set_endpoint(endpoint)
self._max_attempts = max_attempts
self._max_backoff = max_backoff
if secret is None:
self._auth = _Auth(_Environment.EnvFaunaSecret())
else:
self._auth = _Auth(secret)
self._last_txn_ts = LastTxnTs()
self._query_tags = {}
if query_tags is not None:
self._query_tags.update(query_tags)
if query_timeout is not None:
self._query_timeout_ms = int(query_timeout.total_seconds() * 1000)
else:
self._query_timeout_ms = None
self._headers: Dict[str, str] = {
_Header.AcceptEncoding: "gzip",
_Header.ContentType: "application/json;charset=utf-8",
_Header.Driver: "python",
_Header.DriverEnv: str(_DriverEnvironment()),
}
if typecheck is not None:
self._headers[Header.Typecheck] = str(typecheck).lower()
if linearized is not None:
self._headers[Header.Linearized] = str(linearized).lower()
if max_contention_retries is not None and max_contention_retries > 0:
self._headers[Header.MaxContentionRetries] = \
f"{max_contention_retries}"
if additional_headers is not None:
self._headers = {
**self._headers,
**additional_headers,
}
self._session: HTTPClient
if http_client is not None:
self._session = http_client
else:
if fauna.global_http_client is None:
timeout_s: Optional[float] = None
if query_timeout is not None and client_buffer_timeout is not None:
timeout_s = (query_timeout + client_buffer_timeout).total_seconds()
read_timeout_s: Optional[float] = None
if http_read_timeout is not None:
read_timeout_s = http_read_timeout.total_seconds()
write_timeout_s: Optional[float] = http_write_timeout.total_seconds(
) if http_write_timeout is not None else None
connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds(
) if http_connect_timeout is not None else None
pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds(
) if http_pool_timeout is not None else None
idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds(
) if http_idle_timeout is not None else None
import httpx
from fauna.http.httpx_client import HTTPXClient
c = HTTPXClient(
httpx.Client(
http1=True,
http2=False,
timeout=httpx.Timeout(
timeout=timeout_s,
connect=connect_timeout_s,
read=read_timeout_s,
write=write_timeout_s,
pool=pool_timeout_s,
),
limits=httpx.Limits(
max_connections=DefaultMaxConnections,
max_keepalive_connections=DefaultMaxIdleConnections,
keepalive_expiry=idle_timeout_s,
),
), logger)
fauna.global_http_client = c
self._session = fauna.global_http_client
def close(self):
self._session.close()
if self._session == fauna.global_http_client:
fauna.global_http_client = None
def set_last_txn_ts(self, txn_ts: int):
"""
Set the last timestamp seen by this client.
This has no effect if earlier than stored timestamp.
.. WARNING:: This should be used only when coordinating timestamps across
multiple clients. Moving the timestamp arbitrarily forward into
the future will cause transactions to stall.
:param txn_ts: the new transaction time.
"""
self._last_txn_ts.update_txn_time(txn_ts)
def get_last_txn_ts(self) -> Optional[int]:
"""
Get the last timestamp seen by this client.
:return:
"""
return self._last_txn_ts.time
def get_query_timeout(self) -> Optional[timedelta]:
"""
Get the query timeout for all queries.
"""
if self._query_timeout_ms is not None:
return timedelta(milliseconds=self._query_timeout_ms)
else:
return None
def paginate(
self,
fql: Query,
opts: Optional[QueryOptions] = None,
) -> "QueryIterator":
"""
Run a query on Fauna, returning an iterator of results. If the query
returns a Page, the iterator will fetch additional Pages until the
after token is null. Each call for a page will be retried with exponential
backoff up to the max_attempts set in the client's retry policy in the
event of a 429 or 502.
:param fql: A Query
:param opts: (Optional) Query Options
:return: a :class:`QueryResponse`
:raises NetworkError: HTTP Request failed in transit
:raises ProtocolError: HTTP error not from Fauna
:raises ServiceError: Fauna returned an error
:raises ValueError: Encoding and decoding errors
:raises TypeError: Invalid param types
"""
if not isinstance(fql, Query):
err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
f"Query by calling fauna.fql()"
raise TypeError(err_msg)
return QueryIterator(self, fql, opts)
def query(
self,
fql: Query,
opts: Optional[QueryOptions] = None,
) -> QuerySuccess:
"""
Run a query on Fauna. A query will be retried max_attempt times with exponential backoff
up to the max_backoff in the event of a 429.
:param fql: A Query
:param opts: (Optional) Query Options
:return: a :class:`QueryResponse`
:raises NetworkError: HTTP Request failed in transit
:raises ProtocolError: HTTP error not from Fauna
:raises ServiceError: Fauna returned an error
:raises ValueError: Encoding and decoding errors
:raises TypeError: Invalid param types
"""
if not isinstance(fql, Query):
err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
f"Query by calling fauna.fql()"
raise TypeError(err_msg)
try:
encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql)
except Exception as e:
raise ClientError("Failed to encode Query") from e
retryable = Retryable[QuerySuccess](
self._max_attempts,
self._max_backoff,
self._query,
"/query/1",
fql=encoded_query,
opts=opts,
)
r = retryable.run()
r.response.stats.attempts = r.attempts
return r.response
def _query(
self,
path: str,
fql: Mapping[str, Any],
arguments: Optional[Mapping[str, Any]] = None,
opts: Optional[QueryOptions] = None,
) -> QuerySuccess:
headers = self._headers.copy()
headers[_Header.Format] = "tagged"
headers[_Header.Authorization] = self._auth.bearer()
if self._query_timeout_ms is not None:
headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
headers.update(self._last_txn_ts.request_header)
query_tags = {}
if self._query_tags is not None:
query_tags.update(self._query_tags)
if opts is not None:
if opts.linearized is not None:
headers[Header.Linearized] = str(opts.linearized).lower()
if opts.max_contention_retries is not None:
headers[Header.MaxContentionRetries] = \
f"{opts.max_contention_retries}"
if opts.traceparent is not None:
headers[Header.Traceparent] = opts.traceparent
if opts.query_timeout is not None:
timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}"
headers[Header.QueryTimeoutMs] = timeout_ms
if opts.query_tags is not None:
query_tags.update(opts.query_tags)
if opts.typecheck is not None:
headers[Header.Typecheck] = str(opts.typecheck).lower()
if opts.additional_headers is not None:
headers.update(opts.additional_headers)
if len(query_tags) > 0:
headers[Header.Tags] = QueryTags.encode(query_tags)
data: dict[str, Any] = {
"query": fql,
"arguments": arguments or {},
}
with self._session.request(
method="POST",
url=self._endpoint + path,
headers=headers,
data=data,
) as response:
status_code = response.status_code()
response_json = response.json()
headers = response.headers()
self._check_protocol(response_json, status_code)
dec: Any = FaunaDecoder.decode(response_json)
if status_code > 399:
FaunaError.parse_error_and_throw(dec, status_code)
if "txn_ts" in dec:
self.set_last_txn_ts(int(response_json["txn_ts"]))
stats = QueryStats(dec["stats"]) if "stats" in dec else None
summary = dec["summary"] if "summary" in dec else None
query_tags = QueryTags.decode(
dec["query_tags"]) if "query_tags" in dec else None
txn_ts = dec["txn_ts"] if "txn_ts" in dec else None
schema_version = dec["schema_version"] if "schema_version" in dec else None
traceparent = headers.get("traceparent", None)
static_type = dec["static_type"] if "static_type" in dec else None
return QuerySuccess(
data=dec["data"],
query_tags=query_tags,
static_type=static_type,
stats=stats,
summary=summary,
traceparent=traceparent,
txn_ts=txn_ts,
schema_version=schema_version,
)
def stream(
self,
fql: Union[EventSource, Query],
opts: StreamOptions = StreamOptions()
) -> "StreamIterator":
"""
Opens a Stream in Fauna and returns an iterator that consumes Fauna events.
:param fql: An EventSource or a Query that returns an EventSource.
:param opts: (Optional) Stream Options.
:return: a :class:`StreamIterator`
:raises ClientError: Invalid options provided
:raises NetworkError: HTTP Request failed in transit
:raises ProtocolError: HTTP error not from Fauna
:raises ServiceError: Fauna returned an error
:raises ValueError: Encoding and decoding errors
:raises TypeError: Invalid param types
"""
if isinstance(fql, Query):
if opts.cursor is not None:
raise ClientError(
"The 'cursor' configuration can only be used with an event source.")
source = self.query(fql).data
else:
source = fql
if not isinstance(source, EventSource):
err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
raise TypeError(err_msg)
headers = self._headers.copy()
headers[_Header.Format] = "tagged"
headers[_Header.Authorization] = self._auth.bearer()
return StreamIterator(self._session, headers, self._endpoint + "/stream/1",
self._max_attempts, self._max_backoff, opts, source)
def feed(
self,
source: Union[EventSource, Query],
opts: FeedOptions = FeedOptions(),
) -> "FeedIterator":
"""
Opens an event feed in Fauna and returns an iterator that consumes Fauna events.
:param source: An EventSource or a Query that returns an EventSource.
:param opts: (Optional) Event feed options.
:return: a :class:`FeedIterator`
:raises ClientError: Invalid options provided
:raises NetworkError: HTTP Request failed in transit
:raises ProtocolError: HTTP error not from Fauna
:raises ServiceError: Fauna returned an error
:raises ValueError: Encoding and decoding errors
:raises TypeError: Invalid param types
"""
if isinstance(source, Query):
source = self.query(source).data
if not isinstance(source, EventSource):
err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}."
raise TypeError(err_msg)
headers = self._headers.copy()
headers[_Header.Format] = "tagged"
headers[_Header.Authorization] = self._auth.bearer()
if opts.query_timeout is not None:
query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000)
headers[Header.QueryTimeoutMs] = str(query_timeout_ms)
elif self._query_timeout_ms is not None:
headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms)
return FeedIterator(self._session, headers, self._endpoint + "/feed/1",
self._max_attempts, self._max_backoff, opts, source)
def _check_protocol(self, response_json: Any, status_code):
# TODO: Logic to validate wire protocol belongs elsewhere.
should_raise = False
# check for QuerySuccess
if status_code <= 399 and "data" not in response_json:
should_raise = True
# check for QueryFailure
if status_code > 399:
if "error" not in response_json:
should_raise = True
else:
e = response_json["error"]
if "code" not in e or "message" not in e:
should_raise = True
if should_raise:
raise ProtocolError(
status_code,
f"Response is in an unknown format: \n{response_json}",
)
def _set_endpoint(self, endpoint):
if endpoint is None:
endpoint = _Environment.EnvFaunaEndpoint()
if endpoint[-1:] == "/":
endpoint = endpoint[:-1]
self._endpoint = endpoint
class StreamIterator:
"""A class that mixes a ContextManager and an Iterator so we can detected retryable errors."""
def __init__(self, http_client: HTTPClient, headers: Dict[str, str],
endpoint: str, max_attempts: int, max_backoff: int,
opts: StreamOptions, source: EventSource):
self._http_client = http_client
self._headers = headers
self._endpoint = endpoint
self._max_attempts = max_attempts
self._max_backoff = max_backoff
self._opts = opts
self._source = source
self._stream = None
self.last_ts = None
self.last_cursor = None
self._ctx = self._create_stream()
if opts.start_ts is not None and opts.cursor is not None:
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions."
raise TypeError(err_msg)
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, exc_traceback):
if self._stream is not None:
self._stream.close()
self._ctx.__exit__(exc_type, exc_value, exc_traceback)
return False
def __iter__(self):
return self
def __next__(self):
if self._opts.max_attempts is not None:
max_attempts = self._opts.max_attempts
else:
max_attempts = self._max_attempts
if self._opts.max_backoff is not None:
max_backoff = self._opts.max_backoff
else:
max_backoff = self._max_backoff
retryable = Retryable[Any](max_attempts, max_backoff, self._next_element)
return retryable.run().response
def _next_element(self):
try:
if self._stream is None:
try:
self._stream = self._ctx.__enter__()
except Exception:
self._retry_stream()
if self._stream is not None:
event: Any = FaunaDecoder.decode(next(self._stream))
if event["type"] == "error":
FaunaError.parse_error_and_throw(event, 400)
self.last_ts = event["txn_ts"]
self.last_cursor = event.get('cursor')
if event["type"] == "start":
return self._next_element()
if not self._opts.status_events and event["type"] == "status":
return self._next_element()
return event
raise StopIteration
except NetworkError:
self._retry_stream()
def _retry_stream(self):
if self._stream is not None:
self._stream.close()
self._stream = None
try:
self._ctx = self._create_stream()
except Exception:
pass
raise RetryableFaunaException
def _create_stream(self):
data: Dict[str, Any] = {"token": self._source.token}
if self.last_cursor is not None:
data["cursor"] = self.last_cursor
elif self._opts.cursor is not None:
data["cursor"] = self._opts.cursor
elif self._opts.start_ts is not None:
data["start_ts"] = self._opts.start_ts
return self._http_client.stream(
url=self._endpoint, headers=self._headers, data=data)
def close(self):
if self._stream is not None:
self._stream.close()
class FeedPage:
def __init__(self, events: List[Any], cursor: str, stats: QueryStats):
self._events = events
self.cursor = cursor
self.stats = stats
def __len__(self):
return len(self._events)
def __iter__(self) -> Iterator[Any]:
for event in self._events:
if event["type"] == "error":
FaunaError.parse_error_and_throw(event, 400)
yield event
class FeedIterator:
"""A class to provide an iterator on top of event feed pages."""
def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str,
max_attempts: int, max_backoff: int, opts: FeedOptions,
source: EventSource):
self._http = http
self._headers = headers
self._endpoint = endpoint
self._max_attempts = opts.max_attempts or max_attempts
self._max_backoff = opts.max_backoff or max_backoff
self._request: Dict[str, Any] = {"token": source.token}
self._is_done = False
if opts.start_ts is not None and opts.cursor is not None:
err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions."
raise TypeError(err_msg)
if opts.page_size is not None:
self._request["page_size"] = opts.page_size
if opts.cursor is not None:
self._request["cursor"] = opts.cursor
elif opts.start_ts is not None:
self._request["start_ts"] = opts.start_ts
def __iter__(self) -> Iterator[FeedPage]:
self._is_done = False
return self
def __next__(self) -> FeedPage:
if self._is_done:
raise StopIteration
retryable = Retryable[Any](self._max_attempts, self._max_backoff,
self._next_page)
return retryable.run().response
def _next_page(self) -> FeedPage:
with self._http.request(
method="POST",
url=self._endpoint,
headers=self._headers,
data=self._request,
) as response:
status_code = response.status_code()
decoded: Any = FaunaDecoder.decode(response.json())
if status_code > 399:
FaunaError.parse_error_and_throw(decoded, status_code)
self._is_done = not decoded["has_next"]
self._request["cursor"] = decoded["cursor"]
if "start_ts" in self._request:
del self._request["start_ts"]
return FeedPage(decoded["events"], decoded["cursor"],
QueryStats(decoded["stats"]))
def flatten(self) -> Iterator:
"""A generator that yields events instead of pages of events."""
for page in self:
for event in page:
yield event
class QueryIterator:
"""A class to provider an iterator on top of Fauna queries."""
def __init__(self,
client: Client,
fql: Query,
opts: Optional[QueryOptions] = None):
"""Initializes the QueryIterator
:param fql: A Query
:param opts: (Optional) Query Options
:raises TypeError: Invalid param types
"""
if not isinstance(client, Client):
err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \
f"Client by calling fauna.client.Client()"
raise TypeError(err_msg)
if not isinstance(fql, Query):
err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \
f"Query by calling fauna.fql()"
raise TypeError(err_msg)
self.client = client
self.fql = fql
self.opts = opts
def __iter__(self) -> Iterator:
return self.iter()
def iter(self) -> Iterator:
"""
A generator function that immediately fetches and yields the results of
the stored query. Yields additional pages on subsequent iterations if
they exist
"""
cursor = None
initial_response = self.client.query(self.fql, self.opts)
if isinstance(initial_response.data, Page):
cursor = initial_response.data.after
yield initial_response.data.data
while cursor is not None:
next_response = self.client.query(
fql("Set.paginate(${after})", after=cursor), self.opts)
# TODO: `Set.paginate` does not yet return a `@set` tagged value
# so we will get back a plain object that might not have
# an after property.
cursor = next_response.data.get("after")
yield next_response.data.get("data")
else:
yield [initial_response.data]
def flatten(self) -> Iterator:
"""
A generator function that immediately fetches and yields the results of
the stored query. Yields each item individually, rather than a whole
Page at a time. Fetches additional pages as required if they exist.
"""
for page in self.iter():
for item in page:
yield item
````
## File: fauna/client/endpoints.py
````python
class Endpoints:
Default = "https://db.fauna.com"
Local = "http://localhost:8443"
````
## File: fauna/client/headers.py
````python
import os
import platform
import sys
from dataclasses import dataclass
from typing import Callable
from fauna import __version__
class Header:
LastTxnTs = "X-Last-Txn-Ts"
Linearized = "X-Linearized"
MaxContentionRetries = "X-Max-Contention-Retries"
QueryTimeoutMs = "X-Query-Timeout-Ms"
Typecheck = "X-Typecheck"
Tags = "X-Query-Tags"
Traceparent = "Traceparent"
class _Header:
AcceptEncoding = "Accept-Encoding"
Authorization = "Authorization"
ContentType = "Content-Type"
Driver = "X-Driver"
DriverEnv = "X-Driver-Env"
Format = "X-Format"
class _Auth:
"""Creates an auth helper object"""
def bearer(self):
return "Bearer {}".format(self.secret)
def __init__(self, secret):
self.secret = secret
def __eq__(self, other):
return self.secret == getattr(other, 'secret', None)
def __ne__(self, other):
return not self == other
class _DriverEnvironment:
def __init__(self):
self.pythonVersion = "{0}.{1}.{2}-{3}".format(*sys.version_info)
self.driverVersion = __version__
self.env = self._get_runtime_env()
self.os = "{0}-{1}".format(platform.system(), platform.release())
@staticmethod
def _get_runtime_env():
@dataclass
class EnvChecker:
name: str
check: Callable[[], bool]
env: list[EnvChecker] = [
EnvChecker(
name="Netlify",
check=lambda: "NETLIFY_IMAGES_CDN_DOMAIN" in os.environ,
),
EnvChecker(
name="Vercel",
check=lambda: "VERCEL" in os.environ,
),
EnvChecker(
name="Heroku",
check=lambda: "PATH" in \
os.environ and ".heroku" in os.environ["PATH"],
),
EnvChecker(
name="AWS Lambda",
check=lambda: "AWS_LAMBDA_FUNCTION_VERSION" in os.environ,
),
EnvChecker(
name="GCP Cloud Functions",
check=lambda: "_" in \
os.environ and "google" in os.environ["_"],
),
EnvChecker(
name="GCP Compute Instances",
check=lambda: "GOOGLE_CLOUD_PROJECT" in os.environ,
),
EnvChecker(
name="Azure Cloud Functions",
check=lambda: "WEBSITE_FUNCTIONS_AZUREMONITOR_CATEGORIES" in \
os.environ,
),
EnvChecker(
name="Azure Compute",
check=lambda: "ORYX_ENV_TYPE" in os.environ and \
"WEBSITE_INSTANCE_ID" in os.environ and \
os.environ["ORYX_ENV_TYPE"] == "AppService",
),
]
try:
recognized = next(e for e in env if e.check())
if recognized is not None:
return recognized.name
except:
return "Unknown"
def __str__(self):
return "driver=python-{0}; runtime=python-{1} env={2}; os={3}".format(
self.driverVersion, self.pythonVersion, self.env, self.os).lower()
````
## File: fauna/client/retryable.py
````python
import abc
from dataclasses import dataclass
from random import random
from time import sleep
from typing import Callable, Optional, TypeVar, Generic
from fauna.errors import RetryableFaunaException
class RetryStrategy:
@abc.abstractmethod
def wait(self) -> float:
pass
class ExponentialBackoffStrategy(RetryStrategy):
def __init__(self, max_backoff: int):
self._max_backoff = float(max_backoff)
self._i = 0.0
def wait(self) -> float:
"""Returns the number of seconds to wait for the next call."""
backoff = random() * (2.0**self._i)
self._i += 1.0
return min(backoff, self._max_backoff)
T = TypeVar('T')
@dataclass
class RetryableResponse(Generic[T]):
attempts: int
response: T
class Retryable(Generic[T]):
"""
Retryable is a wrapper class that acts on a Callable that returns a T type.
"""
_strategy: RetryStrategy
_error: Optional[Exception]
def __init__(
self,
max_attempts: int,
max_backoff: int,
func: Callable[..., T],
*args,
**kwargs,
):
self._max_attempts = max_attempts
self._strategy = ExponentialBackoffStrategy(max_backoff)
self._func = func
self._args = args
self._kwargs = kwargs
self._error = None
def run(self) -> RetryableResponse[T]:
"""Runs the wrapped function. Retries up to max_attempts if the function throws a RetryableFaunaException. It propagates
the thrown exception if max_attempts is reached or if a non-retryable is thrown.
Returns the number of attempts and the response
"""
attempt = 0
while True:
sleep_time = 0.0 if attempt == 0 else self._strategy.wait()
sleep(sleep_time)
try:
attempt += 1
qs = self._func(*self._args, **self._kwargs)
return RetryableResponse[T](attempt, qs)
except RetryableFaunaException as e:
if attempt >= self._max_attempts:
raise e
````
## File: fauna/client/utils.py
````python
import os
import threading
from typing import Generic, Callable, TypeVar, Optional
from fauna.client.endpoints import Endpoints
from fauna.client.headers import Header
def _fancy_bool_from_str(val: str) -> bool:
return val.lower() in ["1", "true", "yes", "y"]
class LastTxnTs(object):
"""Wraps tracking the last transaction time supplied from the database."""
def __init__(
self,
time: Optional[int] = None,
):
self._lock: threading.Lock = threading.Lock()
self._time: Optional[int] = time
@property
def time(self):
"""Produces the last transaction time, or, None if not yet updated."""
with self._lock:
return self._time
@property
def request_header(self):
"""Produces a dictionary with a non-zero `X-Last-Seen-Txn` header; or,
if one has not yet been set, the empty header dictionary."""
t = self._time
if t is None:
return {}
return {Header.LastTxnTs: str(t)}
def update_txn_time(self, new_txn_time: int):
"""Updates the internal transaction time.
In order to maintain a monotonically-increasing value, `newTxnTime`
is discarded if it is behind the current timestamp."""
with self._lock:
self._time = max(self._time or 0, new_txn_time)
T = TypeVar('T')
class _SettingFromEnviron(Generic[T]):
def __init__(
self,
var_name: str,
default_value: str,
adapt_from_str: Callable[[str], T],
):
self.__var_name = var_name
self.__default_value = default_value
self.__adapt_from_str = adapt_from_str
def __call__(self) -> T:
return self.__adapt_from_str(
os.environ.get(
self.__var_name,
default=self.__default_value,
))
class _Environment:
EnvFaunaEndpoint = _SettingFromEnviron(
"FAUNA_ENDPOINT",
Endpoints.Default,
str,
)
"""environment variable for Fauna Client HTTP endpoint"""
EnvFaunaSecret = _SettingFromEnviron(
"FAUNA_SECRET",
"",
str,
)
"""environment variable for Fauna Client authentication"""
````
## File: fauna/encoding/__init__.py
````python
from .decoder import FaunaDecoder
from .encoder import FaunaEncoder
from .wire_protocol import ConstraintFailure, QueryTags, QueryInfo, QueryStats, QuerySuccess
````
## File: fauna/encoding/decoder.py
````python
import base64
from typing import Any, List, Union
from iso8601 import parse_date
from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \
NullDocument, EventSource
class FaunaDecoder:
"""Supports the following types:
+--------------------+---------------+
| Python | Fauna |
+====================+===============+
| dict | object |
+--------------------+---------------+
| list, tuple | array |
+--------------------+---------------+
| str | string |
+--------------------+---------------+
| int | @int |
+--------------------+---------------+
| int | @long |
+--------------------+---------------+
| float | @double |
+--------------------+---------------+
| datetime.datetime | @time |
+--------------------+---------------+
| datetime.date | @date |
+--------------------+---------------+
| True | true |
+--------------------+---------------+
| False | false |
+--------------------+---------------+
| None | null |
+--------------------+---------------+
| bytearray | @bytes |
+--------------------+---------------+
| *DocumentReference | @ref |
+--------------------+---------------+
| *Document | @doc |
+--------------------+---------------+
| Module | @mod |
+--------------------+---------------+
| Page | @set |
+--------------------+---------------+
| EventSource | @stream |
+--------------------+---------------+
"""
@staticmethod
def decode(obj: Any):
"""Decodes supported objects from the tagged typed into untagged.
Examples:
- { "@int": "100" } decodes to 100 of type int
- { "@double": "100" } decodes to 100.0 of type float
- { "@long": "100" } decodes to 100 of type int
- { "@time": "..." } decodes to a datetime
- { "@date": "..." } decodes to a date
- { "@doc": ... } decodes to a Document or NamedDocument
- { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference
- { "@mod": ... } decodes to a Module
- { "@set": ... } decodes to a Page
- { "@stream": ... } decodes to an EventSource
- { "@bytes": ... } decodes to a bytearray
:param obj: the object to decode
"""
return FaunaDecoder._decode(obj)
@staticmethod
def _decode(o: Any, escaped: bool = False):
if isinstance(o, (str, bool, int, float)):
return o
elif isinstance(o, list):
return FaunaDecoder._decode_list(o)
elif isinstance(o, dict):
return FaunaDecoder._decode_dict(o, escaped)
@staticmethod
def _decode_list(lst: List):
return [FaunaDecoder._decode(i) for i in lst]
@staticmethod
def _decode_dict(dct: dict, escaped: bool):
keys = dct.keys()
# If escaped, everything is user-specified
if escaped:
return {k: FaunaDecoder._decode(v) for k, v in dct.items()}
if len(keys) == 1:
if "@int" in keys:
return int(dct["@int"])
if "@long" in keys:
return int(dct["@long"])
if "@double" in dct:
return float(dct["@double"])
if "@object" in dct:
return FaunaDecoder._decode(dct["@object"], True)
if "@mod" in dct:
return Module(dct["@mod"])
if "@time" in dct:
return parse_date(dct["@time"])
if "@date" in dct:
return parse_date(dct["@date"]).date()
if "@bytes" in dct:
bts = base64.b64decode(dct["@bytes"])
return bytearray(bts)
if "@doc" in dct:
value = dct["@doc"]
if isinstance(value, str):
# Not distinguishing between DocumentReference and NamedDocumentReference because this shouldn't
# be an issue much longer
return DocumentReference.from_string(value)
contents = FaunaDecoder._decode(value)
if "id" in contents and "coll" in contents and "ts" in contents:
doc_id = contents.pop("id")
doc_coll = contents.pop("coll")
doc_ts = contents.pop("ts")
return Document(
id=doc_id,
coll=doc_coll,
ts=doc_ts,
data=contents,
)
elif "name" in contents and "coll" in contents and "ts" in contents:
doc_name = contents.pop("name")
doc_coll = contents.pop("coll")
doc_ts = contents.pop("ts")
return NamedDocument(
name=doc_name,
coll=doc_coll,
ts=doc_ts,
data=contents,
)
else:
# Unsupported document reference. Return the unwrapped value to futureproof.
return contents
if "@ref" in dct:
value = dct["@ref"]
if "id" not in value and "name" not in value:
# Unsupported document reference. Return the unwrapped value to futureproof.
return value
col = FaunaDecoder._decode(value["coll"])
doc_ref: Union[DocumentReference, NamedDocumentReference]
if "id" in value:
doc_ref = DocumentReference(col, value["id"])
else:
doc_ref = NamedDocumentReference(col, value["name"])
if "exists" in value and not value["exists"]:
cause = value["cause"] if "cause" in value else None
return NullDocument(doc_ref, cause)
return doc_ref
if "@set" in dct:
value = dct["@set"]
if isinstance(value, str):
return Page(after=value)
after = value["after"] if "after" in value else None
data = FaunaDecoder._decode(value["data"]) if "data" in value else None
return Page(data=data, after=after)
if "@stream" in dct:
return EventSource(dct["@stream"])
return {k: FaunaDecoder._decode(v) for k, v in dct.items()}
````
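A quick sketch of the decoder in action. The tagged dicts below are hand-written illustrations of the wire format, not captured API responses:
````python
from fauna.encoding import FaunaDecoder

# Single-key tagged values decode to plain Python types.
assert FaunaDecoder.decode({"@int": "100"}) == 100
assert FaunaDecoder.decode({"@double": "1.5"}) == 1.5
assert FaunaDecoder.decode([{"@long": "9007199254740993"}]) == [9007199254740993]

# An @object wrapper escapes user keys that collide with reserved tags,
# so the inner dict is returned as-is rather than interpreted as a tag.
assert FaunaDecoder.decode({"@object": {"@int": "not a tag"}}) == {"@int": "not a tag"}
````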
## File: fauna/encoding/encoder.py
````python
import base64
from datetime import datetime, date
from typing import Any, Optional, List, Union
from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \
EventSource
from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment
_RESERVED_TAGS = [
"@date",
"@doc",
"@double",
"@int",
"@long",
"@mod",
"@object",
"@ref",
"@set",
"@time",
]
class FaunaEncoder:
"""Supports the following types:
+-------------------------------+---------------+
| Python | Fauna Tags |
+===============================+===============+
| dict | @object |
+-------------------------------+---------------+
| list, tuple | array |
+-------------------------------+---------------+
| str | string |
+-------------------------------+---------------+
| int 32-bit signed | @int |
+-------------------------------+---------------+
| int 64-bit signed | @long |
+-------------------------------+---------------+
| float | @double |
+-------------------------------+---------------+
| datetime.datetime | @time |
+-------------------------------+---------------+
| datetime.date | @date |
+-------------------------------+---------------+
| True | True |
+-------------------------------+---------------+
| False | False |
+-------------------------------+---------------+
| None | None |
+-------------------------------+---------------+
| bytes / bytearray | @bytes |
+-------------------------------+---------------+
| *Document | @ref |
+-------------------------------+---------------+
| *DocumentReference | @ref |
+-------------------------------+---------------+
| Module | @mod |
+-------------------------------+---------------+
| Query | fql |
+-------------------------------+---------------+
| ValueFragment | value |
+-------------------------------+---------------+
| TemplateFragment | string |
+-------------------------------+---------------+
| EventSource | string |
+-------------------------------+---------------+
"""
@staticmethod
def encode(obj: Any) -> Any:
"""Encodes supported objects into the tagged format.
Examples:
- Up to 32-bit ints encode to { "@int": "..." }
- Up to 64-bit ints encode to { "@long": "..." }
- Floats encode to { "@double": "..." }
- datetime encodes to { "@time": "..." }
- date encodes to { "@date": "..." }
    - DocumentReference encodes to { "@ref": ... }
    - Module encodes to { "@mod": "..." }
    - Query encodes to { "fql": [...] }
    - ValueFragment encodes to { "value": <encoded_val> }
    - LiteralFragment encodes to a string
    - EventSource encodes to a string
    :raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference.
    :param obj: the object to encode
"""
return FaunaEncoder._encode(obj)
@staticmethod
def from_int(obj: int):
if -2**31 <= obj <= 2**31 - 1:
return {"@int": repr(obj)}
elif -2**63 <= obj <= 2**63 - 1:
return {"@long": repr(obj)}
else:
raise ValueError("Precision loss when converting int to Fauna type")
@staticmethod
def from_bool(obj: bool):
return obj
@staticmethod
def from_float(obj: float):
return {"@double": repr(obj)}
@staticmethod
def from_str(obj: str):
return obj
@staticmethod
def from_datetime(obj: datetime):
if obj.utcoffset() is None:
raise ValueError("datetimes must be timezone-aware")
return {"@time": obj.isoformat(sep="T")}
@staticmethod
def from_date(obj: date):
return {"@date": obj.isoformat()}
@staticmethod
def from_bytes(obj: Union[bytearray, bytes]):
return {"@bytes": base64.b64encode(obj).decode('ascii')}
@staticmethod
def from_doc_ref(obj: DocumentReference):
return {"@ref": {"id": obj.id, "coll": FaunaEncoder.from_mod(obj.coll)}}
@staticmethod
def from_named_doc_ref(obj: NamedDocumentReference):
return {"@ref": {"name": obj.name, "coll": FaunaEncoder.from_mod(obj.coll)}}
@staticmethod
def from_mod(obj: Module):
return {"@mod": obj.name}
@staticmethod
def from_dict(obj: Any):
return {"@object": obj}
@staticmethod
def from_none():
return None
@staticmethod
def from_fragment(obj: Fragment):
if isinstance(obj, LiteralFragment):
return obj.get()
elif isinstance(obj, ValueFragment):
v = obj.get()
if isinstance(v, Query):
return FaunaEncoder.from_query_interpolation_builder(v)
else:
return {"value": FaunaEncoder.encode(v)}
else:
raise ValueError(f"Unknown fragment type: {type(obj)}")
@staticmethod
def from_query_interpolation_builder(obj: Query):
return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]}
@staticmethod
def from_streamtoken(obj: EventSource):
return {"@stream": obj.token}
@staticmethod
def _encode(o: Any, _markers: Optional[List] = None):
if _markers is None:
_markers = []
if isinstance(o, str):
return FaunaEncoder.from_str(o)
elif o is None:
return FaunaEncoder.from_none()
elif o is True:
return FaunaEncoder.from_bool(o)
elif o is False:
return FaunaEncoder.from_bool(o)
elif isinstance(o, int):
return FaunaEncoder.from_int(o)
elif isinstance(o, float):
return FaunaEncoder.from_float(o)
elif isinstance(o, Module):
return FaunaEncoder.from_mod(o)
elif isinstance(o, DocumentReference):
return FaunaEncoder.from_doc_ref(o)
elif isinstance(o, NamedDocumentReference):
return FaunaEncoder.from_named_doc_ref(o)
elif isinstance(o, datetime):
return FaunaEncoder.from_datetime(o)
elif isinstance(o, date):
return FaunaEncoder.from_date(o)
elif isinstance(o, bytearray) or isinstance(o, bytes):
return FaunaEncoder.from_bytes(o)
elif isinstance(o, Document):
return FaunaEncoder.from_doc_ref(DocumentReference(o.coll, o.id))
elif isinstance(o, NamedDocument):
return FaunaEncoder.from_named_doc_ref(
NamedDocumentReference(o.coll, o.name))
elif isinstance(o, NullDocument):
return FaunaEncoder.encode(o.ref)
elif isinstance(o, (list, tuple)):
return FaunaEncoder._encode_list(o, _markers)
elif isinstance(o, dict):
return FaunaEncoder._encode_dict(o, _markers)
elif isinstance(o, Query):
return FaunaEncoder.from_query_interpolation_builder(o)
elif isinstance(o, EventSource):
return FaunaEncoder.from_streamtoken(o)
else:
raise ValueError(f"Object {o} of type {type(o)} cannot be encoded")
@staticmethod
def _encode_list(lst, markers):
_id = id(lst)
if _id in markers:
raise ValueError("Circular reference detected")
markers.append(id(lst))
res = [FaunaEncoder._encode(elem, markers) for elem in lst]
markers.pop()
return res
@staticmethod
def _encode_dict(dct, markers):
_id = id(dct)
if _id in markers:
raise ValueError("Circular reference detected")
markers.append(id(dct))
if any(i in _RESERVED_TAGS for i in dct.keys()):
res = {
"@object": {
k: FaunaEncoder._encode(v, markers) for k, v in dct.items()
}
}
markers.pop()
return res
else:
res = {k: FaunaEncoder._encode(v, markers) for k, v in dct.items()}
markers.pop()
return res
````
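Conversely, the encoder maps Python values into the tagged wire format, widening ints past 32 bits to `@long` and raising rather than losing precision. A minimal sketch:
````python
from datetime import datetime, timezone
from fauna.encoding import FaunaEncoder

assert FaunaEncoder.encode(100) == {"@int": "100"}
assert FaunaEncoder.encode(2**31) == {"@long": "2147483648"}
assert FaunaEncoder.encode(1.5) == {"@double": "1.5"}

# datetimes must be timezone-aware; naive values raise ValueError.
FaunaEncoder.encode(datetime(2024, 1, 1, tzinfo=timezone.utc))

# ints that do not fit in 64 bits raise instead of silently truncating.
try:
  FaunaEncoder.encode(2**63)
except ValueError as e:
  print(e)  # Precision loss when converting int to Fauna type
````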
## File: fauna/encoding/wire_protocol.py
````python
from dataclasses import dataclass
from typing import Optional, Mapping, Any, List
class QueryStats:
"""Query stats"""
@property
def compute_ops(self) -> int:
"""The amount of Transactional Compute Ops consumed by the query."""
return self._compute_ops
@property
def read_ops(self) -> int:
"""The amount of Transactional Read Ops consumed by the query."""
return self._read_ops
@property
def write_ops(self) -> int:
"""The amount of Transactional Write Ops consumed by the query."""
return self._write_ops
@property
def query_time_ms(self) -> int:
"""The query run time in milliseconds."""
return self._query_time_ms
@property
def storage_bytes_read(self) -> int:
"""The amount of data read from storage, in bytes."""
return self._storage_bytes_read
@property
def storage_bytes_write(self) -> int:
"""The amount of data written to storage, in bytes."""
return self._storage_bytes_write
@property
def contention_retries(self) -> int:
"""The number of times the transaction was retried due to write contention."""
return self._contention_retries
@property
def attempts(self) -> int:
"""The number of attempts made by the client to run the query."""
return self._attempts
@attempts.setter
def attempts(self, value):
self._attempts = value
def __init__(self, stats: Mapping[str, Any]):
self._compute_ops = stats.get("compute_ops", 0)
self._read_ops = stats.get("read_ops", 0)
self._write_ops = stats.get("write_ops", 0)
self._query_time_ms = stats.get("query_time_ms", 0)
self._storage_bytes_read = stats.get("storage_bytes_read", 0)
self._storage_bytes_write = stats.get("storage_bytes_write", 0)
self._contention_retries = stats.get("contention_retries", 0)
self._attempts = 0
def __repr__(self):
stats = {
"compute_ops": self._compute_ops,
"read_ops": self._read_ops,
"write_ops": self._write_ops,
"query_time_ms": self._query_time_ms,
"storage_bytes_read": self._storage_bytes_read,
"storage_bytes_write": self._storage_bytes_write,
"contention_retries": self._contention_retries,
"attempts": self._attempts,
}
return f"{self.__class__.__name__}(stats={repr(stats)})"
def __eq__(self, other):
return type(self) == type(other) \
and self.compute_ops == other.compute_ops \
and self.read_ops == other.read_ops \
and self.write_ops == other.write_ops \
and self.query_time_ms == other.query_time_ms \
and self.storage_bytes_read == other.storage_bytes_read \
and self.storage_bytes_write == other.storage_bytes_write \
and self.contention_retries == other.contention_retries \
and self.attempts == other.attempts
def __ne__(self, other):
return not self.__eq__(other)
class QueryInfo:
@property
def query_tags(self) -> Mapping[str, Any]:
"""The tags associated with the query."""
return self._query_tags
@property
def summary(self) -> str:
"""A comprehensive, human readable summary of any errors, warnings and/or logs returned from the query."""
return self._summary
@property
def stats(self) -> QueryStats:
"""Query stats associated with the query."""
return self._stats
@property
def txn_ts(self) -> int:
"""The last transaction timestamp of the query. A Unix epoch in microseconds."""
return self._txn_ts
@property
def schema_version(self) -> int:
"""The schema version that was used for the query execution."""
return self._schema_version
def __init__(
self,
query_tags: Optional[Mapping[str, str]] = None,
stats: Optional[QueryStats] = None,
summary: Optional[str] = None,
txn_ts: Optional[int] = None,
schema_version: Optional[int] = None,
):
self._query_tags = query_tags or {}
self._stats = stats or QueryStats({})
self._summary = summary or ""
self._txn_ts = txn_ts or 0
self._schema_version = schema_version or 0
def __repr__(self):
return f"{self.__class__.__name__}(" \
f"query_tags={repr(self.query_tags)}," \
f"stats={repr(self.stats)}," \
f"summary={repr(self.summary)}," \
f"txn_ts={repr(self.txn_ts)}," \
f"schema_version={repr(self.schema_version)})"
class QuerySuccess(QueryInfo):
"""The result of the query."""
@property
def data(self) -> Any:
"""The data returned by the query. This is the result of the FQL query."""
return self._data
@property
def static_type(self) -> Optional[str]:
"""If typechecked, the query's inferred static result type, if the query was typechecked."""
return self._static_type
@property
def traceparent(self) -> Optional[str]:
"""The traceparent for the query."""
return self._traceparent
def __init__(
self,
data: Any,
query_tags: Optional[Mapping[str, str]],
static_type: Optional[str],
stats: Optional[QueryStats],
summary: Optional[str],
traceparent: Optional[str],
txn_ts: Optional[int],
schema_version: Optional[int],
):
super().__init__(
query_tags=query_tags,
stats=stats,
summary=summary,
txn_ts=txn_ts,
schema_version=schema_version,
)
self._traceparent = traceparent
self._static_type = static_type
self._data = data
def __repr__(self):
return f"{self.__class__.__name__}(" \
f"query_tags={repr(self.query_tags)}," \
f"static_type={repr(self.static_type)}," \
f"stats={repr(self.stats)}," \
f"summary={repr(self.summary)}," \
f"traceparent={repr(self.traceparent)}," \
f"txn_ts={repr(self.txn_ts)}," \
f"schema_version={repr(self.schema_version)}," \
f"data={repr(self.data)})"
@dataclass
class ConstraintFailure:
message: str
name: Optional[str] = None
paths: Optional[List[Any]] = None
class QueryTags:
@staticmethod
def encode(tags: Mapping[str, str]) -> str:
return ",".join([f"{k}={v}" for k, v in tags.items()])
@staticmethod
def decode(tag_str: str) -> Mapping[str, str]:
res: dict[str, str] = {}
for pair in tag_str.split(","):
kv = pair.split("=")
res[kv[0]] = kv[1]
return res
````
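`QueryTags` uses a flat `k=v` comma-joined encoding, so keys and values must not themselves contain `,` or `=`. A round-trip sketch, with `QueryStats` defaulting any missing counters to zero:
````python
from fauna.encoding import QueryStats, QueryTags

encoded = QueryTags.encode({"env": "prod", "app": "checkout"})
assert encoded == "env=prod,app=checkout"
assert QueryTags.decode(encoded) == {"env": "prod", "app": "checkout"}

# QueryStats tolerates partial stats payloads.
stats = QueryStats({"read_ops": 3, "query_time_ms": 12})
print(stats.read_ops, stats.compute_ops)  # 3 0
````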
## File: fauna/errors/__init__.py
````python
from .errors import AuthenticationError, AuthorizationError, QueryCheckError, QueryRuntimeError, \
QueryTimeoutError, ServiceInternalError, ServiceTimeoutError, ThrottlingError, ContendedTransactionError, \
InvalidRequestError, AbortError, RetryableFaunaException
from .errors import ClientError, FaunaError, NetworkError
from .errors import FaunaException
from .errors import ProtocolError, ServiceError
````
## File: fauna/errors/errors.py
````python
from typing import Optional, List, Any, Mapping
from fauna.encoding import ConstraintFailure, QueryStats, QueryInfo, QueryTags
class FaunaException(Exception):
"""Base class Fauna Exceptions"""
pass
class RetryableFaunaException(FaunaException):
pass
class ClientError(FaunaException):
"""An error representing a failure internal to the client, itself.
This indicates Fauna was never called - the client failed internally
prior to sending the request."""
pass
class NetworkError(FaunaException):
"""An error representing a failure due to the network.
This indicates Fauna was never reached."""
pass
class ProtocolError(FaunaException):
"""An error representing a HTTP failure - but one not directly emitted by Fauna."""
@property
def status_code(self) -> int:
return self._status_code
@property
def message(self) -> str:
return self._message
def __init__(self, status_code: int, message: str):
self._status_code = status_code
self._message = message
def __str__(self):
return f"{self.status_code}: {self.message}"
class FaunaError(FaunaException):
"""Base class Fauna Errors"""
@property
def status_code(self) -> int:
return self._status_code
@property
def code(self) -> str:
return self._code
@property
def message(self) -> str:
return self._message
@property
def abort(self) -> Optional[Any]:
return self._abort
@property
def constraint_failures(self) -> Optional[List['ConstraintFailure']]:
return self._constraint_failures
def __init__(
self,
status_code: int,
code: str,
message: str,
abort: Optional[Any] = None,
constraint_failures: Optional[List['ConstraintFailure']] = None,
):
self._status_code = status_code
self._code = code
self._message = message
self._abort = abort
self._constraint_failures = constraint_failures
def __str__(self):
return f"{self.status_code}: {self.code}\n{self.message}"
@staticmethod
def parse_error_and_throw(body: Any, status_code: int):
err = body["error"]
code = err["code"]
message = err["message"]
query_tags = QueryTags.decode(
body["query_tags"]) if "query_tags" in body else None
stats = QueryStats(body["stats"]) if "stats" in body else None
txn_ts = body["txn_ts"] if "txn_ts" in body else None
schema_version = body["schema_version"] if "schema_version" in body else None
summary = body["summary"] if "summary" in body else None
constraint_failures: Optional[List[ConstraintFailure]] = None
if "constraint_failures" in err:
constraint_failures = [
ConstraintFailure(
message=cf["message"],
name=cf["name"] if "name" in cf else None,
paths=cf["paths"] if "paths" in cf else None,
) for cf in err["constraint_failures"]
]
if status_code >= 400 and status_code < 500:
if code == "invalid_query":
raise QueryCheckError(
status_code=400,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "invalid_request":
raise InvalidRequestError(
status_code=400,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "abort":
abort = err["abort"] if "abort" in err else None
raise AbortError(
status_code=400,
code=code,
message=message,
summary=summary,
abort=abort,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "unauthorized":
raise AuthenticationError(
status_code=401,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "forbidden" and status_code == 403:
raise AuthorizationError(
status_code=403,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "method_not_allowed":
raise QueryRuntimeError(
status_code=405,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "conflict":
raise ContendedTransactionError(
status_code=409,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "request_size_exceeded":
raise QueryRuntimeError(
status_code=413,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "limit_exceeded":
raise ThrottlingError(
status_code=429,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif code == "time_out":
raise QueryTimeoutError(
status_code=440,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
else:
raise QueryRuntimeError(
status_code=status_code,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif status_code == 500:
raise ServiceInternalError(
status_code=status_code,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
elif status_code == 503:
raise ServiceTimeoutError(
status_code=status_code,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
else:
raise ServiceError(
status_code=status_code,
code=code,
message=message,
summary=summary,
constraint_failures=constraint_failures,
query_tags=query_tags,
stats=stats,
txn_ts=txn_ts,
schema_version=schema_version,
)
class ServiceError(FaunaError, QueryInfo):
"""An error representing a query failure returned by Fauna."""
def __init__(
self,
status_code: int,
code: str,
message: str,
summary: Optional[str] = None,
abort: Optional[Any] = None,
constraint_failures: Optional[List['ConstraintFailure']] = None,
query_tags: Optional[Mapping[str, str]] = None,
stats: Optional[QueryStats] = None,
txn_ts: Optional[int] = None,
schema_version: Optional[int] = None,
):
QueryInfo.__init__(
self,
query_tags=query_tags,
stats=stats,
summary=summary,
txn_ts=txn_ts,
schema_version=schema_version,
)
FaunaError.__init__(
self,
status_code=status_code,
code=code,
message=message,
abort=abort,
constraint_failures=constraint_failures,
)
def __str__(self):
constraint_str = "---"
if self._constraint_failures:
constraint_str = f"---\nconstraint failures: {self._constraint_failures}\n---"
return f"{self._status_code}: {self.code}\n{self.message}\n{constraint_str}\n{self.summary or ''}"
class AbortError(ServiceError):
pass
class InvalidRequestError(ServiceError):
pass
class QueryCheckError(ServiceError):
"""An error due to a "compile-time" check of the query failing."""
pass
class ContendedTransactionError(ServiceError):
"""Transaction is aborted due to concurrent modification."""
pass
class QueryRuntimeError(ServiceError):
"""An error response that is the result of the query failing during execution.
  QueryRuntimeErrors occur when a bug in your query causes an invalid execution
to be requested.
The 'code' field will vary based on the specific error cause."""
pass
class AuthenticationError(ServiceError):
"""AuthenticationError indicates invalid credentials were used."""
pass
class AuthorizationError(ServiceError):
"""AuthorizationError indicates the credentials used do not have
permission to perform the requested action."""
pass
class ThrottlingError(ServiceError, RetryableFaunaException):
"""ThrottlingError indicates some capacity limit was exceeded
and thus the request could not be served."""
pass
class QueryTimeoutError(ServiceError):
"""A failure due to the timeout being exceeded, but the timeout
was set lower than the query's expected processing time.
  This response is distinguished from a ServiceTimeoutError
  in that a QueryTimeoutError shows Fauna behaving in an expected manner."""
pass
class ServiceInternalError(ServiceError):
"""ServiceInternalError indicates Fauna failed unexpectedly."""
pass
class ServiceTimeoutError(ServiceError):
"""ServiceTimeoutError indicates Fauna was not available to service
the request before the timeout was reached."""
pass
````
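Since every server-reported failure subclasses `ServiceError`, and `ThrottlingError` also subclasses `RetryableFaunaException`, callers can catch at whatever granularity they need. A sketch of typical handling; the aborting query is illustrative:
````python
from fauna import fql
from fauna.client import Client
from fauna.errors import AbortError, RetryableFaunaException, ServiceError, FaunaException

client = Client()  # falls back to the FAUNA_SECRET environment variable

try:
  client.query(fql('abort("nope")'))
except AbortError as e:
  print("aborted with:", e.abort)  # the abort payload
except RetryableFaunaException:
  print("throttled; retry with backoff")
except ServiceError as e:
  print(f"{e.status_code} {e.code}: {e.message}")
except FaunaException as e:
  print("client-side or network failure:", e)
````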
## File: fauna/http/__init__.py
````python
from .http_client import HTTPClient, HTTPResponse
from .httpx_client import HTTPXClient
````
## File: fauna/http/http_client.py
````python
import abc
import contextlib
from dataclasses import dataclass
from typing import Iterator, Mapping, Any
@dataclass(frozen=True)
class ErrorResponse:
status_code: int
error_code: str
error_message: str
summary: str
class HTTPResponse(abc.ABC):
@abc.abstractmethod
def headers(self) -> Mapping[str, str]:
pass
@abc.abstractmethod
def status_code(self) -> int:
pass
@abc.abstractmethod
def json(self) -> Any:
pass
@abc.abstractmethod
def text(self) -> str:
pass
@abc.abstractmethod
def read(self) -> bytes:
pass
@abc.abstractmethod
def iter_bytes(self) -> Iterator[bytes]:
pass
@abc.abstractmethod
def close(self):
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.close()
class HTTPClient(abc.ABC):
@abc.abstractmethod
def request(
self,
method: str,
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
) -> HTTPResponse:
pass
@abc.abstractmethod
@contextlib.contextmanager
def stream(
self,
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
) -> Iterator[Any]:
pass
@abc.abstractmethod
def close(self):
pass
````
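These two abstract classes are the seam for swapping HTTP implementations: anything that satisfies them can back the client. A minimal sketch of a canned-response stub for tests, assuming only `request` is exercised:
````python
import contextlib
from typing import Any

from fauna.http import HTTPClient, HTTPResponse

class StubResponse(HTTPResponse):
  def __init__(self, body: Any, status: int = 200):
    self._body, self._status = body, status
  def headers(self): return {}
  def status_code(self): return self._status
  def json(self): return self._body
  def text(self): return str(self._body)
  def read(self): return str(self._body).encode()
  def iter_bytes(self): return iter([self.read()])
  def close(self): pass

class StubHTTPClient(HTTPClient):
  def __init__(self, body: Any):
    self._body = body
  def request(self, method, url, headers, data):
    return StubResponse(self._body)
  @contextlib.contextmanager
  def stream(self, url, headers, data):
    yield iter(())
  def close(self): pass
````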
## File: fauna/http/httpx_client.py
````python
import json
import logging
from contextlib import contextmanager
from json import JSONDecodeError
from typing import Mapping, Any, Optional, Iterator
import httpx
from fauna.errors import ClientError, NetworkError
from fauna.http.http_client import HTTPResponse, HTTPClient
class HTTPXResponse(HTTPResponse):
def __init__(self, response: httpx.Response):
self._r = response
def headers(self) -> Mapping[str, str]:
h = {}
for (k, v) in self._r.headers.items():
h[k] = v
return h
def json(self) -> Any:
try:
decoded = self._r.read().decode("utf-8")
return json.loads(decoded)
except (JSONDecodeError, UnicodeDecodeError) as e:
raise ClientError(
f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid."
) from e
def text(self) -> str:
return str(self.read(), encoding='utf-8')
def status_code(self) -> int:
return self._r.status_code
def read(self) -> bytes:
return self._r.read()
def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]:
return self._r.iter_bytes(size)
def close(self) -> None:
try:
self._r.close()
except Exception as e:
raise ClientError("Error closing response") from e
class HTTPXClient(HTTPClient):
def __init__(self,
client: httpx.Client,
logger: logging.Logger = logging.getLogger("fauna")):
super(HTTPXClient, self).__init__()
self._c = client
self._logger = logger
def request(
self,
method: str,
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
) -> HTTPResponse:
try:
request = self._c.build_request(
method,
url,
json=data,
headers=headers,
)
if self._logger.isEnabledFor(logging.DEBUG):
headers_to_log = request.headers.copy()
headers_to_log.pop("Authorization")
self._logger.debug(
f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
)
except httpx.InvalidURL as e:
raise ClientError("Invalid URL Format") from e
try:
response = self._c.send(
request,
stream=False,
)
if self._logger.isEnabledFor(logging.DEBUG):
self._logger.debug(
f"query.response status_code={response.status_code} headers={response.headers} data={response.text}"
)
return HTTPXResponse(response)
except (httpx.HTTPError, httpx.InvalidURL) as e:
raise NetworkError("Exception re-raised from HTTP request") from e
@contextmanager
def stream(
self,
url: str,
headers: Mapping[str, str],
data: Mapping[str, Any],
) -> Iterator[Any]:
request = self._c.build_request(
method="POST",
url=url,
headers=headers,
json=data,
)
if self._logger.isEnabledFor(logging.DEBUG):
headers_to_log = request.headers.copy()
headers_to_log.pop("Authorization")
self._logger.debug(
f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}"
)
response = self._c.send(
request=request,
stream=True,
)
try:
yield self._transform(response)
finally:
response.close()
def _transform(self, response):
try:
for line in response.iter_lines():
loaded = json.loads(line)
if self._logger.isEnabledFor(logging.DEBUG):
self._logger.debug(f"stream.data data={loaded}")
yield loaded
except httpx.ReadTimeout as e:
raise NetworkError("Stream timeout") from e
except (httpx.HTTPError, httpx.InvalidURL) as e:
raise NetworkError("Exception re-raised from HTTP request") from e
def close(self):
self._c.close()
````
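`HTTPXClient` is the concrete implementation the driver uses by default; `Client` normally constructs it for you. Wiring one up by hand looks roughly like this (the endpoint URL, secret, and request body are illustrative):
````python
import httpx
from fauna.http import HTTPXClient

with httpx.Client(timeout=httpx.Timeout(5.0)) as c:
  http_client = HTTPXClient(c)
  resp = http_client.request(
      method="POST",
      url="https://db.fauna.com/query/1",  # Core HTTP API Query endpoint
      headers={"Authorization": "Bearer FAUNA_SECRET"},
      data={"query": "1 + 1"},
  )
  print(resp.status_code(), resp.json())
````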
## File: fauna/query/__init__.py
````python
from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
from .query_builder import fql, Query
````
## File: fauna/query/models.py
````python
import warnings
from collections.abc import Mapping
from datetime import datetime
from typing import Union, Iterator, Any, Optional, List
# NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based
# on: https://peps.python.org/pep-0562/
def __getattr__(name):
if name == "StreamToken":
warnings.warn(
"StreamToken is deprecated. Prefer fauna.query.EventSource instead.",
DeprecationWarning,
stacklevel=2)
return EventSource
  raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
def __dir__():
  return list(globals().keys()) + ["StreamToken"]
class Page:
"""A class representing a Set in Fauna."""
def __init__(self,
data: Optional[List[Any]] = None,
after: Optional[str] = None):
self.data = data
self.after = after
def __repr__(self):
args = []
if self.data is not None:
args.append(f"data={repr(self.data)}")
if self.after is not None:
args.append(f"after={repr(self.after)}")
return f"{self.__class__.__name__}({','.join(args)})"
def __iter__(self) -> Iterator[Any]:
return iter(self.data or [])
def __eq__(self, other):
return isinstance(
other, Page) and self.data == other.data and self.after == other.after
def __hash__(self):
return hash((type(self), self.data, self.after))
def __ne__(self, other):
return not self.__eq__(other)
class EventSource:
"""A class represeting an EventSource in Fauna."""
def __init__(self, token: str):
self.token = token
def __eq__(self, other):
return isinstance(other, EventSource) and self.token == other.token
def __hash__(self):
return hash(self.token)
class Module:
"""A class representing a Module in Fauna. Examples of modules include Collection, Math, and a user-defined
collection, among others.
Usage:
dogs = Module("Dogs")
query = fql("${col}.all", col=dogs)
"""
def __init__(self, name: str):
self.name = name
def __repr__(self):
return f"{self.__class__.__name__}(name={repr(self.name)})"
def __eq__(self, other):
return isinstance(other, Module) and str(self) == str(other)
def __hash__(self):
return hash(self.name)
class BaseReference:
_collection: Module
@property
def coll(self) -> Module:
return self._collection
def __init__(self, coll: Union[str, Module]):
if isinstance(coll, Module):
self._collection = coll
elif isinstance(coll, str):
self._collection = Module(coll)
else:
raise TypeError(
f"'coll' should be of type Module or str, but was {type(coll)}")
def __repr__(self):
return f"{self.__class__.__name__}(coll={repr(self._collection)})"
def __eq__(self, other):
return isinstance(other, type(self)) and str(self) == str(other)
class DocumentReference(BaseReference):
"""A class representing a reference to a :class:`Document` stored in Fauna.
"""
@property
def id(self) -> str:
"""The ID for the :class:`Document`. Valid IDs are 64-bit integers, stored as strings.
:rtype: str
"""
return self._id
def __init__(self, coll: Union[str, Module], id: str):
super().__init__(coll)
if not isinstance(id, str):
raise TypeError(f"'id' should be of type str, but was {type(id)}")
self._id = id
def __hash__(self):
return hash((type(self), self._collection, self._id))
def __repr__(self):
return f"{self.__class__.__name__}(id={repr(self._id)},coll={repr(self._collection)})"
@staticmethod
def from_string(ref: str):
rs = ref.split(":")
if len(rs) != 2:
raise ValueError("Expects string of format :")
return DocumentReference(rs[0], rs[1])
class NamedDocumentReference(BaseReference):
"""A class representing a reference to a :class:`NamedDocument` stored in Fauna.
"""
@property
def name(self) -> str:
"""The name of the :class:`NamedDocument`.
:rtype: str
"""
return self._name
def __init__(self, coll: Union[str, Module], name: str):
super().__init__(coll)
if not isinstance(name, str):
raise TypeError(f"'name' should be of type str, but was {type(name)}")
self._name = name
def __hash__(self):
return hash((type(self), self._collection, self._name))
def __repr__(self):
return f"{self.__class__.__name__}(name={repr(self._name)},coll={repr(self._collection)})"
class NullDocument:
@property
def cause(self) -> Optional[str]:
return self._cause
@property
def ref(self) -> Union[DocumentReference, NamedDocumentReference]:
return self._ref
def __init__(
self,
ref: Union[DocumentReference, NamedDocumentReference],
cause: Optional[str] = None,
):
self._cause = cause
self._ref = ref
def __repr__(self):
return f"{self.__class__.__name__}(ref={repr(self.ref)},cause={repr(self._cause)})"
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
return self.ref == other.ref and self.cause == other.cause
def __ne__(self, other):
return not self == other
class BaseDocument(Mapping):
"""A base document class implementing an immutable mapping.
"""
def __init__(self, *args, **kwargs):
self._store = dict(*args, **kwargs)
def __getitem__(self, __k: str) -> Any:
return self._store[__k]
def __len__(self) -> int:
return len(self._store)
def __iter__(self) -> Iterator[Any]:
return iter(self._store)
def __eq__(self, other):
if not isinstance(other, type(self)):
return False
if len(self) != len(other):
return False
for k, v in self.items():
if k not in other:
return False
if self[k] != other[k]:
return False
return True
def __ne__(self, other):
return not self.__eq__(other)
class Document(BaseDocument):
"""A class representing a user document stored in Fauna.
User data should be stored directly on the map, while id, ts, and coll should only be stored on the related
properties. When working with a :class:`Document` in code, it should be considered immutable.
"""
@property
def id(self) -> str:
return self._id
@property
def ts(self) -> datetime:
return self._ts
@property
def coll(self) -> Module:
return self._coll
def __init__(self,
id: str,
ts: datetime,
coll: Union[str, Module],
data: Optional[Mapping] = None):
if not isinstance(id, str):
raise TypeError(f"'id' should be of type str, but was {type(id)}")
if not isinstance(ts, datetime):
raise TypeError(f"'ts' should be of type datetime, but was {type(ts)}")
if not (isinstance(coll, str) or isinstance(coll, Module)):
raise TypeError(
f"'coll' should be of type Module or str, but was {type(coll)}")
if isinstance(coll, str):
coll = Module(coll)
self._id = id
self._ts = ts
self._coll = coll
super().__init__(data or {})
def __eq__(self, other):
return type(self) == type(other) \
and self.id == other.id \
and self.coll == other.coll \
and self.ts == other.ts \
and super().__eq__(other)
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
kvs = ",".join([f"{repr(k)}:{repr(v)}" for k, v in self.items()])
return f"{self.__class__.__name__}(" \
f"id={repr(self.id)}," \
f"coll={repr(self.coll)}," \
f"ts={repr(self.ts)}," \
f"data={{{kvs}}})"
class NamedDocument(BaseDocument):
"""A class representing a named document stored in Fauna. Examples of named documents include Collection
definitions, Index definitions, and Roles, among others.
When working with a :class:`NamedDocument` in code, it should be considered immutable.
"""
@property
def name(self) -> str:
return self._name
@property
def ts(self) -> datetime:
return self._ts
@property
def coll(self) -> Module:
return self._coll
def __init__(self,
name: str,
ts: datetime,
coll: Union[Module, str],
data: Optional[Mapping] = None):
if not isinstance(name, str):
raise TypeError(f"'name' should be of type str, but was {type(name)}")
if not isinstance(ts, datetime):
raise TypeError(f"'ts' should be of type datetime, but was {type(ts)}")
if not (isinstance(coll, str) or isinstance(coll, Module)):
raise TypeError(
f"'coll' should be of type Module or str, but was {type(coll)}")
if isinstance(coll, str):
coll = Module(coll)
self._name = name
self._ts = ts
self._coll = coll
super().__init__(data or {})
def __eq__(self, other):
return type(self) == type(other) \
and self.name == other.name \
and self.coll == other.coll \
and self.ts == other.ts \
and super().__eq__(other)
def __ne__(self, other):
return not self.__eq__(other)
def __repr__(self):
kvs = ",".join([f"{repr(k)}:{repr(v)}" for k, v in self.items()])
return f"{self.__class__.__name__}(" \
f"name={repr(self.name)}," \
f"coll={repr(self.coll)}," \
f"ts={repr(self.ts)}," \
f"data={{{kvs}}})"
````
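A short sketch of constructing these models directly; the IDs and timestamps are illustrative. Note from the encoder table earlier that a `Document` passed back into a query is sent as its `@ref`, not its full contents:
````python
from datetime import datetime, timezone
from fauna.query import Document, DocumentReference, Module, Page

ref = DocumentReference.from_string("Product:392886847463751746")
assert ref.coll == Module("Product")

doc = Document(
    id="392886847463751746",
    ts=datetime(2024, 1, 1, tzinfo=timezone.utc),
    coll="Product",
    data={"name": "cups", "price": 698},
)
print(doc["name"], doc.coll)  # documents behave as immutable mappings

page = Page(data=[doc], after=None)
for item in page:  # Page iterates over its data
  print(item.id)
````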
## File: fauna/query/query_builder.py
````python
import abc
from typing import Any, Optional, List
from .template import FaunaTemplate
class Fragment(abc.ABC):
"""An abstract class representing a Fragment of a query.
"""
@abc.abstractmethod
def get(self) -> Any:
"""An abstract method for returning a stored value.
"""
pass
class ValueFragment(Fragment):
"""A concrete :class:`Fragment` representing a part of a query that can represent a template variable.
For example, if a template contains a variable ``${foo}``, and an object ``{ "prop": 1 }`` is provided for foo,
then ``{ "prop": 1 }`` should be wrapped as a :class:`ValueFragment`.
:param Any val: The value to be used as a fragment.
"""
def __init__(self, val: Any):
self._val = val
def get(self) -> Any:
"""Gets the stored value.
:returns: The stored value.
"""
return self._val
class LiteralFragment(Fragment):
"""A concrete :class:`Fragment` representing a query literal For example, in the template ```let x = ${foo}```,
the portion ```let x = ``` is a query literal and should be wrapped as a :class:`LiteralFragment`.
:param str val: The query literal to be used as a fragment.
"""
def __init__(self, val: str):
self._val = val
def get(self) -> str:
"""Returns the stored value.
:returns: The stored value.
"""
return self._val
class Query:
"""A class for representing a query.
e.g. { "fql": [...] }
"""
_fragments: List[Fragment]
def __init__(self, fragments: Optional[List[Fragment]] = None):
self._fragments = fragments or []
@property
def fragments(self) -> List[Fragment]:
"""The list of stored Fragments"""
return self._fragments
def __str__(self) -> str:
res = ""
for f in self._fragments:
res += str(f.get())
return res
def fql(query: str, **kwargs: Any) -> Query:
"""Creates a Query - capable of performing query composition and simple querying. It can accept a
simple string query, or can perform composition using ``${}`` sigil string template with ``**kwargs`` as
substitutions.
The ``**kwargs`` can be Fauna data types - such as strings, document references, or modules - and embedded
Query - allowing you to compose arbitrarily complex queries.
  When providing ``**kwargs``, the following types are accepted:
- :class:`str`, :class:`int`, :class:`float`, :class:`bool`, :class:`datetime.datetime`, :class:`datetime.date`,
:class:`dict`, :class:`list`, :class:`Query`, :class:`DocumentReference`, :class:`Module`
:raises ValueError: If there is an invalid template placeholder or a value that cannot be encoded.
:returns: A :class:`Query` that can be passed to the client for evaluation against Fauna.
Examples:
.. code-block:: python
:name: Simple-FQL-Example
:caption: Simple query declaration using this function.
fql('Dogs.byName("Fido")')
.. code-block:: python
:name: Composition-FQL-Example
:caption: Query composition using this function.
def get_dog(id):
return fql('Dogs.byId(${id})', id=id)
def get_vet_phone(id):
return fql('${dog} { .vet_phone_number }', dog=get_dog(id))
get_vet_phone('d123')
"""
fragments: List[Any] = []
template = FaunaTemplate(query)
for text, field_name in template.iter():
if text is not None and len(text) > 0:
fragments.append(LiteralFragment(text))
if field_name is not None:
if field_name not in kwargs:
raise ValueError(
f"template variable `{field_name}` not found in provided kwargs")
# TODO: Reject if it's already a fragment, or accept *Fragment? Decide on API here
fragments.append(ValueFragment(kwargs[field_name]))
return Query(fragments)
````
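Under the hood, `fql` splits the template into alternating literal and value fragments, which the encoder later serializes as `{ "fql": [...] }`. A quick look at that structure:
````python
from fauna import fql

q = fql("Product.byName(${name})", name="cups")
print([type(f).__name__ for f in q.fragments])
# ['LiteralFragment', 'ValueFragment', 'LiteralFragment']
print(str(q))  # Product.byName(cups)
````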
## File: fauna/query/template.py
````python
import re as _re
from typing import Optional, Tuple, Iterator, Match
class FaunaTemplate:
"""A template class that supports variables marked with a ${}-sigil. Its primary purpose
is to expose an iterator for the template parts that support composition of FQL queries.
Implementation adapted from https://github.com/python/cpython/blob/main/Lib/string.py
:param template: A string template e.g. "${my_var} { name }"
:type template: str
"""
_delimiter = '$'
_idpattern = r'[_a-zA-Z][_a-zA-Z0-9]*'
_flags = _re.VERBOSE
def __init__(self, template: str):
"""The initializer"""
delim = _re.escape(self._delimiter)
pattern = fr"""
{delim}(?:
(?P{delim}) | # Escape sequence of two delimiters
{{(?P{self._idpattern})}} | # delimiter and a braced identifier
(?P) # Other ill-formed delimiter exprs
)
"""
self._pattern = _re.compile(pattern, self._flags)
self._template = template
def iter(self) -> Iterator[Tuple[Optional[str], Optional[str]]]:
"""A method that returns an iterator over tuples representing template parts. The
first value of the tuple, if not None, is a template literal. The second value of
the tuple, if not None, is a template variable. If both are not None, then the
template literal comes *before* the variable.
:raises ValueError: If there is an invalid template placeholder
:return: An iterator of template parts
:rtype: collections.Iterable[Tuple[Optional[str], Optional[str]]]
"""
match_objects = self._pattern.finditer(self._template)
cur_pos = 0
for mo in match_objects:
if mo.group("invalid") is not None:
self._handle_invalid(mo)
span_start_pos = mo.span()[0]
span_end_pos = mo.span()[1]
escaped_part = mo.group("escaped") or ""
variable_part = mo.group("braced")
literal_part: Optional[str] = None
if cur_pos != span_start_pos:
literal_part = \
self._template[cur_pos:span_start_pos] \
+ escaped_part
cur_pos = span_end_pos
yield literal_part, variable_part
if cur_pos != len(self._template):
yield self._template[cur_pos:], None
def _handle_invalid(self, mo: Match) -> None:
i = mo.start("invalid")
lines = self._template[:i].splitlines(keepends=True)
if not lines:
colno = 1
lineno = 1
else:
colno = i - len(''.join(lines[:-1]))
lineno = len(lines)
raise ValueError(
f"Invalid placeholder in template: line {lineno}, col {colno}")
````
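The iterator yields `(literal, variable)` pairs, and a doubled `$$` collapses into a literal dollar sign attached to the preceding literal. A sketch:
````python
from fauna.query.template import FaunaTemplate

print(list(FaunaTemplate("let x = ${foo}; x + ${bar}").iter()))
# [('let x = ', 'foo'), ('; x + ', 'bar')]

print(list(FaunaTemplate("cost: $$${amount}").iter()))
# [('cost: $', None), (None, 'amount')]
````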
## File: fauna/__init__.py
````python
__title__ = "Fauna"
__version__ = "2.4.0"
__api_version__ = "10"
__author__ = "Fauna, Inc"
__license__ = "MPL 2.0"
__copyright__ = "2023 Fauna, Inc"
from fauna.query import fql, Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page
global_http_client = None
````