Home > Net >  How can a thread force execution of a function in the context of another existing thread?
How can a thread force execution of a function in the context of another existing thread?

Time:04-02

In a Delphi/Linux program, let's say I have two running threads, ThreadA and ThreadB. At some point in time ThreadB need to make ThreadA execute a function and block until the function returns.

In Delphi, we have TThread.Synchronize which does the work, but only when ThreadA is the main thread.

Any idea? I use Delphi but an answer with C code is also welcome.

CodePudding user response:

To do this the threads will have to co-operate, there's no mechanism to trigger events across threads. However if you are prepared to implement such a mechanism it's not difficult to do, and of course you can look at the source code for TThread.Synchronize for tips.

Borrowing from the source for TThread.Syncrhonize I have come up with the following. You will have to have the co-operating threads check their queues as part of their main loops - which is of course how TThread.Synchronize works.

The following code is based on code we use in production - my apologies if there are comments or references to items not in the unit. There is no mechanism to provide a result of the function, but that could be resolved with using different calling templates (so the result type is known). I have allowed for a Result TObject (even though there's no way to know what that should be currently) so that multi-valued results can be returned if needed.

There's no windows specific code in the following, so it should work on Linux as you requested.

unit Unit1;

interface

uses  Classes, SyncObjs, Generics.Collections;

  type
  TQueuedCallback = class(TObject)
  protected
    _pEvent:            TEvent;
    _pResult:           TObject;
    _fnMethod:          TThreadMethod;
    _fnProcedure:       TThreadProcedure;

  public

    property  Event: TEvent read _pEvent write _pEvent;
    property  Result: TObject read _pResult write _pResult;
    property  Method: TThreadMethod read _fnMethod write _fnMethod;
    property  Proc: TThreadProcedure read _fnProcedure write _fnProcedure;

  end;

  TQueueableThread = class(TThread)
  protected
    _pCSLock:           TCriticalSection;
    _pQueuedCalls:      TList<TQueuedCallback>;
    _haSignals:         THandleObjectArray;
    _pQueueEvent:       TEvent;
    _pStopEvent:        TEvent;
    _dwMaxWait:         Cardinal;

    procedure _DoWork(nEventIndex: Integer); virtual; abstract;   // where th thread does it's work
    procedure _ExecuteQueued(blAll: Boolean = False); virtual;
  public
    destructor Destroy; override;

    procedure AfterConstruction(); override;
    procedure Execute(); override;
    procedure QueueProcedure(fnMethod: TThreadMethod); overload; virtual;
    procedure QueueProcedure(fnProcedure: TThreadProcedure); overload; virtual;
    procedure QueueProcedureAndWait(fnMethod: TThreadMethod); overload; virtual;
    procedure QueueProcedureAndWait(fnProcedure: TThreadProcedure); overload; virtual;
    function  QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject; overload; virtual;
    function  QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject; overload; virtual;

  end;

implementation

uses  SysUtils;

  { TQueueableThread }

  procedure TQueueableThread._ExecuteQueued(blAll: Boolean);
  begin
    repeat
      Self._pCSLock.Enter();
      if(Self._pQueuedCalls.Count>0) then
      begin
        if(Assigned(Self._pQueuedCalls.Items[0].Method)) then
          Self._pQueuedCalls.Items[0].Method()
        else if(Assigned(Self._pQueuedCalls.Items[0].Proc)) then
          Self._pQueuedCalls.Items[0].Proc();
        // No mechanism for supplying a result ...
        if(Self._pQueuedCalls.Items[0]._pEvent<>nil) then
          Self._pQueuedCalls.Items[0]._pEvent.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free;
        Self._pQueuedCalls.Delete(0);
      end;
      blAll:=(blAll And (Self._pQueuedCalls.Count>0));
      Self._pCSLock.Leave();
    until not blAll;
  end;

  destructor TQueueableThread.Destroy;
  begin
    if(Self._pQueuedCalls<>nil) then
    begin
      while(Self._pQueuedCalls.Count>0) do
      begin
        if(Self._pQueuedCalls.Items[0].Event<>nil) then
          Self._pQueuedCalls.Items[0].Event.SetEvent()
        else
          Self._pQueuedCalls.Items[0].Free();
        Self._pQueuedCalls.Delete(0);
      end;
      FreeAndNil(Self._pQueuedCalls);
    end;
  end;

  procedure TQueueableThread.AfterConstruction();
  begin
    inherited;
    Self._pCSLock:=TCriticalSection.Create();
    Self._pQueuedCalls:=TList<TQueuedCallback>.Create();
    SetLength(Self._haSignals, 2);
    Self._pQueueEvent:=TEvent.Create();
    Self._haSignals[0]:=Self._pQueueEvent;
    Self._pStopEvent:=TEvent.Create();
    Self._haSignals[1]:=Self._pStopEvent;
    Self._dwMaxWait:=30000;
  end;

  procedure TQueueableThread.Execute();
  var
    dwWaitResult:       TWaitResult;
    nEventIndex:        Integer;
    nLoop:              Integer;
    pSignalled:         THandleObject;
  begin
    while(not Self.Terminated) do
    begin
      //LogThreadMessage(GetCurrentThreadId(), Self.ClassType, Format('WaitingFor: %u', [Self._MaxWaitTime]));
      dwWaitResult:=THandleObject.WaitForMultiple(Self._haSignals, Self._dwMaxWait, False, pSignalled);
      //LogThreadMessage(GetCurrentThreadId(), Self.ClassType, Format('WaitForMultipleObjects Result: %u', [dwWaitResult]));
      if(dwWaitResult=wrError) then
        Self.Terminate;
      if not Self.Terminated then
      begin
        if(pSignalled=Self._pQueueEvent) then
        begin
          Self._ExecuteQueued(True);
          Self._pQueueEvent.ResetEvent();
        end
        else if(pSignalled=Self._pStopEvent) then
          Self.Terminate()
        else
        begin
          nEventIndex:=-2;
          if(dwWaitResult=wrTimeout) then
            nEventIndex:=-1
          else
          begin
            nLoop:=0;
            while( (nEventIndex<0) And (nLoop<Length(Self._haSignals)) ) do
            begin
              if(Self._haSignals[nLoop]=pSignalled) then
                nEventIndex:=nLoop
              else
                Inc(nLoop);
            end;
            if(nEventIndex>-2) then
            begin
              try
                Self._DoWork(nEventIndex);
              except
                on e: Exception do
                  // error handling
              end;
            end;
          end;
        end;
      end;
    end;
  end;

  procedure TQueueableThread.QueueProcedure(fnMethod: TThreadMethod);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
    end;
  end;

  procedure TQueueableThread.QueueProcedure(fnProcedure: TThreadProcedure);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnProcedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
    end;
  end;

  procedure TQueueableThread.QueueProcedureAndWait(fnMethod: TThreadMethod);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  procedure TQueueableThread.QueueProcedureAndWait(fnProcedure: TThreadProcedure);
  var
    pQueue:             TQueuedCallback;
  begin
    if(Assigned(fnPRocedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  function  TQueueableThread.QueueProcedureAndWaitForResult(fnMethod: TThreadMethod): TObject;
  var
    pQueue:             TQueuedCallback;
  begin
    Result:=nil;
    if(Assigned(fnMethod)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Method:=fnMethod;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      Result:=pQueue._pResult;
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

  function  TQueueableThread.QueueProcedureAndWaitForResult(fnProcedure: TThreadProcedure): TObject;
  var
    pQueue:             TQueuedCallback;
  begin
    Result:=nil;
    if(Assigned(fnProcedure)) then
    begin
      Self._pCSLock.Enter();
      pQueue:=TQueuedCallback.Create();
      pQueue.Proc:=fnProcedure;
      pQueue.Event:=TEvent.Create();
      Self._pQueuedCalls.Add(pQueue);
      Self._pQueueEvent.SetEvent();
      Self._pCSLock.Leave();
      pQueue._pEvent.WaitFor(INFINITE);
      Result:=pQueue._pResult;
      FreeAndNil(pQueue._pEvent);
      FreeAndNil(pQueue);
    end;
  end;

end.

You could have inherited classes of TQueuedCallback that use a specific calling template, and this would be one way to identify the return value

  • Related