AAO DRAMA/DRAMA2 C++ Interface
DRAMA C++11 and later interface
threadaction.hh
Go to the documentation of this file.
1#ifndef _DRAMA2_THREADACTION_INC
2#define _DRAMA2_THREADACTION_INC
29/*
30 * History:
31 07-Jan-2014 - TJF - Original version
32 30-Sep-2016 - TJF - Start of history after development.
33 SignalWaitingThreads() returns number of threads
34 signaled.
35
36 * The above ID is for Doxygen, this one has the format ACMM is looking for.
37 * "@(#) $Id$"
38 */
40#include "drama/thread.hh"
41#include <thread>
42#include <queue>
43#include <deque>
44#include <map>
45#include <chrono>
46#include <future>
47#include <condition_variable>
48#include <functional>
49namespace drama {
50
51 namespace thread {
52
53 /*
54 * Identifies the kind of event a thread is reporting when it
55 * sends a signal to DRAMA.
56 */
57 enum class ThreadSignalType {
58 Complete, // Main thread has completed.
59 WaitSignal,// The thread is waiting for a signal.
60 WaitKick // The thread is interested in kicks.
61 };
62
63 /*
64 * Details on a signal from a thread: which thread raised it and why.
65 */
66 struct DramaSignalDetails {
67 std::thread::id _fromThread; // Id of the thread that constructed this object.
68 ThreadSignalType _sigType; // Kind of event being signalled.
69 // The constructor records the id of the calling thread as the signal source.
70 DramaSignalDetails(ThreadSignalType sigType) :
71 _fromThread(std::this_thread::get_id()),
72 _sigType(sigType) {}
73
74
75 };
76 /*
77 * A queue of signal details. We have a queue of these as
78 * we support multiple threads as part of implementing the
79 * action, so we could have multiple signals outstanding.
80 *
81 * A std::queue would be a better representation of this
82 * than std::deque, but we do want to clear the queue and
83 * std::queue does not support that.
84 */
85 typedef std::deque<DramaSignalDetails> SignalDetailsQueue;
86
87
88
89 class TAction;
90
91 /* Used internally by TAction, this class is the reschedule
92 * handler given to DRAMA for Obey reschedule messages sent to
93 * threaded actions.
94 *
95 * The TAction::_obeyRescheduleObj member is of this type.
96 *
97 */
98 class _ThreadMessage : public MessageHandler {
99 TAction *_threadHandlerObj; // Set from the constructor argument; TAction's constructor passes itself.
100 public:
101 _ThreadMessage(TAction *threadHandler) :
102 _threadHandlerObj(threadHandler) {}
103
104 private:
105 /*
106 * Made private (the parent class declares it protected)
107 * so that users cannot invoke it directly.
108 */
109 Request MessageReceived() override final;
110 };
111 /* Used internally by TAction, this class is the reschedule
112 * handler given to DRAMA for Kick messages sent to
113 * threaded actions.
114 *
115 * The TAction::_kickMessageObj member is of this type.
116 */
117 class _ThreadKick : public MessageHandler {
118 TAction *_threadHandlerObj; // Set from the constructor argument; TAction's constructor passes itself.
119 public:
120 _ThreadKick(TAction *threadHandler) :
121 _threadHandlerObj(threadHandler) {}
122
123 private:
124 /*
125 * Made private (the parent class declares it protected)
126 * so that users cannot invoke it directly.
127 */
128 Request MessageReceived() override final;
129 };
130
131
137 using ThreadActionFunction = std::function<void (TAction *, const sds::Id &)> ;
138
170 // TMessHandler and RunDramaExitNotifier are abstract only.
171 class TAction : public MessageHandler,
172 public TMessHandler, public RunDramaExitNotifier {
173
174 // We need a function to implement the thread that
175 // is not part of TAction, but which can call
176 // ActionThread. This must be in the drama namespace,
177 // since that is how we are declaring it now.
178
179 //friend void MyThread(TAction *handler, const sds::Id &obeyArg);
180
181 friend class _ThreadMessage;
182 friend class _ThreadKick;
183
184 private:
185 std::future<void> _actionFuture; // Future of first thread in action.
186
187 /*
188 * Weak pointer to the DRAMA task we are part of.
189 * We never change it after the constructor (it is not declared const).
190 * The task pointed to is NOT const - since we can operate on it.
191 */
192 std::weak_ptr<Task> _theTask;
193 double _timeout; // Thread wait timeout, seconds.
194
195
196 void RunActionThread(const sds::IdPtr obeyArg);
197
199 public:
200
217 TAction(std::weak_ptr<Task> dramaTask, double timeout=0) :
218 _theTask(dramaTask), _timeout(timeout),
219 _obeyRescheduleObj(this),_kickMessageObj(this) {
220
221
222 }
223
228 virtual ~TAction();
229
230
233 TAction& operator=(const TAction &rhs) = delete;
236 TAction(const TAction &source) = delete;
237
238 private:
239 /*
240 * These are made private, as they are not meant to be
241 * called outside the object, just implemented outside
242 */
243
244 /*
245 * The method invoked to handle the obey message. This
246 * is an implementation of MessageHandler::MessageReceived().
247 */
248 virtual Request MessageReceived() final;
249
250
259 virtual void ActionThread(const drama::sds::Id &obeyArg) = 0;
261
273 virtual int RunDramaHasExited() override;
292 virtual bool JoinThreads(std::chrono::steady_clock::time_point until) override;
293
294
316 virtual void KickedWhenNotWaiting(sds::IdPtr Arg);
317
339 virtual void SignaledWhenNotWaiting(sds::IdPtr Arg);
340
341
342 public:
347 std::shared_ptr<Task> GetTask() const override {
348 return std::shared_ptr<Task>(_theTask);
349 }
350
351
362 void MessageUser(const std::string &text) const override;
363
364
386 template<typename... Types>
387 void MessageUser(const char *format, Types... args) {
388
389 /*
390 * Our approach is to write the output to a string, via
391 * SafePrintf(), then output that in one operation, since
392 * std::ostream devices will then do the output in one operation.
393 */
394 std::stringstream sstrm;
396 MessageUser(sstrm.str());
397
398
399 }
400
407 /* SdsListToUser() is in both TMessHandler and MessageHandler,
408 two of our base classes. So we must sort out the
409 confusion here
410 */
411 virtual const sds::PrintObjectCR &SdsListToUser() const override {
413 }
415
416
421 void SendTrigger(const sds::Id &arg) const;
422
427 double GetTimeout() const {
428 return _timeout;
429 }
438 void SetTimeout(double newTimeout) {
439 _timeout = newTimeout;
440 }
441
457 void WaitForEvent(EntryCode *event, sds::IdPtr * const arg=nullptr) {
458 /*
459 * Signal DRAMA that we are waiting for an event.
460 * This will put an entry in _waitEventMap for this
461 * thread.
462 */
463 SignalAmWaitingForKick();
464
465 /*
466 * Access the entry in _waitEventMap.
467 *
468 * Note - this first call will throw if the weak pointer _theTask
469 * does not have a valid object.
470 *
471 */
473 std::shared_ptr<Task>(_theTask)->Lock());
474
475 std::thread::id threadId(std::this_thread::get_id());
476 auto mapItem = _waitEventMap.find(threadId);
477 assert (mapItem != _waitEventMap.end());
478 WaitEventDetails *waitEvent = &(mapItem->second);
479
480
481 /*
482 * Wait for the condition
483 *
484 * We use a Lambda function to access the DataAvail() method
485 * in waitEvent.
486 *
487 */
488 waitEvent->_condition->wait(
489 DramaLock,
490 [waitEvent]{return waitEvent->DataAvail();});
491
492
493 WaitEventData details = waitEvent->_dataQueue.front();
494
495 *event = details.eventInfo.entryReason;
496 /*
497 * If the user wants the argument, return it.
498 */
499 if (arg)
500 {
501 *arg = details.arg;
502 }
503 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
504 "d2::TACT::WaitForEvent",
505 "Event received");
506
507
508 waitEvent->_dataQueue.pop();
509 // Should we complain here (throw exception?) if queue not empty?
510 ClearWait(false);
511 }
540 template <class Rep, class Period>
541 bool WaitForEventTimeoutIn(
543 const std::chrono::duration<Rep,Period>& rel_time,
544 sds::IdPtr * const arg=0) {
545 /*
546 * Signal DRAMA that we are waiting for a kick.
547 * This will put an entry in _waitEventMap for this
548 * thread.
549 */
550 SignalAmWaitingForKick();
551 /*
552 * Access the entry in _waitEventMap.
553 */
555 std::shared_ptr<Task>(_theTask)->Lock());
556 std::thread::id threadId(std::this_thread::get_id());
557 auto mapItem = _waitEventMap.find(threadId);
558 assert (mapItem != _waitEventMap.end());
559 WaitEventDetails *waitEvent = &(mapItem->second);
560 /*
561 * Wait for the condition
562 *
563 * We use a Lambda function to access the DataAvail() method
564 * in waitEvent.
565 *
566 */
567 waitEvent->_condition->wait_for(
569 [waitEvent]{return waitEvent->DataAvail();});
570 if (!waitEvent->DataAvail())
571 {
572 std::shared_ptr<Task>(_theTask)->Logger().Log(
573 D2LOG_DRAMA2, true,
574 "d2::TACT::WaitEventTOIn",
575 "Timeout waiting for signal/kick");
576
577
578 // Timeout.
579 //fprintf(stderr,"TAction::WaitKickFor() - timeout\n");
580 ClearWait(false);
581 *event = EntryCode::DramaAbortWaits;
582 return false;
583 }
584 //fprintf(stderr,"TAction::WaitEventFor() - event/signal\n");
585
586 WaitEventData details = waitEvent->_dataQueue.front();
587 *event = details.eventInfo.entryReason;
588 /*
589 * If the user wants the argument, return it.
590 */
591 if (arg)
592 {
593 *arg = details.arg;
594 }
595 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
596 "d2::TACT::WaitEventTOIn",
597 "kick/signal received");
598
599
600 waitEvent->_dataQueue.pop();
601 // Should complain here (throw exception?) if queue not empty
602 ClearWait(false);
603
604 return true;
605
606 }
627 bool WaitForEventTimeoutIn(
629 unsigned seconds,
630 sds::IdPtr * const arg=0) {
631
632 return WaitForEventTimeoutIn(event, std::chrono::seconds(seconds), arg);
633
634 }
635
636
637
638
639
669 template <class Clock, class Duration>
670 bool WaitEventTimeoutAt(
672 const std::chrono::time_point<Clock,Duration>& abs_time,
673 sds::IdPtr * const arg=0 ) {
674
675 /*
676 * Signal DRAMA that we are waiting for a kick.
677 * This will put an entry in _waitEventMap for this
678 * thread.
679 */
680 SignalAmWaitingForKick();
681 /*
682 * Access the entry in _waitEventMap.
683 */
685 std::shared_ptr<Task>(_theTask)->Lock());
686 std::thread::id threadId(std::this_thread::get_id());
687 auto mapItem = _waitEventMap.find(threadId);
688 assert (mapItem != _waitEventMap.end());
689 WaitEventDetails *waitEvent = &(mapItem->second);
690 /*
691 * Wait for the condition
692 *
693 * We use a Lambda function to access the DataAvail() method
694 * in waitEvent.
695 */
696 waitEvent->_condition->wait_until(
698 [waitEvent]{return waitEvent->DataAvail();});
699 if (!waitEvent->DataAvail())
700 {
701 // Timeout.
702 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
703 "d2::TACT::WaitEventTOAT",
704 "Timeout waiting for signal/kick");
705
706 ClearWait(false);
707 *event = EntryCode::DramaAbortWaits;
708 return false;
709 }
710 //fprintf(stderr,"TAction::WaitKickFor() - kicked\n");
711 WaitEventData details = waitEvent->_dataQueue.front();
712 *event = details.eventInfo.entryReason;
713 /*
714 * If the user wants the argument, return it.
715 */
716 if (arg)
717 {
718 *arg = details.arg;
719 }
720 waitEvent->_dataQueue.pop();
721 std::shared_ptr<Task>(_theTask)->Logger().Log(
722 D2LOG_DRAMA2, true,
723 "d2::TACT::WaitEventTOAt",
724 "Signal/Kick received");
725
726
727
728 // Should complain here (throw exception?) if queue not empty
729 ClearWait(false);
730
731 return true;
732
733 }
748 void WaitForKick(sds::IdPtr * const arg=nullptr) {
749
750 // Can be implemented by WaitForEvent(), with a check on the event.
752 WaitForEvent(&event, arg);
753 if (event != EntryCode::Kick)
754 {
756 "TAction::WaitForKick - signal received instead of kick, application programming error. You may want WaitForEvent()");
757 }
758 }
785 template <class Rep, class Period>
786 bool WaitForKickTimeoutIn(
787 const std::chrono::duration<Rep,Period>& rel_time,
788 sds::IdPtr * const arg=0) {
789
790 // Can be implemented by WaitForEventTimeoutIn(), with a check on the event.
792 bool kicked = WaitForEventTimeoutIn(&event, rel_time, arg);
793 if (!kicked)
794 return false;
795 if (event != EntryCode::Kick)
796 {
798 "TAction::WaitForKickTimeoutIn - signal received instead of kick, application programming error. You may want WaitForEventTimeoutIn()");
799 }
800 return true;
801
802
803 }
804
823 bool WaitForKickTimeoutIn(
824 unsigned seconds,
825 sds::IdPtr * const arg=0) {
826
827 return WaitForKickTimeoutIn(std::chrono::seconds(seconds), arg);
828
829 }
830
858 template <class Clock, class Duration>
859 bool WaitKickForTimeoutAt(
860 const std::chrono::time_point<Clock,Duration>& abs_time,
861 sds::IdPtr * const arg=0 ) {
862
863
864 // Can be implemented by WaitEventTimeoutAt(), with a check on the event.
867 if (!kicked)
868 return false;
869 if (event != EntryCode::Kick)
870 {
872 "TAction::WaitForKickTimeoutAt - signal received instead of kick, application programming error. You may want WaitForEventTimeoutAt()");
873 }
874 return true;
875
876
877
878 }
879
903 void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override;
904
919 void SetReturnArg(const sds::Id &arg, bool copy=true) {
920
921 if (copy)
922 {
923 _outArg = arg.Copy();
924 _outArgDelete = true;
925 }
926 else
927 {
928 _outArg.ShallowCopy(arg);
929 _outArgDelete = false;
931
932 _outArgSet = true;
933 }
938 void SetExitOnCompletion() {
939 _exitTask = true;
940 }
945 void ClearExitOnCompletion() {
946 _exitTask = false;
947 }
959 void SetReturnArg(sds::Id *arg) {
960
961 _outArg.ShallowCopy(arg, true);
962 _outArgDelete = true;
963 _outArgSet = true;
964
966
974 void PutObeyHandler(MessageHandlerPtr obj) override final;
975
989 void PutKickHandler(MessageHandlerPtr obj) override final;
990
991
998 Task::mutexType & Lock() const override;
999
1007 Dits___CurActType GetMessageContext() const override;
1008
1009
1010
1011
1012
1022 WaitEventDetails *SetupWaitForKick() {
1023
1024 /*
1025 * Signal DRAMA that we are waiting for a kick.
1026 * This will put an entry in _waitEventMap for this
1027 * thread.
1028 */
1029 SignalAmWaitingForKick();
1030
1031 /*
1032 * Access the entry in _waitEventMap.
1033 */
1035 std::shared_ptr<Task>(_theTask)->Lock());
1036 std::thread::id threadId(std::this_thread::get_id());
1037 auto mapItem = _waitEventMap.find(threadId);
1038 assert (mapItem != _waitEventMap.end());
1039 return &(mapItem->second);
1040
1041
1042 }
1060 bool DoWaitForKick(WaitEventDetails *waitEvent,
1061 sds::IdPtr * const arg=nullptr) {
1062
1064 /*
1065 * Wait for the condition
1066 *
1067 * We use a Lambda function to access the DataAvail() method
1068 * in waitEvent.
1069 *
1070 */
1072 std::shared_ptr<Task>(_theTask)->Lock());
1073 waitEvent->_condition->wait(
1074 DramaLock,
1075 [waitEvent]{return waitEvent->DataAvail();});
1076
1077
1078 WaitEventData details = waitEvent->_dataQueue.front();
1079 /*
1080 * If the user wants the argument, return it.
1081 */
1082 if (arg)
1083 {
1084 *arg = details.arg;
1085 }
1086 std::shared_ptr<Task>(_theTask)->Logger().Log(
1088 "d2::TACT::DoWaitKick",
1089 "Kick received");
1090
1091
1092 waitEvent->_dataQueue.pop();
1093 // Should we complain here (throw exception?) if queue not empty?
1094 ClearWait(false);
1095
1096 if (details.eventInfo.entryReason ==
1097 EntryCode::DramaAbortWaits)
1098 return false;
1099 return true;
1100 }
1101
1102 private:
1103 /*
1104 * Objects used as part of handling obey reschedule and
1105 * kick messages.
1106 *
1107 * They simply result in the ObeyReschedule() and KickMessage()
1108 * methods below being invoked.
1109 */
1110 _ThreadMessage _obeyRescheduleObj;
1111 _ThreadKick _kickMessageObj;
1112
1113 /*
1114 * The details on the DITS action are recorded here before
1115 * we start the thread.
1116 */
1117 int _actionPtr;
1118 Dits___CurActType _actionDetails;
1119
1120 std::string _actionName;
1121
1122
1123 /*
1124 * Count of kick messages received.
1125 */
1126 unsigned _numKicks;
1127 /*
1128 * Argument to the last kick, if any.
1129 */
1130 //sds::IdPtr _lastKickArg;
1131
1132 /*
1133 * Argument to the obey, if any.
1134 */
1135 sds::IdPtr _obeyArg;
1136
1137 /*
1138 * Will task exit on action completion.
1139 */
1140 bool _exitTask = false;
1141 /*
1142 * Output argument details.
1143 */
1144 sds::Id _outArg; // Actual argument.
1145 bool _outArgDelete = false; // Should it be deleted by DRAMA?
1146 bool _outArgSet = false; // Has it been set.
1147
1148 /*
1149 * A double ended queue of information about signals sent from
1150 * the threads to DRAMA.
1151 *
1152 * We add signals to the front, remove them from the back.
1153 */
1154 SignalDetailsQueue _signalQueue;
1155
1156 /*
1157 * A map which contains all the events we are waiting on.
1158 * (one entry in the map per subsidiary thread).
1159 */
1160 WaitEventMapType _waitEventMap;
1161
1162
1163 /*
1164 * Method invoked to handle obey reschedule events.
1165 */
1166 Request ObeyReschedule();
1167 /*
1168 * Method invoked to handle kick messages
1169 */
1170 Request KickMessage();
1171
1172 /*
1173 * Method used to signal the action code from threads.
1174 */
1175 void SignalDrama(ThreadSignalType why);
1176
1177 /*
1178 * Signal that we are waiting for a kick.
1179 */
1180 void SignalAmWaitingForKick();
1181 /* Process a DRAMA Signal event message.
1182 */
1183 Request ProcessSignal();
1184 /* Process a DRAMA 2 Signal event message (used for communications
1185 * between threads and the Main DRAMA code)
1186 */
1187 Request ProcessDrama2Signal();
1188 /*
1189 * Process a Subsidiary action message.
1190 */
1191 Request ProcessSubsidiaryMessage();
1192
1193 /*
1194 * Clear the thread's waiting flag in _waitEventMap.
1195 *
1196 * @param complete If true, then it is known the transaction is
1197 * actually complete. Set false if known to be not
1198 * complete or unknown. Determines if we orphan
1199 * the transaction.
1200 */
1201 void ClearWait(bool complete) override;
1202
1203 /*
1204 * Return a pointer to the event details for the specified thread.
1205 */
1206 WaitEventDetails *FindWaitEventDetails(std::thread::id)
1207 override;
1208
1209
1210 /*
1211 * Wake up threads waiting on messages. Used to wake up waiting
1212 * action threads after a kick or signal is received.
1213 *
1214 * @return The count of threads found and signaled.
1215 */
1216 unsigned WakeUpWaitingThreads(sds::IdPtr arg);
1217
1218 /*
1219 * Indicate an action thread is complete. Called at the end
1220 * of the thread.
1221 */
1222 void ActionThreadComplete();
1223
1224 /*
1225 * Invoked to signal any threads waiting for messages to
1226 * cancel their wait events. Returns the number of signals
1227 * sent.
1228 */
1229 unsigned SignalWaitingThreads(EntryCode, StatusType);
1230
1231 /*
1232 * Invoke when the action is ending - will make any outstanding
1233 * events into orphans.
1234 */
1235 void OrphanOutstandingEvents();
1236
1237 public:
1238 void SignalWaitingEvent(WaitEventDetails *, EntryCode, StatusType);
1244 unsigned GetKickCount() const {
1245 return _numKicks;
1246 }
1252#if 0
1253 sds::IdPtr GetLastKickArg() const {
1254 return _lastKickArg;
1255 }
1256#endif
1260 void ResetKickCount();
1261
1267 void AbortMessageWaits(StatusType status);
1268
1269
1276 std::string GetActionName() const {
1277 return _actionName;
1278 }
1279
1280 }; // class TAction
1281
1282
1296 class TActionViaFunctor : public TAction {
1297 private:
1298 const ThreadActionFunction _func; // The function invoked by ActionThread().
1299 public:
1306 TActionViaFunctor(std::weak_ptr<Task> dramaTask,
1307 const ThreadActionFunction func) :
1308 TAction(dramaTask, 0), _func(func) { } // The timeout handed to TAction is always 0.
1309
1316 void ActionThread(const drama::sds::Id &obeyArg) override {
1317 _func(this, obeyArg); // Forward to the stored function, passing this action and the obey argument.
1318 }
1319 };
1320
1321
1344 class KickNotifier {
1345 private:
1346 bool _wasKicked = false; // Has action been kicked.
1347 bool _waiting = false; // Is thread waiting for kick.
1348 bool _threadThrew = false; // Has the thread died by throwing?
1349 TAction *_theAction = nullptr; // Action we are part of.
1350
1351
1352 std::future<void> _threadFuture; // Future used for WaitThread
1353 // The condition variable is used to ensure the constructor
1354 // does not return until the child thread is running and
1355 // has initialised itself, indicated by _waitEvent being
1356 // valid (not a nullptr).
1357 std::condition_variable_any _threadReadyCond;
1358 WaitEventDetails *_waitEvent = nullptr;
1359
1360 void WaitThread(); // Entry point for child thread.
1361 public:
1385 KickNotifier(TAction *action);
1386
1417 virtual bool Kicked(const sds::Id & arg);
1418
1419 // Returns _wasKicked, read while holding the DRAMA task lock.
1424 bool WasKicked() {
1425 Task::guardType DramaLock(std::shared_ptr<Task>(_theAction->GetTask())->Lock()); // Lock obtained from the task of the owning action.
1426 return _wasKicked;
1427 }
1428 // NOTE(review): unlike WasKicked(), this reads its flag without taking the task lock - confirm that is intended.
1433 bool ThreadThrewException() const {
1434 return _threadThrew;
1435 }
1436
1441 virtual ~KickNotifier();
1442
1445 KickNotifier& operator=(const KickNotifier &rhs) = delete;
1448 KickNotifier(const KickNotifier &source) = delete;
1449
1450
1451 }; // class KickNotifier
1452
1453 } // namespace thread
1454} // namespace drama
1455
1456#endif
virtual void PutObeyHandler(MessageHandlerPtr obj)
Put a message handler object for the next Obey reschedule event.
Definition messagehandler.hh:598
MessageHandler & operator=(const MessageHandler &rhs)=delete
Copy operator deleted.
virtual std::shared_ptr< Task > GetTask() const
Returns a pointer to the task.
Definition messagehandler.hh:347
virtual void PutKickHandler(MessageHandlerPtr obj)
Put a message handler object for the next Kick event.
Definition messagehandler.hh:617
virtual void MessageUser(const std::string &text) const
Use DRAMA to send a message to the user.
MessageHandler()
Create a DRAMA action/message handler object.
Definition messagehandler.hh:174
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition messagehandler.hh:524
A class which implements a DRAMA Message Handler.
Definition messagehandler.hh:138
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
Class used by Obey and Kick handlers to indicate rescheduling requirements.
Definition request.hh:78
Class used to arrange for notifications when the RunDrama exits.
Definition task.hh:382
std::unique_lock< mutexType > uniqueLockType
Defines the type of a unique_lock type using our mutex type.
Definition task.hh:463
std::recursive_timed_mutex mutexType
Defines the type of our mutex.
Definition task.hh:455
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:460
A class which implements a DRAMA task.
Definition task.hh:441
virtual Id Copy() const
Factory constructor method Id Copy constructor.
virtual void ShallowCopy(const Id &source)
Shallow copy from a const sds::Id which will outlive this object.
Definition sds.hh:2354
A C++ interface for handling SDS structures.
Definition sds.hh:428
Abstract class which is sub-classed to print SDS item listings.
Definition sds.hh:310
KickNotifier & operator=(const KickNotifier &rhs)=delete
Assignment operator - deleted.
KickNotifier(TAction *action)
KickNotifier constructor.
virtual bool Kicked(const sds::Id &arg)
Method is invoked when a Kick occurs.
bool ThreadThrewException() const
Return true if the thread has died after throwing an exception.
Definition threadaction.hh:1460
virtual ~KickNotifier()
Destructor.
bool WasKicked()
Indicates if the action was kicked.
Definition threadaction.hh:1451
KickNotifier(const KickNotifier &source)=delete
Copy constructor - deleted.
An object used to obtain notifications of kicks.
Definition threadaction.hh:1371
TActionViaFunctor(std::weak_ptr< Task > dramaTask, const ThreadActionFunction func)
Initialize object with the specified function, which meets the ThreadActionFunction prototype.
Definition threadaction.hh:1333
void ActionThread(const drama::sds::Id &obeyArg) override
Invoke function.
Definition threadaction.hh:1343
This class is used to create TAction objects referring to functions.
Definition threadaction.hh:1323
bool WaitForKickTimeoutIn(const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration has passed.
Definition threadaction.hh:813
void MessageUser(const std::string &text) const override
Use DRAMA to send a message to the user.
virtual const sds::PrintObjectCR & SdsListToUser() const override
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
Definition threadaction.hh:438
TAction(const TAction &source)=delete
Copy constructor - deleted.
void WaitForKick(sds::IdPtr *const arg=nullptr)
Block the current thread until a kick for the action is received.
Definition threadaction.hh:775
void SetTimeout(double newTimeout)
Set a new timeout.
Definition threadaction.hh:465
double GetTimeout() const
Return the current action timeout.
Definition threadaction.hh:454
bool WaitKickForTimeoutAt(const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or until a given time.
Definition threadaction.hh:886
TAction & operator=(const TAction &rhs)=delete
Assignment operator - deleted.
void PutKickHandler(MessageHandlerPtr obj) override final
We cannot change the kick handler in a threaded action, it does not make sense.
void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override
Sets up a wait event for this thread.
virtual ~TAction()
TAction destructor.
bool WaitEventTimeoutAt(EntryCode *event, const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or until a given time.
Definition threadaction.hh:697
void SetExitOnCompletion()
Set exit on completion.
Definition threadaction.hh:965
unsigned GetKickCount() const
Returns the number of times this action has been kicked since started or the last reset of the count.
Definition threadaction.hh:1271
void AbortMessageWaits(StatusType status)
Tells any thread waiting for messages to abort the wait event.
bool WaitForEventTimeoutIn(EntryCode *event, unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration in seconds has ...
Definition threadaction.hh:654
bool DoWaitForKick(WaitEventDetails *waitEvent, sds::IdPtr *const arg=nullptr)
Wait for a kick event with the waitEvent details given.
Definition threadaction.hh:1087
void SetReturnArg(sds::Id *arg)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:986
void ClearExitOnCompletion()
Clear exit on completion.
Definition threadaction.hh:972
void MessageUser(const char *format, Types... args)
Use DRAMA to send a message to the user - safe format.
Definition threadaction.hh:414
std::shared_ptr< Task > GetTask() const override
Get a reference to the DRAMA task we are part of.
Definition threadaction.hh:374
TAction(std::weak_ptr< Task > dramaTask, double timeout=0)
Create a DRAMA action/message handler object which runs a thread when the Obey message is received.
Definition threadaction.hh:244
void WaitForEvent(EntryCode *event, sds::IdPtr *const arg=nullptr)
Block the current thread until a kick/signal message for the action is received.
Definition threadaction.hh:484
std::string GetActionName() const
Return the name of the action this thread is implementing.
Definition threadaction.hh:1303
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:946
WaitEventDetails * SetupWaitForKick()
Set up to wait for a kick message.
Definition threadaction.hh:1049
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
Dits___CurActType GetMessageContext() const override
Get the DRAMA Context associated with the action event.
bool WaitForKickTimeoutIn(unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration in seconds has passed.
Definition threadaction.hh:850
void PutObeyHandler(MessageHandlerPtr obj) override final
We cannot change the obey handler in a threaded action, it does not make sense as the action never re...
bool WaitForEventTimeoutIn(EntryCode *event, const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration has passed.
Definition threadaction.hh:568
Task::mutexType & Lock() const override
Get a reference to the DRAMA Task lock.
void ResetKickCount()
Resets the count of kicks reported by GetKickCount().
A class which implements a DRAMA Action which runs a thread.
Definition threadaction.hh:199
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
#define DramaTHROW(status_, message_)
Throw a Drama exception.
Definition exception.hh:93
#define D2LOG_DRAMA2
DRAMA 2 events - thread events etc.
Definition logger.hh:107
DRAMA 2 include file - Message Handler class definition.
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3318
std::function< void(TAction *, const sds::Id &)> ThreadActionFunction
Type used for functions specified to drama::Task::Add().
Definition task.hh:96
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1322
EntryCode
Entry type code - indicates the type of a DRAMA event.
Definition entryinfo.hh:67
@ Kick
Action has been kicked.
@ Complete
Obey/Kick etc message completed.
std::shared_ptr< MessageHandler > MessageHandlerPtr
This type is used for passing MessageHandler object addresses around.
Definition messagehandler.hh:101
void SafePrintf(std::ostream &ostream, const char *str)
Safe formatted write to a stream.
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
DRAMA 2 include file - Code common to DRAMA 2 features supporting threading.