GCP – Making API calls exactly once when using Workflows
Introduction
One challenge with any distributed system, including Workflows, is ensuring that requests sent from one service to another are processed exactly once, when needed; for example, when placing a customer order in a shipping queue, withdrawing funds from a bank account, or processing a payment.
In this blog post, we’ll provide an example of a website invoking Workflows, and Workflows in turn invoking a Cloud Function. We’ll show how to make sure both Workflows and the Cloud Function logic only runs once. We’ll also talk about how to invoke Workflows exactly once when using HTTP callbacks, Pub/Sub messages, or Cloud Tasks.
Invoke Workflows exactly once
Imagine you have an online store and you’re using Workflows to create new orders, save to Firestore, and process payments by calling a Cloud Function:
A new customer order comes in, the website makes an API call to Workflows but receives an error. Two possible scenarios are:
(1) The request is lost and the workflow is never invoked:
(2) The workflow is invoked and executes successfully, however the response is lost:
How can you make sure the workflow executes once?
To solve this, the website retries the same request. One easy solution is to check if a document already exists in Firestore:
<ListValue: [StructValue([(‘code’, ‘main:rn params: []rn steps:rn – init:rn assign:rn – project_id: ${sys.get_env(“GOOGLE_CLOUD_PROJECT_ID”)}rn – order_id: “12345” # In practice we would pass in the order ID as a workflow parameter, e.g. ${params[0]}rn – firestore_collection: “orders”rn – URL: https://us-central1-<your_project_id>.cloudfunctions.net/processpaymentrn – create_document:rn try:rn call: googleapis.firestore.v1.projects.databases.documents.createDocumentrn args:rn collectionId: ${firestore_collection}rn parent: ${“projects/” + project_id + “/databases/(default)/documents”}rn query:rn documentId: ${order_id}rn except:rn as: ern steps:rn – endEarly:rn return: ${e} # Exception is raised, e.g. ${e.code == 409} if doc already existsrn – processPayment:rn …’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e2af0>)])]>
The processPayment step will execute only if a document is successfully created. This is effectively a 1-bit state machine, idempotent, and a valid solution. The downside of this solution is that it’s not extensible. We might want to complete additional work in this handler before changing states, or expand the number of states within the system. Next, let’s continue with a more advanced solution for the same problem.
Invoke Cloud Functions from Workflows exactly once
Let’s see what happens when the workflow uses a Cloud Function to process the payment. You might have the following step to call Cloud Functions:
<ListValue: [StructValue([(‘code’, ‘- processPayment:rn call: http.postrn args:rn url: https://us-central1-<your_project_id>.cloudfunctions.net/processpaymentrn auth:rn type: OIDC’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e29d0>)])]>
By default, Workflows offers at-most-once delivery (no retries) with HTTP requests. That’s usually OK because 99.9+% of the time, the call is successful, and a response is received.
In the rare case of failure, a ConnectionError might be raised. As in the website-to-workflow situation discussed previously, the workflow can’t tell which scenario occurred. Similarly, you can add retries.
Let’s add a default retry policy to handle this:
<ListValue: [StructValue([(‘code’, “- processPayment:rn try:rn call: http.postrn args:rn url: https://us-central1-<your_project_id>.cloudfunctions.net/processpaymentrn auth:rn type: OIDCrn retry: ${http.default_retry} # Retries up to 5 times, includes ‘ConnectionError'”), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e2ca0>)])]>
Let’s say the second delivery scenario occurs where the request is received by the Cloud Function but the response is lost. By adding retries, Workflows will likely invoke the Cloud Function multiple times. When this happens, how do you ensure that the code in the Cloud Function only runs once?
You’ll need to add extra logic to the Cloud Function to check and update the payment state in Firestore:
Let’s also assume you want to track the workflow EXECUTION_ID in Firestore and use the following order_state enum to allow for additional flexibility in payment processing:
<ListValue: [StructValue([(‘code’, ‘payment_not_processed // Initial state when an order is createdrnpayment_declined // Payment was not successfulrnpayment_successful // Payment processed successfullyrn…’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e20d0>)])]>
You can expand on the previous workflow and call a Cloud Function to process the payment:
<ListValue: [StructValue([(‘code’, ‘main:rn params: []rn steps:rn – init:rn assign:rn – project_id: ${sys.get_env(“GOOGLE_CLOUD_PROJECT_ID”)}rn – order_id: “12345” # In practice we would pass in the order ID as a workflow parameter, e.g. ${params[0]}rn – firestore_collection: “orders”rn – URL: https://us-central1-<your_project_id>.cloudfunctions.net/processpaymentrn – create_document:rn try:rn call: googleapis.firestore.v1.projects.databases.documents.createDocumentrn args:rn collectionId: ${firestore_collection}rn parent: ${“projects/” + project_id + “/databases/(default)/documents”}rn query:rn documentId: ${order_id}rn body:rn fields:rn order_state: # We set an initial statern stringValue: “payment_not_processed”rn workflow_id: # And also track this workflow execution IDrn stringValue: ${sys.get_env(“GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID”)}rn except:rn as: ern steps:rn – endEarly:rn return: ${e} # Exception is raised, e.g. ${e.code == 409} if doc already existsrn – processPayment:rn try:rn call: http.postrn args:rn url: ${URL} # Might get called multiple times!rn auth:rn type: OIDCrn body:rn order_id: ${order_id}rn result: rrn retry: ${http.default_retry}rn – returnStep:rn return: ${r}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e2bb0>)])]>
Here’s the Cloud Function (Node.js v20) that processes the payment:
<ListValue: [StructValue([(‘code’, ‘const functions = require(‘@google-cloud/functions-framework’);rnconst firestore = require(‘@google-cloud/firestore’);rnrnrnfunctions.http(‘helloHttp’, (req, res) => {rn const fs = new firestore.Firestore();rn try{rn// Reads the current state from Firestore and updates it within the same transaction to make this handler idempotent. Using a transaction is important. Note: It could be run multiple times but will only be committed once.rn return fs.runTransaction(t => {rn const docRef = fs.doc(“orders/” + req.body.order_id);rn return t.get(docRef).then(doc => {rn console.log(doc, ‘=>’, doc);rn var state = doc.data().order_statern // Only process the order if we haven’t alreadyrn if (state == “payment_not_processed”) {rn // Do payment stuff, e.g. debit account from another Firestore documentrn // …rn //rn state = “payment_successful”rn t.update(docRef, {order_state: state})rn res.status(200).send(state);rn returnrn }rn res.status(200).send(“request ignored, state already: ” + state);rn });rn }).then(result => {rn console.log(‘Transaction result: ‘, result);rn });rn } catch (e) {rn console.log(‘Transaction failure:’, e);rn } rn});’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e27f0>)])]>
package.json
<ListValue: [StructValue([(‘code’, ‘{rn “dependencies”: {rn “@google-cloud/functions-framework”: “^3.3.0”,rn “@google-cloud/firestore”: “^7.6.0″rn }rn}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e2340>)])]>
The key takeaway is that all payment processing work occurs within a transaction, making all actions idempotent. The code within the transaction might run multiple times due to Workflows retries, but it’s only committed once.
What about HTTP callbacks, Pub/Sub, Cloud Tasks?
So far, we’ve talked about how to make website-to-workflow and Workflows to Cloud Functions requests, exactly once. There are other ways of invoking or resuming Workflows such as HTTP callbacks, Pub/Sub messages or Cloud Tasks. How do you make those requests exactly once? Let’s take a look.
Callbacks
The good news is that Workflows HTTP callbacks are fully idempotent by default. It’s safe to retry a callback if it fails. For example:
<ListValue: [StructValue([(‘code’, ‘- createCallbackStep:rn call: events.create_callback_endpointrn args:rn http_callback_method: “POST”rn result: callback_detailsrn- sendOutURL:rn call: http.postrn args:rn url: “https://your-endpoint.com/foo”rn body:rn callback_to_use: ${callback_details.url}rn…rn- callbackWaitStep:rn call: events.await_callbackrn args:rn callback: ${callback_details}’), (‘language’, ”), (‘caption’, <wagtail.rich_text.RichText object at 0x3eeb2b9e2970>)])]>
Let’s assume that the first callback returns an error to the external caller. Based on the error, the caller might not know if the workflow callback was received, and should retry the callback. On the second callback, the caller will receive one of the following HTTP status codes:
429 indicates that the first callback was received successfully. The second callback is rejected by the workflow.
200 indicates that the second callback was received successfully. The first callback was either never received, or was received and processed successfully. If the latter, the second callback is not processed because await_callback is called only once. The second callback is discarded at the end of the workflow.
404 indicates that a callback is not available. Either the first callback was received and processed and the workflow has completed, or the workflow is not running (and has failed, for example). To confirm this, you’ll need to send an API request to query the workflow execution state.
For more details, see Invoke a workflow exactly once using callbacks.
Pub/Sub messages
When using Pub/Sub to trigger a new workflow execution, Pub/Sub uses at-least-once delivery with Workflows, and will retry on any delivery failure. Pub/Sub messages are automatically deduplicated. You don’t need to worry about duplicate deliveries in that time window (24 hours).
Cloud Tasks
Cloud Tasks is commonly used to buffer workflow executions and provides at-least-once delivery but it doesn’t have message deduplication. Workflow handlers should be idempotent.
Conclusion
Exactly-once request processing is a hard problem. In this blog post, we’ve outlined some scenarios where you might need exactly-once request processing when you’re using Workflows. We also provided some ideas on how you can implement it. The exact solution will depend on the actual use case and the services involved.
Read More for the details.