Posts Tagged ‘Multithreading’

4 Aug

Projection (select) on a collection running in parallel with exception handling

A few days ago I posted an extension method that runs a projection on a collection in parallel. That method has one problem: it doesn't deal with exceptions. And because the ordering wasn't (and still isn't) preserved, the caller needs to know which item each result or failure belongs to, so I made this small improvement.

Now the method returns a simple structure with the original item, the result (if no exception occurred, of course) and the exception (if any). I didn't go for AggregateException (although you can modify the code yourself to use it). This way you decide, while consuming the results, what to do when an exception occurred. Adding some kind of cancellation shouldn't be difficult.

The idea behind it is the same as in the previous version.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

#region ParallelProjection
internal struct ParallelProjectionResult<TSource, TResult>
{
	public TSource Item { get; set; }
	public TResult Result { get; set; }
	public Exception Exception { get; set; }
}

// Extension methods must be declared inside a non-nested static class.
internal static IEnumerable<ParallelProjectionResult<TSource, TResult>> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
{
	BlockingCollection<ParallelProjectionResult<TSource, TResult>> results = new BlockingCollection<ParallelProjectionResult<TSource, TResult>>();

	ThreadPool.QueueUserWorkItem((o) =>
	{
		// The semaphore caps the number of items in flight; the countdown tracks them.
		using (Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism))
		using (CountdownEvent countdown = new CountdownEvent(1))
		{
			foreach (var item in source)
			{
				countdown.AddCount();
				semaphore.WaitOne();
				ThreadPool.QueueUserWorkItem(
					(element) =>
					{
						TSource e = (TSource)element;
						ParallelProjectionResult<TSource, TResult> result = new ParallelProjectionResult<TSource, TResult>();
						result.Item = e;
						try
						{
							result.Result = projection(e);
						}
						catch (Exception ex)
						{
							// Capture the failure instead of letting it crash the pool thread.
							result.Exception = ex;
						}
						results.Add(result);
						semaphore.Release();
						countdown.Signal();
					},
					item);
			}
			// Remove the initial count, wait for all scheduled items, then end the stream.
			countdown.Signal();
			countdown.Wait();
			results.CompleteAdding();
		}
	}, null);

	return results.GetConsumingEnumerable();
}
#endregion
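The post says adding some kind of cancellation shouldn't be difficult. Below is a minimal sketch of one way to do it, assuming a CancellationToken parameter (my addition, not part of the method above) that is checked before each item is scheduled. To stay short it returns plain results instead of the result struct and swaps the kernel Semaphore for SemaphoreSlim; the class name CancellableProjection is hypothetical.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Hypothetical variant of ParallelProjection with cooperative cancellation.
static class CancellableProjection
{
    public static IEnumerable<TResult> ParallelProjection<TSource, TResult>(
        this IEnumerable<TSource> source, Func<TSource, TResult> projection,
        int maxParallelism, CancellationToken token)
    {
        var results = new BlockingCollection<TResult>();

        ThreadPool.QueueUserWorkItem(_ =>
        {
            using (var semaphore = new SemaphoreSlim(maxParallelism, maxParallelism))
            using (var countdown = new CountdownEvent(1))
            {
                try
                {
                    foreach (TSource item in source)
                    {
                        // Stop scheduling new items once cancellation is requested;
                        // items already in flight still finish and are yielded.
                        if (token.IsCancellationRequested)
                            break;

                        countdown.AddCount();
                        semaphore.Wait();
                        TSource current = item; // per-iteration copy for the closure
                        ThreadPool.QueueUserWorkItem(__ =>
                        {
                            // projection is assumed not to throw here; combine with
                            // ParallelProjectionResult for exception handling.
                            try { results.Add(projection(current)); }
                            finally
                            {
                                semaphore.Release();
                                countdown.Signal();
                            }
                        });
                    }
                }
                finally
                {
                    // Drop the initial count, wait for in-flight items, end the stream.
                    countdown.Signal();
                    countdown.Wait();
                    results.CompleteAdding();
                }
            }
        });

        return results.GetConsumingEnumerable();
    }
}
```

Consuming it looks the same as before; cancelling the token just makes the stream end early, after the items already scheduled have been yielded.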
2 Aug

Projection (select) on a collection running in parallel

Update: a newer version of the method, with exception handling, is available.

I have here another not-general-purpose parallel/multithreaded method. :) To make a long story short, I needed to do some transformation on a collection's elements, aka a projection. Unfortunately the method I was plugging in was doing some network requests, in fact a couple of them, sequentially, blocking until each response came back. I know the proper way would be to make these requests asynchronous; unfortunately this was part of a bigger architecture I could not change. Because the work isn't CPU bound, the AsParallel method would not help much. So I solved it by abusing ThreadPool threads. That's bad for the scheduler and for memory, as I'm wasting threads and resources blocking until the server replies, but it was very easy for me. I told you: abusing. ;)

So I came up with this method. It uses the new .NET Framework 4 concurrent collections, BlockingCollection in particular, as it's great for producer-consumer scenarios and I want the method to return each result as soon as it's done (which also implies the ordering isn't preserved).

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

// Extension methods must be declared inside a non-nested static class.
internal static IEnumerable<TResult> ParallelProjection<TSource, TResult>(this IEnumerable<TSource> source, Func<TSource, TResult> projection, int maxParallelism)
{
	BlockingCollection<TResult> results = new BlockingCollection<TResult>();

	ThreadPool.QueueUserWorkItem((o) =>
	{
		using (Semaphore semaphore = new Semaphore(maxParallelism, maxParallelism))
		using (CountdownEvent countdown = new CountdownEvent(1))
		{
			foreach (var item in source)
			{
				countdown.AddCount();
				// Blocks once maxParallelism items are in flight.
				semaphore.WaitOne();
				ThreadPool.QueueUserWorkItem(
					(element) =>
					{
						// Note: an exception thrown by projection is unhandled on this
						// pool thread and terminates the process (see the updated version).
						results.Add(projection((TSource)element));
						semaphore.Release();
						countdown.Signal();
					},
					item);
			}
			// Remove the initial count, wait for all scheduled items, then end the stream.
			countdown.Signal();
			countdown.Wait();
			results.CompleteAdding();
		}
	}, null);

	return results.GetConsumingEnumerable();
}

The method is straightforward; a lot of work was saved by using the smart blocking collection. I'm simply reading items from the source collection and applying the function to them. To avoid overloading the system with a huge number of threads I also added the maxParallelism parameter. When that many items are being processed, I stop scheduling more, using a Semaphore, until some are done and slots are available again. When there are no more items in the source collection and all items have been processed, I call the CompleteAdding method to say I'm done and no more items will come. To track that I'm using the CountdownEvent class initialized to 1, because you can't, of course, add to the count once it has reached 0 (AddCount throws at that point). Before the final Wait I signal once more to remove that initial count.
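The "start the countdown at 1" trick can be checked in isolation. The sketch below (class and method names are hypothetical) schedules a few ThreadPool items the same way the method does, and also shows why the extra initial count is needed: once a CountdownEvent reaches zero it is set, TryAddCount returns false and AddCount throws an InvalidOperationException.

```csharp
using System;
using System.Threading;

// Hypothetical demo of the "start the countdown at 1" pattern.
static class CountdownFromOne
{
    // Schedules `items` work items, waits for all of them, then tries to add one more.
    public static Tuple<int, bool> Run(int items)
    {
        using (var countdown = new CountdownEvent(1)) // 1 = "still scheduling"
        {
            for (int i = 0; i < items; i++)
            {
                countdown.AddCount(); // one count per scheduled item
                ThreadPool.QueueUserWorkItem(_ => countdown.Signal());
            }
            countdown.Signal(); // scheduling finished: remove the initial count
            countdown.Wait();   // returns once every item has signalled

            // The event is now set, so it refuses new counts.
            bool added = countdown.TryAddCount();
            return Tuple.Create(countdown.CurrentCount, added);
        }
    }

    // Without the initial 1, a countdown that is already at 0 can't be added to.
    public static bool AddCountAtZeroThrows()
    {
        using (var countdown = new CountdownEvent(0)) // starts out already set
        {
            try
            {
                countdown.AddCount();
                return false;
            }
            catch (InvalidOperationException)
            {
                return true;
            }
        }
    }
}
```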

And that's it. Again, it's not a general-purpose method. Use it with care; used wrongly, it may give you even worse performance.

31 Jul

Array of WaitHandles (ManualResetEvent, AutoResetEvent, …) when waiting for operations to complete …

Often, when you discover the beauty of multithreading and parallelism, you find you need to run some operations in parallel and wait for them to complete. A fairly common scenario. Although with .NET Framework 4 you can now write it using the Task Parallel Library's Parallel.Invoke, there are scenarios where you need to plug the work into some other methods or parameters, so you do it yourself explicitly with threads or, better said, with the ThreadPool.

The method I see from time to time looks basically like this:

using System.Threading;

void DoSomethingExample()
{
	int numberOfActions = 10;
	// One event per operation.
	ManualResetEvent[] mres = new ManualResetEvent[numberOfActions];
	for (int i = 0; i < numberOfActions; i++)
	{
		mres[i] = new ManualResetEvent(false);
		ThreadPool.QueueUserWorkItem((o) =>
			{
				Thread.SpinWait(20000000); // simulated work
				(o as ManualResetEvent).Set();
			},
			mres[i]);
	}
	// WaitAll is a static member of WaitHandle.
	WaitHandle.WaitAll(mres);
}

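Though it's not wrong (except that the ManualResetEvents are never disposed), it's suboptimal. You're wasting resources creating an array of these objects, and WaitHandle.WaitAll accepts at most 64 handles, so the approach doesn't scale either.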

But if you think about it, you can write it better: better for scaling, performance and memory consumption.

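void DoSomethingBetter()
{
	int numberOfActions = 10;
	// Separate counter: the loop bound must not change while work items decrement it.
	int pending = numberOfActions;
	using (ManualResetEvent mre = new ManualResetEvent(false))
	{
		for (int i = 0; i < numberOfActions; i++)
		{
			ThreadPool.QueueUserWorkItem((o) =>
				{
					Thread.SpinWait(20000000); // simulated work
					// The last work item to finish releases the waiting thread.
					if (Interlocked.Decrement(ref pending) == 0)
						mre.Set();
				},
				null);
		}
		mre.WaitOne();
	}
}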

I'm simply using one synchronization object (and a using statement ;) ), because all I'm really interested in is when all the tasks are done (a single event), and decrementing the number of outstanding tasks every time one finishes. Using the Interlocked class I'm sure no race condition occurs on the counter and I get the right result. When it reaches zero I signal that I'm done and the method can continue.

Fewer resources, atomic operations … better, faster results.
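For comparison, here's a minimal sketch of the Parallel.Invoke route mentioned at the start of the post, for when the operations don't have to be plugged into other code. The class name and the SpinWait workload are assumptions for illustration. Parallel.Invoke itself blocks until every action has finished, so no event is needed for the waiting; the Interlocked counter only reports how many actions ran.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical demo: run N operations in parallel and wait for all of them.
static class ParallelInvokeDemo
{
    public static int RunAll(int numberOfActions)
    {
        int completed = 0;
        Action[] actions = new Action[numberOfActions];
        for (int i = 0; i < numberOfActions; i++)
        {
            actions[i] = () =>
            {
                Thread.SpinWait(1000000); // stand-in for real work
                Interlocked.Increment(ref completed);
            };
        }
        Parallel.Invoke(actions); // returns only after every action has completed
        return completed;
    }
}
```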

6 Jun

Running the other method on the same thread as the first one

Last week I was solving a problem. The host code that another piece of code was plugged into was doing some crazy threading internally, while the plugged-in code used a component that required some of its methods (in my case just two) to be called on the same thread. Well, that's the long story made short.

As I couldn't rely on the thread that called the first method still being available when the other one needed to be called, I decided to simply steal one thread (in fact a ThreadPool thread) and abuse it exclusively. To encapsulate this hack I created this class.

Before you dive in, I have to note that it is really not a general-purpose class and you shouldn't use it blindly: you can easily use it badly and, e.g., create unwanted shared state, break the flow of your program or get wrong results due to race conditions.

using System;
using System.Diagnostics;
using System.Threading;

/// <summary>
/// This is NOT a general purpose class.
/// Use with care.
/// </summary>
sealed class ThreadMethodRunner : IDisposable
{
	AutoResetEvent _eventMethodDone;
	AutoResetEvent _eventMethodIn;
	Action _method;

	public ThreadMethodRunner()
	{
		_eventMethodDone = new AutoResetEvent(false);
		_eventMethodIn = new AutoResetEvent(false);
		// Claim one ThreadPool thread for the lifetime of the runner.
		ThreadPool.QueueUserWorkItem(new WaitCallback(MethodRunnerWrapper), null);
	}

	private void MethodRunnerWrapper(object o)
	{
		// Hard-coded for exactly two calls to Do; a third call would block forever.
		for (int i = 0; i < 2; i++)
		{
			Debug.WriteLine("Waiting for method");
			_eventMethodIn.WaitOne();
			Action a = Interlocked.Exchange(ref _method, null);
			// An exception thrown by a() is unhandled on this pool thread.
			a();
			Debug.WriteLine("Method done");
			_eventMethodDone.Set();
		}
		Debug.WriteLine("Thread done");
	}

	// Hands the method to the dedicated thread and blocks until it has run.
	public void Do(Action method)
	{
		Interlocked.Exchange(ref _method, method);
		_eventMethodIn.Set();
		_eventMethodDone.WaitOne();
	}

	public void Dispose()
	{
		_eventMethodDone.Dispose();
		_eventMethodIn.Dispose();
	}
}

This class lets you call the Do method with an Action delegate, and the method runs the code on the same thread every time it's called (in my case just two times). Do blocks, as you probably want to keep the behavior of your old code (at least with respect to the order of method calls, not changing some state concurrently, etc., as mentioned above).

I don't know whether it's useful for anything other than my case, but at least somebody may be inspired.
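If you need this for more than two calls, one possible generalization (a sketch, not the class above) is a dedicated thread that drains a BlockingCollection<Action>. The type name SingleThreadRunner is hypothetical. It uses its own Thread rather than a borrowed ThreadPool thread, accepts any number of Do calls until Dispose, and hands exceptions back to the caller (wrapped in an AggregateException by Task.Wait) instead of crashing the runner thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical generalization: runs every Action passed to Do on one dedicated thread.
sealed class SingleThreadRunner : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Thread _thread;

    public SingleThreadRunner()
    {
        // The thread runs queued actions in order until CompleteAdding is called.
        _thread = new Thread(() =>
        {
            foreach (Action action in _queue.GetConsumingEnumerable())
                action();
        });
        _thread.IsBackground = true;
        _thread.Start();
    }

    // Runs method on the dedicated thread and blocks until it has finished.
    public void Do(Action method)
    {
        var completion = new TaskCompletionSource<object>();
        _queue.Add(() =>
        {
            try
            {
                method();
                completion.SetResult(null);
            }
            catch (Exception ex)
            {
                completion.SetException(ex); // handed back to the caller below
            }
        });
        completion.Task.Wait(); // rethrows a failure as AggregateException
    }

    public void Dispose()
    {
        _queue.CompleteAdding(); // lets the foreach on the runner thread end
        _thread.Join();
        _queue.Dispose();
    }
}
```

Like the original, Do must not be called from inside an action running on the runner thread, as it would wait on itself.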

14 Apr

Go language

I was just watching a presentation of the Go language.

The first thing that caught my attention was the C-like syntax. I really don't like it. I want my language to talk to me. In my opinion there's no reason in today's world to have identifiers and symbols made of only a couple of characters, or to use a lot of different characters from all over the keyboard.

Anyway, what was interesting for me was the concept of channels, especially how they're mapped to threads and the attempt to use system resources effectively. The APM (Asynchronous Programming Model) and the new Tasks both look similar to channels, at least in terms of what they offer, not in how it's exposed in the language.

I don't know about the internals. But I'm sure that attempts to find reasonable programming models for threading and parallelism are growing exponentially.

29 Oct

My Timer usage and a moral about references

A few minutes ago I had a flash of insight. I'm using a Timer in one of my Windows services, which simply runs 24×7. And I had a bug: the service had wrong data on the server, and I was not able to reproduce it locally. Same data, same code, nothing. That was pretty weird, because after roughly four days I restarted the service and it started working correctly.

I started the debugger, ran the service locally and went for some candy. Because the data should be refreshed every two minutes by default, when I came back I realized that the refresh procedure had not run: there was a breakpoint in it, but no hit. So I started looking for some info in the documentation, when suddenly this note came up and brought back what I already knew:

As long as you are using a Timer, you must keep a reference to it. As with any managed object, a Timer is subject to garbage collection when there are no references to it. The fact that a Timer is still active does not prevent it from being collected.

Yep, I was creating the instance but not holding a reference to it, so within the refresh interval it was garbage collected. Shame on me. But we learn from our mistakes.
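A minimal sketch of the fix, assuming refresh logic along the lines of the service described above (DataRefresher and its members are hypothetical names): the System.Threading.Timer is stored in a field, so it stays reachable for as long as the owning object is. The owning object itself must in turn be kept alive by the service, e.g. held in a field of the service class.

```csharp
using System;
using System.Threading;

// Hypothetical periodic refresher; the Timer field keeps the timer reachable.
sealed class DataRefresher : IDisposable
{
    private readonly Timer _timer;
    private int _refreshCount;

    public DataRefresher(TimeSpan interval)
    {
        // First callback immediately, then every interval.
        _timer = new Timer(Refresh, null, TimeSpan.Zero, interval);
    }

    public int RefreshCount
    {
        get { return Interlocked.CompareExchange(ref _refreshCount, 0, 0); } // atomic read
    }

    private void Refresh(object state)
    {
        // Real code would reload the data here.
        Interlocked.Increment(ref _refreshCount);
    }

    public void Dispose()
    {
        _timer.Dispose(); // stop the timer explicitly instead of relying on the GC
    }
}
```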

11 Oct

Multithreading with Entity Framework

From time to time I get a question about using an ObjectContext from more than one thread. Because Entity Framework sits on top of ADO.NET, whose connections aren't thread safe, it's obvious that ObjectContext cannot be thread safe either. So if you need to use n threads, use m (where m >= n) ObjectContexts. That's the easy way. But what if you really need to share an ObjectContext between threads?

The first fact is that you cannot run more than one statement at a time, because sooner or later that will make a mess in the provider. So the solution is to lock around every use, carefully. Not good for scaling, but there's (almost) nothing you can do about it.

Another basic issue is that with IQueryable (if you're using LINQ) you don't know when the query actually gets executed. Until somebody calls, for example, ToArray, it's just a definition (the shape) of the query. And when it does get executed, that can happen outside your method or outside the lock. Composability will certainly suffer a bit. You can create a rule that everything you expose to UI (or any higher-level) developers is, for instance, a List<T>. Then you will probably need to prepare a significant number of methods, for almost every data projection, selection, … they need. The good news is that you can hide the original methods (e.g. make them internal), so nobody can screw something up.
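Deferred execution is easy to see with plain LINQ to Objects, used here as a stand-in for an EF query (the class and method names are hypothetical). Defining the query reads nothing; the source is only read when ToList() runs, so if the definition happens inside a lock but ToList() happens later, the actual work happens outside the lock.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo of deferred execution with IQueryable.
static class DeferredExecutionDemo
{
    // Returns (elements read after defining the query, elements read after ToList()).
    public static Tuple<int, int> Run()
    {
        int reads = 0;
        IEnumerable<int> source = new[] { 1, 2, 3 }.Select(x => { reads++; return x; });

        // Defining the query executes nothing yet.
        IQueryable<int> query = source.AsQueryable().Where(x => x > 1);
        int afterDefinition = reads;

        // Execution happens here, wherever this line happens to run.
        query.ToList();
        int afterExecution = reads;

        return Tuple.Create(afterDefinition, afterExecution);
    }
}
```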

A better option may be to create a simple method that takes the complete query as a parameter and returns the data, e.g. as an IEnumerable (simply put, already fetched from the database). This is good, but everybody has to be careful to use only this method (it will be generic, so it may look a little weird when used with anonymous types, but it works; look for "cast by example" in your favourite search engine). As a nice side effect, you can easily add, e.g., logging of the queries the application sends to the database.
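Here's a minimal sketch of such a method, using a hypothetical SharedContextGate class. LINQ to Objects queries stand in for ObjectContext queries so the sketch runs on its own; with EF you would pass the context's queries in instead. Execute is generic, so type inference also handles anonymous-type projections, and the query is materialized with ToList() while the lock is held. Run applies the same lock to calls that execute immediately, such as stored procedures or SaveChanges.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;

// Hypothetical gate that serializes all access to one shared context.
sealed class SharedContextGate
{
    private readonly object _sync = new object();

    // Materializes the query while holding the lock, so execution can't escape it.
    public List<T> Execute<T>(IQueryable<T> query)
    {
        lock (_sync)
        {
            // Logging hook; with EF, an ObjectQuery's ToTraceString() gives the SQL.
            Debug.WriteLine(query.Expression);
            return query.ToList();
        }
    }

    // For calls that execute immediately (stored procedures, SaveChanges, ...).
    public TResult Run<TResult>(Func<TResult> immediateCall)
    {
        lock (_sync)
        {
            return immediateCall();
        }
    }
}
```

With an anonymous type it reads as, e.g., `var names = gate.Execute(people.Where(p => p.Age > 30).Select(p => new { p.Name }));`, where `people` is any IQueryable source.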

Stored procedures are executed immediately (they're not composable), so you can just create a simple wrapper around each one with a lock. Easy.

The rest is saving changes. The SaveChanges method is virtual in EFv4, so your own implementation with a lock is easy (and you can use the T4 templates for ObjectContext to generate the code with the lock). In EFv1 the story is similar to querying: create a separate method and tell everybody to use only this one, or hide the original with your own using the new keyword (which only takes effect when callers use your derived type).

The last topic in my head right now is working with entities in code: changing properties (don't forget associations). If one entity is edited in more than one thread, you may (or may not) confuse the ObjectStateManager, as overlapping change tracking may kick in. To be on the safe side, I think avoiding this is the best way; choose whatever means you like (and take into account that one thread can be editing an entity while another one refreshes it from the store, for instance, so choose the proper granularity of locking, or introduce some rules into your code/team ;) ).

I don't know whether I've covered all the main basic things you can do with EF, as I'm writing this off the top of my head, so bear with me and feel free to comment. And as things stand right now: if there's even a 1% chance of being able to use separate ObjectContexts, do it. It will save you a lot of headaches.

10 Jun

CountdownEvent example

Yesterday I wrote about the new CountdownEvent class. But what's better than seeing an example of its use? ;-)

Below is a pretty simple example. As you can see, it's very similar to working with an array of, e.g., ManualResetEvent objects. But you also get some handy methods and properties, for instance AddCount/TryAddCount or CurrentCount.

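using System;
using System.Threading;

class Program
{
	static void Main(string[] args)
	{
		using (CountdownEvent cde = new CountdownEvent(10))
		{
			for (int i = 0; i < cde.InitialCount; i++)
			{
				new Thread(new ParameterizedThreadStart(Dummy)).Start(cde);
				//ThreadPool.QueueUserWorkItem(new WaitCallback(Dummy), cde);
			}
			// Wait up to 2 seconds, then report how many threads have signalled so far.
			cde.Wait(2000);
			Console.WriteLine("Threads done in first 2 seconds: {0}.", cde.InitialCount - cde.CurrentCount);
			cde.Wait();
			Console.WriteLine("All threads done.");
		}
	}

	static void Dummy(object o)
	{
		// Seed per thread: Random instances created at the same moment with the
		// default constructor can produce identical sequences on .NET Framework.
		Random random = new Random(Thread.CurrentThread.ManagedThreadId);
		Thread.Sleep(random.Next(5000));
		((CountdownEvent)o).Signal();
	}
}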

As I said, working with it is similar to working with an array of ManualResetEvent, just wrapped in a nicer package. In fact, if you start ILDasm and look at the code, you'll see that it's implemented very similarly. Internally it uses ManualResetEventSlim (also new in .NET 4) to signal, and some smart use of the Interlocked class to decrement (or increment) the number of signals received.
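To make that description concrete, here's a simplified sketch of a countdown built the same way: an Interlocked counter plus a ManualResetEventSlim that gets set when the count reaches zero. SimpleCountdown is a hypothetical name, and this is not the framework's actual code; it leaves out AddCount, cancellation and most argument checks.

```csharp
using System;
using System.Threading;

// Simplified, hypothetical countdown: Interlocked counter + ManualResetEventSlim.
sealed class SimpleCountdown : IDisposable
{
    private readonly ManualResetEventSlim _event;
    private int _count;

    public SimpleCountdown(int initialCount)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException("initialCount");
        _count = initialCount;
        _event = new ManualResetEventSlim(initialCount == 0); // zero means already set
    }

    public int CurrentCount
    {
        get { return Volatile.Read(ref _count); }
    }

    // Decrements the count; the call that reaches zero sets the event and returns true.
    public bool Signal()
    {
        while (true)
        {
            int observed = Volatile.Read(ref _count);
            if (observed <= 0)
                throw new InvalidOperationException("The count is already zero.");
            // Only decrement if no other thread changed the count in the meantime.
            if (Interlocked.CompareExchange(ref _count, observed - 1, observed) == observed)
            {
                if (observed == 1)
                {
                    _event.Set();
                    return true;
                }
                return false;
            }
        }
    }

    public void Wait() { _event.Wait(); }

    public bool Wait(int millisecondsTimeout) { return _event.Wait(millisecondsTimeout); }

    public void Dispose() { _event.Dispose(); }
}
```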

Do you like the class too?

9 Jun

CountdownEvent class

Today, while just randomly browsing the MSDN documentation, I found the new CountdownEvent class. It's nothing ultra special; you could write a similar class yourself in a couple of hours. But it's great that you don't have to. And with all the new stuff, like the Task class, writing multithreaded apps is easier than before (though sometimes it's neat to just write some algorithm or problem using only threads and critical sections).

17 Jan

Running sync methods in an async way

You've probably heard about the very good Power Threading Library. In short, it allows you to run async methods in near-sync-looking code (and it also provides some useful classes for working in a multithreaded environment). The problem is that you have to use methods that already support the async pattern. That's easy for database calls or web service calls. But you may have your own code and want to use this library to really burn up your CPU.

The obvious way is to define a delegate and use BeginInvoke/EndInvoke. However, that's not what I was interested in. So I created some helper methods to use any method you have asynchronously with the Power Threading Library. Interestingly, this is also faster than using a delegate (Jeffrey mentioned it; more info here).

using System;
using System.Threading;

using Wintellect.Threading.AsyncProgModel;

public class AsyncEnumeratorSyncHelper
{
    private AsyncEnumeratorSyncHelper()
    { }

    public static AsyncResult<T> BeginHelper<T>(AsyncCallback callback, object state, Func<T> method)
    {
        AsyncResult<T> ar = new AsyncResult<T>(callback, state);

        Action<object> work = (object asyncResult) => ExecuteHelper(method, (AsyncResult<T>)asyncResult);
        ThreadPool.QueueUserWorkItem(new WaitCallback(work), ar);

        return ar;
    }

    public static AsyncResult BeginHelper(AsyncCallback callback, object state, Action method)
    {
        // just dummy object
        return BeginHelper<object>(callback, state, () => { method(); return null; });
    }

    public static T EndHelper<T>(IAsyncResult asyncResult)
    {
        AsyncResult<T> ar = (AsyncResult<T>)asyncResult;

        return ar.EndInvoke();
    }

    public static void EndHelper(IAsyncResult asyncResult)
    {
        // just dummy object
        EndHelper<object>(asyncResult);
    }

    private static void ExecuteHelper<T>(Func<T> method, AsyncResult<T> asyncResult)
    {
        try
        {
            T result = method();
            asyncResult.SetAsCompleted(result, false);
        }
        catch (Exception ex)
        {
            asyncResult.SetAsCompleted(ex, false);
        }
    }
}

With this wrapper you can call any method asynchronously very easily.

You may notice, though, that it only accepts methods without any input parameters. Although that looks like a problem, you can easily use lambdas to “push” the parameters inside: if you have a method int Foo(string x), you just create () => Foo("rrr").
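The same closure trick works with any helper that only accepts a Func<T>. To keep the sketch runnable without the Power Threading Library, it uses .NET 4's Task.Factory.StartNew in place of BeginHelper/EndHelper; the class name is hypothetical, and Foo is the method from the text, here assumed to just return the string's length.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical demo of passing arguments through a closure.
static class ClosureArguments
{
    // Hypothetical method with an input parameter, like int Foo(string x) above.
    public static int Foo(string x)
    {
        return x.Length;
    }

    public static int RunFooInBackground()
    {
        // The lambda captures the argument, turning Foo(string) into a Func<int>.
        Func<int> noArgs = () => Foo("rrr");

        // Any API that takes a parameterless Func<T> can now run it.
        Task<int> task = Task.Factory.StartNew(noArgs);
        return task.Result; // blocks until Foo has run on a ThreadPool thread
    }
}
```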

Feel free to post any problems or feedback here or on the PowerThreading list.
