1#ifndef _DRAMA2_THREADACTION_INC
2#define _DRAMA2_THREADACTION_INC
47#include <condition_variable>
57 enum class ThreadSignalType {
66 struct DramaSignalDetails {
67 std::thread::id _fromThread;
68 ThreadSignalType _sigType;
70 DramaSignalDetails(ThreadSignalType
sigType) :
85 typedef std::deque<DramaSignalDetails> SignalDetailsQueue;
98 class _ThreadMessage :
public MessageHandler {
99 TAction *_threadHandlerObj;
118 TAction *_threadHandlerObj;
181 friend class _ThreadMessage;
182 friend class _ThreadKick;
185 std::future<void> _actionFuture;
192 std::weak_ptr<Task> _theTask;
219 _obeyRescheduleObj(
this),_kickMessageObj(
this) {
316 virtual void KickedWhenNotWaiting(sds::IdPtr
Arg);
339 virtual void SignaledWhenNotWaiting(sds::IdPtr
Arg);
348 return std::shared_ptr<Task>(_theTask);
386 template<
typename...
Types>
394 std::stringstream
sstrm;
427 double GetTimeout()
const {
463 SignalAmWaitingForKick();
473 std::shared_ptr<Task>(_theTask)->Lock());
475 std::thread::id
threadId(std::this_thread::get_id());
495 *
event =
details.eventInfo.entryReason;
503 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
504 "d2::TACT::WaitForEvent",
540 template <
class Rep,
class Period>
541 bool WaitForEventTimeoutIn(
543 const std::chrono::duration<Rep,Period>&
rel_time,
550 SignalAmWaitingForKick();
555 std::shared_ptr<Task>(_theTask)->Lock());
556 std::thread::id
threadId(std::this_thread::get_id());
572 std::shared_ptr<Task>(_theTask)->Logger().Log(
574 "d2::TACT::WaitEventTOIn",
575 "Timeout waiting for signal/kick");
581 *
event = EntryCode::DramaAbortWaits;
587 *
event =
details.eventInfo.entryReason;
595 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
596 "d2::TACT::WaitEventTOIn",
597 "kick/signal received");
627 bool WaitForEventTimeoutIn(
632 return WaitForEventTimeoutIn(
event, std::chrono::seconds(
seconds), arg);
669 template <
class Clock,
class Duration>
670 bool WaitEventTimeoutAt(
672 const std::chrono::time_point<Clock,Duration>&
abs_time,
680 SignalAmWaitingForKick();
685 std::shared_ptr<Task>(_theTask)->Lock());
686 std::thread::id
threadId(std::this_thread::get_id());
702 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
703 "d2::TACT::WaitEventTOAT",
704 "Timeout waiting for signal/kick");
707 *
event = EntryCode::DramaAbortWaits;
712 *
event =
details.eventInfo.entryReason;
721 std::shared_ptr<Task>(_theTask)->Logger().Log(
723 "d2::TACT::WaitEventTOAt",
724 "Signal/Kick received");
748 void WaitForKick(
sds::IdPtr *
const arg=
nullptr) {
752 WaitForEvent(&
event, arg);
756 "TAction::WaitForKick - signal received instead of kick, application programming error. You may want WaitForEvent()");
785 template <
class Rep,
class Period>
786 bool WaitForKickTimeoutIn(
787 const std::chrono::duration<Rep,Period>&
rel_time,
798 "TAction::WaitForKickTimeoutIn - signal received instead of kick, application programming error. You may want WaitForEventTimeoutIn()");
823 bool WaitForKickTimeoutIn(
827 return WaitForKickTimeoutIn(std::chrono::seconds(
seconds), arg);
858 template <
class Clock,
class Duration>
859 bool WaitKickForTimeoutAt(
860 const std::chrono::time_point<Clock,Duration>&
abs_time,
872 "TAction::WaitForKickTimeoutAt - signal received instead of kick, application programming error. You may want WaitForEventTimeoutAt()");
923 _outArg = arg.Copy();
924 _outArgDelete =
true;
928 _outArg.ShallowCopy(arg);
929 _outArgDelete =
false;
938 void SetExitOnCompletion() {
945 void ClearExitOnCompletion() {
961 _outArg.ShallowCopy(arg,
true);
962 _outArgDelete =
true;
1022 WaitEventDetails *SetupWaitForKick() {
1029 SignalAmWaitingForKick();
1035 std::shared_ptr<Task>(_theTask)->Lock());
1036 std::thread::id
threadId(std::this_thread::get_id());
1060 bool DoWaitForKick(WaitEventDetails *
waitEvent,
1072 std::shared_ptr<Task>(_theTask)->Lock());
1086 std::shared_ptr<Task>(_theTask)->Logger().Log(
1088 "d2::TACT::DoWaitKick",
1096 if (
details.eventInfo.entryReason ==
1097 EntryCode::DramaAbortWaits)
1110 _ThreadMessage _obeyRescheduleObj;
1111 _ThreadKick _kickMessageObj;
1120 std::string _actionName;
1140 bool _exitTask =
false;
1145 bool _outArgDelete =
false;
1146 bool _outArgSet =
false;
1154 SignalDetailsQueue _signalQueue;
1160 WaitEventMapType _waitEventMap;
1175 void SignalDrama(ThreadSignalType
why);
1180 void SignalAmWaitingForKick();
1187 Request ProcessDrama2Signal();
1191 Request ProcessSubsidiaryMessage();
1201 void ClearWait(
bool complete)
override;
1206 WaitEventDetails *FindWaitEventDetails(std::thread::id)
1216 unsigned WakeUpWaitingThreads(
sds::IdPtr arg);
1222 void ActionThreadComplete();
1235 void OrphanOutstandingEvents();
1244 unsigned GetKickCount()
const {
1260 void ResetKickCount();
1276 std::string GetActionName()
const {
1296 class TActionViaFunctor :
public TAction {
1306 TActionViaFunctor(std::weak_ptr<Task> dramaTask,
1307 const ThreadActionFunction func) :
1308 TAction(dramaTask, 0), _func(func) { }
1317 _func(
this, obeyArg);
1344 class KickNotifier {
1346 bool _wasKicked =
false;
1347 bool _waiting =
false;
1348 bool _threadThrew =
false;
1349 TAction *_theAction =
nullptr;
1352 std::future<void> _threadFuture;
1357 std::condition_variable_any _threadReadyCond;
1358 WaitEventDetails *_waitEvent =
nullptr;
1385 KickNotifier(TAction *action);
1417 virtual bool Kicked(
const sds::Id & arg);
1425 Task::guardType DramaLock(std::shared_ptr<Task>(_theAction->GetTask())->Lock());
1433 bool ThreadThrewException()
const {
1434 return _threadThrew;
1441 virtual ~KickNotifier();
1445 KickNotifier& operator=(
const KickNotifier &rhs) =
delete;
1448 KickNotifier(
const KickNotifier &source) =
delete;
virtual void PutObeyHandler(MessageHandlerPtr obj)
Put a message handler object for the next Obey reschedule event.
Definition messagehandler.hh:598
MessageHandler & operator=(const MessageHandler &rhs)=delete
Copy operator deleted.
virtual std::shared_ptr< Task > GetTask() const
Returns a pointer to the task.
Definition messagehandler.hh:347
virtual void PutKickHandler(MessageHandlerPtr obj)
Put a message handler object for the next Kick event.
Definition messagehandler.hh:617
virtual void MessageUser(const std::string &text) const
Use DRAMA to send a message to the user.
MessageHandler()
Create a DRAMA action/message handler object.
Definition messagehandler.hh:174
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition messagehandler.hh:524
A class which implements a DRAMA Message Handler.
Definition messagehandler.hh:138
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
Class used by Obey and Kick handlers to indicate rescheduling requirements.
Definition request.hh:78
Class used to arrange for notifications when the RunDrama exits.
Definition task.hh:382
std::unique_lock< mutexType > uniqueLockType
Defines the type of a unique_lock type using our mutex type.
Definition task.hh:463
std::recursive_timed_mutex mutexType
Defines the type of our mutex.
Definition task.hh:455
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:460
A class which implements a DRAMA task.
Definition task.hh:441
virtual Id Copy() const
Factory constructor method Id Copy constructor.
virtual void ShallowCopy(const Id &source)
Shallow copy from a const sds::Id which will outlive this object.
Definition sds.hh:2354
A C++ Interface to the handling SDS structures.
Definition sds.hh:428
Abstract class which is sub-classed to print SDS item listings.
Definition sds.hh:310
KickNotifier & operator=(const KickNotifier &rhs)=delete
Assignment operator - deleted.
KickNotifier(TAction *action)
KickNotifier constructor.
virtual bool Kicked(const sds::Id &arg)
Method is invoked when a Kick occurs.
bool ThreadThrewException() const
Return true if the thread has died after thrown an exception.
Definition threadaction.hh:1460
virtual ~KickNotifier()
Destructor.
bool WasKicked()
Indicates if the action was kicked.
Definition threadaction.hh:1451
KickNotifier(const KickNotifier &source)=delete
Copy constructor - deleted.
An object used to obtain notifications of kicks.
Definition threadaction.hh:1371
TActionViaFunctor(std::weak_ptr< Task > dramaTask, const ThreadActionFunction func)
Initialize object with the specified function, which meets the ThreadActionFunction prototype.
Definition threadaction.hh:1333
void ActionThread(const drama::sds::Id &obeyArg) override
Invoke function.
Definition threadaction.hh:1343
This class is used to creating TAction objects referring to functions.
Definition threadaction.hh:1323
bool WaitForKickTimeoutIn(const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration has passed.
Definition threadaction.hh:813
void MessageUser(const std::string &text) const override
Use DRAMA to send a message to the user.
virtual const sds::PrintObjectCR & SdsListToUser() const override
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
Definition threadaction.hh:438
TAction(const TAction &source)=delete
Copy constructor - deleted.
void WaitForKick(sds::IdPtr *const arg=nullptr)
Block the current thread until a kick for the action is received.
Definition threadaction.hh:775
void SetTimeout(double newTimeout)
Set a new timeout.
Definition threadaction.hh:465
double GetTimeout() const
Return the current action timeout.
Definition threadaction.hh:454
bool WaitKickForTimeoutAt(const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or until a given time.
Definition threadaction.hh:886
TAction & operator=(const TAction &rhs)=delete
Assignment operator - deleted.
void PutKickHandler(MessageHandlerPtr obj) override final
We cannot change the kick handler in a threaded action, it does not make sense.
void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override
Sets up a wait event for this thread.
virtual ~TAction()
TAction destructor.
bool WaitEventTimeoutAt(EntryCode *event, const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or until a given time.
Definition threadaction.hh:697
void SetExitOnCompletion()
Set exit on completion.
Definition threadaction.hh:965
unsigned GetKickCount() const
Returns the number of times this action has been kicked since started or the last reset of the count.
Definition threadaction.hh:1271
void AbortMessageWaits(StatusType status)
Tells any thread waiting for messages to abort the wait event.
bool WaitForEventTimeoutIn(EntryCode *event, unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration in seconds has ...
Definition threadaction.hh:654
bool DoWaitForKick(WaitEventDetails *waitEvent, sds::IdPtr *const arg=nullptr)
Wait for a kick event with the waitEvent details given.
Definition threadaction.hh:1087
void SetReturnArg(sds::Id *arg)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:986
void ClearExitOnCompletion()
Clear exit on completion.
Definition threadaction.hh:972
void MessageUser(const char *format, Types... args)
Use DRAMA to send a message to the user - safe format.
Definition threadaction.hh:414
std::shared_ptr< Task > GetTask() const override
Get a reference to the DRAMA task we are part of.
Definition threadaction.hh:374
TAction(std::weak_ptr< Task > dramaTask, double timeout=0)
Create a DRAMA action/message handler object which runs a thread when the Obey message is received.
Definition threadaction.hh:244
void WaitForEvent(EntryCode *event, sds::IdPtr *const arg=nullptr)
Block the current thread until a kick/signal message for the action is received.
Definition threadaction.hh:484
std::string GetActionName() const
Return the name of the action this thread is implementing.
Definition threadaction.hh:1303
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:946
WaitEventDetails * SetupWaitForKick()
Set up to wait for a kick message.
Definition threadaction.hh:1049
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
Dits___CurActType GetMessageContext() const override
Get the DRAMA Context associated with the action event.
bool WaitForKickTimeoutIn(unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration in seconds has passed.
Definition threadaction.hh:850
void PutObeyHandler(MessageHandlerPtr obj) override final
We cannot change the obey handler in a threaded action, it does not make sense as the action never re...
bool WaitForEventTimeoutIn(EntryCode *event, const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration has passed.
Definition threadaction.hh:568
Task::mutexType & Lock() const override
Get a reference the DRAMA Task lock.
void ResetKickCount()
Returns a shared pointer to the last kick message argument.
A class which implements a DRAMA Action with runs a thread.
Definition threadaction.hh:199
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
#define DramaTHROW(status_, message_)
Throw a Drama exception.
Definition exception.hh:93
#define D2LOG_DRAMA2
DRAMA 2 events - thread events etc.
Definition logger.hh:107
DRAMA 2 include file - Message Handler class definition.
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3318
std::function< void(TAction *, const sds::Id &)> ThreadActionFunction
Type used for functions specified drama::Task::Add().
Definition task.hh:96
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1322
EntryCode
Entry type code - indicates the type of a DRAMA event.
Definition entryinfo.hh:67
@ Kick
Action has been kicked.
@ Complete
Obey/Kick etc message completed.
std::shared_ptr< MessageHandler > MessageHandlerPtr
This type is used for passing MessageHandler object addresses around.
Definition messagehandler.hh:101
void SafePrintf(std::ostream &ostream, const char *str)
Safe formatted write to a stream.
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
DRAMA 2 include file - Code common to DRAMA 2 features supporting threading.