1#ifndef _DRAMA2_THREADACTION_INC
2#define _DRAMA2_THREADACTION_INC
47#include <condition_variable>
57 enum class ThreadSignalType {
66 struct DramaSignalDetails {
67 std::thread::id _fromThread;
68 ThreadSignalType _sigType;
70 DramaSignalDetails(ThreadSignalType
sigType) :
85 typedef std::deque<DramaSignalDetails> SignalDetailsQueue;
98 class _ThreadMessage :
public MessageHandler {
99 TAction *_threadHandlerObj;
118 TAction *_threadHandlerObj;
181 friend class _ThreadMessage;
182 friend class _ThreadKick;
185 std::future<void> _actionFuture;
192 std::weak_ptr<Task> _theTask;
219 _obeyRescheduleObj(
this),_kickMessageObj(
this) {
316 virtual void KickedWhenNotWaiting(sds::IdPtr
Arg);
339 virtual void SignaledWhenNotWaiting(sds::IdPtr
Arg);
348 return std::shared_ptr<Task>(_theTask);
358 return _theTask.expired();
396 template<
typename...
Types>
404 std::stringstream
sstrm;
437 double GetTimeout()
const {
473 SignalAmWaitingForKick();
483 std::shared_ptr<Task>(_theTask)->Lock());
485 std::thread::id
threadId(std::this_thread::get_id());
505 *
event =
details.eventInfo.entryReason;
513 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
514 "d2::TACT::WaitForEvent",
550 template <
class Rep,
class Period>
551 bool WaitForEventTimeoutIn(
553 const std::chrono::duration<Rep,Period>&
rel_time,
560 SignalAmWaitingForKick();
565 std::shared_ptr<Task>(_theTask)->Lock());
566 std::thread::id
threadId(std::this_thread::get_id());
582 std::shared_ptr<Task>(_theTask)->Logger().Log(
584 "d2::TACT::WaitEventTOIn",
585 "Timeout waiting for signal/kick");
591 *
event = EntryCode::DramaAbortWaits;
597 *
event =
details.eventInfo.entryReason;
605 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
606 "d2::TACT::WaitEventTOIn",
607 "kick/signal received");
637 bool WaitForEventTimeoutIn(
642 return WaitForEventTimeoutIn(
event, std::chrono::seconds(
seconds), arg);
679 template <
class Clock,
class Duration>
680 bool WaitEventTimeoutAt(
682 const std::chrono::time_point<Clock,Duration>&
abs_time,
690 SignalAmWaitingForKick();
695 std::shared_ptr<Task>(_theTask)->Lock());
696 std::thread::id
threadId(std::this_thread::get_id());
712 std::shared_ptr<Task>(_theTask)->Logger().Log(
D2LOG_DRAMA2,
true,
713 "d2::TACT::WaitEventTOAT",
714 "Timeout waiting for signal/kick");
717 *
event = EntryCode::DramaAbortWaits;
722 *
event =
details.eventInfo.entryReason;
731 std::shared_ptr<Task>(_theTask)->Logger().Log(
733 "d2::TACT::WaitEventTOAt",
734 "Signal/Kick received");
758 void WaitForKick(
sds::IdPtr *
const arg=
nullptr) {
762 WaitForEvent(&
event, arg);
766 "TAction::WaitForKick - signal received instead of kick, application programming error. You may want WaitForEvent()");
795 template <
class Rep,
class Period>
796 bool WaitForKickTimeoutIn(
797 const std::chrono::duration<Rep,Period>&
rel_time,
808 "TAction::WaitForKickTimeoutIn - signal received instead of kick, application programming error. You may want WaitForEventTimeoutIn()");
833 bool WaitForKickTimeoutIn(
837 return WaitForKickTimeoutIn(std::chrono::seconds(
seconds), arg);
868 template <
class Clock,
class Duration>
869 bool WaitKickForTimeoutAt(
870 const std::chrono::time_point<Clock,Duration>&
abs_time,
882 "TAction::WaitForKickTimeoutAt - signal received instead of kick, application programming error. You may want WaitForEventTimeoutAt()");
933 _outArg = arg.Copy();
934 _outArgDelete =
true;
938 _outArg.ShallowCopy(arg);
939 _outArgDelete =
false;
948 void SetExitOnCompletion() {
955 void ClearExitOnCompletion() {
956 _exitTask =
false; {
…}
971 _outArg.ShallowCopy(arg,
true);
972 _outArgDelete =
true;
1032 WaitEventDetails *SetupWaitForKick() {
1039 SignalAmWaitingForKick();
1045 std::shared_ptr<Task>(_theTask)->Lock());
1046 std::thread::id
threadId(std::this_thread::get_id());
1070 bool DoWaitForKick(WaitEventDetails *
waitEvent,
1082 std::shared_ptr<Task>(_theTask)->Lock());
1096 std::shared_ptr<Task>(_theTask)->Logger().Log(
1098 "d2::TACT::DoWaitKick",
1106 if (
details.eventInfo.entryReason ==
1107 EntryCode::DramaAbortWaits)
1120 _ThreadMessage _obeyRescheduleObj;
1121 _ThreadKick _kickMessageObj;
1130 std::string _actionName;
1150 bool _exitTask =
false;
1155 bool _outArgDelete =
false;
1156 bool _outArgSet =
false;
1164 SignalDetailsQueue _signalQueue;
1170 WaitEventMapType _waitEventMap;
1185 void SignalDrama(ThreadSignalType
why);
1190 void SignalAmWaitingForKick();
1197 Request ProcessDrama2Signal();
1201 Request ProcessSubsidiaryMessage();
1211 void ClearWait(
bool complete)
override;
1216 WaitEventDetails *FindWaitEventDetails(std::thread::id)
1226 unsigned WakeUpWaitingThreads(
sds::IdPtr arg);
1232 void ActionThreadComplete();
1245 void OrphanOutstandingEvents();
1254 unsigned GetKickCount()
const {
1270 void ResetKickCount();
1286 std::string GetActionName()
const {
1316 TAction(dramaTask, 0), _func(func) { }
1325 _func(
this, obeyArg);
1354 bool _wasKicked =
false;
1355 bool _waiting =
false;
1356 bool _threadThrew =
false;
1357 TAction *_theAction =
nullptr;
1360 std::future<void> _threadFuture;
1365 std::condition_variable_any _threadReadyCond;
1366 WaitEventDetails *_waitEvent =
nullptr;
1425 virtual bool Kicked(
const sds::Id & arg);
1441 bool ThreadThrewException()
const {
1442 return _threadThrew;
1449 virtual ~KickNotifier();
1453 KickNotifier& operator=(
const KickNotifier &rhs) =
delete;
1456 KickNotifier(
const KickNotifier &source) =
delete;
1469 virtual int RunDramaHasExited()
override;
1489 bool JoinThreads(std::chrono::steady_clock::time_point until)
override;
virtual void PutObeyHandler(MessageHandlerPtr obj)
Put a message handler object for the next Obey reschedule event.
Definition messagehandler.hh:598
MessageHandler & operator=(const MessageHandler &rhs)=delete
Copy operator deleted.
virtual std::shared_ptr< Task > GetTask() const
Returns a pointer to the task.
Definition messagehandler.hh:347
virtual void PutKickHandler(MessageHandlerPtr obj)
Put a message handler object for the next Kick event.
Definition messagehandler.hh:617
virtual void MessageUser(const std::string &text) const
Use DRAMA to send a message to the user.
MessageHandler()
Create a DRAMA action/message handler object.
Definition messagehandler.hh:174
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition messagehandler.hh:524
A class which implements a DRAMA Message Handler.
Definition messagehandler.hh:138
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
Class used by Obey and Kick handlers to indicate rescheduling requirements.
Definition request.hh:78
Class used to arrange for notifications when the RunDrama exits.
Definition task.hh:387
std::unique_lock< mutexType > uniqueLockType
Defines the type of a unique_lock type using our mutex type.
Definition task.hh:468
std::recursive_timed_mutex mutexType
Defines the type of our mutex.
Definition task.hh:460
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:465
A class which implements a DRAMA task.
Definition task.hh:446
virtual Id Copy() const
Factory constructor method Id Copy constructor.
virtual void ShallowCopy(const Id &source)
Shallow copy from a const sds::Id which will outlive this object.
Definition sds.hh:2392
A C++ Interface to the handling SDS structures.
Definition sds.hh:428
Abstract class which is sub-classed to print SDS item listings.
Definition sds.hh:310
KickNotifier & operator=(const KickNotifier &rhs)=delete
Assignment operator - deleted.
KickNotifier(TAction *action)
KickNotifier constructor.
virtual bool Kicked(const sds::Id &arg)
Method is invoked when a Kick occurs.
bool ThreadThrewException() const
Return true if the thread has died after thrown an exception.
Definition threadaction.hh:1468
bool JoinThreads(std::chrono::steady_clock::time_point until) override
Invoked when the drama::task::RunDrama() loop exits.
virtual int RunDramaHasExited() override
Invoked when the drama::task::RunDrama() method exits.
virtual ~KickNotifier()
Destructor.
bool WasKicked()
Indicates if the action was kicked.
Definition threadaction.hh:1459
KickNotifier(const KickNotifier &source)=delete
Copy constructor - deleted.
An object used to obtain notifications of kicks.
Definition threadaction.hh:1379
TActionViaFunctor(std::weak_ptr< Task > dramaTask, const ThreadActionFunction func)
Initialize object with the specified function, which meets the ThreadActionFunction prototype.
Definition threadaction.hh:1341
void ActionThread(const drama::sds::Id &obeyArg) override
Invoke function.
Definition threadaction.hh:1351
This class is used to creating TAction objects referring to functions.
Definition threadaction.hh:1331
bool WaitForKickTimeoutIn(const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration has passed.
Definition threadaction.hh:823
void MessageUser(const std::string &text) const override
Use DRAMA to send a message to the user.
virtual const sds::PrintObjectCR & SdsListToUser() const override
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
Definition threadaction.hh:448
TAction(const TAction &source)=delete
Copy constructor - deleted.
void WaitForKick(sds::IdPtr *const arg=nullptr)
Block the current thread until a kick for the action is received.
Definition threadaction.hh:785
void SetTimeout(double newTimeout)
Set a new timeout.
Definition threadaction.hh:475
double GetTimeout() const
Return the current action timeout.
Definition threadaction.hh:464
bool WaitKickForTimeoutAt(const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or until a given time.
Definition threadaction.hh:896
TAction & operator=(const TAction &rhs)=delete
Assignment operator - deleted.
void PutKickHandler(MessageHandlerPtr obj) override final
We cannot change the kick handler in a threaded action, it does not make sense.
void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override
Sets up a wait event for this thread.
virtual ~TAction()
TAction destructor.
bool WaitEventTimeoutAt(EntryCode *event, const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or until a given time.
Definition threadaction.hh:707
void SetExitOnCompletion()
Set exit on completion.
Definition threadaction.hh:975
unsigned GetKickCount() const
Returns the number of times this action has been kicked since started or the last reset of the count.
Definition threadaction.hh:1281
void AbortMessageWaits(StatusType status)
Tells any thread waiting for messages to abort the wait event.
bool WaitForEventTimeoutIn(EntryCode *event, unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration in seconds has ...
Definition threadaction.hh:664
bool DoWaitForKick(WaitEventDetails *waitEvent, sds::IdPtr *const arg=nullptr)
Wait for a kick event with the waitEvent details given.
Definition threadaction.hh:1097
void SetReturnArg(sds::Id *arg)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:996
void ClearExitOnCompletion()
Clear exit on completion.
Definition threadaction.hh:982
void MessageUser(const char *format, Types... args)
Use DRAMA to send a message to the user - safe format.
Definition threadaction.hh:424
bool HasTaskExpired() const noexcept
Indicate if the underlying DRAMA task weak pointer has expired.
Definition threadaction.hh:384
std::shared_ptr< Task > GetTask() const override
Get a reference to the DRAMA task we are part of.
Definition threadaction.hh:374
TAction(std::weak_ptr< Task > dramaTask, double timeout=0)
Create a DRAMA action/message handler object which runs a thread when the Obey message is received.
Definition threadaction.hh:244
void WaitForEvent(EntryCode *event, sds::IdPtr *const arg=nullptr)
Block the current thread until a kick/signal message for the action is received.
Definition threadaction.hh:494
std::string GetActionName() const
Return the name of the action this thread is implementing.
Definition threadaction.hh:1313
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:956
WaitEventDetails * SetupWaitForKick()
Set up to wait for a kick message.
Definition threadaction.hh:1059
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
Dits___CurActType GetMessageContext() const override
Get the DRAMA Context associated with the action event.
bool WaitForKickTimeoutIn(unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration in seconds has passed.
Definition threadaction.hh:860
void PutObeyHandler(MessageHandlerPtr obj) override final
We cannot change the obey handler in a threaded action, it does not make sense as the action never re...
bool WaitForEventTimeoutIn(EntryCode *event, const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration has passed.
Definition threadaction.hh:578
Task::mutexType & Lock() const override
Get a reference the DRAMA Task lock.
void ResetKickCount()
Returns a shared pointer to the last kick message argument.
A class which implements a DRAMA Action with runs a thread.
Definition threadaction.hh:199
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
#define DramaTHROW(status_, message_)
Throw a Drama exception.
Definition exception.hh:87
#define D2LOG_DRAMA2
DRAMA 2 events - thread events etc.
Definition logger.hh:107
DRAMA 2 include file - Message Handler class definition.
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3484
std::function< void(TAction *, const sds::Id &)> ThreadActionFunction
Type used for functions specified drama::Task::Add().
Definition task.hh:96
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1327
EntryCode
Entry type code - indicates the type of a DRAMA event.
Definition entryinfo.hh:67
@ Kick
Action has been kicked.
@ Complete
Obey/Kick etc message completed.
std::shared_ptr< MessageHandler > MessageHandlerPtr
This type is used for passing MessageHandler object addresses around.
Definition messagehandler.hh:101
void SafePrintf(std::ostream &ostream, const char *str)
Safe formatted write to a stream.
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
DRAMA 2 include file - Code common to DRAMA 2 features supporting threading.