Event polling

Crunchy Bridge doesn't provide webhooks, but it does have a real-time event API that can be polled for changes on a cluster or team. This page describes how to best design a poll loop that runs efficiently, and avoids pitfalls like missed events.

The event API is used through the event list endpoint, which filters events using a cluster_id or team_id parameter.

curl -X GET -H "Authorization: Bearer cbkey_123" "https://api.crunchybridge.com/events?cluster_id=d56sve3hpjfatlxpxhe6gfkzpi&limit=10&order=desc"

Polling best practices

  • Provision an API key dedicated to the specific polling task that's named after the program's name. This enables easy key rotation if necessary (because nothing else is using it), and makes the key less likely to be deleted accidentally (because its well formed name tells people what it's for).

  • Use pagination to keep track of the last seen event so only new events are returned for subsequent fetches.

  • Poll the event API frequently, but avoid polling with hyper frequency. Once every 15 or 30 seconds is enough to get a good balance between seeing new events in a timely manner and being a social API consumer.

  • Request events with delay=10s to avoid missed events that may otherwise occur from out-of-order transaction commits. See stream delay for more information.

Stream delay

Consuming the event stream with a short delay is recommended because occasionally events may appear near the stream's end, but not exactly at its end. When combined with using the last seen event as a cursor and unlucky timing on a poll loop, this might cause a consumer to miss an event.

Out-of-order events happen for two reasons:

  • Like our customers, Crunchy Bridge runs on Postgres. Each API request runs in a transaction in which inserted records aren't visible to other transactions until their own transaction commits. A transaction may generate an event earlier than another transaction, but commit later, thereby only revealing its event after other events that were created later, but committed sooner.

    For example, tx1 generates e1 (event 1), then tx2 opens and generates e2. tx2 commits first, making e2 available in the event API. A consumer makes a fetch and sees e2. tx1 commits and e1 becomes visible, but too late because the consumer is already using e2 as its cursor.

  • Event IDs are ULIDs (similar to a V7 UUID), which encode the millisecond at which they were generated along a random component of 80 bits. Two IDs generated in exactly the same millisecond may be out of order compared to the precise moment they were generated as the last ID gets a smaller random component by luck of the random number generator.

To avoid missing events in a poll loop, it's recommended that consumers use a slight delay to give all in-flight transactions a chance to commit. The overwhelming number of transactions in Bridge's API take less than a second, but to protect against outliers, it's better to use a larger delay like delay=10s. The maximum duration of a Bridge API request is 30 seconds, so consumers that are willing to tolerate some lag in return for the strongest guarantee that no events will be missed can use delay=30s.

High level program flow

A language agnostic flow for how an event polling loop should work:

  1. Do an initial fetch to get the latest events (use descending order like order=desc). Process them if desired. If you don't need to process old events, use limit=1 to fetch only the latest one.

  2. Get the latest event's ID for use as a cursor. Because the page was fetched in descending order, the latest event is in the first array position.

  3. In a loop:

    1. Fetch events using the last event ID by using it as a cursor as cursor=<id>. This time the fetch should be in ascending order using order=asc or omitting the order parameter. Process any events that are returned.

    2. Get the latest event's ID for use as a cursor on the next loop iteration.

    3. Check the page's has_more property to see if the page hit its size limit and there are note events to fetch.

      • If true (quite unlikely given that Bridge event volumes tend to be quite low), repeat the loop immediately.

      • If false, sleep a reasonable amount of time (e.g. 15 or 30 seconds) and continue the loop.

Listing events uses a GET endpoint, and all parameters mentioned above should be query parameters (as opposed to form-encoded or JSON).

Example program in Ruby

Here's a small Ruby program that implements the routine described above:

require "json"
require "net/http"
require "uri"

# Checks that an HTTP response has a status code indicating success and parses
# its JSON body to a hash.
def check_and_parse(resp)
  status = resp.code.to_i
  if status < 200 || status >= 400
    raise "got non-success response code #{status}: #{resp.body}"
  end

  JSON.parse(resp.body)
end

api_key = ENV["CRUNCHY_API_KEY"] || abort("need CRUNCHY_API_KEY")
api_url = ENV["CRUNCHY_API_URL"] || abort("need CRUNCHY_API_URL")

headers = {
  "Authorization" => "Bearer #{api_key}"
}

parsed_api_url = URI.parse(api_url)

client = Net::HTTP.new(parsed_api_url.host, parsed_api_url.port)
client.use_ssl = true

cluster_or_team_id = case
  when ENV["CLUSTER_ID"] then "cluster_id=#{ENV["CLUSTER_ID"]}"
  when ENV["TEAM_ID"] then "team_id=#{ENV["TEAM_ID"]}"
  end

# Do an initial fetch with `order=desc` to get the latest event. We'll use that
# as a reference to check for new events in the loop below.
data = check_and_parse(client.request(
  Net::HTTP::Get.new("/events?#{cluster_or_team_id}&delay=10s&limit=1&order=desc", headers)
))

# ID of the first element in the `events` array
last_event_id = data.dig("events", 0, "id")
puts "last event ID: #{last_event_id}"

loop do
  # If `last_event_id` is `nil`, that gets encoded as an empty string and the
  # API will interpret that as no cursor.
  data = check_and_parse(client.request(
    Net::HTTP::Get.new("/events?#{cluster_or_team_id}&cursor=#{last_event_id}&delay=10s", headers)
  ))

  if data["events"].empty?
    puts "no new events" 
  else
    data["events"].each do |event|
      puts "got new event of kind: #{event["kind"]}"
    end

    last_event_id = data["events"].last["id"]

    # If there were more new events than what could fit on the page (unlikely,
    # but possible), repeat the loop immediately.
    next if data["has_more"]
  end

  # Sleep between fetches so the API isn't being hammered too hard.
  sleep(15)
end

Sample run:

$ TEAM_ID=qvcw4hylovgyzbwzp53bmmlhga ruby watch_events.rb
last event ID:
no new events
got new event of kind: cluster.created
no new events
got new event of kind: cluster.destroyed