AAO DRAMA/DRAMA2 C++ Interface
DRAMA C++11 and later interface
threadaction.hh
Go to the documentation of this file.
1#ifndef _DRAMA2_THREADACTION_INC
2#define _DRAMA2_THREADACTION_INC
29/*
30 * History:
31 07-Jan-2014 - TJF - Original version
32 30-Sep-2016 - TJF - Start of history after development.
33 SignalWaitingThreads() returns number of threads
34 signaled.
35
36 * The above ID is for Doxygen, this one has the format ACMM is looking for.
37 * "@(#) $Id$"
38 */
40#include "drama/thread.hh"
41#include <thread>
42#include <queue>
43#include <deque>
44#include <map>
45#include <chrono>
46#include <future>
47#include <condition_variable>
48#include <functional>
49namespace drama {
50
51 namespace thread {
52
53 /*
54 * This enum is used for indicating what type of event a signal
55 * from the thread to DRAMA is about.
56 */
/**
 * Indicates what type of event a signal from a thread to DRAMA is about.
 */
enum class ThreadSignalType {
    Complete,    ///< Main thread has completed.
    WaitSignal,  ///< The thread is waiting for a signal.
    WaitKick     ///< The thread is interested in kicks.
};
62
63 /*
64 * Details on a signal from a thread.
65 */
66 struct DramaSignalDetails {
67 std::thread::id _fromThread;
68 ThreadSignalType _sigType;
69
70 DramaSignalDetails(ThreadSignalType sigType) :
71 _fromThread(std::this_thread::get_id()),
72 _sigType(sigType) {}
73
74
75 };
76 /*
77 * A queue of signal details. We have a queue of these as
78 * we support multiple threads as part of implementing the
79 * action, so we could have multiple signals outstanding.
80 *
81 * A std::queue would be a better representation of this
82 * than "std::deque", but we do want to clear the queue and
83 * std::queue does not support that.
84 */
85 typedef std::deque<DramaSignalDetails> SignalDetailsQueue;
86
87
88
89 class TAction;
90
91 /* Used internally by TAction, this class is used as the
92 * reschedule handler for DRAMA on Obey reschedule messages to
93 * threaded actions.
94 *
95 * TAction::_obeyRescheduleObj member is of this type.
96 *
97 */
98 class _ThreadMessage : public MessageHandler {
99 TAction *_threadHandlerObj;
100 public:
101 _ThreadMessage(TAction *threadHandler) :
102 _threadHandlerObj(threadHandler) {}
103
104 private:
105 /*
106 * Made private, parent class has it as protected.
107 * But I don't want users to invoke it
108 */
109 Request MessageReceived() override final;
110 };
111 /* Used internally by TAction, this class is used as the
112 * reschedule handler for DRAMA on Kick messages to
113 * threaded actions.
114 *
115 * TAction::_kickRescheduleObj member is of this type.
116 */
117 class _ThreadKick : public MessageHandler {
118 TAction *_threadHandlerObj;
119 public:
120 _ThreadKick(TAction *threadHandler) :
121 _threadHandlerObj(threadHandler) {}
122
123 private:
124 /*
125 * Made private, parent class has it as protected.
126 * But I don't want users to invoke it
127 */
128 Request MessageReceived() override final;
129 };
130
131
137 using ThreadActionFunction = std::function<void (TAction *, const sds::Id &)> ;
138
170 // TMessHandler and RunDramaExitNotifier are abstract only.
171 class TAction : public MessageHandler,
172 public TMessHandler, public RunDramaExitNotifier {
173
174 // We need a function to implement the thread that
175 // is not part of TAction, but which can call
176 // ActionThread. This must be in the drama namespace,
177 // since that is how we are declaring it now.
178
179 //friend void MyThread(TAction *handler, const sds::Id &obeyArg);
180
181 friend class _ThreadMessage;
182 friend class _ThreadKick;
183
184 private:
185 std::future<void> _actionFuture; // Future of first thread in action.
186
187 /*
188 * Pointer to the DRAMA task we are part of.
189 * Held as a weak pointer - we never change it after the constructor.
190 * The task pointed to is NOT const - since we can operate on it.
191 */
192 std::weak_ptr<Task> _theTask;
193 double _timeout; // Thread wait timeout, seconds.
194
195
196 void RunActionThread(const sds::IdPtr obeyArg);
197
199 public:
200
217 TAction(std::weak_ptr<Task> dramaTask, double timeout=0) :
218 _theTask(dramaTask), _timeout(timeout),
219 _obeyRescheduleObj(this),_kickMessageObj(this) {
220
221
222 }
223
228 virtual ~TAction();
229
230
233 TAction& operator=(const TAction &rhs) = delete;
236 TAction(const TAction &source) = delete;
237
238 private:
239 /*
240 * These are made private, as they are not meant to be
241 * called outside the object, just implemented outside
242 */
243
244 /*
245 * The method invoked to handle the obey message. This
246 * is an implementation of MessageHandler::MessageReceived().
247 */
248 virtual Request MessageReceived() final;
249
250
259 virtual void ActionThread(const drama::sds::Id &obeyArg) = 0;
261
273 virtual int RunDramaHasExited() override;
292 virtual bool JoinThreads(std::chrono::steady_clock::time_point until) override;
293
294
316 virtual void KickedWhenNotWaiting(sds::IdPtr Arg);
317
339 virtual void SignaledWhenNotWaiting(sds::IdPtr Arg);
340
341
342 public:
347 std::shared_ptr<Task> GetTask() const override {
348 return std::shared_ptr<Task>(_theTask);
349 }
357 bool HasTaskExpired() const noexcept {
358 return _theTask.expired();
359 }
360
361
372 void MessageUser(const std::string &text) const override;
373
396 template<typename... Types>
397 void MessageUser(const char *format, Types... args) {
398
399 /*
400 * Our approach is to write the output to a string, via
401 * SafePrintf(), then output that in one operation. Since
402 * std::ostream devices will then do the output in one operation.
403 */
404 std::stringstream sstrm;
406 MessageUser(sstrm.str());
407
408
409 }
410
417 /* SdsListToUser() is in both TMessHandler and MessageHandler,
418 two of our base classes. So we must sort out the
419 confusion here
420 */
421 virtual const sds::PrintObjectCR &SdsListToUser() const override {
423 }
425
426
431 void SendTrigger(const sds::Id &arg) const;
432
437 double GetTimeout() const {
438 return _timeout;
439 }
448 void SetTimeout(double newTimeout) {
449 _timeout = newTimeout;
450 }
451
467 void WaitForEvent(EntryCode *event, sds::IdPtr * const arg=nullptr) {
468 /*
469 * Signal DRAMA that we are waiting for an event.
470 * This will put an entry in _waitEventMap for this
471 * thread.
472 */
473 SignalAmWaitingForKick();
474
475 /*
476 * Access the entry in _waitEventMap.
477 *
478 * Note - this first call will throw if the weak pointer _theTask
479 * does not have a valid object.
480 *
481 */
483 std::shared_ptr<Task>(_theTask)->Lock());
484
485 std::thread::id threadId(std::this_thread::get_id());
486 auto mapItem = _waitEventMap.find(threadId);
487 assert (mapItem != _waitEventMap.end());
488 WaitEventDetails *waitEvent = &(mapItem->second);
489
490
491 /*
492 * Wait for the condition
493 *
494 * We use a Lambda function to access the DataAvail() method
495 * in waitEvent.
496 *
497 */
498 waitEvent->_condition->wait(
499 DramaLock,
500 [waitEvent]{return waitEvent->DataAvail();});
501
502
503 WaitEventData details = waitEvent->_dataQueue.front();
504
505 *event = details.eventInfo.entryReason;
506 /*
507 * If the user wants the argument, return it.
508 */
509 if (arg)
510 {
511 *arg = details.arg;
512 }
513 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
514 "d2::TACT::WaitForEvent",
515 "Event received");
516
517
518 waitEvent->_dataQueue.pop();
519 // Should we complain here (throw execption?) if queue not empty?
520 ClearWait(false);
521 }
550 template <class Rep, class Period>
551 bool WaitForEventTimeoutIn(
553 const std::chrono::duration<Rep,Period>& rel_time,
554 sds::IdPtr * const arg=0) {
555 /*
556 * Signal DRAMA that we are waiting for a kick.
557 * This will put an entry in _waitEventMap for this
558 * thread.
559 */
560 SignalAmWaitingForKick();
561 /*
562 * Access the entry in _waitEventMap.
563 */
565 std::shared_ptr<Task>(_theTask)->Lock());
566 std::thread::id threadId(std::this_thread::get_id());
567 auto mapItem = _waitEventMap.find(threadId);
568 assert (mapItem != _waitEventMap.end());
569 WaitEventDetails *waitEvent = &(mapItem->second);
570 /*
571 * Wait for the condition
572 *
573 * We use a Lambda function to access the DataAvail() method
574 * in waitEvent.
575 *
576 */
577 waitEvent->_condition->wait_for(
579 [waitEvent]{return waitEvent->DataAvail();});
580 if (!waitEvent->DataAvail())
581 {
582 std::shared_ptr<Task>(_theTask)->Logger().Log(
583 D2LOG_DRAMA2, true,
584 "d2::TACT::WaitEventTOIn",
585 "Timeout waiting for signal/kick");
586
587
588 // Timeout.
589 //fprintf(stderr,"TAction::WaitKickFor() - timeout\n");
590 ClearWait(false);
591 *event = EntryCode::DramaAbortWaits;
592 return false;
593 }
594 //fprintf(stderr,"TAction::WaitEventFor() - event/signal\n");
595
596 WaitEventData details = waitEvent->_dataQueue.front();
597 *event = details.eventInfo.entryReason;
598 /*
599 * If the user wants the argument, return it.
600 */
601 if (arg)
602 {
603 *arg = details.arg;
604 }
605 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
606 "d2::TACT::WaitEventTOIn",
607 "kick/signal received");
608
609
610 waitEvent->_dataQueue.pop();
611 // Should complain here (throw execption?) if queue not empty
612 ClearWait(false);
613
614 return true;
615
616 }
637 bool WaitForEventTimeoutIn(
639 unsigned seconds,
640 sds::IdPtr * const arg=0) {
641
642 return WaitForEventTimeoutIn(event, std::chrono::seconds(seconds), arg);
643
644 }
645
646
647
648
649
679 template <class Clock, class Duration>
680 bool WaitEventTimeoutAt(
682 const std::chrono::time_point<Clock,Duration>& abs_time,
683 sds::IdPtr * const arg=0 ) {
684
685 /*
686 * Signal DRAMA that we are waiting for a kick.
687 * This will put an entry in _waitEventMap for this
688 * thread.
689 */
690 SignalAmWaitingForKick();
691 /*
692 * Access the entry in _waitEventMap.
693 */
695 std::shared_ptr<Task>(_theTask)->Lock());
696 std::thread::id threadId(std::this_thread::get_id());
697 auto mapItem = _waitEventMap.find(threadId);
698 assert (mapItem != _waitEventMap.end());
699 WaitEventDetails *waitEvent = &(mapItem->second);
700 /*
701 * Wait for the condition
702 *
703 * We use a Lambda function to access the DataAvail() method
704 * in waitEvent.
705 */
706 waitEvent->_condition->wait_until(
708 [waitEvent]{return waitEvent->DataAvail();});
709 if (!waitEvent->DataAvail())
710 {
711 // Timeout.
712 std::shared_ptr<Task>(_theTask)->Logger().Log(D2LOG_DRAMA2, true,
713 "d2::TACT::WaitEventTOAT",
714 "Timeout waiting for signal/kick");
715
716 ClearWait(false);
717 *event = EntryCode::DramaAbortWaits;
718 return false;
719 }
720 //fprintf(stderr,"TAction::WaitKickFor() - kicked\n");
721 WaitEventData details = waitEvent->_dataQueue.front();
722 *event = details.eventInfo.entryReason;
723 /*
724 * If the user wants the argument, return it.
725 */
726 if (arg)
727 {
728 *arg = details.arg;
729 }
730 waitEvent->_dataQueue.pop();
731 std::shared_ptr<Task>(_theTask)->Logger().Log(
732 D2LOG_DRAMA2, true,
733 "d2::TACT::WaitEventTOAt",
734 "Signal/Kick received");
735
736
737
738 // Should complain here (throw execption?) if queue not empty
739 ClearWait(false);
740
741 return true;
742
743 }
/*
 * Block the current thread until a kick for the action is received.
 *
 * Implemented by calling WaitForEvent() and then checking that the event
 * reported is EntryCode::Kick.
 *
 * @param arg  Passed through to WaitForEvent().
 *
 * NOTE(review): two lines are absent from this listing (original lines
 * 761 and 765).  Presumably they declare the local `event` and open a
 * DramaTHROW(status, message) call whose message is the string below.
 * The status code cannot be determined from here - restore both lines
 * from the original header.
 */
