# Fauna v10 Go client driver (current)

| Version: 3.0.2 | Repository: fauna/fauna-go |
| --- | --- |

Fauna’s Go client driver lets you run FQL queries from Go applications.

This guide shows how to set up the driver and use it to run FQL queries.

## [](#supported-go-versions)Supported Go versions

* 1.19
* 1.20
* 1.21
* 1.22

## [](#supported-cloud-runtimes)Supported cloud runtimes

* AWS Lambda (See [AWS Lambda connections](#aws-lambda-connections))
* Netlify Functions
* Vercel Functions

## [](#api-reference)API reference

API reference documentation for the driver is available on [pkg.go.dev](https://pkg.go.dev/github.com/fauna/fauna-go/v3#section-documentation).

## [](#installation)Installation

To install the driver, run:

```bash
go get github.com/fauna/fauna-go/v3
```

## [](#basic-usage)Basic usage

The following application:

* Initializes a client instance to connect to Fauna
* Composes a basic FQL query using an `FQL` string template
* Runs the query using `Query()`

```go
package main

import (
	"encoding/json"
	"fmt"

	"github.com/fauna/fauna-go/v3"
)

func main() {
	// Initialize the client to connect to Fauna
	client := fauna.NewClient(
		"FAUNA_SECRET",
		fauna.DefaultTimeouts(),
	)

	// Compose a query
	query, _ := fauna.FQL(`
		Product.sortedByPriceLowToHigh() {
			name,
			description,
			price
		}
	`, nil)

	// Run the query
	res, err := client.Query(query)
	if err != nil {
		panic(err)
	}

	// Print the query results as JSON
	jsonData, _ := json.Marshal(res.Data)
	fmt.Println(string(jsonData))
}
```

## [](#connect-to-fauna)Connect to Fauna

Each Fauna query is an independently authenticated request to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). You authenticate with Fauna using an [authentication secret](../../../learn/security/authentication/#secrets).

### [](#get-an-authentication-secret)Get an authentication secret

Fauna supports several [secret types](../../../learn/security/authentication/#secret-types).
For testing, you can create a [key](../../../learn/security/keys/), which is a type of secret: 1. Log in to the [Fauna Dashboard](https://dashboard.fauna.com/). 2. On the **Explorer** page, create a database. 3. In the database’s **Keys** tab, click **Create Key**. 4. Choose a **Role** of **server**. 5. Click **Save**. 6. Copy the **Key Secret**. The secret is scoped to the database. ### [](#initialize-a-client)Initialize a client To send query requests to Fauna, initialize a `Client` instance. The `NewDefaultClient()` method initializes a client using: * A Fauna authentication secret in the `FAUNA_SECRET` environment variable * A base URL used by the driver for [Fauna Core HTTP API](../../../reference/http/reference/core-api/) requests in the `FAUNA_ENDPOINT` environment variable. * Default client configuration options ```go client, clientErr := fauna.NewDefaultClient() if clientErr != nil { panic(clientErr) } ``` To pass configuration options, use `NewClient()` to initialize the client: ```go client := fauna.NewClient( "FAUNA_SECRET", fauna.DefaultTimeouts(), ) ``` `NewClient()` requires `secret` and `timeouts` arguments. For timeouts and more configuration options, see [Client configuration](#config). ### [](#connect-to-a-child-database)Connect to a child database A [scoped key](../../../learn/security/keys/#scoped-keys) lets you use a parent database’s admin key to send query requests to its child databases. For example, if you have an admin key for a parent database and want to connect to a child database named `childDB`, you can create a scoped key using the following format: ``` // Scoped key that impersonates an `admin` key for // the `childDB` child database. 
fn...:childDB:admin ``` You can then initialize a `Client` instance using the scoped key: ```go client := fauna.NewClient( "fn...:childDB:admin", fauna.DefaultTimeouts(), ) ``` ### [](#multiple-connections)Multiple connections You can use a single client instance to run multiple asynchronous queries at once. The driver manages HTTP connections as needed. Your app doesn’t need to implement connection pools or other connection management strategies. You can create multiple client instances to connect to Fauna using different credentials or client configurations. ### [](#aws-lambda-connections)AWS Lambda connections AWS Lambda freezes, thaws, and reuses execution environments for Lambda functions. See [Lambda execution environment](https://docs.aws.amazon.com/lambda/latest/dg/running-lambda-code.html). When an execution environment is thawed, Lambda only runs the function’s handler code. Objects declared outside of the handler method remain initialized from before the freeze. Lambda doesn’t re-run initialization code outside the handler. Fauna drivers keep socket connections that can time out during long freezes, causing `ECONNRESET` errors when thawed. To prevent timeouts, create Fauna client connections inside function handlers. Fauna drivers use lightweight HTTP connections. You can create new connections for each request while maintaining good performance. ## [](#run-fql-queries)Run FQL queries Use `FQL` string templates to compose FQL queries. Run the queries using `Query()`: ```go query, _ := fauna.FQL(`Product.sortedByPriceLowToHigh()`, nil) client.Query(query) ``` By default, `Query()` uses query options from the [Client configuration](#config). You can pass options to `Query()` to override these defaults. See [Query options](#query-opts). You can only compose FQL queries using string templates. ### [](#var)Variable interpolation Use `${}` to pass native Go variables as `map[string]any` to `FQL`. You can escape a variable by prepending an additional `$`. 
```go // Create a native Go var collectionName := "Product" // Pass the var to an FQL query query, _ := fauna.FQL(` let collection = Collection(${collectionName}) collection.sortedByPriceLowToHigh() `, map[string]any{"collectionName": collectionName}) client.Query(query) ``` The driver encodes interpolated variables to an appropriate [FQL type](../../../reference/fql/types/) and uses the [wire protocol](../../../reference/http/reference/wire-protocol/) to pass the query to the Core HTTP API’s [Query endpoint](../../../reference/http/reference/core-api/#operation/query). This helps prevent injection attacks. ### [](#query-composition)Query composition You can use variable interpolation to pass FQL string templates as query fragments to compose an FQL query: ```go func main() { client := fauna.NewClient( "FAUNA_SECRET", fauna.DefaultTimeouts(), ) // Create a reusable query fragment. productQuery, _ := fauna.FQL(` Product.byName("pizza").first() `, nil) // Use the fragment in another FQL query. query, _ := fauna.FQL(` let product = ${product} product { name, price } `, map[string]any{"product": productQuery}) client.Query(query) } ``` ### [](#structs)Structs The driver supports user-defined structs: ```go type Product struct { Name string `fauna:"name"` Description string `fauna:"description"` Price int `fauna:"price"` } func main() { client := fauna.NewClient( "FAUNA_SECRET", fauna.DefaultTimeouts(), ) newProduct := Product{"key limes", "Organic, 1 ct", 95} query, _ := fauna.FQL(`Product.create(${product})`, map[string]any{"product": newProduct}) client.Query(query) } ``` ### [](#pagination)Pagination Use `Paginate()` to iterate through a Set that contains more than one page of results. `Paginate()` accepts the same [Query options](#query-opts) as `Query()`. ```go // Adjust `pageSize()` size as needed. 
query, _ := fauna.FQL(`
	Product
		.byName("limes")
		.pageSize(2)
`, nil)

paginator := client.Paginate(query)
for {
	// Fetch the next page and stop on any request error.
	page, err := paginator.Next()
	if err != nil {
		panic(err)
	}

	var pageItems []any
	if err := page.Unmarshal(&pageItems); err != nil {
		panic(err)
	}

	for _, item := range pageItems {
		fmt.Println(item)
	}

	if !paginator.HasNext() {
		break
	}
}
```

### [](#query-stats)Query stats

Successful query responses and the following error types include [query stats](../../../reference/http/reference/query-stats/):

* `ErrAbort`
* `ErrAuthentication`
* `ErrAuthorization`
* `ErrContendedTransaction`
* `ErrInvalidRequest`
* `ErrQueryCheck`
* `ErrQueryRuntime`
* `ErrQueryTimeout`
* `ErrServiceInternal`
* `ErrServiceTimeout`
* `ErrThrottling`

```go
query, _ := fauna.FQL(`"Hello world"`, nil)

res, err := client.Query(query)
if err != nil {
	// On a query check error, print the stats attached to the error.
	if faunaErr, ok := err.(*fauna.ErrQueryCheck); ok {
		jsonData, _ := json.Marshal(faunaErr.QueryInfo.Stats)
		fmt.Println(string(jsonData))
	}
	panic(err)
}

jsonData, _ := json.Marshal(res.Stats)
fmt.Println(string(jsonData))
```

## [](#config)Client configuration

The `Client` instance comes with reasonable configuration defaults. We recommend using the defaults in most cases.

If needed, you can use `NewClient()` to configure the client and override the defaults. This also lets you set default [Query options](#query-opts).
```go
secret := "FAUNA_SECRET"
timeouts := fauna.Timeouts{
	QueryTimeout:          time.Minute,
	ClientBufferTimeout:   time.Second * 30,
	ConnectionTimeout:     time.Second * 10,
	IdleConnectionTimeout: time.Minute * 5,
}

client := fauna.NewClient(
	// Configure the client
	secret,
	timeouts,
	fauna.URL("https://db.fauna.com"),
	fauna.AdditionalHeaders(map[string]string{
		"foo": "bar",
	}),
	fauna.Linearized(false),
	fauna.MaxAttempts(5),
	fauna.MaxBackoff(time.Minute),
	fauna.MaxContentionRetries(5),

	// Set default query options
	fauna.DefaultTypecheck(true),
	fauna.QueryTags(map[string]string{
		"tag": "value",
	}),
	fauna.QueryTimeout(time.Second*60),
)
```

For supported parameters, see [NewClient](https://pkg.go.dev/github.com/fauna/fauna-go/v3#NewClient) in the API reference.

### [](#timeouts)Timeouts

`NewClient()` requires a `timeouts` argument. The argument must contain a `Timeouts` struct:

```go
timeouts := fauna.Timeouts{
	QueryTimeout:          time.Second * 5,
	ClientBufferTimeout:   time.Second * 5,
	ConnectionTimeout:     time.Second * 5,
	IdleConnectionTimeout: time.Second * 5,
}

client := fauna.NewClient(
	"FAUNA_SECRET",
	timeouts,
)
```

For default timeouts, use `DefaultTimeouts()`:

```go
client := fauna.NewClient(
	"FAUNA_SECRET",
	fauna.DefaultTimeouts(),
)
```

For supported fields, see [Timeouts](https://pkg.go.dev/github.com/fauna/fauna-go/v3#Timeouts) in the API reference.
### [](#configuration-functions)Configuration functions

To configure the client and set default query options, pass one or more `ClientConfigFn` functions to `NewClient()`:

```go
client := fauna.NewClient(
	"FAUNA_SECRET",
	fauna.DefaultTimeouts(),
	// Start configuration functions
	fauna.URL("https://db.fauna.com"),
	fauna.AdditionalHeaders(map[string]string{
		"foo": "bar",
	}),
	fauna.Linearized(false),
	fauna.MaxAttempts(5),
	fauna.MaxBackoff(time.Minute),
	fauna.MaxContentionRetries(5),

	// Configuration functions for
	// default query options
	fauna.DefaultTypecheck(true),
	fauna.QueryTags(map[string]string{
		"tag": "value",
	}),
	fauna.QueryTimeout(time.Second*60),
)
```

For supported functions, see [ClientConfigFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#ClientConfigFn) in the API reference.

### [](#retries)Retries

By default, the client automatically retries a query if the request returns a 429 HTTP status code. Retries use an exponential backoff.

Use the `MaxBackoff` [configuration function](#configuration-functions) to set the maximum time between retries. Similarly, use `MaxAttempts` to set the maximum number of retry attempts.

## [](#query-opts)Query options

The [Client configuration](#config) sets default query options for the following methods:

* `Query()`
* `Paginate()`
* `Stream()`

To override these defaults, pass one or more `QueryOptFn` functions to the method:

```go
options := []fauna.QueryOptFn{
	fauna.Tags(map[string]string{
		"name": "hello world query",
	}),
	fauna.Timeout(time.Minute),
	fauna.Traceparent("00-750efa5fb6a131eb2cf4db39f28366cb-000000000000000b-00"),
	fauna.Typecheck(true),
}

query, _ := fauna.FQL(`"Hello world"`, nil)
client.Query(query, options...)
```

For supported functions, see [QueryOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#QueryOptFn) in the API reference.

## [](#event-feeds)Event feeds

The driver supports [event feeds](../../../learn/cdc/#event-feeds).
An event feed asynchronously polls an [event source](../../../learn/cdc/) for events. To use event feeds, you must have a Pro or Enterprise plan. ### [](#request-an-event-feed)Request an event feed To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets). To get paginated events, pass the event source to `Feed()`. This lets you fetch a page of initial results followed by an event feed: ```go client := fauna.NewClient( "FAUNA_SECRET", fauna.DefaultTimeouts(), ) query, _ := fauna.FQL(` let set = Product.all() { initialPage: set.pageSize(10), eventSource: set.eventSource() } `, nil) res, err := client.Query(query) if err != nil { log.Fatalf("Failed to query: %v", err) } var result struct { InitialPage fauna.Page `fauna:"initialPage"` EventSource fauna.EventSource `fauna:"eventSource"` } if err := res.Unmarshal(&result); err != nil { log.Fatalf("Failed to unmarshal results: %v", err) } feed, err := client.Feed(result.EventSource) if err != nil { log.Fatalf("Failed to create feed: %v", err) } ``` If changes occur between the creation of the event source and the `Feed()` request, the feed replays and emits any related events. You can also pass a query that produces an event source directly to `FeedFromQuery()`: ```go query, _ := fauna.FQL(`Product.all().eventsOn(.price, .stock)`, nil) feed, err := client.FeedFromQuery(query) if err != nil { log.Fatalf("Failed to create feed from query: %v", err) } ``` In most cases, you’ll get events after a specific [start time](#start-time) or [cursor](#cursor). #### [](#start-time)Get events after a specific start time When you first poll an event source using an event feed, you usually pass `EventFeedStartTime()` to `Feed()` or `FeedFromQuery()`. 
The request returns events that occurred after the specified timestamp (exclusive): ```go query, _ := fauna.FQL(`Product.all().eventSource()`, nil) // Calculate timestamp for 10 minutes ago tenMinutesAgo := time.Now().Add(-10 * time.Minute) feed, err := client.FeedFromQuery( query, fauna.EventFeedStartTime(tenMinutesAgo), ) ``` The start time must be later than the creation time of the event source. The period between the request and the start time can’t exceed the `history_days` setting for the source Set’s collection. If `history_days` is `0` or unset, the period is limited to 15 minutes. #### [](#cursor)Get events after a specific cursor After the initial request, you usually get subsequent events using the [cursor](../../../learn/cdc/#cursor) for the last page or event. To get events after a cursor (exclusive), pass an `EventFeedCursor()` to `Feed()` or `FeedFromQuery()`: ```go query, _ := fauna.FQL(`Product.all().eventSource()`, nil) feed, err := client.FeedFromQuery( query, fauna.EventFeedCursor("gsGabc456"), // Cursor for a previous page ) ``` ### [](#loop)Iterate on an event feed `Feed()` and `FeedFromQuery()` return an `EventFeed` instance. 
You can use a `for` loop to iterate through the pages of events: ```go import ( "fmt" "log" "time" "github.com/fauna/fauna-go/v3" ) func main() { client := fauna.NewClient("FAUNA_SECRET", fauna.DefaultTimeouts()) query, _ := fauna.FQL(`Product.all().eventSource()`, nil) // Calculate timestamp for 10 minutes ago tenMinutesAgo := time.Now().Add(-10 * time.Minute) feed, err := client.FeedFromQuery( query, fauna.EventFeedStartTime(tenMinutesAgo), ) if err != nil { log.Fatalf("Failed to create feed: %v", err) } for { var page fauna.FeedPage err := feed.Next(&page) if err != nil { log.Fatalf("Error getting next feed page: %v", err) } fmt.Println("Page stats:", page.Stats) for _, event := range page.Events { switch event.Type { case "add": // Do something on add fmt.Println("Add event: ", event) case "update": // Do something on update fmt.Println("Update event: ", event) case "remove": // Do something on remove fmt.Println("Remove event: ", event) } } if !page.HasNext { break } } } ``` Each page includes a top-level `cursor`. You can pass the cursor to `Feed()` or `FeedFromQuery()` using `EventFeedCursor()`: ```go import ( "fmt" "log" "time" "github.com/fauna/fauna-go/v3" ) func processFeed(client *fauna.Client, query *fauna.Query, startTs time.Time, sleepTime time.Duration) { var cursor string = "" for { // Use start time only on the first request, then use cursor. var options []fauna.FeedOptFn if !startTs.IsZero() { options = append(options, fauna.EventFeedStartTime(startTs)) // Null out startTs after first use. startTs = time.Time{} } else { options = append(options, fauna.EventFeedCursor(cursor)) } feed, err := client.FeedFromQuery(query, options...) 
if err != nil { log.Fatalf("Failed to create feed: %v", err) } for { var page fauna.FeedPage err := feed.Next(&page) if err != nil { log.Fatalf("Error getting next feed page: %v", err) } for _, event := range page.Events { switch event.Type { case "add": fmt.Println("Add event:", event) case "update": fmt.Println("Update event:", event) case "remove": fmt.Println("Remove event:", event) } } // Store the cursor of the last page cursor = page.Cursor // If no more pages are available, break the inner loop if !page.HasNext { break } } // Sleep between feed requests fmt.Printf("Sleeping for %v seconds...\n", sleepTime.Seconds()) time.Sleep(sleepTime) } } func main() { client := fauna.NewClient("FAUNA_SECRET", fauna.DefaultTimeouts()) // Calculate timestamp for 10 minutes ago tenMinutesAgo := time.Now().Add(-10 * time.Minute) query, err := fauna.FQL(`Product.all().eventsOn(.price, .stock)`, nil) if err != nil { log.Fatalf("Failed to create FQL query: %v", err) } sleepTime := 300 * time.Second processFeed(client, query, tenMinutesAgo, sleepTime) } ``` If needed, you can store the cursor as a collection document: ```go import ( "fmt" "log" "time" "github.com/fauna/fauna-go/v3" ) func processFeedWithCursor(client *fauna.Client, query *fauna.Query, startTs time.Time, sleepTime time.Duration) { // Ensure `Cursor` collection exists createCursorCollection, err := fauna.FQL(` if (Collection.byName("Cursor").exists() == false) { Collection.create({ name: "Cursor", fields: { name: { signature: "String" }, value: { signature: "String?" 
					}
				},
				constraints: [
					{
						unique: [
							{ field: ".name", mva: false }
						]
					}
				],
				indexes: {
					byName: {
						terms: [
							{ field: ".name", mva: false }
						]
					}
				}
			})
		} else {
			null
		}
	`, nil)
	if err != nil {
		log.Fatalf("Failed to create Cursor collection: %v", err)
	}
	if _, err := client.Query(createCursorCollection); err != nil {
		log.Fatalf("Failed to create Cursor collection: %v", err)
	}

	// Ensure `ProductInventory` document exists in `Cursor`
	createProductInventoryCursor, err := fauna.FQL(`
		if (Collection("Cursor").byName("ProductInventory").first() == null) {
			Cursor.create({
				name: "ProductInventory",
				value: null
			})
		} else {
			null
		}
	`, nil)
	if err != nil {
		log.Fatalf("Failed to create ProductInventory cursor: %v", err)
	}
	if _, err := client.Query(createProductInventoryCursor); err != nil {
		log.Fatalf("Failed to create ProductInventory cursor: %v", err)
	}

	for {
		// Fetch existing cursor from the `Cursor` collection
		cursorQuery, err := fauna.FQL(`Cursor.byName("ProductInventory").first()`, nil)
		if err != nil {
			log.Fatalf("Failed to create cursor query: %v", err)
		}
		cursorRes, err := client.Query(cursorQuery)
		if err != nil {
			log.Fatalf("Failed to fetch cursor: %v", err)
		}

		// Unmarshal cursor data into a map
		var cursorData map[string]interface{}
		if err := cursorRes.Unmarshal(&cursorData); err != nil {
			log.Fatalf("Failed to unmarshal cursor result: %v", err)
		}

		// Extract the cursor value. The document stores it in the `value`
		// field; a null value leaves `cursor` as an empty string.
		cursor, _ := cursorData["value"].(string)

		// Set options based on cursor existence
		var options []fauna.FeedOptFn
		if cursor == "" {
			options = append(options, fauna.EventFeedStartTime(startTs))
		} else {
			// Here we ensure that the query supports cursors
			if query == nil {
				log.Fatalf("Query is nil; unable to create feed with cursor.")
			}
			options = append(options, fauna.EventFeedCursor(cursor))
		}

		// Create the feed
		feed, err := client.FeedFromQuery(query, options...)
		if err != nil {
			log.Fatalf("Failed to create feed: %v", err)
		}

		for {
			var page fauna.FeedPage
			if err := feed.Next(&page); err != nil {
				log.Fatalf("Error getting next feed page: %v", err)
			}

			for _, event := range page.Events {
				switch event.Type {
				case "add":
					fmt.Println("Add event: ", event)
				case "update":
					fmt.Println("Update event: ", event)
				case "remove":
					fmt.Println("Remove event: ", event)
				}
			}

			// Store the cursor of the last page in the `Cursor` collection.
			// Pass the cursor with variable interpolation instead of
			// formatting it into the query string.
			cursor = page.Cursor
			updateCursor, err := fauna.FQL(`
				Cursor.byName("ProductInventory").first()!.update({
					value: ${cursor}
				})
			`, map[string]any{"cursor": cursor})
			if err != nil {
				log.Fatalf("Failed to create update cursor query: %v", err)
			}
			if _, err := client.Query(updateCursor); err != nil {
				log.Fatalf("Failed to update cursor: %v", err)
			}

			fmt.Printf("Cursor updated: %s\n", cursor)

			startTs = time.Time{}

			fmt.Printf("Sleeping for %v seconds...\n", sleepTime.Seconds())
			time.Sleep(sleepTime)
		}
	}
}

func main() {
	client := fauna.NewClient("FAUNA_SECRET", fauna.DefaultTimeouts())

	// Calculate timestamp for 10 minutes ago
	tenMinutesAgo := time.Now().Add(-10 * time.Minute)

	query, err := fauna.FQL(`Product.all().eventsOn(.price, .stock)`, nil)
	if err != nil {
		log.Fatalf("Failed to create FQL query: %v", err)
	}

	sleepTime := 300 * time.Second
	processFeedWithCursor(client, query, tenMinutesAgo, sleepTime)
}
```

### [](#error-handling)Error handling

Errors can occur in two places:

* While fetching a page
* While iterating a page’s events

This distinction lets you ignore errors that originate from event processing.
For example: ```go import ( "fmt" "log" "time" "github.com/fauna/fauna-go/v3" ) func main() { client := fauna.NewClient("FAUNA_SECRET", fauna.DefaultTimeouts()) query, _ := fauna.FQL(`Product.all().eventSource()`, nil) // Calculate timestamp for 10 minutes ago tenMinutesAgo := time.Now().Add(-10 * time.Minute) feed, err := client.FeedFromQuery( query, fauna.EventFeedStartTime(tenMinutesAgo), ) if err != nil { log.Fatalf("Failed to create feed: %v", err) } for { var page fauna.FeedPage err := feed.Next(&page) if err != nil { log.Fatalf("Error getting next feed page: %v", err) } fmt.Println("Page stats:", page.Stats) for _, event := range page.Events { func() { defer func() { if r := recover(); r != nil { log.Printf("Error processing event: %v", r) } }() switch event.Type { case "add": fmt.Println("Add event: ", event) case "update": fmt.Println("Update event: ", event) case "remove": fmt.Println("Remove event: ", event) default: log.Printf("Unknown event type: %s", event.Type) } }() } if !page.HasNext { break } } } ``` Each page’s `cursor` contains the cursor for the page’s last successfully processed event. If you’re using a [loop to poll for changes](#loop), you can use the cursor to skip any events that caused errors. ### [](#event-feed-opts)Event feed options Both `Feed()` and `FeedFromQuery()` accept [FeedOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#FeedOptFn) functions as arguments. 
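Option functions of this kind are usually applied in order to an internal settings value before the request is sent. The following stdlib-only sketch illustrates that general pattern. All names in it (`feedSettings`, `optFn`, `withCursor`, `withPageSize`, `applyOpts`) are hypothetical stand-ins, and it makes no claim about how the driver implements `FeedOptFn` internally:

```go
package main

import "fmt"

// feedSettings is a hypothetical stand-in for the settings a feed request
// might carry. It is not a type from the driver.
type feedSettings struct {
	cursor   string
	pageSize int
}

// optFn is a hypothetical option function that mutates the settings.
type optFn func(*feedSettings)

// withCursor returns an option that sets the starting cursor.
func withCursor(c string) optFn {
	return func(s *feedSettings) { s.cursor = c }
}

// withPageSize returns an option that sets the maximum events per page.
func withPageSize(n int) optFn {
	return func(s *feedSettings) { s.pageSize = n }
}

// applyOpts starts from zero-value settings and applies each option in order,
// so a later option overrides an earlier one that sets the same field.
func applyOpts(opts ...optFn) feedSettings {
	var s feedSettings
	for _, opt := range opts {
		opt(&s)
	}
	return s
}

func main() {
	s := applyOpts(withCursor("gsGabc456"), withPageSize(10))
	fmt.Printf("%+v\n", s)
}
```

Because each option only touches the fields it cares about, callers can pass any combination, and omitted options leave the corresponding defaults in place.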
Use `EventFeedStartTime()` to start the feed at a specific timestamp:

```go
query, _ := fauna.FQL(`Product.all().eventSource()`, nil)

tenMinutesAgo := time.Now().Add(-10 * time.Minute)

feed, err := client.FeedFromQuery(
	query,
	fauna.EventFeedStartTime(tenMinutesAgo),
)
```

Use `EventFeedCursor()` to start the feed at a specific event or page cursor:

```go
query, _ := fauna.FQL(`Product.all().eventSource()`, nil)

feed, err := client.FeedFromQuery(
	query,
	fauna.EventFeedCursor("gsGabc456"),
)
```

Use `EventFeedPageSize()` to set the maximum number of events returned per page:

```go
query, _ := fauna.FQL(`Product.all().eventSource()`, nil)

feed, err := client.FeedFromQuery(
	query,
	fauna.EventFeedCursor("gsGabc456"),
	fauna.EventFeedPageSize(10),
)
```

For supported functions, see [FeedOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#FeedOptFn) in the API reference.

## [](#event-streaming)Event streams

The driver supports [event streams](../../../learn/cdc/).

### [](#start-a-stream)Start a stream

To get an event source, append [`set.eventSource()`](../../../reference/fql-api/set/eventsource/) or [`set.eventsOn()`](../../../reference/fql-api/set/eventson/) to a [supported Set](../../../learn/cdc/#sets).

The driver represents event sources as `EventSource` values.

To stream the source’s events, pass the event source to `Stream()`.
This lets you output a stream alongside normal query results: ```go type Product struct { Name string `fauna:"name"` Description string `fauna:"description"` Price int `fauna:"price"` } func main() { client := fauna.NewClient( "FAUNA_SECRET", fauna.DefaultTimeouts(), ) dataLoad, _ := fauna.FQL(` let products = Product.all() { Products: products.toArray(), EventSource: products.eventSource() } `, nil) data, err := client.Query(dataLoad) if err != nil { panic(err) } queryResult := struct { Products []Product EventSource fauna.EventSource }{} if err := data.Unmarshal(&queryResult); err != nil { panic(err) } fmt.Println("Existing products:") for _, product := range queryResult.Products { fmt.Println(product) } events, err := client.Stream(queryResult.EventSource) if err != nil { panic(err) } defer events.Close() fmt.Println("Products from streaming:") var event fauna.Event for { err := events.Next(&event) if err != nil { panic(err) } switch event.Type { case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: var product Product if err = event.Unmarshal(&product); err != nil { panic(err) } fmt.Println(product) } } } ``` You can also pass a query that produces an event source directly to `StreamFromQuery()`. 
```go
type Product struct {
	Name        string `fauna:"name"`
	Description string `fauna:"description"`
	Price       int    `fauna:"price"`
}

func main() {
	client := fauna.NewClient(
		"FAUNA_SECRET",
		fauna.DefaultTimeouts(),
	)

	streamQuery, _ := fauna.FQL("Product.all().eventSource()", nil)

	// Pass the query itself; `nil` means no stream options.
	events, err := client.StreamFromQuery(streamQuery, nil)
	if err != nil {
		panic(err)
	}
	defer events.Close()

	var event fauna.Event
	for {
		err := events.Next(&event)
		if err != nil {
			panic(err)
		}

		switch event.Type {
		case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent:
			var product Product
			if err = event.Unmarshal(&product); err != nil {
				panic(err)
			}
			fmt.Println(product)
		}
	}
}
```

### [](#stream-options)Stream options

Both `Stream()` and `StreamFromQuery()` accept [StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) functions as arguments.

Use `StreamStartTime()` to restart a stream at a specific timestamp:

```go
streamQuery, _ := fauna.FQL(`Product.all().eventSource()`, nil)
tenMinutesAgo := time.Now().Add(-10 * time.Minute)

client.StreamFromQuery(streamQuery, nil, fauna.StreamStartTime(tenMinutesAgo))
```

Use `EventCursor()` to resume a stream after a disconnect:

```go
streamQuery, _ := fauna.FQL(`Product.all().eventSource()`, nil)

client.StreamFromQuery(streamQuery, nil, fauna.EventCursor("abc2345=="))
```

For supported functions, see [StreamOptFn](https://pkg.go.dev/github.com/fauna/fauna-go/v3#StreamOptFn) in the API reference.

## [](#debug-logging)Debug logging

To enable debug logging, set the `FAUNA_DEBUG` environment variable to the integer value of the desired [slog level](https://pkg.go.dev/log/slog#Level):

* `slog.LevelInfo` logs all HTTP responses from Fauna.
* `slog.LevelDebug` includes the HTTP request body. The `Authorization` header is not redacted.

For Go versions before 1.21, the driver uses a [log.Logger](https://pkg.go.dev/log#Logger). For 1.21+, the driver uses the [slog.Logger](https://pkg.go.dev/log/slog#Logger).

You can optionally define your own Logger.
For an example of a custom logger, see `CustomLogger` in [`logging_slog_test.go`](https://github.com/fauna/fauna-go/blob/main/logging_slog_test.go).

# Go driver source code

# Files

## File: client_example_test.go

```go
package fauna_test

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"time"

	"github.com/fauna/fauna-go/v3"
)

// ExampleNewDefaultClient query fauna running in a local Docker instance:
//
//	docker run --rm -p 8443:8443 fauna/faunadb:latest
func ExampleNewDefaultClient() {
	// IMPORTANT: just for the purpose of example, don't actually hardcode secret
	_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
	_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

	client, clientErr := fauna.NewDefaultClient()
	if clientErr != nil {
		log.Fatalf("client should have been initialized: %s", clientErr)
	}

	query, qErr := fauna.FQL(`Math.abs(12e5)`, nil)
	if qErr != nil {
		log.Fatalf("query failed: %s", qErr)
	}

	res, queryErr := client.Query(query)
	if queryErr != nil {
		log.Fatalf("request failed: %s", queryErr)
	}

	var result float32
	if err := res.Unmarshal(&result); err != nil {
		log.Fatalf("%s", err)
	}

	fmt.Printf("%0.f", result)
	// Output: 1200000
}

// ExampleNewClient query fauna running in a local Docker instance:
//
//	docker run --rm -p 8443:8443 fauna/faunadb:latest
func ExampleNewClient() {
	client := fauna.NewClient(
		// IMPORTANT: just for the purpose of example, don't actually hardcode secret
		"secret",
		fauna.DefaultTimeouts(),
		fauna.HTTPClient(http.DefaultClient),
		fauna.URL(fauna.EndpointLocal),
		fauna.Context(context.Background()),
		fauna.QueryTimeout(time.Minute*3),
	)

	query, qErr := fauna.FQL(`Math.abs(12e5)`, nil)
	if qErr != nil {
		log.Fatalf("query failed: %s", qErr)
	}

	res, queryErr := client.Query(query)
	if queryErr != nil {
		log.Fatalf("request failed: %s", queryErr)
	}

	var result float32
	if err := res.Unmarshal(&result); err != nil {
		// Report the unmarshal error, not the (nil) query error.
		log.Fatalf("%s", err)
	}

	fmt.Printf("%0.f", result)
	// Output: 1200000
}

// ExampleFQL query fauna running in a local Docker instance:
//
//	docker run
--rm -p 8443:8443 fauna/faunadb:latest func ExampleFQL() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if clientErr != nil { log.Fatalf("client should have been initialized: %s", clientErr) } query, fqlErr := fauna.FQL(`2 + 2`, nil) if fqlErr != nil { log.Fatalf("query failed: %s", fqlErr) } res, queryErr := client.Query(query) if queryErr != nil { log.Fatalf("request failed: %s", queryErr) } fmt.Printf("%d", res.Data) // Output: 4 } func ExampleFQL_arguments() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if clientErr != nil { log.Fatalf("client should have been initialized: %s", clientErr) } query, fqlErr := fauna.FQL(`${num} + 2`, map[string]any{"num": 2}) if fqlErr != nil { log.Fatalf("query failed: %s", fqlErr) } res, queryErr := client.Query(query) if queryErr != nil { log.Fatalf("request failed: %s", queryErr) } fmt.Printf("%d", res.Data) // Output: 4 } func ExampleFQL_structs() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if clientErr != nil { log.Fatalf("client should have been initialized: %s", clientErr) } type myObj struct { Value int `fauna:"value"` } arg := &myObj{Value: 2} query, fqlErr := fauna.FQL(`${obj}["value"] + 2`, map[string]any{"obj": arg}) if fqlErr != nil { log.Fatalf("query failed: %s", fqlErr) } res, queryErr := client.Query(query) if queryErr != nil { log.Fatalf("request failed: %s", queryErr) } fmt.Printf("%d", res.Data) // Output: 4 } func ExampleFQL_unmarshal() { // IMPORTANT: just for 
the purpose of example, don't actually hardcode secret
	_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
	_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

	client, clientErr := fauna.NewDefaultClient()
	if clientErr != nil {
		log.Fatalf("client should have been initialized: %s", clientErr)
	}

	type myObj struct {
		Value int `fauna:"value"`
	}

	// Mock out an object that looks like our struct `myObj`.
	query, fqlErr := fauna.FQL(`{"value": 4}`, nil)
	if fqlErr != nil {
		log.Fatalf("query failed: %s", fqlErr)
	}

	res, queryErr := client.Query(query)
	if queryErr != nil {
		log.Fatalf("request failed: %s", queryErr)
	}

	// Unmarshal the resulting object into a `myObj` object.
	var result myObj
	if err := res.Unmarshal(&result); err != nil {
		// Report the unmarshal error, not the (nil) query error.
		log.Fatalf("unmarshal failed: %s", err)
	}

	fmt.Printf("%+v", result)
	// Output: {Value:4}
}

func ExampleFQL_composed() {
	// IMPORTANT: just for the purpose of example, don't actually hardcode secret
	_ = os.Setenv(fauna.EnvFaunaSecret, "secret")
	_ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal)

	client, clientErr := fauna.NewDefaultClient()
	if clientErr != nil {
		log.Fatalf("client should have been initialized: %s", clientErr)
	}

	type myObj struct {
		Value int `fauna:"value"`
	}

	arg := &myObj{Value: 4}

	// Build a query to pull a value from some object. This could be a document
	// already in Fauna.
	getValQuery, gvqErr := fauna.FQL(`${obj}["value"]`, map[string]any{"obj": arg})
	if gvqErr != nil {
		log.Fatalf("query failed: %s", gvqErr)
	}

	// Compose the value query with a multiplier to multiply the value we pulled by
	// some number.
query, fqlErr := fauna.FQL("${multiplier} * ${value}", map[string]any{ "value": getValQuery, "multiplier": 4, }) if fqlErr != nil { log.Fatalf("query failed: %s", fqlErr) } res, queryErr := client.Query(query, fauna.Typecheck(true)) if queryErr != nil { log.Fatalf("request failed: %s", queryErr) } fmt.Printf("%+v", res.Data) // Output: 16 } func ExampleClient_Paginate() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if clientErr != nil { log.Fatalf("client should have been initialized: %s", clientErr) } collectionName := "pagination_sandbox" // delete any existing collection, then create it deleteQuery, deleteQueryErr := fauna.FQL(`Collection.byName(${coll})?.delete()`, map[string]any{"coll": collectionName}) if deleteQueryErr != nil { log.Fatalf("failed to construct delete query: %v", deleteQueryErr) } if _, deleteErr := client.Query(deleteQuery); deleteErr != nil { log.Fatalf("failed to clean up collection: %v", deleteErr) } createQuery, createQueryErr := fauna.FQL(`Collection.create({ name: ${name} })`, map[string]any{"name": collectionName}) if createQueryErr != nil { log.Fatalf("failed to construct create query: %v", createQueryErr) } if _, createErr := client.Query(createQuery); createErr != nil { log.Fatalf("failed to create collection: %v", createErr) } // seed collection collectionModule := &fauna.Module{Name: collectionName} // update Output comment at the bottom if you change this totalTestItems := 20 for i := 0; i < totalTestItems; i++ { createCollectionQuery, createItemQueryErr := fauna.FQL(`${mod}.create({ value: ${i} })`, map[string]any{ "mod": collectionModule, "i": i, }) if createItemQueryErr != nil { log.Fatalf("failed to construct create item query: %v", createItemQueryErr) } if _, createItemErr := client.Query(createCollectionQuery); createItemErr != nil { log.Fatalf("failed to create seed item: %v", createItemErr) } } // paginate 
collection paginationQuery, paginationQueryErr := fauna.FQL(`${mod}.all()`, map[string]any{"mod": collectionModule}) if paginationQueryErr != nil { log.Fatalf("failed to construct pagination query: %v", paginationQueryErr) } type Item struct { Value int `fauna:"value"` } var items []Item paginator := client.Paginate(paginationQuery) for { page, pageErr := paginator.Next() if pageErr != nil { log.Fatalf("pagination failed: %v", pageErr) } var pageItems []Item if marshalErr := page.Unmarshal(&pageItems); marshalErr != nil { log.Fatalf("failed to unmarshal page: %v", marshalErr) } items = append(items, pageItems...) if !paginator.HasNext() { break } } fmt.Printf("%d", len(items)) // Output: 20 } func ExampleClient_StreamFromQuery() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, err := fauna.NewDefaultClient() if err != nil { log.Fatalf("client should have been initialized: %s", err) } // set up a collection setupQuery, _ := fauna.FQL(` if (!Collection.byName('StreamingSandbox').exists()) { Collection.create({ name: 'StreamingSandbox' }) } else { StreamingSandbox.all().forEach(.delete()) } `, nil) if _, err := client.Query(setupQuery); err != nil { log.Fatalf("failed to set up the collection: %s", err) } // create a stream streamQuery, _ := fauna.FQL(`StreamingSandbox.all().eventSource()`, nil) events, err := client.StreamFromQuery(streamQuery, nil) if err != nil { log.Fatalf("failed to subscribe to the stream value: %s", err) } defer func() { _ = events.Close() }() // produce some events while the subscription is open createQuery, _ := fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} for 
_, query := range queries { if _, err := client.Query(query); err != nil { log.Fatalf("failed to execute CRUD query: %s", err) } } // fetch the produced events type Data struct { Foo string `fauna:"foo"` } var event fauna.Event expect := 3 for expect > 0 { err := events.Next(&event) if err != nil { log.Fatalf("failed to receive next event: %s", err) } switch event.Type { case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: var data Data if err := event.Unmarshal(&data); err != nil { log.Fatalf("failed to unmarshal event data: %s", err) } fmt.Printf("Event: %s Data: %+v\n", event.Type, data) expect-- } } // Output: Event: add Data: {Foo:bar} // Event: update Data: {Foo:baz} // Event: remove Data: {Foo:baz} } func ExampleClient_Stream() { // IMPORTANT: just for the purpose of example, don't actually hardcode secret _ = os.Setenv(fauna.EnvFaunaSecret, "secret") _ = os.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, err := fauna.NewDefaultClient() if err != nil { log.Fatalf("client should have been initialized: %s", err) } // set up a collection setupQuery, _ := fauna.FQL(` if (!Collection.byName('StreamingSandbox').exists()) { Collection.create({ name: 'StreamingSandbox' }) } else { StreamingSandbox.all().forEach(.delete()) } `, nil) if _, err := client.Query(setupQuery); err != nil { log.Fatalf("failed to set up the collection: %s", err) } // create a stream streamQuery, _ := fauna.FQL(`StreamingSandbox.all().eventSource()`, nil) result, err := client.Query(streamQuery) if err != nil { log.Fatalf("failed to create a stream: %s", err) } var stream fauna.EventSource if err := result.Unmarshal(&stream); err != nil { log.Fatalf("failed to unmarshal the stream value: %s", err) } // initiate the stream subscription events, err := client.Stream(stream) if err != nil { log.Fatalf("failed to subscribe to the stream value: %s", err) } defer func() { _ = events.Close() }() // produce some events while the subscription is open createQuery, _ := 
fauna.FQL(`StreamingSandbox.create({ foo: 'bar' })`, nil) updateQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.update({ foo: 'baz' }))`, nil) deleteQuery, _ := fauna.FQL(`StreamingSandbox.all().forEach(.delete())`, nil) queries := []*fauna.Query{createQuery, updateQuery, deleteQuery} for _, query := range queries { if _, err := client.Query(query); err != nil { log.Fatalf("failed to execute CRUD query: %s", err) } } // fetch the produced events type Data struct { Foo string `fauna:"foo"` } var event fauna.Event expect := 3 for expect > 0 { err := events.Next(&event) if err != nil { log.Fatalf("failed to receive next event: %s", err) } switch event.Type { case fauna.AddEvent, fauna.UpdateEvent, fauna.RemoveEvent: var data Data if err := event.Unmarshal(&data); err != nil { log.Fatalf("failed to unmarshal event data: %s", err) } fmt.Printf("Event: %s Data: %+v\n", event.Type, data) expect-- } } // Output: Event: add Data: {Foo:bar} // Event: update Data: {Foo:baz} // Event: remove Data: {Foo:baz} } ``` ## File: client_query_limits_test.go ```go package fauna_test import ( "os" "sync" "testing" "github.com/fauna/fauna-go/v3" "github.com/stretchr/testify/assert" ) func TestClientRetriesWithQueryLimits(t *testing.T) { t.Run("Query limits succeed on retry", func(t *testing.T) { dbName, dbNameSet := os.LookupEnv("QUERY_LIMITS_DB") collName, collNameSet := os.LookupEnv("QUERY_LIMITS_COLL") // If run in a pipeline, these will be empty strings, so check both if (!dbNameSet || !collNameSet) || (dbName == "" || collName == "") { t.Skip("Skipping query limits test due to missing env var(s)") } if _, found := os.LookupEnv(fauna.EnvFaunaSecret); !found { t.Setenv(fauna.EnvFaunaSecret, "secret") } client, clientErr := fauna.NewDefaultClient() if !assert.NoError(t, clientErr) { return } type secretObj struct { Secret string `fauna:"secret"` } query, _ := fauna.FQL(` if (Database.byName(${dbName}).exists()) { Key.create({ role: "admin", database: ${dbName} }) { secret } } else 
{ abort("Database not found.") }`, map[string]any{"dbName": dbName}) res, queryErr := client.Query(query) if !assert.NoError(t, queryErr) { t.FailNow() } var secret secretObj marshalErr := res.Unmarshal(&secret) if assert.NoError(t, marshalErr) { clients := make([]*fauna.Client, 5) results := make(chan int, len(clients)) var wg sync.WaitGroup wg.Add(len(clients)) for i := range clients { clients[i] = fauna.NewClient(secret.Secret, fauna.DefaultTimeouts(), fauna.URL(os.Getenv(fauna.EnvFaunaEndpoint))) go func(collName string, client *fauna.Client, result chan int) { defer wg.Done() coll, _ := fauna.FQL(collName, nil) q, _ := fauna.FQL(`${coll}.all().paginate(50)`, map[string]any{"coll": coll}) res, err := client.Query(q) if err != nil { result <- -1 } else { result <- res.Stats.Attempts } }(collName, clients[i], results) } go func() { wg.Wait() close(results) }() throttled := false for result := range results { throttled = throttled || result > 1 } assert.True(t, throttled) } }) } ``` ## File: client_test.go ```go package fauna_test import ( "context" "fmt" "math/rand" "net/http" "net/url" "testing" "time" "github.com/fauna/fauna-go/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestDefaultClient(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) t.Setenv(fauna.EnvFaunaSecret, "secret") client, clientErr := fauna.NewDefaultClient() if !assert.NoError(t, clientErr) { return } t.Run("basic requests", func(t *testing.T) { t.Run("String Length Request", func(t *testing.T) { s := "foo" query, _ := fauna.FQL(`${arg0}.length`, map[string]any{"arg0": s}) res, queryErr := client.Query(query) require.NoError(t, queryErr) var i int marshalErr := res.Unmarshal(&i) if assert.NoError(t, marshalErr) { assert.Equal(t, len(s), i) } t.Run("response has expected stats headers", func(t *testing.T) { assert.Greater(t, res.Stats.ComputeOps, 0, "should have some compute ops") assert.GreaterOrEqual(t, res.Stats.QueryTimeMs, 0) 
assert.Zero(t, res.Stats.ContentionRetries, "should not have retried") assert.Zero(t, res.Stats.ReadOps, "should not have read any bytes") assert.Zero(t, res.Stats.WriteOps, "should not have written any bytes") assert.Zero(t, res.Stats.StorageBytesRead, "should not have read from storage") assert.Zero(t, res.Stats.StorageBytesWrite, "should not have written to storage") assert.Nil(t, res.Stats.ProcessingTimeMs, "should not have processing time ms") }) }) t.Run("Query with options", func(t *testing.T) { q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) res, queryErr := client.Query(q, fauna.Timeout(time.Second)) if assert.NoError(t, queryErr) { t.Logf("summary: %s", res.Summary) } }) t.Run("Returns a Schema Version", func(t *testing.T) { collName := "SchemaVersionTestCol" collMod := &fauna.Module{Name: collName} deleteQuery, _ := fauna.FQL(`Collection.byName(${coll})?.delete()`, map[string]any{"coll": collName}) _, deleteErr := client.Query(deleteQuery) if !assert.NoError(t, deleteErr) { t.Logf("failed to cleanup collection: %v", deleteErr) } q, _ := fauna.FQL(`Collection.create({ name: ${name} })`, map[string]any{"name": collName}) res, queryErr := client.Query(q) if !assert.NoError(t, queryErr) { t.FailNow() } expectedSchemaVersion := res.TxnTime q, _ = fauna.FQL(`${mod}.all()`, map[string]any{"mod": collMod}) res, queryErr = client.Query(q) if !assert.NoError(t, queryErr) { t.FailNow() } assert.Equal(t, expectedSchemaVersion, res.SchemaVersion) }) t.Run("Paginate Query", func(t *testing.T) { colName := "PaginationTest" colMod := &fauna.Module{Name: colName} deleteQuery, _ := fauna.FQL(`Collection.byName(${coll})?.delete()`, map[string]any{"coll": colName}) _, deleteErr := client.Query(deleteQuery) if deleteErr != nil { t.Logf("failed to cleanup collection: %v", deleteErr) } q, _ := fauna.FQL(`Collection.create({ name: ${name} })`, map[string]any{"name": colName}) _, createErr := client.Query(q) if !assert.NoError(t, createErr) { t.FailNow() } t.Run("a lot of items", 
func(t *testing.T) { totalTestItems := 200 // create items for i := 0; i < totalTestItems; i++ { createCollectionQuery, createItemErr := fauna.FQL(`${mod}.create({ value: ${i} })`, map[string]any{ "mod": colMod, "i": i, }) if !assert.NoError(t, createItemErr) { t.FailNow() } res, err := client.Query(createCollectionQuery) if !assert.NoError(t, err) { t.FailNow() } assert.NotZero(t, res.Stats.WriteOps) } // get items query query, queryErr := fauna.FQL(`${mod}.all()`, map[string]any{"mod": colMod}) if !assert.NoError(t, queryErr) { t.FailNow() } // paginate items pages := 0 itemsSeen := 0 paginator := client.Paginate(query) for { page, err := paginator.Next() if !assert.NoError(t, err) || !assert.NotNil(t, page) { t.FailNow() } pages += 1 itemsSeen += len(page.Data) t.Run("can unmarshal pages", func(t *testing.T) { var modItems []struct { Value int `fauna:"value"` } marshalErr := page.Unmarshal(&modItems) assert.NoError(t, marshalErr) assert.NotZero(t, modItems[1].Value) // use the first index to avoid zero }) if !paginator.HasNext() { break } } assert.Equal(t, totalTestItems, itemsSeen) }) t.Run("an incomplete page", func(t *testing.T) { query, queryErr := fauna.FQL(`[1,2,3,4]`, map[string]any{"mod": colMod}) if !assert.NoError(t, queryErr) { t.FailNow() } // try to paginate a query that doesn't have Pages pages := 0 paginator := client.Paginate(query) for { res, err := paginator.Next() if !assert.NoError(t, err) || !assert.NotNil(t, res) { t.FailNow() } pages += 1 if !assert.NotEmpty(t, res.Data) { t.FailNow() } if !paginator.HasNext() { break } } assert.Equal(t, 1, pages) }) }) }) } func TestNewClient(t *testing.T) { t.Run("default client", func(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "secret") _, clientErr := fauna.NewDefaultClient() assert.NoError(t, clientErr) }) t.Run("new client respects env var", func(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client := fauna.NewClient("secret", fauna.DefaultTimeouts()) assert.NotNil(t, 
client) assert.Equal(t, client.String(), fauna.EndpointLocal) }) t.Run("stringify", func(t *testing.T) { client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal)) assert.Equal(t, client.String(), fauna.EndpointLocal, "client toString should be equal to the endpoint to ensure we don't expose secrets") }) t.Run("missing secret", func(t *testing.T) { _, clientErr := fauna.NewDefaultClient() assert.Error(t, clientErr, "should have failed due to missing secret") }) t.Run("has transaction time", func(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "secret") t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if !assert.NoError(t, clientErr) { return } before := client.GetLastTxnTime() if !assert.Zero(t, before) { return } q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) if _, queryErr := client.Query(q); !assert.NoError(t, queryErr) { return } first := client.GetLastTxnTime() assert.NotZero(t, first) second := client.GetLastTxnTime() assert.Equal(t, first, second) assert.NotEqual(t, before, second) }) t.Run("custom HTTP client", func(t *testing.T) { client := fauna.NewClient( "secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal), fauna.HTTPClient(http.DefaultClient), ) q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) _, queryErr := client.Query(q) assert.NoError(t, queryErr) }) } func TestBasicCRUDRequests(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "secret") t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, err := fauna.NewDefaultClient() if !assert.NoError(t, err) { return } coll := fmt.Sprintf("Person_%v", randomString(12)) collMod := &fauna.Module{Name: coll} t.Run("Create a collection", func(t *testing.T) { q, _ := fauna.FQL(`Collection.create({ name: ${name} })`, map[string]any{"name": coll}) _, queryErr := client.Query(q) assert.NoError(t, queryErr) }) n := "John Smith" p := &Person{ Name: n, Address: "123 Range Road Houston, TX 77056", } t.Run("Create a Person", 
func(t *testing.T) { q, _ := fauna.FQL(`${coll}.create(${person})`, map[string]any{"coll": collMod, "person": p}) _, queryErr := client.Query(q) assert.NoError(t, queryErr) }) t.Run("Query a Person", func(t *testing.T) { q, _ := fauna.FQL(`${coll}.all().firstWhere(.name == ${name})`, map[string]any{"coll": collMod, "name": n}) res, queryErr := client.Query(q) if !assert.NoError(t, queryErr) { return } var result Person err := res.Unmarshal(&result) assert.NoError(t, err) assert.Equal(t, p, &result) }) t.Run("Update a Person", func(t *testing.T) { addr := "321 Rainy St Seattle, WA 98011" q, _ := fauna.FQL( `${coll}.all().firstWhere(.name == ${name}).update({address: ${addr}})`, map[string]any{"coll": collMod, "name": n, "addr": addr}) res, queryErr := client.Query(q) if !assert.NoError(t, queryErr) { return } var result Person err := res.Unmarshal(&result) assert.NoError(t, err) assert.Equal(t, Person{n, addr}, result) }) t.Run("Delete a Person", func(t *testing.T) { q, _ := fauna.FQL(`${coll}.all().firstWhere(.name == ${name}).delete()`, map[string]any{"coll": collMod, "name": p.Name}) _, queryErr := client.Query(q) assert.NoError(t, queryErr) }) t.Run("Delete a Collection", func(t *testing.T) { q, _ := fauna.FQL(`Collection.byName(${coll}).delete()`, map[string]any{"coll": coll}) _, queryErr := client.Query(q) assert.NoError(t, queryErr) }) } func TestHeaders(t *testing.T) { var ( currentHeader string expectedValue string ) // use a test client to validate the headers are being set as expected below testingClient := &http.Client{Transport: &http.Transport{ Proxy: func(request *http.Request) (*url.URL, error) { assert.Equal(t, expectedValue, request.Header.Get(currentHeader)) return request.URL, nil }, }} t.Run("can set headers directly", func(t *testing.T) { type args struct { header string headerOpt fauna.ClientConfigFn } tests := []struct { name string args args want string expectError bool }{ { name: "linearized should be true", args: args{ headerOpt: 
fauna.Linearized(true), header: fauna.HeaderLinearized, }, want: "true", }, { name: "timeout should be 1m", args: args{ header: fauna.HeaderQueryTimeoutMs, headerOpt: fauna.QueryTimeout(time.Minute), }, want: fmt.Sprintf("%d", time.Minute.Milliseconds()), }, { name: "max contention retries should be 1", args: args{ header: fauna.HeaderMaxContentionRetries, headerOpt: fauna.MaxContentionRetries(1), }, want: "1", }, { name: "should have tags", args: args{ header: fauna.HeaderTags, headerOpt: fauna.QueryTags(map[string]string{ "hello": "world", "what": "are=you,doing?", }), }, want: "hello=world,what=are%3Dyou%2Cdoing%3F", expectError: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { currentHeader = tt.args.header expectedValue = tt.want client := fauna.NewClient( "secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal), fauna.HTTPClient(testingClient), tt.args.headerOpt, ) // running a simple query just to invoke the request q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) _, queryErr := client.Query(q) if !tt.expectError { assert.NoError(t, queryErr) } else { assert.Error(t, queryErr) } }) } }) t.Run("can set headers on Query", func(t *testing.T) { client := fauna.NewClient( "secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal), fauna.HTTPClient(testingClient), fauna.QueryTags(map[string]string{ "team": "X_Men", "hero": "Cyclops", }), ) currentHeader = fauna.HeaderTags expectedValue = "hero=Wolverine,team=X_Men" q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) _, queryErr := client.Query(q, fauna.Tags(map[string]string{"hero": "Wolverine"})) assert.NoError(t, queryErr) // assertion in testingClient above currentHeader = fauna.HeaderTraceparent expectedValue = "query-traceparent-id" q, _ = fauna.FQL(`Math.abs(-5.123e3)`, nil) _, queryErr = client.Query(q, fauna.Traceparent(expectedValue)) assert.NoError(t, queryErr) }) t.Run("can use convenience methods", func(t *testing.T) { currentHeader = fauna.HeaderLinearized expectedValue = 
"true" client := fauna.NewClient( "secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal), fauna.HTTPClient(testingClient), fauna.Linearized(true), fauna.QueryTimeout(time.Second*3), fauna.MaxContentionRetries(5), fauna.Context(context.Background()), fauna.AdditionalHeaders(map[string]string{ "foobar": "steve", currentHeader: expectedValue, }), ) assert.NotNil(t, client) }) t.Run("supports empty headers", func(t *testing.T) { client := fauna.NewClient( "secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal), fauna.AdditionalHeaders(map[string]string{ "shouldBeEmpty": "", }), ) assert.NotNil(t, client) }) } func TestQueryTags(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) t.Setenv(fauna.EnvFaunaSecret, "secret") client, clientErr := fauna.NewDefaultClient() if !assert.NoError(t, clientErr) { return } tags := map[string]string{ "hello": "world", "what": "areyoudoing", } // running a simple query just to invoke the request q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) res, queryErr := client.Query(q, fauna.Tags(tags)) if assert.NoError(t, queryErr) { assert.Equal(t, tags, res.QueryTags) } } func TestErrorHandling(t *testing.T) { t.Run("authorization error", func(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "I'm a little teapot") t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if assert.NoError(t, clientErr) { return } q, _ := fauna.FQL(`Math.abs(-5.123e3)`, nil) _, queryErr := client.Query(q) if assert.NoError(t, queryErr) { return } var expectedErr *fauna.ErrAuthentication assert.ErrorAs(t, queryErr, &expectedErr) }) t.Run("invalid query", func(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "secret") t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if assert.NoError(t, clientErr) { return } q, _ := fauna.FQL(`SillyPants`, nil) _, queryErr := client.Query(q) if assert.Error(t, queryErr) { var expectedErr 
*fauna.ErrQueryRuntime assert.ErrorAs(t, queryErr, &expectedErr) } }) t.Run("service error", func(t *testing.T) { t.Setenv(fauna.EnvFaunaSecret, "secret") t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) client, clientErr := fauna.NewDefaultClient() if !assert.NoError(t, clientErr) { t.FailNow() } testCollection := "testing" q, _ := fauna.FQL(`Collection.create({ name: ${arg1} })`, map[string]any{"arg1": testCollection}) _, queryErr := client.Query(q) if !assert.NoError(t, queryErr) { return } q, _ = fauna.FQL(`Collection.create({ name: ${arg1} })`, map[string]any{"arg1": testCollection}) if _, queryErr := client.Query(q); assert.Error(t, queryErr) { var expectedErr *fauna.ErrQueryRuntime assert.ErrorAs(t, queryErr, &expectedErr) } else { return } t.Run("returns a NullDoc", func(t *testing.T) { nullDocQuery, nullDocQueryErr := fauna.FQL(`${coll}.byId('123')`, map[string]any{"coll": &fauna.Module{Name: testCollection}}) if !assert.NoError(t, nullDocQueryErr) { t.FailNow() } res, err := client.Query(nullDocQuery) if !assert.NoError(t, err) { t.FailNow() } assert.IsType(t, &fauna.NullDocument{}, res.Data) }) q, _ = fauna.FQL(`Collection.byName(${arg1}).delete()`, map[string]any{"arg1": testCollection}) _, queryErr = client.Query(q) assert.NoError(t, queryErr) }) } type Person struct { Name string `fauna:"name"` Address string `fauna:"address"` } func randomString(n int) string { var letters = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ") s := make([]rune, n) for i := range s { s[i] = letters[rand.Intn(len(letters))] } return string(s) } ``` ## File: client.go ```go // Package fauna provides a driver for Fauna v10 package fauna import ( "bytes" "context" _ "embed" "fmt" "io" "math" "math/rand" "net" "net/http" "net/url" "os" "strings" "time" "github.com/fauna/fauna-go/v3/internal/fingerprinting" ) //go:embed version var driverVersion string const ( // EndpointDefault constant for Fauna Production endpoint EndpointDefault = "https://db.fauna.com" 
// EndpointLocal constant for local (Docker) endpoint EndpointLocal = "http://localhost:8443" // EnvFaunaEndpoint environment variable for Fauna Client HTTP endpoint EnvFaunaEndpoint = "FAUNA_ENDPOINT" // EnvFaunaSecret environment variable for Fauna Client authentication EnvFaunaSecret = "FAUNA_SECRET" // EnvFaunaDebug environment variable for Fauna Client logging EnvFaunaDebug = "FAUNA_DEBUG" // Headers consumers might want to use HeaderLastTxnTs = "X-Last-Txn-Ts" HeaderLinearized = "X-Linearized" HeaderMaxContentionRetries = "X-Max-Contention-Retries" HeaderTags = "X-Query-Tags" HeaderQueryTimeoutMs = "X-Query-Timeout-Ms" HeaderTraceparent = "Traceparent" HeaderTypecheck = "X-Typecheck" // Headers just used internally headerAuthorization = "Authorization" headerContentType = "Content-Type" headerDriver = "X-Driver" headerDriverEnv = "X-Driver-Env" headerFormat = "X-Format" retryMaxAttemptsDefault = 3 retryMaxBackoffDefault = time.Second * 20 ) // Client is the Fauna Client. type Client struct { url string secret string headers map[string]string lastTxnTime txnTime typeCheckingEnabled bool http *http.Client ctx context.Context maxAttempts int maxBackoff time.Duration // lazily cached URLs queryURL, streamURL, feedURL *url.URL logger Logger } // NewDefaultClient initialize a [fauna.Client] with recommended default settings func NewDefaultClient() (*Client, error) { var secret string if val, found := os.LookupEnv(EnvFaunaSecret); !found { return nil, fmt.Errorf("unable to load key from environment variable '%s'", EnvFaunaSecret) } else { secret = val } endpointURL, urlFound := os.LookupEnv(EnvFaunaEndpoint) if !urlFound { endpointURL = EndpointDefault } return NewClient( secret, DefaultTimeouts(), URL(endpointURL), ), nil } type Timeouts struct { // The timeout of each query. This controls the maximum amount of time Fauna will // execute your query before marking it failed. 
QueryTimeout time.Duration // Time beyond `QueryTimeout` at which the client will abort a request if it has not received a response. // The default is 5s, which should account for network latency for most clients. The value must be greater // than zero. The closer to zero the value is, the more likely the client is to abort the request before the // server can report a legitimate response or error. ClientBufferTimeout time.Duration // ConnectionTimeout amount of time to wait for the connection to complete. ConnectionTimeout time.Duration // IdleConnectionTimeout is the maximum amount of time an idle (keep-alive) connection will // remain idle before closing itself. IdleConnectionTimeout time.Duration } // DefaultTimeouts suggested timeouts for the default [fauna.Client] func DefaultTimeouts() Timeouts { return Timeouts{ QueryTimeout: time.Second * 5, ClientBufferTimeout: time.Second * 5, ConnectionTimeout: time.Second * 5, IdleConnectionTimeout: time.Second * 5, } } // NewClient initialize a new [fauna.Client] with custom settings func NewClient(secret string, timeouts Timeouts, configFns ...ClientConfigFn) *Client { dialer := net.Dialer{ Timeout: timeouts.ConnectionTimeout, } // NOTE: prefer a response header timeout instead of a client timeout so // that the client doesn't stop reading an HTTP body that was produced by // Fauna. On the query interface, an HTTP body is sent as a single HTTP // message. On the streaming interface, HTTP chunks are sent on every event. // Therefore, it's in the driver's best interest to continue reading the // HTTP body once the headers appear. 
httpClient := &http.Client{ Transport: &http.Transport{ Proxy: http.ProxyFromEnvironment, DialContext: dialer.DialContext, ForceAttemptHTTP2: true, MaxIdleConns: 20, IdleConnTimeout: timeouts.IdleConnectionTimeout, ResponseHeaderTimeout: timeouts.QueryTimeout + timeouts.ClientBufferTimeout, }, } defaultHeaders := map[string]string{ headerContentType: "application/json; charset=utf-8", headerDriver: "go", headerDriverEnv: fmt.Sprintf( "driver=go-%s; runtime=%s env=%s; os=%s", strings.TrimSpace(driverVersion), fingerprinting.Version(), fingerprinting.Environment(), fingerprinting.EnvironmentOS(), ), headerFormat: "tagged", } if timeouts.QueryTimeout > 0 { defaultHeaders[HeaderQueryTimeoutMs] = fmt.Sprintf("%v", timeouts.QueryTimeout.Milliseconds()) } endpointURL, urlFound := os.LookupEnv(EnvFaunaEndpoint) if !urlFound { endpointURL = EndpointDefault } client := &Client{ ctx: context.TODO(), secret: secret, http: httpClient, url: endpointURL, headers: defaultHeaders, lastTxnTime: txnTime{}, typeCheckingEnabled: false, maxAttempts: retryMaxAttemptsDefault, maxBackoff: retryMaxBackoffDefault, logger: DefaultLogger(), } // set options to override defaults for _, configFn := range configFns { configFn(client) } return client } func (c *Client) parseQueryURL() (*url.URL, error) { if c.queryURL == nil { if queryURL, err := url.Parse(c.url); err != nil { return nil, err } else { c.queryURL = queryURL.JoinPath("query", "1") } } return c.queryURL, nil } func (c *Client) parseStreamURL() (*url.URL, error) { if c.streamURL == nil { if streamURL, err := url.Parse(c.url); err != nil { return nil, err } else { c.streamURL = streamURL.JoinPath("stream", "1") } } return c.streamURL, nil } func (c *Client) parseFeedURL() (*url.URL, error) { if c.feedURL == nil { if feedURL, err := url.Parse(c.url); err != nil { return nil, err } else { c.feedURL = feedURL.JoinPath("feed", "1") } } return c.feedURL, nil } func (c *Client) doWithRetry(req *http.Request) (attempts int, r *http.Response, 
err error) { req2 := req.Clone(req.Context()) body, rerr := io.ReadAll(req.Body) if rerr != nil { return attempts, r, rerr } cerr := req.Body.Close() if cerr != nil { return attempts, r, cerr } for { shouldRetry := false // Ensure we have a fresh body for the request req2.Body = io.NopCloser(bytes.NewReader(body)) r, err = c.http.Do(req2) c.logger.LogResponse(c.ctx, body, r) attempts++ if err != nil { return } if r != nil { switch r.StatusCode { case http.StatusTooManyRequests: shouldRetry = true } } if attempts >= c.maxAttempts || !shouldRetry { return attempts, r, err } // We're going to retry, so drain the response if r != nil { err = c.drainResponse(r.Body) if err != nil { return } } timer := time.NewTimer(c.backoff(attempts)) select { case <-req.Context().Done(): timer.Stop() return attempts, r, req.Context().Err() case <-timer.C: } } } func (c *Client) drainResponse(body io.ReadCloser) (err error) { defer func() { _ = body.Close() }() _, err = io.Copy(io.Discard, io.LimitReader(body, 4096)) return } func (c *Client) backoff(attempt int) (sleep time.Duration) { jitter := rand.Float64() mult := math.Pow(2, float64(attempt)) * jitter sleep = time.Duration(mult) * time.Second if sleep > c.maxBackoff { sleep = c.maxBackoff } return } // Query runs the given FQL query, optionally applying one or more [QueryOptFn] options. func (c *Client) Query(fql *Query, opts ...QueryOptFn) (*QuerySuccess, error) { req := &queryRequest{ apiRequest: apiRequest{ Context: c.ctx, Headers: c.headers, }, Query: fql, } for _, queryOptionFn := range opts { queryOptionFn(req) } return req.do(c) } // Paginate returns a [QueryIterator] that runs the given FQL query page by page, optionally applying one or more [QueryOptFn] options. func (c *Client) Paginate(fql *Query, opts ...QueryOptFn) *QueryIterator { return &QueryIterator{ client: c, fql: fql, opts: opts, } } // StreamFromQuery initiates a stream subscription for the [fauna.Query]. // // This is syntactic sugar for [fauna.Client.Query] followed by [fauna.Client.Stream]. 
// // Note that the query provided MUST return [fauna.EventSource] value. Otherwise, // this method returns an error. func (c *Client) StreamFromQuery(fql *Query, streamOpts []StreamOptFn, opts ...QueryOptFn) (*EventStream, error) { res, err := c.Query(fql, opts...) if err != nil { return nil, err } if stream, ok := res.Data.(EventSource); ok { return c.Stream(stream, streamOpts...) } return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data) } // Stream initiates a stream subscription for the given stream value. func (c *Client) Stream(stream EventSource, opts ...StreamOptFn) (*EventStream, error) { return subscribe(c, stream, opts...) } // QueryIterator is a [fauna.Client] iterator for paginated queries type QueryIterator struct { client *Client fql *Query opts []QueryOptFn } // Next returns the next page of results func (q *QueryIterator) Next() (*Page, error) { res, queryErr := q.client.Query(q.fql, q.opts...) if queryErr != nil { return nil, queryErr } if page, ok := res.Data.(*Page); ok { // First page if pageErr := q.nextPage(page.After); pageErr != nil { return nil, pageErr } return page, nil } var page Page if results, isPage := res.Data.(map[string]any); isPage { if after, hasAfter := results["after"].(string); hasAfter { page = Page{After: after, Data: results["data"].([]any)} } else { page = Page{After: "", Data: results["data"].([]any)} } } else { page = Page{After: "", Data: []any{res.Data}} } if pageErr := q.nextPage(page.After); pageErr != nil { return nil, pageErr } return &page, nil } func (q *QueryIterator) nextPage(after string) error { if after == "" { q.fql = nil return nil } var fqlErr error q.fql, fqlErr = FQL(`Set.paginate(${after})`, map[string]any{"after": after}) return fqlErr } // HasNext returns whether there is another page of results func (q *QueryIterator) HasNext() bool { return q.fql != nil } // SetLastTxnTime update the last txn time for the [fauna.Client] // This has no effect if earlier than stored 
// timestamp.
//
// WARNING: This should be used only when coordinating timestamps across multiple clients.
// Moving the timestamp arbitrarily forward into the future will cause transactions to stall.
func (c *Client) SetLastTxnTime(txnTime time.Time) {
	c.lastTxnTime.sync(txnTime.UnixMicro())
}

// GetLastTxnTime gets the last txn timestamp seen by the [fauna.Client]
func (c *Client) GetLastTxnTime() int64 {
	return c.lastTxnTime.get()
}

// String fulfills the [fmt.Stringer] interface for the [fauna.Client].
// It returns only the URL to prevent logging potentially sensitive headers.
func (c *Client) String() string {
	return c.url
}

func (c *Client) setHeader(key, val string) {
	c.headers[key] = val
}

// Feed opens an event feed from the event source
func (c *Client) Feed(stream EventSource, opts ...FeedOptFn) (*EventFeed, error) {
	feedOpts, err := parseFeedOptions(opts...)
	if err != nil {
		return nil, err
	}

	return newEventFeed(c, stream, feedOpts)
}

// FeedFromQuery opens an event feed from a query
func (c *Client) FeedFromQuery(query *Query, opts ...FeedOptFn) (*EventFeed, error) {
	feedOpts, err := parseFeedOptions(opts...)
	if err != nil {
		return nil, err
	}

	res, err := c.Query(query)
	if err != nil {
		return nil, err
	}

	eventSource, ok := res.Data.(EventSource)
	if !ok {
		return nil, fmt.Errorf("query should return a fauna.EventSource but got %T", res.Data)
	}

	return newEventFeed(c, eventSource, feedOpts)
}

func parseFeedOptions(opts ...FeedOptFn) (*feedOptions, error) {
	feedOpts := feedOptions{}
	for _, optFn := range opts {
		optFn(&feedOpts)
	}

	if feedOpts.StartTS != nil && feedOpts.Cursor != nil {
		return nil, fmt.Errorf("cannot use EventFeedStartTime and EventFeedCursor together")
	}

	return &feedOpts, nil
}
```

## File: config.go

```go
package fauna

import (
	"context"
	"fmt"
	"net/http"
	"net/url"
	"strings"
	"time"
)

// ClientConfigFn configuration options for the [fauna.Client]
type ClientConfigFn func(*Client)

// Context specify the context to be used for the [fauna.Client]
func Context(ctx context.Context) ClientConfigFn {
	return func(c *Client) {
		c.ctx = ctx
	}
}

// HTTPClient set the http.Client for the [fauna.Client]
func HTTPClient(client *http.Client) ClientConfigFn {
	return func(c *Client) {
		c.http = client
	}
}

// AdditionalHeaders specify headers for the [fauna.Client]
func AdditionalHeaders(headers map[string]string) ClientConfigFn {
	return func(c *Client) {
		for k, v := range headers {
			c.headers[k] = v
		}
	}
}

// MaxAttempts sets the maximum number of times the [fauna.Client]
// will attempt to run a query, retrying if appropriate.
func MaxAttempts(attempts int) ClientConfigFn {
	return func(c *Client) {
		c.maxAttempts = attempts
	}
}

// MaxBackoff sets the maximum duration the [fauna.Client] will wait
// before retrying.
func MaxBackoff(backoff time.Duration) ClientConfigFn {
	return func(c *Client) {
		c.maxBackoff = backoff
	}
}

// DefaultTypecheck set header on the [fauna.Client]
// Enable or disable typechecking of the query before evaluation. If
// not set, Fauna will use the value of the "typechecked" flag on
// the database configuration.
func DefaultTypecheck(enabled bool) ClientConfigFn {
	return func(c *Client) {
		c.setHeader(HeaderTypecheck, fmt.Sprintf("%v", enabled))
	}
}

// Linearized set header on the [fauna.Client]
// If true, unconditionally run the query as strictly serialized.
// This affects read-only transactions. Transactions which write will always be strictly serialized.
func Linearized(enabled bool) ClientConfigFn {
	return func(c *Client) {
		c.setHeader(HeaderLinearized, fmt.Sprintf("%v", enabled))
	}
}

// MaxContentionRetries set header on the [fauna.Client]
// The max number of times to retry the query if contention is encountered.
func MaxContentionRetries(i int) ClientConfigFn {
	return func(c *Client) {
		c.setHeader(HeaderMaxContentionRetries, fmt.Sprintf("%v", i))
	}
}

// QueryTimeout set header on the [fauna.Client]
func QueryTimeout(d time.Duration) ClientConfigFn {
	return func(c *Client) {
		c.setHeader(HeaderQueryTimeoutMs, fmt.Sprintf("%v", d.Milliseconds()))
	}
}

// QueryTags sets header on the [fauna.Client]
// Set tags to associate with the query.
// See [logging]
//
// [logging]: https://docs.fauna.com/fauna/current/build/logs/query_log/
func QueryTags(tags map[string]string) ClientConfigFn {
	return func(c *Client) {
		c.setHeader(HeaderTags, argsStringFromMap(tags))
	}
}

// URL set the [fauna.Client] URL
func URL(url string) ClientConfigFn {
	return func(c *Client) {
		c.url = url
	}
}

// WithLogger set the [fauna.Client] Logger
func WithLogger(logger Logger) ClientConfigFn {
	return func(c *Client) {
		c.logger = logger
	}
}

// QueryOptFn function to set options on the [Client.Query]
type QueryOptFn func(req *queryRequest)

// QueryContext set the [context.Context] for a single [Client.Query]
func QueryContext(ctx context.Context) QueryOptFn {
	return func(req *queryRequest) {
		req.Context = ctx
	}
}

// Tags set the tags header on a single [Client.Query]
func Tags(tags map[string]string) QueryOptFn {
	return func(req *queryRequest) {
		if val, exists := req.Headers[HeaderTags]; exists {
			req.Headers[HeaderTags] = argsStringFromMap(tags, strings.Split(val, ",")...)
		} else {
			req.Headers[HeaderTags] = argsStringFromMap(tags)
		}
	}
}

// Traceparent sets the header on a single [Client.Query]
func Traceparent(id string) QueryOptFn {
	return func(req *queryRequest) {
		req.Headers[HeaderTraceparent] = id
	}
}

// Timeout set the query timeout on a single [Client.Query]
func Timeout(dur time.Duration) QueryOptFn {
	return func(req *queryRequest) {
		req.Headers[HeaderQueryTimeoutMs] = fmt.Sprintf("%d", dur.Milliseconds())
	}
}

// Typecheck sets the header on a single [Client.Query]
func Typecheck(enabled bool) QueryOptFn {
	return func(req *queryRequest) {
		req.Headers[HeaderTypecheck] = fmt.Sprintf("%v", enabled)
	}
}

// StreamOptFn function to set options on the [Client.Stream]
type StreamOptFn func(req *streamRequest)

// StreamStartTime set the streams starting timestamp.
//
// Useful when resuming a stream at a given point in time.
func StreamStartTime(ts time.Time) StreamOptFn {
	return func(req *streamRequest) {
		req.StartTS = ts.UnixMicro()
	}
}

// StreamStartTimeUnixMicros set the stream starting timestamp.
//
// Useful when resuming a stream at a given point in time.
func StreamStartTimeUnixMicros(ts int64) StreamOptFn {
	return func(req *streamRequest) {
		req.StartTS = ts
	}
}

// EventCursor set the stream starting point based on a previously received
// event cursor.
//
// Useful when resuming a stream after a failure.
func EventCursor(cursor string) StreamOptFn {
	return func(req *streamRequest) {
		req.Cursor = cursor
	}
}

func argsStringFromMap(input map[string]string, currentArgs ...string) string {
	params := url.Values{}

	for _, c := range currentArgs {
		s := strings.Split(c, "=")
		params.Set(s[0], s[1])
	}

	for k, v := range input {
		params.Set(k, v)
	}

	return strings.ReplaceAll(params.Encode(), "&", ",")
}

// FeedOptFn function to set options on the [fauna.EventFeed]
type FeedOptFn func(req *feedOptions)

// EventFeedCursor set the cursor for the [fauna.EventFeed]
// cannot be used with [EventFeedStartTime]
func EventFeedCursor(cursor string) FeedOptFn {
	return func(req *feedOptions) {
		req.Cursor = &cursor
	}
}

// EventFeedStartTime set the start time for the [fauna.EventFeed]
// cannot be used with [EventFeedCursor]
func EventFeedStartTime(ts time.Time) FeedOptFn {
	return func(req *feedOptions) {
		asMicro := ts.UnixMicro()
		req.StartTS = &asMicro
	}
}

// EventFeedStartTimeUnixMicros set the start time for the [fauna.EventFeed]
// cannot be used with [EventFeedCursor]
func EventFeedStartTimeUnixMicros(ts int64) FeedOptFn {
	return func(req *feedOptions) {
		req.StartTS = &ts
	}
}

// EventFeedPageSize set the page size for the [fauna.EventFeed]
// The page size is the maximum number of events returned per page.
// Must be in the range 1 to 16000 (inclusive).
// Defaults to 16.
func EventFeedPageSize(pageSize int) FeedOptFn { return func(req *feedOptions) { req.PageSize = &pageSize } } ``` ## File: error_test.go ```go package fauna import ( "net/http" "testing" "time" "github.com/stretchr/testify/assert" ) func TestGetErrFauna(t *testing.T) { type args struct { httpStatus int serviceError *ErrFauna errType error } tests := []struct { name string args args wantErr bool }{ { name: "No error", args: args{ httpStatus: 200, serviceError: nil, errType: nil, }, wantErr: false, }, { name: "Query check error", args: args{ httpStatus: http.StatusBadRequest, serviceError: &ErrFauna{Code: "invalid_query", Message: ""}, errType: &ErrQueryCheck{}, }, wantErr: true, }, { name: "Query runtime error", args: args{ httpStatus: http.StatusBadRequest, serviceError: &ErrFauna{Code: "invalid_argument", Message: ""}, errType: &ErrQueryRuntime{}, }, wantErr: true, }, { name: "Invalid request error", args: args{ httpStatus: http.StatusBadRequest, serviceError: &ErrFauna{Code: "invalid_request", Message: ""}, errType: &ErrInvalidRequest{}, }, wantErr: true, }, { name: "Abort error", args: args{ httpStatus: http.StatusBadRequest, serviceError: &ErrFauna{Code: "abort", Message: "", Abort: `{"@int":"1234"}`}, errType: &ErrAbort{}, }, wantErr: true, }, { name: "Unauthorized", args: args{ httpStatus: http.StatusUnauthorized, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrAuthentication{}, }, wantErr: true, }, { name: "Access not granted", args: args{ httpStatus: http.StatusForbidden, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrAuthorization{}, }, wantErr: true, }, { name: "Feature not enabled", args: args{ httpStatus: http.StatusGone, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrAuthorization{}, }, wantErr: true, }, { name: "Too many requests", args: args{ httpStatus: http.StatusTooManyRequests, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrThrottling{}, }, wantErr: true, }, { name: "Query timeout", args: 
args{ httpStatus: 440, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrQueryTimeout{}, }, wantErr: true, }, { name: "Internal error", args: args{ httpStatus: http.StatusInternalServerError, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrServiceInternal{}, }, wantErr: true, }, { name: "Service timeout", args: args{ httpStatus: http.StatusServiceUnavailable, serviceError: &ErrFauna{Code: "", Message: ""}, errType: &ErrServiceTimeout{}, }, wantErr: true, }, { name: "Contended transaction", args: args{ httpStatus: http.StatusConflict, serviceError: &ErrFauna{Code: "contended_transaction", Message: "Transaction was aborted due to detection of concurrent modification."}, errType: &ErrContendedTransaction{}, }, wantErr: true, }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { res := &queryResponse{Error: tt.args.serviceError, Summary: ""} err := getErrFauna(tt.args.httpStatus, res, 1) if tt.wantErr { assert.ErrorAs(t, err, &tt.args.errType) assert.NotZero(t, res.Error.StatusCode) } else { assert.NoError(t, err) } }) } } func TestErrAbort(t *testing.T) { t.Setenv(EnvFaunaEndpoint, EndpointLocal) t.Setenv(EnvFaunaSecret, "secret") client, clientErr := NewDefaultClient() if !assert.NoError(t, clientErr) { return } t.Run("abort field can have value", func(t *testing.T) { query, _ := FQL(`abort("foo")`, nil) _, qErr := client.Query(query) var expectedErr *ErrAbort if assert.ErrorAs(t, qErr, &expectedErr) { assert.Equal(t, "abort", expectedErr.Code) assert.Equal(t, "foo", expectedErr.Abort) } }) t.Run("ErrAbort handles object and allows unmarshalling", func(t *testing.T) { query, _ := FQL(`abort({ msg: "abrasive message", aborted_at: Time("2023-02-28T18:10:10.00001Z")})`, nil) _, qErr := client.Query(query) type CustomAbort struct { Message string `fauna:"msg"` AbortedAt time.Time `fauna:"aborted_at"` } var customAbort CustomAbort var expectedErr *ErrAbort if assert.ErrorAs(t, qErr, &expectedErr) { assert.Equal(t, "abort", 
expectedErr.Code) err := expectedErr.Unmarshal(&customAbort) assert.NoError(t, err) assert.Equal(t, customAbort.AbortedAt, time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC)) assert.Equal(t, customAbort.Message, "abrasive message") } }) } func TestErrConstraint(t *testing.T) { t.Setenv(EnvFaunaEndpoint, EndpointLocal) t.Setenv(EnvFaunaSecret, "secret") client, clientErr := NewDefaultClient() if !assert.NoError(t, clientErr) { return } t.Run("constraint failures get decoded", func(t *testing.T) { retried := false query, queryErr := FQL(`Function.create({"name": "double", "body": "x => x * 2"})`, nil) if !assert.NoError(t, queryErr) { t.FailNow() } CreateFunction: _, qErr := client.Query(query) if qErr == nil { if !retried { // now we try to create the function again retried = true goto CreateFunction } // if we retried already and got another error, fail t.FailNow() } var expectedErr *ErrQueryRuntime if assert.ErrorAs(t, qErr, &expectedErr) { assert.Len(t, expectedErr.ConstraintFailures, 1) assert.NotEmpty(t, expectedErr.ConstraintFailures[0].Message) } }) } ``` ## File: error.go ```go package fauna import ( "net/http" ) const httpStatusQueryTimeout = 440 // An ErrFauna is the base of all errors and provides the underlying `code`, // `message`, and any [fauna.QueryInfo]. type ErrFauna struct { *QueryInfo StatusCode int `json:"-"` Code string `json:"code"` Message string `json:"message"` Abort any `json:"abort"` ConstraintFailures []ErrConstraintFailure `json:"constraint_failures"` } type ErrConstraintFailure struct { Message string `json:"message"` Name string `json:"name,omitempty"` Paths []any `json:"paths,omitempty"` } // Error provides the underlying error message. func (e ErrFauna) Error() string { return e.Message } // An ErrAbort is returned when the `abort()` function was called, which will // return custom abort data in the error response. type ErrAbort struct { *ErrFauna } // Unmarshal decodes the Abort property into the provided object. 
func (e *ErrAbort) Unmarshal(into any) error {
	return decodeInto(e.Abort, into)
}

// An ErrAuthentication is returned when Fauna is unable to authenticate
// the request due to an invalid or missing authentication token.
type ErrAuthentication struct {
	*ErrFauna
}

// An ErrAuthorization is returned when a query attempts to access data the
// secret is not allowed to access.
type ErrAuthorization struct {
	*ErrFauna
}

// ErrContendedTransaction is returned when a transaction is aborted due
// to concurrent modification.
type ErrContendedTransaction struct {
	*ErrFauna
}

// An ErrInvalidRequest is returned when the request body is not valid JSON, or
// does not conform to the API specification
type ErrInvalidRequest struct {
	*ErrFauna
}

// An ErrNetwork is returned when an unknown error is encountered when attempting
// to send a request to Fauna.
type ErrNetwork error

// An ErrQueryCheck is returned when the query fails one or more validation checks.
type ErrQueryCheck struct {
	*ErrFauna
}

// An ErrQueryRuntime is returned when the query fails due to a runtime error.
// The `code` field will vary based on the specific error cause.
type ErrQueryRuntime struct {
	*ErrFauna
}

// An ErrQueryTimeout is returned when the client specified timeout was
// exceeded, but the timeout was set lower than the query's expected
// processing time. This response is distinguished from [fauna.ErrServiceTimeout]
// by the fact that a [fauna.ErrQueryTimeout] response is considered a
// successful response for the purpose of determining the service's availability.
type ErrQueryTimeout struct {
	*ErrFauna
}

// An ErrServiceInternal is returned when an unexpected error occurs.
type ErrServiceInternal struct {
	*ErrFauna
}

// An ErrServiceTimeout is returned when an unexpected timeout occurs.
type ErrServiceTimeout struct {
	*ErrFauna
}

// An ErrThrottling is returned when the query exceeded some capacity limit.
type ErrThrottling struct {
	*ErrFauna
}

func getErrFauna(httpStatus int, res *queryResponse, attempts int) error {
	if res.Error != nil {
		res.Error.QueryInfo = newQueryInfo(res)
		if res.Error.QueryInfo.Stats != nil {
			res.Error.QueryInfo.Stats.Attempts = attempts
		}
		res.Error.StatusCode = httpStatus
	}

	switch httpStatus {
	case http.StatusBadRequest:
		if res.Error == nil {
			err := &ErrQueryRuntime{&ErrFauna{
				QueryInfo:  newQueryInfo(res),
				Code:       "",
				Message:    "",
				StatusCode: httpStatus,
			}}
			err.Message += "\n" + res.Summary
			return err
		}

		switch res.Error.Code {
		case "invalid_query":
			err := &ErrQueryCheck{res.Error}
			err.Message += "\n" + res.Summary
			return err
		case "invalid_argument", "constraint_failure":
			err := &ErrQueryRuntime{res.Error}
			err.Message += "\n" + res.Summary
			return err
		case "abort":
			err := &ErrAbort{res.Error}
			abort, cErr := convert(false, res.Error.Abort)
			if cErr != nil {
				return cErr
			}
			err.Abort = abort
			err.Message += "\n" + res.Summary
			return err
		default:
			err := &ErrInvalidRequest{res.Error}
			err.Message += "\n" + res.Summary
			return err
		}
	case http.StatusConflict:
		return &ErrContendedTransaction{res.Error}
	case http.StatusUnauthorized:
		return &ErrAuthentication{res.Error}
	case http.StatusForbidden:
		return &ErrAuthorization{res.Error}
	case http.StatusGone:
		return &ErrAuthorization{res.Error}
	case http.StatusTooManyRequests:
		return &ErrThrottling{res.Error}
	case httpStatusQueryTimeout:
		return &ErrQueryTimeout{res.Error}
	case http.StatusInternalServerError:
		return &ErrServiceInternal{res.Error}
	case http.StatusServiceUnavailable:
		return &ErrServiceTimeout{res.Error}
	}

	return nil
}
```

## File: event_feed_example_test.go

```go
package fauna_test

import (
	"fmt"
	"log"

	"github.com/fauna/fauna-go/v3"
)

func ExampleEventFeed_Next() {
	client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.URL(fauna.EndpointLocal))

	query, queryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete()
Collection.create({ name: "EventFeedTest" })
EventFeedTest.all().eventSource()`, nil) if queryErr != nil { log.Fatal(queryErr.Error()) } feed, feedErr := client.FeedFromQuery(query) if feedErr != nil { log.Fatal(feedErr.Error()) } addOne, _ := fauna.FQL(`EventFeedTest.create({ foo: 'bar' })`, nil) _, addOneErr := client.Query(addOne) if addOneErr != nil { log.Fatal(addOneErr.Error()) } for { var page fauna.FeedPage eventErr := feed.Next(&page) if eventErr != nil { log.Fatal(eventErr.Error()) } for _, event := range page.Events { fmt.Println(event.Type) } if !page.HasNext { break } } // Output: add } ``` ## File: event_feed_test.go ```go package fauna_test import ( "testing" "time" "github.com/fauna/fauna-go/v3" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestEventFeed(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) t.Setenv(fauna.EnvFaunaSecret, "secret") client, clientErr := fauna.NewDefaultClient() require.NoError(t, clientErr) resetCollection(t, client) t.Run("returns errors correctly", func(t *testing.T) { t.Run("should error when the query doesn't return an event source", func(t *testing.T) { query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) _, feedErr := client.FeedFromQuery(query) require.ErrorContains(t, feedErr, "query should return a fauna.EventSource but got int") }) t.Run("should allow passing a cursor with a query", func(t *testing.T) { query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") feed, feedErr := client.FeedFromQuery(query, fauna.EventFeedCursor("cursor")) require.NoError(t, feedErr, "failed to init events feed") require.NotNil(t, feed, "feed is nil") }) t.Run("should error when attempting to use a start time and a cursor", func(t *testing.T) { query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") req, reqErr := client.Query(query) 
require.NoError(t, reqErr, "failed to execute query") var response fauna.EventSource unmarshalErr := req.Unmarshal(&response) require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") _, feedErr := client.Feed(response, fauna.EventFeedStartTime(time.Now()), fauna.EventFeedCursor("cursor")) require.ErrorContains(t, feedErr, "cannot use EventFeedStartTime and EventFeedCursor together") }) }) t.Run("can use event feeds from a query", func(t *testing.T) { query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") feed, feedErr := client.FeedFromQuery(query) require.NoError(t, feedErr, "failed to init events feed") var ( start = 5 end = 20 ) createOne(t, client, feed) createMultipleDocs(t, client, start, end) var page fauna.FeedPage eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events from EventSource") require.Equal(t, end-start, len(page.Events), "unexpected number of events") }) t.Run("can get events from EventSource", func(t *testing.T) { t.Run("can get an EventSource", func(t *testing.T) { eventSource := getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") }) t.Run("get events from an EventSource", func(t *testing.T) { eventSource := getEventSource(t, client) feed, feedErr := client.Feed(eventSource) require.NoError(t, feedErr, "failed to init events feed") var ( start = 5 end = 20 ) createOne(t, client, feed) createMultipleDocs(t, client, start, end) var page fauna.FeedPage eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events from EventSource") require.Equal(t, end-start, len(page.Events), "unexpected number of events") }) }) t.Run("can get events from history", func(t *testing.T) { resetCollection(t, client) createOne(t, client, nil) eventSource := getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") feed, feedErr := client.Feed(eventSource) 
require.NoError(t, feedErr, "failed to init events feed") var page fauna.FeedPage eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") require.Equal(t, 0, len(page.Events), "unexpected number of events") eventSource = getEventSource(t, client) require.NotNil(t, eventSource, "failed to get an EventSource") feed, feedErr = client.Feed(eventSource, fauna.EventFeedStartTime(time.Now().Add(-10*time.Minute))) require.NoError(t, feedErr, "failed to init events feed") eventsErr = feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") require.Equal(t, 1, len(page.Events), "unexpected number of events") require.NotNil(t, feed) // get a blank page for { eventErr := feed.Next(&page) require.NoError(t, eventErr) break } require.Empty(t, page.Events) }) t.Run("can use page size", func(t *testing.T) { resetCollection(t, client) eventSource := getEventSource(t, client) pageSize := 3 feed, feedErr := client.Feed(eventSource, fauna.EventFeedPageSize(pageSize)) require.NoError(t, feedErr, "failed to init events feed") var ( start = 5 end = 20 page fauna.FeedPage seenEvents int ) createOne(t, client, feed) createMultipleDocs(t, client, start, end) didPaginate := false for { eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events from EventSource") seenEvents += len(page.Events) if !page.HasNext { break } didPaginate = true // every page but the last should have the right page size require.Equal(t, pageSize, len(page.Events), "unexpected number of events") } require.True(t, didPaginate, "expected to have called for multiple event pages") require.Equal(t, end-start, seenEvents, "unexpected number of events") }) t.Run("can unmarshall events", func(t *testing.T) { resetCollection(t, client) createEvent := func(v string) { createOneQuery, createOneQueryErr := fauna.FQL(`EventFeedTest.create({ foo: ${v} })`, map[string]any{"v": v}) require.NoError(t, createOneQueryErr, "failed to init query for create statement") 
require.NotNil(t, createOneQuery, "create statement is nil") _, createOneErr := client.Query(createOneQuery) require.NoError(t, createOneErr, "failed to create a document") } query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") feed, feedErr := client.FeedFromQuery(query) require.NoError(t, feedErr, "failed to init events feed") createEvent("bar") createEvent("baz") createEvent("bak") type TestEvent struct { Foo string `fauna:"foo"` } var ( page fauna.FeedPage testEvent TestEvent ) events := make([]TestEvent, 0, 3) for { eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get page of events") for _, event := range page.Events { err := event.Unmarshal(&testEvent) require.NoError(t, err, "error unmarshalling event") events = append(events, testEvent) } if !page.HasNext { break } } require.Equal(t, []TestEvent{TestEvent{Foo: "bar"}, TestEvent{Foo: "baz"}, TestEvent{Foo: "bak"}}, events) }) } func resetCollection(t *testing.T, client *fauna.Client) { t.Helper() setupQuery, setupQueryErr := fauna.FQL(`Collection.byName("EventFeedTest")?.delete() Collection.create({ name: "EventFeedTest" })`, nil) require.NoError(t, setupQueryErr, "setup query error: %s", setupQueryErr) _, setupErr := client.Query(setupQuery) require.NoError(t, setupErr, "setup error: %s", setupErr) } func getEventSource(t *testing.T, client *fauna.Client) fauna.EventSource { t.Helper() query, queryErr := fauna.FQL(`EventFeedTest.all().eventSource()`, nil) require.NoError(t, queryErr, "failed to create a query for EventSource") feedRes, feedResErr := client.Query(query) require.NoError(t, feedResErr, "failed to init events feed") var eventSource fauna.EventSource unmarshalErr := feedRes.Unmarshal(&eventSource) require.NoError(t, unmarshalErr, "failed to unmarshal EventSource") require.NotNil(t, eventSource, "event source is nil") require.NotEmpty(t, eventSource, "event source is empty") return 
eventSource } func createOne(t *testing.T, client *fauna.Client, feed *fauna.EventFeed) { t.Helper() createOneQuery, createOneQueryErr := fauna.FQL("EventFeedTest.create({ foo: 'bar' })", nil) require.NoError(t, createOneQueryErr, "failed to init query for create statement") require.NotNil(t, createOneQuery, "create statement is nil") _, createOneErr := client.Query(createOneQuery) require.NoError(t, createOneErr, "failed to create a document") if feed == nil { return } var page fauna.FeedPage eventsErr := feed.Next(&page) require.NoError(t, eventsErr, "failed to get events") assert.Equal(t, 1, len(page.Events), "unexpected number of events") } func createMultipleDocs(t *testing.T, client *fauna.Client, start int, end int) { t.Helper() query, queryErr := fauna.FQL(`Set.sequence(${start}, ${end}).forEach(n => EventFeedTest.create({ n: n }))`, map[string]any{ "start": start, "end": end, }) require.NoError(t, queryErr, "failed to init query for create statement") _, err := client.Query(query) require.NoError(t, err) } ``` ## File: event_feed.go ```go package fauna import ( "encoding/json" ) // EventFeed represents an event feed subscription. 
type EventFeed struct {
	client *Client

	source EventSource

	decoder *json.Decoder

	opts *feedOptions

	lastCursor string
}

type feedOptions struct {
	PageSize *int
	Cursor   *string
	StartTS  *int64
}

func newEventFeed(client *Client, source EventSource, opts *feedOptions) (*EventFeed, error) {
	feed := &EventFeed{
		client: client,
		source: source,
		opts:   opts,
	}

	return feed, nil
}

func (ef *EventFeed) newFeedRequest() (*feedRequest, error) {
	req := feedRequest{
		apiRequest: apiRequest{
			ef.client.ctx,
			ef.client.headers,
		},
		Source: ef.source,
		Cursor: ef.lastCursor,
	}

	if ef.opts.StartTS != nil {
		req.StartTS = *ef.opts.StartTS
	}

	if ef.opts.Cursor != nil {
		req.Cursor = *ef.opts.Cursor
	}

	if ef.opts.PageSize != nil {
		req.PageSize = *ef.opts.PageSize
	}

	return &req, nil
}

func (ef *EventFeed) open() error {
	req, err := ef.newFeedRequest()
	if err != nil {
		return err
	}

	byteStream, err := req.do(ef.client)
	if err != nil {
		return err
	}

	ef.decoder = json.NewDecoder(byteStream)

	return nil
}

// FeedPage represents the response from [fauna.EventFeed.Next]
type FeedPage struct {
	Events  []Event `json:"events"`
	Cursor  string  `json:"cursor"`
	HasNext bool    `json:"has_next"`
	Stats   Stats   `json:"stats"`
}

// internalFeedPage represents what comes back from the wire from the feed API.
// We do further processing on the
// events that come back to create the FeedPage returned from [fauna.EventFeed.Next]
type internalFeedPage struct {
	Events  []rawEvent `json:"events"`
	Cursor  string     `json:"cursor"`
	HasNext bool       `json:"has_next"`
	Stats   Stats      `json:"stats"`
}

// Next retrieves the next FeedPage from the [fauna.EventFeed]
func (ef *EventFeed) Next(page *FeedPage) error {
	if err := ef.open(); err != nil {
		return err
	}

	var internalPage internalFeedPage
	if err := ef.decoder.Decode(&internalPage); err != nil {
		return err
	}

	parsedEvents := make([]Event, len(internalPage.Events))
	for i, rv := range internalPage.Events {
		var parsedEvent Event
		err := convertRawEvent(&rv, &parsedEvent)
		if err != nil {
			return err
		}
		parsedEvents[i] = parsedEvent
	}

	page.Events = parsedEvents
	page.HasNext = internalPage.HasNext
	page.Stats = internalPage.Stats
	page.Cursor = internalPage.Cursor

	ef.lastCursor = page.Cursor

	// preserve page size
	pageSize := ef.opts.PageSize
	ef.opts = &feedOptions{
		PageSize: pageSize,
	}

	return nil
}
```

## File: logging_log_test.go

```go
//go:build !go1.21

package fauna_test

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"os"
	"testing"

	"github.com/fauna/fauna-go/v3"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func TestLogLogger(t *testing.T) {
	t.Run("should be able to provide a custom logger", func(t *testing.T) {
		buf := new(bytes.Buffer)

		client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.WithLogger(CustomLogger{
			Output: buf,
		}), fauna.URL(fauna.EndpointLocal))
		assert.NotNil(t, client)

		query, queryErr := fauna.FQL(`42`, nil)
		require.NoError(t, queryErr)

		res, err := client.Query(query)
		require.NoError(t, err)

		var value int
		err = res.Unmarshal(&value)
		require.NoError(t, err)

		require.Equal(t, 42, value)
		assert.NotEmpty(t, buf)
	})
}

type CustomLogger struct {
	fauna.Logger

	Output *bytes.Buffer
}

func (c CustomLogger) Info(msg string) {
	_, _ = fmt.Fprint(os.Stdout, msg)
}

func (c CustomLogger) LogResponse(_ context.Context, requestBody []byte, res *http.Response) {
	_, _ = fmt.Fprintf(c.Output, "URL: %s\nStatus: %s\nBody: %s\n", res.Request.URL.String(), res.Status, string(requestBody))
}
```

## File: logging_log.go

```go
//go:build !go1.21

package fauna

import (
	"context"
	"fmt"
	"log"
	"net/http"
	"os"
	"strconv"
)

type Logger interface {
	Debug(msg string)
	Info(msg string)
	Warn(msg string)
	Error(msg string)
	LogResponse(ctx context.Context, requestBody []byte, r *http.Response)
}

type ClientLogger struct {
	Logger

	logger *log.Logger
	level  int
}

func (d ClientLogger) Debug(msg string) {
	if d.logger == nil {
		return
	}

	d.logger.Print("DEBUG: " + msg)
}

func (d ClientLogger) Info(msg string) {
	if d.logger == nil {
		return
	}

	d.logger.Print("INFO: " + msg)
}

func (d ClientLogger) Warn(msg string) {
	if d.logger == nil {
		return
	}

	d.logger.Print("WARN: " + msg)
}

func (d ClientLogger) Error(msg string) {
	if d.logger == nil {
		return
	}

	d.logger.Print("ERROR: " + msg)
}

func (d ClientLogger) LogResponse(ctx context.Context, requestBody []byte, r *http.Response) {
	if d.logger == nil {
		return
	}

	headers := r.Request.Header
	if _, found := headers["Authorization"]; found {
		headers["Authorization"] = []string{"hidden"}
	}

	d.Debug(fmt.Sprintf("Request Body: %s", string(requestBody)))
	d.Info(fmt.Sprintf("HTTP Response - Status: %s, From: %s, Headers: %v", r.Status, r.Request.URL.String(), headers))
}

// DefaultLogger returns the default logger
func DefaultLogger() Logger {
	clientLogger := ClientLogger{}

	if val, found := os.LookupEnv(EnvFaunaDebug); found {
		if level, _ := strconv.Atoi(val); level >= -4 {
			clientLogger.level = level
			clientLogger.logger = log.New(os.Stdout, "[fauna-go] ", log.LstdFlags|log.Lshortfile)
		}
	}

	return clientLogger
}
```

## File: logging_slog_test.go

```go
//go:build go1.21

package fauna_test

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
	"os"
	"testing"

	"github.com/fauna/fauna-go/v3"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
)

func
TestSlogLogger(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) t.Setenv(fauna.EnvFaunaSecret, "secret") query, queryErr := fauna.FQL(`42`, nil) require.NoError(t, queryErr) t.Run("should be able to use the default logger", func(t *testing.T) { output, pipeErr := pipeStdOut(func() { t.Setenv(fauna.EnvFaunaDebug, "-4") client, clientErr := fauna.NewDefaultClient() require.NoError(t, clientErr) _, err := client.Query(query) require.NoError(t, err) }) require.NoError(t, pipeErr) require.NotEmpty(t, output) t.Logf("output: %s", string(output)) }) t.Run("no output with warn level", func(t *testing.T) { t.Setenv(fauna.EnvFaunaDebug, "1") client, clientErr := fauna.NewDefaultClient() require.NoError(t, clientErr) output, pipeErr := pipeStdOut(func() { _, err := client.Query(query) require.NoError(t, err) }) require.NoError(t, pipeErr) require.Empty(t, output) }) t.Run("should be able to provide a custom logger", func(t *testing.T) { buf := new(bytes.Buffer) client := fauna.NewClient("secret", fauna.DefaultTimeouts(), fauna.WithLogger(CustomLogger{ Output: buf, }), fauna.URL(fauna.EndpointLocal)) assert.NotNil(t, client) res, err := client.Query(query) require.NoError(t, err) var value int err = res.Unmarshal(&value) require.NoError(t, err) require.Equal(t, 42, value) assert.NotEmpty(t, buf) }) } type CustomLogger struct { fauna.Logger Output *bytes.Buffer } func (c CustomLogger) Info(msg string, _ ...any) { _, _ = fmt.Fprint(os.Stdout, msg) } func (c CustomLogger) LogResponse(_ context.Context, requestBody []byte, res *http.Response) { _, _ = fmt.Fprintf(c.Output, "URL: %s\nStatus: %s\nBody: %s\n", res.Request.URL.String(), res.Status, string(requestBody)) } ``` ## File: logging_slog.go ```go //go:build go1.21 package fauna import ( "context" "log/slog" "net/http" "os" "strconv" ) type Logger interface { Debug(msg string, args ...any) Info(msg string, args ...any) Warn(msg string, args ...any) Error(msg string, args ...any) LogResponse(ctx 
context.Context, requestBody []byte, r *http.Response)
}

type ClientLogger struct {
	Logger

	logger *slog.Logger
}

func (d ClientLogger) Debug(msg string, args ...any) {
	if d.logger == nil {
		return
	}

	d.logger.Debug(msg, args...)
}

func (d ClientLogger) Info(msg string, args ...any) {
	if d.logger == nil {
		return
	}

	d.logger.Info(msg, args...)
}

func (d ClientLogger) Warn(msg string, args ...any) {
	if d.logger == nil {
		return
	}

	d.logger.Warn(msg, args...)
}

func (d ClientLogger) Error(msg string, args ...any) {
	if d.logger == nil {
		return
	}

	d.logger.Error(msg, args...)
}

func (d ClientLogger) LogResponse(ctx context.Context, requestBody []byte, r *http.Response) {
	if d.logger == nil {
		return
	}

	requestLogger := d.logger.With(
		slog.String("method", r.Request.Method),
		slog.String("url", r.Request.URL.String()),
		slog.Int("status", r.StatusCode))

	headers := r.Request.Header
	if _, found := headers["Authorization"]; found {
		headers["Authorization"] = []string{"hidden"}
	}

	if d.logger.Enabled(ctx, slog.LevelDebug) {
		requestLogger = requestLogger.With(
			slog.String("requestBody", string(requestBody)),
		)
	}

	requestLogger.With(
		slog.Any("headers", headers)).Info("HTTP Response")
}

// DefaultLogger returns the default logger
func DefaultLogger() Logger {
	clientLogger := ClientLogger{}

	if val, found := os.LookupEnv(EnvFaunaDebug); found {
		if level, _ := strconv.Atoi(val); level >= -4 {
			clientLogger.logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
				Level: slog.Level(level),
			}))
		}
	}

	return clientLogger
}
```

## File: logging_test.go
```go
package fauna_test

import (
	"io"
	"os"
	"testing"

	"github.com/fauna/fauna-go/v3"
	"github.com/stretchr/testify/require"
)

func pipeStdOut(handler func()) ([]byte, error) {
	storeStdout := os.Stdout
	r, w, _ := os.Pipe()
	os.Stdout = w

	handler()

	_ = w.Close()
	out, _ := io.ReadAll(r)
	os.Stdout = storeStdout

	return out, nil
}

func TestLogger(t *testing.T) {
	t.Run("should not log by default", func(t *testing.T) {
		out, outErr :=
pipeStdOut(func() {
			logger := fauna.DefaultLogger()
			logger.Info("testing")
		})
		require.NoError(t, outErr)
		require.Empty(t, string(out))
	})

	t.Run("should write to stdout", func(t *testing.T) {
		logMessage := "now you see me"
		out, outErr := pipeStdOut(func() {
			t.Setenv("FAUNA_DEBUG", "0")
			logger := fauna.DefaultLogger()
			logger.Info(logMessage)
		})
		require.NoError(t, outErr)
		outStr := string(out)
		require.Contains(t, outStr, logMessage)
		t.Logf("out: %s", outStr)
	})
}
```

## File: query_test.go
```go
package fauna

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

type fqlSuccessCase struct {
	testName string
	query    string
	args     map[string]any
	wants    *Query
}

func TestFQL(t *testing.T) {
	testDate := time.Date(2023, 2, 24, 0, 0, 0, 0, time.UTC)
	testDino := map[string]any{
		"name":      "Dino",
		"age":       0,
		"birthdate": testDate,
	}
	testInnerDino, _ := FQL("let x = ${my_var}", map[string]any{"my_var": testDino})

	testCases := []fqlSuccessCase{
		{
			"simple literal case",
			"let x = 11",
			nil,
			&Query{
				fragments: []*queryFragment{{true, "let x = 11"}},
			},
		},
		{
			"simple literal case with brace",
			"let x = { y: 11 }",
			nil,
			&Query{
				fragments: []*queryFragment{{true, "let x = { y: 11 }"}},
			},
		},
		{
			"template variable and fauna variable",
			"let age = ${n1}\n\"Alice is #{age} years old.\"",
			map[string]any{"n1": 5},
			&Query{
				fragments: []*queryFragment{
					{true, "let age = "},
					{false, 5},
					{true, "\n\"Alice is #{age} years old.\""},
				},
			},
		},
		{
			"template variable",
			"let x = ${my_var}",
			map[string]any{"my_var": testDino},
			&Query{
				fragments: []*queryFragment{
					{true, "let x = "},
					{false, testDino},
				},
			},
		},
		{
			"query variable",
			"${inner}\nx { name }",
			map[string]any{
				"inner": testInnerDino,
			},
			&Query{
				fragments: []*queryFragment{
					{false, testInnerDino},
					{true, "\nx { name }"},
				},
			},
		},
	}

	for _, tc := range testCases {
		t.Run(tc.testName, func(t *testing.T) {
			if q, err := FQL(tc.query, tc.args); assert.NoError(t, err) {
				assert.Equal(t, tc.wants, q)
			}
		})
	}
}

func BenchmarkFQL(b *testing.B) {
	for i := 0; i < b.N; i++ {
		_, _ = FQL(`${arg0}.length`, map[string]any{"arg0": "foo"})
	}
}
```

## File: query.go
```go
package fauna

import (
	"errors"
	"fmt"
)

type queryFragment struct {
	literal bool
	value   any
}

// Query represents a query to be sent to Fauna.
type Query struct {
	fragments []*queryFragment
}

// FQL creates a [fauna.Query] from an FQL string and set of arguments.
//
// args are optional. If provided their keys must match with `${name}` sigils
// in the query. FQL `${value} + 1` must have an argument called "value" in the
// args map.
//
// The values of args can be any type, including [fauna.Query] to allow for
// query composition.
func FQL(query string, args map[string]any) (*Query, error) {
	parts, err := parseTemplate(query)
	if err != nil {
		return nil, err
	}

	fragments := make([]*queryFragment, 0, len(parts))
	for _, part := range parts {
		switch category := part.Category; category {
		case templateLiteral:
			fragments = append(fragments, &queryFragment{true, part.Text})

		case templateVariable:
			if args == nil {
				return nil, errors.New("found template variable, but args is nil")
			}

			if arg, ok := args[part.Text]; ok {
				fragments = append(fragments, &queryFragment{false, arg})
			} else {
				return nil, fmt.Errorf("template variable %s not found in args", part.Text)
			}
		}
	}

	return &Query{fragments: fragments}, nil
}
```

## File: request.go
```go
package fauna

import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
)

type apiRequest struct {
	Context context.Context
	Headers map[string]string
}

func (apiReq *apiRequest) post(cli *Client, url *url.URL, bytesOut []byte) (attempts int, httpRes *http.Response, err error) {
	var httpReq *http.Request
	if httpReq, err = http.NewRequestWithContext(
		apiReq.Context,
		http.MethodPost,
		url.String(),
		bytes.NewReader(bytesOut),
	); err != nil {
		err = fmt.Errorf("failed to init request: %w", err)
		return
	}

	httpReq.Header.Set(headerAuthorization, `Bearer `+cli.secret)
	if lastTxnTs := cli.lastTxnTime.string(); lastTxnTs != "" {
		httpReq.Header.Set(HeaderLastTxnTs, lastTxnTs)
	}

	for k, v := range apiReq.Headers {
		httpReq.Header.Set(k, v)
	}

	if attempts, httpRes, err = cli.doWithRetry(httpReq); err != nil {
		err = ErrNetwork(fmt.Errorf("network error: %w", err))
	}

	cli.logger.LogResponse(cli.ctx, bytesOut, httpRes)

	return
}

type queryRequest struct {
	apiRequest
	Query     any
	Arguments map[string]any
}

type queryResponse struct {
	Header        http.Header
	Data          json.RawMessage `json:"data"`
	Error         *ErrFauna       `json:"error,omitempty"`
	Logging       []string        `json:"logging,omitempty"`
	SchemaVersion int64           `json:"schema_version"`
	StaticType    string          `json:"static_type"`
	Stats         *Stats          `json:"stats,omitempty"`
	Summary       string          `json:"summary"`
	TxnTime       int64           `json:"txn_ts"`
	Tags          string          `json:"query_tags"`
}

func parseQueryResponse(httpRes *http.Response) (qRes *queryResponse, err error) {
	var bytesIn []byte
	if bytesIn, err = io.ReadAll(httpRes.Body); err != nil {
		err = fmt.Errorf("failed to read response body: %w", err)
		return
	}

	if err = json.Unmarshal(bytesIn, &qRes); err != nil {
		err = fmt.Errorf("failed to unmarshal response: %w", err)
	}

	return
}

func (r *queryResponse) queryTags() map[string]string {
	ret := map[string]string{}

	if r.Tags != "" {
		for _, tag := range strings.Split(r.Tags, `,`) {
			tokens := strings.Split(tag, `=`)
			ret[tokens[0]] = tokens[1]
		}
	}

	return ret
}

func (qReq *queryRequest) do(cli *Client) (qSus *QuerySuccess, err error) {
	var bytesOut []byte
	if bytesOut, err = marshal(qReq); err != nil {
		err = fmt.Errorf("marshal request failed: %w", err)
		return
	}

	var queryURL *url.URL
	if queryURL, err = cli.parseQueryURL(); err != nil {
		return
	}

	var (
		attempts int
		httpRes  *http.Response
	)

	if attempts, httpRes, err = qReq.post(cli, queryURL, bytesOut); err != nil {
		return
	}

	var qRes *queryResponse
	if qRes, err = parseQueryResponse(httpRes); err != nil {
		return
	}

	cli.logger.LogResponse(cli.ctx, bytesOut, httpRes)

	cli.lastTxnTime.sync(qRes.TxnTime)
	qRes.Header = httpRes.Header

	if err = getErrFauna(httpRes.StatusCode,
qRes, attempts); err != nil {
		return
	}

	var data any
	if data, err = decode(qRes.Data); err != nil {
		err = fmt.Errorf("failed to decode data: %w", err)
		return
	}

	qSus = &QuerySuccess{
		QueryInfo:  newQueryInfo(qRes),
		Data:       data,
		StaticType: qRes.StaticType,
	}
	qSus.Stats.Attempts = attempts

	return
}

type streamRequest struct {
	apiRequest
	Stream  EventSource
	StartTS int64
	Cursor  string
}

func (streamReq *streamRequest) do(cli *Client) (bytes io.ReadCloser, err error) {
	var bytesOut []byte
	if bytesOut, err = marshal(streamReq); err != nil {
		err = fmt.Errorf("marshal request failed: %w", err)
		return
	}

	var streamURL *url.URL
	if streamURL, err = cli.parseStreamURL(); err != nil {
		return
	}

	var (
		attempts int
		httpRes  *http.Response
	)
	if attempts, httpRes, err = streamReq.post(cli, streamURL, bytesOut); err != nil {
		return
	}

	cli.logger.LogResponse(cli.ctx, bytesOut, httpRes)

	if httpRes.StatusCode != http.StatusOK {
		var qRes *queryResponse
		if qRes, err = parseQueryResponse(httpRes); err == nil {
			if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil {
				err = fmt.Errorf("unknown error for http status: %d", httpRes.StatusCode)
			}
		}
		return
	}

	bytes = httpRes.Body
	return
}

type feedRequest struct {
	apiRequest
	Source   EventSource
	Cursor   string
	StartTS  int64
	PageSize int
}

func (feedReq *feedRequest) do(cli *Client) (io.ReadCloser, error) {
	bytesOut, marshalErr := marshal(feedReq)
	if marshalErr != nil {
		return nil, fmt.Errorf("marshal request failed: %w", marshalErr)
	}

	changeFeedURL, parseURLErr := cli.parseFeedURL()
	if parseURLErr != nil {
		return nil, fmt.Errorf("parse url failed: %w", parseURLErr)
	}

	attempts, httpRes, postErr := feedReq.post(cli, changeFeedURL, bytesOut)
	if postErr != nil {
		return nil, fmt.Errorf("post request failed: %w", postErr)
	}

	if httpRes.StatusCode != http.StatusOK {
		qRes, err := parseQueryResponse(httpRes)
		if err == nil {
			if err = getErrFauna(httpRes.StatusCode, qRes, attempts); err == nil {
				err = fmt.Errorf("unknown error for http status: %d",
					httpRes.StatusCode)
			}
		}
		return nil, err
	}

	return httpRes.Body, nil
}
```

## File: response.go
```go
package fauna

// Stats provides access to stats generated by the query.
type Stats struct {
	// ComputeOps is the amount of Transactional Compute Ops consumed by the query.
	ComputeOps int `json:"compute_ops"`

	// ReadOps is the amount of Transactional Read Ops consumed by the query.
	ReadOps int `json:"read_ops"`

	// WriteOps is the amount of Transactional Write Ops consumed by the query.
	WriteOps int `json:"write_ops"`

	// QueryTimeMs is the query run time in milliseconds.
	QueryTimeMs int `json:"query_time_ms"`

	// ContentionRetries is the number of times the transaction was retried due
	// to write contention.
	ContentionRetries int `json:"contention_retries"`

	// StorageBytesRead is the amount of data read from storage, in bytes.
	StorageBytesRead int `json:"storage_bytes_read"`

	// StorageBytesWrite is the amount of data written to storage, in bytes.
	StorageBytesWrite int `json:"storage_bytes_write"`

	// ProcessingTimeMs is the amount of time spent producing the event. It
	// only applies to events.
	ProcessingTimeMs *int `json:"processing_time_ms,omitempty"`

	// Attempts is the number of times the client attempted to run the query.
	Attempts int `json:"_"`
}

// QueryInfo provides access to information about the query.
type QueryInfo struct {
	// TxnTime is the transaction commit time in micros since epoch. Used to
	// populate the x-last-txn-ts request header in order to get a consistent
	// prefix RYOW guarantee.
	TxnTime int64

	// SchemaVersion that was used for the query execution.
	SchemaVersion int64

	// Summary is a comprehensive, human readable summary of any errors, warnings
	// and/or logs returned from the query.
	Summary string

	// QueryTags is the value of [fauna.Tags] provided with the query, if there
	// were any.
	QueryTags map[string]string

	// Stats provides access to stats generated by the query.
	Stats *Stats
}

func newQueryInfo(res *queryResponse) *QueryInfo {
	return &QueryInfo{
		TxnTime:       res.TxnTime,
		SchemaVersion: res.SchemaVersion,
		Summary:       res.Summary,
		QueryTags:     res.queryTags(),
		Stats:         res.Stats,
	}
}

// QuerySuccess is the response returned from [fauna.Client.Query] when the
// query runs successfully.
type QuerySuccess struct {
	*QueryInfo

	// Data is the raw result returned by the query.
	Data any

	// StaticType is the query's inferred static result type, if the query was
	// typechecked.
	StaticType string
}

// Unmarshal will unmarshal the raw [fauna.QuerySuccess.Data] value into a
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (r *QuerySuccess) Unmarshal(into any) error {
	return decodeInto(r.Data, into)
}
```

## File: serializer_benchmark_test.go
```go
package fauna

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
)

func BenchmarkUnmarshalInt(b *testing.B) {
	v := []byte(`{"@int":"1234"}`)
	var err error

	for i := 0; i < b.N; i++ {
		var res int
		err = unmarshal(v, &res)
		assert.NoError(b, err)
	}
}

func BenchmarkUnmarshalLong(b *testing.B) {
	v := []byte(`{"@long":"4294967297"}`)
	var err error

	for i := 0; i < b.N; i++ {
		var res int64
		err = unmarshal(v, &res)
		assert.NoError(b, err)
	}
}

func BenchmarkUnmarshalDouble(b *testing.B) {
	v := []byte(`{"@double":"123.456"}`)
	var err error

	for i := 0; i < b.N; i++ {
		var res float64
		err = unmarshal(v, &res)
		assert.NoError(b, err)
	}
}

func BenchmarkUnmarshalBool(b *testing.B) {
	v := []byte(`true`)
	var err error

	for i := 0; i < b.N; i++ {
		var res bool
		err = unmarshal(v, &res)
		assert.NoError(b, err)
	}
}

func BenchmarkUnmarshalTime(b *testing.B) {
	v := []byte(`{"@time":"2023-02-28T18:10:10.00001Z"}`)
	var err error

	for i := 0; i < b.N; i++ {
		var res time.Time
		err = unmarshal(v, &res)
		assert.NoError(b, err)
	}
}

func BenchmarkUnmarshalModulePointer(b *testing.B) {
	v := []byte(`{"@mod":"PtrFoo"}`)
	var err error

	for i := 0; i < b.N; i++ {
		var res *Module
		err = unmarshal(v, &res)
assert.NoError(b, err) } } func BenchmarkUnmarshalModule(b *testing.B) { v := []byte(`{"@mod":"PtrFoo"}`) var err error for i := 0; i < b.N; i++ { var res Module err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalRefPointer(b *testing.B) { v := []byte(`{"@ref":{"id":"1234","coll":{"@mod":"PtrFoo"}}}`) var err error for i := 0; i < b.N; i++ { var res *Ref err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalRef(b *testing.B) { v := []byte(`{"@ref":{"id":"1234","coll":{"@mod":"Foo"}}}`) var err error for i := 0; i < b.N; i++ { var res Ref err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalNamedRefPointer(b *testing.B) { v := []byte(`{"@ref":{"name":"BarPtr","coll":{"@mod":"FooPtr"}}}`) var err error for i := 0; i < b.N; i++ { var res *NamedRef err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalNamedRef(b *testing.B) { v := []byte(`{"@ref":{"name":"Bar","coll":{"@mod":"Foo"}}}`) var err error for i := 0; i < b.N; i++ { var res NamedRef err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalSet(b *testing.B) { v := []byte(`{"@set":{"data":[0,1,2,3],"after":"foobarbaz"}}`) var err error for i := 0; i < b.N; i++ { var res Page err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalDocument(b *testing.B) { v := []byte(`{"@doc": { "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field": "foobar" }}`) var err error for i := 0; i < b.N; i++ { var res DocBusinessObj err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalNamedDocument(b *testing.B) { v := []byte(`{"@doc": { "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field": "foobar" }}`) var err error for i := 0; i < b.N; i++ { var res NamedDocBusinessObj err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalNullDocument(b *testing.B) { v := 
[]byte(`{"@ref":{"id":"1234","coll":{"@mod":"Foo:123"},"exists":false,"cause":"foobar"}}`) var err error for i := 0; i < b.N; i++ { var res NullDocument err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalNamedNullDocument(b *testing.B) { v := []byte(`{"@ref":{"name":"FooBar","coll":{"@mod":"Foo"},"exists":false,"cause":"foobar"}}`) var err error for i := 0; i < b.N; i++ { var res NullNamedDocument err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalObject(b *testing.B) { v := []byte(`{"@object": { "string_field": "foobarbaz", "bool_field": true, "map_field": {"foo":"bar","baz":"buz"}, "single_key_map_field": {"foo":"bar"}, "slice_field": [1,2,3,4], "tuple_field": ["one",2,3.0], "int_field": 1234, "double_field": 1234.567 }}`) var err error for i := 0; i < b.N; i++ { var res SubBusinessObj err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalSlice(b *testing.B) { v := []byte(`[1,2,3,4]`) var err error for i := 0; i < b.N; i++ { var res []int err = unmarshal(v, &res) assert.NoError(b, err) } } func BenchmarkUnmarshalBytes(b *testing.B) { v := []byte(`{"@bytes":"SGVsbG8sIGZyb20gRmF1bmEh"}`) var err error for i := 0; i < b.N; i++ { var res []byte err = unmarshal(v, &res) assert.NoError(b, err) } } ``` ## File: serializer_test.go ```go package fauna import ( "context" "encoding/base64" "reflect" "testing" "time" "github.com/stretchr/testify/assert" ) type SubBusinessObj struct { StringField string `fauna:"string_field"` BoolField bool `fauna:"bool_field"` MapField map[string]any `fauna:"map_field"` SingleKeyMapField map[string]any `fauna:"single_key_map_field"` SliceField []any `fauna:"slice_field"` TupleField []any `fauna:"tuple_field"` IntField int `fauna:"int_field"` DoubleField float64 `fauna:"double_field"` IgnoredField2 string `fauna:"-"` } type DocBusinessObj struct { Document ExtraField string `fauna:"extra_field"` } type NamedDocBusinessObj struct { NamedDocument ExtraField string 
`fauna:"extra_field"` } type NullDocBusinessObj struct { NullDocument } type NullNamedDocBusinessObj struct { NullNamedDocument } type BusinessObj struct { IntField int `fauna:"int_field"` LongField int64 `fauna:"long_field"` DoubleField float64 `fauna:"double_field"` PtrModField *Module `fauna:"ptr_mod_field"` ModField Module `fauna:"mod_field"` PtrRefField *Ref `fauna:"ptr_ref_field"` RefField Ref `fauna:"ref_field"` NamedRefField NamedRef `fauna:"named_ref_field"` SetField Page `fauna:"set_field"` StreamField EventSource `fauna:"stream_field"` ObjField SubBusinessObj `fauna:"obj_field"` DocField DocBusinessObj `fauna:"doc_field"` NamedDocField NamedDocBusinessObj `fauna:"named_doc_field"` NullDocField NullDocBusinessObj `fauna:"nulldoc_field"` NullNamedDocField NullNamedDocBusinessObj `fauna:"nulldoc_named_field"` BytesField []byte `fauna:"bytes_field"` IgnoredField string `fauna:"-"` } func marshalAndCheck(t *testing.T, obj any) []byte { if bs, err := marshal(obj); err != nil { t.Fatalf("failed to marshal: %s", err) return nil } else { return bs } } func unmarshalAndCheck(t *testing.T, bs []byte, obj any) { if err := unmarshal(bs, obj); err != nil { t.Fatalf("failed to unmarshal: %s", err) } } func TestRoundtrip(t *testing.T) { var businessObjDoc = []byte(`{ "int_field": {"@int":"1234"}, "long_field": {"@long":"123456"}, "double_field": {"@double":"123.456"}, "ptr_mod_field": {"@mod":"PtrFoo"}, "mod_field": {"@mod":"Foo"}, "ptr_ref_field": {"@ref":{"id":"1234","coll":{"@mod":"PtrFoo"}}}, "ref_field": {"@ref":{"id":"1234","coll":{"@mod":"Foo:123"}}}, "named_ref_field": {"@ref":{"name":"FooBar","coll":{"@mod":"Foo"}}}, "set_field": {"@set":{"data":[0,1,2,3],"after":"foobarbaz"}}, "doc_field": {"@doc": { "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field": "foobar" }}, "named_doc_field": {"@doc": { "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field": "foobar" }}, 
"nulldoc_field": {"@ref":{"id":"1234","coll":{"@mod":"Foo:123"},"exists":false,"cause":"foobar"}}, "nulldoc_named_field": {"@ref":{"name":"FooBar","coll":{"@mod":"Foo"},"exists":false,"cause":"foobar"}}, "obj_field": {"@object": { "string_field": "foobarbaz", "bool_field": true, "map_field": {"foo":"bar","baz":"buz"}, "single_key_map_field": {"foo":"bar"}, "slice_field": [1,2,3,4], "tuple_field": ["one",2,3.0], "int_field": 1234, "double_field": 1234.567 }}, "bytes_field": {"@bytes":"SGVsbG8sIGZyb20gRmF1bmEh"} }`) obj := &BusinessObj{} unmarshalAndCheck(t, businessObjDoc, obj) bs := marshalAndCheck(t, obj) obj2 := &BusinessObj{} unmarshalAndCheck(t, bs, obj2) assert.Equal(t, obj, obj2) } func roundTripCheck(t *testing.T, test any, expected string) { bs := marshalAndCheck(t, test) if assert.JSONEq(t, expected, string(bs)) { decoded := reflect.New(reflect.TypeOf(test)).Interface() unmarshalAndCheck(t, bs, decoded) dec := reflect.Indirect(reflect.ValueOf(decoded)).Interface() assert.EqualValues(t, test, dec) } } func TestEncodingPrimitives(t *testing.T) { t.Run("encode string", func(t *testing.T) { roundTripCheck(t, "foo", `"foo"`) }) t.Run("encode bools", func(t *testing.T) { roundTripCheck(t, true, `true`) roundTripCheck(t, false, `false`) }) t.Run("encode ints", func(t *testing.T) { roundTripCheck(t, int(10), `{"@int":"10"}`) roundTripCheck(t, int8(10), `{"@int":"10"}`) roundTripCheck(t, int16(10), `{"@int":"10"}`) roundTripCheck(t, int32(10), `{"@int":"10"}`) roundTripCheck(t, int(-10), `{"@int":"-10"}`) roundTripCheck(t, int8(-10), `{"@int":"-10"}`) roundTripCheck(t, int16(-10), `{"@int":"-10"}`) roundTripCheck(t, int32(-10), `{"@int":"-10"}`) roundTripCheck(t, 2147483647, `{"@int":"2147483647"}`) roundTripCheck(t, -2147483648, `{"@int":"-2147483648"}`) }) t.Run("encode longs", func(t *testing.T) { roundTripCheck(t, 2147483648, `{"@long":"2147483648"}`) roundTripCheck(t, -2147483649, `{"@long":"-2147483649"}`) roundTripCheck(t, 9223372036854775807, 
`{"@long":"9223372036854775807"}`) roundTripCheck(t, -9223372036854775808, `{"@long":"-9223372036854775808"}`) }) t.Run("fail on numbers that are too big", func(t *testing.T) { tooLarge := uint(9223372036854775808) _, tooLargeErr := marshal(tooLarge) assert.Error(t, tooLargeErr) }) t.Run("encode floats", func(t *testing.T) { roundTripCheck(t, 100.0, `{"@double":"100"}`) roundTripCheck(t, -100.1, `{"@double":"-100.1"}`) roundTripCheck(t, 9.999999999999, `{"@double":"9.999999999999"}`) }) t.Run("encode nil", func(t *testing.T) { bs := marshalAndCheck(t, nil) if assert.JSONEq(t, `null`, string(bs)) { var decoded *string unmarshalAndCheck(t, bs, &decoded) assert.Nil(t, decoded) } }) t.Run("encode bytes", func(t *testing.T) { inputBytes := []byte("This is a test string 🚀 with various characters: !@#$%^&*()_+=-`~[]{}|;:'\\\",./<>?") encoded := base64.StdEncoding.EncodeToString(inputBytes) bs := marshalAndCheck(t, inputBytes) if assert.JSONEq(t, `{"@bytes": "`+encoded+`"}`, string(bs)) { var decoded []byte unmarshalAndCheck(t, bs, &decoded) assert.Equal(t, inputBytes, decoded) } }) } func TestEncodingTime(t *testing.T) { t.Run("encodes time as @time", func(t *testing.T) { if tz, err := time.LoadLocation("America/Los_Angeles"); assert.NoError(t, err) { bs := marshalAndCheck(t, time.Date(2023, 02, 28, 10, 10, 10, 10000, tz)) if assert.JSONEq(t, `{"@time":"2023-02-28T18:10:10.00001Z"}`, string(bs)) { var decoded time.Time bs := []byte(`{"@time":"2023-02-28T18:10:10.000010Z"}`) unmarshalAndCheck(t, bs, &decoded) assert.Equal(t, time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC), decoded) } } }) t.Run("encodes time as @date when hinted", func(t *testing.T) { obj := struct { D time.Time `fauna:"d_field,date"` }{ D: time.Date(2023, 02, 28, 0, 0, 0, 0, time.UTC), } roundTripCheck(t, obj, `{"d_field":{"@date":"2023-02-28"}}`) }) } func TestDecodingToInterface(t *testing.T) { var doc = []byte(`{ "int_field": {"@int":"1234"}, "long_field": {"@long":"123456"}, "double_field": 
{"@double":"123.456"}, "slice_field": [{"@mod":"Foo"}, {"@date":"2023-03-17"}], "obj_field": {"@object": { "string_field": "foobarbaz", "bool_field": true, "int_field": 1234, "double_field": 1234.567 }} }`) var res any err := unmarshal(doc, &res) if assert.NoError(t, err) { rMap := res.(map[string]any) assert.Equal(t, int64(1234), rMap["int_field"].(int64)) assert.Equal(t, int64(123456), rMap["long_field"].(int64)) assert.Equal(t, float64(123.456), rMap["double_field"].(float64)) sliceField := rMap["slice_field"].([]any) assert.Equal(t, &Module{"Foo"}, sliceField[0].(*Module)) sliceDate := time.Date(2023, 03, 17, 0, 0, 0, 0, time.UTC) assert.Equal(t, &sliceDate, sliceField[1].(*time.Time)) objMap := rMap["obj_field"].(map[string]any) assert.Equal(t, "foobarbaz", objMap["string_field"].(string)) assert.Equal(t, true, objMap["bool_field"].(bool)) assert.Equal(t, float64(1234), objMap["int_field"].(float64)) assert.Equal(t, 1234.567, objMap["double_field"].(float64)) } } func TestEncodingFaunaStructs(t *testing.T) { t.Run("encodes Module", func(t *testing.T) { obj := Module{"Foo"} roundTripCheck(t, obj, `{"@mod":"Foo"}`) }) t.Run("encodes Ref", func(t *testing.T) { obj := Ref{"1234", &Module{"Foo"}} roundTripCheck(t, obj, `{"@ref":{"id":"1234","coll":{"@mod":"Foo"}}}`) }) t.Run("encodes NamedRef", func(t *testing.T) { obj := NamedRef{"Bar", &Module{"Foo"}} roundTripCheck(t, obj, `{"@ref":{"name":"Bar","coll":{"@mod":"Foo"}}}`) }) t.Run("encodes Page", func(t *testing.T) { obj := Page{[]any{"0", "1", "2"}, "foobarbaz"} roundTripCheck(t, obj, `{"@set":{"data":["0","1","2"],"after":"foobarbaz"}}`) }) t.Run("encodes StreamFromQuery", func(t *testing.T) { stream := EventSource("abcd==") roundTripCheck(t, stream, `{"@stream":"abcd=="}`) }) t.Run("encode NullDoc", func(t *testing.T) { obj := NullDocument{Cause: "Foo", Ref: &Ref{ID: "1234", Coll: &Module{"Foo"}}} roundTripCheck(t, obj, `{"cause": "Foo", "ref": {"@ref":{"id":"1234","coll":{"@mod":"Foo"}}}}`) }) t.Run("decodes 
data-less set", func(t *testing.T) { bs := []byte(`{"@set":"foobarbaz"}`) var set any unmarshalAndCheck(t, bs, &set) if page, ok := set.(*Page); assert.True(t, ok) { assert.Nil(t, page.Data) assert.Equal(t, "foobarbaz", page.After) } }) } func TestEncodingStructs(t *testing.T) { t.Run("encodes using struct field names", func(t *testing.T) { obj := struct { Field string }{"foo"} roundTripCheck(t, obj, `{"Field":"foo"}`) }) t.Run("encodes using configured field names", func(t *testing.T) { obj := struct { Field string `fauna:"field_name"` }{"foo"} roundTripCheck(t, obj, `{"field_name":"foo"}`) }) t.Run("encodes hinted types without configured name", func(t *testing.T) { obj := struct { Field time.Time `fauna:",date"` }{ Field: time.Date(2023, 02, 28, 0, 0, 0, 0, time.UTC), } roundTripCheck(t, obj, `{"Field":{"@date":"2023-02-28"}}`) }) t.Run("encodes hinted types with configured name", func(t *testing.T) { obj := struct { Field time.Time `fauna:"field_name,date"` }{ Field: time.Date(2023, 02, 28, 0, 0, 0, 0, time.UTC), } roundTripCheck(t, obj, `{"field_name":{"@date":"2023-02-28"}}`) }) t.Run("ignores fields", func(t *testing.T) { obj := struct { Field string IgnoredField string `fauna:"-"` }{ Field: "foo", IgnoredField: "", } roundTripCheck(t, obj, `{"Field":"foo"}`) }) t.Run("encodes nested fields", func(t *testing.T) { var obj struct { GrandParent struct { Parent struct { Child string Sibling string } } } obj.GrandParent.Parent.Child = "foo" obj.GrandParent.Parent.Sibling = "bar" roundTripCheck(t, obj, `{"GrandParent":{"Parent":{"Child":"foo","Sibling":"bar"}}}`) }) } func TestEncodingPointers(t *testing.T) { type checkStruct struct { Field string } var obj struct { NilPtrField *checkStruct PtrField *checkStruct Field checkStruct } obj.NilPtrField = nil obj.PtrField = &checkStruct{"foo"} obj.Field = checkStruct{"bar"} roundTripCheck(t, obj, `{"NilPtrField":null,"PtrField":{"Field":"foo"},"Field":{"Field":"bar"}}`) } func TestEncodingObject(t *testing.T) { 
t.Run("object has @object key", func(t *testing.T) { test := map[string]int{"@object": 10} expected := `{"@object":{"@object":{"@int":"10"}}}` bs := marshalAndCheck(t, test) assert.JSONEq(t, expected, string(bs)) }) t.Run("object has inner conflicting @int key", func(t *testing.T) { roundTripCheck( t, map[string]map[string]string{"@object": {"@int": "bar"}}, `{"@object":{"@object":{"@object":{"@int":"bar"}}}}`, ) }) t.Run("object has inner conflicting @object key", func(t *testing.T) { roundTripCheck( t, map[string]map[string]string{"@object": {"@object": "bar"}}, `{"@object":{"@object":{"@object":{"@object":"bar"}}}}`, ) }) t.Run("object has multiple conflicting type keys", func(t *testing.T) { roundTripCheck( t, map[string]string{"@int": "foo", "@double": "bar"}, `{"@object":{"@int":"foo","@double":"bar"}}`, ) }) t.Run("object has mixed keys with a conflict", func(t *testing.T) { roundTripCheck( t, map[string]string{"@int": "foo", "bar": "buz"}, `{"@object":{"@int":"foo","bar":"buz"}}`, ) }) t.Run("object has nested conflicting keys", func(t *testing.T) { roundTripCheck( t, map[string]map[string]map[string]map[string]int{"@int": {"@date": {"@time": {"@long": 10}}}}, `{"@object":{"@int":{"@object":{"@date":{"@object":{"@time":{"@object":{"@long":{"@int":"10"}}}}}}}}}`, ) }) t.Run("object has non-conflicting keys", func(t *testing.T) { roundTripCheck( t, map[string]int{"@foo": 10}, `{"@foo":{"@int":"10"}}`, ) }) } func TestEncodingDocuments(t *testing.T) { t.Run("Document", func(t *testing.T) { type MyDoc struct { Document ExtraField1 string `fauna:"extra_field_1"` ExtraField2 string `fauna:"extra_field_2"` } doc := []byte(`{"@doc":{ "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.000010Z"}, "extra_field_1": "foobar", "extra_field_2": "bazbuz" }}`) ts := time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC) expected := MyDoc{ Document: Document{ ID: "1234", Coll: &Module{"Foo"}, TS: &ts, }, ExtraField1: "foobar", ExtraField2: "bazbuz", } 
var got MyDoc unmarshalAndCheck(t, doc, &got) assert.Equal(t, expected, got) encodedDoc := `{"@doc":{ "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field_1": "foobar", "extra_field_2": "bazbuz" }}` bs := marshalAndCheck(t, expected) assert.JSONEq(t, encodedDoc, string(bs)) }) t.Run("NamedDocument", func(t *testing.T) { type MyDoc struct { NamedDocument ExtraField1 string `fauna:"extra_field_1"` ExtraField2 string `fauna:"extra_field_2"` } doc := []byte(`{"@doc":{ "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.000010Z"}, "extra_field_1": "foobar", "extra_field_2": "bazbuz" }}`) ts := time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC) expected := MyDoc{ NamedDocument: NamedDocument{ Name: "mydoc", Coll: &Module{"Foo"}, TS: &ts, }, ExtraField1: "foobar", ExtraField2: "bazbuz", } var got MyDoc unmarshalAndCheck(t, doc, &got) assert.Equal(t, expected, got) encodedDoc := `{"@doc":{ "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"}, "extra_field_1": "foobar", "extra_field_2": "bazbuz" }}` bs := marshalAndCheck(t, expected) assert.JSONEq(t, encodedDoc, string(bs)) }) t.Run("Raw Document", func(t *testing.T) { doc := []byte(`{"@doc":{ "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.000010Z"} }}`) ts := time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC) expected := Document{ ID: "1234", Coll: &Module{"Foo"}, TS: &ts, } var got Document unmarshalAndCheck(t, doc, &got) assert.Equal(t, expected, got) encodedDoc := `{"@doc":{ "id": "1234", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"} }}` bs := marshalAndCheck(t, expected) assert.JSONEq(t, encodedDoc, string(bs)) }) t.Run("Raw NamedDocument", func(t *testing.T) { doc := []byte(`{"@doc":{ "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.000010Z"} }}`) ts := time.Date(2023, 02, 28, 18, 10, 10, 10000, time.UTC) expected := NamedDocument{ Name: 
"mydoc", Coll: &Module{"Foo"}, TS: &ts, } var got NamedDocument unmarshalAndCheck(t, doc, &got) assert.Equal(t, expected, got) encodedDoc := `{"@doc":{ "name": "mydoc", "coll": {"@mod":"Foo"}, "ts": {"@time":"2023-02-28T18:10:10.00001Z"} }}` bs := marshalAndCheck(t, expected) assert.JSONEq(t, encodedDoc, string(bs)) }) } func TestComposition(t *testing.T) { testDate := time.Date(2023, 2, 24, 0, 0, 0, 0, time.UTC) testDino := map[string]any{ "name": "Dino", "age": 0, "birthdate": testDate, } testInnerDino, err := FQL("let x = ${my_var}", map[string]any{"my_var": testDino}) t.Run("template variable", func(t *testing.T) { encodedDoc := `{"fql":[ "let x = ", {"value":{ "age":{"@int":"0"}, "birthdate":{"@time":"2023-02-24T00:00:00Z"}, "name":"Dino" }} ]}` if assert.NoError(t, err) { bs := marshalAndCheck(t, testInnerDino) assert.JSONEq(t, encodedDoc, string(bs)) } }) t.Run("sub query", func(t *testing.T) { encodedDoc := `{"fql":[ {"fql":[ "let x = ", {"value":{ "age":{"@int":"0"}, "birthdate":{"@time":"2023-02-24T00:00:00Z"}, "name":"Dino" }} ]}, "\nx { name }" ]}` if assert.NoError(t, err) { inner, err := FQL("${inner}\nx { name }", map[string]any{"inner": testInnerDino}) if assert.NoError(t, err) { bs := marshalAndCheck(t, inner) assert.JSONEq(t, encodedDoc, string(bs)) } } }) } func TestMarshalEventSourceStructs(t *testing.T) { t.Run("marshal query request", func(t *testing.T) { marshalAndCheck(t, queryRequest{ apiRequest: apiRequest{ Context: context.Background(), Headers: map[string]string{}, }, Query: nil, Arguments: nil, }) }) t.Run("marshal stream request", func(t *testing.T) { marshalAndCheck(t, streamRequest{ apiRequest: apiRequest{ Context: context.Background(), Headers: map[string]string{}, }, Stream: "", StartTS: 0, Cursor: "", }) }) t.Run("marshal feed request", func(t *testing.T) { marshalAndCheck(t, feedRequest{ apiRequest: apiRequest{ Context: context.Background(), Headers: map[string]string{}, }, Source: "", Cursor: "", PageSize: 0, StartTS: 0, }) }) } 
``` ## File: serializer.go ```go package fauna import ( "encoding/base64" "encoding/json" "fmt" "reflect" "strconv" "strings" "time" "github.com/mitchellh/mapstructure" ) const ( fieldTag = "fauna" dateFormat = "2006-01-02" timeFormat = "2006-01-02T15:04:05.999999Z" maxInt = 2147483647 minInt = -2147483648 maxLong = 9223372036854775807 minLong = -9223372036854775808 ) type typeTag string const ( typeTagInt typeTag = "@int" typeTagLong typeTag = "@long" typeTagDouble typeTag = "@double" typeTagDate typeTag = "@date" typeTagTime typeTag = "@time" typeTagDoc typeTag = "@doc" typeTagRef typeTag = "@ref" typeTagSet typeTag = "@set" typeTagStream typeTag = "@stream" typeTagMod typeTag = "@mod" typeTagObject typeTag = "@object" typeTagBytes typeTag = "@bytes" ) func keyConflicts(key string) bool { switch typeTag(key) { case typeTagInt, typeTagLong, typeTagDouble, typeTagDate, typeTagTime, typeTagDoc, typeTagMod, typeTagObject: return true default: return false } } type Module struct { Name string } type Document struct { ID string `fauna:"id"` Coll *Module `fauna:"coll"` TS *time.Time `fauna:"ts"` Data map[string]any `fauna:"-"` } type NamedDocument struct { Name string `fauna:"name"` Coll *Module `fauna:"coll"` TS *time.Time `fauna:"ts"` Data map[string]any `fauna:"-"` } type NullDocument struct { Ref *Ref `fauna:"ref"` Cause string `fauna:"cause"` } type NullNamedDocument struct { Ref *NamedRef `fauna:"ref"` Cause string `fauna:"cause"` } type Ref struct { ID string `fauna:"id"` Coll *Module `fauna:"coll"` } type NamedRef struct { Name string `fauna:"name"` Coll *Module `fauna:"coll"` } type Page struct { Data []any `fauna:"data"` After string `fauna:"after"` } func (p Page) Unmarshal(into any) error { return decodeInto(p.Data, into) } type EventSource string func mapDecoder(into any) (*mapstructure.Decoder, error) { return mapstructure.NewDecoder(&mapstructure.DecoderConfig{ TagName: "fauna", Result: into, IgnoreUntaggedFields: false, ErrorUnused: false, ErrorUnset: 
false, DecodeHook: unmarshalDoc, Squash: true, }) } func unmarshal(body []byte, into any) error { decBody, err := decode(body) if err != nil { return err } return decodeInto(decBody, into) } func decodeInto(body any, into any) error { dec, err := mapDecoder(into) if err != nil { return err } return dec.Decode(body) } var ( docType = reflect.TypeOf(&Document{}) namedDocType = reflect.TypeOf(&NamedDocument{}) ) func unmarshalDoc(f reflect.Type, t reflect.Type, data any) (any, error) { if f != docType && f != namedDocType { return data, nil } var docData map[string]any if f == docType { doc := data.(*Document) docData = doc.Data docData["id"] = doc.ID docData["coll"] = doc.Coll docData["ts"] = doc.TS } if f == namedDocType { doc := data.(*NamedDocument) docData = doc.Data docData["name"] = doc.Name docData["coll"] = doc.Coll docData["ts"] = doc.TS } result := reflect.New(t).Interface() dec, err := mapDecoder(result) if err != nil { return nil, err } if err := dec.Decode(docData); err != nil { return nil, err } return result, nil } func decode(bodyBytes []byte) (any, error) { var body any if err := json.Unmarshal(bodyBytes, &body); err != nil { return nil, err } return convert(false, body) } func convert(escaped bool, body any) (any, error) { switch b := body.(type) { case map[string]any: if escaped { return convertMap(b) } else { return unboxType(b) } case []any: return convertSlice(b) default: return body, nil } } func convertMap(body map[string]any) (map[string]any, error) { retBody := map[string]any{} for k, vRaw := range body { if v, err := convert(false, vRaw); err != nil { return nil, err } else { retBody[k] = v } } return retBody, nil } func convertSlice(body []any) ([]any, error) { for i, vRaw := range body { if v, err := convert(false, vRaw); err != nil { return nil, err } else { body[i] = v } } return body, nil } func unboxType(body map[string]any) (any, error) { if len(body) == 1 { for boxedK, v := range body { switch typeTag(boxedK) { case typeTagInt, 
typeTagLong: return unboxInt(v.(string)) case typeTagDouble: return unboxDouble(v.(string)) case typeTagDate: return unboxDate(v.(string)) case typeTagTime: return unboxTime(v.(string)) case typeTagMod: return unboxMod(v.(string)) case typeTagRef: return unboxRef(v.(map[string]any)) case typeTagSet: return unboxSet(v) case typeTagStream: return unboxStream(v) case typeTagDoc: return unboxDoc(v.(map[string]any)) case typeTagObject: return convertMap(v.(map[string]any)) case typeTagBytes: return unboxBytes(v.(string)) } } } return convertMap(body) } func unboxMod(v string) (*Module, error) { m := Module{v} return &m, nil } func getColl(v map[string]any) (*Module, error) { if coll, ok := v["coll"]; ok { modI, err := convert(false, coll) if err != nil { return nil, err } if mod, ok := modI.(*Module); ok { return mod, nil } } return nil, nil } func getIDorName(v map[string]any) (id string, name string) { if idRaw, ok := v["id"]; ok { if id, ok := idRaw.(string); ok { return id, "" } } if nameRaw, ok := v["name"]; ok { if name, ok := nameRaw.(string); ok { return "", name } } return } func getExistsCause(v map[string]any) (exists bool, cause string) { if existsRaw, hasExists := v["exists"]; hasExists { if exists = existsRaw.(bool); !exists { if causeRaw, hasCause := v["cause"]; hasCause { return exists, causeRaw.(string) } } } return true, "" } func unboxRef(v map[string]any) (any, error) { mod, err := getColl(v) if err != nil { return nil, err } if mod != nil { id, name := getIDorName(v) if exists, cause := getExistsCause(v); !exists { if id != "" { return &NullDocument{ Ref: &Ref{id, mod}, Cause: cause, }, nil } if name != "" { return &NullNamedDocument{ Ref: &NamedRef{name, mod}, Cause: cause, }, nil } } if id != "" { return &Ref{id, mod}, nil } if name != "" { return &NamedRef{name, mod}, nil } } return nil, fmt.Errorf("invalid ref %v", v) } func unboxDoc(v map[string]any) (any, error) { mod, err := getColl(v) if err != nil { return nil, err } var ts *time.Time if 
tsRaw, ok := v["ts"]; ok { if tsI, err := convert(false, tsRaw); err != nil { return nil, err } else { if unboxedTS, ok := tsI.(*time.Time); ok { ts = unboxedTS } } } id, name := getIDorName(v) if mod != nil && ts != nil && (id != "" || name != "") { delete(v, "id") delete(v, "coll") delete(v, "ts") if id == "" { delete(v, "name") } data, err := convertMap(v) if err != nil { return nil, err } if id != "" { return &Document{ID: id, Coll: mod, TS: ts, Data: data}, nil } if name != "" { return &NamedDocument{Name: name, Coll: mod, TS: ts, Data: data}, nil } } return nil, fmt.Errorf("invalid doc %v", v) } func unboxSet(v any) (any, error) { if set, ok := v.(string); ok { setC := Page{After: set} return &setC, nil } set := v.(map[string]any) if dataI, ok := set["data"]; ok { if dataRaw, ok := dataI.([]any); ok { data, err := convertSlice(dataRaw) if err != nil { return nil, err } setC := Page{Data: data} if afterRaw, ok := set["after"]; ok { if after, ok := afterRaw.(string); ok { setC.After = after } } return &setC, nil } } return nil, fmt.Errorf("invalid set %v", v) } func unboxStream(v any) (any, error) { if token, ok := v.(string); ok { return EventSource(token), nil } else { return nil, fmt.Errorf("invalid stream %v", v) } } func unboxTime(v string) (*time.Time, error) { if t, err := time.Parse(timeFormat, v); err != nil { return nil, err } else { return &t, nil } } func unboxDate(v string) (*time.Time, error) { if t, err := time.Parse(dateFormat, v); err != nil { return nil, err } else { return &t, nil } } func unboxInt(v string) (any, error) { if i, err := strconv.ParseInt(v, 10, 64); err != nil { return nil, err } else { return i, nil } } func unboxDouble(v string) (any, error) { if i, err := strconv.ParseFloat(v, 64); err != nil { return nil, err } else { return i, nil } } func unboxBytes(v string) (any, error) { if b, err := base64.StdEncoding.DecodeString(v); err != nil { return nil, err } else { return b, nil } } func marshal(v any) ([]byte, error) { if enc, 
err := encode(v, ""); err != nil { return nil, err } else { return json.Marshal(enc) } } func encode(v any, hint string) (any, error) { switch vt := v.(type) { case *queryFragment: return encodeQueryFragment(vt) case *Query: return encodeQuery(vt) case Module: return encodeMod(vt) case Ref, NamedRef: return encodeFaunaStruct(typeTagRef, vt) case Document, NamedDocument: return encodeFaunaStruct(typeTagDoc, vt) case NullDocument, NullNamedDocument: return encodeStruct(v) case Page: return encodeFaunaStruct(typeTagSet, vt) case EventSource: return map[typeTag]any{typeTagStream: vt}, nil case time.Time: return encodeTime(vt, hint) case queryRequest: query, err := encode(vt.Query, hint) if err != nil { return nil, err } out := map[string]any{"query": query} if len(vt.Arguments) > 0 { if args, err := encodeMap(reflect.ValueOf(vt.Arguments)); err != nil { return nil, err } else { out["arguments"] = args } } return out, nil case streamRequest: out := map[string]any{"token": string(vt.Stream)} if vt.StartTS > 0 { out["start_ts"] = vt.StartTS } if len(vt.Cursor) > 0 { out["cursor"] = vt.Cursor } return out, nil case feedRequest: out := map[string]any{"token": string(vt.Source)} if vt.PageSize > 0 { out["page_size"] = vt.PageSize } if vt.StartTS > 0 { out["start_ts"] = vt.StartTS } if len(vt.Cursor) > 0 { out["cursor"] = vt.Cursor } return out, nil case []byte: return encodeBytes(vt) } switch value := reflect.ValueOf(v); value.Kind() { case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64: if i := value.Int(); i < minLong { return nil, fmt.Errorf("numeric value is outside Fauna's type constraints") } else { return encodeInt(i) } case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr: if i := value.Uint(); i > maxLong { return nil, fmt.Errorf("numeric value is outside Fauna's type constraints") } else { return encodeInt(int64(i)) } case reflect.Float32, reflect.Float64: return map[typeTag]any{typeTagDouble: 
strconv.FormatFloat(value.Float(), 'f', -1, 64)}, nil case reflect.Ptr: if value.IsNil() { return nil, nil } return encode(reflect.Indirect(value).Interface(), hint) case reflect.Struct: return encodeStruct(v) case reflect.Map: return encodeMap(value) case reflect.Slice: return encodeSlice(value) } return v, nil } func encodeBytes(b []byte) (any, error) { return map[typeTag]any{ typeTagBytes: base64.StdEncoding.EncodeToString(b), }, nil } func encodeInt(i int64) (any, error) { tag := typeTagLong if i <= maxInt && i >= minInt { tag = typeTagInt } return map[typeTag]any{tag: strconv.FormatInt(i, 10)}, nil } func encodeTime(t time.Time, hint string) (any, error) { out := make(map[typeTag]any) if hint == "date" { out[typeTagDate] = t.Format(dateFormat) } else { out[typeTagTime] = t.UTC().Format(timeFormat) } return out, nil } func encodeMod(m Module) (any, error) { return map[typeTag]string{typeTagMod: m.Name}, nil } func encodeFaunaStruct(tag typeTag, s any) (any, error) { if doc, err := encodeStruct(s); err != nil { return nil, err } else { return map[typeTag]any{tag: doc}, nil } } func encodeMap(mv reflect.Value) (any, error) { hasConflictingKey := false out := make(map[string]any) mi := mv.MapRange() for i := 0; mi.Next(); i++ { if mi.Key().Kind() != reflect.String { return mv.Interface(), nil } if enc, err := encode(mi.Value().Interface(), ""); err != nil { return nil, err } else { key := mi.Key().String() if keyConflicts(key) { hasConflictingKey = true } out[key] = enc } } if hasConflictingKey { return map[typeTag]any{typeTagObject: out}, nil } else { return out, nil } } func encodeSlice(sv reflect.Value) (any, error) { sLen := sv.Len() out := make([]any, sLen) for i := 0; i < sLen; i++ { if enc, err := encode(sv.Index(i).Interface(), ""); err != nil { return nil, err } else { out[i] = enc } } return out, nil } func encodeStruct(s any) (any, error) { hasConflictingKey := false isDoc := false out := make(map[string]any) elem := reflect.ValueOf(s) fields := 
reflect.TypeOf(s).NumField() for i := 0; i < fields; i++ { structField := elem.Type().Field(i) if structField.Anonymous && (structField.Name == "NullDocument") { doc := elem.Field(i).Interface().(NullDocument) if doc.Ref != nil { out["cause"] = doc.Cause if ref, err := encode(doc.Ref, ""); err != nil { return nil, err } else { out["ref"] = ref } continue } } if structField.Anonymous && (structField.Name == "NullNamedDocument") { doc := elem.Field(i).Interface().(NullNamedDocument) if doc.Ref != nil { out["cause"] = doc.Cause if ref, err := encode(doc.Ref, ""); err != nil { return nil, err } else { out["ref"] = ref } continue } } if structField.Anonymous && structField.Name == "Document" { doc := elem.Field(i).Interface().(Document) // if the relevant fields are present, consider this an @doc and encode it as such if doc.ID != "" && doc.Coll != nil && doc.TS != nil { out["id"] = doc.ID if coll, err := encode(doc.Coll, ""); err != nil { return nil, err } else { out["coll"] = coll } if ts, err := encode(doc.TS, "time"); err != nil { return nil, err } else { out["ts"] = ts } isDoc = true continue } } if structField.Anonymous && structField.Name == "NamedDocument" { doc := elem.Field(i).Interface().(NamedDocument) // if the relevant fields are present, consider this an @doc and encode it as such if doc.Name != "" && doc.Coll != nil && doc.TS != nil { out["name"] = doc.Name if coll, err := encode(doc.Coll, ""); err != nil { return nil, err } else { out["coll"] = coll } if ts, err := encode(doc.TS, "time"); err != nil { return nil, err } else { out["ts"] = ts } isDoc = true continue } } tag := structField.Tag.Get(fieldTag) tags := strings.Split(tag, ",") if len(tags) > 0 && tags[0] == "-" { continue } typeHint := "" if len(tags) > 1 { typeHint = tags[1] } if enc, err := encode(elem.Field(i).Interface(), typeHint); err != nil { return nil, err } else { name := tags[0] if name == "" { name = structField.Name } if keyConflicts(name) { hasConflictingKey = true } out[name] = 
enc } } if isDoc { return map[typeTag]any{typeTagDoc: out}, nil } if hasConflictingKey { return map[typeTag]any{typeTagObject: out}, nil } return out, nil } func encodeQuery(q *Query) (any, error) { const fqlLabel = "fql" rendered := make([]any, len(q.fragments)) for i, f := range q.fragments { if r, err := encode(f, ""); err != nil { return nil, err } else { rendered[i] = r } } return map[string]any{fqlLabel: rendered}, nil } func encodeQueryFragment(f *queryFragment) (any, error) { if f.literal { return f.value, nil } ret, err := encode(f.value, "") if err != nil { return nil, err } if _, ok := f.value.(*Query); ok { return ret, nil } else { const valLabel = "value" return map[string]any{valLabel: ret}, nil } } ``` ## File: stream_test.go ```go package fauna_test import ( "errors" "testing" "github.com/fauna/fauna-go/v3" "github.com/stretchr/testify/require" ) func TestStreaming(t *testing.T) { t.Setenv(fauna.EnvFaunaEndpoint, fauna.EndpointLocal) t.Setenv(fauna.EnvFaunaSecret, "secret") client, clientErr := fauna.NewDefaultClient() require.NoError(t, clientErr) setupQ, _ := fauna.FQL(` Collection.byName('StreamingTest')?.delete() Collection.create({ name: 'StreamingTest' }) `, nil) _, err := client.Query(setupQ) require.NoError(t, err) type TestDoc struct { Foo string `fauna:"foo"` } t.Run("single-step streaming", func(t *testing.T) { t.Run("StreamFromQuery events", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().eventSource()`, nil) events, err := client.StreamFromQuery(streamQ, nil) require.NoError(t, err) defer func() { _ = events.Close() }() var event fauna.Event err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.StatusEvent, event.Type) require.NotNil(t, event.Stats.ProcessingTimeMs) }) t.Run("Fails on non-streamable values", func(t *testing.T) { streamQ, _ := fauna.FQL(`"I'm a string"`, nil) events, err := client.StreamFromQuery(streamQ, nil) require.ErrorContains(t, err, "query should return a fauna.EventSource but 
got string") require.Nil(t, events) }) }) t.Run("multi-step streaming", func(t *testing.T) { t.Run("StreamFromQuery events", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().eventSource()`, nil) res, err := client.Query(streamQ) require.NoError(t, err) var stream fauna.EventSource require.NoError(t, res.Unmarshal(&stream)) events, err := client.Stream(stream) require.NoError(t, err) defer func() { _ = events.Close() }() var event fauna.Event err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.StatusEvent, event.Type) createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) _, err = client.Query(createQ) require.NoError(t, err) err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.AddEvent, event.Type) var doc TestDoc require.NoError(t, event.Unmarshal(&doc)) require.Equal(t, "bar", doc.Foo) require.NoError(t, events.Close()) }) t.Run("Handle subscription errors", func(t *testing.T) { events, err := client.Stream("abc1234==") require.IsType(t, err, &fauna.ErrInvalidRequest{}) require.Nil(t, events) }) t.Run("Handle error events", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().map(doc => abort('oops')).eventSource()`, nil) res, err := client.Query(streamQ) require.NoError(t, err) var stream fauna.EventSource require.NoError(t, res.Unmarshal(&stream)) events, err := client.Stream(stream) require.NoError(t, err) defer func() { _ = events.Close() }() var event fauna.Event err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.StatusEvent, event.Type) createQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) _, err = client.Query(createQ) require.NoError(t, err) err = events.Next(&event) require.IsType(t, err, &fauna.ErrEvent{}) var evErr *fauna.ErrEvent require.True(t, errors.As(err, &evErr)) require.Equal(t, "abort", evErr.Code) require.Equal(t, "Query aborted.", evErr.Message) var msg string require.NoError(t, evErr.Unmarshal(&msg)) require.Equal(t, 
"oops", msg) require.NoError(t, events.Close()) }) t.Run("Resume a stream at a given start time", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().eventSource()`, nil) res, err := client.Query(streamQ) require.NoError(t, err) var stream fauna.EventSource require.NoError(t, res.Unmarshal(&stream)) createFooQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'foo' })`, nil) createBarQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) foo, err := client.Query(createFooQ) require.NoError(t, err) bar, err := client.Query(createBarQ) require.NoError(t, err) events, err := client.Stream(stream, fauna.StreamStartTimeUnixMicros(foo.TxnTime)) require.NoError(t, err) defer func() { _ = events.Close() }() var event fauna.Event err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.StatusEvent, event.Type) require.GreaterOrEqual(t, event.TxnTime, foo.TxnTime) err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.AddEvent, event.Type) require.Equal(t, bar.TxnTime, event.TxnTime) }) t.Run("Resume a stream at a given event cursor", func(t *testing.T) { streamQ, _ := fauna.FQL(`StreamingTest.all().eventSource()`, nil) res, err := client.Query(streamQ) require.NoError(t, err) var stream fauna.EventSource require.NoError(t, res.Unmarshal(&stream)) events, err := client.Stream(stream) require.NoError(t, err) defer func() { _ = events.Close() }() createFooQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'foo' })`, nil) createBarQ, _ := fauna.FQL(`StreamingTest.create({ foo: 'bar' })`, nil) foo, err := client.Query(createFooQ) require.NoError(t, err) bar, err := client.Query(createBarQ) require.NoError(t, err) var event fauna.Event err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.StatusEvent, event.Type) err = events.Next(&event) require.NoError(t, err) require.Equal(t, fauna.AddEvent, event.Type) require.Equal(t, foo.TxnTime, event.TxnTime) _ = events.Close() events, err = client.Stream(stream, 
				fauna.EventCursor(event.Cursor))
			require.NoError(t, err)
			defer func() { _ = events.Close() }()

			err = events.Next(&event)
			require.NoError(t, err)
			require.Equal(t, fauna.StatusEvent, event.Type)
			require.GreaterOrEqual(t, foo.TxnTime, event.TxnTime)

			err = events.Next(&event)
			require.NoError(t, err)
			require.Equal(t, fauna.AddEvent, event.Type)
			require.Equal(t, bar.TxnTime, event.TxnTime)
		})
	})
}
```

## File: stream.go

```go
package fauna

import (
	"encoding/json"
	"errors"
	"io"
	"net"
)

// EventType represents a Fauna event type.
type EventType string

const (
	// AddEvent happens when a new value is added to the stream's watched set.
	AddEvent EventType = "add"
	// UpdateEvent happens when a value in the stream's watched set changes.
	UpdateEvent EventType = "update"
	// RemoveEvent happens when a value in the stream's watched set is removed.
	RemoveEvent EventType = "remove"
	// StatusEvent happens periodically and communicates the stream's latest
	// transaction time as well as ops acquired during its idle period.
	StatusEvent EventType = "status"
)

// Event represents a streaming event.
//
// Events of type [fauna.StatusEvent] have their [fauna.Event.Data] field set
// to nil. Other events' [fauna.Event.Data] can be unmarshaled via the
// [fauna.Event.Unmarshal] method.
type Event struct {
	// Type is this event's type.
	Type EventType
	// TxnTime is the transaction time that produced this event.
	TxnTime int64
	// Cursor is the event's cursor, used for resuming streams after crashes.
	Cursor string
	// Data is the event's data.
	Data any
	// Stats contains the ops acquired to process the event.
	Stats Stats
}

// Unmarshal will unmarshal the raw [fauna.Event.Data] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *Event) Unmarshal(into any) error {
	return decodeInto(e.Data, into)
}

// ErrEvent contains error information present in error events.
//
// Error events with the "abort" code carry their abort value in
// [fauna.ErrEvent.Abort]. The abort value can be unmarshaled with the
// [fauna.ErrEvent.Unmarshal] method.
type ErrEvent struct {
	// Code is the error's code.
	Code string `json:"code"`

	// Message is the error's message.
	Message string `json:"message"`

	// Abort is the error's abort data, present if [fauna.ErrEvent.Code] is
	// equal to "abort".
	Abort any `json:"abort,omitempty"`
}

// Error provides the underlying error message.
func (e *ErrEvent) Error() string {
	return e.Message
}

// Unmarshal will unmarshal the raw [fauna.ErrEvent.Abort] (if present) into the
// known type provided as `into`. `into` must be a pointer to a map or struct.
func (e *ErrEvent) Unmarshal(into any) error {
	return decodeInto(e.Abort, into)
}

// EventStream is an iterator of Fauna events.
//
// The next available event can be obtained by calling the
// [fauna.EventStream.Next] method. Note this method blocks until the next
// event is available or the events iterator is closed via the
// [fauna.EventStream.Close] method.
//
// The events iterator wraps an [http.Response.Body] reader. As per Go's current
// [http.Response] implementation, environments using HTTP/1.x may not reuse
// their TCP connections for the duration of the "keep-alive" time if the
// response body is not read to completion and closed. By default, Fauna's
// region groups use the HTTP/2.x protocol where this restriction doesn't apply.
// However, if connecting to Fauna via an HTTP/1.x proxy, be aware of the events
// iterator closing time.
type EventStream struct {
	client *Client
	stream EventSource

	byteStream io.ReadCloser
	decoder    *json.Decoder
	lastCursor string
	closed     bool
}

func subscribe(client *Client, stream EventSource, opts ...StreamOptFn) (*EventStream, error) {
	events := &EventStream{client: client, stream: stream}
	if err := events.reconnect(opts...); err != nil {
		return nil, err
	}
	return events, nil
}

func (es *EventStream) reconnect(opts ...StreamOptFn) error {
	req := streamRequest{
		apiRequest: apiRequest{
			es.client.ctx,
			es.client.headers,
		},
		Stream: es.stream,
		Cursor: es.lastCursor,
	}

	for _, streamOptionFn := range opts {
		streamOptionFn(&req)
	}

	byteStream, err := req.do(es.client)
	if err != nil {
		return err
	}

	es.byteStream = byteStream
	es.decoder = json.NewDecoder(byteStream)
	return nil
}

// Close gracefully closes the events iterator. See [fauna.EventStream] for details.
func (es *EventStream) Close() (err error) {
	if !es.closed {
		es.closed = true
		err = es.byteStream.Close()
	}
	return
}

type rawEvent = struct {
	Type    EventType `json:"type"`
	TxnTime int64     `json:"txn_ts"`
	Cursor  string    `json:"cursor"`
	Data    any       `json:"data,omitempty"`
	Error   *ErrEvent `json:"error,omitempty"`
	Stats   Stats     `json:"stats"`
}

// Next blocks until the next event is available.
//
// Note that network errors of type [fauna.ErrEvent] are considered fatal and
// close the underlying stream. Calling next after an error event occurs will
// return an error.
func (es *EventStream) Next(event *Event) (err error) {
	raw := rawEvent{}
	if err = es.decoder.Decode(&raw); err == nil {
		es.onNextEvent(&raw)
		err = convertRawEvent(&raw, event)
		var errEvent *ErrEvent
		if errors.As(err, &errEvent) {
			_ = es.Close() // no more events are coming
		}
	} else if !es.closed {
		// NOTE: This code tries to resume streams on network and IO errors. It
		// presumes that if the service is unavailable, the reconnect call will
		// fail. Automatic retries and backoff mechanisms are implemented at the
		// Client level.
		var netError net.Error
		if errors.As(err, &netError) || err == io.EOF || errors.Is(err, io.ErrUnexpectedEOF) {
			if err = es.reconnect(); err == nil {
				err = es.Next(event)
			}
		}
	}
	return
}

func (es *EventStream) onNextEvent(event *rawEvent) {
	es.client.lastTxnTime.sync(event.TxnTime)
	es.lastCursor = event.Cursor
}

func convertRawEvent(raw *rawEvent, event *Event) (err error) {
	if raw.Error != nil {
		if raw.Error.Abort != nil {
			if raw.Error.Abort, err = convert(false, raw.Error.Abort); err != nil {
				return
			}
		}
		err = raw.Error
	} else {
		if raw.Data != nil {
			if raw.Data, err = convert(false, raw.Data); err != nil {
				return
			}
		}
		event.Type = raw.Type
		event.TxnTime = raw.TxnTime
		event.Cursor = raw.Cursor
		event.Data = raw.Data
		event.Stats = raw.Stats
	}
	return
}
```

## File: template_test.go

```go
package fauna

import (
	"testing"

	"github.com/stretchr/testify/assert"
)

type TemplateSuccessCase struct {
	given string
	wants *[]templatePart
}

type TemplateErrorCase struct {
	given string
	error string
}

func TestTemplate_ParseSuccess(t *testing.T) {
	testCases := []TemplateSuccessCase{
		{
			"let x = ${my_var}",
			&[]templatePart{
				{"let x = ", templateLiteral},
				{"my_var", templateVariable},
			},
		},
		{
			"let x = ${my_var}\nlet y = ${my_var}\nx * y",
			&[]templatePart{
				{"let x = ", templateLiteral},
				{"my_var", templateVariable},
				{"\nlet y = ", templateLiteral},
				{"my_var", templateVariable},
				{"\nx * y", templateLiteral},
			},
		},
		{
			"${my_var} { .name }",
			&[]templatePart{
				{"my_var", templateVariable},
				{" { .name }", templateLiteral},
			},
		},
		{
			"let x = '$${not_a_var}'",
			&[]templatePart{
				{"let x = '$", templateLiteral},
				{"{not_a_var}'", templateLiteral},
			},
		},
	}

	for _, tc := range testCases {
		parsed, err := parseTemplate(tc.given)
		if !assert.NoError(t, err) {
			return
		}

		assert.Equal(t, len(*tc.wants), len(parsed))
		for i, tp := range parsed {
			expected := (*tc.wants)[i]
			assert.Equal(t, expected.Text, tp.Text)
			assert.Equal(t, expected.Category, tp.Category)
		}
	}
}

func TestTemplate_ParseFail(t *testing.T) {
	testCases := []TemplateErrorCase{
		{
			"let x = ${かわいい}",
			"invalid placeholder in template: position 9",
		},
	}

	for _, tc := range testCases {
		_, err := parseTemplate(tc.given)
		if assert.Error(t, err) {
			assert.EqualError(t, err, tc.error)
		}
	}
}
```

## File: template.go

```go
package fauna

import (
	"fmt"
	"regexp"
)

type templateCategory string

const (
	templateVariable templateCategory = "variable"
	templateLiteral  templateCategory = "literal"
)

var (
	templateRegex = regexp.MustCompile(`\$(?:(?P<escaped>\$)|{(?P<braced>[_a-zA-Z0-9]*)}|(?P<invalid>))`)
	escapedIndex  = templateRegex.SubexpIndex("escaped")
	bracedIndex   = templateRegex.SubexpIndex("braced")
	invalidIndex  = templateRegex.SubexpIndex("invalid")
)

type templatePart struct {
	Text     string
	Category templateCategory
}

// parseTemplate parses text and returns a slice of template parts.
func parseTemplate(text string) ([]templatePart, error) {
	end := len(text)
	currentPosition := 0

	matches := templateRegex.FindAllStringSubmatch(text, -1)
	matchIndexes := templateRegex.FindAllStringSubmatchIndex(text, -1)
	parts := make([]templatePart, 0)

	for i, m := range matches {
		matchIndex := matchIndexes[i]
		invalidStartPos := matchIndex[invalidIndex*2]
		if invalidStartPos >= 0 {
			// TODO: Improve with line/column num
			return nil, fmt.Errorf("invalid placeholder in template: position %d", invalidStartPos)
		}

		matchStartPos := matchIndex[0]
		matchEndPos := matchIndex[1]
		escaped := m[escapedIndex]
		variable := m[bracedIndex]

		if currentPosition < matchStartPos {
			parts = append(parts, templatePart{
				Text:     text[currentPosition:matchStartPos] + escaped,
				Category: templateLiteral,
			})
		}

		if len(variable) > 0 {
			parts = append(parts, templatePart{
				Text:     variable,
				Category: templateVariable,
			})
		}

		currentPosition = matchEndPos
	}

	if currentPosition < end {
		parts = append(parts, templatePart{Text: text[currentPosition:], Category: templateLiteral})
	}

	return parts, nil
}
```

## File: txntime_test.go

```go
package fauna

import (
	"testing"
	"time"
"github.com/stretchr/testify/require" ) func TestTxnTime(t *testing.T) { txnTime := txnTime{} require.Equal(t, int64(0), txnTime.get()) require.Equal(t, "", txnTime.string()) txnTime.sync(42) // move forward require.Equal(t, int64(42), txnTime.get()) require.Equal(t, "42", txnTime.string()) txnTime.sync(32) // don't move back require.Equal(t, int64(42), txnTime.get()) require.Equal(t, "42", txnTime.string()) } func BenchmarkTxnTime(b *testing.B) { txnTime := txnTime{} b.RunParallel(func(pb *testing.PB) { for pb.Next() { now := time.Now() txnTime.sync(now.UnixMicro()) } }) } ``` ## File: txntime.go ```go package fauna import ( "strconv" "sync/atomic" ) type txnTime struct { value atomic.Int64 } func (t *txnTime) get() int64 { return t.value.Load() } func (t *txnTime) sync(newTxnTime int64) { for { oldTxnTime := t.value.Load() if oldTxnTime >= newTxnTime || t.value.CompareAndSwap(oldTxnTime, newTxnTime) { break } } } func (t *txnTime) string() (str string) { if lastSeen := t.value.Load(); lastSeen != 0 { str = strconv.FormatInt(lastSeen, 10) } return } ```