MongoDB.Messaging - MongoDB Messaging Library

Overview

The MongoDB Messaging library is a lightweight queue and pub/sub processing library built on the MongoDB data store.

Features

  • Easy to use Fluent API
  • Automatic creation and cleanup of queues
  • Configurable message expiration
  • Generic data payload
  • Trigger processing from oplog change monitoring
  • Configurable auto retry on error
  • Message processing timeout
  • Scalable via subscriber worker count
  • Supports distributed locks

Download

The MongoDB.Messaging library is available on nuget.org via package name MongoDB.Messaging.

To install MongoDB.Messaging, run the following command in the Package Manager Console

PM> Install-Package MongoDB.Messaging

Concepts

Queue

A queue is equivalent to a MongoDB collection. The name of the queue will match the MongoDB collection name.

Queue names must be alphanumeric, without spaces or symbols.

It is a good practice to suffix the queue name with Queue.
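A minimal sketch of checking a queue name against the rule above before using it. It assumes "alphanumeric" means ASCII letters and digits only. The QueueNames helper is hypothetical and not part of MongoDB.Messaging; whatever validation the library itself performs is not shown here.

```csharp
using System.Text.RegularExpressions;

// Hypothetical helper for illustration; not part of MongoDB.Messaging.
public static class QueueNames
{
    // Assumption: "alphanumeric" means ASCII letters and digits only.
    private static readonly Regex Alphanumeric = new Regex(@"^[A-Za-z0-9]+\z");

    // Returns true when the name is non-empty and has no spaces or symbols.
    public static bool IsValid(string name)
    {
        return !string.IsNullOrEmpty(name) && Alphanumeric.IsMatch(name);
    }
}
```

With this helper, QueueNames.IsValid("SleepQueue") is true, while "sleep queue" and "sleep-queue" are rejected.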

Message

A message is the high-level, generic envelope for an item on a queue. It carries the processing-level information. The message object is created and updated automatically by the Fluent API and should not be updated directly by the publisher or subscriber.

Data

Data is the message payload to be processed with the message. Use data to pass information you need to process the message.

The data object must be serializable by the MongoDB driver.

It is a good practice to have one queue per data object being passed to limit confusion and to maintain simplicity when subscribing to the queue.
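As a sketch of the one-queue-per-data-object practice, the payload class can carry its own queue name. The samples in this document refer to SleepMessage.QueueName; the class body below is a hypothetical reconstruction, and the Time and Text properties and the queue name value are assumptions made for illustration.

```csharp
using System;

// Hypothetical payload class; property names and queue name are assumptions.
public class SleepMessage
{
    // Queue name kept next to the payload type and suffixed with "Queue".
    public const string QueueName = "SleepQueue";

    // Simple public properties, which serializers generally handle well.
    public TimeSpan Time { get; set; }
    public string Text { get; set; }

    // A readable ToString() is useful as a human-friendly description.
    public override string ToString()
    {
        return $"Sleep for {Time}";
    }
}
```

Keeping the constant on the payload type means publishers and subscribers refer to the same queue name without repeating a string literal.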

Publish

Publishing a message adds the message with the corresponding data to a queue for processing.

Subscribe

In order to process a message on a queue, an application needs to subscribe to a queue. There can be many subscribers to a queue to scale the load across processes. A subscriber can also set the worker count to scale the number of processing threads for that subscriber.

The framework ensures that only one subscriber can process a given message.

Queue Configuration

The queue configuration is used to set default values on messages published to a queue.

The following example uses the fluent API to configure the sleep queue.

MessageQueue.Default.Configure(c => c
    .Connection("MongoMessaging")
    .Queue(s => s
        .Name(SleepMessage.QueueName)
        .Priority(MessagePriority.Normal)
        .ResponseQueue("ReplyQueueName")
        .Retry(5)
    )
);

Properties

Connection is the app.config connection string name used to connect to MongoDB.

Name is the name of the queue to configure.
Retry is the number of times the message should be retried on error. Set to zero (the default) to disable retries.
Priority is the default priority to publish the message with.
ResponseQueue is the name of the queue where responses should be sent.

Publish Message

To publish a message to a queue, use the fluent API.

var message = await MessageQueue.Default.Publish(m => m
    .Queue(SleepMessage.QueueName)
    .Data(sleepMessage)
    .Correlation("321B4671-3B4C-4B97-8E81-D6A8CF22D4F0")
    .Description("User friendly description of the message")
    .Priority(MessagePriority.Normal)
    .Retry(1)
);

Properties

Required
Queue is the name of the queue to publish to.
Data is the object to pass in the message. Used to process the message by the subscriber.

Optional
Correlation is an identifier used to link messages together.
Description is a user friendly description of the message.

Overrides
Retry is the number of times the message should be retried on error.
Priority is the priority to publish the message with, overriding the queue default.
ResponseQueue is the name of the queue where responses should be sent.

Notes

  • When setting the Data property, the message Name will be set to the Type name of the data object.
  • When setting the Data property and Description hasn’t been set, the data object ToString() value will be set as the description.
  • If the underlying storage collection doesn’t exist, it will be created on first publish.

Subscribe to Message

To subscribe to a queue, use the fluent API. The subscribe handler must implement IMessageSubscriber.

MessageQueue.Default.Configure(c => c
    .Connection("MongoMessaging")
    .Subscribe(s => s
        .Queue(SleepMessage.QueueName)
        .Handler<SleepHandler>()
        .Workers(4)
    )
);

To speed up processing, you can monitor the oplog for changes to trigger processing. The connection must have access to local.oplog.rs.

MessageQueue.Default.Configure(c => c
    .Connection("MongoMessaging")
    .Subscribe(s => s
        .Queue(SleepMessage.QueueName)
        .Handler<SleepHandler>()
        .Workers(4)
        .Trigger()
    )
);

Properties

Required
Queue is the name of the queue to subscribe to.
Handler is the class that implements IMessageSubscriber. This is what processes the message.

Optional
Workers is the number of worker threads processing messages for this subscriber.
ExpireError is how long to keep error messages.
ExpireWarning is how long to keep warning messages.
ExpireSuccessful is how long to keep successful messages.
PollTime is the amount of time between work polling. If using Trigger, set to a longer time.
Retry is a class that implements IMessageRetry. IMessageRetry controls whether an error message should be retried.
Timeout is the amount of time before a processing message times out.
TimeoutAction is how to handle timed out messages. Options are Fail or Retry.
Trigger enables monitoring of the oplog for changes to trigger processing.

Message Service

For subscribers to process messages from a queue, create a MessageService and call Start. Note that the MessageService.Stop() method tries to stop gracefully by waiting for active processes to finish.

_messageService = new MessageService();

// on service or application start
_messageService.Start();

// on service stop.  
_messageService.Stop();

IMessageSubscriber Interface

The following is a sample implementation of IMessageSubscriber

public class SleepHandler : IMessageSubscriber
{
    public MessageResult Process(ProcessContext context)
    {
        // get message data
        var sleepMessage = context.Data<SleepMessage>();

        // Do processing here

        return MessageResult.Successful;
    }

    public void Dispose()
    {
        // free resources
    }
}

IMessageRetry Interface

The IMessageRetry interface allows for customization of the retry of failed messages.

The following is the default implementation of IMessageRetry

public class MessageRetry : IMessageRetry
{
    public virtual bool ShouldRetry(ProcessContext processContext, Exception exception)
    {
        // get current message 
        var message = processContext.Message;

        // true to retry message
        return message.ErrorCount < message.RetryCount;
    }

    public virtual DateTime NextAttempt(ProcessContext processContext)
    {
        var message = processContext.Message;

        // retry weight, 1 = 1 min, 2 = 30 min, 3 = 2 hrs, 4+ = 8 hrs
        if (message.ErrorCount > 3)
            return DateTime.Now.AddHours(8);

        if (message.ErrorCount == 3)
            return DateTime.Now.AddHours(2);

        if (message.ErrorCount == 2)
            return DateTime.Now.AddMinutes(30);

        // default
        return DateTime.Now.AddMinutes(1);
    }
}
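The delay schedule used by NextAttempt above can be isolated as a pure function, which makes it easier to test or adjust before wiring it into a custom IMessageRetry. This is a sketch only; RetrySchedule and DelayFor are hypothetical names, not part of the library.

```csharp
using System;

// Hypothetical helper mirroring the delays in the default implementation above.
public static class RetrySchedule
{
    // Maps the number of errors so far to the wait before the next attempt:
    // 1 error = 1 min, 2 = 30 min, 3 = 2 hrs, 4 or more = 8 hrs.
    public static TimeSpan DelayFor(int errorCount)
    {
        if (errorCount > 3)
            return TimeSpan.FromHours(8);

        if (errorCount == 3)
            return TimeSpan.FromHours(2);

        if (errorCount == 2)
            return TimeSpan.FromMinutes(30);

        // default, including the first error
        return TimeSpan.FromMinutes(1);
    }
}
```

A NextAttempt override could then return DateTime.Now.Add(RetrySchedule.DelayFor(message.ErrorCount)), keeping the schedule itself free of any dependency on the current time.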

Process Locks

The library supports distributed locks. The following lock types are supported.

DistributedLock - Distributed lock manager that provides synchronized access to a resource over a network
ThrottleLock - Throttle lock manager that controls how frequently a process can run

This is an example of using the DistributedLock.

var lockName = "PrintMessage";

// get MongoDB collection to store lock
var collection = GetCollection();

// create lock with timeout, max time it will wait for lock, of 5 minutes
var locker = new DistributedLock(collection, TimeSpan.FromMinutes(5));

// acquire lock; if can't, it will retry to get lock up to timeout value
var result = locker.Acquire(lockName);
if (!result)
    return; // acquire lock timeout

try
{
    // do processing here

}
finally
{
    // release lock
    locker.Release(lockName);
}

DataGenerator - Generate Intelligent and Realistic Test Data

Features

  • Generate intelligent test data based on property type and name
  • Automatic discovery of data sources
  • Fully customizable property data sources
  • Realistic data sources
  • Weighted value selection
  • Easy fluent API

Download

The DataGenerator library is available on nuget.org via package name DataGenerator.

To install DataGenerator, run the following command in the Package Manager Console

PM> Install-Package DataGenerator

Configuration

Full class property configuration

Generator.Default.Configure(c => c
    .Entity<User>(e =>
    {
        e.Property(p => p.FirstName).DataSource<FirstNameSource>();
        e.Property(p => p.LastName).DataSource<LastNameSource>();
        e.Property(p => p.Address1).DataSource<StreetSource>();
        e.Property(p => p.City).DataSource<CitySource>();
        e.Property(p => p.State).DataSource<StateSource>();
        e.Property(p => p.Zip).DataSource<PostalCodeSource>();
        
        e.Property(p => p.Note).DataSource<LoremIpsumSource>();
        e.Property(p => p.Password).DataSource<PasswordSource>();
        
        // array of values
        e.Property(p => p.Status).DataSource(new[] { Status.New, Status.Verified });
        
        
        // don't generate
        e.Property(p => p.Budget).Ignore();
        
        // static value
        e.Property(p => p.IsActive).Value(true);
        
        // delegate value
        e.Property(p => p.Created).Value(() => DateTime.Now);
    })
);
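The configuration above assumes a User class along the following lines. This model is a hypothetical sketch: the property names and the Status values come from the configuration, while the property types are assumptions inferred from the data sources used.

```csharp
using System;

// Status values taken from the configuration above; more may exist.
public enum Status
{
    New,
    Verified
}

// Hypothetical model class; property types are assumptions.
public class User
{
    public string FirstName { get; set; }
    public string LastName { get; set; }
    public string Address1 { get; set; }
    public string City { get; set; }
    public string State { get; set; }
    public string Zip { get; set; }

    public string Note { get; set; }
    public string Password { get; set; }

    public Status Status { get; set; }

    // ignored by the generator in the configuration above
    public decimal Budget { get; set; }

    public bool IsActive { get; set; }
    public DateTime Created { get; set; }
}
```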

Example of configuration for generating child classes

Generator.Default.Configure(c => c
    .Entity<Order>(e =>
    {
        e.AutoMap();
        // generate a User instance
        e.Property(p => p.User).Single<User>();
        // generate list of OrderLine items
        e.Property(p => p.Items).List<OrderLine>(2);
    })
    .Entity<OrderLine>(e =>
    {
        e.AutoMap();
        e.Property(p => p.Quantity).IntegerSource(1, 10);
    })
);

There are extension methods to configure properties as well

Generator.Default.Configure(c => c
    .Entity<OrderLine>(e =>
    {
        // random number between 1 and 10
        e.Property(p => p.Quantity).IntegerSource(1, 10);
        // between 100 and 1,000
        e.Property(p => p.UnitAmount).DecimalSource(100, 1000);
    })
);

Profiles

DataGenerator supports class profiles to make configuration easier. To create a profile, inherit from the MappingProfile<T> base class.

Sample Profile for the User class

public class UserProfile : MappingProfile<User>
{
    public override void Configure()
    {
        Property(p => p.FirstName).DataSource<FirstNameSource>();
        Property(p => p.LastName).DataSource<LastNameSource>();
        Property(p => p.Address1).DataSource<StreetSource>();
        Property(p => p.City).DataSource<CitySource>();
        Property(p => p.State).DataSource<StateSource>();
        Property(p => p.Zip).DataSource<PostalCodeSource>();
        
        Property(p => p.Note).DataSource<LoremIpsumSource>();
        Property(p => p.Password).DataSource<PasswordSource>();
        
        // array of values
        Property(p => p.Status).DataSource(new[] { Status.New, Status.Verified });
        
        
        // don't generate
        Property(p => p.Budget).Ignore();
        
        // static value
        Property(p => p.IsActive).Value(true);
        
        // delegate value
        Property(p => p.Created).Value(() => DateTime.UtcNow);
    }
}

Register a profile in the configuration

Generator.Default.Configure(c => c
  .Profile<UserProfile>()
);

Generation

Generate test data

// generate a user
var instance = Generator.Default.Single<User>();

// generate 10 users
var users = Generator.Default.List<User>(10);

You can override the configuration

var instance = Generator.Default.Single<User>(c =>
{
    // override note property with static value
    c.Property(p => p.Note).Value("Testing static value");
});

Data Sources

Primitive Value Data Sources

BooleanSource - Random true or false
DateTimeSource - Random date plus or minus 10 years from now
DecimalSource - Random decimal between 0 and 1,000,000
FloatSource - Random float between 0 and 1,000,000
GuidSource - Random GUID value
IntegerSource - Random integer between 0 and 32,000
ListDataSource - Random value from the specified list
TimeSpanSource - Random TimeSpan between 0 sec and 1 day
ValueSource - Static value source

Smart Data Sources

CitySource - Random city name from a list of the largest US cities
CompanySource - Random company name from a list of Fortune 500 companies
CreditCardSource - Random credit card number
EmailSource - Random email address using common domains
EnumSource - Random value from available enum values
FirstNameSource - Random first name from 100 common first names
IdentifierSource - Random identifier value
LastNameSource - Random last name from 100 common last names
LoremIpsumSource - Random lorem ipsum text
MoneySource - Random dollar amount between 0 and 10,000
NameSource - Random code name from various sources
PasswordSource - Random pronounceable password
PhoneSource - Random phone number in US format
PostalCodeSource - Random US zip code
SocialSecuritySource - Random US Social Security Number
StateSource - Random US State
StreetSource - Random US house number and street
WebsiteSource - Random website from top 100 list


FluentRest - Lightweight fluent wrapper over HttpClient

Features

  • Fluent request building
  • Fluent form building
  • Automatic deserialization of response
  • Pluggable serialization
  • Fake HTTP responses

Download

The FluentRest library is available on nuget.org via package name FluentRest.

To install FluentRest, run the following command in the Package Manager Console

PM> Install-Package FluentRest

Fluent Request

Create a form post request

var client = new FluentClient();
client.BaseUri = new Uri("http://echo.jpillora.com/", UriKind.Absolute);

var result = await client.PostAsync<EchoResult>(b => b
    .AppendPath("Project")
    .AppendPath("123")
    .FormValue("Test", "Value")
    .FormValue("key", "value")
    .QueryString("page", 10)
);

Custom authorization header

var client = new FluentClient();
client.BaseUri = new Uri("https://api.github.com/", UriKind.Absolute);

var result = await client.GetAsync<Repository>(b => b
    .AppendPath("repos")
    .AppendPath("loresoft")
    .AppendPath("FluentRest")
    .Header(h => h.Authorization("token", "7ca..."))
);

Fake Response

FluentRest can fake HTTP responses by using a custom HttpClientHandler. Faking the HTTP response allows creating unit tests without having to make the actual HTTP call.

Fake Response Stores

Fake HTTP responses can be stored in the following message stores. To create your own message store, implement IFakeMessageStore.

MemoryMessageStore

The memory message store allows composing a JSON response in the unit test. Register the responses on the start of the unit test.

Register a fake response by URL.

MemoryMessageStore.Current.Register(b => b
    .Url("https://api.github.com/repos/loresoft/FluentRest")
    .StatusCode(HttpStatusCode.OK)
    .ReasonPhrase("OK")
    .Content(c => c
        .Header("Content-Type", "application/json; charset=utf-8")
        .Data(responseObject) // object to be JSON serialized
    )
);

Use the fake response in a unit test

var serializer = new JsonContentSerializer();

// use memory store by default
var fakeHttp = new FakeMessageHandler();

var client = new FluentClient(serializer, fakeHttp);
client.BaseUri = new Uri("https://api.github.com/", UriKind.Absolute);

// make HTTP call
var result = await client.GetAsync<Repository>(b => b
    .AppendPath("repos")
    .AppendPath("loresoft")
    .AppendPath("FluentRest")
    .Header(h => h.Authorization("token", "7ca..."))
);

FileMessageStore

The file message store allows saving an HTTP call response on the first use. You can then use that saved response for all future unit test runs.

Configure FluentRest to capture the response.

var serializer = new JsonContentSerializer();

// use file store to save captured responses to disk
var fakeStore = new FileMessageStore();
fakeStore.StorePath = @".\GitHub\Responses";

var fakeHttp = new FakeMessageHandler(fakeStore, FakeResponseMode.Capture);

var client = new FluentClient(serializer, fakeHttp);
client.BaseUri = new Uri("https://api.github.com/", UriKind.Absolute);

var result = await client.GetAsync<Repository>(b => b
    .AppendPath("repos")
    .AppendPath("loresoft")
    .AppendPath("FluentRest")
    .Header(h => h.Authorization("token", "7ca..."))
);

Use captured response

var serializer = new JsonContentSerializer();

// use file store to load from disk
var fakeStore = new FileMessageStore();
fakeStore.StorePath = @".\GitHub\Responses";

var fakeHttp = new FakeMessageHandler(fakeStore, FakeResponseMode.Fake);

var client = new FluentClient(serializer, fakeHttp);
client.BaseUri = new Uri("https://api.github.com/", UriKind.Absolute);

var result = await client.GetAsync<Repository>(b => b
    .AppendPath("repos")
    .AppendPath("loresoft")
    .AppendPath("FluentRest")
    .Header(h => h.Authorization("token", "7ca..."))
);

EstimatorX - Project Estimation Application

EstimatorX

A simple project estimation application.
http://estimatorx.com

Features

Projects

A project contains all the details that make up an estimate. An estimate is broken down into Assumptions, Factors and Tasks. The estimate is padded with a contingency rate.

Assumptions

When making an estimate, the estimator relies on assumptions to arrive at the numbers. Document those assumptions to help raise a red flag in the future when an assumption proves not to be true.

Factors

Factors are a type of task with hours associated with the complexity of that task. Factors allow the estimator to state hours based on a specific type of task, regardless of the project.

Tasks

Tasks are a specific item or feature of the project being estimated. A task is assigned a Factor. The estimator enters the number of tasks per complexity level.

Tasks are part of a Section. Sections are a way to group a set of common features. Task totals are also rolled up to the section level.

Reports

Project estimates can be displayed as a simple report. The estimator can create a public shared link to allow anonymous view access to the report. The report can also be downloaded as a PDF.

Contingency

The contingency percentage rate reflects the confidence level in the information used to create the estimate. It is used to pad the estimate by that percentage.

Complexity

Tasks and Factors use the following complexity scale.

  • Very Simple - Task is trivial, owner knows how to solve the problem and can be done quickly.
  • Simple - Owner knows how to solve the problem.
  • Medium - Owner needs to do a little bit of research to solve the problem, but the resulting solution is not complex.
  • Complex - Task needs research and some clarification on details. Resulting solution is not trivial.
  • Very Complex - Task needs research and clarification. Resulting solution requires significant new work or change.
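A minimal sketch of how Factors, Tasks, Complexity and Contingency might combine into a total. It assumes the estimate is the number of tasks at each complexity level multiplied by the factor's hours for that level, summed, and then padded by the contingency rate. EstimatorX's actual calculation is not shown in this document, and the type and method names below are hypothetical.

```csharp
using System.Collections.Generic;
using System.Linq;

public enum Complexity { VerySimple, Simple, Medium, Complex, VeryComplex }

// Hypothetical types for illustration; not EstimatorX code.
public static class EstimateSketch
{
    // Sum of (number of tasks x factor hours) over the complexity levels used.
    public static decimal TaskHours(
        IReadOnlyDictionary<Complexity, decimal> factorHours,
        IReadOnlyDictionary<Complexity, int> taskCounts)
    {
        return taskCounts.Sum(c => c.Value * factorHours[c.Key]);
    }

    // Pads the hours with a contingency rate, e.g. 0.20m for 20%.
    public static decimal WithContingency(decimal hours, decimal contingencyRate)
    {
        return hours * (1 + contingencyRate);
    }
}
```

For example, a factor of 2 hours for Simple and 4 hours for Medium, with 3 Simple tasks and 1 Medium task, gives 10 hours; a 20% contingency pads that to 12 hours.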

Organizations

Projects and Templates are placed in an organization. All members of the organization can edit the Project or Template.

Select ‘Private’ to make the Project or Template accessible by only you.

Templates

A template is a group of factors you can quickly add to a project. Templates allow reuse of common factors across projects.


FluentCommand - Fluent Wrapper for DbCommand

Download

The FluentCommand library is available on nuget.org via package name FluentCommand.

To install FluentCommand, run the following command in the Package Manager Console

PM> Install-Package FluentCommand

Features

  • Fluent wrapper over DbConnection and DbCommand
  • Callback for parameter return values
  • Automatic handling of connection state
  • Caching of results
  • Automatic creating of entity from DataReader
  • Create Dynamic objects from DataReader
  • Handles multiple result sets

Example

Query all users with a given email domain. The entity is automatically created from the DataReader.

string email = "%@battlestar.com";
string sql = "select * from [User] where EmailAddress like @EmailAddress";

List<User> users;
using (var session = new DataSession("Tracker").Log(Console.WriteLine))
{
    users = session            
        .Sql(sql)
        .Parameter("@EmailAddress", email)
        .Query<User>();
}

Execute a stored procedure with out parameters

Guid userId = Guid.Empty;
int errorCode = -1;

var username = "test." + DateTime.Now.Ticks;
var email = username + "@email.com";

int result;
using (var session = new DataSession("AspNet").Log(Console.WriteLine))
{
    result = session.StoredProcedure("[dbo].[aspnet_Membership_CreateUser]")
        .Parameter("@ApplicationName", "/")
        .Parameter("@UserName", username)
        .Parameter("@Password", "T@est" + DateTime.Now.Ticks)
        .Parameter("@Email", email)
        .Parameter("@PasswordSalt", "test salt")
        .Parameter<string>("@PasswordQuestion", null)
        .Parameter<string>("@PasswordAnswer", null)
        .Parameter("@IsApproved", true)
        .Parameter("@CurrentTimeUtc", DateTime.UtcNow)
        .Parameter("@UniqueEmail", 1)
        .Parameter("@PasswordFormat", 1)
        .ParameterOut<Guid>("@UserId", p => userId = p)
        .Return<int>(p => errorCode = p)
        .Execute();
}

Query for a user by email address. Also return Status and Priority entities.

string email = "kara.thrace@battlestar.com";
string sql = "select * from [User] where EmailAddress = @EmailAddress; " +
             "select * from [Status]; " +
             "select * from [Priority]; ";

User user = null;
List<Status> status = null;
List<Priority> priorities = null;

using (var session = new DataSession("Tracker").Log(Console.WriteLine))
{
    session.Sql(sql)
        .Parameter("@EmailAddress", email)
        .QueryMultiple(q =>
        {
            user = q.QuerySingle<User>();
            status = q.Query<Status>().ToList();
            priorities = q.Query<Priority>().ToList();
        });
}