758 void WaitForKick(sds::IdPtr * const arg=nullptr) {
759
760 // Can be implemented by WaitForEvent(), with a check on the event.
762 WaitForEvent(&event, arg);
763 if (event != EntryCode::Kick)
764 {
766 "TAction::WaitForKick - signal received instead of kick, application programming error. You may want WaitForEvent()");
767 }
768 }
/*
 * Block the current thread until a kick for the action is received or a
 * duration has passed.
 *
 * Implemented by calling WaitForEventTimeoutIn() and then checking that
 * the event reported is EntryCode::Kick.
 *
 * @param rel_time  Passed through to WaitForEventTimeoutIn().
 * @param arg       Passed through to WaitForEventTimeoutIn().
 * @return false if WaitForEventTimeoutIn() returned false, true otherwise.
 *
 * NOTE(review): two lines are absent from this listing (original lines
 * 801 and 807).  Presumably they declare the local `event` and open a
 * DramaTHROW(status, message) call whose message is the string below.
 * The status code cannot be determined from here - restore both lines
 * from the original header.
 */
795 template <class Rep, class Period>
796 bool WaitForKickTimeoutIn(
797 const std::chrono::duration<Rep,Period>& rel_time,
798 sds::IdPtr * const arg=0) {
799
800 // Can be implemented by WaitForEventTimeoutIn(), with a check on the event.
802 bool kicked = WaitForEventTimeoutIn(&event, rel_time, arg);
803 if (!kicked)
804 return false;
805 if (event != EntryCode::Kick)
806 {
808 "TAction::WaitForKickTimeoutIn - signal received instead of kick, application programming error. You may want WaitForEventTimeoutIn()");
809 }
810 return true;
811
812
813 }
814
833 bool WaitForKickTimeoutIn(
834 unsigned seconds,
835 sds::IdPtr * const arg=0) {
836
837 return WaitForKickTimeoutIn(std::chrono::seconds(seconds), arg);
838
839 }
840
/*
 * Block the current thread until a kick for the action is received or
 * until a given time.
 *
 * @param abs_time  The time point at which to give up waiting.
 * @param arg       Optional location for the argument of the kick.
 * @return false if `kicked` is false, true otherwise.
 *
 * NOTE(review): three lines are absent from this listing (original lines
 * 875, 876 and 881).  Presumably they declare the local `event`, set
 * `kicked` from a call to WaitEventTimeoutAt(), and open a
 * DramaTHROW(status, message) call whose message is the string below.
 * The status code cannot be determined from here - restore the lines
 * from the original header.
 *
 * NOTE(review): the comment and message below refer to
 * "WaitForEventTimeoutAt" and "WaitForKickTimeoutAt", but the methods
 * visible in this class are named WaitEventTimeoutAt and
 * WaitKickForTimeoutAt - confirm which spelling is intended.
 */
