# Fauna v10 Python client driver (current) | Version: 2.4.0 | Repository: fauna/fauna-python | | --- | --- | --- | --- | Fauna’s Python client driver lets you run FQL queries from Python applications. This guide shows how to set up the driver and use it to run FQL queries. ## [](#supported-python-versions)Supported Python versions * Python 3.9 * Python 3.10 * Python 3.11 * Python 3.12 ## [](#supported-cloud-runtimes)Supported cloud runtimes * AWS Lambda (See [AWS Lambda connections](#aws-lambda-connections)) * Vercel Functions ## [](#installation)Installation The driver is available on [PyPI](https://pypi.org/project/fauna/). To install it, run: ```bash pip install fauna ``` ## [](#api-reference)API reference API reference documentation for the driver is available at [https://fauna.github.io/fauna-python/](https://fauna.github.io/fauna-python/). ## [](#sample-app)Sample app For a practical example, check out the [Python sample app](https://github.com/fauna/python-sample-app). This sample app is an e-commerce application that uses Python3, Flask, and the Fauna Python driver. The source code includes comments highlighting best practices for using the driver and composing FQL queries. ## [](#basic-usage)Basic usage The following application: * Initializes a client instance to connect to Fauna * Composes a basic FQL query using an `fql` string template * Runs the query using `query()` ```python from fauna import fql from fauna.client import Client from fauna.encoding import QuerySuccess from fauna.errors import FaunaException # Initialize the client to connect to Fauna client = Client(secret='FAUNA_SECRET') try: # Compose a query query = fql( """ Product.sortedByPriceLowToHigh() { name, description, price }""" ) # Run the query res: QuerySuccess = client.query(query) print(res.data) except FaunaException as e: print(e) finally: # Clean up any remaining resources client.close() ``` ## [](#connect-to-fauna)Connect to Fauna Each Fauna query is an independently authenticated request to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). You authenticate with Fauna using an [authentication secret](../../../learn/security/authentication/#secrets). ### [](#get-an-authentication-secret)Get an authentication secret Fauna supports several [secret types](../../../learn/security/authentication/#secret-types). For testing, you can create a [key](../../../learn/security/keys/), which is a type of secret: 1. Log in to the [Fauna Dashboard](https://dashboard.fauna.com/). 2. On the **Explorer** page, create a database. 3. In the database’s **Keys** tab, click **Create Key**. 4. Choose a **Role** of **server**. 5. Click **Save**. 6. Copy the **Key Secret**. The secret is scoped to the database. ### [](#initialize-a-client)Initialize a client To send query requests to Fauna, initialize a `Client` instance using a Fauna authentication secret: ```python client = Client(secret='FAUNA_SECRET') ``` If not specified, `secret` defaults to the `FAUNA_SECRET` environment variable. For other configuration options, see [Client configuration](#config). ### [](#connect-to-a-child-database)Connect to a child database A [scoped key](../../../learn/security/keys/#scoped-keys) lets you use a parent database’s admin key to send query requests to its child databases. For example, if you have an admin key for a parent database and want to connect to a child database named `childDB`, you can create a scoped key using the following format: ``` // Scoped key that impersonates an `admin` key for // the `childDB` child database. fn...:childDB:admin ``` You can then initialize a `Client` instance using the scoped key: ```python client = Client(secret='fn...:childDB:admin') ``` ### [](#multiple-connections)Multiple connections You can use a single client instance to run multiple asynchronous queries at once. The driver manages HTTP connections as needed. Your app doesn’t need to implement connection pools or other connection management strategies. You can create multiple client instances to connect to Fauna using different credentials or client configurations. ### [](#aws-lambda-connections)AWS Lambda connections AWS Lambda freezes, thaws, and reuses execution environments for Lambda functions. See [Lambda execution environment](https://docs.aws.amazon.com/lambda/latest/dg/running-lambda-code.html). When an execution environment is thawed, Lambda only runs the function’s handler code. Objects declared outside of the handler method remain initialized from before the freeze. Lambda doesn’t re-run initialization code outside the handler. Fauna drivers keep socket connections that can time out during long freezes, causing `ECONNRESET` errors when thawed. To prevent timeouts, create Fauna client connections inside function handlers. Fauna drivers use lightweight HTTP connections. You can create new connections for each request while maintaining good performance. ## [](#run-fql-queries)Run FQL queries Use `fql` string templates to compose FQL queries. Run the queries using `query()`: ```python query = fql("Product.sortedByPriceLowToHigh()") client.query(query) ``` By default, `query()` uses query options from the [Client configuration](#config). You can pass options to `query()` to override these defaults. See [Query options](#query-opts). You can only compose FQL queries using string templates. ### [](#var)Variable interpolation The driver supports queries with Python primitives, lists, and dicts. Use `${}` to pass native Python variables to `fql` queries as kwargs. You can escape a variable by prepending an additional `$`. ```python # Create a native Python var collection_name = 'Product' # Pass the var to an FQL query query = fql(''' let collection = Collection(${collection_name}) collection.sortedByPriceLowToHigh()''', collection_name=collection_name) client.query(query); ``` The driver encodes interpolated variables to an appropriate [FQL type](../../../reference/fql/types/) and uses the [wire protocol](../../../reference/http/reference/wire-protocol/) to pass the query to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). This helps prevent injection attacks. ### [](#query-composition)Query composition You can use variable interpolation to pass FQL string templates as query fragments to compose an FQL query: ```python # Create a reusable query fragment. product = fql('Product.byName("pizza").first()') # Use the fragment in another FQL query. query = fql(f''' let product = {product} product {{ name, price }} ''') client.query(query) ``` ### [](#pagination)Pagination Use `paginate()` to iterate through a Set that contains more than one page of results. `paginate()` accepts the same [Query options](#query-opts) as `query()`. ```python # Adjust `pageSize()` size as needed. query = fql(''' Product.sortedByPriceLowToHigh() .pageSize(2)''') pages = client.paginate(query) for products in pages: for product in products: print(product) ``` ### [](#query-stats)Query stats Successful query responses and `ServiceError` errors return [query stats](../../../reference/http/reference/query-stats/): ```python from fauna import fql from fauna.client import Client from fauna.errors import ServiceError client = Client(secret='FAUNA_SECRET') try: query = fql('"Hello world"') res = client.query(query) print(res.stats) except ServiceError as e: if e.stats is not None: print(e.stats) # more error handling... ``` ### [](#user-defined-classes)User-defined classes Serialization and deserialization with user-defined classes is not supported. When composing FQL queries, adapt your classes into dicts or lists. When instantiating classes from a query result, build them from the expected result. ```python class MyClass: def __init__ (self, my_prop): self.my_prop = my_prop def to_dict(self): return { 'my_prop': self.my_prop } @static_method def from_result(obj): return MyClass(obj['my_prop']) ``` ## [](#config)Client configuration The `Client` instance comes with reasonable configuration defaults. We recommend using the defaults in most cases. If needed, you can configure the client to override the defaults. This also lets you set default [Query options](#query-opts). ```python from datetime import timedelta from fauna.client import Client from fauna.client.headers import Header from fauna.client.endpoints import Endpoints config = { # Configure the client 'secret': 'FAUNA_SECRET', 'endpoint': Endpoints.Default, 'client_buffer_timeout': timedelta(seconds=5), 'http_read_timeout': None, 'http_write_timeout': timedelta(seconds=5), 'http_connect_timeout': timedelta(seconds=5), 'http_pool_timeout': timedelta(seconds=5), 'http_idle_timeout': timedelta(seconds=5), 'max_attempts': 3, 'max_backoff': 20, # Set default query options 'additional_headers': {'foo': 'bar'}, 'linearized': False, 'max_contention_retries': 5, 'query_tags': {'tag': 'value'}, 'query_timeout': timedelta(seconds=60), 'typecheck': True, } client = Client(**config) ``` For supported parameters, see [Client](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#Client) in the API reference. ### [](#environment-variables)Environment variables By default, `secret` and `endpoint` default to the respective `FAUNA_SECRET` and `FAUNA_ENDPOINT` environment variables. For example, if you set the following environment variables: ```bash export FAUNA_SECRET=FAUNA_SECRET export FAUNA_ENDPOINT=https://db.fauna.com/ ``` You can initialize the client with a default configuration: ```python client = Client() ``` ### [](#retries)Retries By default, the client automatically retries query requests that return a `limit_exceeded` [\[error code](../../../reference/http/reference/errors/). Retries use an exponential backoff. Use the [Client configuration](#config)'s `max_backoff` parameter to set the maximum time between retries. Similarly, use `max_attempts` to set the maximum number of retry attempts. ## [](#query-opts)Query options The [Client configuration](#config) sets default query options for the following methods: * `query()` * `paginate()` You can pass a `QueryOptions` object to override these defaults: ```python options = QueryOptions( additional_headers={'foo': 'bar'}, linearized=False, max_contention_retries=5, query_tags={'name': 'hello world query'}, query_timeout=timedelta(seconds=60), traceparent='00-750efa5fb6a131eb2cf4db39f28366cb-000000000000000b-00', typecheck=True ) client.query(fql('"Hello world"'), options) ``` For supported properties, see [QueryOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#QueryOptions) in the API reference. ## [](#event-feeds)Event feeds The driver supports [event feeds](../../../learn/cdc/#event-feeds). An event feed asynchronously polls an [event source](../../../learn/cdc/) for events. To use event feeds, you must have a Pro or Enterprise plan. ### [](#request-an-event-feed)Request an event feed To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets). To get paginated events, pass the event source to `feed()`: ```python from fauna import fql from fauna.client import Client client = Client() response = client.query(fql(''' let set = Product.all() { initialPage: set.pageSize(10), eventSource: set.eventSource() } ''')) initial_page = response.data['initialPage'] event_source = response.data['eventSource'] feed = client.feed(event_source) ``` If changes occur between the creation of the event source and the `feed()` request, the feed replays and emits any related events. You can also pass a query that produces an event source directly to `feed()`: ```python query = fql('Product.all().eventsOn(.price, .stock)') feed = client.feed(query) ``` In most cases, you’ll get events after a specific [start time](#start-time) or [cursor](#cursor). #### [](#start-time)Get events after a specific start time When you first poll an event source using an event feed, you usually include a `start_ts` (start timestamp) in the [`FeedOptions` object](#event-feed-opts) that’s passed to `feed()`. The request returns events that occurred after the specified timestamp (exclusive). `start_ts` is an integer representing a time in microseconds since the Unix epoch: ```python from fauna import fql from fauna.client import Client, FeedOptions from datetime import datetime, timedelta client = Client() # Calculate timestamp for 10 minutes ago ten_minutes_ago = datetime.now() - timedelta(minutes=10) # Convert to microseconds start_ts = int(ten_minutes_ago.timestamp() * 1_000_000) options = FeedOptions( start_ts=start_ts ) feed = client.feed(fql('Product.all().eventSource()'), options) ``` #### [](#cursor)Get events after a specific cursor After the initial request, you usually get subsequent events using the [cursor](../../../learn/cdc/#cursor) for the last page or event. To get events after a cursor (exclusive), include the `cursor` in the [`FeedOptions` object](#event-feed-opts) that’s passed to `feed()`: ```python from fauna import fql from fauna.client import Client, FeedOptions from datetime import datetime, timedelta client = Client() options = FeedOptions( # Cursor for a previous page cursor='gsGabc456' ) feed = client.feed(fql('Product.all().eventSource()'), options) ``` ### [](#loop)Iterate on an event feed `feed()` returns an iterator that emits pages of events. You can use a for loop to iterate through the pages: ```python query = fql('Product.all().eventsOn(.price, .stock)') # Calculate timestamp for 10 minutes ago ten_minutes_ago = datetime.now() - timedelta(minutes=10) start_ts = int(ten_minutes_ago.timestamp() * 1_000_000) options = FeedOptions( start_ts=start_ts ) feed = client.feed(query, options) for page in feed: print('Page stats: ', page.stats) for event in page: eventType = event['type'] if (eventType == 'add'): # Do something on add print('Add event: ', event) elif (eventType == 'update'): # Do something on update print('Update event: ', event) elif (eventType == 'remove'): # Do something on remove print('Remove event: ', event) ``` The event feed iterator will stop once there are no more events to poll. Each page includes a top-level `cursor`. You can include the cursor in a [`FeedOptions` object](#event-feed-opts) passed to `feed()` to poll for events after the cursor: ```python import time from datetime import datetime, timedelta from fauna import fql from fauna.client import Client, FeedOptions def process_feed(client, query, start_ts=None, sleep_time=300): cursor = None while True: options = FeedOptions( start_ts=start_ts if cursor is None else None, cursor=cursor, ) feed = client.feed(query, options) for page in feed: for event in page: event_type = event['type'] if event_type == 'add': # Do something on add print('Add event: ', event) elif event_type == 'update': # Do something on update print('Update event: ', event) elif event_type == 'remove': # Do something on remove print('Remove event: ', event) # Store the cursor of the last page cursor = page.cursor # Clear the start timestamp after the first request start_ts = None print(f"Sleeping for {sleep_time} seconds...") time.sleep(sleep_time) client = Client() query = fql('Product.all().eventsOn(.price, .stock)') # Calculate timestamp for 10 minutes ago ten_minutes_ago = datetime.now() - timedelta(minutes=10) start_ts = int(ten_minutes_ago.timestamp() * 1_000_000) process_feed(client, query, start_ts=start_ts) ``` Alternatively, you can get events as a single, flat array: ```python import time from datetime import datetime, timedelta from fauna import fql from fauna.client import Client, FeedOptions def process_feed(client, query, start_ts=None, sleep_time=300): cursor = None while True: options = FeedOptions( start_ts=start_ts if cursor is None else None, cursor=cursor, ) feed = client.feed(query, options) for event in feed.flatten(): event_type = event['type'] if event_type == 'add': # Do something on add print('Add event: ', event) elif event_type == 'update': # Do something on update print('Update event: ', event) elif event_type == 'remove': # Do something on remove print('Remove event: ', event) # Store the cursor of the last page cursor = event['cursor'] # Clear the start timestamp after the first request start_ts = None print(f"Sleeping for {sleep_time} seconds...") time.sleep(sleep_time) client = Client() query = fql('Product.all().eventsOn(.price, .stock)') # Calculate timestamp for 10 minutes ago ten_minutes_ago = datetime.now() - timedelta(minutes=10) start_ts = int(ten_minutes_ago.timestamp() * 1_000_000) process_feed(client, query, start_ts=start_ts) ``` If needed, you can store the cursor as a collection document. For an example, see the [event feeds app](../../sample-apps/event-feeds/). ### [](#error-handling)Error handling If a non-retryable error occurs when opening or processing an event feed, Fauna raises a `FaunaException`: ```python from fauna import fql from fauna.client import Client from fauna.errors import FaunaException client = Client() # Calculate timestamp for 10 minutes ago ten_minutes_ago = datetime.now() - timedelta(minutes=10) start_ts = int(ten_minutes_ago.timestamp() * 1_000_000) options = FeedOptions( start_ts=start_ts ) feed = client.feed(fql( 'Product.all().eventsOn(.price, .stock)' ), options) for page in feed: try: for event in page: print(event) # ... except FaunaException as e: print('error ocurred with event processing: ', e) # The current event will be skipped ``` Each page’s `cursor` contains the cursor for the page’s last successfully processed event. If you’re using a [loop to poll for changes](#loop), using the cursor will result in skipping any events that caused errors. ### [](#event-feed-opts)Event feed options The client configuration sets default options for the `feed()` method. You can pass a `FeedOptions` object to override these defaults: ```python from fauna import fql from fauna.client import Client, FeedOptions from datetime import timedelta client = Client() options = FeedOptions( max_attempts=3, max_backoff=20, query_timeout=timedelta(seconds=5), page_size=None, cursor=None, start_ts=None, ) client.feed(fql('Product.all().eventSource()'), options) ``` For supported properties, see [FeedOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#FeedOptions) in the API reference. ### [](#sample-app-2)Sample app For a practical example that uses the Python driver with event feeds, check out the [event feeds sample app](../../sample-apps/event-feeds/). ## [](#event-streaming)Event streams The driver supports [event streams](../../../learn/cdc/). ### [](#start-a-stream)Start a stream To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets). To stream the source’s events, pass the event source to `stream()`: ```python import fauna from fauna import fql from fauna.client import Client, StreamOptions client = Client() response = client.query(fql(''' let set = Product.all() { initialPage: set.pageSize(10), eventSource: set.eventSource() } ''')) initial_page = response.data['initialPage'] event_source = response.data['eventSource'] client.stream(event_source) ``` You can also pass a query that produces an event source directly to `stream()`: ```python query = fql('Product.all().eventsOn(.price, .stock)') client.stream(query) ``` ### [](#iterate-on-a-stream)Iterate on a stream `stream()` returns an iterator that emits events as they occur. You can use a generator expression to iterate through the events: ```python query = fql('Product.all().eventsOn(.price, .stock)') with client.stream(query) as stream: for event in stream: eventType = event['type'] if (eventType == 'add'): print('Add event: ', event) ## ... elif (eventType == 'update'): print('Update event: ', event) ## ... elif (eventType == 'remove'): print('Remove event: ', event) ## ... ``` ### [](#close-a-stream)Close a stream Use `close()` to close a stream: ```python query = fql('Product.all().eventsOn(.price, .stock)') count = 0 with client.stream(query) as stream: for event in stream: print('Stream event', event) # ... count+=1 if (count == 2): stream.close() ``` ### [](#error-handling-2)Error handling If a non-retryable error occurs when opening or processing a stream, Fauna raises a `FaunaException`: ```python import fauna from fauna import fql from fauna.client import Client from fauna.errors import FaunaException client = Client(secret='FAUNA_SECRET') try: with client.stream(fql( 'Product.all().eventsOn(.price, .stock)' )) as stream: for event in stream: print(event) # ... except FaunaException as e: print('error ocurred with stream: ', e) ``` ### [](#stream-options)Stream options The [Client configuration](#config) sets default options for the `stream()` method. You can pass a `StreamOptions` object to override these defaults: ```python options = StreamOptions( max_attempts=5, max_backoff=1, start_ts=1710968002310000, status_events=True ) client.stream(fql('Product.all().eventSource()'), options) ``` For supported properties, see [StreamOptions](https://fauna.github.io/fauna-python/latest/api/fauna/client/client.html#StreamOptions) in the API reference. ## [](#debug-logging)Debug logging Logging is handled using Python’s standard `logging` package under the `fauna` namespace. Logs include the HTTP request with body (excluding the `Authorization` header) and the full HTTP response. To enable logging: ```python import logging from fauna.client import Client from fauna import fql logging.basicConfig( level=logging.DEBUG ) client = Client() client.query(fql('42')) ``` For configuration options or to set specific log levels, see Python’s [Logging HOWTO](https://docs.python.org/3/howto/logging.html). # Python driver source code # Files ## File: fauna/client/__init__.py ````python from .client import Client, QueryOptions, StreamOptions, FeedOptions, FeedPage, FeedIterator from .endpoints import Endpoints from .headers import Header ```` ## File: fauna/client/client.py ````python import logging from dataclasses import dataclass from datetime import timedelta from typing import Any, Dict, Iterator, Mapping, Optional, Union, List import fauna from fauna.client.headers import _DriverEnvironment, _Header, _Auth, Header from fauna.client.retryable import Retryable from fauna.client.utils import _Environment, LastTxnTs from fauna.encoding import FaunaEncoder, FaunaDecoder from fauna.encoding import QuerySuccess, QueryTags, QueryStats from fauna.errors import FaunaError, ClientError, ProtocolError, \ RetryableFaunaException, NetworkError from fauna.http.http_client import HTTPClient from fauna.query import EventSource, Query, Page, fql logger = logging.getLogger("fauna") DefaultHttpConnectTimeout = timedelta(seconds=5) DefaultHttpReadTimeout: Optional[timedelta] = None DefaultHttpWriteTimeout = timedelta(seconds=5) DefaultHttpPoolTimeout = timedelta(seconds=5) DefaultIdleConnectionTimeout = timedelta(seconds=5) DefaultQueryTimeout = timedelta(seconds=5) DefaultClientBufferTimeout = timedelta(seconds=5) DefaultMaxConnections = 20 DefaultMaxIdleConnections = 20 @dataclass class QueryOptions: """ A dataclass representing options available for a query. * linearized - If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. * max_contention_retries - The max number of times to retry the query if contention is encountered. * query_timeout - Controls the maximum amount of time Fauna will execute your query before marking it failed. * query_tags - Tags to associate with the query. See `logging `_ * traceparent - A traceparent to associate with the query. See `logging `_ Must match format: https://www.w3.org/TR/trace-context/#traceparent-header * typecheck - Enable or disable typechecking of the query before evaluation. If not set, the value configured on the Client will be used. If neither is set, Fauna will use the value of the "typechecked" flag on the database configuration. * additional_headers - Add/update HTTP request headers for the query. In general, this should not be necessary. """ linearized: Optional[bool] = None max_contention_retries: Optional[int] = None query_timeout: Optional[timedelta] = DefaultQueryTimeout query_tags: Optional[Mapping[str, str]] = None traceparent: Optional[str] = None typecheck: Optional[bool] = None additional_headers: Optional[Dict[str, str]] = None @dataclass class StreamOptions: """ A dataclass representing options available for a stream. * max_attempts - The maximum number of times to attempt a stream query when a retryable exception is thrown. * max_backoff - The maximum backoff in seconds for an individual retry. * start_ts - The starting timestamp of the stream, exclusive. If set, Fauna will return events starting after the timestamp. * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. * status_events - Indicates if stream should include status events. Status events are periodic events that update the client with the latest valid timestamp (in the event of a dropped connection) as well as metrics about the cost of maintaining the stream other than the cost of the received events. """ max_attempts: Optional[int] = None max_backoff: Optional[int] = None start_ts: Optional[int] = None cursor: Optional[str] = None status_events: bool = False @dataclass class FeedOptions: """ A dataclass representing options available for an event feed. * max_attempts - The maximum number of times to attempt an event feed query when a retryable exception is thrown. * max_backoff - The maximum backoff in seconds for an individual retry. * query_timeout - Controls the maximum amount of time Fauna will execute a query before returning a page of events. * start_ts - The starting timestamp of the event feed, exclusive. If set, Fauna will return events starting after the timestamp. * cursor - The starting event cursor, exclusive. If set, Fauna will return events starting after the cursor. * page_size - Maximum number of events returned per page. Must be in the range 1 to 16000 (inclusive). Defaults to 16. """ max_attempts: Optional[int] = None max_backoff: Optional[int] = None query_timeout: Optional[timedelta] = None page_size: Optional[int] = None start_ts: Optional[int] = None cursor: Optional[str] = None class Client: def __init__( self, endpoint: Optional[str] = None, secret: Optional[str] = None, http_client: Optional[HTTPClient] = None, query_tags: Optional[Mapping[str, str]] = None, linearized: Optional[bool] = None, max_contention_retries: Optional[int] = None, typecheck: Optional[bool] = None, additional_headers: Optional[Dict[str, str]] = None, query_timeout: Optional[timedelta] = DefaultQueryTimeout, client_buffer_timeout: Optional[timedelta] = DefaultClientBufferTimeout, http_read_timeout: Optional[timedelta] = DefaultHttpReadTimeout, http_write_timeout: Optional[timedelta] = DefaultHttpWriteTimeout, http_connect_timeout: Optional[timedelta] = DefaultHttpConnectTimeout, http_pool_timeout: Optional[timedelta] = DefaultHttpPoolTimeout, http_idle_timeout: Optional[timedelta] = DefaultIdleConnectionTimeout, max_attempts: int = 3, max_backoff: int = 20, ): """Initializes a Client. :param endpoint: The Fauna Endpoint to use. Defaults to https://db.fauna.com, or the `FAUNA_ENDPOINT` env variable. :param secret: The Fauna Secret to use. Defaults to empty, or the `FAUNA_SECRET` env variable. :param http_client: An :class:`HTTPClient` implementation. Defaults to a global :class:`HTTPXClient`. :param query_tags: Tags to associate with the query. See `logging `_ :param linearized: If true, unconditionally run the query as strictly serialized. This affects read-only transactions. Transactions which write will always be strictly serialized. :param max_contention_retries: The max number of times to retry the query if contention is encountered. :param typecheck: Enable or disable typechecking of the query before evaluation. If not set, Fauna will use the value of the "typechecked" flag on the database configuration. :param additional_headers: Add/update HTTP request headers for the query. In general, this should not be necessary. :param query_timeout: Controls the maximum amount of time Fauna will execute your query before marking it failed, default is :py:data:`DefaultQueryTimeout`. :param client_buffer_timeout: Time in milliseconds beyond query_timeout at which the client will abort a request if it has not received a response. The default is :py:data:`DefaultClientBufferTimeout`, which should account for network latency for most clients. The value must be greater than zero. The closer to zero the value is, the more likely the client is to abort the request before the server can report a legitimate response or error. :param http_read_timeout: Set HTTP Read timeout, default is :py:data:`DefaultHttpReadTimeout`. :param http_write_timeout: Set HTTP Write timeout, default is :py:data:`DefaultHttpWriteTimeout`. :param http_connect_timeout: Set HTTP Connect timeout, default is :py:data:`DefaultHttpConnectTimeout`. :param http_pool_timeout: Set HTTP Pool timeout, default is :py:data:`DefaultHttpPoolTimeout`. :param http_idle_timeout: Set HTTP Idle timeout, default is :py:data:`DefaultIdleConnectionTimeout`. :param max_attempts: The maximum number of times to attempt a query when a retryable exception is thrown. Defaults to 3. :param max_backoff: The maximum backoff in seconds for an individual retry. Defaults to 20. """ self._set_endpoint(endpoint) self._max_attempts = max_attempts self._max_backoff = max_backoff if secret is None: self._auth = _Auth(_Environment.EnvFaunaSecret()) else: self._auth = _Auth(secret) self._last_txn_ts = LastTxnTs() self._query_tags = {} if query_tags is not None: self._query_tags.update(query_tags) if query_timeout is not None: self._query_timeout_ms = int(query_timeout.total_seconds() * 1000) else: self._query_timeout_ms = None self._headers: Dict[str, str] = { _Header.AcceptEncoding: "gzip", _Header.ContentType: "application/json;charset=utf-8", _Header.Driver: "python", _Header.DriverEnv: str(_DriverEnvironment()), } if typecheck is not None: self._headers[Header.Typecheck] = str(typecheck).lower() if linearized is not None: self._headers[Header.Linearized] = str(linearized).lower() if max_contention_retries is not None and max_contention_retries > 0: self._headers[Header.MaxContentionRetries] = \ f"{max_contention_retries}" if additional_headers is not None: self._headers = { **self._headers, **additional_headers, } self._session: HTTPClient if http_client is not None: self._session = http_client else: if fauna.global_http_client is None: timeout_s: Optional[float] = None if query_timeout is not None and client_buffer_timeout is not None: timeout_s = (query_timeout + client_buffer_timeout).total_seconds() read_timeout_s: Optional[float] = None if http_read_timeout is not None: read_timeout_s = http_read_timeout.total_seconds() write_timeout_s: Optional[float] = http_write_timeout.total_seconds( ) if http_write_timeout is not None else None connect_timeout_s: Optional[float] = http_connect_timeout.total_seconds( ) if http_connect_timeout is not None else None pool_timeout_s: Optional[float] = http_pool_timeout.total_seconds( ) if http_pool_timeout is not None else None idle_timeout_s: Optional[float] = http_idle_timeout.total_seconds( ) if http_idle_timeout is not None else None import httpx from fauna.http.httpx_client import HTTPXClient c = HTTPXClient( httpx.Client( http1=True, http2=False, timeout=httpx.Timeout( timeout=timeout_s, connect=connect_timeout_s, read=read_timeout_s, write=write_timeout_s, pool=pool_timeout_s, ), limits=httpx.Limits( max_connections=DefaultMaxConnections, max_keepalive_connections=DefaultMaxIdleConnections, keepalive_expiry=idle_timeout_s, ), ), logger) fauna.global_http_client = c self._session = fauna.global_http_client def close(self): self._session.close() if self._session == fauna.global_http_client: fauna.global_http_client = None def set_last_txn_ts(self, txn_ts: int): """ Set the last timestamp seen by this client. This has no effect if earlier than stored timestamp. .. WARNING:: This should be used only when coordinating timestamps across multiple clients. Moving the timestamp arbitrarily forward into the future will cause transactions to stall. :param txn_ts: the new transaction time. """ self._last_txn_ts.update_txn_time(txn_ts) def get_last_txn_ts(self) -> Optional[int]: """ Get the last timestamp seen by this client. :return: """ return self._last_txn_ts.time def get_query_timeout(self) -> Optional[timedelta]: """ Get the query timeout for all queries. """ if self._query_timeout_ms is not None: return timedelta(milliseconds=self._query_timeout_ms) else: return None def paginate( self, fql: Query, opts: Optional[QueryOptions] = None, ) -> "QueryIterator": """ Run a query on Fauna and returning an iterator of results. If the query returns a Page, the iterator will fetch additional Pages until the after token is null. Each call for a page will be retried with exponential backoff up to the max_attempts set in the client's retry policy in the event of a 429 or 502. :param fql: A Query :param opts: (Optional) Query Options :return: a :class:`QueryResponse` :raises NetworkError: HTTP Request failed in transit :raises ProtocolError: HTTP error not from Fauna :raises ServiceError: Fauna returned an error :raises ValueError: Encoding and decoding errors :raises TypeError: Invalid param types """ if not isinstance(fql, Query): err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ f"Query by calling fauna.fql()" raise TypeError(err_msg) return QueryIterator(self, fql, opts) def query( self, fql: Query, opts: Optional[QueryOptions] = None, ) -> QuerySuccess: """ Run a query on Fauna. A query will be retried max_attempt times with exponential backoff up to the max_backoff in the event of a 429. :param fql: A Query :param opts: (Optional) Query Options :return: a :class:`QueryResponse` :raises NetworkError: HTTP Request failed in transit :raises ProtocolError: HTTP error not from Fauna :raises ServiceError: Fauna returned an error :raises ValueError: Encoding and decoding errors :raises TypeError: Invalid param types """ if not isinstance(fql, Query): err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ f"Query by calling fauna.fql()" raise TypeError(err_msg) try: encoded_query: Mapping[str, Any] = FaunaEncoder.encode(fql) except Exception as e: raise ClientError("Failed to encode Query") from e retryable = Retryable[QuerySuccess]( self._max_attempts, self._max_backoff, self._query, "/query/1", fql=encoded_query, opts=opts, ) r = retryable.run() r.response.stats.attempts = r.attempts return r.response def _query( self, path: str, fql: Mapping[str, Any], arguments: Optional[Mapping[str, Any]] = None, opts: Optional[QueryOptions] = None, ) -> QuerySuccess: headers = self._headers.copy() headers[_Header.Format] = "tagged" headers[_Header.Authorization] = self._auth.bearer() if self._query_timeout_ms is not None: headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) headers.update(self._last_txn_ts.request_header) query_tags = {} if self._query_tags is not None: query_tags.update(self._query_tags) if opts is not None: if opts.linearized is not None: headers[Header.Linearized] = str(opts.linearized).lower() if opts.max_contention_retries is not None: headers[Header.MaxContentionRetries] = \ f"{opts.max_contention_retries}" if opts.traceparent is not None: headers[Header.Traceparent] = opts.traceparent if opts.query_timeout is not None: timeout_ms = f"{int(opts.query_timeout.total_seconds() * 1000)}" headers[Header.QueryTimeoutMs] = timeout_ms if opts.query_tags is not None: query_tags.update(opts.query_tags) if opts.typecheck is not None: headers[Header.Typecheck] = str(opts.typecheck).lower() if opts.additional_headers is not None: headers.update(opts.additional_headers) if len(query_tags) > 0: headers[Header.Tags] = QueryTags.encode(query_tags) data: dict[str, Any] = { "query": fql, "arguments": arguments or {}, } with self._session.request( method="POST", url=self._endpoint + path, headers=headers, data=data, ) as response: status_code = response.status_code() response_json = response.json() headers = response.headers() self._check_protocol(response_json, status_code) dec: Any = FaunaDecoder.decode(response_json) if status_code > 399: FaunaError.parse_error_and_throw(dec, status_code) if "txn_ts" in dec: self.set_last_txn_ts(int(response_json["txn_ts"])) stats = QueryStats(dec["stats"]) if "stats" in dec else None summary = dec["summary"] if "summary" in dec else None query_tags = QueryTags.decode( dec["query_tags"]) if "query_tags" in dec else None txn_ts = dec["txn_ts"] if "txn_ts" in dec else None schema_version = dec["schema_version"] if "schema_version" in dec else None traceparent = headers.get("traceparent", None) static_type = dec["static_type"] if "static_type" in dec else None return QuerySuccess( data=dec["data"], query_tags=query_tags, static_type=static_type, stats=stats, summary=summary, traceparent=traceparent, txn_ts=txn_ts, schema_version=schema_version, ) def stream( self, fql: Union[EventSource, Query], opts: StreamOptions = StreamOptions() ) -> "StreamIterator": """ Opens a Stream in Fauna and returns an iterator that consume Fauna events. :param fql: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Stream Options. :return: a :class:`StreamIterator` :raises ClientError: Invalid options provided :raises NetworkError: HTTP Request failed in transit :raises ProtocolError: HTTP error not from Fauna :raises ServiceError: Fauna returned an error :raises ValueError: Encoding and decoding errors :raises TypeError: Invalid param types """ if isinstance(fql, Query): if opts.cursor is not None: raise ClientError( "The 'cursor' configuration can only be used with an event source.") source = self.query(fql).data else: source = fql if not isinstance(source, EventSource): err_msg = f"'fql' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() headers[_Header.Format] = "tagged" headers[_Header.Authorization] = self._auth.bearer() return StreamIterator(self._session, headers, self._endpoint + "/stream/1", self._max_attempts, self._max_backoff, opts, source) def feed( self, source: Union[EventSource, Query], opts: FeedOptions = FeedOptions(), ) -> "FeedIterator": """ Opens an event feed in Fauna and returns an iterator that consume Fauna events. :param source: An EventSource or a Query that returns an EventSource. :param opts: (Optional) Event feed options. :return: a :class:`FeedIterator` :raises ClientError: Invalid options provided :raises NetworkError: HTTP Request failed in transit :raises ProtocolError: HTTP error not from Fauna :raises ServiceError: Fauna returned an error :raises ValueError: Encoding and decoding errors :raises TypeError: Invalid param types """ if isinstance(source, Query): source = self.query(source).data if not isinstance(source, EventSource): err_msg = f"'source' must be an EventSource, or a Query that returns an EventSource but was a {type(source)}." raise TypeError(err_msg) headers = self._headers.copy() headers[_Header.Format] = "tagged" headers[_Header.Authorization] = self._auth.bearer() if opts.query_timeout is not None: query_timeout_ms = int(opts.query_timeout.total_seconds() * 1000) headers[Header.QueryTimeoutMs] = str(query_timeout_ms) elif self._query_timeout_ms is not None: headers[Header.QueryTimeoutMs] = str(self._query_timeout_ms) return FeedIterator(self._session, headers, self._endpoint + "/feed/1", self._max_attempts, self._max_backoff, opts, source) def _check_protocol(self, response_json: Any, status_code): # TODO: Logic to validate wire protocol belongs elsewhere. should_raise = False # check for QuerySuccess if status_code <= 399 and "data" not in response_json: should_raise = True # check for QueryFailure if status_code > 399: if "error" not in response_json: should_raise = True else: e = response_json["error"] if "code" not in e or "message" not in e: should_raise = True if should_raise: raise ProtocolError( status_code, f"Response is in an unknown format: \n{response_json}", ) def _set_endpoint(self, endpoint): if endpoint is None: endpoint = _Environment.EnvFaunaEndpoint() if endpoint[-1:] == "/": endpoint = endpoint[:-1] self._endpoint = endpoint class StreamIterator: """A class that mixes a ContextManager and an Iterator so we can detected retryable errors.""" def __init__(self, http_client: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: StreamOptions, source: EventSource): self._http_client = http_client self._headers = headers self._endpoint = endpoint self._max_attempts = max_attempts self._max_backoff = max_backoff self._opts = opts self._source = source self._stream = None self.last_ts = None self.last_cursor = None self._ctx = self._create_stream() if opts.start_ts is not None and opts.cursor is not None: err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the StreamOptions." raise TypeError(err_msg) def __enter__(self): return self def __exit__(self, exc_type, exc_value, exc_traceback): if self._stream is not None: self._stream.close() self._ctx.__exit__(exc_type, exc_value, exc_traceback) return False def __iter__(self): return self def __next__(self): if self._opts.max_attempts is not None: max_attempts = self._opts.max_attempts else: max_attempts = self._max_attempts if self._opts.max_backoff is not None: max_backoff = self._opts.max_backoff else: max_backoff = self._max_backoff retryable = Retryable[Any](max_attempts, max_backoff, self._next_element) return retryable.run().response def _next_element(self): try: if self._stream is None: try: self._stream = self._ctx.__enter__() except Exception: self._retry_stream() if self._stream is not None: event: Any = FaunaDecoder.decode(next(self._stream)) if event["type"] == "error": FaunaError.parse_error_and_throw(event, 400) self.last_ts = event["txn_ts"] self.last_cursor = event.get('cursor') if event["type"] == "start": return self._next_element() if not self._opts.status_events and event["type"] == "status": return self._next_element() return event raise StopIteration except NetworkError: self._retry_stream() def _retry_stream(self): if self._stream is not None: self._stream.close() self._stream = None try: self._ctx = self._create_stream() except Exception: pass raise RetryableFaunaException def _create_stream(self): data: Dict[str, Any] = {"token": self._source.token} if self.last_cursor is not None: data["cursor"] = self.last_cursor elif self._opts.cursor is not None: data["cursor"] = self._opts.cursor elif self._opts.start_ts is not None: data["start_ts"] = self._opts.start_ts return self._http_client.stream( url=self._endpoint, headers=self._headers, data=data) def close(self): if self._stream is not None: self._stream.close() class FeedPage: def __init__(self, events: List[Any], cursor: str, stats: QueryStats): self._events = events self.cursor = cursor self.stats = stats def __len__(self): return len(self._events) def __iter__(self) -> Iterator[Any]: for event in self._events: if event["type"] == "error": FaunaError.parse_error_and_throw(event, 400) yield event class FeedIterator: """A class to provide an iterator on top of event feed pages.""" def __init__(self, http: HTTPClient, headers: Dict[str, str], endpoint: str, max_attempts: int, max_backoff: int, opts: FeedOptions, source: EventSource): self._http = http self._headers = headers self._endpoint = endpoint self._max_attempts = opts.max_attempts or max_attempts self._max_backoff = opts.max_backoff or max_backoff self._request: Dict[str, Any] = {"token": source.token} self._is_done = False if opts.start_ts is not None and opts.cursor is not None: err_msg = "Only one of 'start_ts' or 'cursor' can be defined in the FeedOptions." raise TypeError(err_msg) if opts.page_size is not None: self._request["page_size"] = opts.page_size if opts.cursor is not None: self._request["cursor"] = opts.cursor elif opts.start_ts is not None: self._request["start_ts"] = opts.start_ts def __iter__(self) -> Iterator[FeedPage]: self._is_done = False return self def __next__(self) -> FeedPage: if self._is_done: raise StopIteration retryable = Retryable[Any](self._max_attempts, self._max_backoff, self._next_page) return retryable.run().response def _next_page(self) -> FeedPage: with self._http.request( method="POST", url=self._endpoint, headers=self._headers, data=self._request, ) as response: status_code = response.status_code() decoded: Any = FaunaDecoder.decode(response.json()) if status_code > 399: FaunaError.parse_error_and_throw(decoded, status_code) self._is_done = not decoded["has_next"] self._request["cursor"] = decoded["cursor"] if "start_ts" in self._request: del self._request["start_ts"] return FeedPage(decoded["events"], decoded["cursor"], QueryStats(decoded["stats"])) def flatten(self) -> Iterator: """A generator that yields events instead of pages of events.""" for page in self: for event in page: yield event class QueryIterator: """A class to provider an iterator on top of Fauna queries.""" def __init__(self, client: Client, fql: Query, opts: Optional[QueryOptions] = None): """Initializes the QueryIterator :param fql: A Query :param opts: (Optional) Query Options :raises TypeError: Invalid param types """ if not isinstance(client, Client): err_msg = f"'client' must be a Client but was a {type(client)}. You can build a " \ f"Client by calling fauna.client.Client()" raise TypeError(err_msg) if not isinstance(fql, Query): err_msg = f"'fql' must be a Query but was a {type(fql)}. You can build a " \ f"Query by calling fauna.fql()" raise TypeError(err_msg) self.client = client self.fql = fql self.opts = opts def __iter__(self) -> Iterator: return self.iter() def iter(self) -> Iterator: """ A generator function that immediately fetches and yields the results of the stored query. Yields additional pages on subsequent iterations if they exist """ cursor = None initial_response = self.client.query(self.fql, self.opts) if isinstance(initial_response.data, Page): cursor = initial_response.data.after yield initial_response.data.data while cursor is not None: next_response = self.client.query( fql("Set.paginate(${after})", after=cursor), self.opts) # TODO: `Set.paginate` does not yet return a `@set` tagged value # so we will get back a plain object that might not have # an after property. cursor = next_response.data.get("after") yield next_response.data.get("data") else: yield [initial_response.data] def flatten(self) -> Iterator: """ A generator function that immediately fetches and yields the results of the stored query. Yields each item individually, rather than a whole Page at a time. Fetches additional pages as required if they exist. """ for page in self.iter(): for item in page: yield item ```` ## File: fauna/client/endpoints.py ````python class Endpoints: Default = "https://db.fauna.com" Local = "http://localhost:8443" ```` ## File: fauna/client/headers.py ````python import os import platform import sys from dataclasses import dataclass from typing import Callable from fauna import __version__ class Header: LastTxnTs = "X-Last-Txn-Ts" Linearized = "X-Linearized" MaxContentionRetries = "X-Max-Contention-Retries" QueryTimeoutMs = "X-Query-Timeout-Ms" Typecheck = "X-Typecheck" Tags = "X-Query-Tags" Traceparent = "Traceparent" class _Header: AcceptEncoding = "Accept-Encoding" Authorization = "Authorization" ContentType = "Content-Type" Driver = "X-Driver" DriverEnv = "X-Driver-Env" Format = "X-Format" class _Auth: """Creates an auth helper object""" def bearer(self): return "Bearer {}".format(self.secret) def __init__(self, secret): self.secret = secret def __eq__(self, other): return self.secret == getattr(other, 'secret', None) def __ne__(self, other): return not self == other class _DriverEnvironment: def __init__(self): self.pythonVersion = "{0}.{1}.{2}-{3}".format(*sys.version_info) self.driverVersion = __version__ self.env = self._get_runtime_env() self.os = "{0}-{1}".format(platform.system(), platform.release()) @staticmethod def _get_runtime_env(): @dataclass class EnvChecker: name: str check: Callable[[], bool] env: list[EnvChecker] = [ EnvChecker( name="Netlify", check=lambda: "NETLIFY_IMAGES_CDN_DOMAIN" in os.environ, ), EnvChecker( name="Vercel", check=lambda: "VERCEL" in os.environ, ), EnvChecker( name="Heroku", check=lambda: "PATH" in \ os.environ and ".heroku" in os.environ["PATH"], ), EnvChecker( name="AWS Lambda", check=lambda: "AWS_LAMBDA_FUNCTION_VERSION" in os.environ, ), EnvChecker( name="GCP Cloud Functions", check=lambda: "_" in \ os.environ and "google" in os.environ["_"], ), EnvChecker( name="GCP Compute Instances", check=lambda: "GOOGLE_CLOUD_PROJECT" in os.environ, ), EnvChecker( name="Azure Cloud Functions", check=lambda: "WEBSITE_FUNCTIONS_AZUREMONITOR_CATEGORIES" in \ os.environ, ), EnvChecker( name="Azure Compute", check=lambda: "ORYX_ENV_TYPE" in os.environ and \ "WEBSITE_INSTANCE_ID" in os.environ and \ os.environ["ORYX_ENV_TYPE"] == "AppService", ), ] try: recognized = next(e for e in env if e.check()) if recognized is not None: return recognized.name except: return "Unknown" def __str__(self): return "driver=python-{0}; runtime=python-{1} env={2}; os={3}".format( self.driverVersion, self.pythonVersion, self.env, self.os).lower() ```` ## File: fauna/client/retryable.py ````python import abc from dataclasses import dataclass from random import random from time import sleep from typing import Callable, Optional, TypeVar, Generic from fauna.errors import RetryableFaunaException class RetryStrategy: @abc.abstractmethod def wait(self) -> float: pass class ExponentialBackoffStrategy(RetryStrategy): def __init__(self, max_backoff: int): self._max_backoff = float(max_backoff) self._i = 0.0 def wait(self) -> float: """Returns the number of seconds to wait for the next call.""" backoff = random() * (2.0**self._i) self._i += 1.0 return min(backoff, self._max_backoff) T = TypeVar('T') @dataclass class RetryableResponse(Generic[T]): attempts: int response: T class Retryable(Generic[T]): """ Retryable is a wrapper class that acts on a Callable that returns a T type. """ _strategy: RetryStrategy _error: Optional[Exception] def __init__( self, max_attempts: int, max_backoff: int, func: Callable[..., T], *args, **kwargs, ): self._max_attempts = max_attempts self._strategy = ExponentialBackoffStrategy(max_backoff) self._func = func self._args = args self._kwargs = kwargs self._error = None def run(self) -> RetryableResponse[T]: """Runs the wrapped function. Retries up to max_attempts if the function throws a RetryableFaunaException. It propagates the thrown exception if max_attempts is reached or if a non-retryable is thrown. Returns the number of attempts and the response """ attempt = 0 while True: sleep_time = 0.0 if attempt == 0 else self._strategy.wait() sleep(sleep_time) try: attempt += 1 qs = self._func(*self._args, **self._kwargs) return RetryableResponse[T](attempt, qs) except RetryableFaunaException as e: if attempt >= self._max_attempts: raise e ```` ## File: fauna/client/utils.py ````python import os import threading from typing import Generic, Callable, TypeVar, Optional from fauna.client.endpoints import Endpoints from fauna.client.headers import Header def _fancy_bool_from_str(val: str) -> bool: return val.lower() in ["1", "true", "yes", "y"] class LastTxnTs(object): """Wraps tracking the last transaction time supplied from the database.""" def __init__( self, time: Optional[int] = None, ): self._lock: threading.Lock = threading.Lock() self._time: Optional[int] = time @property def time(self): """Produces the last transaction time, or, None if not yet updated.""" with self._lock: return self._time @property def request_header(self): """Produces a dictionary with a non-zero `X-Last-Seen-Txn` header; or, if one has not yet been set, the empty header dictionary.""" t = self._time if t is None: return {} return {Header.LastTxnTs: str(t)} def update_txn_time(self, new_txn_time: int): """Updates the internal transaction time. In order to maintain a monotonically-increasing value, `newTxnTime` is discarded if it is behind the current timestamp.""" with self._lock: self._time = max(self._time or 0, new_txn_time) T = TypeVar('T') class _SettingFromEnviron(Generic[T]): def __init__( self, var_name: str, default_value: str, adapt_from_str: Callable[[str], T], ): self.__var_name = var_name self.__default_value = default_value self.__adapt_from_str = adapt_from_str def __call__(self) -> T: return self.__adapt_from_str( os.environ.get( self.__var_name, default=self.__default_value, )) class _Environment: EnvFaunaEndpoint = _SettingFromEnviron( "FAUNA_ENDPOINT", Endpoints.Default, str, ) """environment variable for Fauna Client HTTP endpoint""" EnvFaunaSecret = _SettingFromEnviron( "FAUNA_SECRET", "", str, ) """environment variable for Fauna Client authentication""" ```` ## File: fauna/encoding/__init__.py ````python from .decoder import FaunaDecoder from .encoder import FaunaEncoder from .wire_protocol import ConstraintFailure, QueryTags, QueryInfo, QueryStats, QuerySuccess ```` ## File: fauna/encoding/decoder.py ````python import base64 from typing import Any, List, Union from iso8601 import parse_date from fauna.query.models import Module, DocumentReference, Document, NamedDocument, NamedDocumentReference, Page, \ NullDocument, EventSource class FaunaDecoder: """Supports the following types: +--------------------+---------------+ | Python | Fauna | +====================+===============+ | dict | object | +--------------------+---------------+ | list, tuple | array | +--------------------+---------------+ | str | string | +--------------------+---------------+ | int | @int | +--------------------+---------------+ | int | @long | +--------------------+---------------+ | float | @double | +--------------------+---------------+ | datetime.datetime | @time | +--------------------+---------------+ | datetime.date | @date | +--------------------+---------------+ | True | true | +--------------------+---------------+ | False | false | +--------------------+---------------+ | None | null | +--------------------+---------------+ | bytearray | @bytes | +--------------------+---------------+ | *DocumentReference | @ref | +--------------------+---------------+ | *Document | @doc | +--------------------+---------------+ | Module | @mod | +--------------------+---------------+ | Page | @set | +--------------------+---------------+ | EventSource | @stream | +--------------------+---------------+ """ @staticmethod def decode(obj: Any): """Decodes supported objects from the tagged typed into untagged. Examples: - { "@int": "100" } decodes to 100 of type int - { "@double": "100" } decodes to 100.0 of type float - { "@long": "100" } decodes to 100 of type int - { "@time": "..." } decodes to a datetime - { "@date": "..." } decodes to a date - { "@doc": ... } decodes to a Document or NamedDocument - { "@ref": ... } decodes to a DocumentReference or NamedDocumentReference - { "@mod": ... } decodes to a Module - { "@set": ... } decodes to a Page - { "@stream": ... } decodes to an EventSource - { "@bytes": ... } decodes to a bytearray :param obj: the object to decode """ return FaunaDecoder._decode(obj) @staticmethod def _decode(o: Any, escaped: bool = False): if isinstance(o, (str, bool, int, float)): return o elif isinstance(o, list): return FaunaDecoder._decode_list(o) elif isinstance(o, dict): return FaunaDecoder._decode_dict(o, escaped) @staticmethod def _decode_list(lst: List): return [FaunaDecoder._decode(i) for i in lst] @staticmethod def _decode_dict(dct: dict, escaped: bool): keys = dct.keys() # If escaped, everything is user-specified if escaped: return {k: FaunaDecoder._decode(v) for k, v in dct.items()} if len(keys) == 1: if "@int" in keys: return int(dct["@int"]) if "@long" in keys: return int(dct["@long"]) if "@double" in dct: return float(dct["@double"]) if "@object" in dct: return FaunaDecoder._decode(dct["@object"], True) if "@mod" in dct: return Module(dct["@mod"]) if "@time" in dct: return parse_date(dct["@time"]) if "@date" in dct: return parse_date(dct["@date"]).date() if "@bytes" in dct: bts = base64.b64decode(dct["@bytes"]) return bytearray(bts) if "@doc" in dct: value = dct["@doc"] if isinstance(value, str): # Not distinguishing between DocumentReference and NamedDocumentReference because this shouldn't # be an issue much longer return DocumentReference.from_string(value) contents = FaunaDecoder._decode(value) if "id" in contents and "coll" in contents and "ts" in contents: doc_id = contents.pop("id") doc_coll = contents.pop("coll") doc_ts = contents.pop("ts") return Document( id=doc_id, coll=doc_coll, ts=doc_ts, data=contents, ) elif "name" in contents and "coll" in contents and "ts" in contents: doc_name = contents.pop("name") doc_coll = contents.pop("coll") doc_ts = contents.pop("ts") return NamedDocument( name=doc_name, coll=doc_coll, ts=doc_ts, data=contents, ) else: # Unsupported document reference. Return the unwrapped value to futureproof. return contents if "@ref" in dct: value = dct["@ref"] if "id" not in value and "name" not in value: # Unsupported document reference. Return the unwrapped value to futureproof. return value col = FaunaDecoder._decode(value["coll"]) doc_ref: Union[DocumentReference, NamedDocumentReference] if "id" in value: doc_ref = DocumentReference(col, value["id"]) else: doc_ref = NamedDocumentReference(col, value["name"]) if "exists" in value and not value["exists"]: cause = value["cause"] if "cause" in value else None return NullDocument(doc_ref, cause) return doc_ref if "@set" in dct: value = dct["@set"] if isinstance(value, str): return Page(after=value) after = value["after"] if "after" in value else None data = FaunaDecoder._decode(value["data"]) if "data" in value else None return Page(data=data, after=after) if "@stream" in dct: return EventSource(dct["@stream"]) return {k: FaunaDecoder._decode(v) for k, v in dct.items()} ```` ## File: fauna/encoding/encoder.py ````python import base64 from datetime import datetime, date from typing import Any, Optional, List, Union from fauna.query.models import DocumentReference, Module, Document, NamedDocument, NamedDocumentReference, NullDocument, \ EventSource from fauna.query.query_builder import Query, Fragment, LiteralFragment, ValueFragment _RESERVED_TAGS = [ "@date", "@doc", "@double", "@int", "@long", "@mod", "@object", "@ref", "@set", "@time", ] class FaunaEncoder: """Supports the following types: +-------------------------------+---------------+ | Python | Fauna Tags | +===============================+===============+ | dict | @object | +-------------------------------+---------------+ | list, tuple | array | +-------------------------------+---------------+ | str | string | +-------------------------------+---------------+ | int 32-bit signed | @int | +-------------------------------+---------------+ | int 64-bit signed | @long | +-------------------------------+---------------+ | float | @double | +-------------------------------+---------------+ | datetime.datetime | @time | +-------------------------------+---------------+ | datetime.date | @date | +-------------------------------+---------------+ | True | True | +-------------------------------+---------------+ | False | False | +-------------------------------+---------------+ | None | None | +-------------------------------+---------------+ | bytes / bytearray | @bytes | +-------------------------------+---------------+ | *Document | @ref | +-------------------------------+---------------+ | *DocumentReference | @ref | +-------------------------------+---------------+ | Module | @mod | +-------------------------------+---------------+ | Query | fql | +-------------------------------+---------------+ | ValueFragment | value | +-------------------------------+---------------+ | TemplateFragment | string | +-------------------------------+---------------+ | EventSource | string | +-------------------------------+---------------+ """ @staticmethod def encode(obj: Any) -> Any: """Encodes supported objects into the tagged format. Examples: - Up to 32-bit ints encode to { "@int": "..." } - Up to 64-bit ints encode to { "@long": "..." } - Floats encode to { "@double": "..." } - datetime encodes to { "@time": "..." } - date encodes to { "@date": "..." } - DocumentReference encodes to { "@doc": "..." } - Module encodes to { "@mod": "..." } - Query encodes to { "fql": [...] } - ValueFragment encodes to { "value": } - LiteralFragment encodes to a string - EventSource encodes to a string :raises ValueError: If value cannot be encoded, cannot be encoded safely, or there's a circular reference. :param obj: the object to decode """ return FaunaEncoder._encode(obj) @staticmethod def from_int(obj: int): if -2**31 <= obj <= 2**31 - 1: return {"@int": repr(obj)} elif -2**63 <= obj <= 2**63 - 1: return {"@long": repr(obj)} else: raise ValueError("Precision loss when converting int to Fauna type") @staticmethod def from_bool(obj: bool): return obj @staticmethod def from_float(obj: float): return {"@double": repr(obj)} @staticmethod def from_str(obj: str): return obj @staticmethod def from_datetime(obj: datetime): if obj.utcoffset() is None: raise ValueError("datetimes must be timezone-aware") return {"@time": obj.isoformat(sep="T")} @staticmethod def from_date(obj: date): return {"@date": obj.isoformat()} @staticmethod def from_bytes(obj: Union[bytearray, bytes]): return {"@bytes": base64.b64encode(obj).decode('ascii')} @staticmethod def from_doc_ref(obj: DocumentReference): return {"@ref": {"id": obj.id, "coll": FaunaEncoder.from_mod(obj.coll)}} @staticmethod def from_named_doc_ref(obj: NamedDocumentReference): return {"@ref": {"name": obj.name, "coll": FaunaEncoder.from_mod(obj.coll)}} @staticmethod def from_mod(obj: Module): return {"@mod": obj.name} @staticmethod def from_dict(obj: Any): return {"@object": obj} @staticmethod def from_none(): return None @staticmethod def from_fragment(obj: Fragment): if isinstance(obj, LiteralFragment): return obj.get() elif isinstance(obj, ValueFragment): v = obj.get() if isinstance(v, Query): return FaunaEncoder.from_query_interpolation_builder(v) else: return {"value": FaunaEncoder.encode(v)} else: raise ValueError(f"Unknown fragment type: {type(obj)}") @staticmethod def from_query_interpolation_builder(obj: Query): return {"fql": [FaunaEncoder.from_fragment(f) for f in obj.fragments]} @staticmethod def from_streamtoken(obj: EventSource): return {"@stream": obj.token} @staticmethod def _encode(o: Any, _markers: Optional[List] = None): if _markers is None: _markers = [] if isinstance(o, str): return FaunaEncoder.from_str(o) elif o is None: return FaunaEncoder.from_none() elif o is True: return FaunaEncoder.from_bool(o) elif o is False: return FaunaEncoder.from_bool(o) elif isinstance(o, int): return FaunaEncoder.from_int(o) elif isinstance(o, float): return FaunaEncoder.from_float(o) elif isinstance(o, Module): return FaunaEncoder.from_mod(o) elif isinstance(o, DocumentReference): return FaunaEncoder.from_doc_ref(o) elif isinstance(o, NamedDocumentReference): return FaunaEncoder.from_named_doc_ref(o) elif isinstance(o, datetime): return FaunaEncoder.from_datetime(o) elif isinstance(o, date): return FaunaEncoder.from_date(o) elif isinstance(o, bytearray) or isinstance(o, bytes): return FaunaEncoder.from_bytes(o) elif isinstance(o, Document): return FaunaEncoder.from_doc_ref(DocumentReference(o.coll, o.id)) elif isinstance(o, NamedDocument): return FaunaEncoder.from_named_doc_ref( NamedDocumentReference(o.coll, o.name)) elif isinstance(o, NullDocument): return FaunaEncoder.encode(o.ref) elif isinstance(o, (list, tuple)): return FaunaEncoder._encode_list(o, _markers) elif isinstance(o, dict): return FaunaEncoder._encode_dict(o, _markers) elif isinstance(o, Query): return FaunaEncoder.from_query_interpolation_builder(o) elif isinstance(o, EventSource): return FaunaEncoder.from_streamtoken(o) else: raise ValueError(f"Object {o} of type {type(o)} cannot be encoded") @staticmethod def _encode_list(lst, markers): _id = id(lst) if _id in markers: raise ValueError("Circular reference detected") markers.append(id(lst)) res = [FaunaEncoder._encode(elem, markers) for elem in lst] markers.pop() return res @staticmethod def _encode_dict(dct, markers): _id = id(dct) if _id in markers: raise ValueError("Circular reference detected") markers.append(id(dct)) if any(i in _RESERVED_TAGS for i in dct.keys()): res = { "@object": { k: FaunaEncoder._encode(v, markers) for k, v in dct.items() } } markers.pop() return res else: res = {k: FaunaEncoder._encode(v, markers) for k, v in dct.items()} markers.pop() return res ```` ## File: fauna/encoding/wire_protocol.py ````python from dataclasses import dataclass from typing import Optional, Mapping, Any, List class QueryStats: """Query stats""" @property def compute_ops(self) -> int: """The amount of Transactional Compute Ops consumed by the query.""" return self._compute_ops @property def read_ops(self) -> int: """The amount of Transactional Read Ops consumed by the query.""" return self._read_ops @property def write_ops(self) -> int: """The amount of Transactional Write Ops consumed by the query.""" return self._write_ops @property def query_time_ms(self) -> int: """The query run time in milliseconds.""" return self._query_time_ms @property def storage_bytes_read(self) -> int: """The amount of data read from storage, in bytes.""" return self._storage_bytes_read @property def storage_bytes_write(self) -> int: """The amount of data written to storage, in bytes.""" return self._storage_bytes_write @property def contention_retries(self) -> int: """The number of times the transaction was retried due to write contention.""" return self._contention_retries @property def attempts(self) -> int: """The number of attempts made by the client to run the query.""" return self._attempts @attempts.setter def attempts(self, value): self._attempts = value def __init__(self, stats: Mapping[str, Any]): self._compute_ops = stats.get("compute_ops", 0) self._read_ops = stats.get("read_ops", 0) self._write_ops = stats.get("write_ops", 0) self._query_time_ms = stats.get("query_time_ms", 0) self._storage_bytes_read = stats.get("storage_bytes_read", 0) self._storage_bytes_write = stats.get("storage_bytes_write", 0) self._contention_retries = stats.get("contention_retries", 0) self._attempts = 0 def __repr__(self): stats = { "compute_ops": self._compute_ops, "read_ops": self._read_ops, "write_ops": self._write_ops, "query_time_ms": self._query_time_ms, "storage_bytes_read": self._storage_bytes_read, "storage_bytes_write": self._storage_bytes_write, "contention_retries": self._contention_retries, "attempts": self._attempts, } return f"{self.__class__.__name__}(stats={repr(stats)})" def __eq__(self, other): return type(self) == type(other) \ and self.compute_ops == other.compute_ops \ and self.read_ops == other.read_ops \ and self.write_ops == other.write_ops \ and self.query_time_ms == other.query_time_ms \ and self.storage_bytes_read == other.storage_bytes_read \ and self.storage_bytes_write == other.storage_bytes_write \ and self.contention_retries == other.contention_retries \ and self.attempts == other.attempts def __ne__(self, other): return not self.__eq__(other) class QueryInfo: @property def query_tags(self) -> Mapping[str, Any]: """The tags associated with the query.""" return self._query_tags @property def summary(self) -> str: """A comprehensive, human readable summary of any errors, warnings and/or logs returned from the query.""" return self._summary @property def stats(self) -> QueryStats: """Query stats associated with the query.""" return self._stats @property def txn_ts(self) -> int: """The last transaction timestamp of the query. A Unix epoch in microseconds.""" return self._txn_ts @property def schema_version(self) -> int: """The schema version that was used for the query execution.""" return self._schema_version def __init__( self, query_tags: Optional[Mapping[str, str]] = None, stats: Optional[QueryStats] = None, summary: Optional[str] = None, txn_ts: Optional[int] = None, schema_version: Optional[int] = None, ): self._query_tags = query_tags or {} self._stats = stats or QueryStats({}) self._summary = summary or "" self._txn_ts = txn_ts or 0 self._schema_version = schema_version or 0 def __repr__(self): return f"{self.__class__.__name__}(" \ f"query_tags={repr(self.query_tags)}," \ f"stats={repr(self.stats)}," \ f"summary={repr(self.summary)}," \ f"txn_ts={repr(self.txn_ts)}," \ f"schema_version={repr(self.schema_version)})" class QuerySuccess(QueryInfo): """The result of the query.""" @property def data(self) -> Any: """The data returned by the query. This is the result of the FQL query.""" return self._data @property def static_type(self) -> Optional[str]: """If typechecked, the query's inferred static result type, if the query was typechecked.""" return self._static_type @property def traceparent(self) -> Optional[str]: """The traceparent for the query.""" return self._traceparent def __init__( self, data: Any, query_tags: Optional[Mapping[str, str]], static_type: Optional[str], stats: Optional[QueryStats], summary: Optional[str], traceparent: Optional[str], txn_ts: Optional[int], schema_version: Optional[int], ): super().__init__( query_tags=query_tags, stats=stats, summary=summary, txn_ts=txn_ts, schema_version=schema_version, ) self._traceparent = traceparent self._static_type = static_type self._data = data def __repr__(self): return f"{self.__class__.__name__}(" \ f"query_tags={repr(self.query_tags)}," \ f"static_type={repr(self.static_type)}," \ f"stats={repr(self.stats)}," \ f"summary={repr(self.summary)}," \ f"traceparent={repr(self.traceparent)}," \ f"txn_ts={repr(self.txn_ts)}," \ f"schema_version={repr(self.schema_version)}," \ f"data={repr(self.data)})" @dataclass class ConstraintFailure: message: str name: Optional[str] = None paths: Optional[List[Any]] = None class QueryTags: @staticmethod def encode(tags: Mapping[str, str]) -> str: return ",".join([f"{k}={v}" for k, v in tags.items()]) @staticmethod def decode(tag_str: str) -> Mapping[str, str]: res: dict[str, str] = {} for pair in tag_str.split(","): kv = pair.split("=") res[kv[0]] = kv[1] return res ```` ## File: fauna/errors/__init__.py ````python from .errors import AuthenticationError, AuthorizationError, QueryCheckError, QueryRuntimeError, \ QueryTimeoutError, ServiceInternalError, ServiceTimeoutError, ThrottlingError, ContendedTransactionError, \ InvalidRequestError, AbortError, RetryableFaunaException from .errors import ClientError, FaunaError, NetworkError from .errors import FaunaException from .errors import ProtocolError, ServiceError ```` ## File: fauna/errors/errors.py ````python from typing import Optional, List, Any, Mapping from fauna.encoding import ConstraintFailure, QueryStats, QueryInfo, QueryTags class FaunaException(Exception): """Base class Fauna Exceptions""" pass class RetryableFaunaException(FaunaException): pass class ClientError(FaunaException): """An error representing a failure internal to the client, itself. This indicates Fauna was never called - the client failed internally prior to sending the request.""" pass class NetworkError(FaunaException): """An error representing a failure due to the network. This indicates Fauna was never reached.""" pass class ProtocolError(FaunaException): """An error representing a HTTP failure - but one not directly emitted by Fauna.""" @property def status_code(self) -> int: return self._status_code @property def message(self) -> str: return self._message def __init__(self, status_code: int, message: str): self._status_code = status_code self._message = message def __str__(self): return f"{self.status_code}: {self.message}" class FaunaError(FaunaException): """Base class Fauna Errors""" @property def status_code(self) -> int: return self._status_code @property def code(self) -> str: return self._code @property def message(self) -> str: return self._message @property def abort(self) -> Optional[Any]: return self._abort @property def constraint_failures(self) -> Optional[List['ConstraintFailure']]: return self._constraint_failures def __init__( self, status_code: int, code: str, message: str, abort: Optional[Any] = None, constraint_failures: Optional[List['ConstraintFailure']] = None, ): self._status_code = status_code self._code = code self._message = message self._abort = abort self._constraint_failures = constraint_failures def __str__(self): return f"{self.status_code}: {self.code}\n{self.message}" @staticmethod def parse_error_and_throw(body: Any, status_code: int): err = body["error"] code = err["code"] message = err["message"] query_tags = QueryTags.decode( body["query_tags"]) if "query_tags" in body else None stats = QueryStats(body["stats"]) if "stats" in body else None txn_ts = body["txn_ts"] if "txn_ts" in body else None schema_version = body["schema_version"] if "schema_version" in body else None summary = body["summary"] if "summary" in body else None constraint_failures: Optional[List[ConstraintFailure]] = None if "constraint_failures" in err: constraint_failures = [ ConstraintFailure( message=cf["message"], name=cf["name"] if "name" in cf else None, paths=cf["paths"] if "paths" in cf else None, ) for cf in err["constraint_failures"] ] if status_code >= 400 and status_code < 500: if code == "invalid_query": raise QueryCheckError( status_code=400, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "invalid_request": raise InvalidRequestError( status_code=400, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "abort": abort = err["abort"] if "abort" in err else None raise AbortError( status_code=400, code=code, message=message, summary=summary, abort=abort, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "unauthorized": raise AuthenticationError( status_code=401, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "forbidden" and status_code == 403: raise AuthorizationError( status_code=403, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "method_not_allowed": raise QueryRuntimeError( status_code=405, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "conflict": raise ContendedTransactionError( status_code=409, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "request_size_exceeded": raise QueryRuntimeError( status_code=413, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "limit_exceeded": raise ThrottlingError( status_code=429, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif code == "time_out": raise QueryTimeoutError( status_code=440, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) else: raise QueryRuntimeError( status_code=status_code, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif status_code == 500: raise ServiceInternalError( status_code=status_code, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) elif status_code == 503: raise ServiceTimeoutError( status_code=status_code, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) else: raise ServiceError( status_code=status_code, code=code, message=message, summary=summary, constraint_failures=constraint_failures, query_tags=query_tags, stats=stats, txn_ts=txn_ts, schema_version=schema_version, ) class ServiceError(FaunaError, QueryInfo): """An error representing a query failure returned by Fauna.""" def __init__( self, status_code: int, code: str, message: str, summary: Optional[str] = None, abort: Optional[Any] = None, constraint_failures: Optional[List['ConstraintFailure']] = None, query_tags: Optional[Mapping[str, str]] = None, stats: Optional[QueryStats] = None, txn_ts: Optional[int] = None, schema_version: Optional[int] = None, ): QueryInfo.__init__( self, query_tags=query_tags, stats=stats, summary=summary, txn_ts=txn_ts, schema_version=schema_version, ) FaunaError.__init__( self, status_code=status_code, code=code, message=message, abort=abort, constraint_failures=constraint_failures, ) def __str__(self): constraint_str = "---" if self._constraint_failures: constraint_str = f"---\nconstraint failures: {self._constraint_failures}\n---" return f"{self._status_code}: {self.code}\n{self.message}\n{constraint_str}\n{self.summary or ''}" class AbortError(ServiceError): pass class InvalidRequestError(ServiceError): pass class QueryCheckError(ServiceError): """An error due to a "compile-time" check of the query failing.""" pass class ContendedTransactionError(ServiceError): """Transaction is aborted due to concurrent modification.""" pass class QueryRuntimeError(ServiceError): """An error response that is the result of the query failing during execution. QueryRuntimeError's occur when a bug in your query causes an invalid execution to be requested. The 'code' field will vary based on the specific error cause.""" pass class AuthenticationError(ServiceError): """AuthenticationError indicates invalid credentials were used.""" pass class AuthorizationError(ServiceError): """AuthorizationError indicates the credentials used do not have permission to perform the requested action.""" pass class ThrottlingError(ServiceError, RetryableFaunaException): """ThrottlingError indicates some capacity limit was exceeded and thus the request could not be served.""" pass class QueryTimeoutError(ServiceError): """A failure due to the timeout being exceeded, but the timeout was set lower than the query's expected processing time. This response is distinguished from a ServiceTimeoutException in that a QueryTimeoutError shows Fauna behaving in an expected manner.""" pass class ServiceInternalError(ServiceError): """ServiceInternalError indicates Fauna failed unexpectedly.""" pass class ServiceTimeoutError(ServiceError): """ServiceTimeoutError indicates Fauna was not available to service the request before the timeout was reached.""" pass ```` ## File: fauna/http/__init__.py ````python from .http_client import HTTPClient, HTTPResponse from .httpx_client import HTTPXClient ```` ## File: fauna/http/http_client.py ````python import abc import contextlib from dataclasses import dataclass from typing import Iterator, Mapping, Any @dataclass(frozen=True) class ErrorResponse: status_code: int error_code: str error_message: str summary: str class HTTPResponse(abc.ABC): @abc.abstractmethod def headers(self) -> Mapping[str, str]: pass @abc.abstractmethod def status_code(self) -> int: pass @abc.abstractmethod def json(self) -> Any: pass @abc.abstractmethod def text(self) -> str: pass @abc.abstractmethod def read(self) -> bytes: pass @abc.abstractmethod def iter_bytes(self) -> Iterator[bytes]: pass @abc.abstractmethod def close(self): pass def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() class HTTPClient(abc.ABC): @abc.abstractmethod def request( self, method: str, url: str, headers: Mapping[str, str], data: Mapping[str, Any], ) -> HTTPResponse: pass @abc.abstractmethod @contextlib.contextmanager def stream( self, url: str, headers: Mapping[str, str], data: Mapping[str, Any], ) -> Iterator[Any]: pass @abc.abstractmethod def close(self): pass ```` ## File: fauna/http/httpx_client.py ````python import json import logging from contextlib import contextmanager from json import JSONDecodeError from typing import Mapping, Any, Optional, Iterator import httpx from fauna.errors import ClientError, NetworkError from fauna.http.http_client import HTTPResponse, HTTPClient class HTTPXResponse(HTTPResponse): def __init__(self, response: httpx.Response): self._r = response def headers(self) -> Mapping[str, str]: h = {} for (k, v) in self._r.headers.items(): h[k] = v return h def json(self) -> Any: try: decoded = self._r.read().decode("utf-8") return json.loads(decoded) except (JSONDecodeError, UnicodeDecodeError) as e: raise ClientError( f"Unable to decode response from endpoint {self._r.request.url}. Check that your endpoint is valid." ) from e def text(self) -> str: return str(self.read(), encoding='utf-8') def status_code(self) -> int: return self._r.status_code def read(self) -> bytes: return self._r.read() def iter_bytes(self, size: Optional[int] = None) -> Iterator[bytes]: return self._r.iter_bytes(size) def close(self) -> None: try: self._r.close() except Exception as e: raise ClientError("Error closing response") from e class HTTPXClient(HTTPClient): def __init__(self, client: httpx.Client, logger: logging.Logger = logging.getLogger("fauna")): super(HTTPXClient, self).__init__() self._c = client self._logger = logger def request( self, method: str, url: str, headers: Mapping[str, str], data: Mapping[str, Any], ) -> HTTPResponse: try: request = self._c.build_request( method, url, json=data, headers=headers, ) if self._logger.isEnabledFor(logging.DEBUG): headers_to_log = request.headers.copy() headers_to_log.pop("Authorization") self._logger.debug( f"query.request method={request.method} url={request.url} headers={headers_to_log} data={data}" ) except httpx.InvalidURL as e: raise ClientError("Invalid URL Format") from e try: response = self._c.send( request, stream=False, ) if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug( f"query.response status_code={response.status_code} headers={response.headers} data={response.text}" ) return HTTPXResponse(response) except (httpx.HTTPError, httpx.InvalidURL) as e: raise NetworkError("Exception re-raised from HTTP request") from e @contextmanager def stream( self, url: str, headers: Mapping[str, str], data: Mapping[str, Any], ) -> Iterator[Any]: request = self._c.build_request( method="POST", url=url, headers=headers, json=data, ) if self._logger.isEnabledFor(logging.DEBUG): headers_to_log = request.headers.copy() headers_to_log.pop("Authorization") self._logger.debug( f"stream.request method={request.method} url={request.url} headers={headers_to_log} data={data}" ) response = self._c.send( request=request, stream=True, ) try: yield self._transform(response) finally: response.close() def _transform(self, response): try: for line in response.iter_lines(): loaded = json.loads(line) if self._logger.isEnabledFor(logging.DEBUG): self._logger.debug(f"stream.data data={loaded}") yield loaded except httpx.ReadTimeout as e: raise NetworkError("Stream timeout") from e except (httpx.HTTPError, httpx.InvalidURL) as e: raise NetworkError("Exception re-raised from HTTP request") from e def close(self): self._c.close() ```` ## File: fauna/query/__init__.py ````python from .models import Document, DocumentReference, EventSource, NamedDocument, NamedDocumentReference, NullDocument, Module, Page from .query_builder import fql, Query ```` ## File: fauna/query/models.py ````python import warnings from collections.abc import Mapping from datetime import datetime from typing import Union, Iterator, Any, Optional, List # NB. Override __getattr__ and __dir__ to deprecate StreamToken usages. Based # on: https://peps.python.org/pep-0562/ def __getattr__(name): if name == "StreamToken": warnings.warn( "StreamToken is deprecated. Prefer fauna.query.EventSource instead.", DeprecationWarning, stacklevel=2) return EventSource return super.__getattr__(name) # pyright: ignore def __dir__(): return list(super.__dir__(None)) + list("StreamToken") # pyright: ignore class Page: """A class representing a Set in Fauna.""" def __init__(self, data: Optional[List[Any]] = None, after: Optional[str] = None): self.data = data self.after = after def __repr__(self): args = [] if self.data is not None: args.append(f"data={repr(self.data)}") if self.after is not None: args.append(f"after={repr(self.after)}") return f"{self.__class__.__name__}({','.join(args)})" def __iter__(self) -> Iterator[Any]: return iter(self.data or []) def __eq__(self, other): return isinstance( other, Page) and self.data == other.data and self.after == other.after def __hash__(self): return hash((type(self), self.data, self.after)) def __ne__(self, other): return not self.__eq__(other) class EventSource: """A class represeting an EventSource in Fauna.""" def __init__(self, token: str): self.token = token def __eq__(self, other): return isinstance(other, EventSource) and self.token == other.token def __hash__(self): return hash(self.token) class Module: """A class representing a Module in Fauna. Examples of modules include Collection, Math, and a user-defined collection, among others. Usage: dogs = Module("Dogs") query = fql("${col}.all", col=dogs) """ def __init__(self, name: str): self.name = name def __repr__(self): return f"{self.__class__.__name__}(name={repr(self.name)})" def __eq__(self, other): return isinstance(other, Module) and str(self) == str(other) def __hash__(self): return hash(self.name) class BaseReference: _collection: Module @property def coll(self) -> Module: return self._collection def __init__(self, coll: Union[str, Module]): if isinstance(coll, Module): self._collection = coll elif isinstance(coll, str): self._collection = Module(coll) else: raise TypeError( f"'coll' should be of type Module or str, but was {type(coll)}") def __repr__(self): return f"{self.__class__.__name__}(coll={repr(self._collection)})" def __eq__(self, other): return isinstance(other, type(self)) and str(self) == str(other) class DocumentReference(BaseReference): """A class representing a reference to a :class:`Document` stored in Fauna. """ @property def id(self) -> str: """The ID for the :class:`Document`. Valid IDs are 64-bit integers, stored as strings. :rtype: str """ return self._id def __init__(self, coll: Union[str, Module], id: str): super().__init__(coll) if not isinstance(id, str): raise TypeError(f"'id' should be of type str, but was {type(id)}") self._id = id def __hash__(self): return hash((type(self), self._collection, self._id)) def __repr__(self): return f"{self.__class__.__name__}(id={repr(self._id)},coll={repr(self._collection)})" @staticmethod def from_string(ref: str): rs = ref.split(":") if len(rs) != 2: raise ValueError("Expects string of format :") return DocumentReference(rs[0], rs[1]) class NamedDocumentReference(BaseReference): """A class representing a reference to a :class:`NamedDocument` stored in Fauna. """ @property def name(self) -> str: """The name of the :class:`NamedDocument`. :rtype: str """ return self._name def __init__(self, coll: Union[str, Module], name: str): super().__init__(coll) if not isinstance(name, str): raise TypeError(f"'name' should be of type str, but was {type(name)}") self._name = name def __hash__(self): return hash((type(self), self._collection, self._name)) def __repr__(self): return f"{self.__class__.__name__}(name={repr(self._name)},coll={repr(self._collection)})" class NullDocument: @property def cause(self) -> Optional[str]: return self._cause @property def ref(self) -> Union[DocumentReference, NamedDocumentReference]: return self._ref def __init__( self, ref: Union[DocumentReference, NamedDocumentReference], cause: Optional[str] = None, ): self._cause = cause self._ref = ref def __repr__(self): return f"{self.__class__.__name__}(ref={repr(self.ref)},cause={repr(self._cause)})" def __eq__(self, other): if not isinstance(other, type(self)): return False return self.ref == other.ref and self.cause == other.cause def __ne__(self, other): return not self == other class BaseDocument(Mapping): """A base document class implementing an immutable mapping. """ def __init__(self, *args, **kwargs): self._store = dict(*args, **kwargs) def __getitem__(self, __k: str) -> Any: return self._store[__k] def __len__(self) -> int: return len(self._store) def __iter__(self) -> Iterator[Any]: return iter(self._store) def __eq__(self, other): if not isinstance(other, type(self)): return False if len(self) != len(other): return False for k, v in self.items(): if k not in other: return False if self[k] != other[k]: return False return True def __ne__(self, other): return not self.__eq__(other) class Document(BaseDocument): """A class representing a user document stored in Fauna. User data should be stored directly on the map, while id, ts, and coll should only be stored on the related properties. When working with a :class:`Document` in code, it should be considered immutable. """ @property def id(self) -> str: return self._id @property def ts(self) -> datetime: return self._ts @property def coll(self) -> Module: return self._coll def __init__(self, id: str, ts: datetime, coll: Union[str, Module], data: Optional[Mapping] = None): if not isinstance(id, str): raise TypeError(f"'id' should be of type str, but was {type(id)}") if not isinstance(ts, datetime): raise TypeError(f"'ts' should be of type datetime, but was {type(ts)}") if not (isinstance(coll, str) or isinstance(coll, Module)): raise TypeError( f"'coll' should be of type Module or str, but was {type(coll)}") if isinstance(coll, str): coll = Module(coll) self._id = id self._ts = ts self._coll = coll super().__init__(data or {}) def __eq__(self, other): return type(self) == type(other) \ and self.id == other.id \ and self.coll == other.coll \ and self.ts == other.ts \ and super().__eq__(other) def __ne__(self, other): return not self.__eq__(other) def __repr__(self): kvs = ",".join([f"{repr(k)}:{repr(v)}" for k, v in self.items()]) return f"{self.__class__.__name__}(" \ f"id={repr(self.id)}," \ f"coll={repr(self.coll)}," \ f"ts={repr(self.ts)}," \ f"data={{{kvs}}})" class NamedDocument(BaseDocument): """A class representing a named document stored in Fauna. Examples of named documents include Collection definitions, Index definitions, and Roles, among others. When working with a :class:`NamedDocument` in code, it should be considered immutable. """ @property def name(self) -> str: return self._name @property def ts(self) -> datetime: return self._ts @property def coll(self) -> Module: return self._coll def __init__(self, name: str, ts: datetime, coll: Union[Module, str], data: Optional[Mapping] = None): if not isinstance(name, str): raise TypeError(f"'name' should be of type str, but was {type(name)}") if not isinstance(ts, datetime): raise TypeError(f"'ts' should be of type datetime, but was {type(ts)}") if not (isinstance(coll, str) or isinstance(coll, Module)): raise TypeError( f"'coll' should be of type Module or str, but was {type(coll)}") if isinstance(coll, str): coll = Module(coll) self._name = name self._ts = ts self._coll = coll super().__init__(data or {}) def __eq__(self, other): return type(self) == type(other) \ and self.name == other.name \ and self.coll == other.coll \ and self.ts == other.ts \ and super().__eq__(other) def __ne__(self, other): return not self.__eq__(other) def __repr__(self): kvs = ",".join([f"{repr(k)}:{repr(v)}" for k, v in self.items()]) return f"{self.__class__.__name__}(" \ f"name={repr(self.name)}," \ f"coll={repr(self.coll)}," \ f"ts={repr(self.ts)}," \ f"data={{{kvs}}})" ```` ## File: fauna/query/query_builder.py ````python import abc from typing import Any, Optional, List from .template import FaunaTemplate class Fragment(abc.ABC): """An abstract class representing a Fragment of a query. """ @abc.abstractmethod def get(self) -> Any: """An abstract method for returning a stored value. """ pass class ValueFragment(Fragment): """A concrete :class:`Fragment` representing a part of a query that can represent a template variable. For example, if a template contains a variable ``${foo}``, and an object ``{ "prop": 1 }`` is provided for foo, then ``{ "prop": 1 }`` should be wrapped as a :class:`ValueFragment`. :param Any val: The value to be used as a fragment. """ def __init__(self, val: Any): self._val = val def get(self) -> Any: """Gets the stored value. :returns: The stored value. """ return self._val class LiteralFragment(Fragment): """A concrete :class:`Fragment` representing a query literal For example, in the template ```let x = ${foo}```, the portion ```let x = ``` is a query literal and should be wrapped as a :class:`LiteralFragment`. :param str val: The query literal to be used as a fragment. """ def __init__(self, val: str): self._val = val def get(self) -> str: """Returns the stored value. :returns: The stored value. """ return self._val class Query: """A class for representing a query. e.g. { "fql": [...] } """ _fragments: List[Fragment] def __init__(self, fragments: Optional[List[Fragment]] = None): self._fragments = fragments or [] @property def fragments(self) -> List[Fragment]: """The list of stored Fragments""" return self._fragments def __str__(self) -> str: res = "" for f in self._fragments: res += str(f.get()) return res def fql(query: str, **kwargs: Any) -> Query: """Creates a Query - capable of performing query composition and simple querying. It can accept a simple string query, or can perform composition using ``${}`` sigil string template with ``**kwargs`` as substitutions. The ``**kwargs`` can be Fauna data types - such as strings, document references, or modules - and embedded Query - allowing you to compose arbitrarily complex queries. When providing ``**kwargs``, following types are accepted: - :class:`str`, :class:`int`, :class:`float`, :class:`bool`, :class:`datetime.datetime`, :class:`datetime.date`, :class:`dict`, :class:`list`, :class:`Query`, :class:`DocumentReference`, :class:`Module` :raises ValueError: If there is an invalid template placeholder or a value that cannot be encoded. :returns: A :class:`Query` that can be passed to the client for evaluation against Fauna. Examples: .. code-block:: python :name: Simple-FQL-Example :caption: Simple query declaration using this function. fql('Dogs.byName("Fido")') .. code-block:: python :name: Composition-FQL-Example :caption: Query composition using this function. def get_dog(id): return fql('Dogs.byId(${id})', id=id) def get_vet_phone(id): return fql('${dog} { .vet_phone_number }', dog=get_dog(id)) get_vet_phone('d123') """ fragments: List[Any] = [] template = FaunaTemplate(query) for text, field_name in template.iter(): if text is not None and len(text) > 0: fragments.append(LiteralFragment(text)) if field_name is not None: if field_name not in kwargs: raise ValueError( f"template variable `{field_name}` not found in provided kwargs") # TODO: Reject if it's already a fragment, or accept *Fragment? Decide on API here fragments.append(ValueFragment(kwargs[field_name])) return Query(fragments) ```` ## File: fauna/query/template.py ````python import re as _re from typing import Optional, Tuple, Iterator, Match class FaunaTemplate: """A template class that supports variables marked with a ${}-sigil. Its primary purpose is to expose an iterator for the template parts that support composition of FQL queries. Implementation adapted from https://github.com/python/cpython/blob/main/Lib/string.py :param template: A string template e.g. "${my_var} { name }" :type template: str """ _delimiter = '$' _idpattern = r'[_a-zA-Z][_a-zA-Z0-9]*' _flags = _re.VERBOSE def __init__(self, template: str): """The initializer""" delim = _re.escape(self._delimiter) pattern = fr""" {delim}(?: (?P{delim}) | # Escape sequence of two delimiters {{(?P{self._idpattern})}} | # delimiter and a braced identifier (?P) # Other ill-formed delimiter exprs ) """ self._pattern = _re.compile(pattern, self._flags) self._template = template def iter(self) -> Iterator[Tuple[Optional[str], Optional[str]]]: """A method that returns an iterator over tuples representing template parts. The first value of the tuple, if not None, is a template literal. The second value of the tuple, if not None, is a template variable. If both are not None, then the template literal comes *before* the variable. :raises ValueError: If there is an invalid template placeholder :return: An iterator of template parts :rtype: collections.Iterable[Tuple[Optional[str], Optional[str]]] """ match_objects = self._pattern.finditer(self._template) cur_pos = 0 for mo in match_objects: if mo.group("invalid") is not None: self._handle_invalid(mo) span_start_pos = mo.span()[0] span_end_pos = mo.span()[1] escaped_part = mo.group("escaped") or "" variable_part = mo.group("braced") literal_part: Optional[str] = None if cur_pos != span_start_pos: literal_part = \ self._template[cur_pos:span_start_pos] \ + escaped_part cur_pos = span_end_pos yield literal_part, variable_part if cur_pos != len(self._template): yield self._template[cur_pos:], None def _handle_invalid(self, mo: Match) -> None: i = mo.start("invalid") lines = self._template[:i].splitlines(keepends=True) if not lines: colno = 1 lineno = 1 else: colno = i - len(''.join(lines[:-1])) lineno = len(lines) raise ValueError( f"Invalid placeholder in template: line {lineno}, col {colno}") ```` ## File: fauna/__init__.py ````python __title__ = "Fauna" __version__ = "2.4.0" __api_version__ = "10" __author__ = "Fauna, Inc" __license__ = "MPL 2.0" __copyright__ = "2023 Fauna, Inc" from fauna.query import fql, Document, DocumentReference, NamedDocument, NamedDocumentReference, NullDocument, Module, Page global_http_client = None ````