Threaded generic nodes: big integer data type

hi,

i’m intending to release a set of generic nodes introducing the big integer data type for calculations > int64 - this is pretty easy so far, as the BigInteger structure fortunately is included in the System.Numerics namespace!
i’ve successfully had vvvv come back from calculating 15mio^2mio (took several hours, resulting number is about 10mio digits long)!!!

now, as these calculations can take a while, eg 10mio^50k takes about 2 seconds, i’d like to have these bigInt plugins open a thread for the calculations, ie not block vvvv while they’re calculating:

  1. this would mean a thread for each slice in the input, would this work/even make sense? what happens if there’s more slices than threads possible. should i just limit the possible input count? doesn’t feel right :)

  2. i’ve tried threading my plugin, but with no luck; as soon as i hit “Calculate”, vvvv freezes until the result is there (as it does without threading code):

    #region usings
    using System;
    using System.ComponentModel.Composition;
    using System.Numerics;
    using System.Threading;

    using VVVV.PluginInterfaces.V1;
    using VVVV.PluginInterfaces.V2;
    using VVVV.Utils.VColor;
    using VVVV.Utils.VMath;

    using VVVV.Core.Logging;
    #endregion usings

    namespace VVVV.Nodes
    {
    #region PluginInfo
    [PluginInfo(Name = "Power", Category = "BigInteger", Help = "Basic template with one value in/out", Tags = "")]
    #endregion PluginInfo
    public class BigIntegerPowerNode : IPluginEvaluate
    {
    #region fields & pins
    [Input("Base", DefaultValue = 1.0)]
    public ISpread FInput;

     	[Input("Exponent", DefaultValue = 1.0)]
     	public ISpread<int> FInput2;
     	
     	[Input("Calculate", DefaultValue = 1.0, IsBang = true, IsSingle = true, Visibility = PinVisibility.OnlyInspector)]
     	public ISpread<int> FDoIt;
    
     	[Output("Output")]
     	public ISpread<BigInteger> FOutput;
     	
     	[Output("AsString")]
     	public ISpread<string> FString;
    
    
     	[Import()]
     	public ILogger FLogger;
     	#endregion fields & pins
     	
     	static BigInteger result;
     	
     	//called when data for any output pin is requested
     	public void Evaluate(int SpreadMax)
     	{
     		FOutput.SliceCount = SpreadMax;
    
     		BigInteger bigIntFromInt64 = new BigInteger(FInput[0]);
     		
     		if (FDoIt[0] == 1) {
     			for (int i = 0; i < SpreadMax; i++) {
     			
     				Thread myNewThread = new Thread(() => BigIntPow(bigIntFromInt64, FInput2[i]));
     				myNewThread.Start();
    
     				FString[i] = result.ToString();
     			}
     		}
     		
     	}
     	
     	private static void BigIntPow(BigInteger Base, int Exponent)
     		{
     		result = BigInteger.Pow(Base,Exponent);
     		}
     }
    

    }

now i know i’m already threading for each input slice here, but im testing with single sliced spreads only…
also i believe to understand that it’s my if statement stopping my plugin to run independent of vvvv itself; but tbh i don’t know as it’s the first time i’m trying to do a threaded plugin at all and i’m not sure on how to structure this properly…

any help/input on this, ie example plugins successfully threading parametrized methods from which i can learn, would be much appreciated!!

i’ve also attached my plugin with a test patch computing 10mio^50k! R.I.P. INF.00 :)

thanks a lot!

BigIntegerPower.zip (34.4 kB)

Well I don’t know if this could be of any help, but there is a comment made by @evvvvil here: contribution:evvvvil-tweet-engine; he talks about multithreading and stuff…

Hey h99, thanks for the tip, this definitely helps…it’s not dealing with methods using any inputs (Parameters), but it shows me if the rest i’m doing is correct ;)

Here’s a solution using the task parallel library. It’s a little tricky, but I hope it’s understandable with the comments:

#region usings
using System;
using System.ComponentModel.Composition;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

using VVVV.PluginInterfaces.V1;
using VVVV.PluginInterfaces.V2;
using VVVV.Utils.VColor;
using VVVV.Utils.VMath;

using VVVV.Core.Logging;
#endregion usings

namespace VVVV.Nodes
{
    [PluginInfo(Name = "Power", Category = "BigInteger", Help = "Little example of how to use the TPL in a plugin")]
    public class BigIntegerPowerNode : IPluginEvaluate, IPartImportsSatisfiedNotification, IDisposable
    {
        [Input("Base", DefaultValue = 1)]
        public ISpread<int> FBaseIn;

        [Input("Exponent", DefaultValue = 1)]
        public ISpread<int> FExponentInt;

        [Input("Calculate", IsBang = true, IsSingle = true)]
        public ISpread<bool> FDoItIn;

        [Output("Output")]
        public ISpread<BigInteger> FOutput;

        [Output("Is Ready")]
        public ISpread<bool> FReadyOut;

        [Output("AsString")]
        public ISpread<string> FStringOut;

        [Import]
        public ILogger FLogger;

        private readonly Spread<Task> FTasks = new Spread<Task>();
        private CancellationTokenSource FCts;

        // Called when this plugin was created
        public void OnImportsSatisfied()
        {
            // Do any initialization logic here. In this example we won't need to
            // do anything special.
        }

        // Called when this plugin gets deleted
        public void Dispose()
        {
            // Should this plugin get deleted by the user or should vvvv shutdown
            // we need to wait until all still running tasks ran to a completion
            // state.
            CancelRunningTasks();
        }

        // Called when data for any output pin is requested
        public void Evaluate(int SpreadMax)
        {
            if (FDoItIn[0])
            {
                // Let's first cancel all running tasks (if any).
                CancelRunningTasks();
                // Create a new task cancellation source object.
                FCts = new CancellationTokenSource();
                // Retrieve the cancellation token from it which we'll use for
                // the new tasks we set up now.
                var ct = FCts.Token;
                // Set the slice counts of our outputs.
                FOutput.SliceCount = SpreadMax;
                FStringOut.SliceCount = SpreadMax;
                FReadyOut.SliceCount = SpreadMax;
                // Setup the new tasks.
                FTasks.SliceCount = SpreadMax;
                for (int i = 0; i < FReadyOut.SliceCount; i++)
                {
                    // Reset the outputs and the ready state
                    FOutput[i] = default(BigInteger);
                    FStringOut[i] = string.Empty;
                    FReadyOut[i] = false;

                    // Now setup a new task which will perform the long running
                    // computation on the thread pool of the system.
                    var task = Task.Factory.StartNew(() =>
                    {
                        // Should a cancellation be requested throw the task
                        // canceled exception.
                        // In this specific scenario this seems a little useless,
                        // but imagine your long running computation runs in a loop:
                        // you could call this method in each iteration.
                        // Also many asynchronous methods in .NET provide an overload
                        // which takes a cancellation token.
                        ct.ThrowIfCancellationRequested();

                        // Here is the actual computation:
                        var @base = new BigInteger(FBaseIn[i]);
                        var exponent = FExponentInt[i];
                        var result = BigIntPow(@base, exponent);
                        // Note that the ToString method will take the most time 
                        // in this particular example, so we'll also compute it in
                        // background.
                        return new { Value = result, ValueAsString = result.ToString() };
                    },
                        // The cancellation token should also be passed to the StartNew method.
                        // For details see http://msdn.microsoft.com/en-us/library/dd997396%28v=vs.110%29.aspx
                        ct
                        // Once the task is completed we want to write the result to the output.
                        // Writing to pins is only allowed in the main thread of vvvv. To achieve
                        // this we setup a so called continuation which we tell to run on the
                        // task scheduler of the main thread, which is in fact the one who called
                        // the Evaluate method of this plugin.
                    ).ContinueWith(t =>
                    {
                        // Write the result to the outputs
                        FOutput[i] = t.Result.Value;
                        // Note that in this particular example writing out the string
                        // will take a very long time - so should more or less be seen
                        // as a debug output.
                        FStringOut[i] = t.Result.ValueAsString;
                        // And set the ready state to true
                        FReadyOut[i] = true;
                    },
                        // Same as in StartNew we pass the used cancellation token
                        ct,
                        // Here we can specify some options under which circumstances the 
                        // continuation should run. In this case we only want it to run if
                        // the task wasn't cancelled before.
                        TaskContinuationOptions.OnlyOnRanToCompletion,
                        // This way we tell the continuation to run on the main thread of vvvv.
                        TaskScheduler.FromCurrentSynchronizationContext()
                    );
                    // Now save the task in our internal task spread so we're able to cancel
                    // it later on.
                    FTasks[i] = task;
                }
            }
        }

        private void CancelRunningTasks()
        {
            if (FCts != null)
            {
                // All our running tasks use the cancellation token of this cancellation
                // token source. Once we call cancel the ct.ThrowIfCancellationRequested()
                // will throw and the task will transition to the canceled state.
                FCts.Cancel();
                try
                {
                    // We need to wait for all tasks until they're either in the canceled
                    // or completion state.
                    Task.WaitAll(FTasks.ToArray());
                }
                catch (AggregateException e)
                {
                    // Log all exceptions which were thrown during the task execution.
                    foreach (var exception in e.InnerExceptions)
                        FLogger.Log(exception);
                }
                // Dispose the cancellation token source and set it to null so we know
                // to setup a new one in a next frame.
                FCts.Dispose();
                FCts = null;
                // And cleanup all the tasks
                foreach (var task in FTasks)
                    task.Dispose();
                FTasks.SliceCount = 0;
            }
        }

        private static BigInteger BigIntPow(BigInteger Base, int Exponent)
        {
            return BigInteger.Pow(Base, Exponent);
        }
    }
}

huui, looks like I’ve been way more than just a little off here, haha…
THANKS A LOT, especially for the detailed comments!!
works perfectly! i’ll have to go through it step by step to apply it correctly to other big integer nodes, but thanks to your comments this should be rather easy!

woei made a good point, that, given several big integer nodes which you combine to a more complex calculation, it would make more sense to only pass the functions (including their inputs) of each node and start only one thread in the last node (where the incoming functions are combined to one big calculation)…
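
woei's idea can be sketched in plain C#, outside of vvvv. This is only a rough illustration under assumptions: the class `DeferredBigIntDemo` and the helpers `Constant`, `Power`, `Add` and `EvaluateAsync` are hypothetical names, not part of any vvvv or .NET API. Each "node" returns a `Func<BigInteger>` (a recipe) instead of a result, and only the last step starts a single task that runs the whole chain:

```csharp
using System;
using System.Numerics;
using System.Threading.Tasks;

public static class DeferredBigIntDemo
{
    // A "node" that just wraps a value; nothing is computed yet.
    public static Func<BigInteger> Constant(long value)
    {
        return () => new BigInteger(value);
    }

    // A "node" that describes a power operation on an upstream recipe.
    public static Func<BigInteger> Power(Func<BigInteger> @base, int exponent)
    {
        return () => BigInteger.Pow(@base(), exponent);
    }

    // A "node" that describes the sum of two upstream recipes.
    public static Func<BigInteger> Add(Func<BigInteger> left, Func<BigInteger> right)
    {
        return () => left() + right();
    }

    // The "last node": runs the combined recipe on one thread pool task.
    public static Task<BigInteger> EvaluateAsync(Func<BigInteger> expression)
    {
        return Task.Factory.StartNew(expression);
    }

    public static void Main()
    {
        // Build (10^20 + 1) as a recipe, then evaluate it in a single task.
        var expression = Add(Power(Constant(10), 20), Constant(1));
        Task<BigInteger> task = EvaluateAsync(expression);
        Console.WriteLine(task.Result);
    }
}
```

In a real plugin the last node would presumably attach a continuation on the main thread instead of blocking on `task.Result`, as in the example above.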
i’m not sure if there’ll be any actual need for big integers for other users (for what i need i’m going to do specific plugins which contain the full formulas), but it does sound like an interesting thing to try&learn, especially as it seems to be completely out of my coding league - i like that ;)

concerning AsString: it is used as a debugging output here only and i intend to do separate AsString and AsValue (if applicable) nodes…
on a sidenote, i wonder why building the string of the result always takes longer than the calculation itself? regardless of the inputs of the power function?
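
One way to look into this is to time the two steps separately. The sketch below is an assumption-laden experiment, not an explanation from this thread: the class name `ToStringTimingDemo` and the helper `Compute` are made up, and the inputs are chosen small so it runs quickly. A plausible reason (stated as an assumption) is that `BigInteger` stores its value in binary, so the decimal string requires a full base conversion of the whole number, whereas hexadecimal formatting maps more directly onto the stored bits:

```csharp
using System;
using System.Diagnostics;
using System.Numerics;

public static class ToStringTimingDemo
{
    // 10,000,000^5000 = 10^35000, small enough to finish quickly.
    public static BigInteger Compute()
    {
        return BigInteger.Pow(new BigInteger(10000000), 5000);
    }

    public static void Main()
    {
        var watch = Stopwatch.StartNew();
        BigInteger value = Compute();
        long powMs = watch.ElapsedMilliseconds;

        // Decimal formatting: needs a binary-to-decimal conversion.
        watch.Restart();
        string decimalText = value.ToString();
        long decimalMs = watch.ElapsedMilliseconds;

        // Hexadecimal formatting, for comparison.
        watch.Restart();
        string hexText = value.ToString("X");
        long hexMs = watch.ElapsedMilliseconds;

        Console.WriteLine("Pow:            " + powMs + " ms");
        Console.WriteLine("ToString():     " + decimalMs + " ms, " + decimalText.Length + " digits");
        Console.WriteLine("ToString(\"X\"): " + hexMs + " ms, " + hexText.Length + " hex digits");
    }
}
```

The timings depend on the machine and the .NET version, so no expected numbers are given here.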

thanks again!

would be a cool new Template for dynamic plugins…

Hmm,

i tried to get this example running.
With one value as input for Base and Exponent all works fine, and the calculation is handled in a different task.
But with a spread of more than one slice as input things get messy. All results get written into the first slice of the output.

Do you have the same problem?
Any ideas how to assign the results to correct spread index?
A small Demo Patch is attached.

VVVV-Task.zip (12.1 MB)

I made some adjustments to @Elias code. The Example is attached.
The problem of the results not being assigned to the right spread position was solved by copying the spread loop variable i into a new local variable. More or less as described here:
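
The effect can be reproduced outside of vvvv. The following is a minimal sketch with made-up names (`ClosureCaptureDemo`, `CaptureLoopVariable`, `CaptureLocalCopy`): a lambda created inside a `for` loop captures the variable `i` itself, not its value at that moment, so a lambda that runs after the loop has finished sees the final value of `i`. In the plugin, `i` has already reached `SpreadMax` by the time the continuations run; assuming spread indexing wraps around, that would explain why everything ended up in the first slice.

```csharp
using System;
using System.Collections.Generic;

public static class ClosureCaptureDemo
{
    // Captures the loop variable itself: every lambda sees the final value of i.
    public static List<int> CaptureLoopVariable(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
            actions.Add(() => i);
        return Invoke(actions);
    }

    // Copies i into a fresh local per iteration: each lambda keeps its own value.
    public static List<int> CaptureLocalCopy(int count)
    {
        var actions = new List<Func<int>>();
        for (int i = 0; i < count; i++)
        {
            int index = i;
            actions.Add(() => index);
        }
        return Invoke(actions);
    }

    // Runs the lambdas only after the loop has completed, like a late continuation.
    private static List<int> Invoke(List<Func<int>> actions)
    {
        var results = new List<int>();
        foreach (var action in actions)
            results.Add(action());
        return results;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", CaptureLoopVariable(3))); // prints 3,3,3
        Console.WriteLine(string.Join(",", CaptureLocalCopy(3)));    // prints 0,1,2
    }
}
```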

In addition i added some code so that every slice can kick off its own task, resulting in a spread of tasks…

#region usings
using System;
using System.ComponentModel.Composition;
using System.Numerics;
using System.Threading;
using System.Threading.Tasks;
using System.Linq;

using VVVV.PluginInterfaces.V1;
using VVVV.PluginInterfaces.V2;
using VVVV.Utils.VColor;
using VVVV.Utils.VMath;

using VVVV.Core.Logging;
#endregion usings

namespace VVVV.Nodes
{
	#region PluginInfo
	[PluginInfo(Name = "Power", Category = "BigInteger", Help = "Basic template with one value in/out", Tags = "")]
	#endregion PluginInfo
	public class BigIntegerPowerNode : IPluginEvaluate, IPartImportsSatisfiedNotification, IDisposable
	{
		[Input("Base", DefaultValue = 1)]
		public ISpread<int> FBaseIn;

		[Input("Exponent", DefaultValue = 1)]
		public ISpread<int> FExponentInt;

		[Input("Calculate", IsBang = true, IsSingle = false)]
		public ISpread<bool> FDoItIn;
		
		[Input("Cancel", IsBang = true, IsSingle = false)]
		public ISpread<bool> FCancel;

		[Output("Output")]
		public ISpread<BigInteger> FOutput;

		[Output("Is Ready")]
		public ISpread<bool> FReadyOut;

		[Output("AsString")]
		public ISpread<string> FStringOut;

		[Import]
		public ILogger FLogger;

		private readonly  Spread<Task> FTasks = new Spread<Task>();
		private readonly Spread<CancellationTokenSource> FCts = new Spread<CancellationTokenSource>();
		private readonly Spread<CancellationToken> ct = new Spread<CancellationToken>();
		private int TaskCount = 0;

		// Called when this plugin was created
		public void OnImportsSatisfied()
		{
			// Do any initialization logic here. In this example we won't need to
			// do anything special.
		}

		// Called when this plugin gets deleted
		public void Dispose()
		{
			// Should this plugin get deleted by the user or should vvvv shutdown
			// we need to wait until all still running tasks ran to a completion
			// state.
			for (int i = 0; i < TaskCount; i++)
			{
				int index = i;
				FLogger.Log(LogType.Message, "Dispose task:"+index);
				CancelRunningTasks(index);
			}
		}

		// Called when data for any output pin is requested
		public void Evaluate(int SpreadMax)
		{
			// Set the slice counts of our outputs.
			FOutput.SliceCount = SpreadMax;
			FStringOut.SliceCount = SpreadMax;
			FReadyOut.SliceCount = SpreadMax;
			
			FTasks.SliceCount = SpreadMax;
			FCts.SliceCount = SpreadMax;
			ct.SliceCount = SpreadMax;
			TaskCount = SpreadMax;
			
			
			for (int i = 0; i < SpreadMax; i++)
			{
				// store i to a new variable so it won't change when tasks are running over longer period.
				int index = i;
				
				if (FCancel[index])
				{
					CancelRunningTasks(index);
				}

				
				if (FDoItIn[index])
				{                
					// Let's first cancel the running task of this slice (if any).
					CancelRunningTasks(index);
					
					// Create a new task cancellation source object.
					FCts[index] = new CancellationTokenSource();
					// Retrieve the cancellation token from it which we'll use for
					// the new task we set up now.
					ct[index] = FCts[index].Token;
					
					// Reset the outputs and the ready state
					FOutput[i] = default(BigInteger);
					FStringOut[i] = string.Empty;
					FReadyOut[i] = false;
					

					// Now set up a new task which will perform the long running
					// computation on the thread pool of the system.
					FTasks[index] = Task.Factory.StartNew(() =>
					{
						// Should a cancellation be requested throw the task
						// canceled exception.
						// In this specific scenario this seems a little useless,
						// but imagine your long running computation runs in a loop:
						// you could call this method in each iteration.
						// Also many asynchronous methods in .NET provide an overload
						// which takes a cancellation token.
						ct[index].ThrowIfCancellationRequested();

						// Here is the actual computation:
						var @base = new BigInteger(FBaseIn[index]);
						var exponent = FExponentInt[index];
						var result = BigIntPow(@base, exponent);
						// Note that the ToString method will take the most time 
						// in this particular example, so we'll also compute it in
						// background.
						return new { Value = result, ValueAsString = result.ToString() };
					},
					// The cancellation token should also be passed to the StartNew method.
					// For details see http://msdn.microsoft.com/en-us/library/dd997396%28v=vs.110%29.aspx
					ct[index]
					// Once the task is completed we want to write the result to the output.
					// Writing to pins is only allowed in the main thread of vvvv. To achieve
					// this we setup a so called continuation which we tell to run on the
					// task scheduler of the main thread, which is in fact the one who called
					// the Evaluate method of this plugin.
					).ContinueWith(t =>
					{
						// Write the result to the outputs
						FOutput[index] = t.Result.Value;
						// Note that in this particular example writing out the string
						// will take a very long time - so should more or less be seen
						// as a debug output.
						FStringOut[index] = t.Result.ValueAsString;
						// And set the ready state to true
						FReadyOut[index] = true;
					},
					// Same as in StartNew we pass the used cancellation token
					ct[index],
					// Here we can specify some options under which circumstances the 
					// continuation should run. In this case we only want it to run if
					// the task wasn't cancelled before.
					TaskContinuationOptions.OnlyOnRanToCompletion,
					// This way we tell the continuation to run on the main thread of vvvv.
					TaskScheduler.FromCurrentSynchronizationContext()
					);
				}
			}
		}

		private void CancelRunningTasks(int index)
		{
			if (FCts[index] != null)
			{
				// All our running tasks use the cancellation token of this cancellation
				// token source. Once we call cancel the ct.ThrowIfCancellationRequested()
				// will throw and the task will transition to the canceled state.
				FCts[index].Cancel();
				
				// Dispose the cancellation token source and set it to null so we know
				// to setup a new one in a next frame.
				FCts[index].Dispose();
				FCts[index] = null;
				
			}
		}

		private static BigInteger BigIntPow(BigInteger Base, int Exponent)
		{
			return BigInteger.Pow(Base, Exponent);
		}
	}
}

Task Example with spreaded Task and cancelation Token. (12.3 MB)

Wow thx for pointing that out. Especially the linked article.