868 template <class Clock, class Duration>
869 bool WaitKickForTimeoutAt(
870 const std::chrono::time_point<Clock,Duration>& abs_time,
871 sds::IdPtr * const arg=0 ) {
872
873
874 // Can be implemented by WaitForEventTimeoutAt(), with a check on the event.
877 if (!kicked)
878 return false;
879 if (event != EntryCode::Kick)
880 {
882 "TAction::WaitForKickTimeoutAt - signal received instead of kick, application programming error. You may want WaitForEventTimeoutAt()");
883 }
884 return true;
885
886
887
888 }
889
913 void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override;
914
929 void SetReturnArg(const sds::Id &arg, bool copy=true) {
930
931 if (copy)
932 {
933 _outArg = arg.Copy();
934 _outArgDelete = true;
935 }
936 else
937 {
938 _outArg.ShallowCopy(arg);
939 _outArgDelete = false;
941
942 _outArgSet = true;
943 }
948 void SetExitOnCompletion() {
949 _exitTask = true;
950 }
955 void ClearExitOnCompletion() {
956 _exitTask = false;
957 }
969 void SetReturnArg(sds::Id *arg) {
970
971 _outArg.ShallowCopy(arg, true);
972 _outArgDelete = true;
973 _outArgSet = true;
974
976
984 void PutObeyHandler(MessageHandlerPtr obj) override final;
985
999 void PutKickHandler(MessageHandlerPtr obj) override final;
1000
1001
1008 Task::mutexType & Lock() const override;
1009
1017 Dits___CurActType GetMessageContext() const override;
1018
1019
1020
1021
1022
1032 WaitEventDetails *SetupWaitForKick() {
1033
1034 /*
1035 * Signal DRAMA that we are waiting for a kick.
1036 * This will put an entry in _waitEventMap for this
1037 * thread.
1038 */
1039 SignalAmWaitingForKick();
1040
1041 /*
1042 * Access the entry in _waitEventMap.
1043 */
1045 std::shared_ptr<Task>(_theTask)->Lock());
1046 std::thread::id threadId(std::this_thread::get_id());
1047 auto mapItem = _waitEventMap.find(threadId);
1048 assert (mapItem != _waitEventMap.end());
1049 return &(mapItem->second);
1050
1051
1052 }
1070 bool DoWaitForKick(WaitEventDetails *waitEvent,
1071 sds::IdPtr * const arg=nullptr) {
1072
1074 /*
1075 * Wait for the condition
1076 *
1077 * We use a Lambda function to access the DataAvail() method
1078 * in waitEvent.
1079 *
1080 */
1082 std::shared_ptr<Task>(_theTask)->Lock());
1083 waitEvent->_condition->wait(
1084 DramaLock,
1085 [waitEvent]{return waitEvent->DataAvail();});
1086
1087
1088 WaitEventData details = waitEvent->_dataQueue.front();
1089 /*
1090 * If the user wants the argument, return it.
1091 */
1092 if (arg)
1093 {
1094 *arg = details.arg;
1095 }
1096 std::shared_ptr<Task>(_theTask)->Logger().Log(
1098 "d2::TACT::DoWaitKick",
1099 "Kick received");
1100
1101
1102 waitEvent->_dataQueue.pop();
1103 // Should we complain here (throw execption?) if queue not empty?
1104 ClearWait(false);
1105
1106 if (details.eventInfo.entryReason ==
1107 EntryCode::DramaAbortWaits)
1108 return false;
1109 return true;
1110 }
1111
1112 private:
1113 /*
1114 * Objects used as part of handling obey reschedule and
1115 * kick messages.
1116 *
1117 * They simply result in the ObeyReschedule() and KickMessage()
1118 * methods below being invoked.
1119 */
1120 _ThreadMessage _obeyRescheduleObj;
1121 _ThreadKick _kickMessageObj;
1122
1123 /*
1124 * The details on the DITS action are recorded here before
1125 * we start the thread.
1126 */
1127 int _actionPtr;
1128 Dits___CurActType _actionDetails;
1129
1130 std::string _actionName;
1131
1132
1133 /*
1134 * Count of kick messages received.
1135 */
1136 unsigned _numKicks;
1137 /*
1138 * Argument to the last kick, if any.
1139 */
1140 //sds::IdPtr _lastKickArg;
1141
1142 /*
1143 * Argument to the obey, if any.
1144 */
1145 sds::IdPtr _obeyArg;
1146
1147 /*
1148 * Will task exit on action completion.
1149 */
1150 bool _exitTask = false;
1151 /*
1152 * Output argument details.
1153 */
1154 sds::Id _outArg; // Actual argument.
1155 bool _outArgDelete = false; // Should it be deleted by DRAMA?
1156 bool _outArgSet = false; // Has it been set.
1157
1158 /*
1159 * A double ended queue of information about signals sent from
1160 * the threads to DRAMA.
1161 *
1162 * We add signals to the front, remove them from the back.
1163 */
1164 SignalDetailsQueue _signalQueue;
1165
1166 /*
1167 * A map which contains all the events we are waiting on.
1168 * (one entry in the map per subsidiary thread).
1169 */
1170 WaitEventMapType _waitEventMap;
1171
1172
1173 /*
1174 * Method invoked to handle obey reschedule events.
1175 */
1176 Request ObeyReschedule();
1177 /*
1178 * Method invoked to handle kick messages
1179 */
1180 Request KickMessage();
1181
1182 /*
1183 * Method used to signal the action code from threads.
1184 */
1185 void SignalDrama(ThreadSignalType why);
1186
1187 /*
1188 * Signal that we are waiting for a kick.
1189 */
1190 void SignalAmWaitingForKick();
1191 /* Process a DRAMA Signal event message.
1192 */
1193 Request ProcessSignal();
1194 /* Process a DRAMA 2 Signal event message (used for communications
1195 * between threads and the Main DRAMA code)
1196 */
1197 Request ProcessDrama2Signal();
1198 /*
1199 * Process a Subsidiary action message.
1200 */
1201 Request ProcessSubsidiaryMessage();
1202
1203 /*
1204 * Clear the thread's waiting flag in _waitEventMap.
1205 *
1206 * @param complete If true, then it is known the transaction is
1207 * actually complete. Set false if known to be not
1208 * complete or unknown. Determines if we orphan
1209 * the transaction.
1210 */
1211 void ClearWait(bool complete) override;
1212
1213 /*
1214 * Return a pointer to the event details for the specified thread.
1215 */
1216 WaitEventDetails *FindWaitEventDetails(std::thread::id)
1217 override;
1218
1219
1220 /*
1221 * Wake up threads waiting on messages. Used to wake up waiting
1222 * action threads after a kick or signal is received.
1223 *
1224 * @return The count of threads found and signaled.
1225 */
1226 unsigned WakeUpWaitingThreads(sds::IdPtr arg);
1227
1228 /*
1229 * Indicate an action thread is complete. Called at the end
1230 * of the thread.
1231 */
1232 void ActionThreadComplete();
1233
1234 /*
1235 * Invoked to signal any threads waiting for messages to
1236 * cancel their wait events. Returns the number of signals
1237 * sent.
1238 */
1239 unsigned SignalWaitingThreads(EntryCode, StatusType);
1240
1241 /*
1242 * Invoke when the action is ending - will make any outstanding
1243 * events into orphans.
1244 */
1245 void OrphanOutstandingEvents();
1246
1247 public:
1248 void SignalWaitingEvent(WaitEventDetails *, EntryCode, StatusType);
1254 unsigned GetKickCount() const {
1255 return _numKicks;
1256 }
1262#if 0
1263 sds::IdPtr GetLastKickArg() const {
1264 return _lastKickArg;
1265 }
1266#endif
1270 void ResetKickCount();
1271
1277 void AbortMessageWaits(StatusType status);
1278
1279
1286 std::string GetActionName() const {
1287 return _actionName;
1288 }
1289
1290 }; // class TAction
1291
1292
1305 private:
1306 const ThreadActionFunction _func; // The function.
1307 public:
1314 TActionViaFunctor(std::weak_ptr<Task> dramaTask,
1315 const ThreadActionFunction func) :
1316 TAction(dramaTask, 0), _func(func) { }
1317
1324 void ActionThread(const drama::sds::Id &obeyArg) override {
1325 _func(this, obeyArg);
1326 }
1327 };
1328
1329
1352 class KickNotifier : public RunDramaExitNotifier {
1353 private:
1354 bool _wasKicked = false; // Has action been kicked.
1355 bool _waiting = false; // Is thread waiting for kick.
1356 bool _threadThrew = false; // Has the thread died by throwing?
1357 TAction *_theAction = nullptr; // Action we are part of.
1358
1359
1360 std::future<void> _threadFuture; // Future used for WaitThread
1361 // The condition variable is used to ensure the constructor
1362 // does not return until the child thread is running and
1363 // has initialised itself, indicated by _waitEvent being
1364 // valid (not a nullptr).
1365 std::condition_variable_any _threadReadyCond;
1366 WaitEventDetails *_waitEvent = nullptr;
1367
1368 void WaitThread(); // Entry point for child thread.
1369 public:
1393 KickNotifier(TAction *action);
1394
1425 virtual bool Kicked(const sds::Id & arg);
1426
1427
1432 bool WasKicked() {
1433 Task::guardType DramaLock(std::shared_ptr<Task>(_theAction->GetTask())->Lock());
1434 return _wasKicked;
1435 }
1436
1441 bool ThreadThrewException() const {
1442 return _threadThrew;
1443 }
1444
1449 virtual ~KickNotifier();
1450
1453 KickNotifier& operator=(const KickNotifier &rhs) = delete;
1456 KickNotifier(const KickNotifier &source) = delete;
1457
1469 virtual int RunDramaHasExited() override;
1470
1489 bool JoinThreads(std::chrono::steady_clock::time_point until) override;
1490
1491
1492 }; // class KickNotifier
1493
1494 } // namespace thread
1495} // namespace drama
1497#endif
virtual void PutObeyHandler(MessageHandlerPtr obj)
Put a message handler object for the next Obey reschedule event.
Definition messagehandler.hh:598
MessageHandler & operator=(const MessageHandler &rhs)=delete
Copy operator deleted.
virtual std::shared_ptr< Task > GetTask() const
Returns a pointer to the task.
Definition messagehandler.hh:347
virtual void PutKickHandler(MessageHandlerPtr obj)
Put a message handler object for the next Kick event.
Definition messagehandler.hh:617
virtual void MessageUser(const std::string &text) const
Use DRAMA to send a message to the user.
MessageHandler()
Create a DRAMA action/message handler object.
Definition messagehandler.hh:174
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition messagehandler.hh:524
A class which implements a DRAMA Message Handler.
Definition messagehandler.hh:138
A Class which provides access to DRAMA's message sending facilities.
Definition path.hh:689
Class used by Obey and Kick handlers to indicate rescheduling requirements.
Definition request.hh:78
Class used to arrange for notifications when the RunDrama exits.
Definition task.hh:387
std::unique_lock< mutexType > uniqueLockType
Defines the type of a unique_lock type using our mutex type.
Definition task.hh:468
std::recursive_timed_mutex mutexType
Defines the type of our mutex.
Definition task.hh:460
std::lock_guard< mutexType > guardType
Defines the type of a lock guard using our mutex type.
Definition task.hh:465
A class which implements a DRAMA task.
Definition task.hh:446
virtual Id Copy() const
Factory constructor method Id Copy constructor.
virtual void ShallowCopy(const Id &source)
Shallow copy from a const sds::Id which will outlive this object.
Definition sds.hh:2392
A C++ interface for handling SDS structures.
Definition sds.hh:428
Abstract class which is sub-classed to print SDS item listings.
Definition sds.hh:310
KickNotifier & operator=(const KickNotifier &rhs)=delete
Assignment operator - deleted.
KickNotifier(TAction *action)
KickNotifier constructor.
virtual bool Kicked(const sds::Id &arg)
Method is invoked when a Kick occurs.
bool ThreadThrewException() const
Return true if the thread has died after throwing an exception.
Definition threadaction.hh:1468
bool JoinThreads(std::chrono::steady_clock::time_point until) override
Invoked when the drama::task::RunDrama() loop exits.
virtual int RunDramaHasExited() override
Invoked when the drama::task::RunDrama() method exits.
virtual ~KickNotifier()
Destructor.
bool WasKicked()
Indicates if the action was kicked.
Definition threadaction.hh:1459
KickNotifier(const KickNotifier &source)=delete
Copy constructor - deleted.
An object used to obtain notifications of kicks.
Definition threadaction.hh:1379
TActionViaFunctor(std::weak_ptr< Task > dramaTask, const ThreadActionFunction func)
Initialize object with the specified function, which meets the ThreadActionFunction prototype.
Definition threadaction.hh:1341
void ActionThread(const drama::sds::Id &obeyArg) override
Invoke function.
Definition threadaction.hh:1351
This class is used to create TAction objects which refer to functions.
Definition threadaction.hh:1331
bool WaitForKickTimeoutIn(const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration has passed.
Definition threadaction.hh:823
void MessageUser(const std::string &text) const override
Use DRAMA to send a message to the user.
virtual const sds::PrintObjectCR & SdsListToUser() const override
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
Definition threadaction.hh:448
TAction(const TAction &source)=delete
Copy constructor - deleted.
void WaitForKick(sds::IdPtr *const arg=nullptr)
Block the current thread until a kick for the action is received.
Definition threadaction.hh:785
void SetTimeout(double newTimeout)
Set a new timeout.
Definition threadaction.hh:475
double GetTimeout() const
Return the current action timeout.
Definition threadaction.hh:464
bool WaitKickForTimeoutAt(const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or until a given time.
Definition threadaction.hh:896
TAction & operator=(const TAction &rhs)=delete
Assignment operator - deleted.
void PutKickHandler(MessageHandlerPtr obj) override final
We cannot change the kick handler in a threaded action, it does not make sense.
void SetupWaitEvent(DitsTransIdType tid, drama::Path *pathObj) override
Sets up a wait event for this thread.
virtual ~TAction()
TAction destructor.
bool WaitEventTimeoutAt(EntryCode *event, const std::chrono::time_point< Clock, Duration > &abs_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or until a given time.
Definition threadaction.hh:707
void SetExitOnCompletion()
Set exit on completion.
Definition threadaction.hh:975
unsigned GetKickCount() const
Returns the number of times this action has been kicked since started or the last reset of the count.
Definition threadaction.hh:1281
void AbortMessageWaits(StatusType status)
Tells any thread waiting for messages to abort the wait event.
bool WaitForEventTimeoutIn(EntryCode *event, unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration in seconds has ...
Definition threadaction.hh:664
bool DoWaitForKick(WaitEventDetails *waitEvent, sds::IdPtr *const arg=nullptr)
Wait for a kick event with the waitEvent details given.
Definition threadaction.hh:1097
void SetReturnArg(sds::Id *arg)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:996
void ClearExitOnCompletion()
Clear exit on completion.
Definition threadaction.hh:982
void MessageUser(const char *format, Types... args)
Use DRAMA to send a message to the user - safe format.
Definition threadaction.hh:424
bool HasTaskExpired() const noexcept
Indicate if the underlying DRAMA task weak pointer has expired.
Definition threadaction.hh:384
std::shared_ptr< Task > GetTask() const override
Get a reference to the DRAMA task we are part of.
Definition threadaction.hh:374
TAction(std::weak_ptr< Task > dramaTask, double timeout=0)
Create a DRAMA action/message handler object which runs a thread when the Obey message is received.
Definition threadaction.hh:244
void WaitForEvent(EntryCode *event, sds::IdPtr *const arg=nullptr)
Block the current thread until a kick/signal message for the action is received.
Definition threadaction.hh:494
std::string GetActionName() const
Return the name of the action this thread is implementing.
Definition threadaction.hh:1313
void SetReturnArg(const sds::Id &arg, bool copy=true)
Set the argument to be sent as part of the action completion message.
Definition threadaction.hh:956
WaitEventDetails * SetupWaitForKick()
Set up to wait for a kick message.
Definition threadaction.hh:1059
void SendTrigger(const sds::Id &arg) const
Send a trigger message to the parent action.
Dits___CurActType GetMessageContext() const override
Get the DRAMA Context associated with the action event.
bool WaitForKickTimeoutIn(unsigned seconds, sds::IdPtr *const arg=0)
Block the current thread until a kick for the action is received or a duration in seconds has passed.
Definition threadaction.hh:860
void PutObeyHandler(MessageHandlerPtr obj) override final
We cannot change the obey handler in a threaded action, it does not make sense as the action never re...
bool WaitForEventTimeoutIn(EntryCode *event, const std::chrono::duration< Rep, Period > &rel_time, sds::IdPtr *const arg=0)
Block the current thread until a signal/kick for the action is received or a duration has passed.
Definition threadaction.hh:578
Task::mutexType & Lock() const override
Get a reference to the DRAMA Task lock.
void ResetKickCount()
Reset the count of kicks received by this action.
A class which implements a DRAMA Action which runs a thread.
Definition threadaction.hh:199
virtual const sds::PrintObjectCR & SdsListToUser() const
Get a reference to an SDS printer object which can be used to list an SDS object using MessageUser.
This interface class must be implemented by classes which have threads waiting for messages.
Definition thread.hh:308
#define DramaTHROW(status_, message_)
Throw a Drama exception.
Definition exception.hh:87
#define D2LOG_DRAMA2
DRAMA 2 events - thread events etc.
Definition logger.hh:107
DRAMA 2 include file - Message Handler class definition.
std::shared_ptr< Id > IdPtr
A shared pointer for sds::Id items.
Definition sds.hh:3484
std::function< void(TAction *, const sds::Id &)> ThreadActionFunction
Type used for functions specified to drama::Task::Add().
Definition task.hh:96
void CreateRunDramaTask()
Create and run a DRAMA task, with standard exception handling.
Definition task.hh:1327
EntryCode
Entry type code - indicates the type of a DRAMA event.
Definition entryinfo.hh:67
@ Kick
Action has been kicked.
@ Complete
Obey/Kick etc message completed.
std::shared_ptr< MessageHandler > MessageHandlerPtr
This type is used for passing MessageHandler object addresses around.
Definition messagehandler.hh:101
void SafePrintf(std::ostream &ostream, const char *str)
Safe formatted write to a stream.
The drama namespace contains all the classes, types etc of the DRAMA 2 implementation.
Definition drama.hh:93
DRAMA 2 include file - Code common to DRAMA 2 features supporting threading.