Adventures with Azure: Cosmos DB, Azure Functions, Event Grid, Oh My!

To get a better understanding of the Cosmos DB Change Feed, Event Grid, and retry and failover logic, I decided to build a system that responds to changes in a Cosmos DB database and sends events to an Azure Event Grid topic, which in turn forwards the events to subscribers.

I broke this article into three parts.

In Part 1, we will create a Cosmos DB Trigger Azure Function to respond to document adds and updates from a Cosmos DB database, which will then forward the documents to an Event Grid Topic.

We will also create a single Event Grid Subscription on the Event Grid Topic, which will forward events to an Azure Storage Queue.

If for some reason events cannot be published to the Event Grid Topic, for example because of latency or throttling in Event Grid, the Cosmos DB Trigger Azure Function will use retry logic to attempt to publish the events again until the retries are exhausted.

In Part 2, we will add an Event Grid Trigger Azure Function to subscribe to Event Grid messages.

In Part 3, we will add failover logic to the Cosmos DB Trigger Azure Function: if it is unable to publish events to the Event Grid Topic, e.g. because Event Grid is down, it will instead push the events to Azure Blob Storage.

Since Azure Functions do not support output bindings for Azure Event Grid (at the time of writing), we will have to publish the events to Azure Event Grid in code, using the Azure Event Grid SDK.

This article assumes some familiarity with Azure and creating various Azure Resources.

Part 1

Setup Azure Environment

Create the following Azure resources:

  • Create a Resource Group
  • Create a Storage Account
  • Create a Function App
    • Create with an Application Insights resource
  • Create an Event Grid Topic
  • Create a Storage Queue for the Storage Account
    • Name the storage queue the same as the Event Grid Topic created in the previous step
  • Create an Event Grid Subscription
    • Name the subscription the same as the Storage Queue created in the previous step
    • Create with a Storage Queue endpoint
  • Create a Cosmos DB account
    • Create a database for the Cosmos DB account
    • Create a container for the Cosmos DB database, called persons

Create the Function App Project

Open up Visual Studio, or VS Code.

Create an Azure Function App project; I named mine FuncApp1.

Add a folder named Helpers.

Add a class named Person.cs.

Paste the following code into the class:

using System;
using Newtonsoft.Json;

public class Person
{
    [JsonProperty("id")]
    public Guid Id { get; set; }

    [JsonProperty("person_id")]
    public Guid PersonId => this.Id;

    [JsonProperty("birth_date")]
    public DateTime? BirthDate { get; set; }

    [JsonProperty("first_name")]
    public string FirstName { get; set; }

    [JsonProperty("last_name")]
    public string LastName { get; set; }

    [JsonProperty("full_name")]
    public string FullName => $"{this.FirstName} {this.LastName}";

    [JsonProperty("phone")]
    public string Phone { get; set; }

    [JsonProperty("user_name")]
    public string UserName { get; set; }

    [JsonProperty("email")]
    public string Email { get; set; }

    [JsonProperty("avatar")]
    public string Avatar { get; set; }
}

These are the documents that we will write to the Cosmos DB database and send as events to Azure Event Grid.

Edit the file local.settings.json as follows:

{
    "IsEncrypted": false,
    "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "AzureCosmosDb:ConnectionString": "YOUR_COSMOS_DB_CONNECTION_STRING",
        "AzureCosmosDb:DatabaseName": "YOUR_COSMOS_DB_DATABASE_NAME",
        "AzureCosmosDb:CollectionName": "persons",
        "AzureCosmosDb:LeaseCollectionName": "persons-leases",
        "AzureEventGrid:TopicKey": "YOUR_EVENT_GRID_TOPIC_KEY",
        "AzureEventGrid:TopicEndpoint": "YOUR_EVENT_GRID_TOPIC_ENDPOINT",
        "AzureStorage:ConnectionString": "YOUR_AZURE_STORAGE_CONNECTION_STRING"
    }
}

Replace the values that start with YOUR with the values from your Azure resources.

Create the Http Trigger to Populate Cosmos DB Database

Add an Http Trigger called HttpTrigger1.cs.

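Paste the following code into the class:

using System;
using System.Diagnostics.CodeAnalysis;
using System.Threading.Tasks;
using Bogus;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Extensions.Logging;

public static class HttpTrigger1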
{
    public const int MAXIMUM_RECORDS = 10;

    [FunctionName("HttpTrigger1")]
    [SuppressMessage("Style", "IDE0060:Remove unused parameter", Justification = "Nothing needs to be passed")]
    public static async Task<IActionResult> Run(
        [HttpTrigger(AuthorizationLevel.Function, "post", Route = null)] HttpRequest req,
        ILogger logger,
        [CosmosDB(
            databaseName: "%AzureCosmosDb:DatabaseName%",
            collectionName: "%AzureCosmosDb:CollectionName%",
            ConnectionStringSetting = "AzureCosmosDb:ConnectionString")]
            IAsyncCollector<Person> persons)
    {
        logger.LogInformation("HttpTrigger1 function processed a request.");

        for (var i = 0; i < MAXIMUM_RECORDS; i++)
        {
            await persons.AddAsync(
                new Faker<Person>()
                    .RuleFor(o => o.Id, f => Guid.NewGuid())
                    .RuleFor(o => o.BirthDate, f => f.Person.DateOfBirth)
                    .RuleFor(o => o.Avatar, f => f.Person.Avatar)
                    .RuleFor(o => o.FirstName, f => f.Person.FirstName)
                    .RuleFor(o => o.LastName, f => f.Person.LastName)
                    .RuleFor(o => o.Phone, f => f.Person.Phone)
                    .RuleFor(o => o.UserName, f => f.Person.UserName)
                    .RuleFor(o => o.Email, f => f.Person.Email)
                .Generate());
        }

        return new OkResult();
    }
}

Install the NuGet package Bogus.

This function will insert 10 person documents into the Cosmos DB database, which will fire the Cosmos DB Trigger that we will create in the next step.

Create the Cosmos DB Trigger to Read from Cosmos DB Change Feed

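Add a Cosmos DB Trigger called CosmosDBTrigger1.cs and paste the following code into the class:

using System;
using System.Collections.Generic;
using System.Net.Http;
using System.Threading.Tasks;
using Microsoft.Azure.Documents;
using Microsoft.Azure.EventGrid;
using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Extensions.Logging;

public static class CosmosDBTrigger1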
{
    private static HttpClient _httpClient;

    [FunctionName("CosmosDBTrigger1")]
    public static async Task Run([CosmosDBTrigger(
        databaseName: "%AzureCosmosDb:DatabaseName%",
        collectionName: "%AzureCosmosDb:CollectionName%",
        MaxItemsPerInvocation = 10,
        ConnectionStringSetting = "AzureCosmosDb:ConnectionString",
        CreateLeaseCollectionIfNotExists = true,
        LeaseCollectionName = "%AzureCosmosDb:LeaseCollectionName%")]IReadOnlyList<Document> documents,
        ILogger logger)
    {
        logger.LogInformation("CosmosDBTrigger1 trigger fired.");

        if (_httpClient == null)
        {
            _httpClient =
                new HttpClient(new HttpRetryMessageHandler(logger, new HttpClientHandler()));
        }

        if (documents != null)
        {
            logger.LogInformation($"Received {documents.Count} document(s) from Cosmos DB.");

            var eventGridEventList =
                new List<EventGridEvent>();

            var topicCredentials =
                new TopicCredentials(
                    Environment.GetEnvironmentVariable("AzureEventGrid:TopicKey"));

            var eventGridClient =
                new EventGridClient(
                    topicCredentials, _httpClient, false);

            foreach (var document in documents)
            {
                var eventGridEvent =
                    new EventGridEvent()
                    {
                        Id = Guid.NewGuid().ToString(),
                        Subject = $"/persons/{document.GetPropertyValue<Guid>("person_id")}",
                        EventType = "Person.Added",
                        Data = document,
                        EventTime = DateTime.UtcNow, // UTC keeps the timestamp unambiguous for subscribers
                        DataVersion = "1.0"
                    };

                eventGridEventList.Add(
                    eventGridEvent);
            }
            
            logger.LogInformation($"Sending a batch of {documents.Count} event(s) to Azure Event Grid for processing.");

            await eventGridClient.PublishEventsAsync(
               new Uri(Environment.GetEnvironmentVariable("AzureEventGrid:TopicEndpoint")).Host,
               eventGridEventList);
        }
    }
}

This Azure Function will respond to any adds or updates in the persons container in Cosmos DB.
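The function above passes `new Uri(...).Host` to PublishEventsAsync rather than the full endpoint setting, because that call takes the topic host name. A minimal sketch of what that expression extracts, using only the base class library; the class name `TopicEndpoint` and the endpoint value in the usage note are hypothetical and stand in for your own AzureEventGrid:TopicEndpoint setting:

```csharp
using System;

public static class TopicEndpoint
{
    // Returns only the host name portion of a topic endpoint URL.
    // Throws UriFormatException if the value is not an absolute URL.
    public static string HostOf(string endpoint) => new Uri(endpoint).Host;
}
```

For example, `TopicEndpoint.HostOf("https://my-topic.westus2-1.eventgrid.azure.net/api/events")` returns `my-topic.westus2-1.eventgrid.azure.net`; the scheme and the `/api/events` path are dropped.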

Install the NuGet package Microsoft.Azure.EventGrid.

We want to provide our own HttpClient rather than letting the EventGridClient manage one for us. If we don’t, we could run into socket exhaustion errors (SocketException).

See Managing Connections for more information.
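The trigger above creates its shared HttpClient with a null check, which can run twice if two invocations start at the same moment. One possible alternative, sketched here under assumptions, is to let Lazy<T> guard the one-time creation. The class name `SharedHttpClient` is hypothetical, and the factory builds a plain HttpClient; in this article's code it would wrap the retry handler instead, which also needs a logger that is not tied to a single invocation.

```csharp
using System;
using System.Net.Http;

public static class SharedHttpClient
{
    // Lazy<T> runs the factory at most once by default, even when
    // several function invocations read Instance at the same time.
    private static readonly Lazy<HttpClient> Client =
        new Lazy<HttpClient>(() => new HttpClient());

    // Every caller receives the same HttpClient, so connections are reused.
    public static HttpClient Instance => Client.Value;
}
```

Reading `SharedHttpClient.Instance` anywhere in the Function App then yields the same client for the lifetime of the host process.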

Since publishing to Event Grid this way does not give us retry logic out of the box, we add it ourselves using Polly.

Install the NuGet package Polly.

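Add another class, HttpRetryMessageHandler.cs, to the Helpers folder and paste the following code into the class:

using System;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Polly;

public class HttpRetryMessageHandler : DelegatingHandler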
{
    [System.Diagnostics.CodeAnalysis.SuppressMessage("Style", "IDE1006:Naming Styles", Justification = "<Pending>")]
    private const int MAXIMUM_RETRY_ATTEMPT = 3;
    private readonly ILogger _logger;

    public HttpRetryMessageHandler(
        ILogger logger,
        HttpClientHandler handler) : base(handler)
    {
        _logger = logger;
    }

    protected override Task<HttpResponseMessage> SendAsync(
        HttpRequestMessage request,
        CancellationToken cancellationToken) =>

        // Retries up to three times, waiting 3, 9 and then 27 seconds before each retry;
        // once the retries are exhausted the failure is passed back to the caller

        Policy
            .Handle<HttpRequestException>()
            .Or<TaskCanceledException>()
            .OrResult<HttpResponseMessage>(x => !x.IsSuccessStatusCode)
            .WaitAndRetryAsync(
                MAXIMUM_RETRY_ATTEMPT,
                retryAttempt => TimeSpan.FromSeconds(Math.Pow(MAXIMUM_RETRY_ATTEMPT, retryAttempt)),
                onRetry: (exception, nextRetry, context) =>
                {
                    _logger.LogInformation($"An error was encountered, will retry again in {nextRetry.TotalSeconds} second(s)...");
                })
            .ExecuteAsync(() => base.SendAsync(request, cancellationToken));
}

This class creates a retry policy with an exponential back-off.

  • After the first failure the code will wait 3 seconds before it tries to execute again.
  • After the second failure the code will wait 9 seconds.
  • After the third failure it will wait 27 seconds.
  • After the fourth failure the policy gives up and the failure surfaces as an Exception in the calling function.
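The schedule in the list above follows directly from the `Math.Pow(MAXIMUM_RETRY_ATTEMPT, retryAttempt)` expression in the handler. As a rough check, the same arithmetic can be sketched without Polly; the class `BackoffSchedule` and its method names are hypothetical helpers for illustration and are not part of the handler:

```csharp
using System;
using System.Linq;

public static class BackoffSchedule
{
    // Delay before retry n (1-based) is baseSeconds raised to the power n,
    // mirroring Math.Pow(MAXIMUM_RETRY_ATTEMPT, retryAttempt) in the handler.
    public static TimeSpan[] Delays(int retries, double baseSeconds) =>
        Enumerable.Range(1, retries)
            .Select(attempt => TimeSpan.FromSeconds(Math.Pow(baseSeconds, attempt)))
            .ToArray();

    // Total time spent waiting between attempts, excluding the requests themselves.
    public static TimeSpan TotalWait(int retries, double baseSeconds) =>
        Delays(retries, baseSeconds).Aggregate(TimeSpan.Zero, (sum, delay) => sum + delay);
}
```

With the values used in this article, `BackoffSchedule.Delays(3, 3)` gives 3, 9 and 27 seconds, and `BackoffSchedule.TotalWait(3, 3)` gives 39 seconds. So a batch that never gets through costs four attempts (one initial call plus three retries) and at least 39 seconds of waiting before the function fails.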

Run the Azure Function project.

Call the Http Trigger Azure Function, using Postman, with a POST request to http://localhost:7071/api/HttpTrigger1 to insert 10 person documents into Cosmos DB.

Let’s create the Event Grid Subscription.

Navigate to the Event Grid Topic and click Add Subscription.

Enter a name.

Select Storage Queues for Endpoint Type.

Click Select an endpoint.

Select the Storage Account and Storage Queue that were created earlier.

Click Create.

Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.

Open up Azure Storage Explorer.

Navigate to the Storage Account we created earlier, expand Queues and click on the Storage Queue.

You should see a list of the Cosmos DB documents that were sent to Event Grid and forwarded to the Storage Queue subscription.

One thing to note: our Cosmos DB Trigger Azure Function sends events to Azure Event Grid in batches, as Azure Event Grid expects an array of events.

When our events reach Azure Event Grid, they are debatched and sent to our endpoints individually.

Part 2

Now let’s add an Azure Function to receive events from our Event Grid Topic.

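Add an Event Grid Trigger called EventGridTrigger1.cs.

Paste the following code into the EventGridTrigger1.cs class:

using Microsoft.Azure.EventGrid.Models;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.EventGrid;
using Microsoft.Extensions.Logging;

public static class EventGridTrigger1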
{
    [FunctionName("EventGridTrigger1")]
    public static void Run(
        [EventGridTrigger]EventGridEvent eventGridEvent,
        ILogger logger)
    {
        logger.LogInformation("EventGridTrigger1 function triggered.");

        logger.LogInformation(eventGridEvent.Data.ToString());
    }
}

Since it is difficult to test the Event Grid Azure Function locally, let’s publish the Function App to Azure.

Make sure to include your Application Settings by clicking Edit Azure App Service Settings.

In the Azure portal, navigate to the Function App and select the EventGridTrigger1 function.

Click the Add Event Grid subscription link.

Provide a name for the subscription.

Select Event Grid Topic for the Topic Type.

Select the subscription, resource group, and finally the Azure Event Grid Topic.

Click Create.

If you navigate to your Event Grid Topic, you will now see two Event Grid Subscriptions: the first to our Azure Storage Queue and the second to our Event Grid Trigger Azure Function.

Before we test this locally, we need to disable CosmosDBTrigger1 in the portal. If we don’t, the deployed function may respond to our Cosmos DB changes before our local environment can.

Run the Azure Function project.

Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.

Navigate back to the Azure Function App, select the EventGridTrigger1 and then select the Monitor node.

You should see a list of the events received from Azure Event Grid.

Part 3

So what happens if we are unable to send an event to Azure Event Grid? Remember, our retry policy will retry three times, and if the final attempt also fails an Exception is thrown.

If we cannot send an event to Azure Event Grid, we will write the event to Azure Blob Storage so it can be processed later, perhaps by a Timer Trigger Azure Function.

We will need to update the CosmosDBTrigger1 with a couple of code changes.

Update the Run method to include a parameter for the Blob output binding; I added mine right after ILogger logger.

[Blob("cosmosdbtrigger1-errors", FileAccess.Write, Connection = "AzureStorage:ConnectionString")] CloudBlobContainer blobContainer

We will also wrap the eventGridClient.PublishEventsAsync() call in a try...catch, and in the catch block add the code to write the events to Azure Blob Storage.

try
{
    logger.LogInformation(
        $"Sending a batch of {documents.Count} event(s) to Azure Event Grid for processing.");

    await eventGridClient.PublishEventsAsync(
       new Uri(Environment.GetEnvironmentVariable("AzureEventGrid:TopicEndpoint")).Host,
       eventGridEventList);
}
catch
{
    logger.LogInformation(
        $"Logging a batch of {documents.Count} event(s) to Azure Blob Storage that were not able to be sent to Azure Event Grid.");

    var cloudBlockBlob =
        blobContainer.GetBlockBlobReference($"{Guid.NewGuid()}.json");

    cloudBlockBlob.Properties.ContentType =
        "application/json";

    await cloudBlockBlob.UploadTextAsync(
        JsonConvert.SerializeObject(eventGridEventList, Formatting.Indented));

    throw;
}

We will throw the Exception again so that Application Insights tracks the exception.
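The try...catch above follows a simple shape: try the primary destination, keep a copy on failure, and rethrow. Here is a minimal sketch of that shape on its own, with the Event Grid and Blob Storage calls replaced by delegates so it can be exercised without any Azure resources. The names `Failover`, `PublishOrStoreAsync`, `publish` and `store` are hypothetical.

```csharp
using System;
using System.Threading.Tasks;

public static class Failover
{
    // Tries the primary destination first. If that throws, the batch is
    // handed to the fallback store and the original exception is rethrown
    // so that the failure is still visible to the caller and to monitoring.
    public static async Task PublishOrStoreAsync<T>(
        T batch,
        Func<T, Task> publish,
        Func<T, Task> store)
    {
        try
        {
            await publish(batch);
        }
        catch
        {
            await store(batch);
            throw;
        }
    }
}
```

In this article's function, `publish` would correspond to the PublishEventsAsync call and `store` to the blob upload. One design point to be aware of: if the fallback itself throws, that exception replaces the original one, so the fallback should be as simple and reliable as possible.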

To simulate a failure to call Azure Event Grid, update the value for AzureEventGrid:TopicEndpoint in local.settings.json to a bad value; I just added an extra character.

Create a Blob container called cosmosdbtrigger1-errors, matching the name used in the Blob binding, to capture the events.

Run the Azure Function project.

Call the Http Trigger Azure Function to insert 10 person documents into Cosmos DB.

Open Azure Storage Explorer.

Navigate to the Blob storage container cosmosdbtrigger1-errors.

You should see a list of the events sitting in Azure Blob Storage.

I apologize for the long article, but I felt it best to pack it all together, as I have been working on better practices for retry and failover logic when sending events to Event Grid from an Azure Function.

Feedback is much appreciated!
