Posts Tagged ‘Parallelism’

4 Sep

PowerThreading library on NuGet

I like NuGet because it removes the hassle of downloading and installing libraries. Sadly, only a small number of vendors provide just a package without an installer. I also like parallel/multithreaded programming. A crucial tool in my toolbox for parallel programming is Jeffrey Richter’s (Wintellect) PowerThreading library. The library is great, contains a lot of smart ideas [1] and some handy objects too. And it’s completely free.

Sadly, it was not on NuGet. Hence I took the challenge and created the package. It’s called simply PowerThreading. Don’t get me wrong, Jeffrey is doing a great job with this library, and I am (and will be) happy to maintain the package. He’s too smart to waste his time on this; spending that time on improving the library is more valuable.

[1] In fact, the async/await feature in C# 5 uses a similar concept to AsyncEnumerator.

10 Sep

Multigenerational architecture (in databases) and immutable structures (from functional programming)

I’ve just realized something interesting.

Right now we’re kind of experiencing a renaissance of functional programming, and some (well, many) functional concepts are making their way into mainstream programming languages.

One of the important concepts in functional programming is that everything is immutable; if you want to change something, you need to create a new item with the new value(s). This is very good for parallel programming: when nothing can change, there’s no shared mutable state to protect.
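As a small illustration of the “create a new item instead of changing it” idea, here’s a C# sketch of an immutable type. The `Account` class and its `WithBalance` method are hypothetical names made up for this example; it assumes C# 6 or later (get-only auto-properties, expression-bodied members).

```csharp
using System;

// Hypothetical immutable type: all state is set once, in the constructor.
public sealed class Account
{
    public string Owner { get; }
    public decimal Balance { get; }

    public Account(string owner, decimal balance)
    {
        Owner = owner;
        Balance = balance;
    }

    // "Changing" the balance returns a new instance; this instance is never
    // modified, so any thread holding a reference can read it without locking.
    public Account WithBalance(decimal balance) => new Account(Owner, balance);
}
```

Because an `Account` never changes after construction, it can be handed to any number of threads; only the reference to the “current” account needs coordination.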

And one of the areas where parallel programming is used heavily is database systems. This brings me to my point. You may know MGA/MVCC from Firebird, but it isn’t used only there; with some modifications it’s also in Oracle Database, MS SQL Server, Postgres, …, in fact in many of today’s modern RDBMSs. And the concept of MGA is actually the idea of using immutable data structures: a writer doesn’t change the row version other transactions may be reading, it creates a new version. Yes, it uses some additional concepts to fully support ACID and to scale well in some particular cases etc., but the core idea is the same.
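To make the connection concrete, here’s a toy C# sketch of the multi-version idea: a write publishes a new immutable snapshot of versions, and a reader with a given snapshot sees the newest version visible to it. It’s an assumption-laden illustration (the `VersionedCell<T>` name is hypothetical, transaction ids are assumed to increase monotonically, and there is no cleanup of old versions or conflict detection), not how Firebird or any other engine implements MGA.

```csharp
using System.Collections.Generic;
using System.Threading;

// Toy, hypothetical multi-version cell: a write never changes an existing
// version, it publishes a new list containing one more immutable version.
// Assumes transaction ids are handed out in increasing order.
public sealed class VersionedCell<T>
{
    private sealed class Version
    {
        public readonly long TxId;
        public readonly T Value;
        public Version(long txId, T value) { TxId = txId; Value = value; }
    }

    private readonly object _writeLock = new object();
    private List<Version> _versions = new List<Version>(); // never mutated once published

    public void Write(long txId, T value)
    {
        lock (_writeLock) // writers serialize; readers never take the lock
        {
            var copy = new List<Version>(_versions);
            copy.Add(new Version(txId, value));
            Volatile.Write(ref _versions, copy); // publish the new snapshot
        }
    }

    // A reader whose snapshot is snapshotTxId sees the newest version written
    // at or before that snapshot, regardless of what writers do concurrently.
    public bool TryRead(long snapshotTxId, out T value)
    {
        List<Version> versions = Volatile.Read(ref _versions);
        Version visible = null;
        foreach (var v in versions)
            if (v.TxId <= snapshotTxId)
                visible = v;

        value = visible != null ? visible.Value : default(T);
        return visible != null;
    }
}
```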

Isn’t it nice? Sometimes you know two things and then suddenly you realize both are based on the same idea and are basically the same. Connecting the dots…

4 Aug

Projection (select) on a collection running in parallel with exceptions handling

A few days ago I posted an extension method that runs a projection on a collection in parallel. The method has one problem: it doesn’t deal with exceptions. And because the ordering wasn’t (and still isn’t) preserved, a result alone doesn’t tell you which item it came from, so I made this small improvement.

Now the method returns a simple structure with the original item, the result (if no exception occurred, of course) and the exception (if any). I didn’t go with AggregateException (although you can modify the code yourself to use it). This way you can decide while consuming what to do when an exception occurred. Adding some kind of cancellation shouldn’t be difficult.
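If you do want AggregateException, one way is to keep the method as it is and fold the results while consuming them. A sketch, assuming the result struct from this post’s code (redeclared here so the snippet compiles on its own; drop the redeclaration when pasting it next to the method). `ResultsOrThrow` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

// Same shape as the result struct in the post (redeclared for a self-contained sample).
public struct ParallelProjectionResult<TSource, TResult>
{
    public TSource Item { get; set; }
    public TResult Result { get; set; }
    public Exception Exception { get; set; }
}

public static class ProjectionResultExtensions
{
    // Streams successful results as they arrive, remembers the failures and
    // throws them together as one AggregateException once the input is exhausted.
    public static IEnumerable<TResult> ResultsOrThrow<TSource, TResult>(
        this IEnumerable<ParallelProjectionResult<TSource, TResult>> results)
    {
        var errors = new List<Exception>();
        foreach (var r in results)
        {
            if (r.Exception != null)
                errors.Add(r.Exception);
            else
                yield return r.Result;
        }
        if (errors.Count > 0)
            throw new AggregateException(errors);
    }
}
```

Successful results still come out as soon as they’re ready; the failures surface only after the whole sequence has been consumed. To stop at the first failure instead, throw inside the loop.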

The idea behind it is the same as in the previous version.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

#region ParallelProjection
internal struct ParallelProjectionResult<TSource, TResult>
{
	public TSource Item { get; set; }
	public TResult Result { get; set; }
	// Set when the projection threw; Result is then default(TResult).
	public Exception Exception { get; set; }
}

internal static class ParallelProjectionExtensions
{
	internal static IEnumerable<ParallelProjectionResult<TSource, TResult>> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
	{
		BlockingCollection<ParallelProjectionResult<TSource, TResult>> results = new BlockingCollection<ParallelProjectionResult<TSource, TResult>>();

		// Producer: a ThreadPool thread walks the source and schedules the work items.
		ThreadPool.QueueUserWorkItem((o) =>
		{
			Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism);
			CountdownEvent countdown = new CountdownEvent(1);
			try
			{
				foreach (var item in source)
				{
					countdown.AddCount();
					semaphore.WaitOne(); // throttle: at most maxParallelism items in flight
					ThreadPool.QueueUserWorkItem(
						(element) =>
						{
							TSource e = (TSource)element;
							ParallelProjectionResult<TSource, TResult> result = new ParallelProjectionResult<TSource, TResult>();
							result.Item = e;
							try
							{
								result.Result = projection(e);
							}
							catch (Exception ex)
							{
								// Hand the exception to the consumer instead of crashing the worker thread.
								result.Exception = ex;
							}
							results.Add(result);
							semaphore.Release();
							countdown.Signal();
						},
						item);
				}
				countdown.Signal(); // remove the initial count of 1
				countdown.Wait();   // wait for all scheduled items
				results.CompleteAdding();
			}
			finally
			{
				if (countdown != null)
					countdown.Dispose();
				if (semaphore != null)
					semaphore.Dispose();
			}
		}, null);

		// Results are yielded as they complete, so the source order is not preserved.
		return results.GetConsumingEnumerable();
	}
}
#endregion

2 Aug

Projection (select) on a collection running in parallel

Here’s an updated version of the method.

I have here another not-general-purpose parallel/multithreaded method. :) To make a long story short, I needed to do some transformation on a collection’s elements, aka a projection. Unfortunately, the method I was plugging in was doing some network requests, in fact a couple of requests, sequentially, blocking processing until the response came back. I know the proper way would be to turn these requests into asynchronous ones; unluckily, this was part of a bigger architecture I could not change. I didn’t want to use the AsParallel method, as I expected to need more control sometime later. So I solved it by abusing ThreadPool threads. It’s bad for the scheduler and memory, as I’m wasting threads and resources blocking until the server sends a reply, but very easy for me. I told you, abusing. ;)

So I came up with this method. It uses the new .NET Framework 4 concurrent collections, BlockingCollection in particular, as it’s great for the producer-consumer scenario and I want the method to return each result as soon as it’s done (which also implies the ordering isn’t preserved).

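using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

internal static class ParallelProjectionSimpleExtensions
{
	internal static IEnumerable<TResult> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
	{
		BlockingCollection<TResult> results = new BlockingCollection<TResult>();

		// Producer: a ThreadPool thread walks the source and schedules the work items.
		ThreadPool.QueueUserWorkItem((o) =>
		{
			Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism);
			CountdownEvent countdown = new CountdownEvent(1);
			try
			{
				foreach (var item in source)
				{
					countdown.AddCount();
					semaphore.WaitOne(); // throttle: at most maxParallelism items in flight
					ThreadPool.QueueUserWorkItem(
						(element) =>
						{
							// Note: an exception thrown by projection is not caught here;
							// unhandled on a ThreadPool thread, it terminates the process.
							results.Add(projection((TSource)element));
							semaphore.Release();
							countdown.Signal();
						},
						item);
				}
				countdown.Signal(); // remove the initial count of 1
				countdown.Wait();   // wait for all scheduled items
				results.CompleteAdding();
			}
			finally
			{
				if (countdown != null)
					countdown.Dispose();
				if (semaphore != null)
					semaphore.Dispose();
			}
		}, null);

		return results.GetConsumingEnumerable();
	}
}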

The method is straightforward; a lot of work was saved by using the smart blocking collection. I’m simply reading items from the source collection and applying the function to them. To not overload the system with a huge number of threads, I also added the maxParallelism parameter. When this number of threads is processing items, I stop scheduling more, using a Semaphore, until some are done and available again. When there are no more items in the source collection and all items have been processed, I call the CompleteAdding method to say I’m done and there will be no more items. Here I’m using the CountdownEvent class initialized to 1, because you can’t, of course, add a count once it has reached 0. Before the final Wait I signal once to compensate for this.
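The “start at 1” trick can be checked in isolation. The sketch below (a hypothetical `CountdownDemo` helper) shows that `AddCount` throws `InvalidOperationException` once a `CountdownEvent` has reached zero, and then uses the same extra producer count as the method above.

```csharp
using System;
using System.Threading;

public static class CountdownDemo
{
    // Returns true if AddCount fails once the event has already reached zero.
    public static bool AddCountFailsAtZero()
    {
        using (var countdown = new CountdownEvent(1))
        {
            countdown.Signal();       // count 1 -> 0, the event is now set
            try
            {
                countdown.AddCount(); // not allowed once the event is set
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }

    // The pattern from the post: hold an extra "producer" count while scheduling,
    // so the count can't hit zero before the loop has queued every item.
    public static int RunAll(int items)
    {
        int processed = 0;
        using (var countdown = new CountdownEvent(1))
        {
            for (int i = 0; i < items; i++)
            {
                countdown.AddCount();
                ThreadPool.QueueUserWorkItem(_ =>
                {
                    Interlocked.Increment(ref processed);
                    countdown.Signal();
                });
            }
            countdown.Signal(); // release the producer's own count
            countdown.Wait();   // returns only after every work item has signaled
        }
        return processed;
    }
}
```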

And that’s it. Again, it’s not a general-purpose method. Use it with care; used wrongly, it may give you even worse performance.

31 Jul

Array of WaitHandles (ManualResetEvent, AutoResetEvent, …) when waiting for operations to complete …

Often, when you discover the beauty of multithreading and parallelism, you find a need to run some operations in parallel and wait for them to complete. A fairly common scenario. Although now, with .NET Framework 4, you can write it using the Task Parallel Library’s Parallel.Invoke, there are scenarios where you need to plug it into some other methods/parameters, so you do it yourself explicitly with threads, or better said, with the ThreadPool.
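For comparison, when nothing else forces you to do it by hand, the TPL version of “run N actions and wait for all of them” is short. A sketch; `RunActions` is a hypothetical helper and the `SpinWait` stands in for real work.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class InvokeExample
{
    // Builds numberOfActions actions and lets Parallel.Invoke run them;
    // the call blocks until every action has finished.
    public static int RunActions(int numberOfActions)
    {
        int completed = 0;
        var actions = new Action[numberOfActions];
        for (int i = 0; i < numberOfActions; i++)
        {
            actions[i] = () =>
            {
                Thread.SpinWait(1000000);             // stand-in for real work
                Interlocked.Increment(ref completed); // count finished actions safely
            };
        }
        Parallel.Invoke(actions);
        return completed;
    }
}
```

Parallel.Invoke may run some of the actions on the calling thread, and any exceptions come back wrapped in an AggregateException.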

The method I see from time to time basically looks like this:

void DoSomethingExample()
{
	int numberOfActions = 10;
	ManualResetEvent[] mres = new ManualResetEvent[numberOfActions];
	for (int i = 0; i < numberOfActions; i++)
	{
		mres[i] = new ManualResetEvent(false);
		ThreadPool.QueueUserWorkItem((o) =>
			{
				Thread.SpinWait(20000000);
				(o as ManualResetEvent).Set();
			},
			mres[i]);
	}
	ManualResetEvent.WaitAll(mres);
}

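Though it’s not wrong (except that the ManualResetEvents are never disposed), it’s suboptimal. You’re wasting resources by creating an array of these objects, and WaitHandle.WaitAll accepts at most 64 handles, so the approach doesn’t scale beyond that.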

But if you think about it, you can write it better: better in terms of scaling, performance and memory consumption.

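void DoSomethingBetter()
{
	int numberOfActions = 10;
	// Separate counter: the loop condition must not read a value the workers are decrementing.
	int remaining = numberOfActions;
	using (ManualResetEvent mre = new ManualResetEvent(false))
	{
		for (int i = 0; i < numberOfActions; i++)
		{
			ThreadPool.QueueUserWorkItem((o) =>
				{
					Thread.SpinWait(20000000);
					// The last worker to finish wakes up the waiting thread.
					if (Interlocked.Decrement(ref remaining) == 0)
						mre.Set();
				},
				null);
		}
		mre.WaitOne();
	}
}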

I’m simply using one synchronization object (and a using statement ;) ), because I’m really only interested in when all tasks are done (one event), and I decrement the number of remaining tasks every time one finishes. Using the Interlocked class, I’m sure no race condition will occur on the counter and I’ll get the right result. When it reaches zero, I signal that I’m done and the method can continue.

Fewer resources, atomic operations … better/faster results.

6 Jun

Running the other method on same thread as the first one

Last week I was solving a problem. The code I was plugging into was doing some crazy threading stuff inside, while the plugged-in code used a component that needed some methods (in my case just two) to be called on the same thread. Well, that’s the long story made short.

As I couldn’t rely on the thread calling the first method still being available when the other one needs to be called, I decided to simply steal one thread (in fact a ThreadPool thread) and abuse it exclusively. To encapsulate this hack I created this class.

Before you dive in, I have to note that it’s really not a general-purpose class and you shouldn’t use it blindly; you can easily use it badly and, e.g., create unwanted shared state, screw up the flow of your program or get wrong results due to race conditions, …

using System;
using System.Diagnostics;
using System.Threading;

/// <summary>
/// This is NOT a general purpose class.
/// Use with care.
/// </summary>
sealed class ThreadMethodRunner : IDisposable
{
	AutoResetEvent _eventMethodDone; // set by the worker thread when a method has finished
	AutoResetEvent _eventMethodIn;   // set by Do when a new method is ready to run
	Action _method;

	public ThreadMethodRunner()
	{
		_eventMethodDone = new AutoResetEvent(false);
		_eventMethodIn = new AutoResetEvent(false);
		// Borrow one ThreadPool thread; it runs every method passed to Do.
		ThreadPool.QueueUserWorkItem(new WaitCallback(MethodRunnerWrapper), null);
	}

	private void MethodRunnerWrapper(object o)
	{
		// Hard-coded for my case: exactly two methods, then the thread goes back to the pool.
		for (int i = 0; i < 2; i++)
		{
			Debug.WriteLine("Waiting for method");
			_eventMethodIn.WaitOne();
			Action a = Interlocked.Exchange(ref _method, null);
			a(); // exceptions are not caught here
			Debug.WriteLine("Method done");
			_eventMethodDone.Set();
		}
		Debug.WriteLine("Thread done");
	}

	public void Do(Action method)
	{
		Interlocked.Exchange(ref _method, method);
		_eventMethodIn.Set();
		// Block the caller until the method has run on the borrowed thread.
		_eventMethodDone.WaitOne();
	}

	public void Dispose()
	{
		_eventMethodDone.Dispose();
		_eventMethodIn.Dispose();
	}
}

This class allows you to call the Do method with an Action delegate, and the code runs on the same thread every time Do is called (in my case just two times). The method blocks, as you probably want to keep the behavior of your old code (at least within the boundaries of method call order, not changing some state etc., as mentioned above).

I don’t know if it’s useful for anything other than my case, but at least somebody may be inspired.
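One possible variation, under different assumptions: instead of borrowing a ThreadPool thread for exactly two calls, own a dedicated Thread that runs any number of calls taken from a BlockingCollection<Action>, and rethrow the method’s exception on the caller. A sketch only; `DedicatedThreadRunner` is a hypothetical name, and ExceptionDispatchInfo needs .NET 4.5 or later.

```csharp
using System;
using System.Collections.Concurrent;
using System.Runtime.ExceptionServices;
using System.Threading;

// Hypothetical variation: one dedicated thread runs every submitted Action,
// one at a time, until the runner is disposed.
public sealed class DedicatedThreadRunner : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public DedicatedThreadRunner()
    {
        _thread = new Thread(() =>
        {
            // Blocks while the queue is empty; ends after CompleteAdding.
            foreach (var work in _queue.GetConsumingEnumerable())
                work();
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    // Runs the method on the dedicated thread and blocks the caller until it finishes;
    // an exception thrown by the method is rethrown on the calling thread.
    public void Do(Action method)
    {
        Exception error = null;
        using (var done = new ManualResetEventSlim(false))
        {
            _queue.Add(() =>
            {
                try { method(); }
                catch (Exception ex) { error = ex; }
                finally { done.Set(); }
            });
            done.Wait();
        }
        if (error != null)
            ExceptionDispatchInfo.Capture(error).Throw();
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // let the thread drain the queue and exit
        _thread.Join();
        _queue.Dispose();
    }
}
```

As with the original class, calling Do from inside a method that is already running on the runner would deadlock, because the runner’s only thread would be waiting for itself.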

14 Apr

Go language

I was just looking at the Go language presentation.

The first thing that caught my attention was the C-like syntax. I really don’t like it. I want my language to talk to me. There’s, IMO, no reason in today’s world to have identifiers and symbols made of only a couple of characters and to use a lot of different characters from various places on the keyboard.

Anyway, what was interesting for me was the concept of channels, especially their mapping to threads and the attempt to use system resources effectively. The APM (Asynchronous Programming Model) and/or the new Tasks both look similar to channels, at least in what they offer, not in how it’s exposed in the language.
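For a rough .NET analogy (my own, not a claim about how Go channels work internally), the .NET 4 building block I’d reach for to mimic a buffered channel is probably a bounded BlockingCollection<T>: Add blocks while the buffer is full, taking blocks while it’s empty, and CompleteAdding plays the role of closing the channel. `SumThroughChannel` is a hypothetical name.

```csharp
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class ChannelLikeExample
{
    // A producer task sends 1..count through a buffer of capacity 4;
    // the calling thread receives the values and sums them.
    public static int SumThroughChannel(int count)
    {
        using (var channel = new BlockingCollection<int>(4)) // bounded, like a buffered channel
        {
            var producer = Task.Factory.StartNew(() =>
            {
                for (int i = 1; i <= count; i++)
                    channel.Add(i);      // blocks while the buffer is full
                channel.CompleteAdding(); // roughly close(ch)
            });

            int sum = 0;
            foreach (var value in channel.GetConsumingEnumerable()) // roughly "for v := range ch"
                sum += value;

            producer.Wait();
            return sum;
        }
    }
}
```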

I don’t know about the internals. But I’m sure that attempts to find reasonable programming model(s) for threading and parallelism are growing exponentially.

12 Mar

Running queries in parallel with Entity Framework (and not only with it)

From time to time I have to run two or more queries that I know will always run together, like fetching some first/skip records and also the total count. If you write them as two queries and execute them, that means two round trips to the database. Although it may not matter if the network latency is very small, why not challenge myself and try to find some workarounds?

Sure, you can create some stored procedures and get the data back from these, but I was thinking about a more LINQ to Entities-ish way. I recalled a trick I once used in one project. Although it was done in pure SQL, it turned out that it works, kind of, for LINQ to Entities as well.

The idea is to use a “one row table” and put the queries in as columns. Let me demonstrate:

select
  (select count(*) from table1 where ...),
  (select max(foo) from table2 where ...)
from OneRowTable;

Where OneRowTable can be a specially created table or, e.g., RDB$DATABASE for Firebird or dual for Oracle Database. It isn’t the nicest SQL (and it also challenges the optimizer), but it works. In the columns you can put any query you want, as long as each one is a valid scalar subquery, i.e. it returns a single column and at most one row.

OK, what about Entity Framework, or LINQ to Entities respectively? I created the “one row table” first:

create table OneRowTable(x bit primary key);
insert into OneRowTable values (0);

The table needs to have a primary key so it can be imported into the entity model; the data type doesn’t matter (I was using MS SQL, hence the bit).

What about the queries? A similar approach:

var allinone = context.OneRowTable.Select(_ => new
{
	AData = context.a.Where(a => a.x.HasValue && a.x.Value > 10).Select(a => new { A1 = a.id, A2 = a.id * 2 }),
	BData = context.b.Where(b => b.id < 999).Select(b => new { B1 = b.id, B2 = b.y }),
});
string query = (allinone as ObjectQuery).ToTraceString();
var data = allinone.First();
var adata = data.AData;
var bdata = data.BData;

The a and b are my test tables. You can check that only one query is executed. Encapsulating this into some method is a piece of cake.

And what does the query look like? Well, for my MS SQL test:

SELECT
[UnionAll1].[x] AS [C1],
[UnionAll1].[C2] AS [C2],
[UnionAll1].[C1] AS [C3],
[UnionAll1].[id] AS [C4],
[UnionAll1].[id1] AS [C5],
[UnionAll1].[C3] AS [C6],
[UnionAll1].[C4] AS [C7],
[UnionAll1].[C5] AS [C8],
[UnionAll1].[C6] AS [C9]
FROM  (SELECT
	[Project1].[C2] AS [C1],
	[Extent1].[x] AS [x],
	1 AS [C2],
	[Project1].[id] AS [id],
	[Project1].[id] AS [id1],
	[Project1].[C1] AS [C3],
	CAST(NULL AS int) AS [C4],
	CAST(NULL AS int) AS [C5],
	CAST(NULL AS varchar(1)) AS [C6]
	FROM  [dbo].[OneRowTable] AS [Extent1]
	LEFT OUTER JOIN  (SELECT
		[Extent2].[id] AS [id],
		[Extent2].[id] * 2 AS [C1],
		1 AS [C2]
		FROM [dbo].[a] AS [Extent2]
		WHERE ([Extent2].[x] IS NOT NULL) AND ([Extent2].[x] > 10) ) AS [Project1] ON 1 = 1
UNION ALL
	SELECT
	2 AS [C1],
	[Extent3].[x] AS [x],
	[Extent4].[id] AS [id],
	CAST(NULL AS int) AS [C2],
	CAST(NULL AS int) AS [C3],
	CAST(NULL AS int) AS [C4],
	[Extent4].[id] AS [id1],
	[Extent4].[id] AS [id2],
	[Extent4].[y] AS [y]
	FROM  [dbo].[OneRowTable] AS [Extent3]
	CROSS JOIN [dbo].[b] AS [Extent4]
	WHERE [Extent4].[id] < 999) AS [UnionAll1]
ORDER BY [UnionAll1].[x] ASC, [UnionAll1].[C1] ASC

Not exactly the original shape. The translator took another route: it joins each subquery to the one row table separately and glues the two branches together with UNION ALL, using a constant column (1 or 2) to tell the rows of each branch apart. Apart from this, the query is in general the same (the explicit joins give the same result as the subselects, though they’re a little more confusing in this case).

Again, this isn’t a general-purpose way of doing it, and it may result in worse performance than running the queries separately. I would recommend using it only after careful testing and on a controlled, limited set of queries.

10 Jun

CountdownEvent example

Yesterday I wrote about the new CountdownEvent class. But what’s better than seeing an example of usage? ;-)

Below is a pretty simple example of usage. As you can see, it’s very similar to working with an array of, e.g., ManualResetEvent. But you also have some handy methods and properties, for instance AddCount/TryAddCount or CurrentCount. Very handy.

using System;
using System.Threading;

class Program
{
	static void Main(string[] args)
	{
		using (CountdownEvent cde = new CountdownEvent(10))
		{
			for (int i = 0; i < cde.InitialCount; i++)
			{
				new Thread(new ParameterizedThreadStart(Dummy)).Start(cde);
				//ThreadPool.QueueUserWorkItem(new WaitCallback(Dummy), cde);
			}
			cde.Wait(2000);
			Console.WriteLine("Threads done in first 2 seconds: {0}.", cde.InitialCount - cde.CurrentCount);
			cde.Wait();
			Console.WriteLine("All threads done.");
		}
	}

	static void Dummy(object o)
	{
		// Seed per thread: on .NET Framework, Random instances created at nearly the
		// same time share a time-based seed and would return the same delay.
		Thread.Sleep(new Random(Thread.CurrentThread.ManagedThreadId).Next(5000));
		(o as CountdownEvent).Signal();
	}
}

As I said, working with it is similar to working with an array of ManualResetEvent, just packed into a nicer cake. In fact, if you start ILDasm and look into the code, you’ll see that it’s implemented very similarly. It uses ManualResetEventSlim (also new in .NET 4) internally to signal, and smart use of the Interlocked class to decrement (or increment) the number of signals received.
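To see why it’s nothing ultra special, here’s a simplified countdown built from the same two ingredients, ManualResetEventSlim and Interlocked. It’s a hypothetical `SimpleCountdown` sketch, not the actual BCL implementation (no AddCount, Reset or cancellation support).

```csharp
using System;
using System.Threading;

// Hypothetical, simplified countdown event: an Interlocked-managed counter plus a
// ManualResetEventSlim that is set when the counter reaches zero.
public sealed class SimpleCountdown : IDisposable
{
    private int _count;
    private readonly ManualResetEventSlim _event;

    public SimpleCountdown(int initialCount)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException("initialCount");
        _count = initialCount;
        _event = new ManualResetEventSlim(initialCount == 0); // already set if nothing to wait for
    }

    public int CurrentCount
    {
        get { return Volatile.Read(ref _count); }
    }

    // Decrements the count; returns true if this call brought it to zero.
    public bool Signal()
    {
        while (true)
        {
            int observed = Volatile.Read(ref _count);
            if (observed == 0)
                throw new InvalidOperationException("The count is already zero.");

            // Decrement only if nobody changed the count since we read it; otherwise retry.
            if (Interlocked.CompareExchange(ref _count, observed - 1, observed) == observed)
            {
                if (observed == 1)
                {
                    _event.Set(); // release all waiters
                    return true;
                }
                return false;
            }
        }
    }

    public void Wait() { _event.Wait(); }
    public bool Wait(int millisecondsTimeout) { return _event.Wait(millisecondsTimeout); }
    public void Dispose() { _event.Dispose(); }
}
```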

Do you like the class too?

9 Jun

CountdownEvent class

Today, while just randomly walking through the MSDN documentation, I found the new CountdownEvent class. It’s nothing ultra special. You could write a similar class yourself in a couple of hours. But it’s great that you don’t have to. And with all the new stuff, like the Task object, writing multi-threaded apps is easier than before (but sometimes it’s neat to just write some algorithm/problem using only threads and critical sections).

17 Jan

Running sync methods in async way

You’ve probably heard about the very good library called Power Threading Library. In short, it allows you to run async methods in near-sync-looking code (and besides that, it provides some useful classes for working in a multithreaded environment). But the problem is that you have to use methods ready for the async way. That’s easy for database calls or web service calls. But you may have your own code and want to utilize this library to really burn up your CPU.

The obvious way is to define a delegate and use BeginInvoke/EndInvoke. However, that’s not what I was interested in. Thus I created some helper methods to use any method you have in an async way with Power Threading Library. An interesting fact is that it’s also faster than using a delegate (Jeffrey mentioned it, more info here).

using System;
using System.Threading;

using Wintellect.Threading.AsyncProgModel;

public class AsyncEnumeratorSyncHelper
{
    private AsyncEnumeratorSyncHelper()
    { }

    public static AsyncResult<T> BeginHelper<T>(AsyncCallback callback, object state, Func<T> method)
    {
        AsyncResult<T> ar = new AsyncResult<T>(callback, state);

        Action<object> work = (object asyncResult) => ExecuteHelper(method, (AsyncResult<T>)asyncResult);
        ThreadPool.QueueUserWorkItem(new WaitCallback(work), ar);

        return ar;
    }

    public static AsyncResult BeginHelper(AsyncCallback callback, object state, Action method)
    {
        // just dummy object
        return BeginHelper<object>(callback, state, () => { method(); return null; });
    }

    public static T EndHelper<T>(IAsyncResult asyncResult)
    {
        AsyncResult<T> ar = (AsyncResult<T>)asyncResult;

        return ar.EndInvoke();
    }

    public static void EndHelper(IAsyncResult asyncResult)
    {
        // just dummy object
        EndHelper<object>(asyncResult);
    }

    private static void ExecuteHelper<T>(Func<T> method, AsyncResult<T> asyncResult)
    {
        try
        {
            T result = method();
            asyncResult.SetAsCompleted(result, false);
        }
        catch (Exception ex)
        {
            asyncResult.SetAsCompleted(ex, false);
        }
    }
}

With this wrapper you can call any method in an async way very easily.

Still, you may notice that it expects only methods without any input parameters. Although it looks like a problem, you can easily use lambdas to “push” parameters inside. If you have a method int Foo(string x), you just create () => Foo("rrr").
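The same lambda trick works with any API that expects a parameterless delegate. A plain .NET sketch without PowerThreading, with `Foo` and `RunWrapped` as hypothetical names: the argument is captured by the lambda, and the resulting Func<int> runs on a ThreadPool thread.

```csharp
using System;
using System.Threading;

public static class ClosureExample
{
    // Hypothetical method that takes a parameter.
    public static int Foo(string x) => x.Length;

    // Binds the argument into a parameterless Func<int>, runs it on a
    // ThreadPool thread and waits for the result.
    public static int RunWrapped()
    {
        Func<int> bound = () => Foo("rrr"); // "rrr" is captured by the lambda
        int result = 0;
        using (var done = new ManualResetEventSlim(false))
        {
            ThreadPool.QueueUserWorkItem(_ => { result = bound(); done.Set(); });
            done.Wait();
        }
        return result;
    }
}
```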

Feel free to post any problems or feedback here or on the PowerThreading list.